In [28]:
import os

import pyspark
from pyspark.sql import SparkSession

# Connect to the standalone Spark master (spark:// URL). The master URL is a
# config value: set the SPARK_MASTER_URL environment variable to point the
# notebook at a different cluster; otherwise the original host is used.
SPARK_MASTER_URL = os.environ.get('SPARK_MASTER_URL', 'spark://dengine:7077')

spark = (
    SparkSession.builder
    .master(SPARK_MASTER_URL)
    .appName('test')
    .getOrCreate()
)

In [29]:
# Show the SparkSession object as this cell's output -- a quick sanity check
# that the session built in the previous cell exists.
spark

In [30]:
# Load every green-taxi Parquet partition (two directory levels under data/pq/green).
df_green = spark.read.format('parquet').load('data/pq/green/*/*')

In [31]:
# Load every yellow-taxi Parquet partition (two directory levels under data/pq/yellow).
df_yellow = spark.read.format('parquet').load('data/pq/yellow/*/*')

In [32]:
# Strip the lpep_ prefix from the green timestamps so they share names with
# the (renamed) yellow timestamp columns.
for old_name, new_name in [
    ('lpep_pickup_datetime', 'pickup_datetime'),
    ('lpep_dropoff_datetime', 'dropoff_datetime'),
]:
    df_green = df_green.withColumnRenamed(old_name, new_name)

In [33]:
# Strip the tpep_ prefix from the yellow timestamps so they share names with
# the (renamed) green timestamp columns.
for old_name, new_name in [
    ('tpep_pickup_datetime', 'pickup_datetime'),
    ('tpep_dropoff_datetime', 'dropoff_datetime'),
]:
    df_yellow = df_yellow.withColumnRenamed(old_name, new_name)

In [34]:
# Columns selected from both the green and yellow frames. Order matters: the
# two selections are stacked into one frame, so both must list the columns
# identically. Must remain a list (DataFrame.select unpacks a single list arg).
common_columns = (
    # trip identity, timing, location and size
    ['VendorID', 'pickup_datetime', 'dropoff_datetime', 'store_and_fwd_flag',
     'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count',
     'trip_distance']
    # fare components
    + ['fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
       'improvement_surcharge', 'total_amount']
    # remaining shared fields
    + ['payment_type', 'congestion_surcharge']
)

In [35]:
from pyspark.sql import functions as F

In [36]:
# Project the green trips onto the shared columns and append a constant
# service_type='green' column as the last column.
df_green_sel = df_green.select(
    *common_columns,
    F.lit('green').alias('service_type'),
)

In [37]:
# Project the yellow trips onto the shared columns and append a constant
# service_type='yellow' column as the last column.
df_yellow_sel = df_yellow.select(
    *common_columns,
    F.lit('yellow').alias('service_type'),
)

In [38]:
# Stack green and yellow trips into one frame (UNION ALL semantics: duplicates
# are kept). unionByName matches columns by name instead of by position, so if
# either selection is ever reordered the values still land in the right
# columns, and a name mismatch raises an error instead of silently misaligning.
df_trips_data = df_green_sel.unionByName(df_yellow_sel)

In [39]:
# Expose the combined trips to Spark SQL as the temporary view `trips_data`.
# createOrReplaceTempView replaces registerTempTable (deprecated since
# Spark 2.0) and, like it, overwrites any existing view of the same name.
df_trips_data.createOrReplaceTempView('trips_data')

In [40]:
# Monthly revenue per pickup zone (PULocationID) and service type, computed
# from the `trips_data` temp view: sums of each fare component plus average
# passenger count and trip distance.
#
# GROUP BY lists the grouping expressions explicitly instead of ordinals
# (1, 2, 3), so the query does not depend on the SELECT-list order or on the
# spark.sql.groupByOrdinal setting.
#
# NOTE(review): the `avg_montly_*` aliases are misspelled but kept as-is,
# because they become column names in the written report; renaming them would
# change the output schema for any downstream reader.
df_result = spark.sql("""
SELECT 
   -- Revenue grouping 
   PULocationID AS revenue_zone,
   date_trunc('month', pickup_datetime) AS revenue_month, 
   service_type, 
   
   -- Revenue calculation 
   SUM(fare_amount) AS revenue_monthly_fare,
   SUM(extra) AS revenue_monthly_extra,
   SUM(mta_tax) AS revenue_monthly_mta_tax,
   SUM(tip_amount) AS revenue_monthly_tip_amount,
   SUM(tolls_amount) AS revenue_monthly_tolls_amount,
   SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
   SUM(total_amount) AS revenue_monthly_total_amount,
   SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,
   
   -- Additional calculations
   AVG(passenger_count) AS avg_montly_passenger_count,
   AVG(trip_distance) AS avg_montly_trip_distance

FROM trips_data
GROUP BY 
   PULocationID,
   date_trunc('month', pickup_datetime),
   service_type

""")

In [None]:
# Write the revenue report as Parquet, replacing any previous output.
# coalesce(1) collapses the result to a single partition, so the report is
# written as one part file. (Fixes the `coaslesce` typo, which raised
# AttributeError before anything was written.)
df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')