In [58]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()
spark

In [None]:
# Get fhvhv data
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-01.parquet

In [3]:
# Check number of rows
!wc -l fhvhv_tripdata_2021-01.parquet

1006794 fhvhv_tripdata_2021-01.parquet


In [48]:
# Read parquet into spark df
df = spark.read \
    .option("header", "true") \
    .parquet('fhvhv_tripdata_2021-01.parquet')

In [49]:
# Check df contents
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime=datetime.datetime(2021, 1, 1, 0, 28, 9), on_scene_datetime=datetime.datetime(2021, 1, 1, 0, 31, 42), pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, trip_miles=5.26, trip_time=923, base_passenger_fare=22.28, tolls=0.0, bcf=0.67, sales_tax=1.98, congestion_surcharge=2.75, airport_fee=None, tips=0.0, driver_pay=14.99, shared_request_flag='N', shared_match_flag='N', access_a_ride_flag=' ', wav_request_flag='N', wav_match_flag='N'),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime=datetime.datetime(2021, 1, 1, 0, 45, 56), on_scene_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DO

In [3]:
# Check df schema (parquet files already have schema, compared to csv which doesn't)
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(originating_base_num,StringType,true),StructField(request_datetime,TimestampType,true),StructField(on_scene_datetime,TimestampType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(trip_miles,DoubleType,true),StructField(trip_time,LongType,true),StructField(base_passenger_fare,DoubleType,true),StructField(tolls,DoubleType,true),StructField(bcf,DoubleType,true),StructField(sales_tax,DoubleType,true),StructField(congestion_surcharge,DoubleType,true),StructField(airport_fee,DoubleType,true),StructField(tips,DoubleType,true),StructField(driver_pay,DoubleType,true),StructField(shared_request_flag,StringType,true),StructField(shared_match_flag,StringType,true),StructField(access_a_ride_flag,StringType,true),StructField(wav_req

In [33]:
# Skip this command: won't work because it will get the first 101 lines of the parquet, not the first 101 rows.
!head -n 101 fhvhv_tripdata_2021-01.parquet > fhvhv_tripdata_2021-01_head.parquet

In [32]:
import pandas as pd

In [33]:
df_pandas = pd.read_parquet("fhvhv_tripdata_2021-01.parquet")

In [32]:
df_pandas.shape[0]

11908468

In [35]:
df_head = df_pandas.head(101)

In [36]:
df_head.shape[0]

101

In [37]:
df_head

Unnamed: 0,hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,...,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
0,HV0003,B02682,B02682,2021-01-01 00:28:09,2021-01-01 00:31:42,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,5.260,...,1.98,2.75,,0.00,14.99,N,N,,N,N
1,HV0003,B02682,B02682,2021-01-01 00:45:56,2021-01-01 00:55:19,2021-01-01 00:55:19,2021-01-01 01:18:21,152,167,3.650,...,1.63,0.00,,0.00,17.06,N,N,,N,N
2,HV0003,B02764,B02764,2021-01-01 00:21:15,2021-01-01 00:22:41,2021-01-01 00:23:56,2021-01-01 00:38:05,233,142,3.510,...,1.25,2.75,,0.94,12.98,N,N,,N,N
3,HV0003,B02764,B02764,2021-01-01 00:39:12,2021-01-01 00:42:37,2021-01-01 00:42:51,2021-01-01 00:45:50,142,143,0.740,...,0.70,2.75,,0.00,7.41,N,N,,N,N
4,HV0003,B02764,B02764,2021-01-01 00:46:11,2021-01-01 00:47:17,2021-01-01 00:48:14,2021-01-01 01:08:42,143,78,9.200,...,2.41,2.75,,0.00,22.44,N,N,,N,N
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
96,HV0005,B02510,,2021-01-01 00:11:55,NaT,2021-01-01 00:15:03,2021-01-01 00:41:05,48,97,6.881,...,2.74,2.75,,0.00,20.71,N,N,N,N,N
97,HV0005,B02510,,2021-01-01 00:40:38,NaT,2021-01-01 00:47:09,2021-01-01 01:05:40,97,231,3.447,...,1.69,2.75,,0.00,15.04,N,N,N,N,N
98,HV0005,B02510,,2021-01-01 00:05:44,NaT,2021-01-01 00:09:53,2021-01-01 00:49:57,61,117,13.469,...,3.99,0.00,,0.00,35.08,N,N,N,N,N
99,HV0005,B02510,,2021-01-01 00:41:41,NaT,2021-01-01 00:52:59,2021-01-01 01:27:18,117,76,10.282,...,3.01,0.00,,0.00,28.66,N,N,N,N,N


In [38]:
df_pandas.dtypes

hvfhs_license_num               object
dispatching_base_num            object
originating_base_num            object
request_datetime        datetime64[ns]
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int64
DOLocationID                     int64
trip_miles                     float64
trip_time                        int64
base_passenger_fare            float64
tolls                          float64
bcf                            float64
sales_tax                      float64
congestion_surcharge           float64
airport_fee                    float64
tips                           float64
driver_pay                     float64
shared_request_flag             object
shared_match_flag               object
access_a_ride_flag              object
wav_request_flag                object
wav_match_flag                  object
dtype: object

In [39]:
# Get schema for parquet file
spark.createDataFrame(df_head).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(originating_base_num,StringType,true),StructField(request_datetime,TimestampType,true),StructField(on_scene_datetime,TimestampType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(trip_miles,DoubleType,true),StructField(trip_time,LongType,true),StructField(base_passenger_fare,DoubleType,true),StructField(tolls,DoubleType,true),StructField(bcf,DoubleType,true),StructField(sales_tax,DoubleType,true),StructField(congestion_surcharge,DoubleType,true),StructField(airport_fee,DoubleType,true),StructField(tips,DoubleType,true),StructField(driver_pay,DoubleType,true),StructField(shared_request_flag,StringType,true),StructField(shared_match_flag,StringType,true),StructField(access_a_ride_flag,StringType,true),StructField(wav_req

In [9]:
from pyspark.sql import types

In [10]:
# Create python formatted schema so parquet schema can be changed
schema = types.StructType([
    types.StructField('hvfhs_license_num',types.StringType(),True),
    types.StructField('dispatching_base_num',types.StringType(),True),
    types.StructField('originating_base_num',types.StringType(),True),
    types.StructField('request_datetime',types.TimestampType(),True),
    types.StructField('on_scene_datetime',types.TimestampType(),True),
    types.StructField('pickup_datetime',types.TimestampType(),True),
    types.StructField('dropoff_datetime',types.TimestampType(),True),
    types.StructField('PULocationID',types.LongType(),True),
    types.StructField('DOLocationID',types.LongType(),True),
    types.StructField('trip_miles',types.DoubleType(),True),
    types.StructField('trip_time',types.LongType(),True),
    types.StructField('base_passenger_fare',types.DoubleType(),True),
    types.StructField('tolls',types.DoubleType(),True),
    types.StructField('bcf',types.DoubleType(),True),
    types.StructField('sales_tax',types.DoubleType(),True),
    types.StructField('congestion_surcharge',types.DoubleType(),True),
    types.StructField('airport_fee',types.DoubleType(),True),
    types.StructField('tips',types.DoubleType(),True),
    types.StructField('driver_pay',types.DoubleType(),True),
    types.StructField('shared_request_flag',types.StringType(),True),
    types.StructField('shared_match_flag',types.StringType(),True),
    types.StructField('access_a_ride_flag',types.StringType(),True),
    types.StructField('wav_request_flag',types.StringType(),True),
    types.StructField('wav_match_flag',types.StringType(),True)
]
)

In [11]:
schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(originating_base_num,StringType,true),StructField(request_datetime,TimestampType,true),StructField(on_scene_datetime,TimestampType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(trip_miles,DoubleType,true),StructField(trip_time,LongType,true),StructField(base_passenger_fare,DoubleType,true),StructField(tolls,DoubleType,true),StructField(bcf,DoubleType,true),StructField(sales_tax,DoubleType,true),StructField(congestion_surcharge,DoubleType,true),StructField(airport_fee,DoubleType,true),StructField(tips,DoubleType,true),StructField(driver_pay,DoubleType,true),StructField(shared_request_flag,StringType,true),StructField(shared_match_flag,StringType,true),StructField(access_a_ride_flag,StringType,true),StructField(wav_req

In [12]:
# Set predefined schema when parrquet is read
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .parquet('fhvhv_tripdata_2021-01.parquet')

In [None]:
df.head(5)

In [121]:
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(originating_base_num,StringType,true),StructField(request_datetime,TimestampType,true),StructField(on_scene_datetime,TimestampType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(trip_miles,DoubleType,true),StructField(trip_time,LongType,true),StructField(base_passenger_fare,DoubleType,true),StructField(tolls,DoubleType,true),StructField(bcf,DoubleType,true),StructField(sales_tax,DoubleType,true),StructField(congestion_surcharge,DoubleType,true),StructField(airport_fee,DoubleType,true),StructField(tips,DoubleType,true),StructField(driver_pay,DoubleType,true),StructField(shared_request_flag,StringType,true),StructField(shared_match_flag,StringType,true),StructField(access_a_ride_flag,StringType,true),StructField(wav_req

In [122]:
df.dtypes

[('hvfhs_license_num', 'string'),
 ('dispatching_base_num', 'string'),
 ('originating_base_num', 'string'),
 ('request_datetime', 'timestamp'),
 ('on_scene_datetime', 'timestamp'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('PULocationID', 'bigint'),
 ('DOLocationID', 'bigint'),
 ('trip_miles', 'double'),
 ('trip_time', 'bigint'),
 ('base_passenger_fare', 'double'),
 ('tolls', 'double'),
 ('bcf', 'double'),
 ('sales_tax', 'double'),
 ('congestion_surcharge', 'double'),
 ('airport_fee', 'double'),
 ('tips', 'double'),
 ('driver_pay', 'double'),
 ('shared_request_flag', 'string'),
 ('shared_match_flag', 'string'),
 ('access_a_ride_flag', 'string'),
 ('wav_request_flag', 'string'),
 ('wav_match_flag', 'string')]

In [13]:
# Partition the spark df
df = df.repartition(24) # 1st stage: 24 tasks

In [None]:
# Write the partitioned df into a folder
df.write.parquet('fhvhv/2021/01') # 2nd stage: C4 tasks to save the results

In [126]:
# Count the approx number of files created 
!ls -lh fhvhv/2021/01/ | wc -l

26


In [40]:
# Read the partitioned parquet
df = spark.read.parquet('fhvhv/2021/01')

In [41]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

In [42]:
'''
Transformation (select, filtering, group by etc.) are lazy and are not executed straight away and won't show on Spark Master UI
Action (show, take, head, write) are not lazy and are executed straight away
'''
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .filter(df.hvfhs_license_num == 'HV0003') \
    .show()

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-15 14:11:07|2021-01-15 14:41:51|         142|         107|
|2021-01-29 18:20:30|2021-01-29 18:32:03|         159|          78|
|2021-01-11 21:49:41|2021-01-11 22:00:20|         158|          87|
|2021-01-09 07:49:14|2021-01-09 08:02:16|         118|           6|
|2021-01-27 17:56:55|2021-01-27 18:11:04|         116|          50|
|2021-01-23 15:15:15|2021-01-23 15:25:35|          17|         189|
|2021-01-29 16:05:00|2021-01-29 16:20:57|         205|         215|
|2021-01-30 23:46:53|2021-01-31 00:00:42|          69|         212|
|2021-01-28 21:56:06|2021-01-28 21:59:55|          20|          78|
|2021-01-07 20:32:43|2021-01-07 20:53:50|          82|          75|
|2021-01-26 20:35:42|2021-01-26 20:45:22|          47|          59|
|2021-01-21 14:07:53|2021-01-21 14:21:52|       

In [20]:
from pyspark.sql import functions as F

In [23]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.pickup_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-14|  2021-01-14|          17|          61|
| 2021-01-14|  2021-01-14|          61|          71|
| 2021-01-15|  2021-01-15|         142|         107|
| 2021-01-29|  2021-01-29|         159|          78|
| 2021-01-07|  2021-01-07|         119|         235|
| 2021-01-11|  2021-01-11|         158|          87|
| 2021-01-09|  2021-01-09|         118|           6|
| 2021-01-27|  2021-01-27|         116|          50|
| 2021-01-23|  2021-01-23|          17|         189|
| 2021-01-15|  2021-01-15|         117|          86|
| 2021-01-29|  2021-01-29|         205|         215|
| 2021-01-23|  2021-01-23|          36|         198|
| 2021-01-19|  2021-01-19|          87|          33|
| 2021-01-30|  2021-01-30|          69|         212|
| 2021-01-28|  2021-01-28|          20|          78|
| 2021-01-07|  2021-01-07|          82|       

In [25]:
# You can create functions to manipulate the df data that you wouldn't usually see when working with SQL on a data warehouse
# This logic is just random, but the concept of creating udf is powerful depending on use case
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 ==0:
        return f's/{num:03x}'
    elif num % 3 ==0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [30]:
# Create udf (user defined fuctions) with Spark
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [43]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.pickup_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('pickup_date', 'dropoff_date', 'base_id','PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+-------+------------+------------+
|pickup_date|dropoff_date|base_id|PULocationID|DOLocationID|
+-----------+------------+-------+------------+------------+
| 2021-01-14|  2021-01-14|  e/9ce|          17|          61|
| 2021-01-14|  2021-01-14|  e/9ce|          61|          71|
| 2021-01-15|  2021-01-15|  e/acc|         142|         107|
| 2021-01-29|  2021-01-29|  a/b49|         159|          78|
| 2021-01-07|  2021-01-07|  e/9ce|         119|         235|
| 2021-01-11|  2021-01-11|  e/acc|         158|          87|
| 2021-01-09|  2021-01-09|  e/b32|         118|           6|
| 2021-01-27|  2021-01-27|  e/b38|         116|          50|
| 2021-01-23|  2021-01-23|  e/a39|          17|         189|
| 2021-01-15|  2021-01-15|  e/9ce|         117|          86|
| 2021-01-29|  2021-01-29|  e/acc|         205|         215|
| 2021-01-23|  2021-01-23|  e/9ce|          36|         198|
| 2021-01-19|  2021-01-19|  e/9ce|          87|          33|
| 2021-01-30|  2021-01-3