In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
            .master("local[*]") \
            .appName("test") \
            .getOrCreate()

# Partition df

In [3]:
from pyspark.sql import types
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [4]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-01.csv')

In [5]:
df = df.repartition(24)

In [6]:
df.write.parquet('fhvhv/2021/01/')

In [7]:
df = spark.read.parquet('fhvhv/2021/01/')
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [8]:
from pyspark.sql import functions as F
df.show()

+-----------------+--------------------+-------------------+----------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+----------------+------------+------------+-------+
|           B02883| 2019-01-02 17:47:11|2019-01-02 18:01:43|            NULL|          18|        NULL| B02883|
|           B02835| 2019-01-03 11:22:49|2019-01-03 11:36:08|            NULL|         158|        NULL| B02835|
|           B02875| 2019-01-02 20:16:05|2019-01-02 20:26:40|            NULL|         246|        NULL| B02875|
|           B02510| 2019-01-01 13:10:11|2019-01-01 13:14:47|            NULL|         177|           1| B02876|
|           B02882| 2019-01-01 02:29:49|2019-01-01 03:14:58|            NULL|          48|        NULL| B02882|
|           B02765| 2019-01-03 04:26:13|2019-01-03 05:01:03|            NULL|           1|        NULL| 

In [9]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [10]:
crazy_stuff('B02884')

's/b44'

In [11]:
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select( 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2019-01-02|        NULL|          18|        NULL|
| 2019-01-03|        NULL|         158|        NULL|
| 2019-01-02|        NULL|         246|        NULL|
| 2019-01-01|        NULL|         177|           1|
| 2019-01-01|        NULL|          48|        NULL|
| 2019-01-03|        NULL|           1|        NULL|
| 2019-01-01|        NULL|          49|           1|
| 2019-01-02|        NULL|          79|        NULL|
| 2019-01-03|        NULL|         163|           1|
| 2019-01-01|        NULL|          37|           1|
| 2019-01-03|        NULL|          65|           1|
| 2019-01-03|        NULL|         172|           2|
| 2019-01-01|        NULL|        NULL|        NULL|
| 2019-01-04|        NULL|        NULL|        NULL|
| 2019-01-01|        NULL|         191|        NULL|
| 2019-01-01|        NULL|          80|       

In [17]:
# crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())
# df \
#     .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
#     .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
#     # .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
#     .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
#     .show()

In [12]:
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0003')

DataFrame[pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int]