# Start a PySpark application

In [4]:
# Connect to the standalone Spark master. The port in the master URL must match the one
# defined in spark-base.Dockerfile; edit it here if that file changes.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("process_parquet")
    .master("spark://spark-master:7077")
    # request 512 MB of memory per executor
    .config("spark.executor.memory", "512m")
    # reuse an existing session if one is already running in this kernel
    .getOrCreate()
)

23/03/31 07:32:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Read the parquet files

In [5]:
# Load the January 2020 raw trip files for the three services (yellow cab, green cab,
# for-hire vehicles); the file names differ only in the service prefix.
df_yellow, df_green, df_fhv = (
    spark.read.parquet(f"./data/raw/{service}_tripdata_2020-01.parquet")
    for service in ("yellow", "green", "fhv")
)

                                                                                

# Check out the schema of the file

In [26]:
# Print column names, types and nullability of the raw yellow-taxi frame
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [4]:
# Print the raw green-taxi schema; in the run recorded below, VendorID, PULocationID and
# DOLocationID come back as long (unlike the yellow file), which motivates the casts later
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [5]:
# Print the raw for-hire-vehicle schema (note the differently-cased column names, e.g. PUlocationID)
df_fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



# Change the schema of the file
Change all columns with LongType to IntegerType.

In [10]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the ID / code columns to IntegerType one at a time. withColumn on an existing
# column name replaces that column, so the column order is unchanged.
for int_col in ('VendorID', 'PULocationID', 'DOLocationID', 'payment_type'):
    df_yellow = df_yellow.withColumn(int_col, col(int_col).cast(IntegerType()))

# Preview the first rows of the converted frame
df_yellow.show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2020-01-01 00:28:15|  2020-01-01 00:33:03|            1.0|          1.2|       1.0|                 N|         238|         239|           1|        6.0|  3.0|    0.5|      1.47|         0.0|                  0.3

                                                                                

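## Casting all columns to the **same** type

For illustration only: casting every column to integer is lossy. Timestamps, decimal amounts and string flags are all forced to integers, as the schema below shows.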

In [7]:
from pyspark.sql.functions import col

# Illustration: build one integer-cast expression per column, keeping each column's
# original name, then select them all at once. This cast is lossy: timestamps, doubles
# and the string flag are all forced to integer, as the printed schema shows.
int_exprs = [col(name).cast("integer").alias(name) for name in df_yellow.columns]
df_eg = df_yellow.select(*int_exprs)
df_eg.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: integer (nullable = true)
 |-- tpep_dropoff_datetime: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: integer (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: integer (nullable = true)
 |-- extra: integer (nullable = true)
 |-- mta_tax: integer (nullable = true)
 |-- tip_amount: integer (nullable = true)
 |-- tolls_amount: integer (nullable = true)
 |-- improvement_surcharge: integer (nullable = true)
 |-- total_amount: integer (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)



In [13]:
# Write the January 2020 yellow-taxi data with the LongType -> IntegerType casts applied.
# df_yellow is written rather than the all-integer df_eg demo frame, so timestamps, fare
# amounts and string flags keep their original types.
# mode('overwrite') makes the cell re-runnable: Spark's default save mode raises an
# error when the target path already exists.
df_yellow.write.mode('overwrite').parquet('./data/processed/yellow_tripdata_2020-01.parquet')

                                                                                

# Process all the parquet files

Convert columns with LongType to IntegerType in each monthly data file.

## Yellow Taxi

In [14]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Columns cast to IntegerType in every monthly yellow file (presumably LongType in some
# months; the January 2020 file already stores them as integer).
year_list = [2020, 2021]
for year in year_list:
    for month_num in range(1, 13):
        month = f'{month_num:02d}'  # zero-pad so file names read e.g. 2020-01
        print(f'processing data for {year}/{month}')

        src_path = f'./data/raw/yellow_tripdata_{year}-{month}.parquet'
        dst_path = f'./data/processed/yellow/yellow_tripdata_{year}-{month}.parquet'

        df_yellow = spark.read.parquet(src_path)

        # df_yellow stays raw; the converted frame lives in df_proc
        df_proc = df_yellow
        for int_col in ('VendorID', 'PULocationID', 'DOLocationID', 'payment_type'):
            df_proc = df_proc.withColumn(int_col, col(int_col).cast(IntegerType()))

        df_proc.write.parquet(dst_path)

processing data for 2020/01


                                                                                

processing data for 2020/02


                                                                                

processing data for 2020/03


                                                                                

processing data for 2020/04
processing data for 2020/05
processing data for 2020/06


                                                                                

processing data for 2020/07


                                                                                

processing data for 2020/08


                                                                                

processing data for 2020/09


                                                                                

processing data for 2020/10


                                                                                

processing data for 2020/11


                                                                                

processing data for 2020/12


                                                                                

processing data for 2021/01


                                                                                

processing data for 2021/02


                                                                                

processing data for 2021/03


                                                                                

processing data for 2021/04


                                                                                

processing data for 2021/05


                                                                                

processing data for 2021/06


                                                                                

processing data for 2021/07


                                                                                

processing data for 2021/08


                                                                                

processing data for 2021/09


                                                                                

processing data for 2021/10


                                                                                

processing data for 2021/11


                                                                                

processing data for 2021/12


                                                                                

## Green Taxi

In [15]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

year_list = [2020, 2021]
for year in year_list:
    for month in range(1, 13):
        # zero-pad the month so file names read e.g. 2020-01
        month = str(month).zfill(2)
        print(f'processing data for {year}/{month}')

        # read the raw parquet file for this month
        df_green = spark.read.parquet(f'./data/raw/green_tripdata_{year}-{month}.parquet')

        # convert the LongType ID columns to IntegerType. payment_type is not cast: in the
        # green schema printed earlier it is double, not long.
        df_green = (
            df_green
            .withColumn('VendorID', col('VendorID').cast(IntegerType()))
            .withColumn('PULocationID', col('PULocationID').cast(IntegerType()))
            .withColumn('DOLocationID', col('DOLocationID').cast(IntegerType()))
        )

        # Write the converted green frame. Do NOT write df_proc here: that name is left over
        # from the yellow-taxi loop and holds yellow data.
        # mode('overwrite') replaces any output already at these paths; Spark's default
        # save mode would raise an error because the directories exist.
        df_green.write.mode('overwrite').parquet(
            f'./data/processed/green/green_tripdata_{year}-{month}.parquet'
        )

processing data for 2020/01


                                                                                

processing data for 2020/02


                                                                                

processing data for 2020/03


                                                                                

processing data for 2020/04


                                                                                

processing data for 2020/05


                                                                                

processing data for 2020/06


                                                                                

processing data for 2020/07


                                                                                

processing data for 2020/08


                                                                                

processing data for 2020/09


                                                                                

processing data for 2020/10


                                                                                

processing data for 2020/11


                                                                                

processing data for 2020/12


                                                                                

processing data for 2021/01


                                                                                

processing data for 2021/02


                                                                                

processing data for 2021/03


                                                                                

processing data for 2021/04


                                                                                

processing data for 2021/05


                                                                                

processing data for 2021/06


                                                                                

processing data for 2021/07


                                                                                

processing data for 2021/08


                                                                                

processing data for 2021/09


                                                                                

processing data for 2021/10


                                                                                

processing data for 2021/11


                                                                                

processing data for 2021/12


                                                                                

## For-Hire Vehicle

There are no LongType columns, so no conversion is needed for this dataset.

In [13]:
# Optional cleanup: delete processed output so that write cells using Spark's default
# save mode (which fails when the target path exists) can be re-run from scratch.
# Disabled by default so "Restart & Run All" does not wipe the output this notebook has
# just produced. Set to True to reset; add './data/processed/green' to the list to reset
# the green output as well.
import shutil

RESET_PROCESSED_OUTPUT = False
if RESET_PROCESSED_OUTPUT:
    for processed_dir in ['./data/processed/yellow']:
        # ignore_errors mirrors `rm -rf`: no error if the directory is already gone
        shutil.rmtree(processed_dir, ignore_errors=True)