In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Create (or reuse) the SparkSession for this notebook.
# The master URL must be spelled 'local[*]' (run Spark locally, using all
# available cores); a misspelled master URL makes session start-up fail.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('ny_taxi')
    .getOrCreate()
)

In [None]:
# Load the green and yellow taxi datasets (parquet files) into DataFrames.
# The '*/*' glob picks up everything two levels below each colour folder
# (presumably year/month sub-directories -- confirm against the data layout).
df_green, df_yellow = (
    spark.read.parquet('data/green/*/*'),
    spark.read.parquet('data/yellow/*/*'),
)

In [None]:
# Before combining the two DataFrames, display the column names they share.
set(df_green.columns).intersection(df_yellow.columns)

In [None]:
# The pickup/dropoff timestamp columns are named differently in each dataset
# ('lpep_*' in green, 'tpep_*' in yellow), so they are missing from the set of
# shared columns. Give them the same names in both DataFrames.
#
# Spark DataFrames are immutable: withColumnRenamed returns a NEW DataFrame,
# so the result must be assigned back -- otherwise the rename is silently
# discarded. Re-running this cell is harmless, because renaming a column that
# no longer exists is a no-op.
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)

df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [None]:
# Build the list of columns present in both datasets, preserving the order in
# which they appear in df_green. The yellow column names are held in a set so
# that each membership check is a fast lookup.
yellow_cols = set(df_yellow.columns)
common_cols = [name for name in df_green.columns if name in yellow_cols]

In [None]:
# Keep only the shared columns from each dataset and add a constant
# 'taxi_type' column recording which dataset each row came from, so the
# source is still known once the two are combined.
# F is pyspark.sql.functions (it must be imported with the other imports at
# the top of the notebook); F.lit turns the Python string into a literal column.
df_selected_y = (
    df_yellow
    .select(common_cols)
    .withColumn('taxi_type', F.lit('yellow'))
)
df_selected_g = (
    df_green
    .select(common_cols)
    .withColumn('taxi_type', F.lit('green'))
)

In [None]:
# Stack the yellow and green trips into one DataFrame. Rows are combined by
# column position (not by name), so both inputs must list their columns in
# the same order.
trip_df = df_selected_y.union(df_selected_g)

In [None]:
# Sanity check: print the number of trips per taxi type.
(
    trip_df
    .groupBy('taxi_type')
    .count()
    .show()
)

In [None]:
# Expose the DataFrame to Spark SQL under the name 'trip_tb' so it can be
# queried through spark.sql. createOrReplaceTempView is the replacement for
# the deprecated registerTempTable; it likewise replaces an existing view of
# the same name, so this cell can be re-run safely.
trip_df.createOrReplaceTempView('trip_tb')

In [None]:
# Monthly revenue report over the 'trip_tb' view.
# One output row per (pickup zone, pickup month, taxi type): the monetary
# columns are summed, passenger count and trip distance are averaged.
# 'GROUP BY 1, 2, 3' refers to the first three SELECT expressions by position.
monthly_revenue_query = """

SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    taxi_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trip_tb
GROUP BY
    1, 2, 3

"""
df_sql = spark.sql(monthly_revenue_query)

In [None]:
# Preview the report: print the first 20 rows, truncating long cell values
# (these are show()'s defaults, spelled out here).
df_sql.show(n=20, truncate=True)

In [None]:
# Write the report to parquet.
# The default save mode raises an error when the target path already exists,
# which would make this cell fail on any second run of the notebook; use
# 'overwrite' so the notebook can be re-run from a fresh kernel.
df_sql.write.parquet('data/reports/', mode='overwrite')

In [None]:
# Reduce the result to a single partition before writing so the report is
# produced as one parquet part file instead of one file per partition.
# coalesce() requires an integer partition count.
df_sql.coalesce(1).write.parquet('data/reports', mode='overwrite')