In [0]:
# importing necessary libraries
from pyspark.sql.functions import (
    col,
    when,
    hour,
    month,
    year,
    dayofweek,
    dayofmonth,
    date_format,
    unix_timestamp,
    from_unixtime,
    count,
    sum,
    round,
    desc,
    lit,
    substring,
)
import numpy as np
import pandas as pd
from datetime import datetime
import plotly.express as px
import plotly.graph_objects as go

Loading Data
--

In [0]:
# accessing azure cloud storage
storage_account_name = "unifieddatadl02azemtd"
container_name = "cbdtp-data"
sub_directory = "TLC_Data"  # f'{agency_name}/{dataset_name}/{file_name}'

In [0]:
# reading lookup table
sub_directory = "TLC_Data"
file_name = "Taxi_Zone_Lookup.csv"
adlspath = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{sub_directory}/{file_name}"
lookup_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(adlspath)
)

In [0]:
# reading HVFHV data
sub_directory = "TLC_Data/*/HVFHV"
adlspath = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{sub_directory}/"
hvfhv_df = (
    spark.read.format("parquet")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(adlspath)
)

In [0]:
# reading yellow cab data
sub_directory = "TLC_Data/*/Yellow"
adlspath = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{sub_directory}/"
yellow_df = (
    spark.read.format("parquet")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(adlspath)
)

In [0]:
# reading green data

sub_directory = "TLC_Data/*/Green"
adlspath = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{sub_directory}/"
green_df = (
    spark.read.format("parquet")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(adlspath)
)

In [0]:
# reading FHV data
sub_directory = "TLC_Data/*/FHV"
adlspath = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{sub_directory}/"
fhv_df = (
    spark.read.format("parquet")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(adlspath)
)

In [0]:
# omitting FHV data due to missing columns and values
# getting count of FHV trips omitted
fhv_df.count()

Out[71]: 88637990

Data Cleaning and Transformation
--

In [0]:
# filtering for only data between 2019-2023
yellow_df = yellow_df.filter(
    (year(col("tpep_pickup_datetime")).between(2019, 2023))
    & (year(col("tpep_dropoff_datetime")).between(2019, 2023))
)


green_df = green_df.filter(
    (year(col("lpep_pickup_datetime")).between(2019, 2023))
    & (year(col("lpep_dropoff_datetime")).between(2019, 2023))
)

In [0]:
# keeping only necessary columns
hvfhv_df = hvfhv_df.select(
    "pickup_datetime",
    "dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "trip_miles",
    "trip_time",
    "tolls",
)

yellow_df = yellow_df.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "trip_distance",
    "tolls_amount",
)

green_df = green_df.select(
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "trip_distance",
    "tolls_amount",
)

In [0]:
# renaming columns for consistency
yellow_df = (
    yellow_df.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    .withColumnRenamed("trip_distance", "trip_miles")
    .withColumnRenamed("tolls_amount", "tolls")
)

green_df = (
    green_df.withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
    .withColumnRenamed("trip_distance", "trip_miles")
    .withColumnRenamed("tolls_amount", "tolls")
)

In [0]:
# creating new column for trip time
time_diff = unix_timestamp(col("dropoff_datetime")) - unix_timestamp(
    col("pickup_datetime")
)
yellow_df = yellow_df.withColumn("trip_time", time_diff)
green_df = green_df.withColumn("trip_time", time_diff)

In [0]:
# converting trip time column values from seconds to hours
hvfhv_df = hvfhv_df.withColumn("trip_time", (col("trip_time") / 3600))
yellow_df = yellow_df.withColumn("trip_time", (col("trip_time") / 3600))
green_df = green_df.withColumn("trip_time", (col("trip_time") / 3600))

In [0]:
# creating a source identifier
hvfhv_df = hvfhv_df.withColumn("service", lit("HVFHV"))
yellow_df = yellow_df.withColumn("service", lit("Yellow Taxi"))
green_df = green_df.withColumn("service", lit("Green Taxi"))

In [0]:
# combining datasets
merged_df = hvfhv_df.unionAll(yellow_df).unionAll(green_df)

In [0]:
# merging lookup table to TLC data on pick up location
int_df = merged_df.alias("a").join(
    lookup_df.alias("b"),
    merged_df["PULocationID"] == lookup_df["location_i"].alias("PU_Zone"),
    how="left",
)

In [0]:
# renaming columns for clarity
int_df = int_df.withColumnRenamed("location_i", "PU_id").withColumnRenamed(
    "CBD_Zone", "PU_Zone"
)

In [0]:
# merging lookup table to TLC data on drop off location
df = int_df.join(
    lookup_df, int_df["DOLocationID"] == lookup_df["location_i"], how="left"
)

In [0]:
# renaming new columns
df = df.withColumnRenamed("CBD_Zone", "DO_Zone")

In [0]:
# dropping redundant columns
df = df.drop("PU_id", "location_i")

In [0]:
# creating new column indicating whether a trip was picked up or dropped off in CBD using filters
pu_filter = col("PU_Zone").like("CBD")
do_filter = col("DO_Zone").like("CBD")

# df = df.withColumn(
# "CBD_Check",
# when((pu_filter & do_filter), "CBD")
# .when(pu_filter | do_filter, "Partial CBD")
# .otherwise("Not CBD"))

df = df.withColumn(
    "CBD_Check", when((pu_filter | do_filter), "CBD").otherwise("Not CBD")
)

In [0]:
# extracting day of week, hour, month, and year from pick up datetime
df = (
    df.withColumn("hour", hour("pickup_datetime"))
    .withColumn("month_year", date_format("pickup_datetime", "MM-yyyy"))
    .withColumn("day_of_week", dayofweek("pickup_datetime"))
    .withColumn("year", year("pickup_datetime"))
)

In [0]:
# creating column to indicate whether the pick up date was during weekday or weekend
df = df.withColumn(
    "Weekday_Check",
    when(df["day_of_week"].isin([1, 7]), "Weekend").otherwise("Weekday"),
)

In [0]:
# dropping unnecessary columns
df = df.drop("pickup_datetime", "dropoff_datetime", "day_of_week")

In [0]:
# showing final data frame
df.display()

PULocationID,DOLocationID,trip_miles,trip_time,tolls,service,PU_Zone,DO_Zone,CBD_Check,hour,month_year,year,Weekday_Check
245,251,2.45,0.1608333333333333,0.0,HVFHV,Other,Other,Not CBD,0,02-2019,2019,Weekday
216,197,1.71,0.1361111111111111,0.0,HVFHV,Other,Other,Not CBD,0,02-2019,2019,Weekday
261,234,5.01,0.5997222222222223,0.0,HVFHV,CBD,CBD,CBD,0,02-2019,2019,Weekday
87,87,0.34,0.0497222222222222,0.0,HVFHV,CBD,CBD,CBD,0,02-2019,2019,Weekday
87,198,6.84,0.4997222222222222,0.11,HVFHV,CBD,Other,CBD,0,02-2019,2019,Weekday
198,198,1.11,0.0997222222222222,0.0,HVFHV,Other,Other,Not CBD,0,02-2019,2019,Weekday
161,148,4.53,0.4997222222222222,0.0,HVFHV,CBD,CBD,CBD,0,02-2019,2019,Weekday
148,21,11.24,0.4830555555555555,0.72,HVFHV,CBD,Other,CBD,0,02-2019,2019,Weekday
226,260,1.59,0.1594444444444444,0.0,HVFHV,Queens Buffer,Other,Not CBD,0,02-2019,2019,Weekday
7,223,1.9,0.1316666666666666,0.0,HVFHV,Queens Buffer,Queens Buffer,Not CBD,0,02-2019,2019,Weekday


Summary Tables
--

In [0]:
# checking our numbers against the TLC Data Hub numbers

validation = (
    df.groupBy("service", "month_year", "CBD_Check")
    .agg(count("*"))
    .alias("Trips")
    .orderBy("month_year", "CBD_Check")
)

validation.display(5)

service,month_year,CBD_Check,count(1)
Yellow Taxi,01-2019,CBD,5487648
Green Taxi,01-2019,CBD,53155
Yellow Taxi,01-2019,Not CBD,2211508
Green Taxi,01-2019,Not CBD,625490
Yellow Taxi,01-2020,CBD,4633224
Green Taxi,01-2020,CBD,36485
HVFHV,01-2020,CBD,7665424
Yellow Taxi,01-2020,Not CBD,1776172
Green Taxi,01-2020,Not CBD,415592
HVFHV,01-2020,Not CBD,13016074


In [0]:
# defining a function to agreggate column values by the retrieved business day counts within a time period


def aggregate_data(df, datetime_col, offset, agg_col, new_col_name):
    """
    This function aggregates data based on the given parameters and creates a new column with the calculated result.
      Parameters:
      -----------
          df: pd.DataFrame
             The input summary table containing the data to be aggregated.

          datetime_col: datetime
             The name of the column containing datetime values used to calculate the date range.

          offset: int
             The number of months to be used for aggregation.

          agg_col: str
             The name of the column containing values to be aggregated.

          new_col_name: str
             The name of the new column in which the aggregated result will be stored.

      Returns:
          pd.DataFrame: The input summary table with the new column containing the aggregated result.

     Example:
     --------
     df = pd.DataFrame({
         'date': pd.date_range(start='2021-01-01', periods=5, freq='M'),
         'value': [100, 200, 300, 400, 500]
     })

     aggregated_df = data_agg(
         df,
         datetime_col = 'date',
         offset = 1,
         agg_col = 'value',
         new_col_name = 'result'
     )
    """

    # Convert to datetime objects and apply the offsets
    start = pd.to_datetime(df[datetime_col]).values.astype("datetime64[M]")
    end = pd.to_datetime(df[datetime_col]).values.astype(
        "datetime64[M]"
    ) + np.timedelta64(offset, "M")

    # Calculate the number of weekdays between the start and end dates
    weekdays = np.busday_count(start, end, weekmask="Mon Tue Wed Thu Fri")

    # Calculate the aggregated values for the specified column and store it in the new column
    df[new_col_name] = np.round(df[agg_col] / weekdays.astype("float"), 0)

    return df

In [0]:
# This script processes a DataFrame of transportation data, specifically focusing on the month to month weekday average trips, trip miles, and trip time within the Central Business District (CBD). It first groups the data by various factors (e.g., weekday, month, and year), counts the trips, and sums the trip miles and time. It then filters the data to only include weekdays and trips within the CBD. Lastly, it aggregates this data for each month and outputs a DataFrame with the monthly average trip count, miles, and time. This data can be used for further analysis on traffic patterns and transportation efficiency within the CBD.

monthly = (
    df.groupBy("Weekday_Check", "service", "year", "month_year", "CBD_Check")
    .agg(
        count("*").alias("trips"),
        sum("trip_miles").alias("trip_miles"),
        sum("trip_time").alias("trip_time"),
    )
    .orderBy("year", "month_year")
)

monthly = monthly.toPandas()

monthly = monthly.query("Weekday_Check == 'Weekday' & CBD_Check == 'CBD'")

aggregate_data(monthly, "month_year", 1, "trips", "monthly_trips")
aggregate_data(monthly, "month_year", 1, "trip_miles", "monthly_miles")
aggregate_data(monthly, "month_year", 1, "trip_time", "monthly_time")

monthly.display()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


Weekday_Check,service,year,month_year,CBD_Check,trips,trip_miles,trip_time,monthly_trips,monthly_miles,monthly_time
Weekday,Yellow Taxi,2019,01-2019,CBD,4137933,11489762.27999885,1339914.1700016614,179910.0,499555.0,58257.0
Weekday,Green Taxi,2019,01-2019,CBD,40775,300410.1600000006,40354.44999999856,1773.0,13061.0,1755.0
Weekday,Yellow Taxi,2019,02-2019,CBD,3754166,10723608.639999028,1331634.6800016244,187708.0,536180.0,66582.0
Weekday,HVFHV,2019,02-2019,CBD,5519820,27549051.890002005,2009029.8419442775,275991.0,1377453.0,100451.0
Weekday,Green Taxi,2019,02-2019,CBD,35737,271160.0800000002,39801.60999999834,1787.0,13558.0,1990.0
Weekday,Yellow Taxi,2019,03-2019,CBD,4049426,11754362.189999152,1484534.5800018115,192830.0,559732.0,70692.0
Weekday,HVFHV,2019,03-2019,CBD,5835604,29970868.30000314,2193424.823888062,277886.0,1427184.0,104449.0
Weekday,Green Taxi,2019,03-2019,CBD,34505,259414.1499999989,37362.34999999759,1643.0,12353.0,1779.0
Weekday,HVFHV,2019,04-2019,CBD,5784715,29720746.370001893,2190700.289721729,262942.0,1350943.0,99577.0
Weekday,Yellow Taxi,2019,04-2019,CBD,4108110,12049385.409999408,1597803.7700082082,186732.0,547699.0,72627.0


In [0]:
# This script processes a DataFrame of transportation data, focusing specifically on the hourly weekday average trips, trip miles, and trip time within the Central Business District (CBD). It first groups the data by various factors (e.g., weekday, hour, and whether the trip is within the CBD), counts the trips, and sums up the trip miles and time. The data is then filtered to only include weekdays and trips within the CBD. Lastly, it aggregates this data for each hour of the day and outputs a DataFrame with the hourly average trip count, miles, and time. This information can be used for a more granular analysis of traffic patterns and transportation efficiency within the CBD during different hours of the day.

hourly = (
    df.groupBy("Weekday_Check", "service", "hour", "CBD_Check")
    .agg(
        count("*").alias("trips"),
        sum("trip_miles").alias("trip_miles"),
        sum("trip_time").alias("trip_time"),
    )
    .orderBy("hour")
)

hourly = hourly.toPandas()

hourly = hourly.query("Weekday_Check == 'Weekday' & CBD_Check == 'CBD'")

aggregate_data(hourly, "hour", 1, "trips", "hourly_trips")
aggregate_data(hourly, "hour", 12, "trip_miles", "hourly_miles")
aggregate_data(hourly, "hour", 12, "trip_time", "hourly_time")

hourly.display()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


Weekday_Check,service,hour,CBD_Check,trips,trip_miles,trip_time,hourly_trips,hourly_miles,hourly_time
Weekday,HVFHV,0,CBD,5482531,32538973.95999999,1686541.8352777772,249206.0,124670.0,6462.0
Weekday,Yellow Taxi,0,CBD,2041569,8199238.089999992,683272.2499999717,92799.0,31415.0,2618.0
Weekday,Green Taxi,0,CBD,7197,73618.36999999997,2715.580000000001,327.0,282.0,10.0
Weekday,HVFHV,1,CBD,3344596,19642136.86700001,971984.1094444456,152027.0,75257.0,3724.0
Weekday,Yellow Taxi,1,CBD,1168900,4728181.259999999,292786.01000000304,53132.0,18116.0,1122.0
Weekday,Green Taxi,1,CBD,4543,29304.52,2060.5800000000004,206.0,112.0,8.0
Weekday,HVFHV,2,CBD,2168299,12824492.70500001,619441.5822222218,98559.0,49136.0,2373.0
Weekday,Yellow Taxi,2,CBD,707555,3761165.3900000025,146162.1700000002,32162.0,14411.0,560.0
Weekday,Green Taxi,2,CBD,3055,20673.52999999999,1540.3900000000006,139.0,79.0,6.0
Weekday,HVFHV,3,CBD,1502456,10090219.25699999,450307.66527777823,68293.0,38660.0,1725.0


In [0]:
# summing monthly average weekday number of pickups

pickup = (
    df.groupBy("service", "year", "month_year", "PULocationID")
    .agg(count("*").alias("monthly_total"))
    .orderBy("year", "month_year")
)

pickup = pickup.toPandas()

aggregate_data(pickup, "month_year", 1, "monthly_total", "daily_average")

pickup.display()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


service,year,month_year,PULocationID,monthly_total,daily_average
Yellow Taxi,2019,01-2019,211,55700,2422.0
Yellow Taxi,2019,01-2019,75,57761,2511.0
Yellow Taxi,2019,01-2019,50,62841,2732.0
Yellow Taxi,2019,01-2019,79,194199,8443.0
Yellow Taxi,2019,01-2019,233,95286,4143.0
Yellow Taxi,2019,01-2019,158,69445,3019.0
Yellow Taxi,2019,01-2019,41,28913,1257.0
Yellow Taxi,2019,01-2019,144,66952,2911.0
Yellow Taxi,2019,01-2019,152,6961,303.0
Yellow Taxi,2019,01-2019,237,332796,14469.0


In [0]:
# summing monthly average weekday number of dropoffs

dropoff = (
    df.groupBy("service", "year", "month_year", "DOLocationID")
    .agg(count("*").alias("monthly_total"))
    .orderBy("year", "month_year")
)

dropoff = dropoff.toPandas()

aggregate_data(dropoff, "month_year", 1, "monthly_total", "daily_average")

dropoff.display()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


service,year,month_year,DOLocationID,monthly_total,daily_average
Yellow Taxi,2019,01-2019,193,6335,275.0
Green Taxi,2019,01-2019,113,1033,45.0
Green Taxi,2019,01-2019,228,2789,121.0
Green Taxi,2019,01-2019,265,1102,48.0
Green Taxi,2019,01-2019,27,21,1.0
Green Taxi,2019,01-2019,86,659,29.0
Green Taxi,2019,01-2019,12,50,2.0
Green Taxi,2019,01-2019,15,329,14.0
Green Taxi,2019,01-2019,220,1634,71.0
Green Taxi,2019,01-2019,144,898,39.0
