In [46]:
from pyspark.sql import SparkSession
import pyspark

In [47]:
from pyspark.sql import functions as F

In [48]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('zoomcamp') \
    .config('spark.executor.memory', '8g') \
    .getOrCreate()
spark

In [49]:
df_green = spark.read.parquet('../../dataset/pq/green/*/*')
df_green = df_green\
        .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')\
        .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')\
        .withColumn('service_type', F.lit('green'))
df_green.show()

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------+
|       2|2020-01-23 13:10:15|2020-01-23 13:38:16|                 N|         1|          74|         130|              1|        12.

In [50]:
df_yellow = spark.read.parquet('../../dataset/pq/yellow/*/*')
df_yellow = df_yellow\
        .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')\
        .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')\
        .withColumn('service_type', F.lit('yellow'))
df_yellow.show()

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|service_type|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------+
|       2|2020-01-06 09:18:38|2020-01-06 09:33:56|              1|         3.03|         1|                 N|         263|         233|           1|       12.0|  0.0|    0.5|       1.5|       

In [51]:
common_columns = []
for col in df_green.columns:
    if col in df_yellow.columns:
        common_columns.append(col)
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'service_type']

In [53]:
df_trips_data = df_green.select(common_columns).unionAll(df_yellow.select(common_columns))
df_trips_data.show()

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2020-01-23 13:10:15|2020-01-23 13:38:16|                 N|         1|          74|         130|              1|        12.77|       36.0|  0.0|    0.5|      2.05|        6.12|       

In [54]:
df_trips_data.groupBy('service_type').count().show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2227999|
|      yellow|38279434|
+------------+--------+



                                                                                

In [62]:
df_trips_data.registerTempTable('trips_data')

df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")



In [64]:
df_result.coalesce(1).write.parquet('../../dataset/report/revenue/', mode='overwrite')

                                                                                