In [1]:
import pyspark 
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
        .master('local[*]') \
        .appName('test')\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/06 23:30:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# reading csv file in spark
df_green = spark.read \
     .option("header","true")\
     .csv("code/data/raw/green/2021/01/")

In [4]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-01-01 00:15:56|  2021-01-01 00:19:52|                 N|         1|          43|         151|              1|         1.01|        5.5|  0.5|    0.

In [5]:
df_green.printSchema() # we can see not has an schema

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [6]:
import pandas as pd


df_green_pandas = pd.read_csv("code/data/raw/green/2021/01/green_tripdata_2021_01.csv.gz", nrows=1000)
df_green_pandas.dtypes

VendorID                   int64
lpep_pickup_datetime      object
lpep_dropoff_datetime     object
store_and_fwd_flag        object
RatecodeID                 int64
PULocationID               int64
DOLocationID               int64
passenger_count            int64
trip_distance            float64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
ehail_fee                float64
improvement_surcharge    float64
total_amount             float64
payment_type               int64
trip_type                  int64
congestion_surcharge     float64
dtype: object

In [9]:
df_green_pandas = df_green_pandas.convert_dtypes()

In [11]:
# generating error right now so doing explicitly
#spark.createDataFrame(df_green_pandas)

In [12]:
df_green.schema

StructType([StructField('VendorID', StringType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('passenger_count', StringType(), True), StructField('trip_distance', StringType(), True), StructField('fare_amount', StringType(), True), StructField('extra', StringType(), True), StructField('mta_tax', StringType(), True), StructField('tip_amount', StringType(), True), StructField('tolls_amount', StringType(), True), StructField('ehail_fee', StringType(), True), StructField('improvement_surcharge', StringType(), True), StructField('total_amount', StringType(), True), StructField('payment_type', StringType(), True), StructField('trip_type', StringType(), True), StructField('congestion_surcharge', StringType()

In [15]:
from pyspark.sql import types

In [22]:
green_schema = types.StructType([
        types.StructField('VendorID', types.IntegerType(), True), 
        types.StructField('lpep_pickup_datetime', types.TimestampType(), True), 
        types.StructField('lpep_dropoff_datetime', types.TimestampType(), True), 
        types.StructField('store_and_fwd_flag', types.StringType(), True), 
        types.StructField('RatecodeID', types.IntegerType(), True), 
        types.StructField('PULocationID', types.IntegerType(), True), 
        types.StructField('DOLocationID', types.IntegerType(), True), 
        types.StructField('passenger_count', types.IntegerType(), True), 
        types.StructField('trip_distance', types.DoubleType(), True), 
        types.StructField('fare_amount', types.DoubleType(), True), 
        types.StructField('extra', types.DoubleType(), True), 
        types.StructField('mta_tax', types.DoubleType(), True),
        types.StructField('tip_amount', types.DoubleType(), True), 
        types.StructField('tolls_amount', types.DoubleType(), True), 
        types.StructField('ehail_fee', types.DoubleType(), True), 
        types.StructField('improvement_surcharge', types.DoubleType(), True), 
        types.StructField('total_amount', types.DoubleType(), True), 
        types.StructField('payment_type', types.IntegerType(), True), 
        types.StructField('trip_type', types.IntegerType(), True), 
        types.StructField('congestion_surcharge', types.DoubleType(), True)
    ])

In [23]:
# reading csv file in spark
df_green = spark.read \
     .option("header","true")\
     .schema(green_schema)\
     .csv("code/data/raw/green/2021/01/")

In [24]:
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [30]:
yellow_schema = types.StructType([
        types.StructField('VendorID', types.IntegerType(), True), 
        types.StructField('tpep_pickup_datetime', types.TimestampType(), True), 
        types.StructField('tpep_dropoff_datetime', types.TimestampType(), True), 
        types.StructField('passenger_count', types.IntegerType(), True), 
        types.StructField('trip_distance', types.DoubleType(), True), 
        types.StructField('RatecodeID', types.IntegerType(), True), 
        types.StructField('store_and_fwd_flag', types.StringType(), True), 
        types.StructField('PULocationID', types.IntegerType(), True), 
        types.StructField('DOLocationID', types.IntegerType(), True), 
        types.StructField('payment_type', types.IntegerType(), True),
        types.StructField('fare_amount', types.DoubleType(), True), 
        types.StructField('extra', types.DoubleType(), True), 
        types.StructField('mta_tax', types.DoubleType(), True),
        types.StructField('tip_amount', types.DoubleType(), True), 
        types.StructField('tolls_amount', types.DoubleType(), True), 
        types.StructField('improvement_surcharge', types.DoubleType(), True), 
        types.StructField('total_amount', types.DoubleType(), True),
        types.StructField('congestion_surcharge', types.DoubleType(), True),       
    ])

In [31]:
df_yellow = spark.read \
     .option("header","true")\
     .schema(yellow_schema)\
     .csv("code/data/raw/yellow/2021/01/")

In [32]:
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [None]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'code/data/raw/green/{year}/{month:02d}/'
    output_path = f'code/data/pq/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)


In [35]:


year = 2021

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'code/data/raw/green/{year}/{month:02d}/'
    output_path = f'code/data/pq/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)



processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
processing data for 2021/4
processing data for 2021/5
processing data for 2021/6
processing data for 2021/7


[Stage 56:>                                                         (0 + 1) / 1]

processing data for 2021/8


                                                                                

AnalysisException: Path does not exist: file:/home/ramisu/ramiro-data-engineering-zoomcamp/5_batch/code/data/raw/green/2021/08

In [36]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'code/data/raw/yellow/{year}/{month:02d}/'
    output_path = f'code/data/pq/yellow/{year}/{month:02d}/'

    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2020/1


                                                                                

processing data for 2020/2


                                                                                

processing data for 2020/3


                                                                                

processing data for 2020/4


                                                                                

processing data for 2020/5


                                                                                

processing data for 2020/6


                                                                                

processing data for 2020/7


                                                                                

processing data for 2020/8


                                                                                

processing data for 2020/9


                                                                                

processing data for 2020/10


                                                                                

processing data for 2020/11


                                                                                

processing data for 2020/12


                                                                                

In [None]:
year = 2021

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'code/data/raw/yellow/{year}/{month:02d}/'
    output_path = f'code/data/pq/yellow/{year}/{month:02d}/'

    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)
