7. Combine the two taxi dataframes (yellow and green) into one dataframe.

In [0]:
# Load the cleaned, sampled yellow-taxi dataframe.
# "header" is a CSV reader option and has no effect on a parquet read, so it
# is not passed here; the parquet file supplies its own column names/types.
df_y_s_cleaned = spark.read.parquet("/FileStore/tables/df_yellow_cleaned.parquet")
display(df_y_s_cleaned)
df_y_s_cleaned.count()

In [0]:
# Load the cleaned green-taxi dataframe.
# "header" is a CSV reader option and has no effect on a parquet read, so it
# is not passed here; the parquet file supplies its own column names/types.
df_g_cleaned = spark.read.parquet("/FileStore/tables/df_green_cleaned.parquet")
display(df_g_cleaned)
df_g_cleaned.count()

In [0]:
# Tag every yellow-taxi row with a numeric colour code (yellow = 1, green = 2)
from pyspark.sql.functions import lit

yellow_code = lit(1)
df_y_s_cleaned = df_y_s_cleaned.withColumn("taxi_color", yellow_code)
display(df_y_s_cleaned)

In [0]:
# Tag every green-taxi row with colour code 2 (yellow uses 1)
green_code = lit(2)
df_g_cleaned = df_g_cleaned.withColumn("taxi_color", green_code)
display(df_g_cleaned)

In [0]:
# Stack the yellow and green frames, matching columns by name rather than by
# position; allowMissingColumns=True tolerates a column that exists on only
# one of the two sides.
df_combined = df_y_s_cleaned.unionByName(
    other=df_g_cleaned,
    allowMissingColumns=True,
)
display(df_combined)
df_combined.count()


8. Combine with Location dataset

In [0]:
# Load the location (taxi zone lookup) table; the first CSV row holds the column names
df_l = spark.read.csv("/FileStore/tables/taxi_zone_lookup.csv", header=True)
display(df_l)
df_l.count()

In [0]:
# List trips whose pickup or dropoff location id is missing
from pyspark.sql.functions import col

missing_location = col("PULocationID").isNull() | col("DOLocationID").isNull()
df_combined.select("PULocationID", "DOLocationID").filter(missing_location).show()

In [0]:
# Build a pickup copy and a dropoff copy of the location lookup. Each copy gets
# role-specific column names so both can be joined onto the same trips table.
pickup_renames = {
    "LocationID": "PULocationID",
    "Zone": "Zone_Pickup",
    "Borough": "Borough_Pickup",
    "service_zone": "service_zone_pickup",
}
dropoff_renames = {
    "LocationID": "DOLocationID",
    "Zone": "Zone_Dropoff",
    "Borough": "Borough_Dropoff",
    "service_zone": "service_zone_dropoff",
}

df_l_pu = df_l
for old_name, new_name in pickup_renames.items():
    df_l_pu = df_l_pu.withColumnRenamed(old_name, new_name)

df_l_do = df_l
for old_name, new_name in dropoff_renames.items():
    df_l_do = df_l_do.withColumnRenamed(old_name, new_name)

In [0]:
# Print both renamed lookup tables, each under its own title
for title, lookup in (("Pick Up Location", df_l_pu), ("Drop Off Location", df_l_do)):
    print(title)
    lookup.show()


In [0]:
# Attach the pickup zone columns; the left join keeps every trip even when
# its PULocationID has no match in the lookup.
df_combined_n = df_combined.join(df_l_pu, on="PULocationID", how="left")
display(df_combined_n)

In [0]:
# Attach the dropoff zone columns on top of the pickup zone columns.
# Both left joins are rebuilt from df_combined instead of re-assigning
# df_combined_n from itself: re-running this cell therefore never joins the
# dropoff lookup onto a frame that already contains the dropoff columns.
df_combined_n = (
    df_combined
    .join(df_l_pu, on="PULocationID", how="left")
    .join(df_l_do, on="DOLocationID", how="left")
)
display(df_combined_n)

In [0]:
# Check that the location joins neither dropped nor duplicated trips: the
# joined frame must have exactly as many rows as the combined frame it was
# built from (a duplicated id in the lookup would multiply rows).
n_rows_combined = df_combined.count()
n_rows_joined = df_combined_n.count()
assert n_rows_joined == n_rows_combined, (
    f"Row count changed after location joins: {n_rows_combined} -> {n_rows_joined}"
)
n_rows_joined

In [0]:
# Persist the joined dataset as parquet, replacing any previous copy at that path
final_writer = df_combined_n.write.mode("overwrite")
final_writer.parquet("/FileStore/tables/df_final.parquet")

In [0]:
# Reload the saved dataset from parquet and confirm its row count.
# "header" is a CSV reader option and has no effect on a parquet read, so it
# is not passed here.
df_final = spark.read.parquet("/FileStore/tables/df_final.parquet")
display(df_final)
df_final.count()

## Part 2 - Business Questions

In [0]:
# Register the final dataframe as the temporary view "taxi_trips" so it can be queried with Spark SQL
df_final.createOrReplaceTempView(name="taxi_trips")

In [0]:
# Preview the view: only the first 5 rows are fetched, not the whole dataset
preview_sql = "SELECT * FROM taxi_trips LIMIT 5"
df = spark.sql(preview_sql)
display(df)

In [0]:
#1. For each year and month (e.g January 2020 => “2020-01-01” or “2020-01” or “Jan 2020"
# a. What was the total number of trips?
# b. Which day of week (e.g. monday, tuesday, etc..) had the most trips?
# c. Which hour of the day had the most trips?
# d. What was the average number of passengers?
# e. What was the average amount paid per trip (using total_amount)?
# f. What was the average amount paid per passenger (using total_amount)?
#
# How the query is organised:
#   * monthly     - a, d, e, f computed over EVERY trip of the month (not over
#                   a filtered subset, and not as an average of averages).
#   * day_ranked  - trips counted per (month, day of week), then ranked within
#                   the month; rank 1 is the busiest day of week.
#   * hour_ranked - same idea per (month, hour of day).
# Ties are broken deterministically (alphabetical day name / lowest hour).
# The joins use the null-safe operator <=> so that a NULL year_month group,
# if one exists, is still reported.
answer_1 = spark.sql("""
WITH trips AS (
    SELECT
        DATE_FORMAT(d_pickup, 'yyyy-MM') AS year_month,
        DATE_FORMAT(d_pickup, 'EEEE') AS day_of_week,
        HOUR(t_pickup) AS hour_of_day,
        passenger_count,
        total_amount
    FROM taxi_trips
),
monthly AS (
    SELECT
        year_month,
        COUNT(*) AS month_total_trips,
        AVG(passenger_count) AS avg_passenger,
        AVG(total_amount) AS avg_paid_per_trip,
        -- NULLIF: a trip with 0 passengers contributes NULL instead of dividing by zero
        AVG(total_amount / NULLIF(passenger_count, 0)) AS avg_paid_per_passenger
    FROM trips
    GROUP BY year_month
),
day_counts AS (
    SELECT year_month, day_of_week, COUNT(*) AS n_trips
    FROM trips
    GROUP BY year_month, day_of_week
),
day_ranked AS (
    SELECT
        year_month,
        day_of_week,
        ROW_NUMBER() OVER (PARTITION BY year_month ORDER BY n_trips DESC, day_of_week) AS rank_d
    FROM day_counts
),
hour_counts AS (
    SELECT year_month, hour_of_day, COUNT(*) AS n_trips
    FROM trips
    GROUP BY year_month, hour_of_day
),
hour_ranked AS (
    SELECT
        year_month,
        hour_of_day,
        ROW_NUMBER() OVER (PARTITION BY year_month ORDER BY n_trips DESC, hour_of_day) AS rank_hr
    FROM hour_counts
)
SELECT
    m.year_month,
    m.month_total_trips,
    d.day_of_week AS most_trips_day,
    h.hour_of_day AS most_trips_hour,
    ROUND(m.avg_passenger, 2) AS avg_passenger,
    ROUND(m.avg_paid_per_trip, 2) AS avg_paid_per_trip,
    ROUND(m.avg_paid_per_passenger, 2) AS avg_paid_per_passenger
FROM monthly AS m
JOIN day_ranked AS d
    ON d.year_month <=> m.year_month AND d.rank_d = 1
JOIN hour_ranked AS h
    ON h.year_month <=> m.year_month AND h.rank_hr = 1
ORDER BY m.year_month
""")

display(answer_1)

In [0]:
# 2. For each taxi colour (yellow and green):
# a. What was the average, median, minimum and maximum trip duration in minutes (with 2 decimals, eg. 90 seconds = 1.50 min)?
# b. What was the average, median, minimum and maximum trip distance in km?
# c. What was the average, median, minimum and maximum speed in km per hour?
#
# Notes:
#   * trip_distance and trip_speed are converted with a mile -> km factor, as
#     the original query did; this assumes they are stored in miles and miles
#     per hour - TODO confirm against the cleaning step.
#   * The median comes from PERCENTILE_APPROX, so it is an approximation.
MILES_TO_KM = 1.609344  # exact mile -> kilometre factor (instead of the rounded 1.61)
answer_2 = spark.sql(f"""
SELECT
    taxi_color,

    -- a. trip duration: duration_hours converted to minutes
    ROUND(AVG(duration_hours * 60), 2) AS avg_duration_min,
    ROUND(PERCENTILE_APPROX(duration_hours * 60, 0.5), 2) AS med_duration_min,
    ROUND(MIN(duration_hours * 60), 2) AS min_duration_min,
    ROUND(MAX(duration_hours * 60), 2) AS max_duration_min,

    -- b. trip distance converted to kilometres
    ROUND(AVG(trip_distance * {MILES_TO_KM}), 2) AS avg_trip_distance_km,
    ROUND(PERCENTILE_APPROX(trip_distance * {MILES_TO_KM}, 0.5), 2) AS med_trip_distance_km,
    ROUND(MIN(trip_distance * {MILES_TO_KM}), 2) AS min_trip_distance_km,
    ROUND(MAX(trip_distance * {MILES_TO_KM}), 2) AS max_trip_distance_km,

    -- c. trip speed converted to km per hour
    ROUND(AVG(trip_speed * {MILES_TO_KM}), 2) AS avg_trip_speed_km_hr,
    ROUND(PERCENTILE_APPROX(trip_speed * {MILES_TO_KM}, 0.5), 2) AS med_trip_speed_km_hr,
    ROUND(MIN(trip_speed * {MILES_TO_KM}), 2) AS min_trip_speed_km_hr,
    ROUND(MAX(trip_speed * {MILES_TO_KM}), 2) AS max_trip_speed_km_hr
FROM
    taxi_trips
GROUP BY taxi_color
ORDER BY taxi_color
""")
display(answer_2)


In [0]:
# 3. For each taxi colour (yellow and green), each pair of pick up and drop off locations
# (use boroughs not the id), each month, each day of week and each hour:
# a. What was the total number of trips?
# b. What was the average distance?
# c. What was the average amount paid per trip (using total_amount)?
# d. What was the total amount paid (using total_amount)?

# Notes on the query below:
#   * avg_distance is the plain average of trip_distance; no unit conversion
#     is applied here.
#   * MONTH(d_pickup) keeps only the month number, so the same calendar month
#     of different years lands in the same group.
#   * day_of_week is the full day name, so ORDER BY day_of_week sorts the days
#     alphabetically rather than in calendar order.
#   * The boroughs are not part of ORDER BY, so the order of borough pairs
#     within one (taxi_color, month, day_of_week, hour) group is not fixed.
answer_3 = spark.sql("""
SELECT
    taxi_color,
    borough_pickup,
    borough_dropoff,
    MONTH(d_pickup) AS month,
    DATE_FORMAT(d_pickup, 'EEEE') AS day_of_week,
    HOUR(t_pickup) AS hour,
    COUNT(*) AS total_trips,
    ROUND(AVG(trip_distance), 2) AS avg_distance,
    ROUND(AVG(total_amount), 2) AS avg_paid_per_trip,
    ROUND(SUM(total_amount), 2) AS total_amount_paid
FROM 
    taxi_trips
GROUP BY
    taxi_color,
    borough_pickup,
    borough_dropoff,
    month,
    day_of_week,
    hour

ORDER BY
    taxi_color,
    month,
    day_of_week,
    hour
;
""")
display(answer_3)

In [0]:
#4. What was the percentage of trips where drivers received tips?
#
# The tipped-trip count is written out in both expressions instead of reusing
# the alias number_tipped_trips inside the same SELECT list: that reuse relies
# on lateral column alias support, which not every Spark runtime has.
# COUNT(CASE ...) returns 0 (not NULL) when no trip was tipped, and NULLIF
# avoids a division by zero on an empty table.
answer_4 = spark.sql("""
SELECT
    COUNT(CASE WHEN tip_amount > 0 THEN 1 END) AS number_tipped_trips,
    ROUND(
        COUNT(CASE WHEN tip_amount > 0 THEN 1 END) / NULLIF(COUNT(*), 0) * 100,
        2
    ) AS percentage_of_tipped_trip
FROM taxi_trips
""")
display(answer_4)

In [0]:
#5. For trips where the driver received tips, what was the percentage where the driver received tips of at least $5?
#
# Only tipped trips (tip_amount > 0) are considered, so COUNT(*) is the number
# of tipped trips. The >= 5 count is written out in both expressions instead
# of reusing its alias inside the same SELECT list, which relies on lateral
# column alias support that not every Spark runtime has. COUNT(CASE ...)
# returns 0 (not NULL) when no tip reaches $5, and NULLIF avoids a division by
# zero when there are no tipped trips at all.
answer_5 = spark.sql("""
SELECT
    COUNT(CASE WHEN tip_amount >= 5 THEN 1 END) AS number_tipped_trips_at_least_5,
    ROUND(
        COUNT(CASE WHEN tip_amount >= 5 THEN 1 END) / NULLIF(COUNT(*), 0) * 100,
        2
    ) AS percentage_of_tipped_trip_at_least_5
FROM
    taxi_trips
WHERE
    tip_amount > 0
""")
display(answer_5)

In [0]:
# 6. Classify each trip into bins of durations:
# a. Under 5 Mins
# b. From 5 mins to 10 mins
# c. From 10 mins to 20 mins
# d. From 20 mins to 30 mins
# e. From 30 mins to 60 mins
# f. At least 60 mins
# Then for each bins, calculate:
# a. Average speed (km per hour)
# b. Average distance per dollar (km per $)
#
# Notes:
#   * duration_mins is computed in its own inner query so the CASE expression
#     can use it without relying on lateral column alias support.
#   * Per-trip values are averaged unrounded; only the final averages are
#     rounded to 2 decimals.
#   * NULLIF(total_amount, 0) makes a zero-amount trip contribute NULL to the
#     km-per-dollar average instead of dividing by zero.
#   * A NULL duration would fall through to the ELSE branch (last bin).
#   * trip_speed / trip_distance are assumed to be in mph / miles, as the
#     original conversion implied - TODO confirm against the cleaning step.
MILES_TO_KM = 1.609344  # exact mile -> kilometre factor (instead of the rounded 1.61)
answer_6 = spark.sql(f"""
SELECT
    bins_duration,
    ROUND(AVG(trip_speed_kmhr), 2) AS avg_trip_speed_kmhr,
    ROUND(AVG(distance_km_per_dollar), 2) AS avg_distance_km_per_dollar
FROM (
    SELECT
        CASE
            WHEN duration_mins < 5 THEN '1) Under 5 Mins'
            WHEN duration_mins >= 5 AND duration_mins < 10 THEN '2) 5 to 10 Mins'
            WHEN duration_mins >= 10 AND duration_mins < 20 THEN '3) 10 to 20 Mins'
            WHEN duration_mins >= 20 AND duration_mins < 30 THEN '4) 20 to 30 Mins'
            WHEN duration_mins >= 30 AND duration_mins < 60 THEN '5) 30 to 60 Mins'
            ELSE '6) At least 60 Mins'
        END AS bins_duration,
        trip_speed * {MILES_TO_KM} AS trip_speed_kmhr,
        (trip_distance * {MILES_TO_KM}) / NULLIF(total_amount, 0) AS distance_km_per_dollar
    FROM (
        SELECT
            duration_hours * 60 AS duration_mins,
            trip_speed,
            trip_distance,
            total_amount
        FROM
            taxi_trips
    ) AS trips_with_minutes
) AS binned_trips
GROUP BY
    bins_duration
ORDER BY
    bins_duration
""")
display(answer_6)

In [0]:
#7. Which duration bin will you advise a taxi driver to target to maximise his income?
# Calculate the average fare per trip, average distance per dollar, total trips in each bin and total time spent
#
# Notes:
#   * duration_mins is computed in its own inner query so the CASE expression
#     can use it without relying on lateral column alias support.
#   * The last bin is labelled "At least 60 Mins" because it includes trips of
#     exactly 60 minutes.
#   * NULLIF(total_amount, 0) makes a zero-amount trip contribute NULL to the
#     km-per-dollar average instead of dividing by zero.
#   * A NULL duration would fall through to the ELSE branch (last bin).
#   * trip_distance is assumed to be in miles, as the original conversion
#     implied - TODO confirm against the cleaning step.
MILES_TO_KM = 1.609344  # exact mile -> kilometre factor (instead of the rounded 1.61)
answer_7 = spark.sql(f"""
SELECT
    bins_duration,
    ROUND(AVG(total_amount), 2) AS avg_paid_per_trip,         -- Average fare per trip
    ROUND(AVG((trip_distance * {MILES_TO_KM}) / NULLIF(total_amount, 0)), 2) AS avg_distance_per_dollar, -- Average distance (km) per dollar
    COUNT(*) AS total_trips,                                  -- Total number of trips in each bin
    ROUND(SUM(duration_mins), 2) AS total_duration_mins       -- Total time spent on trips
FROM (
    SELECT
        CASE
            WHEN duration_mins < 5 THEN '1) Under 5 Mins'
            WHEN duration_mins >= 5 AND duration_mins < 10 THEN '2) 5 to 10 Mins'
            WHEN duration_mins >= 10 AND duration_mins < 20 THEN '3) 10 to 20 Mins'
            WHEN duration_mins >= 20 AND duration_mins < 30 THEN '4) 20 to 30 Mins'
            WHEN duration_mins >= 30 AND duration_mins < 60 THEN '5) 30 to 60 Mins'
            ELSE '6) At least 60 Mins'
        END AS bins_duration,
        duration_mins,
        total_amount,
        trip_distance
    FROM (
        SELECT
            duration_hours * 60 AS duration_mins,
            total_amount,
            trip_distance
        FROM taxi_trips
    ) AS trips_with_minutes
) AS binned_trips
GROUP BY
    bins_duration
ORDER BY
    bins_duration
""")
display(answer_7)


Answer 7:
Based on the results above, the two longest bins (30 to 60 mins, and 60 mins or more) have the highest amount paid per trip. However, each of those trips keeps the vehicle occupied for a long time, and these bins contain fewer trips than any other bin, which suggests long waiting times between such trips. Trips under 5 mins take little time, but the fare is very low. The best bin to target to maximise income is therefore the 10 to 20 mins bin: it has a low distance per dollar, a very large number of trips, and a total time spent that is close to that of the 60-mins-or-more bin.