# Part 1: Data Ingestion and Preparation

##### Mounting the blob container into Databricks

In [0]:
# Create 3 variables that stores the storage account name, storage account access key, and blob container name
storage_account_name = "utsbdenae"
storage_account_access_key = "da0dnAjssyW616+ml3k+eHiagjqQ/oycCGLgvcNSNOkOoWsarBo9Svm/rbFVYEQqYpn1k5/fXFQ4+AStzdboWw=="
blob_container_name = "bde-at-2"

In [0]:
# Mounting the blob container into the Databricks File system using the storage account name, storage account access key, and blob container name
dbutils.fs.mount(
    source = f'wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net',
    mount_point = f'/mnt/{blob_container_name}/',
    extra_configs = {'fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net': storage_account_access_key}
)

Out[25]: True

In [0]:
# List of all the files inside the mounted Azure container
dbutils.fs.ls("/mnt/bde-at-2/")

Out[26]: [FileInfo(path='dbfs:/mnt/bde-at-2/green/', name='green/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/bde-at-2/yellow/', name='yellow/', size=0, modificationTime=0)]

##### Checking the schema of the parquet files

In [0]:
# Check that the schema of all the parquet files for green taxis with ehail_fee as double is consistent
spark.read.option("mergeSchema", "true").parquet("/mnt/bde-at-2/green/double")

Out[28]: DataFrame[VendorID: bigint, lpep_pickup_datetime: timestamp, lpep_dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, ehail_fee: double, improvement_surcharge: double, total_amount: double, payment_type: double, trip_type: double, congestion_surcharge: double]

In [0]:
# Check that the schema of all the parquet files for green taxis with ehail_fee as integer is consistent
spark.read.option("mergeSchema", "true").parquet("/mnt/bde-at-2/green/integer")

Out[29]: DataFrame[VendorID: bigint, lpep_pickup_datetime: timestamp, lpep_dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, ehail_fee: int, improvement_surcharge: double, total_amount: double, payment_type: double, trip_type: double, congestion_surcharge: double]

In [0]:
# Check that the schema of all the parquet files for yellow taxis with airport_fee as integer is consistent
spark.read.option("mergeSchema", "true").parquet("/mnt/bde-at-2/yellow/integer")

Out[8]: DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: int]

In [0]:
# Check that the schema of all the parquet files for yellow taxis with airport_fee as double is consistent
spark.read.option("mergeSchema", "true").parquet("/mnt/bde-at-2/yellow/double")

Out[9]: DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

##### Loading the data for yellow and green taxis

In [0]:
# Read all the parquet files for green taxis with ehail_fee as double
green_double_df = spark.read.parquet("/mnt/bde-at-2/green/double")

# Write the green_double_df to parquet files
green_double_df.write.parquet("/dbfs/green_taxi", mode = 'append')

In [0]:
# Read all the parquet files for green taxis with ehail_fee as integer
green_integer_df = spark.read.parquet("/mnt/bde-at-2/green/integer")

# Cast ehail_fee as double
green_cast_df = green_integer_df.withColumn("ehail_fee",green_integer_df.ehail_fee.cast('double'))

# Write the green_cast_df to parquet files and append to the dbfs folder
green_cast_df.write.parquet("/dbfs/green_taxi", mode = 'append')

In [0]:
# Read all the parquet files for yellow taxis with airport_fee as double
yellow_double_df = spark.read.parquet("/mnt/bde-at-2/yellow/double")

# Write the yellow_double_df to parquet files
yellow_double_df.write.parquet("/dbfs/yellow_taxi", mode = 'append')

In [0]:
# Read all the parquet files for yellow taxis with airport_fee as integer
yellow_integer_df = spark.read.parquet("/mnt/bde-at-2/yellow/integer")

# Cast ehail_fee as double
yellow_cast_df = yellow_integer_df.withColumn("airport_fee",yellow_integer_df.airport_fee.cast('double'))

# Write the yellow_cast_df to parquet files and append to the dbfs folder
yellow_cast_df.write.parquet("/dbfs/yellow_taxi", mode = 'append')

###### Reading the final data for yellow and green taxis

In [0]:
# Read parquet files for green taxis
green_df = spark.read.parquet("/dbfs/green_taxi")

In [0]:
# Read parquet files for yellow taxis
yellow_df = spark.read.parquet("/dbfs/yellow_taxi")

##### Checking the row count for yellow and green taxis data

In [0]:
# Check the number of rows for green taxis in the dataframe
green_df.count()

Out[4]: 9390483

In [0]:
# Check the number of rows for yellow taxis in the dataframe
yellow_df.count()

Out[5]: 152823008

##### Converting 'yellow Apr 2022' file to csv

In [0]:
# Read the yellow taxi file for April 2022
yellow_apr_2022 = spark.read.parquet("/mnt/bde-at-2/yellow/double/yellow_tripdata_2022-04.parquet")

# Write the yellow taxi dataframe for april 2022 to a csv file and send it to Azure blob storage
yellow_apr_2022.write.csv("/mnt/bde-at-2/output_csv_file/")

##### Exploring and cleaning the data

In [0]:

# Print the schema of green_df
green_df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [0]:
# Print the schema of yellow_df
yellow_df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [0]:
# Check the first 5 rows of the data sets
green_df.show(5)
yellow_df.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2019-12-18 15:52:30|  2019-12-18 15:54:39|                 N|       1.0|         264|         264|            5.0|          0.0|        3.5|  0.5|    0.

In [0]:
# Checking the distinct values of ehail_fee and airport_fee
green_df.select("ehail_fee").distinct().show()
yellow_df.select("airport_fee").distinct().show()

+---------+
|ehail_fee|
+---------+
|     null|
|      0.0|
|     1.95|
+---------+

+-----------+
|airport_fee|
+-----------+
|       null|
|        0.0|
|       1.25|
|      -1.25|
+-----------+



###### Removing trips that finished before the start time

In [0]:
# Removing trips that finished before the start time for both yellow and green taxis
yellow_df_cleaned = yellow_df.where(yellow_df.tpep_pickup_datetime < yellow_df.tpep_dropoff_datetime)
green_df_cleaned = green_df.where(green_df.lpep_pickup_datetime < green_df.lpep_dropoff_datetime)

In [0]:
# Check the row count of the yellow cleaned data frame
yellow_df_cleaned.count()

Out[36]: 149017227

In [0]:
# Check the row count of the green cleaned data frame
green_df_cleaned.count()

Out[35]: 8912730

###### Removing trips with negative speed for both yellow and green taxis

In [0]:
import pyspark.sql.functions as F

# Create a column with the duration of the trip for yellow taxis
yellow_df_cleaned = yellow_df_cleaned.withColumn('duration_seconds', F.col("tpep_dropoff_datetime").cast("long") - F.col("tpep_pickup_datetime").cast("long"))
# Remove trips with negative speed for yellow taxis
yellow_df_cleaned = yellow_df_cleaned.where((yellow_df_cleaned.trip_distance/yellow_df_cleaned.duration_seconds) > 0)

# Create a column with the duration of the trip for yellow taxis
green_df_cleaned = green_df_cleaned.withColumn('duration_seconds', F.col("lpep_dropoff_datetime").cast("long") - F.col("lpep_pickup_datetime").cast("long"))
# Remove trips with negative speed for yellow taxis
green_df_cleaned = green_df_cleaned.where((green_df_cleaned.trip_distance/green_df_cleaned.duration_seconds) > 0)

###### Removing trips with very high speed

In [0]:
# Convert the unit of the duration column from seconds to hours for both data frames
yellow_df_cleaned = yellow_df_cleaned.withColumn('duration_hours', F.col("duration_seconds")/3600)
green_df_cleaned = green_df_cleaned.withColumn('duration_hours', F.col("duration_seconds")/3600)

In [0]:
# Calculate the speed of the trips in mph for both data frames
yellow_df_cleaned = yellow_df_cleaned.withColumn('speed_mph', F.col("trip_distance")/F.col("duration_hours"))
green_df_cleaned = green_df_cleaned.withColumn('speed_mph', F.col("trip_distance")/F.col("duration_hours"))

In [0]:
# Remove trips that have speed more than 55 mph (NY state speed limit)
yellow_df_cleaned = yellow_df_cleaned.where(yellow_df_cleaned.speed_mph <= 55)
green_df_cleaned = green_df_cleaned.where(green_df_cleaned.speed_mph <= 55)

###### Removing trips that were too long or too short duration wise

In [0]:
# Checking the yellow april 2022 dataframe to understand the range of values of the duration column
yellow_apr_2022 = spark.read.parquet("/mnt/bde-at-2/yellow/double/yellow_tripdata_2022-04.parquet")
yellow_apr_2022 = yellow_apr_2022.withColumn('duration_mins', (F.col("tpep_dropoff_datetime").cast("long") - F.col("tpep_pickup_datetime").cast("long"))/60)
yellow_apr_2022_quantiles = yellow_apr_2022.approxQuantile("duration_mins", [0.01, 0.25, 0.75, 0.99], 0)
yellow_apr_2022_quantiles

Out[26]: [0.48333333333333334, 7.383333333333334, 19.483333333333334, 65.26666666666667]

In [0]:
# Checking the maximum value of yellow april 2022 dataframe for duration
yellow_apr_2022.groupby().max("duration_mins").first()

Out[27]: Row(max(duration_mins)=3495.65)

In [0]:
# Removing trips that were shorter than 1 minute
yellow_df_cleaned = yellow_df_cleaned.where(yellow_df_cleaned.duration_seconds > 60)
# Removing trips that were longer than 2 hours
yellow_df_cleaned = yellow_df_cleaned.where(yellow_df_cleaned.duration_seconds <= 7200)

# Removing trips that were shorter than 1 minute
green_df_cleaned = green_df_cleaned.where(green_df_cleaned.duration_seconds > 60)
# Removing trips that were longer than 2 hours
green_df_cleaned = green_df_cleaned.where(green_df_cleaned.duration_seconds <= 7200)

###### Removing trips that were too short or too long distance wise

In [0]:
# Checking the range of values for trip_distance in yellow_apr_2022
yellow_apr_2022_quantiles = yellow_apr_2022.approxQuantile("trip_distance", [0.05, 0.25, 0.75, 0.99], 0)
yellow_apr_2022_quantiles

Out[29]: [0.52, 1.12, 3.55, 20.06]

In [0]:
# Checking the range of values for trip_distance in green_aug_2019
green_aug_2019 = spark.read.parquet("/mnt/bde-at-2/green/double/green_tripdata_2019-08.parquet")
green_aug_2019_quantiles = green_aug_2019.approxQuantile("trip_distance", [0.05, 0.25, 0.75, 0.99], 0)
green_aug_2019_quantiles

Out[30]: [0.34, 1.1, 4.18, 19.78]

In [0]:
# Checking the maximum value of yellow april 2022 dataframe for trip_distance
yellow_apr_2022.groupby().max("trip_distance").first()

Out[31]: Row(max(trip_distance)=329408.43)

In [0]:
# Checking the maximum value of green_aug_2019 dataframe for trip_distance
yellow_apr_2022.groupby().max("trip_distance").first()

Out[32]: Row(max(trip_distance)=329408.43)

In [0]:
# Remove trips that were less than 0.3 miles
yellow_df_cleaned = yellow_df_cleaned.where(yellow_df_cleaned.trip_distance >= 0.3)
# Remove trips that were more than 35 miles
yellow_df_cleaned = yellow_df_cleaned.where(yellow_df_cleaned.trip_distance <= 35)

# Remove trips that were less than 0.3 miles
green_df_cleaned = green_df_cleaned.where(green_df_cleaned.trip_distance >= 0.3)
# Remove trips that were more than 35 miles
green_df_cleaned = green_df_cleaned.where(green_df_cleaned.trip_distance <= 35)

###### Removing trips that started before January 2019 or started after April 2022

In [0]:
# Remove trips that started before January 2019 for green taxis
yellow_df_cleaned = yellow_df_cleaned.where(yellow_df_cleaned.tpep_pickup_datetime >= '2019-01-01 00:00:00')
# Remove trips that started after April 2022 for green taxis
yellow_df_cleaned = yellow_df_cleaned.where(yellow_df_cleaned.tpep_pickup_datetime < '2022-05-01 00:00:00')

# Remove trips that started before January 2019 for green taxis
green_df_cleaned = green_df_cleaned.where(green_df_cleaned.lpep_pickup_datetime >= '2019-01-01 00:00:00')
# Remove trips that started after April 2022 for green taxis
green_df_cleaned = green_df_cleaned.where(green_df_cleaned.lpep_pickup_datetime < '2022-05-01 00:00:00')

##### Combine the yellow and green taxis data together

In [0]:
# Check the column names of both data frames
print(yellow_df_cleaned.columns)
print(green_df_cleaned.columns)
# Check the number of columns for both data frames
print(len(yellow_df_cleaned.columns))
print(len(green_df_cleaned.columns))

['VendorID', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'duration_seconds', 'duration_hours', 'speed_mph', 'taxi_color']
['VendorID', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'duration_seconds', 'duration_hours', 'speed_mph', 'taxi_color']
22
22


In [0]:
# Change the column names for pickup time and dropoff time for both data frames
yellow_df_cleaned = yellow_df_cleaned.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
yellow_df_cleaned = yellow_df_cleaned.withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
green_df_cleaned = green_df_cleaned.withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
green_df_cleaned = green_df_cleaned.withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")

In [0]:
# Create a list of column names that are common to both data frames
common_colnames = ['VendorID', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge','duration_seconds', 'duration_hours', 'speed_mph']

# Select only the common columns for both data frames
yellow_df_cleaned = yellow_df_cleaned.select(common_colnames)
green_df_cleaned = green_df_cleaned.select(common_colnames)

In [0]:
# Add taxi color to both data frames
yellow_df_cleaned = yellow_df_cleaned.withColumn("taxi_color", F.lit("yellow"))
yellow_df_cleaned.select("taxi_color").distinct().show()
green_df_cleaned = green_df_cleaned.withColumn("taxi_color", F.lit("green"))
green_df_cleaned.select("taxi_color").distinct().show()
# Reference: https://www.geeksforgeeks.org/how-to-add-a-constant-column-in-a-pyspark-dataframe/

+----------+
|taxi_color|
+----------+
|    yellow|
+----------+

+----------+
|taxi_color|
+----------+
|     green|
+----------+



In [0]:
# Combine the yellow and green taxis dataframes
merged_yellow_green_data = yellow_df_cleaned.union(green_df_cleaned)
# Reference: https://www.geeksforgeeks.org/python-pyspark-union-and-unionall/

In [0]:
# Check the schema of the merged dataframe
merged_yellow_green_data.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- duration_seconds: long (nullable = true)
 |-- duration_hours: double (nullable = true)
 |-- speed_mph: double (nullable = true)
 |-- taxi_color: string (nullable = false)



##### Exporting the combined data to dbfs and loading it as a table or view

In [0]:
# Export the combined data to dbfs
merged_yellow_green_data.write.parquet("/dbfs/combined_yellow_green_dataset")

In [0]:
# Read the combined data from dbfs
combined_data = spark.read.parquet("/dbfs/combined_yellow_green_dataset")

# Create a temporary view for the combined data
combined_data.createOrReplaceTempView("combined_data_view")

In [0]:
# Check the first 5 rows of the combined_data_view
spark.sql("SELECT * FROM combined_data_view limit 5").display()

VendorID,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,duration_seconds,duration_hours,speed_mph,taxi_color
1,2019-03-01T00:25:27.000+0000,2019-03-01T00:36:37.000+0000,2.0,3.7,1.0,N,95,130,1.0,13.0,0.5,0.5,0.7,0.0,0.3,15.0,0.0,670,0.1861111111111111,19.88059701492537,yellow
1,2019-03-01T00:05:21.000+0000,2019-03-01T00:38:23.000+0000,1.0,14.1,1.0,N,249,28,1.0,41.0,3.0,0.5,10.1,5.76,0.3,60.66,2.5,1982,0.5505555555555556,25.610494450050453,yellow
1,2019-03-01T00:48:55.000+0000,2019-03-01T01:06:03.000+0000,1.0,9.6,1.0,N,138,98,2.0,27.0,0.5,0.5,0.0,0.0,0.3,28.3,0.0,1028,0.2855555555555555,33.61867704280156,yellow
1,2019-03-01T00:11:42.000+0000,2019-03-01T00:16:40.000+0000,1.0,0.8,1.0,N,48,48,1.0,5.5,3.0,0.5,3.0,0.0,0.3,12.3,2.5,298,0.0827777777777777,9.664429530201344,yellow
1,2019-03-01T00:45:03.000+0000,2019-03-01T00:49:38.000+0000,1.0,1.2,1.0,N,246,48,2.0,6.0,3.0,0.5,0.0,0.0,0.3,9.8,2.5,275,0.0763888888888889,15.709090909090907,yellow


# Part 2: Business Questions

##### Part 2: Q1

###### Finding the total number of trips for each year and month

In [0]:
# Finding the total number of trips for each year and month
year_month_stats = spark.sql('''
SELECT DATE_FORMAT(pickup_datetime, 'MMM y') AS year_month, count(*) AS total_number_trips
FROM combined_data_view
GROUP BY year_month
''')

###### Finding which day of week had the most trips for each year and month

In [0]:
# Finding out which day of week had the most trips for each year and month
day_highest_trips = spark.sql('''
WITH trip_ranks AS
(SELECT year_month, day_week, number_trips,
RANK() over (partition by year_month order by number_trips desc) as rk
FROM
(SELECT DATE_FORMAT(pickup_datetime, 'MMM y') AS year_month, 
DATE_FORMAT(pickup_datetime, 'E') AS day_week, count(*) AS number_trips
FROM combined_data_view
GROUP BY year_month, day_week) AS num_trips_day)
SELECT year_month, day_week
FROM trip_ranks
WHERE rk = 1
''')

In [0]:
# Joining the dataframes year_month_stats and day_highest_trips
year_month_stats = year_month_stats.join(day_highest_trips, "year_month", "inner")

###### Finding out which hour of day had the most trips for each year and month

In [0]:
# Finding out which hour of day had the most trips for each year and month
hour_highest_trips = spark.sql('''
WITH trip_ranks_hour AS
(SELECT year_month, hour_day, number_trips,
RANK() over (partition by year_month order by number_trips desc) as rk
FROM
(SELECT DATE_FORMAT(pickup_datetime, 'MMM y') AS year_month, 
DATE_FORMAT(pickup_datetime, 'H') AS hour_day, count(*) AS number_trips
FROM combined_data_view
GROUP BY year_month, hour_day) AS num_trips_hour_day)
SELECT year_month, hour_day
FROM trip_ranks_hour
WHERE rk = 1
''')

In [0]:
# Joining the dataframes year_month_stats and hour_highest_trips
year_month_stats = year_month_stats.join(hour_highest_trips, "year_month", "inner")

###### Finding the average number of passengers for each year and month

In [0]:
# Finding the average number of passengers for each year and month
avg_passengers = spark.sql('''
SELECT DATE_FORMAT(pickup_datetime, 'MMM y') AS year_month, avg(passenger_count) as avg_number_passengers
FROM combined_data_view
GROUP BY year_month
''')

In [0]:
# Joining the dataframes year_month_stats and avg_passengers
year_month_stats = year_month_stats.join(avg_passengers, "year_month", "inner")

###### Finding the average amount paid per trip for each year and month

In [0]:
# Finding the average amount paid per trip for each year and month
avg_amount_paid_per_trip = spark.sql('''
SELECT DATE_FORMAT(pickup_datetime, 'MMM y') AS year_month, avg(total_amount) as avg_amount_paid_per_trip
FROM combined_data_view
GROUP BY year_month
''')

In [0]:
# Joining the dataframes year_month_stats and avg_amount_paid_per_trip
year_month_stats = year_month_stats.join(avg_amount_paid_per_trip, "year_month", "inner")

###### Finding the average amount paid per passenger for each year and month

In [0]:
# Finding the average amount paid per passenger for each year and month
avg_amount_paid_per_passenger = spark.sql('''
SELECT DATE_FORMAT(pickup_datetime, 'MMM y') AS year_month, sum(total_amount)/sum(passenger_count) as avg_amount_paid_per_passenger
FROM combined_data_view
GROUP BY year_month
''')

In [0]:
# Joining the dataframes year_month_stats and avg_amount_paid_per_passenger
year_month_stats = year_month_stats.join(avg_amount_paid_per_passenger, "year_month", "inner")

###### Output for Part2: Q1

In [0]:
# Display the year_month_stats dataframe
year_month_stats.show(100)

+----------+------------------+--------+--------+---------------------+------------------------+-----------------------------+
|year_month|total_number_trips|day_week|hour_day|avg_number_passengers|avg_amount_paid_per_trip|avg_amount_paid_per_passenger|
+----------+------------------+--------+--------+---------------------+------------------------+-----------------------------+
|  Apr 2019|           7869371|     Tue|      18|   1.5584953351902708|      19.231208815099134|           12.489683252052405|
|  Apr 2020|            260055|     Thu|      15|   1.2856064975218007|       16.72128976560618|           14.667189510058911|
|  Apr 2021|           2190198|     Fri|      14|   1.4074969228496874|      18.654750616320165|            14.29203481443685|
|  Apr 2022|           3584374|     Fri|      18|   1.4124220144597017|      20.937243122027276|           15.342567304260932|
|  Aug 2019|           6342229|     Thu|      18|    1.560987193836325|      19.405661757805934|            12.

##### Part 2: Q2

###### Finding the the average, median, minimum and maximum trip duration(mins), distance(km) and speed(kph) for each taxi color

In [0]:
# Finding the the average, median, minimum and maximum trip duration(mins), distance(km) and speed(kph) for each taxi color
spark.sql('''
SELECT taxi_color, round(avg(duration_seconds/60),2) as avg_duration_mins, 
round(approx_percentile(duration_seconds/60, 0.5),2) as median_duration_mins,
round(min(duration_seconds/60),2) as min_duration_mins, 
round(max(duration_seconds/60),2) as max_duration_mins,
round(avg(trip_distance*1.60934),2) as avg_distance_km,
round(approx_percentile(trip_distance*1.60934, 0.5),2) as median_distance_km,
round(min(trip_distance*1.60934),2) as min_distance_km,
round(max(trip_distance*1.60934),2) as max_distance_km,
round(avg((trip_distance*1.60934)/duration_hours),2) as avg_speed_kph,
round(approx_percentile((trip_distance*1.60934)/duration_hours, 0.5),2) as median_speed_kph,
round(min((trip_distance*1.60934)/duration_hours),2) as min_speed_kph,
round(max((trip_distance*1.60934)/duration_hours),2) as max_speed_kph
FROM combined_data_view
GROUP BY taxi_color
''').show()

+----------+-----------------+--------------------+-----------------+-----------------+---------------+------------------+---------------+---------------+-------------+----------------+-------------+-------------+
|taxi_color|avg_duration_mins|median_duration_mins|min_duration_mins|max_duration_mins|avg_distance_km|median_distance_km|min_distance_km|max_distance_km|avg_speed_kph|median_speed_kph|min_speed_kph|max_speed_kph|
+----------+-----------------+--------------------+-----------------+-----------------+---------------+------------------+---------------+---------------+-------------+----------------+-------------+-------------+
|    yellow|            14.37|               11.23|             1.02|            120.0|           4.98|              2.75|           0.48|          56.33|        18.97|           16.54|         0.24|        88.51|
|     green|            16.93|               12.77|             1.02|            120.0|           6.23|              3.65|           0.48|      

##### Part 2: Q3

###### Finding the percentage of trips where drivers received a tip

In [0]:
# Finding the percentage of trips where drivers received a tip
spark.sql('''
SELECT (count(*)/(SELECT count(*) FROM combined_data_view))*100 as percentage_trips_received_tip
FROM combined_data_view
WHERE tip_amount != 0
''').show()

+-----------------------------+
|percentage_trips_received_tip|
+-----------------------------+
|            69.38746755299915|
+-----------------------------+



##### Part 2: Q4

###### Finding the percentage of trips with tips where drivers received a tip of at least $10

In [0]:
# Finding the percentage of trips with tips where drivers received a tip of at least $10
spark.sql('''
WITH tips_received as
(SELECT *
FROM combined_data_view
WHERE tip_amount != 0)
SELECT (count(*)/(SELECT count(*) FROM tips_received))*100 as percentage_trips_tips_more_than_ten_dollars
FROM tips_received
WHERE tip_amount >= 10
''').show()

+-------------------------------------------+
|percentage_trips_tips_more_than_ten_dollars|
+-------------------------------------------+
|                         3.4928124891805483|
+-------------------------------------------+



##### Part 2: Q5

###### Finding average speed, average distance per dollar for 5 bins of durations

In [0]:
# Finding average speed, average distance per dollar for 5 bins of durations
spark.sql('''
WITH combined_data_with_duration_bins as
(SELECT *,
CASE
    WHEN (duration_seconds/60) < 5 THEN 'Under 5 mins'
    WHEN (duration_seconds/60) >= 5 and (duration_seconds/60) < 10 THEN 'From 5 mins to 10 mins'
    WHEN (duration_seconds/60) >= 10 and (duration_seconds/60) < 20 THEN 'From 10 mins to 20 mins'
    WHEN (duration_seconds/60) >= 20 and (duration_seconds/60) < 30 THEN 'From 20 mins to 30 mins'
    WHEN (duration_seconds/60) >= 30 and (duration_seconds/60) < 60 THEN 'From 30 mins to 60 mins'
    WHEN (duration_seconds/60) >= 60  THEN 'At least 60 mins'
END as duration_bins
FROM combined_data_view)
SELECT duration_bins, 
round(avg((trip_distance*1.60934)/duration_hours),2) as avg_speed_kph,
round(avg((trip_distance*1.60934)/total_amount),2) as avg_distance_per_dollar
FROM combined_data_with_duration_bins
GROUP BY duration_bins
''').display()

duration_bins,avg_speed_kph,avg_distance_per_dollar
From 5 mins to 10 mins,16.9,0.17
Under 5 mins,19.76,0.13
From 10 mins to 20 mins,17.66,0.23
From 20 mins to 30 mins,21.92,0.29
At least 60 mins,22.88,0.43
From 30 mins to 60 mins,27.35,0.37


##### Part 2: Q6

###### Duration bin to maximize the driver's income

In [0]:
# Duration bin to maximize the driver's income
spark.sql('''
WITH combined_data_with_duration_bins as
(SELECT *,
CASE
    WHEN (duration_seconds/60) < 5 THEN 'Under 5 mins'
    WHEN (duration_seconds/60) >= 5 and (duration_seconds/60) < 10 THEN 'From 5 mins to 10 mins'
    WHEN (duration_seconds/60) >= 10 and (duration_seconds/60) < 20 THEN 'From 10 mins to 20 mins'
    WHEN (duration_seconds/60) >= 20 and (duration_seconds/60) < 30 THEN 'From 20 mins to 30 mins'
    WHEN (duration_seconds/60) >= 30 and (duration_seconds/60) < 60 THEN 'From 30 mins to 60 mins'
    WHEN (duration_seconds/60) >= 60  THEN 'At least 60 mins'
END as duration_bins
FROM combined_data_view)
SELECT duration_bins, 
round(sum(total_amount),2) as sum_amount,
round(avg(total_amount),2) as avg_amount,
round(approx_percentile(total_amount, 0.5),2) as median_amount,
round(avg((trip_distance*1.60934)/total_amount),2) as avg_distance_per_dollar_total_amount
FROM combined_data_with_duration_bins
GROUP BY duration_bins
''').display()

duration_bins,sum_amount,avg_amount,median_amount,avg_distance_per_dollar_total_amount
From 5 mins to 10 mins,561773457.46,11.9,11.8,0.17
Under 5 mins,191220717.47,9.06,9.3,0.13
From 10 mins to 20 mins,990304409.63,17.71,16.56,0.23
From 20 mins to 30 mins,578912379.0,29.0,25.38,0.29
At least 60 mins,97202270.0,67.45,66.36,0.43
From 30 mins to 60 mins,599063114.92,48.76,45.92,0.37
