In [30]:
import pyspark
from pyspark.sql import SparkSession

In [33]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("test") \
    .getOrCreate()

In [34]:
df_green = spark.read \
    .option("header", "true") \
    .csv("data/raw/green/2021/01/green_tripdata_2021_01.csv.gz")

In [36]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-01-01 00:15:56|  2021-01-01 00:19:52|                 N|         1|          43|         151|              1|         1.01|        5.5|  0.5|    0.

In [24]:
import pandas as pd

In [46]:
df_green_pd = pd.read_csv("data/raw/green/2021/01/green_tripdata_2021_01.csv.gz", nrows=1000)

In [48]:
df_yellow_pd = pd.read_csv("data/raw/yellow/2021/01/yellow_tripdata_2021_01.csv.gz", nrows=1000)

In [47]:
spark.createDataFrame(df_green_pd).schema

StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', LongType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', LongType(), True), StructField('trip_type', LongType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [49]:
spark.createDataFrame(df_yellow_pd).schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', StringType(), True), StructField('tpep_dropoff_datetime', StringType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [41]:
from pyspark.sql import types

In [43]:
green_schema = types.StructType([
types.StructField('VendorID', types.IntegerType(), True), 
types.StructField('lpep_pickup_datetime', types.TimestampType(), True), 
types.StructField('lpep_dropoff_datetime', types.TimestampType(), True), 
types.StructField('store_and_fwd_flag', types.StringType(), True), 
types.StructField('RatecodeID', types.IntegerType(), True), 
types.StructField('PULocationID', types.IntegerType(), True), 
types.StructField('DOLocationID', types.IntegerType(), True), 
types.StructField('passenger_count', types.IntegerType(), True), 
types.StructField('trip_distance', types.DoubleType(), True), 
types.StructField('fare_amount', types.DoubleType(), True), 
types.StructField('extra', types.DoubleType(), True), 
types.StructField('mta_tax', types.DoubleType(), True), 
types.StructField('tip_amount', types.DoubleType(), True), 
types.StructField('tolls_amount', types.DoubleType(), True), 
types.StructField('ehail_fee', types.DoubleType(), True), 
types.StructField('improvement_surcharge', types.DoubleType(), True), 
types.StructField('total_amount', types.DoubleType(), True), 
types.StructField('payment_type', types.IntegerType(), True), 
types.StructField('trip_type', types.IntegerType(), True), 
types.StructField('congestion_surcharge', types.DoubleType(), True)])

In [50]:
yellow_schema = types.StructType([
types.StructField('VendorID', types.IntegerType(), True), 
types.StructField('tpep_pickup_datetime', types.TimestampType(), True), 
types.StructField('tpep_dropoff_datetime', types.TimestampType(), True), 
types.StructField('passenger_count', types.IntegerType(), True), 
types.StructField('trip_distance', types.DoubleType(), True), 
types.StructField('RatecodeID', types.IntegerType(), True), 
types.StructField('store_and_fwd_flag', types.StringType(), True), 
types.StructField('PULocationID', types.IntegerType(), True), 
types.StructField('DOLocationID', types.IntegerType(), True), 
types.StructField('payment_type', types.IntegerType(), True), 
types.StructField('fare_amount', types.DoubleType(), True), 
types.StructField('extra', types.DoubleType(), True), 
types.StructField('mta_tax', types.DoubleType(), True), 
types.StructField('tip_amount', types.DoubleType(), True), 
types.StructField('tolls_amount', types.DoubleType(), True), 
types.StructField('improvement_surcharge', types.DoubleType(), True), 
types.StructField('total_amount', types.DoubleType(), True), 
types.StructField('congestion_surcharge', types.DoubleType(), True)])

In [58]:
year = 2021
for month in range(1, 13):
    print(f"processing data for {year}/{month}")
    input_path = f"data/raw/green/{year}/{month:02d}/"
    output_path = f"data/pq/green/{year}/{month:02d}/"
    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
processing data for 2021/4
processing data for 2021/5
processing data for 2021/6
processing data for 2021/7
processing data for 2021/8


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/truongvude/myprojects/dezoomcamp/week_5_batch/code/data/raw/green/2021/08.

In [60]:
year = 2021
for month in range(1, 13):
    print(f"processing data for {year}/{month}")
    input_path = f"data/raw/yellow/{year}/{month:02d}/"
    output_path = f"data/pq/yellow/{year}/{month:02d}/"
    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/1


                                                                                

processing data for 2021/2


                                                                                

processing data for 2021/3


                                                                                

processing data for 2021/4


                                                                                

processing data for 2021/5


                                                                                

processing data for 2021/6


                                                                                

processing data for 2021/7


[Stage 127:>                                                        (0 + 4) / 4]

processing data for 2021/8


                                                                                

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/truongvude/myprojects/dezoomcamp/week_5_batch/code/data/raw/yellow/2021/08.