In [57]:
import pyspark
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
import pyarrow as pq

In [30]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


In [31]:
import pandas as pd
from pyspark.sql import types

In [117]:
green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("ehail_fee", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("trip_type", types.IntegerType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True),
    types.StructField("airport_fee", types.DoubleType(), True)
])

yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True),
    types.StructField("airport_fee", types.DoubleType(), True)
])

In [71]:
df_pandas = pd.read_parquet('yellow_tripdata_2021-01.parquet')

In [72]:
df_spark = spark.read \
        .option("header", "true") \
        .parquet('yellow_tripdata_2021-01.parquet')

In [73]:
df_pandas.head(5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [74]:
df_spark.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [75]:
df_spark = df_spark \
    .withColumn('VendorID',F.col('VendorID').cast('integer')) \
    .withColumn('PULocationID',F.col('PULocationID').cast('integer')) \
    .withColumn('DOLocationID',F.col('DOLocationID').cast('integer'))


In [76]:
df_spark.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [66]:
df_spark.repartition(4) \
        .write.parquet(output_path, mode='overwrite')

In [21]:
# # save the parquet file as a csv
# df_pandas.to_csv('yellow_tripdata_2021-01.csv')

In [16]:
# df_g_pd = pd.read_csv('data/raw/green/2021/01/green_tripdata_2020_01.csv.gz', nrows=1000)

In [None]:
# year = 2020

# for month in range(1, 13):
#     print(f'processing data for {year}/{month:02d}')

#     input_path = f'data/raw/green/{year}/{month:02d}/'
#     output_path = f'data/csv/green/{year}/{month:02d}/'
    
#     print('attempting to read the parquet partition')

#     df_ = read_parquet(input_path)

# #     df_green.printSchema()

#     df_green \
#         .repartition(4) \
#         .write.parquet(output_path, mode='overwrite')

In [152]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month:02d}')

    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'
    
    print('attempting to read the parquet partition')

    df_green = spark.read \
        .option("header", "true") \
        .parquet(input_path)
    
    print('converting columns to the right datatype')

    df_green = df_green \
        .withColumn('VendorID',F.col('VendorID').cast('integer')) \
        .withColumn('PULocationID',F.col('PULocationID').cast('integer')) \
        .withColumn('DOLocationID',F.col('DOLocationID').cast('integer')) \
        .withColumn('passenger_count',F.col('passenger_count').cast('integer')) \
        .withColumn('RatecodeID',F.col('RatecodeID').cast('integer')) \
        .withColumn('payment_type',F.col('payment_type').cast('integer')) \
        .withColumn('trip_type',F.col('trip_type').cast('integer')) \
        .withColumn('ehail_fee',F.col('ehail_fee').cast('double'))

    print('attempting to write to the parquet partition')

    df_green \
        .repartition(4) \
        .write.parquet(output_path, mode='overwrite')


processing data for 2020/01
attempting to read the parquet partition
converting columns to the right datatype
attempting to write to the parquet partition
processing data for 2020/02
attempting to read the parquet partition
converting columns to the right datatype
attempting to write to the parquet partition
processing data for 2020/03
attempting to read the parquet partition
converting columns to the right datatype
attempting to write to the parquet partition
processing data for 2020/04
attempting to read the parquet partition
converting columns to the right datatype
attempting to write to the parquet partition
processing data for 2020/05
attempting to read the parquet partition
converting columns to the right datatype
attempting to write to the parquet partition
processing data for 2020/06
attempting to read the parquet partition
converting columns to the right datatype
attempting to write to the parquet partition
processing data for 2020/07
attempting to read the parquet partition
c

In [150]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month:02d}')

    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'
    
    print('attempting to read the parquet partition')

    df_yellow = spark.read \
        .option("header", "true") \
        .parquet(input_path)
    
    print('converting columns to the right datatype')

    df_yellow.printSchema()

    df_yellow = df_yellow \
        .withColumn('VendorID',F.col('VendorID').cast('integer')) \
        .withColumn('PULocationID',F.col('PULocationID').cast('integer')) \
        .withColumn('DOLocationID',F.col('DOLocationID').cast('integer')) \
        .withColumn('passenger_count',F.col('passenger_count').cast('integer')) \
        .withColumn('RatecodeID',F.col('RatecodeID').cast('integer')) \
        .withColumn('payment_type',F.col('payment_type').cast('integer'))

    df_yellow.printSchema()

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path, mode='overwrite')

processing data for 2020/01
attempting to read the parquet partition
converting columns to the right datatype
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

root
 |-- VendorID: integer

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

processing data for 2020/06
attempting to read the parquet partition
converting columns to the right datatype
root
 |-- Ven

processing data for 2020/10
attempting to read the parquet partition
converting columns to the right datatype
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

root
 |-- VendorID: integer

In [148]:
#testing
df_new = spark.read.option("header", "true").parquet('data/raw/green/2021/01/')

In [149]:
#testing
df_new.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [142]:
#testing
df_green = df_green \
    .withColumn('VendorID',F.col('VendorID').cast('integer')) \
    .withColumn('PULocationID',F.col('PULocationID').cast('integer')) \
    .withColumn('DOLocationID',F.col('DOLocationID').cast('integer')) \
    .withColumn('passenger_count',F.col('passenger_count').cast('integer')) \
    .withColumn('RatecodeID',F.col('RatecodeID').cast('integer')) \
    .withColumn('payment_type',F.col('payment_type').cast('integer')) \
    .withColumn('trip_type',F.col('trip_type').cast('integer')) \
    .withColumn('ehail_fee',F.col('ehail_fee').cast('double'))


In [143]:
#testing
df_new \
        .repartition(4) \
        .write.parquet('data/pq/yellow/2021/01/new/', mode='overwrite')

In [144]:
#testing
df_new.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

