In [14]:
import pyspark
import pandas as pd

from pyspark.sql import SparkSession, types
from pyspark.sql.functions import format_number

In [15]:
spark = SparkSession.builder.master("local[*]").appName('test').getOrCreate()

In [16]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

In [17]:
green_schema =  types.StructType([
                    types.StructField('VendorID', types.LongType(), nullable=True),
                    types.StructField('lpep_pickup_datetime', types.TimestampType(), nullable=True),
                    types.StructField('lpep_dropoff_datetime', types.TimestampType(), nullable=True),
                    types.StructField('store_and_fwd_flag', types.StringType(), nullable=True),
                    types.StructField('RatecodeID', types.DoubleType(), nullable=True),
                    types.StructField('PULocationID', types.LongType(), nullable=True),
                    types.StructField('DOLocationID', types.LongType(), nullable=True),
                    types.StructField('passenger_count', types.DoubleType(), nullable=True),
                    types.StructField('trip_distance', types.DoubleType(), nullable=True),
                    types.StructField('fare_amount', types.DoubleType(), nullable=True),
                    types.StructField('extra', types.DoubleType(), nullable=True),
                    types.StructField('mta_tax', types.DoubleType(), nullable=True),
                    types.StructField('tip_amount', types.DoubleType(), nullable=True),
                    types.StructField('tolls_amount', types.DoubleType(), nullable=True),
                    types.StructField('ehail_fee', types.IntegerType(), nullable=True),
                    types.StructField('improvement_surcharge', types.DoubleType(), nullable=True),
                    types.StructField('total_amount', types.DoubleType(), nullable=True),
                    types.StructField('payment_type', types.DoubleType(), nullable=True),
                    types.StructField('trip_type', types.DoubleType(), nullable=True),
                    types.StructField('congestion_surcharge', types.DoubleType(), nullable=True)])

In [18]:
yellow_schema = types.StructType([
                    types.StructField('VendorID', types.LongType(), True), 
                    types.StructField('tpep_pickup_datetime', types.TimestampType(), True), 
                    types.StructField('tpep_dropoff_datetime', types.TimestampType(), True), 
                    types.StructField('passenger_count', types.DoubleType(), True), 
                    types.StructField('trip_distance', types.DoubleType(), True), 
                    types.StructField('RatecodeID', types.DoubleType(), True), 
                    types.StructField('store_and_fwd_flag', types.StringType(), True), 
                    types.StructField('PULocationID', types.LongType(), True), 
                    types.StructField('DOLocationID', types.LongType(), True), 
                    types.StructField('payment_type', types.LongType(), True), 
                    types.StructField('fare_amount', types.DoubleType(), True), 
                    types.StructField('extra', types.DoubleType(), True), 
                    types.StructField('mta_tax', types.DoubleType(), True), 
                    types.StructField('tip_amount', types.DoubleType(), True), 
                    types.StructField('tolls_amount', types.DoubleType(), True), 
                    types.StructField('improvement_surcharge', types.DoubleType(), True), 
                    types.StructField('total_amount', types.DoubleType(), True), 
                    types.StructField('congestion_surcharge', types.DoubleType(), True), 
                    types.StructField('airport_fee', types.DoubleType(), True)])

In [19]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'
  
    df_green = spark.read.format('parquet').schema(green_schema).load(input_path)

    df_green.repartition(4).write.parquet(output_path)

processing data for 2020/1


AnalysisException: path file:/home/tania/git/DataEngineeringZoomCamp/week_5_batch_processing/code/data/pq/green/2020/01 already exists.

In [None]:
year = 2021 

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'

    df_green = spark.read.format('parquet').schema(green_schema).load(input_path)

    df_green.repartition(4).write.parquet(output_path)

In [None]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'
    
    if month == 8:
        yellow_df = spark.read.format("parquet") \
                        .option("mergeSchema", "true") \
                        .schema(yellow_schema) \
                        .load(input_path)
        
    else:
        yellow_df = spark.read.parquet(input_path)
        yellow_df = yellow_df.withColumn('airport_fee',  yellow_df["airport_fee"].cast(types.DoubleType()))
        yellow_df = yellow_df.withColumn("airport_fee", format_number(yellow_df['airport_fee'], 2))   

    yellow_df.repartition(4).write.parquet(output_path)

In [None]:
year = 2021

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'
    
    if month == 8:
        yellow_df = spark.read.format("parquet") \
                        .option("mergeSchema", "true") \
                        .schema(yellow_schema) \
                        .load(input_path)
        
    else:
        yellow_df = spark.read.parquet(input_path)
        yellow_df = yellow_df.withColumn('airport_fee',  yellow_df["airport_fee"].cast(types.DoubleType()))
        yellow_df = yellow_df.withColumn("airport_fee", format_number(yellow_df['airport_fee'], 2))   

    yellow_df.repartition(4).write.parquet(output_path)