In [4]:
import findspark
# Called before the `pyspark` import in the next cell. Per the findspark docs,
# init() locates the local Spark installation and adds its pyspark to sys.path.
findspark.init()

In [5]:
import pyspark

In [7]:
from pyspark.sql import SparkSession

In [None]:
# Create the notebook's Spark session, or reuse one that already exists.
# The master is "local[*]", i.e. Spark runs inside this machine, not a cluster.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [20]:
import pandas as pd

In [21]:
# Load only the first 1001 data rows of the gzipped trip file into pandas, as a
# small sample to experiment with.
df_pandas = pd.read_csv('fhvhv_tripdata_2021-01.csv.gz', nrows=1001)

In [23]:
# Save the sample as a plain CSV with a header row and without the pandas index
# column; this file is what Spark reads further down.
df_pandas.to_csv('fhvhv_tripdata_2021-01-1001.csv', header=True, index=False)

In [25]:
from pyspark.sql import types

In [26]:
# Explicit schema for the trip CSV. Each (name, type) pair below becomes one
# nullable StructField, in the order the columns appear in the file.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [27]:
# Read the sample CSV with Spark: the first line is treated as the header and
# the columns are typed according to `schema`.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-01-1001.csv')
)

In [29]:
# Redistribute the rows into 24 partitions. Note that this rebinds `df`.
df = df.repartition(24)

In [30]:
# Persist the DataFrame as Parquet under fhvhv/2021/01/.
# mode('overwrite') replaces output left by a previous run; with the writer's
# default mode this cell raises as soon as the directory already exists, which
# breaks "Restart & Run All" on every run after the first.
df.write.mode('overwrite').parquet('fhvhv/2021/01/')

In [52]:
# Reload the data from the Parquet directory written above. From this cell on,
# `df` refers to the Parquet-backed DataFrame rather than the CSV-backed one.
df = spark.read.parquet('fhvhv/2021/01/')

In [68]:
# Print the column names, types and nullability of the reloaded DataFrame.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [34]:
# Partition count of the DataFrame as read back from Parquet. The recorded
# output is 4, not the 24 requested by the repartition done before writing.
print(df.rdd.getNumPartitions())

4


In [35]:
from pyspark.sql import functions as F

In [50]:
# Preview the first rows of the DataFrame as a text table.
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02879|2021-01-01 00:59:12|2021-01-01 01:11:50|         174|         208|   null|
|           HV0003|              B02866|2021-01-01 00:38:16|2021-01-01 00:51:40|         233|         144|   null|
|           HV0003|              B02882|2021-01-01 00:43:38|2021-01-01 01:00:14|         244|         254|   null|
|           HV0003|              B02835|2021-01-01 00:08:10|2021-01-01 00:20:26|         119|         159|   null|
|           HV0003|              B02889|2021-01-01 00:15:56|2021-01-01 00:26:18|         164|         141|   null|
|           HV0004|              B02800|2021-01-01 00:28:06|2021-01-01 00:40:46|

In [39]:
def crazy_stuff(base_num):
    """Derive a synthetic base id from a dispatching base number.

    The first character of ``base_num`` is dropped and the remainder is parsed
    as a decimal integer. The result is that integer in lowercase hex
    (zero-padded to at least three digits) behind a one-letter prefix:

    * ``s/`` when the number is divisible by 7,
    * ``a/`` when it is divisible by 3 (and not by 7),
    * ``e/`` otherwise.

    Args:
        base_num: A string such as ``'B02884'``, or ``None``.

    Returns:
        The formatted id, e.g. ``'s/b44'`` for ``'B02884'``, or ``None`` when
        ``base_num`` is ``None``.

    Raises:
        ValueError: If the text after the first character is not an integer.
    """
    # This function is applied as a UDF to a nullable column, and SQL NULLs
    # arrive as None. Propagate the null instead of crashing on the slice below.
    if base_num is None:
        return None

    num = int(base_num[1:])

    # Divisibility by 7 takes precedence over divisibility by 3.
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'

    return f'{prefix}/{num:03x}'

In [40]:
# Quick manual check: 2884 is divisible by 7, so the result should carry the
# 's/' prefix followed by 2884 in hex (b44).
crazy_stuff('B02884')

's/b44'

In [41]:
# Wrap crazy_stuff as a Spark UDF with a string return type so it can be
# applied to DataFrame columns.
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [42]:
# Add date-only versions of the two timestamps plus the UDF-derived base_id,
# then display just the derived columns and the two location ids.
(
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/b3f| 2021-01-01|  2021-01-01|         174|         208|
|  e/b32| 2021-01-01|  2021-01-01|         233|         144|
|  e/b42| 2021-01-01|  2021-01-01|         244|         254|
|  s/b13| 2021-01-01|  2021-01-01|         119|         159|
|  a/b49| 2021-01-01|  2021-01-01|         164|         141|
|  s/af0| 2021-01-01|  2021-01-01|         151|         143|
|  e/b35| 2021-01-01|  2021-01-01|         231|          40|
|  e/9ce| 2021-01-01|  2021-01-01|         224|         170|
|  s/b3d| 2021-01-01|  2021-01-01|          49|          37|
|  e/9ce| 2021-01-01|  2021-01-01|          52|         255|
|  e/9ce| 2021-01-01|  2021-01-01|         235|          60|
|  e/b3f| 2021-01-01|  2021-01-01|         168|         126|
|  s/b13| 2021-01-01|  2021-01-01|         168|         168|
|  e/9d0| 2021-01-01|  2

In [46]:
# First five HV0003 trips, reduced to their timestamps and location ids.
# The filter is applied before the projection because hvfhs_license_num is not
# among the selected columns; filtering afterwards would reference a column the
# projected DataFrame no longer exposes.
df.filter(df.hvfhs_license_num == 'HV0003') \
  .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .head(5)

[Row(pickup_datetime=datetime.datetime(2021, 1, 1, 0, 59, 12), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 11, 50), PULocationID=174, DOLocationID=208),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 0, 38, 16), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 51, 40), PULocationID=233, DOLocationID=144),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 0, 43, 38), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 0, 14), PULocationID=244, DOLocationID=254),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 0, 8, 10), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 20, 26), PULocationID=119, DOLocationID=159),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 0, 15, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 26, 18), PULocationID=164, DOLocationID=141)]