In [1]:
import os

import pyspark
from pyspark.sql import SparkSession

In [2]:
# Connect to the standalone Spark cluster. The master URL is read from the
# SPARK_MASTER_URL environment variable so the notebook is not tied to one VM;
# when the variable is unset it falls back to the course VM's internal address.
# Example override when the master is reached through a local port forward:
#   SPARK_MASTER_URL=spark://localhost:7077
SPARK_MASTER_URL = os.environ.get(
    'SPARK_MASTER_URL',
    'spark://de-zoomcamp.us-central1-c.c.blissful-flames-375219.internal:7077',
)

spark = (
    SparkSession.builder
    .master(SPARK_MASTER_URL)
    .appName('test')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/01 17:33:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Show the active SparkSession as this cell's output (the last expression of a cell is displayed)
spark

In [4]:
import os

# Locate the data folder: it is the `data` directory that sits beside the
# directory the notebook was started from (the `code/` folder in the run shown).
working_dir = os.getcwd()
parent_working_dir = os.path.split(working_dir)[0]
data_dir = os.path.join(parent_working_dir, 'data')
print(f'Current Directory: {working_dir}', f'Data Directory: {data_dir}', sep='\n')

# Make the data folder the process working directory.
# The later cells visible here build absolute paths from `data_dir`, so they do not rely on
# this chdir; it only matters for code that uses relative paths.
# Re-running the cell resolves the same `data_dir` (the data folder is a sibling of the
# start folder), although "Current Directory" will then report the data folder itself.
os.chdir(data_dir)
print(f'Changing directory to Data directory: {os.getcwd()}')

Current Directory: /home/sanyashireen/week_5_batch_processing/code
Data Directory: /home/sanyashireen/week_5_batch_processing/data
Changing directory to Data directory: /home/sanyashireen/week_5_batch_processing/data


In [5]:
# Load every green-taxi trip into a single PySpark DataFrame.
# The glob walks pq/green/<year>/<month>/ and stops at the month folder: each month folder
# holds all the part files of one month (e.g. pq/green/2021/01 is January 2021), and Spark
# reads every part file under each matched folder. Any year present is picked up
# (2020 and 2021 in this dataset).
green_path = f'{data_dir}/pq/green/*/*'
df_green = spark.read.parquet(green_path)

                                                                                

In [6]:
# Preview the first 5 green-taxi rows (show() prints a text table and returns nothing)
df_green.show(n=5)

[Stage 1:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-23 13:10:15|  2020-01-23 13:38:16|                 N|         1|          74|         130|              1|        12.77|       36.0|  0.0|    0.

                                                                                

In [7]:
# Print the column names, types and nullability of the green-taxi DataFrame as read from parquet
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [8]:
# Load every yellow-taxi trip into a single PySpark DataFrame, using the same
# pq/<service>/<year>/<month>/ layout and glob as the green data.
yellow_path = f'{data_dir}/pq/yellow/*/*'
df_yellow = spark.read.parquet(yellow_path)

In [9]:
# Preview the first 5 yellow-taxi rows, then print the inferred schema
df_yellow.show(n=5)

df_yellow.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2020-01-06 09:18:38|  2020-01-06 09:33:56|              1|         3.03|         1|                 N|         263|         233|           1|       12.0|  0.0|    0.5|       1.5|         0.0|                  0.3

In [10]:
# List both column sets so naming differences between the two datasets stand out
# (e.g. the lpep_ / tpep_ prefixes on the pickup and dropoff timestamps).
green_columns, yellow_columns = df_green.columns, df_yellow.columns
print(f'Columns of green taxi_data:\n {green_columns}')
print(f'\n\nColumns of yellow taxi_data:\n {yellow_columns}')

Columns of green taxi_data:
 ['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge']


Columns of yellow taxi_data:
 ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']


In [12]:
# Green trips use an `lpep_` prefix and yellow trips a `tpep_` prefix on the pickup/dropoff
# timestamps, so neither dataset has a plain `pickup_datetime` column. Rename both to shared
# names so the timestamps line up when the datasets are combined.
# withColumnRenamed is a no-op when the source column is absent, so re-running is harmless.
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

# Columns present in both datasets; this is a set, so the printed order is arbitrary
green_column_set = set(df_green.columns)
yellow_column_set = set(df_yellow.columns)
common_columns = green_column_set & yellow_column_set
print(common_columns)

{'congestion_surcharge', 'trip_distance', 'dropoff_datetime', 'passenger_count', 'extra', 'PULocationID', 'improvement_surcharge', 'mta_tax', 'VendorID', 'total_amount', 'RatecodeID', 'tip_amount', 'pickup_datetime', 'store_and_fwd_flag', 'tolls_amount', 'payment_type', 'fare_amount', 'DOLocationID'}


In [13]:
# Keep the columns that exist in both datasets, in the green dataset's column order
# (the set from the previous cell loses ordering).
# The yellow lookup set is built once, instead of being rebuilt for every column checked.
yellow_column_lookup = set(df_yellow.columns)
common_columns = [name for name in df_green.columns if name in yellow_column_lookup]
print(common_columns)

['VendorID', 'pickup_datetime', 'dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'payment_type', 'congestion_surcharge']


In [18]:
# Frozen copy of the shared column list (same order as the green dataset), so later cells
# select a fixed set of columns without re-deriving it from both DataFrames.
# NOTE(review): this cell was executed as In[18], after the cells that use common_columns
# (In[14]-In[17]); in file order it runs before them, which gives the same list.
common_columns = [
    'VendorID', 'pickup_datetime', 'dropoff_datetime', 'store_and_fwd_flag',
    'RatecodeID', 'PULocationID', 'DOLocationID',
    'passenger_count', 'trip_distance',
    'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
    'improvement_surcharge', 'total_amount',
    'payment_type', 'congestion_surcharge',
]

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [14]:
from pyspark.sql import functions as F

In [15]:
def _select_common_columns(df, service_type):
    """Project ``df`` onto ``common_columns`` and tag every row with its service type.

    Args:
        df: a trips DataFrame whose pickup/dropoff columns were already renamed to
            ``pickup_datetime`` / ``dropoff_datetime``.
        service_type: label written as a constant into the new ``service_type`` column.

    Returns:
        A new DataFrame with the ``common_columns`` (in that order) followed by ``service_type``.
    """
    # select() works like a SQL SELECT clause; F.lit() puts the same constant on every row
    return df.select(common_columns).withColumn('service_type', F.lit(service_type))


# Both frames end up with identical column names in identical order, ready to be stacked
df_green_sel = _select_common_columns(df_green, 'green')
df_yellow_sel = _select_common_columns(df_yellow, 'yellow')

In [16]:
# Combine the two datasets
# Stack green and yellow trips into one DataFrame (UNION ALL semantics: duplicates are kept).
# unionByName matches columns by name instead of by position (as unionAll/union do), so a
# column-order difference between the two selections cannot silently shift values into the
# wrong column.
df_trips_data = df_green_sel.unionByName(df_yellow_sel)

In [17]:
# Number of trips per service type. groupBy(...).count() only builds a new (lazy) DataFrame;
# show() is the action that runs the job and prints the resulting table.
trips_per_service = df_trips_data.groupBy('service_type').count()
trips_per_service.show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



                                                                                

In [19]:
# Expose the combined trips as the temporary SQL view `trips_data` for the spark.sql() query below.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated since Spark 2.0;
# the view is replaced if it already exists, so re-running this cell is safe.
df_trips_data.createOrReplaceTempView('trips_data')



In [20]:
# Monthly revenue report over the `trips_data` view: one row per
# (pickup zone, pickup month truncated by date_trunc, service type), with the summed fare
# components plus the average passenger count and trip distance for that group.
# NOTE(review): the `avg_montly_*` aliases are misspelled; they become column names of the
# written report, so confirm nothing downstream reads them before renaming.
revenue_query = """
SELECT 
    -- Reveneue grouping 
    PULocationID as revenue_zone,
    date_trunc('month', pickup_datetime) as revenue_month, 
    service_type, 
    -- Revenue calculation 
    SUM(fare_amount) as revenue_monthly_fare,
    SUM(extra) as revenue_monthly_extra,
    SUM(mta_tax) as revenue_monthly_mta_tax,
    SUM(tip_amount) as revenue_monthly_tip_amount,
    SUM(tolls_amount) as revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) as revenue_monthly_improvement_surcharge,
    SUM(total_amount) as revenue_monthly_total_amount,
    SUM(congestion_surcharge) as revenue_monthly_congestion_surcharge,

    -- Additional calculations
    avg(passenger_count) as avg_montly_passenger_count,
    avg(trip_distance) as avg_montly_trip_distance

FROM 
    trips_data
GROUP BY 
    1,2,3;
"""
df_result = spark.sql(revenue_query)

In [33]:
# Redistribute the report into 215 partitions (repartition triggers a full shuffle).
# Nothing is written here; the write happens in the next cell.
# NOTE(review): the next cell coalesces back to 1 partition, so this extra shuffle only pays
# off if it is meant to keep the upstream aggregation parallel (a bare coalesce(1) can narrow
# that stage to a single task). Confirm intent, or drop it for a small result.
df_result = df_result.repartition(numPartitions=215)

In [35]:
# Collapse the report to a single partition so Spark writes it as one parquet part file,
# overwriting whatever is already at the target folder.
revenue_report_path = f'{data_dir}/report/revenue/'
df_result.coalesce(1).write.mode('overwrite').parquet(revenue_report_path)

                                                                                