In [7]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql import functions as F
import pyarrow

In [2]:
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/02 05:43:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [10]:
# Load one month of green-taxi trips into pandas straight from the raw Parquet file.
df_green_pd = pd.read_parquet(
    path='../data/raw/green/2021/01/green_tripdata_2021_01.parquet'
)

In [43]:
# Explicit Spark schema for the green-taxi trip files.
# Columns are listed as (name, type) pairs in file order; every column is nullable.
green_schema = types.StructType([
    types.StructField(column_name, column_type, nullable=True)
    for column_name, column_type in [
        ("VendorID", types.LongType()),
        ("lpep_pickup_datetime", types.TimestampType()),
        ("lpep_dropoff_datetime", types.TimestampType()),
        ("store_and_fwd_flag", types.StringType()),
        ("RatecodeID", types.DoubleType()),
        ("PULocationID", types.LongType()),
        ("DOLocationID", types.LongType()),
        ("passenger_count", types.DoubleType()),
        ("trip_distance", types.DoubleType()),
        ("fare_amount", types.DoubleType()),
        ("extra", types.DoubleType()),
        ("mta_tax", types.DoubleType()),
        ("tip_amount", types.DoubleType()),
        ("tolls_amount", types.DoubleType()),
        ("ehail_fee", types.IntegerType()),
        ("improvement_surcharge", types.DoubleType()),
        ("total_amount", types.DoubleType()),
        ("payment_type", types.DoubleType()),
        ("trip_type", types.DoubleType()),
        ("congestion_surcharge", types.DoubleType()),
    ]
])

In [44]:
# Read the January 2021 green-taxi Parquet directory with the explicit schema.
# The former "header" option was dropped: it is a CSV-reader setting and has
# no meaning for Parquet input, so it only misled readers about the format.
df_green = (
    spark.read
    .schema(green_schema)
    .parquet('../data/raw/green/2021/01/')
)

In [46]:
# Print df_green's columns as a tree (name, type, nullability) to confirm the schema was applied.
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [66]:
# Explicit Spark schema for the yellow-taxi trip files.
# Columns are listed as (name, type) pairs in file order; every column is nullable.
yellow_schema = types.StructType([
    types.StructField(column_name, column_type, nullable=True)
    for column_name, column_type in [
        ("VendorID", types.LongType()),
        ("tpep_pickup_datetime", types.TimestampType()),
        ("tpep_dropoff_datetime", types.TimestampType()),
        ("passenger_count", types.DoubleType()),
        ("trip_distance", types.DoubleType()),
        ("RatecodeID", types.DoubleType()),
        ("store_and_fwd_flag", types.StringType()),
        ("PULocationID", types.LongType()),
        ("DOLocationID", types.LongType()),
        ("payment_type", types.LongType()),
        ("fare_amount", types.DoubleType()),
        ("extra", types.DoubleType()),
        ("mta_tax", types.DoubleType()),
        ("tip_amount", types.DoubleType()),
        ("tolls_amount", types.DoubleType()),
        ("improvement_surcharge", types.DoubleType()),
        ("total_amount", types.DoubleType()),
        ("congestion_surcharge", types.DoubleType()),
    ]
])

In [67]:
# Read the January 2021 yellow-taxi Parquet directory with the explicit schema.
# The former "header" option was dropped: it is a CSV-reader setting and has
# no meaning for Parquet input, so it only misled readers about the format.
df_yellow = (
    spark.read
    .schema(yellow_schema)
    .parquet('../data/raw/yellow/2021/01/')
)

In [69]:
# Print df_yellow's columns as a tree (name, type, nullability) to confirm the schema was applied.
df_yellow.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [78]:
# Rewrite every month of 2020 green-taxi data as four-partition Parquet output.
# Each month is read from ../data/raw/green/<year>/<MM>/ and written to
# ../data/part/green/<year>/<MM>/, replacing any output already there.
year = 2020
for month in range(1, 13):
    print(f'processing data for {year}/{month}')
    input_path = f'../data/raw/green/{year}/{month:02d}/'
    output_path = f'../data/part/green/{year}/{month:02d}/'
    # No "header" option here: that is a CSV-reader setting with no meaning
    # for Parquet input.
    df_green_part = spark.read.schema(green_schema).parquet(input_path)
    # repartition(4) reshuffles the month into four partitions before the
    # write; mode("overwrite") makes re-running this cell safe.
    df_green_part.repartition(4).write.mode("overwrite").parquet(output_path)

processing data for 2020/1


                                                                                

processing data for 2020/2


                                                                                

processing data for 2020/3
processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [81]:
# Rewrite every month of 2021 yellow-taxi data as four-partition Parquet output.
# Each month is read from ../data/raw/yellow/<year>/<MM>/ and written to
# ../data/part/yellow/<year>/<MM>/, replacing any output already there.
year = 2021
for month in range(1, 13):
    print(f'processing data for {year}/{month}')
    input_path = f'../data/raw/yellow/{year}/{month:02d}/'
    output_path = f'../data/part/yellow/{year}/{month:02d}/'
    # Named for the yellow data it holds (the copy-pasted name was
    # df_green_part). No "header" option: that is a CSV-reader setting with
    # no meaning for Parquet input.
    df_yellow_part = spark.read.schema(yellow_schema).parquet(input_path)
    # repartition(4) reshuffles the month into four partitions before the
    # write; mode("overwrite") makes re-running this cell safe.
    df_yellow_part.repartition(4).write.mode("overwrite").parquet(output_path)

processing data for 2021/1


                                                                                

processing data for 2021/2


                                                                                

processing data for 2021/3


                                                                                

processing data for 2021/4


                                                                                

processing data for 2021/5


                                                                                

processing data for 2021/6


                                                                                

processing data for 2021/7


                                                                                

processing data for 2021/8


                                                                                

processing data for 2021/9


                                                                                

processing data for 2021/10


                                                                                

processing data for 2021/11


                                                                                

processing data for 2021/12


                                                                                