## **PART 1: Data Ingestion and Preparation**

In [0]:
# Read files in Databricks from Azure Blob Storage and make a copy to DBFS
storage_account_name = 'your_storage_account_name'
storage_account_access_key = 'your_storage_account_access_key'
blob_container_name = 'your_container_name'

In [0]:
dbutils.fs.mount(
    source = f'wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net',
     mount_point = "/mnt/taxidata",
    extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_access_key}
)

Out[11]: True

Move Green Taxi files and Yellow Taxi files into seperate folders

In [0]:
# Move Green Taxi files and Yellow Taxi files into seperate folders
## Create directories
dbutils.fs.mkdirs('/dbfs/taxidata/yellowtaxi')
dbutils.fs.mkdirs('/dbfs/taxidata/greentaxi')

Out[12]: True

In [0]:
## Copy yellow taxi files
dbutils.fs.cp('/mnt/taxidata/yellow_taxi_2015.parquet', '/dbfs/taxidata/yellowtaxi/yellow_taxi_2015.parquet')
dbutils.fs.cp('/mnt/taxidata/yellow_taxi_2016.parquet', '/dbfs/taxidata/yellowtaxi/yellow_taxi_2016.parquet')
dbutils.fs.cp('/mnt/taxidata/yellow_taxi_2017.parquet', '/dbfs/taxidata/yellowtaxi/yellow_taxi_2017.parquet')
dbutils.fs.cp('/mnt/taxidata/yellow_taxi_2018.parquet', '/dbfs/taxidata/yellowtaxi/yellow_taxi_2018.parquet')
dbutils.fs.cp('/mnt/taxidata/yellow_taxi_2019.parquet', '/dbfs/taxidata/yellowtaxi/yellow_taxi_2019.parquet')
dbutils.fs.cp('/mnt/taxidata/yellow_taxi_2020.parquet', '/dbfs/taxidata/yellowtaxi/yellow_taxi_2020.parquet')
dbutils.fs.cp('/mnt/taxidata/yellow_taxi_2021.parquet', '/dbfs/taxidata/yellowtaxi/yellow_taxi_2021.parquet')
dbutils.fs.cp('/mnt/taxidata/yellow_taxi_2022.parquet', '/dbfs/taxidata/yellowtaxi/yellow_taxi_2022.parquet')

## Copy green taxi files
dbutils.fs.cp('/mnt/taxidata/green_taxi_2015.parquet', '/dbfs/taxidata/greentaxi/green_taxi_2015.parquet')
dbutils.fs.cp('/mnt/taxidata/green_taxi_2016.parquet', '/dbfs/taxidata/greentaxi/green_taxi_2016.parquet')
dbutils.fs.cp('/mnt/taxidata/green_taxi_2017.parquet', '/dbfs/taxidata/greentaxi/green_taxi_2017.parquet')
dbutils.fs.cp('/mnt/taxidata/green_taxi_2018.parquet', '/dbfs/taxidata/greentaxi/green_taxi_2018.parquet')
dbutils.fs.cp('/mnt/taxidata/green_taxi_2019.parquet', '/dbfs/taxidata/greentaxi/green_taxi_2019.parquet')
dbutils.fs.cp('/mnt/taxidata/green_taxi_2020.parquet', '/dbfs/taxidata/greentaxi/green_taxi_2020.parquet')
dbutils.fs.cp('/mnt/taxidata/green_taxi_2021.parquet', '/dbfs/taxidata/greentaxi/green_taxi_2021.parquet')
dbutils.fs.cp('/mnt/taxidata/green_taxi_2022.parquet', '/dbfs/taxidata/greentaxi/green_taxi_2022.parquet')


Out[13]: True

In [0]:
# Read dataframe
df_yellow = spark.read.parquet('/dbfs/taxidata/yellowtaxi')

In [0]:
# Read dataframe
df_green = spark.read.parquet('/dbfs/taxidata/greentaxi')

In [0]:
# Count the numbers of records for yellow taxi
yellow_taxi_count = df_yellow.count()
print(f"Yellow taxi: {yellow_taxi_count}")

Yellow taxi: 663055251


In [0]:
# Count the numbers of records for green taxi
green_taxi_count = df_green.count()
print(f"Green taxi: {green_taxi_count}")

Green taxi: 66200401


In [0]:
# Read taxi_zone_lookup csv file 
taxi_zone_lookup = spark.read.option("header", "true").csv("/mnt/taxidata/taxi_zone_lookup.csv")

### Explore the dataset

In [0]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp

In [0]:
# Create a Spark Session
spark = SparkSession.builder.appName('Taxi data processing').getOrCreate()

In [0]:
# Convert pickup and dropoff datetime columns from string to timestamp
df_green = df_green.withColumn('lpep_pickup_timestamp', to_timestamp(col('lpep_pickup_datetime'), 'yyyy-MM-dd HH:mm:ss'))
df_green = df_green.withColumn('lpep_dropoff_timestamp', to_timestamp(col('lpep_dropoff_datetime'), 'yyyy-MM-dd HH:mm:ss'))

df_yellow = df_yellow.withColumn('tpep_pickup_timestamp', to_timestamp(col('tpep_pickup_datetime'), 'yyyy-MM-dd HH:mm:ss'))
df_yellow = df_yellow.withColumn('tpep_dropoff_timestamp', to_timestamp(col('tpep_dropoff_datetime'), 'yyyy-MM-dd HH:mm:ss'))

Remove trips finishing before starting time

In [0]:
# df_green
df_green = df_green.filter(df_green['lpep_dropoff_datetime'] >= df_green['lpep_pickup_datetime'])

# df_yellow
df_yellow = df_yellow.filter(df_yellow['tpep_dropoff_datetime'] >= df_yellow['tpep_pickup_datetime'])

Remove trips where the pickup/dropoff datetime is outside of the range

In [0]:
# Define the valid datetime range
valid_start_date = '2015-01-01T00:00:00.000+00:00'
valid_end_date = '2022-12-31T23:59:59.999+00:00'

In [0]:
# df_green 
df_green = df_green.filter((col('lpep_pickup_timestamp') >= valid_start_date) & (col('lpep_pickup_timestamp') <= valid_end_date) &
                           (col('lpep_dropoff_timestamp') >= valid_start_date) & (col('lpep_dropoff_timestamp') <= valid_end_date))

# df_yellow
df_yellow = df_yellow.filter((col('tpep_pickup_timestamp') >= valid_start_date) & (col('tpep_pickup_timestamp') <= valid_end_date) &
                             (col('tpep_dropoff_timestamp') >= valid_start_date) & (col('tpep_dropoff_timestamp') <= valid_end_date))

Remove trips with negative speed

In [0]:
# Import unix_timestamp package
from pyspark.sql.functions import unix_timestamp

In [0]:
# Calculate trip duration
## df_green
df_green = df_green.withColumn('trip_duration', (unix_timestamp(col('lpep_dropoff_timestamp')) - unix_timestamp(col('lpep_pickup_timestamp'))))

## df_yellow
df_yellow = df_yellow.withColumn('trip_duration', (unix_timestamp(col('tpep_dropoff_timestamp')) - unix_timestamp(col('tpep_pickup_timestamp'))))

In [0]:
# Remove trips having trip duration <= 0
df_green = df_green.filter(col('trip_duration') > 0)
df_yellow = df_yellow.filter(col('trip_duration') > 0)

In [0]:
# Calculate speed of trips
df_green = df_green.withColumn('speed', col('trip_distance')/(col('trip_duration')/3600))
df_yellow = df_yellow.withColumn('speed', col('trip_distance')/(col('trip_duration')/3600))

In [0]:
# Remove trips with negative speed
df_green = df_green.filter(col('speed') >= 0)
df_yellow = df_yellow.filter(col('speed') >= 0)

Remove trips with very high speed

In [0]:
# Set speed limit within NYC as 55km/h
speed_limit = 55

# Remove trips having speed higher than 55km/h
df_green = df_green.filter(col('speed') <= speed_limit)
df_yellow = df_yellow.filter(col('speed') <= speed_limit)

Remove trips that are travelling too short or too long (duration wise)

In [0]:
from pyspark.sql.functions import min, max

In [0]:
# Define min and max duration for a trip
min_duration = 180 # 3 minutes
max_duration = 7200 # 2 hours

# Filter too long or too short trips
df_green = df_green.filter((col('trip_duration') >= min_duration) & (col('trip_duration') <= max_duration))
df_yellow = df_yellow.filter((col('trip_duration') >= min_duration) & (col('trip_duration') <= max_duration))

Remove trips that are travelling too short or too long (distance wise)

In [0]:
# Define min and max distance for a trip
min_distance = 0.5 
max_distance = 50 

# Filter too long or too short trips
df_green = df_green.filter((col('trip_distance') >= min_distance) & (col('trip_distance') <= max_distance))
df_yellow = df_yellow.filter((col('trip_distance') >= min_distance) & (col('trip_distance') <= max_distance))

Remove trips with invalid number of passengers

In [0]:
df_green = df_green.filter((col('passenger_count') <= 5) & (col('passenger_count') > 0))
df_yellow = df_yellow.filter((col('passenger_count') <= 5) & (col('passenger_count') > 0))

In [0]:
# Check number of records after filtering
green_taxi_count = df_green.count()
yellow_taxi_count = df_yellow.count()
print(f"Green taxi: {green_taxi_count}")
print(f"Yellow taxi: {yellow_taxi_count}")

Green taxi: 57827890
Yellow taxi: 589812060


In [0]:
# Combine yellow and green taxi dataset
from pyspark.sql.functions import lit

# Drop columns
df_green = df_green.drop('ehail_fee')

# Add missing columns to 2 datasets
df_green = (df_green.withColumn('airport_fee', lit(None))
            .withColumn('taxi_color',lit('green')))

df_yellow = (df_yellow.withColumn('trip_type', lit(None))
             .withColumn('taxi_color',lit('yellow')))


# Rename lpep_pickup_datetime and lpep_dropoff_datetime in df_green 
df_green = (df_green
    .withColumnRenamed('lpep_pickup_datetime', 'tpep_pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'tpep_dropoff_datetime')
    .withColumnRenamed('lpep_pickup_timestamp', 'tpep_pickup_timestamp')
    .withColumnRenamed('lpep_dropoff_timestamp', 'tpep_dropoff_timestamp'))

In [0]:
# Merge the dataframes
data = df_yellow.unionByName(df_green)

In [0]:
data.count()

647639950

Combine the new dataframe with the location data

In [0]:
# Alias the dataframes to differentiate between pickup and dropoff joins
lookup_pu = taxi_zone_lookup.alias('lookup_pu')
lookup_do = taxi_zone_lookup.alias('lookup_do')

# Left join for pickup Location
df_with_pu = data.join(
    lookup_pu,
    data['PULocationID'] == lookup_pu['LocationID'],
    how='left'
).withColumnRenamed('Borough', 'PUBorough') \
 .withColumnRenamed('Zone', 'PUZone') \
 .withColumnRenamed('service_zone', 'PUservice_zone')

# Left join for drop-off Location
df_final = df_with_pu.join(
    lookup_do,
    df_with_pu['DOLocationID'] == lookup_do['LocationID'],
    how='left'
).withColumnRenamed('Borough', 'DOBorough') \
 .withColumnRenamed('Zone', 'DOZone') \
 .withColumnRenamed('service_zone', 'DOservice_zone')

# Drop any unnecessary columns
df_final = df_final.drop(lookup_pu['LocationID']).drop(lookup_do['LocationID'])

In [0]:
# count the final records
df_final.count()

647639950

#### Export into a parquet file

In [0]:
df_final.write.mode('overwrite').parquet('/dbfs/nyc_taxi_data.parquet')


In [0]:
# Read nyc_taxi_data.parquet
df_final = spark.read.parquet('/dbfs/nyc_taxi_data.parquet')


In [0]:
# Create a temporary view
df_final.createOrReplaceTempView('data_table')

In [0]:
table = spark.sql('SELECT * FROM data_table')

## **PART 2: Business Questions**

#### Question 1

In [0]:
%sql
SELECT
  DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM') AS year_month,
  COUNT(*) AS total_trips,
  DATE_FORMAT(tpep_pickup_datetime, 'EEEE') AS day_of_week,
  HOUR(tpep_pickup_datetime) AS hour_of_day,
  AVG(passenger_count) AS avg_passenger,
  AVG(total_amount) AS avg_total_amount,
  AVG(total_amount / passenger_count) AS avg_amount_per_passenger
FROM data_table
GROUP BY year_month, day_of_week, hour_of_day
ORDER BY year_month, total_trips DESC

year_month,total_trips,day_of_week,hour_of_day,avg_passenger,avg_total_amount,avg_amount_per_passenger
2015-01,157287,Friday,19,1.5325106334280647,15.098523145596491,12.567613943518635
2015-01,152044,Saturday,19,1.588980821341191,13.72140946042661,11.140416543898509
2015-01,147997,Saturday,23,1.6077555626127558,15.241422866687769,12.2906339497913
2015-01,147771,Friday,18,1.5138626658816683,15.331735523216713,12.831006383756442
2015-01,146085,Saturday,18,1.5714686655029606,13.624205907526372,11.10738929276618
2015-01,145233,Friday,20,1.5390854695558172,14.861050381119458,12.322833925489473
2015-01,145168,Friday,22,1.585487159704618,15.379618373203392,12.531301147640267
2015-01,143237,Friday,23,1.581330242884171,15.879558633601484,12.96833502400892
2015-01,143077,Saturday,22,1.6088190275166518,14.806905372638797,11.919402688765228
2015-01,139981,Friday,21,1.55663268586451,15.104676134625588,12.436422288266728


#### Question 2

In [0]:
# Separate the dataframe into green and yellow
green = table.filter(table.taxi_color == 'green')
yellow = table.filter(table.taxi_color == 'yellow')

In [0]:
# Create a temporary view
green.createOrReplaceTempView('green_table')
green = spark.sql('SELECT * FROM green_table')

yellow.createOrReplaceTempView('yellow_table')
yellow = spark.sql('SELECT * FROM yellow_table')

In [0]:
%sql
SELECT
    ROUND(AVG(trip_duration/60), 2) AS avg_trip_duration,
    ROUND(PERCENTILE(trip_duration/60,0.5),2) AS median_trip_duration,
    ROUND(MIN(trip_duration/60), 2) AS min_trip_duration,
    ROUND(MAX(trip_duration/60), 2) AS max_trip_duration,
    ROUND(AVG(trip_distance*1.60934), 2) AS avg_distance_km,
    ROUND(PERCENTILE(trip_distance*1.60934, 0.5), 2) AS median_distance_km,
    ROUND(MIN(trip_distance*1.60934), 2) AS min_distance_km,
    ROUND(MAX(trip_distance*1.60934), 2) AS max_distance_km,
    ROUND(AVG((trip_distance*1.60934) / (trip_duration/3600)), 2) AS avg_speed_kmph,
    ROUND(PERCENTILE((trip_distance*1.60934) / (trip_duration/3600), 0.5), 2) AS median_speed_kmph,
    ROUND(MIN((trip_distance*1.60934) / (trip_duration/3600)), 2) AS min_speed_kmph,
    ROUND(MAX((trip_distance*1.60934) / (trip_duration/3600)), 2) AS max_speed_kmph
FROM green_table

avg_trip_duration,median_trip_duration,min_trip_duration,max_trip_duration,avg_distance_km,median_distance_km,min_distance_km,max_distance_km,avg_speed_kmph,median_speed_kmph,min_speed_kmph,max_speed_kmph
14.24,11.08,3.0,120.0,4.98,3.27,0.8,80.47,20.23,18.31,0.4,88.51


In [0]:
%sql
SELECT
    ROUND(AVG(trip_duration/60), 2) AS avg_trip_duration,
    ROUND(PERCENTILE(trip_duration/60,0.5),2) AS median_trip_duration,
    ROUND(MIN(trip_duration/60), 2) AS min_trip_duration,
    ROUND(MAX(trip_duration/60), 2) AS max_trip_duration,
    ROUND(AVG(trip_distance*1.60934), 2) AS avg_distance_km,
    ROUND(PERCENTILE(trip_distance*1.60934, 0.5), 2) AS median_distance_km,
    ROUND(MIN(trip_distance*1.60934), 2) AS min_distance_km,
    ROUND(MAX(trip_distance*1.60934), 2) AS max_distance_km,
    ROUND(AVG((trip_distance*1.60934) / (trip_duration/3600)), 2) AS avg_speed_kmph,
    ROUND(PERCENTILE((trip_distance*1.60934) / (trip_duration/3600), 0.5), 2) AS median_speed_kmph,
    ROUND(MIN((trip_distance*1.60934) / (trip_duration/3600)), 2) AS min_speed_kmph,
    ROUND(MAX((trip_distance*1.60934) / (trip_duration/3600)), 2) AS max_speed_kmph
FROM yellow_table

avg_trip_duration,median_trip_duration,min_trip_duration,max_trip_duration,avg_distance_km,median_distance_km,min_distance_km,max_distance_km,avg_speed_kmph,median_speed_kmph,min_speed_kmph,max_speed_kmph
15.06,11.87,3.0,120.0,5.1,2.9,0.8,80.47,18.75,16.42,0.4,88.51


#### Question 3

In [0]:
%sql
SELECT
    PUBorough,
    DOBorough,
    DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM') AS year_month,
    DATE_FORMAT(tpep_pickup_datetime, 'EEEE') AS day_of_week,
    HOUR(tpep_pickup_datetime) AS hour_of_day,
    COUNT(*) AS total_trips,
    ROUND(AVG(trip_distance), 2) AS avg_distance,
    ROUND(AVG(total_amount), 2) AS avg_amount_per_trip,
    ROUND(SUM(total_amount), 2) AS total_amount_paid
FROM green_table
GROUP BY PUBorough, DOBorough, year_month, day_of_week, hour_of_day
ORDER BY PUBorough, DOBorough, year_month, day_of_week, hour_of_day


PUBorough,DOBorough,year_month,day_of_week,hour_of_day,total_trips,avg_distance,avg_amount_per_trip,total_amount_paid
Bronx,Bronx,2015-01,Friday,0,290,2.36,9.81,2846.11
Bronx,Bronx,2015-01,Friday,1,213,2.13,9.77,2080.37
Bronx,Bronx,2015-01,Friday,2,142,2.24,9.7,1376.96
Bronx,Bronx,2015-01,Friday,3,88,3.48,11.81,1039.18
Bronx,Bronx,2015-01,Friday,4,64,2.57,9.75,624.28
Bronx,Bronx,2015-01,Friday,5,68,2.77,9.75,663.1
Bronx,Bronx,2015-01,Friday,6,160,2.8,10.29,1646.16
Bronx,Bronx,2015-01,Friday,7,532,2.1,10.46,5566.12
Bronx,Bronx,2015-01,Friday,8,841,2.29,11.39,9578.48
Bronx,Bronx,2015-01,Friday,9,590,2.24,10.61,6258.57


In [0]:
%sql
SELECT
    PUBorough,
    DOBorough,
    DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM') AS year_month,
    DATE_FORMAT(tpep_pickup_datetime, 'EEEE') AS day_of_week,
    HOUR(tpep_pickup_datetime) AS hour_of_day,
    COUNT(*) AS total_trips,
    ROUND(AVG(trip_distance), 2) AS avg_distance,
    ROUND(AVG(total_amount), 2) AS avg_amount_per_trip,
    ROUND(SUM(total_amount), 2) AS total_amount_paid
FROM yellow_table
GROUP BY PUBorough, DOBorough, year_month, day_of_week, hour_of_day
ORDER BY PUBorough, DOBorough, year_month, day_of_week, hour_of_day

PUBorough,DOBorough,year_month,day_of_week,hour_of_day,total_trips,avg_distance,avg_amount_per_trip,total_amount_paid
Bronx,Bronx,2015-01,Friday,0,39,3.32,14.33,558.94
Bronx,Bronx,2015-01,Friday,1,28,3.09,13.71,384.01
Bronx,Bronx,2015-01,Friday,2,29,2.91,12.95,375.64
Bronx,Bronx,2015-01,Friday,3,26,2.4,11.82,307.38
Bronx,Bronx,2015-01,Friday,4,29,2.77,12.45,360.92
Bronx,Bronx,2015-01,Friday,5,18,2.89,12.68,228.3
Bronx,Bronx,2015-01,Friday,6,32,2.33,10.4,332.85
Bronx,Bronx,2015-01,Friday,7,51,2.16,11.92,608.13
Bronx,Bronx,2015-01,Friday,8,55,2.08,12.01,660.32
Bronx,Bronx,2015-01,Friday,9,20,2.28,12.99,259.81


#### Question 4

In [0]:
%sql
SELECT (COUNT(CASE WHEN tip_amount > 0 THEN 1 END) * 100 / COUNT(*)) AS percentage_trips_with_tips
FROM data_table;

percentage_trips_with_tips
64.41972580598217


#### Question 5

In [0]:
%sql
SELECT (COUNT(CASE WHEN tip_amount >= 5 THEN 1 END) * 100 / COUNT(*)) AS percentage_trips_with_tips_at_least_5_dollars
FROM data_table;

percentage_trips_with_tips_at_least_5_dollars
8.210529168251588


#### Question 6

In [0]:
%sql
WITH trip_statistics AS (
    SELECT
        CASE
            WHEN trip_duration < 300 THEN 'Under 5 Mins'
            WHEN trip_duration >= 300 AND trip_duration < 600 THEN 'From 5 mins to 10 mins'
            WHEN trip_duration >= 600 AND trip_duration < 1200 THEN 'From 10 mins to 20 mins'
            WHEN trip_duration >= 1200 AND trip_duration < 1800 THEN 'From 20 mins to 30 mins'
            WHEN trip_duration >= 1800 AND trip_duration < 3600 THEN 'From 30 mins to 60 mins'
            ELSE 'At least 60 mins'
        END AS duration_bin,
        (trip_distance * 1.60934) / (trip_duration / 3600) AS speed,
        (trip_distance * 1.60934) / total_amount AS distance_per_dollar,
        tip_amount
    FROM data_table
    WHERE trip_duration > 0
)

SELECT
    duration_bin,
    ROUND((SUM(CASE WHEN tip_amount > 0 THEN 1 ELSE 0 END) * 100 / COUNT(*)), 4) AS percentage_trips_with_tips,
    ROUND((SUM(CASE WHEN tip_amount >= 5 THEN 1 ELSE 0 END) * 100 / SUM(CASE WHEN tip_amount > 0 THEN 1 ELSE 0 END)), 4) AS percentage_trips_with_tips_at_least_5_dollars,
    ROUND(AVG(speed), 2) AS avg_speed_kmph,
    ROUND(AVG(distance_per_dollar), 2) AS avg_distance_per_dollar_kmpd
FROM trip_statistics
GROUP BY duration_bin;

duration_bin,percentage_trips_with_tips,percentage_trips_with_tips_at_least_5_dollars,avg_speed_kmph,avg_distance_per_dollar_kmpd
From 5 mins to 10 mins,62.9163,0.4501,17.21,0.21
Under 5 Mins,59.2211,0.3822,19.69,0.18
From 10 mins to 20 mins,65.9989,3.9366,17.71,0.26
From 20 mins to 30 mins,66.6003,29.584,21.16,0.31
At least 60 mins,58.9663,94.8876,22.74,0.5
From 30 mins to 60 mins,66.052,76.348,25.62,0.37


#### Question 7

In [0]:
%sql
WITH trip_statistics AS (
    SELECT
        CASE
            WHEN trip_duration < 300 THEN 'Under 5 Mins'
            WHEN trip_duration >= 300 AND trip_duration < 600 THEN 'From 5 mins to 10 mins'
            WHEN trip_duration >= 600 AND trip_duration < 1200 THEN 'From 10 mins to 20 mins'
            WHEN trip_duration >= 1200 AND trip_duration < 1800 THEN 'From 20 mins to 30 mins'
            WHEN trip_duration >= 1800 AND trip_duration < 3600 THEN 'From 30 mins to 60 mins'
            ELSE 'At least 60 mins'
        END AS duration_bin,
        (trip_distance * 1.60934) / (trip_duration / 3600) AS speed,
        (trip_distance * 1.60934) / total_amount AS distance_per_dollar,
        tip_amount,
        total_amount, trip_duration
    FROM data_table
    WHERE trip_duration > 0
)
SELECT
    duration_bin,
    SUM(total_amount) AS total_income,
    SUM(trip_duration)/3600 AS total_trip_hours,
    SUM(total_amount) / (SUM(trip_duration)/3600) AS avg_income_per_hours
FROM trip_statistics
GROUP BY duration_bin

duration_bin,total_income,total_trip_hours,avg_income_per_hours
From 5 mins to 10 mins,2017173348.933092,24945013.06611111,80.86479424111867
Under 5 Mins,470264353.0107109,4219329.160555556,111.45476807237092
From 10 mins to 20 mins,3788833500.458516,57032258.425,66.43316616053357
From 20 mins to 30 mins,2316268478.0617347,35108884.25861111,65.97385610434563
At least 60 mins,383852159.5200648,6903671.509722223,55.60116221918988
From 30 mins to 60 mins,2326355309.49347,33539491.726944443,69.36167454274683


## **PART 3: Machine Learning**

In [0]:
# Import Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, round, when, count
from pyspark.ml import Pipeline

# Import VectorAssembler to combine multiple columns into a single feature vector
# Import StringIndexer to index categorical columns 
from pyspark.ml.feature import VectorAssembler

# Import data types for defining schema 
from pyspark.sql.types import IntegerType, DoubleType

# Calculate RMSE for the baseline model
from pyspark.ml.evaluation import RegressionEvaluator 

# Import models
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression

In [0]:
# Create a Spark Session
data_processing_spark = SparkSession.builder.appName('Total_amount_prediction').getOrCreate()

In [0]:
list_columns = ['passenger_count', 'trip_distance', 'extra', 'mta_tax', 'tip_amount', 
                'improvement_surcharge', 'total_amount', 'congestion_surcharge', 
                'airport_fee', 'trip_duration', 'speed', 'taxi_color']
df_new = df_final.select(list_columns)

In [0]:
# Convert data types
df_new = df_new.withColumn('trip_duration', col('trip_duration').cast(DoubleType()))
df_new = df_new.withColumn('passenger_count', col('passenger_count').cast(IntegerType()))

In [0]:
# Handle null values
# check for null or specific 'N' values
null_summary = []

for column_name in df_new.columns:
    count_null = df_new.filter((col(column_name).isNull()) | (col(column_name) == 'N')).count()
    if count_null > 0:
        null_summary.append((column_name, count_null))

# Convert summary to a DataFrame for better visibility
null_df = spark.createDataFrame(null_summary, ["Column Name", "Null Count"])
display(null_df)

Column Name,Null Count
improvement_surcharge,4
congestion_surcharge,488552331
airport_fee,590169639


In [0]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Data Imputation") \
    .getOrCreate()

# Initialize the Imputer
imputer = Imputer(inputCols=null_cols, outputCols=[f"{col}_imputed" for col in null_cols], strategy='mean')
imputer_model = imputer.fit(df_new)

# Transform the DataFrame to impute missing values
df_imputed = imputer_model.transform(df_new)

# Check for null values after imputation
null_summary = []

for column_name in df_imputed.columns:
    count_null = df_imputed.filter(col(column_name).isNull()).count()
    if count_null > 0:
        null_summary.append((column_name, count_null))

# Convert summary to a DataFrame for better visibility
null_df = spark.createDataFrame(null_summary, ["Column Name", "Null Count"])
display(null_df)

# Remove rows with null values in the imputed columns if necessary
df_cleaned = df_imputed.dropna(subset=null_cols)

# Show the cleaned DataFrame
df_cleaned.show(truncate=False)

Column Name,Null Count
improvement_surcharge,4
congestion_surcharge,488552331
airport_fee,590169639


+---------------+-------------+-----+-------+----------+---------------------+------------+--------------------+-----------+-------------+------------------+----------+----------------------------+---------------------------+------------------+-----------------------------+----------------------------+-------------------+
|passenger_count|trip_distance|extra|mta_tax|tip_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|trip_duration|speed             |taxi_color|improvement_surcharge_impute|congestion_surcharge_impute|airport_fee_impute|improvement_surcharge_imputed|congestion_surcharge_imputed|airport_fee_imputed|
+---------------+-------------+-----+-------+----------+---------------------+------------+--------------------+-----------+-------------+------------------+----------+----------------------------+---------------------------+------------------+-----------------------------+----------------------------+-------------------+
|1              |2.1        

In [0]:
# Import Imputer  to impute missing values in numerical columns
from pyspark.ml.feature import Imputer

# Define the columns with null values
null_cols = ['improvement_surcharge', 'congestion_surcharge', 'airport_fee']

imputer = Imputer(inputCols=null_cols, outputCols=[f"{col}_imputed" for col in null_cols], strategy='mean')
imputer_model = imputer.fit(df_new)

# Transform the DataFrame to impute missing values
df_new = imputer_model.transform(df_new)

#### Feature selection

In [0]:
selected_features = ['passenger_count', 'trip_distance', 'extra', 'mta_tax', 'tip_amount',
               'improvement_surcharge', 'congestion_surcharge', 'airport_fee', 'trip_duration', 'speed']
target_feature = 'total_amount'

In [0]:
# VectorAssembler to combine features into a single vector
assembler = VectorAssembler(
    inputCols=selected_features,
    outputCol="features"
)

In [0]:
final_data = assembler.transform(df_cleaned)

In [0]:
final_data.write.mode('overwrite').parquet('/dbfs/final_data.parquet')

In [0]:
# Splitting
# Train and validation: all data except October, November, December 2022
train_validation = final_data.filter(col('tpep_pickup_datetime') < '2022-10-01')

# Testset: October, November, and December 2022
test = final_data.filter(col('tpep_pickup_datetime') >= '2022-10-01')

In [0]:
# Split the dataset into training and validation sets
(train, validation) = train_validation.randomSplit([0.8, 0.2], seed=42)

#### Baseline model

In [0]:
# Calculate the average amount per trip for each taxi color
avg_amount = df_final.groupBy('taxi_color').agg(round(avg('total_amount'), 2).alias('avg_amount_per_trip'))
display(avg_amount)

taxi_color,avg_amount_per_trip
green,15.5
yellow,17.64


In [0]:
# Join with the train DataFrame to add the baseline prediction to each trip
df_with_baseline_train = train_validation.join(avg_amount, on='taxi_color', how='left')

# Baseline prediction: use the average amount per trip for each taxi color in the training DataFrame
df_with_baseline_train = df_with_baseline_train.withColumn('baseline_prediction', col('avg_amount_per_trip'))

df_with_baseline_validation = test.join(avg_amount, on='taxi_color', how='left')

# Baseline prediction
df_with_baseline_validation = df_with_baseline_validation.withColumn('baseline_prediction', col('avg_amount_per_trip'))

# Calculate RMSE for the baseline model on the validation set
baseline_evaluator = RegressionEvaluator(labelCol='total_amount', predictionCol='baseline_prediction', metricName='rmse')
baseline_rmse = baseline_evaluator.evaluate(df_with_baseline_validation)
print(f"Baseline Model RMSE: {baseline_rmse}")

Baseline Model RMSE: 18.79564997547481


#### Model development

In [0]:
# Linear Regression
linear = LinearRegression(featuresCol="features", labelCol=target_feature)

# Fitting model on the train data
lreg_model = linear.fit(train)

In [0]:
# Random Forest
randomforest = RandomForestRegressor(featuresCol="features", labelCol=target_feature)

# Fitting model on the train data
rf_model = randomforest.fit(train)

In [0]:
# Making prediction 
linear_predictions = lreg_model.transform(test)
randomforest_predictions = rf_model.transform(test)

# Define evaluator
evaluator = RegressionEvaluator(labelCol=target_feature, predictionCol="prediction", metricName="rmse")

# Calculate RMSE score of linear regression and random forest
rmse1 = evaluator.evaluate(linear_predictions)
rmse2 = evaluator.evaluate(randomforest_predictions)

In [0]:
rmse1

30.53241184778551

In [0]:
rmse2

5.5626999918609865