In [10]:
import pyspark
from pyspark.sql import functions as F

In [3]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types

# Build the session used by every cell below, or reuse one that is already
# running. "local[4]" runs Spark in-process with four worker threads.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName('spark sql')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/09/03 12:12:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/03 12:12:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Read the green dataset: the glob picks up every entry two directory levels
# below data/pq/green (presumably a year/month layout -- confirm on disk).
df_green = spark.read.parquet('data/pq/green/*/*')

                                                                                

In [6]:
# Strip the 'lpep_' prefix from the pickup/dropoff timestamp columns so they
# use the same names as the yellow frame's renamed columns.
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)

In [7]:
# Read the yellow dataset: the glob picks up every entry two directory levels
# below data/pq/yellow (presumably a year/month layout -- confirm on disk).
df_yellow = spark.read.parquet('data/pq/yellow/*/*')

In [8]:
# Strip the 'tpep_' prefix from the pickup/dropoff timestamp columns so they
# use the same names as the green frame's renamed columns.
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [9]:
# Columns that exist in both frames, kept in the green frame's column order.
# A set makes each membership check constant-time.
# (The 'common_colums' spelling is kept: later cells reference this name.)
yellow_columns = set(df_yellow.columns)
common_colums = [name for name in df_green.columns if name in yellow_columns]

In [11]:
# Keep only the shared columns and tag every green row with its service type.
df_green_sel = (
    df_green
    .select(common_colums)
    .withColumn('service_type', F.lit('green'))
)

In [12]:
# Keep only the shared columns and tag every yellow row with its service type.
df_yellow_sel = (
    df_yellow
    .select(common_colums)
    .withColumn('service_type', F.lit('yellow'))
)

In [13]:
# Stack green and yellow trips into one frame (duplicates are kept). Columns
# are matched by position, which lines up because both inputs were built from
# the same select list plus 'service_type'.
df_trips_data = df_green_sel.union(df_yellow_sel)

In [14]:
# Sanity check: number of trips contributed by each service type.
(
    df_trips_data
    .groupBy('service_type')
    .count()
    .show()
)



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 1734051|
|      yellow|24648499|
+------------+--------+



                                                                                

In [16]:
# Register the combined frame as the temp view 'trips_data' so it can be
# queried through spark.sql(); an existing view of that name is replaced.
df_trips_data.createOrReplaceTempView('trips_data')

In [17]:
# Monthly revenue report: one output row per (pickup zone, calendar month,
# service type). `GROUP BY 1, 2, 3` refers to the first three SELECT
# expressions by position. Each monetary component is summed; passenger count
# and trip distance are averaged.
# NOTE(review): the 'avg_montly_*' aliases are misspelled ("montly"). They are
# left untouched here because renaming them changes the schema of df_result.
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [19]:
# Preview the aggregate with one field per line (vertical layout), which is
# easier to read than a wide table given the number of columns.
df_result.show(vertical=True)



-RECORD 0----------------------------------------------------
 revenue_zone                          | 250                 
 revenue_month                         | 2020-02-01 00:00:00 
 service_type                          | green               
 revenue_monthly_fare                  | 15359.960000000061  
 revenue_monthly_extra                 | 1282.5              
 revenue_monthly_mta_tax               | 117.5               
 revenue_monthly_tip_amount            | 56.01               
 revenue_monthly_tolls_amount          | 590.3200000000003   
 revenue_monthly_improvement_surcharge | 180.0000000000011   
 revenue_monthly_total_amount          | 17598.43999999995   
 revenue_monthly_congestion_surcharge  | 11.0                
 avg_montly_passenger_count            | 1.2394957983193278  
 avg_montly_trip_distance              | 4.962810650887575   
-RECORD 1----------------------------------------------------
 revenue_zone                          | 158                 
 revenue

                                                                                

In [20]:
# Collapse the result to a single partition so the report is written as one
# parquet part file, replacing anything already in the output directory.
(
    df_result
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet('data/report/revenue/')
)

                                                                                