In [76]:
import pyspark
from pyspark.sql import SparkSession

In [77]:
spark = SparkSession.builder\
                    .master("local[*]")\
                    .appName("SparkSQL")\
                    .getOrCreate()

In [78]:
df_green = spark.read.parquet("data/raw/green/*/*/*")

In [79]:
df_green.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2019-12-18 15:52:30|  2019-12-18 15:54:39|                 N|       1.0|         264|         264|            5.0|          0.0|        3.5|  0.5|    0.

In [80]:
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [81]:
df_green.count(), len(df_green.columns)

(2802931, 20)

In [82]:
df_green.rdd.getNumPartitions()

4

In [83]:
df_yellow = spark.read.parquet("data/raw/yellow/*/*/*")

In [84]:
df_yellow.schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', IntegerType(), True)])

In [85]:
df_yellow.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [86]:
df_yellow.count(), len(df_green.columns)

(55553400, 20)

Schemas are alittle different, so we will need to do some data cleaning.

In [87]:
set(df_yellow.columns) & set(df_green.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [88]:
len(set(df_yellow.columns) & set(df_green.columns))

16

Only 16 columns are same in both datasets

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [89]:
set(df_yellow.columns).symmetric_difference(df_green.columns)

{'airport_fee',
 'ehail_fee',
 'lpep_dropoff_datetime',
 'lpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'tpep_pickup_datetime',
 'trip_type'}

In [90]:
# Define the set of column names
column_names = {'airport_fee', 'ehail_fee', 'lpep_dropoff_datetime', 'lpep_pickup_datetime', 'tpep_dropoff_datetime', 'tpep_pickup_datetime', 'trip_type'}

# Check which dataframe each column comes from
for column in column_names:
    if column in df_yellow.columns:
        print(f"{column} -> df_yellow")
    if column in df_green.columns:
        print(f"{column} -> df_green")

trip_type -> df_green
airport_fee -> df_yellow
tpep_dropoff_datetime -> df_yellow
tpep_pickup_datetime -> df_yellow
ehail_fee -> df_green
lpep_dropoff_datetime -> df_green
lpep_pickup_datetime -> df_green


In [91]:
df_green = df_green.withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")\
        .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")

In [92]:
df_yellow = df_yellow.withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")\
        .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")

In [93]:
column_names = set(df_yellow.columns).symmetric_difference(df_green.columns)
# Check which dataframe each column comes from
for column in column_names:
    if column in df_yellow.columns:
        print(f"{column} -> df_yellow")
    if column in df_green.columns:
        print(f"{column} -> df_green")

airport_fee -> df_yellow
ehail_fee -> df_green
trip_type -> df_green


In [94]:
# Common columns in both datasets
set(df_yellow.columns) & set(df_green.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [95]:
from pyspark.sql.functions import lit
df_green = df_green.withColumn("source", lit("green"))
df_yellow = df_yellow.withColumn("source", lit("yellow"))

In [96]:
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- source: string (nullable = false)



In [97]:
df_yellow.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)
 |-- source: string (nullable = false)



In [98]:
common_columns = set(df_yellow.columns) & set(df_green.columns)
df_yellow_common = df_yellow.select(*common_columns)

In [99]:
common_columns

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'source',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [100]:
df_green_common = df_green.select(*common_columns)

In [101]:
df_green_common.columns

['RatecodeID',
 'fare_amount',
 'VendorID',
 'pickup_datetime',
 'tolls_amount',
 'congestion_surcharge',
 'payment_type',
 'source',
 'improvement_surcharge',
 'DOLocationID',
 'extra',
 'dropoff_datetime',
 'trip_distance',
 'PULocationID',
 'passenger_count',
 'mta_tax',
 'total_amount',
 'store_and_fwd_flag',
 'tip_amount']

In [102]:
df_trips = df_green_common.unionAll(df_yellow_common)

In [103]:
df_trips.show(10)

+----------+-----------+--------+-------------------+------------+--------------------+------------+------+---------------------+------------+-----+-------------------+-------------+------------+---------------+-------+------------+------------------+----------+
|RatecodeID|fare_amount|VendorID|    pickup_datetime|tolls_amount|congestion_surcharge|payment_type|source|improvement_surcharge|DOLocationID|extra|   dropoff_datetime|trip_distance|PULocationID|passenger_count|mta_tax|total_amount|store_and_fwd_flag|tip_amount|
+----------+-----------+--------+-------------------+------------+--------------------+------------+------+---------------------+------------+-----+-------------------+-------------+------------+---------------+-------+------------+------------------+----------+
|       1.0|        3.5|       2|2019-12-18 15:52:30|         0.0|                 0.0|         1.0| green|                  0.3|         264|  0.5|2019-12-18 15:54:39|          0.0|         264|            5.0|

In [104]:
df_trips.select("source").distinct().show()



+------+
|source|
+------+
| green|
|yellow|
+------+



                                                                                

In [106]:
df_trips.groupBy("source").count().show()



+------+--------+
|source|   count|
+------+--------+
| green| 2802931|
|yellow|55553400|
+------+--------+



                                                                                

In [109]:
df_trips.createOrReplaceTempView("trips")

In [113]:
spark.sql("""
        SELECT source, COUNT(*) AS count
        FROM trips
        GROUP BY source
""").show()



+------+--------+
|source|   count|
+------+--------+
| green| 2802931|
|yellow|55553400|
+------+--------+



                                                                                

In [130]:
with open("vendor_monthly_revenue.sql", 'r') as sql_file:
    monthly_revenue = sql_file.read()
monthly_revenue

"SELECT\n    -- Reveneue grouping\n    DATE_TRUNC('month', pickup_datetime) AS revenue_month,\n    VendorID as vendor_id,\n    --Note: For BQ use instead: DATE_TRUNC(pickup_datetime, month) AS revenue_month,\n    source,\n\n    -- Revenue calculation\n    SUM(mta_tax) AS revenue_monthly_mta_tax,\n    SUM(tip_amount) AS revenue_monthly_tip_amount,\n    SUM(tolls_amount) AS revenue_monthly_tolls_amount,\n    SUM(extra) AS revenue_monthly_extra,\n    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,\n    SUM(total_amount) AS revenue_monthly_total_amount,\n    SUM(fare_amount) AS revenue_monthly_fare,\n    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,\n\n    -- Additional calculations\n    AVG(passenger_count) AS avg_montly_passenger_count,\n    AVG(trip_distance) AS avg_montly_trip_distance\n\n    FROM trips\n    GROUP BY 1,2,3"

In [135]:
result = spark.sql(monthly_revenue)

In [137]:
result.write.parquet("data/report/vendor_monthly_revenue/")

                                                                                

In [138]:
result.coalesce(1).write.parquet("data/report/coalesce_vendor_monthly_revenue/")

                                                                                