# Exploratory Data Analysis with PySpark and Spark SQL

The following notebook uses New York City taxi data from [TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

## Instructions

- Load and explore NYC taxi data from January 2019. The exercises can be completed using either PySpark or Spark SQL (a subset of the questions will be re-answered using the language not chosen for the main work).
- Load the zone lookup table to answer the questions about the NYC boroughs.
- Load NYC taxi data from January 2025 and compare it with the 2019 data.
- With any remaining time, work on the "Where to go from here" section.
- Lab due date is TBD (due dates will be updated in the README for the class repo).

In [0]:
# Define the name of the new catalog
catalog = 'taxi_eda_db'

# define variables for the trips data
schema = 'yellow_taxi_trips'
volume = 'data'
file_name = 'yellow_tripdata_2019-01.parquet'
table_name = 'tbl_yellow_taxi_trips'
path_volume = '/Volumes/' + catalog + "/" + schema + '/' + volume
path_table =  catalog + "." + schema
download_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-01.parquet'

In [0]:
# create the catalog/schema/volume
spark.sql('create catalog if not exists ' + catalog)
spark.sql('create schema if not exists ' + catalog + '.' + schema)
spark.sql('create volume if not exists ' + catalog + '.' + schema + '.' + volume)

DataFrame[]

In [0]:
# Get the data: copy the parquet file from the TLC download URL into the volume
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")

True

In [0]:
# Create the dataframe
# Parquet files carry their own schema, so CSV options such as header, inferSchema and sep are not needed
df = spark.read.parquet(f"{path_volume}/{file_name}")

In [0]:
# Show the dataframe
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2019-01-01 00:46:40|  2019-01-01 00:53:20|            1.0|          1.5|       1.0|                 N|         151|         239|           1|        7.0|  0.5|    0.5|      1.6

## Lab

### Part 1
This section can be completed using either PySpark or SQL commands. (A later section asks you to re-answer a self-chosen subset of the questions in the language not used for the main section, i.e. if PySpark is chosen for the main lab, SQL should be used to repeat some of the questions.)

- Add a column that creates a unique key to identify each record in order to answer questions about individual trips
- Which trip has the highest passenger count?
- What is the average passenger count?
- Shortest/longest trip by distance? By time?
- Busiest/slowest single day
- Busiest/slowest time of day (you may want to bucket these by hour or create time ranges such as morning, afternoon, evening, late night)
- On average, which day of the week is slowest/busiest?
- Does trip distance or number of passengers affect tip amount?
- What was the highest "extra" charge, and which trip was it?
- Are there any data points that seem strange or are outliers? (Make sure to explain your reasoning in a markdown cell.)

In [0]:
# Add a column that creates a unique key to identify each record in order to answer questions about individual trips
from pyspark.sql.functions import monotonically_increasing_id

# monotonically_increasing_id() yields unique 64-bit IDs, but they are not consecutive across partitions
df = df.withColumn("trip_id", monotonically_increasing_id())
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|trip_id|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+
|       1| 2019-01-01 00:46:40|  2019-01-01 00:53:20|            1.0|          1.5|       1.0|                 N|         151|         239|           1|        7.0

In [0]:
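# Register the DataFrame as a temporary view named TP so the %sql cells can query it
df.createOrReplaceTempView("TP")

In [0]:
%sql
-- Which trip has the highest passenger count?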
SELECT *
FROM TP
ORDER BY passenger_count DESC
LIMIT 1;

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id
2,2019-01-05T13:12:29.000,2019-01-05T13:12:32.000,9.0,0.0,5.0,N,68,68,1,9.8,0.0,0.5,2.0,0.0,0.3,12.6,,,949956


In [0]:
# Which trip has the highest passenger count?
display(df.orderBy(df.passenger_count.desc()).limit(1))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id
2,2019-01-05T13:12:29.000,2019-01-05T13:12:32.000,9.0,0.0,5.0,N,68,68,1,9.8,0.0,0.5,2.0,0.0,0.3,12.6,,,949956


In [0]:
from pyspark.sql.functions import avg

# What is the average passenger count? (avg ignores null passenger counts)
display(df.select(avg("passenger_count")))

avg(passenger_count)
1.5670317144945614


In [0]:
%sql
SELECT avg(passenger_count)
FROM TP;

avg(passenger_count)
1.5670317144945614


In [0]:
from pyspark.sql.functions import unix_timestamp, col

# Shortest/longest trip by distance? By time?
# By distance (many trips tie at 0.0 miles, so the "shortest" row returned is arbitrary)
display(df.orderBy(df.trip_distance.desc()).limit(1))
display(df.orderBy(df.trip_distance.asc()).limit(1))

# By time: duration in seconds = dropoff epoch seconds - pickup epoch seconds
df2 = df.withColumn(
    "duration_s",
    unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))
)
# Keep only positive durations; zero or negative durations are most likely recording errors
df_valid = df2.filter(col("duration_s").isNotNull() & (col("duration_s") > 0))
display(df_valid.orderBy(col("duration_s").desc()).limit(1))
display(df_valid.orderBy(col("duration_s").asc()).limit(1))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id
1,2019-01-25T21:56:39.000,2019-01-25T22:06:08.000,1.0,831.8,1.0,N,140,239,1,8.5,0.5,0.5,1.96,0.0,0.3,11.76,0.0,,6074091


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id
2,2018-12-21T13:48:30.000,2018-12-21T13:52:40.000,3.0,0.0,1.0,N,236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,2


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id,duration_s
1,2019-01-01T07:01:20.000,2019-01-31T14:29:21.000,1.0,1.2,1.0,N,48,163,2,6.5,0.0,0.5,0.0,0.0,0.3,7.3,0.0,,68267,2618881


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id,duration_s
2,2019-01-01T00:56:01.000,2019-01-01T00:56:02.000,5.0,0.0,5.0,N,264,79,1,12.0,0.0,0.5,2.0,0.0,0.3,14.8,,,12355,1
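As a cross-check on `duration_s` (a difference of two `unix_timestamp` values, i.e. elapsed seconds), the plain-Python sketch below recomputes it with the standard library for the two rows shown above. It runs outside Spark, and `duration_seconds` is a hypothetical helper name. The longest value works out to roughly 30 days and the shortest to 1 second, so neither looks like a real ride.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"

def duration_seconds(pickup: str, dropoff: str) -> int:
    """Elapsed whole seconds from pickup to dropoff, like the duration_s column."""
    delta = datetime.strptime(dropoff, TS_FORMAT) - datetime.strptime(pickup, TS_FORMAT)
    return int(delta.total_seconds())

# Longest positive-duration trip shown above (trip_id 68267)
longest = duration_seconds("2019-01-01 07:01:20", "2019-01-31 14:29:21")
# Shortest positive-duration trip shown above (trip_id 12355)
shortest = duration_seconds("2019-01-01 00:56:01", "2019-01-01 00:56:02")

print(longest, round(longest / 86400, 1))  # 2618881 30.3  (seconds, days)
print(shortest)                            # 1
```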


In [0]:
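# Busiest/slowest single day
from pyspark.sql.functions import dayofmonth

# dayofmonth() ignores month and year, so the few pickups dated outside January 2019
# (e.g. 2018-12-21) are counted together with the matching January day
df_day = df.groupBy(dayofmonth("tpep_pickup_datetime").alias("day")).count()
display(df_day.orderBy(col("count").desc()).limit(1))  # busiest
display(df_day.orderBy(col("count").asc()).limit(1))   # slowest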

day,count
25,292501


day,count
1,189552
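Because `dayofmonth()` drops the month and year, a stricter version of the busiest/slowest-day question groups by calendar date and keeps only January 2019. The sketch below shows the difference in plain Python on a few hypothetical timestamps (`trips_per_date` is a hypothetical helper). In PySpark the same idea would be a filter on `year(...) == 2019` and `month(...) == 1` followed by `groupBy(to_date("tpep_pickup_datetime"))`; without the filter, grouping by full date would likely make some stray out-of-range date the "slowest day".

```python
from collections import Counter
from datetime import date, datetime

def trips_per_date(pickups, year=2019, month=1):
    """Count pickups per calendar date, keeping only pickups in the target year/month."""
    return Counter(ts.date() for ts in pickups if (ts.year, ts.month) == (year, month))

# Hypothetical sample: two January pickups and one stray December 2018 record
sample = [
    datetime(2019, 1, 21, 8, 0, 0),
    datetime(2019, 1, 21, 9, 30, 0),
    datetime(2018, 12, 21, 13, 48, 30),
]

by_day_of_month = Counter(ts.day for ts in sample)  # what dayofmonth() grouping does
by_date = trips_per_date(sample)

print(by_day_of_month[21])         # 3: the December record is merged into the 21st
print(by_date[date(2019, 1, 21)])  # 2: only January 2019 pickups are counted
```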


In [0]:
# Busiest/slowest time of day (pickups per hour, 0-23)
from pyspark.sql.functions import hour

df_hour = df.groupBy(hour("tpep_pickup_datetime").alias("hour")).count()
display(df_hour.orderBy(col("count").desc()).limit(1))
display(df_hour.orderBy(col("count").asc()).limit(1))

hour,count
18,515390


hour,count
4,61424
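The Part 1 prompt suggests bucketing hours into labels such as morning, afternoon, evening and late night. The boundaries below are an illustrative assumption rather than something the lab specifies, and `time_of_day` is a hypothetical helper. In PySpark the same mapping could be written as a `when(...).when(...).otherwise(...)` chain on `hour("tpep_pickup_datetime")`, then grouped and counted like `df_hour`.

```python
def time_of_day(hour: int) -> str:
    """Map a pickup hour (0-23) to a coarse label; the cut-offs are an illustrative choice."""
    if not 0 <= hour <= 23:
        raise ValueError(f"hour must be in 0-23, got {hour}")
    if 5 <= hour < 12:
        return "morning"
    if 12 <= hour < 17:
        return "afternoon"
    if 17 <= hour < 22:
        return "evening"
    return "late night"  # 22:00-04:59

# The busiest (18) and slowest (4) hours from the output above
print(time_of_day(18), time_of_day(4))  # evening late night
```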


In [0]:
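# On average, which day of the week is slowest/busiest?
from pyspark.sql.functions import dayofweek

# dayofweek() numbers days 1 = Sunday, 2 = Monday, ..., 7 = Saturday.
# These are total pickups per weekday over the month, not a per-day average.
df_week = df.groupBy(dayofweek("tpep_pickup_datetime").alias("weekday")).count()

display(df_week.orderBy(col("count").desc()).limit(1))  # busiest
display(df_week.orderBy(col("count").asc()).limit(1))   # slowest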

weekday,count
5,1357043


weekday,count
1,859905
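Spark's `dayofweek()` numbers days from 1 = Sunday to 7 = Saturday, so the output above says Thursday had the most pickups in total and Sunday the fewest. The question asks about the average day, though, and January 2019 starts on a Tuesday, so Tuesdays, Wednesdays and Thursdays occur five times while the other weekdays occur four times. The plain-Python sketch below (hypothetical helper names) counts those occurrences so each total can be divided by the number of such days. Only the Thursday and Sunday totals are shown above, so this does not by itself re-rank the other weekdays. In PySpark, the per-weekday number of distinct dates could be computed with `countDistinct(to_date("tpep_pickup_datetime"))` in the same `groupBy`.

```python
import calendar
from collections import Counter
from datetime import date

def spark_dayofweek(d: date) -> int:
    """Spark's dayofweek convention: 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    return d.isoweekday() % 7 + 1

def weekday_occurrences(year: int, month: int) -> Counter:
    """Number of times each Spark weekday number occurs in the given month."""
    days_in_month = calendar.monthrange(year, month)[1]
    return Counter(spark_dayofweek(date(year, month, d)) for d in range(1, days_in_month + 1))

occ = weekday_occurrences(2019, 1)
print(dict(sorted(occ.items())))  # {1: 4, 2: 4, 3: 5, 4: 5, 5: 5, 6: 4, 7: 4}

# Totals from the output above divided by how many such days January 2019 had
avg_thursday = 1357043 / occ[5]  # weekday 5 = Thursday
avg_sunday = 859905 / occ[1]     # weekday 1 = Sunday
print(avg_thursday, avg_sunday)
```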


In [0]:
# Does trip distance or number of passengers affect tip amount?
# DataFrame.stat.corr computes the Pearson correlation coefficient between two numeric columns
corr_distance_tip = df.stat.corr("trip_distance", "tip_amount")
corr_passengers_tip = df.stat.corr("passenger_count", "tip_amount")

print("Correlation distance vs tips :", corr_distance_tip)
print("Correlation passenger_count vs tips :", corr_passengers_tip)

Correlation distance vs tips : 0.5269200663652669
Correlation passenger_count vs tips : 0.004431051585116288
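`DataFrame.stat.corr` returns the Pearson correlation coefficient, which measures only linear association: about 0.53 for distance suggests a moderate positive relationship with tips, while about 0.004 for passenger count suggests essentially none. Correlation does not establish cause, and it is sensitive to extreme values such as the 831.8-mile trip found earlier. Per the TLC data dictionary, cash tips are not recorded in `tip_amount`, which is another reason to read these numbers cautiously. The plain-Python sketch below (a hypothetical `pearson` helper on made-up numbers, not Spark's implementation) shows the formula and how a single outlier can pull the coefficient down.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up distances (miles) and tips (dollars) that rise together
distances = [1.0, 2.0, 3.0, 4.0, 5.0]
tips = [1.0, 2.0, 3.0, 4.0, 5.0]
print(pearson(distances, tips))  # 1.0 (perfect linear relationship)

# Add one implausible record, like the 831.8-mile trip with a $1.96 tip
print(pearson(distances + [831.8], tips + [1.96]))  # far below 1.0
```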


In [0]:
# What was the highest "extra" charge, and which trip was it?
display(df.orderBy(df.extra.desc()).limit(1))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id
1,2019-01-23T08:58:09.000,2019-01-23T08:58:09.000,1.0,0.0,1.0,Y,24,264,2,355676.98,535.38,2.42,0.0,0.0,0.0,356214.78,0.0,,5323483


In [0]:
# Flag records that break simple plausibility rules (the thresholds are judgment calls)
df_outliers = df.filter(
      (df.trip_distance > 50)
    | (df.trip_distance == 0)
    | (df.passenger_count > 6)
    | (df.fare_amount < 0)
    | (df.tip_amount > 50)
)
display(df_outliers.limit(20))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id
2,2018-12-21T13:48:30.000,2018-12-21T13:52:40.000,3.0,0.0,1.0,N,236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,2
2,2018-11-28T15:52:25.000,2018-11-28T15:55:45.000,5.0,0.0,1.0,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,7.55,,,3
2,2018-11-28T15:56:57.000,2018-11-28T15:58:33.000,5.0,0.0,2.0,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,,,4
2,2018-11-28T16:25:49.000,2018-11-28T16:28:26.000,5.0,0.0,1.0,N,193,193,2,3.5,0.5,0.5,0.0,5.76,0.3,13.31,,,5
2,2018-11-28T16:29:37.000,2018-11-28T16:33:43.000,5.0,0.0,2.0,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,,,6
1,2019-01-01T00:32:59.000,2019-01-01T00:32:59.000,3.0,0.0,1.0,Y,237,264,2,6.5,0.5,0.5,0.0,0.0,0.3,7.8,,,28
1,2019-01-01T00:42:45.000,2019-01-01T00:44:12.000,1.0,0.0,1.0,N,161,161,3,3.0,0.5,0.5,0.0,0.0,0.3,4.3,,,459
2,2019-01-01T00:32:56.000,2019-01-01T00:33:35.000,2.0,0.1,1.0,N,148,148,3,-2.5,-0.5,-0.5,0.0,0.0,-0.3,-3.8,,,663
1,2019-01-01T00:39:07.000,2019-01-01T00:39:07.000,1.0,0.0,1.0,N,142,264,2,2.5,0.5,0.5,0.0,0.0,0.3,3.8,,,681
1,2019-01-01T00:41:58.000,2019-01-01T00:44:05.000,1.0,0.0,1.0,N,161,161,3,3.5,0.5,0.5,0.0,0.0,0.3,4.8,,,845


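Some anomalies appear in the data: zero-distance trips that still charge a fare, extremely long trips (>50 miles), negative fares, implausible passenger counts (>6) and unusually large tips (>$50). The sample output also contains pickups from November and December 2018 even though the file covers January 2019, and earlier cells surfaced an 831.8-mile trip that lasted under 10 minutes, a trip lasting about 30 days, and a $355,676.98 fare with a $535.38 extra charge on a zero-second, zero-distance trip. These values most likely come from data-entry or meter errors and could skew the analysis, so they should be filtered out or handled separately.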
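The `df_outliers` filter encodes the rules as one boolean expression. The plain-Python sketch below (a hypothetical `outlier_reasons` helper working on one dict per trip) applies the same thresholds but reports which rule each record breaks, and adds two time checks the filter leaves out: a non-positive duration and a pickup outside January 2019. In PySpark these two checks could be added to the filter using a `unix_timestamp(...)` difference and `year(...)`/`month(...)`. Very large fares would still need an additional upper bound that you would have to choose.

```python
from datetime import datetime

def outlier_reasons(trip: dict, year: int = 2019, month: int = 1) -> list:
    """List the plausibility rules a trip breaks (an empty list means it looks normal)."""
    reasons = []
    # Thresholds copied from the df_outliers filter
    if trip["trip_distance"] > 50:
        reasons.append("distance > 50 miles")
    if trip["trip_distance"] == 0:
        reasons.append("zero distance")
    if trip["passenger_count"] > 6:
        reasons.append("passenger_count > 6")
    if trip["fare_amount"] < 0:
        reasons.append("negative fare")
    if trip["tip_amount"] > 50:
        reasons.append("tip > $50")
    # Extra time checks that are not in the original filter
    if (trip["dropoff"] - trip["pickup"]).total_seconds() <= 0:
        reasons.append("non-positive duration")
    if (trip["pickup"].year, trip["pickup"].month) != (year, month):
        reasons.append("pickup outside target month")
    return reasons

# trip_id 2 from the output above: zero distance and dated December 2018
trip_2 = {
    "pickup": datetime(2018, 12, 21, 13, 48, 30),
    "dropoff": datetime(2018, 12, 21, 13, 52, 40),
    "passenger_count": 3, "trip_distance": 0.0,
    "fare_amount": 4.5, "tip_amount": 0.0,
}
print(outlier_reasons(trip_2))  # ['zero distance', 'pickup outside target month']
```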


### Part 2

- Using the code for loading the first dataset as an example, load the taxi zone lookup and answer the following questions:
- Which borough had the most pickups? Dropoffs?
- What are the busy/slow times by borough?
- What are the busiest days of the week by borough?
- What is the average trip distance by borough?
- What is the average trip fare by borough?
- Highest/lowest fare amounts for a trip: which borough is associated with each?
- Load the dataset from the most recently available January. Is there a change in any of the average metrics?

In [0]:
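# Load the taxi zone lookup
# Assumes taxi_zone_lookup.csv has already been placed in the volume
# (e.g. downloaded from the TLC Trip Record Data page); unlike the trips file, no cell here copies it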
schema_zones = 'yellow_taxi_trips'
volume_zones = 'data'
file_name_zones = 'taxi_zone_lookup.csv'
table_name_zones = 'tbl_taxi_zones'
path_volume_zones = '/Volumes/' + catalog + "/" + schema_zones + '/' + volume_zones
path_table_zones =  catalog + "." + schema_zones


df_zones = spark.read.csv(
    f"{path_volume_zones}/{file_name_zones}",
    header=True,
    inferSchema=True
)

display(df_zones.limit(5))
df_zones.createOrReplaceTempView(table_name_zones)



LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone


In [0]:
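from pyspark.sql.functions import col

# Join the zone lookup twice: once on the pickup location and once on the dropoff location.
# Only LocationID and Borough are renamed, so Zone and service_zone appear twice in df_join
# (displayed as Zone.1 / service_zone.1); rename them as well before selecting them by name.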
df_join = df.join(
    df_zones.withColumnRenamed("LocationID", "PULocationID").withColumnRenamed("Borough", "PU_Borough"),
    on="PULocationID",
    how="left"
)

df_join = df_join.join(
    df_zones.withColumnRenamed("LocationID", "DOLocationID").withColumnRenamed("Borough", "DO_Borough"),
    on="DOLocationID",
    how="left"
)

display(df_join.limit(5))


DOLocationID,PULocationID,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id,PU_Borough,Zone,service_zone,DO_Borough,Zone.1,service_zone.1
239,151,1,2019-01-01T00:46:40.000,2019-01-01T00:53:20.000,1.0,1.5,1.0,N,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,,,0,Manhattan,Manhattan Valley,Yellow Zone,Manhattan,Upper West Side South,Yellow Zone
246,239,1,2019-01-01T00:59:47.000,2019-01-01T01:18:59.000,1.0,2.6,1.0,N,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,,,1,Manhattan,Upper West Side South,Yellow Zone,Manhattan,West Chelsea/Hudson Yards,Yellow Zone
236,236,2,2018-12-21T13:48:30.000,2018-12-21T13:52:40.000,3.0,0.0,1.0,N,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,2,Manhattan,Upper East Side North,Yellow Zone,Manhattan,Upper East Side North,Yellow Zone
193,193,2,2018-11-28T15:52:25.000,2018-11-28T15:55:45.000,5.0,0.0,1.0,N,2,3.5,0.5,0.5,0.0,0.0,0.3,7.55,,,3,Queens,Queensbridge/Ravenswood,Boro Zone,Queens,Queensbridge/Ravenswood,Boro Zone
193,193,2,2018-11-28T15:56:57.000,2018-11-28T15:58:33.000,5.0,0.0,2.0,N,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,,,4,Queens,Queensbridge/Ravenswood,Boro Zone,Queens,Queensbridge/Ravenswood,Boro Zone


In [0]:
# Which borough had the most pickups? Dropoffs?
from pyspark.sql.functions import count

display(
    df_join.groupBy("PU_Borough")
           .agg(count("*").alias("pickup_count"))
           .orderBy(col("pickup_count").desc())
)

display(
    df_join.groupBy("DO_Borough")
           .agg(count("*").alias("dropoff_count"))
           .orderBy(col("dropoff_count").desc())
)


PU_Borough,pickup_count
Manhattan,6950965
Queens,471173
Unknown,159815
Brooklyn,91905
Bronx,18062
,3890
EWR,446
Staten Island,361


DO_Borough,dropoff_count
Manhattan,6817355
Queens,340972
Brooklyn,301105
Unknown,149097
Bronx,58085
,16904
EWR,10914
Staten Island,2185


In [0]:
# What are the busy/slow times by borough? (pickup counts per borough and hour)
from pyspark.sql.functions import hour

df_join = df_join.withColumn("pickup_hour", hour("tpep_pickup_datetime"))

display(
    df_join.groupBy("PU_Borough", "pickup_hour")
           .count()
           .orderBy("PU_Borough", "pickup_hour")
)


PU_Borough,pickup_hour,count
Bronx,0,324
Bronx,1,254
Bronx,2,225
Bronx,3,225
Bronx,4,403
Bronx,5,736
Bronx,6,1301
Bronx,7,1803
Bronx,8,1445
Bronx,9,1158
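The table above lists every borough/hour count but leaves the busiest and slowest hour per borough to be read off by eye. A plain-Python sketch of that selection (hypothetical `busiest_and_slowest` helper) is below, applied only to the ten Bronx rows printed above, so its answer covers those hours only, not the full day. In PySpark the usual approach is a window, e.g. `row_number()` over `Window.partitionBy("PU_Borough").orderBy(desc("count"))` (and `asc` for the slowest), keeping the rows numbered 1; the same works for the weekday table.

```python
def busiest_and_slowest(rows):
    """rows: iterable of (group, key, count). Returns {group: (busiest_key, slowest_key)}.

    Ties keep the first row seen for that group.
    """
    best = {}  # group -> [(busiest_key, count), (slowest_key, count)]
    for group, key, n in rows:
        if group not in best:
            best[group] = [(key, n), (key, n)]
            continue
        if n > best[group][0][1]:
            best[group][0] = (key, n)
        if n < best[group][1][1]:
            best[group][1] = (key, n)
    return {g: (hi[0], lo[0]) for g, (hi, lo) in best.items()}

# The ten Bronx (borough, hour, count) rows printed above
bronx_rows = [
    ("Bronx", 0, 324), ("Bronx", 1, 254), ("Bronx", 2, 225), ("Bronx", 3, 225),
    ("Bronx", 4, 403), ("Bronx", 5, 736), ("Bronx", 6, 1301), ("Bronx", 7, 1803),
    ("Bronx", 8, 1445), ("Bronx", 9, 1158),
]
print(busiest_and_slowest(bronx_rows))  # {'Bronx': (7, 2)}
```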


In [0]:
# What are the busiest days of the week by borough? (dayofweek: 1 = Sunday ... 7 = Saturday)
from pyspark.sql.functions import dayofweek

df_join = df_join.withColumn("pickup_weekday", dayofweek("tpep_pickup_datetime"))

display(
    df_join.groupBy("PU_Borough", "pickup_weekday")
           .count()
           .orderBy("PU_Borough", "pickup_weekday")
)


PU_Borough,pickup_weekday,count
Bronx,1,2112
Bronx,2,2177
Bronx,3,3059
Bronx,4,2999
Bronx,5,3121
Bronx,6,2666
Bronx,7,1928
Brooklyn,1,11099
Brooklyn,2,9516
Brooklyn,3,15779


In [0]:
# What is the average trip distance by (pickup) borough?
from pyspark.sql.functions import avg
display(
    df_join.groupBy("PU_Borough")
           .agg(avg("trip_distance").alias("avg_trip_distance"))
           .orderBy(col("avg_trip_distance").desc())
)


PU_Borough,avg_trip_distance
Staten Island,12.503601108033246
Queens,11.283218499361992
Bronx,7.233194552098303
Brooklyn,4.787677275447492
,3.193850899742941
EWR,2.641098654708519
Unknown,2.415464130400774
Manhattan,2.22866933584026


In [0]:
# What is the average trip fare by (pickup) borough?
display(
    df_join.groupBy("PU_Borough")
           .agg(avg("fare_amount").alias("avg_fare_amount"))
           .orderBy(col("avg_fare_amount").desc())
)

PU_Borough,avg_fare_amount
EWR,76.24024663677126
,59.5731593830335
Staten Island,45.289861495844896
Queens,35.14462651722029
Bronx,26.26890543682963
Brooklyn,18.64913280017229
Unknown,14.944423051653525
Manhattan,10.792468572351568


In [0]:
# Highest/lowest fare amounts for a trip, and the pickup/dropoff boroughs associated with each
display(
    df_join.orderBy(col("fare_amount").desc())
           .select("fare_amount", "PU_Borough", "DO_Borough",
                   "trip_distance", "tpep_pickup_datetime", "tpep_dropoff_datetime")
           .limit(1)
)
display(
    df_join.orderBy(col("fare_amount").asc())
           .select("fare_amount", "PU_Borough", "DO_Borough",
                   "trip_distance", "tpep_pickup_datetime", "tpep_dropoff_datetime")
           .limit(1)
)


fare_amount,PU_Borough,DO_Borough,trip_distance,tpep_pickup_datetime,tpep_dropoff_datetime
623259.86,Manhattan,Manhattan,2.4,2019-01-11T19:33:15.000,2019-01-11T19:53:09.000


fare_amount,PU_Borough,DO_Borough,trip_distance,tpep_pickup_datetime,tpep_dropoff_datetime
-362.0,Queens,Queens,0.0,2019-01-21T12:01:45.000,2019-01-21T12:08:36.000


In [0]:
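# Assumes the January 2025 file has already been copied into the same volume
file_2025 = "yellow_tripdata_2025-01.parquet"

# Parquet files carry their own schema, so CSV options (header, inferSchema) are not needed
df_2025 = spark.read.parquet(f"{path_volume}/{file_2025}")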

display(df_2025.limit(5))


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
1,2025-01-01T00:18:38.000,2025-01-01T00:26:59.000,1,1.6,1,N,229,237,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,0.0,0.0
1,2025-01-01T00:32:40.000,2025-01-01T00:35:13.000,1,0.5,1,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.0
1,2025-01-01T00:44:04.000,2025-01-01T00:46:01.000,1,0.6,1,N,141,141,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0,0.0
2,2025-01-01T00:14:27.000,2025-01-01T00:20:01.000,3,0.52,1,N,244,244,2,7.2,1.0,0.5,0.0,0.0,1.0,9.7,0.0,0.0,0.0
2,2025-01-01T00:21:34.000,2025-01-01T00:25:06.000,3,0.66,1,N,244,116,2,5.8,1.0,0.5,0.0,0.0,1.0,8.3,0.0,0.0,0.0
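The cell above reads `yellow_tripdata_2025-01.parquet` from the volume, but no earlier cell copies it there. A small sketch (hypothetical helper names) that builds file names and URLs for any month, assuming other months follow the same pattern as `download_url` (worth confirming on the TLC Trip Record Data page):

```python
# Same host and path as download_url above; other months are assumed to follow this pattern
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

def yellow_trip_file(year: int, month: int) -> str:
    """File name such as yellow_tripdata_2025-01.parquet."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"yellow_tripdata_{year}-{month:02d}.parquet"

def yellow_trip_url(year: int, month: int) -> str:
    """Download URL for one month of yellow taxi trips."""
    return f"{BASE_URL}/{yellow_trip_file(year, month)}"

print(yellow_trip_url(2025, 1))
```

In the notebook, the file could then be copied the same way as the 2019 file, e.g. `dbutils.fs.cp(yellow_trip_url(2025, 1), f"{path_volume}/{yellow_trip_file(2025, 1)}")`, and looping over `range(1, 13)` would fetch the rest of 2019. When combining years, note that the 2025 file adds a `cbd_congestion_fee` column and spells `Airport_fee` differently from the 2019 `airport_fee`.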


In [0]:
from pyspark.sql.functions import col

df_2025_join = (
    df_2025
    .join(
        df_zones.withColumnRenamed("LocationID", "PULocationID_temp")
                .withColumnRenamed("Borough", "PU_Borough"),
        df_2025.PULocationID == col("PULocationID_temp"),
        "left"
    )
    .drop("PULocationID_temp")
    .join(
        df_zones.withColumnRenamed("LocationID", "DOLocationID_temp")
                .withColumnRenamed("Borough", "DO_Borough"),
        df_2025.DOLocationID == col("DOLocationID_temp"),
        "left"
    )
    .drop("DOLocationID_temp")
)

display(df_2025_join.limit(5))


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee,PU_Borough,Zone,service_zone,DO_Borough,Zone.1,service_zone.1
1,2025-01-01T00:18:38.000,2025-01-01T00:26:59.000,1,1.6,1,N,229,237,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,0.0,0.0,Manhattan,Sutton Place/Turtle Bay North,Yellow Zone,Manhattan,Upper East Side South,Yellow Zone
1,2025-01-01T00:32:40.000,2025-01-01T00:35:13.000,1,0.5,1,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.0,Manhattan,Upper East Side North,Yellow Zone,Manhattan,Upper East Side South,Yellow Zone
1,2025-01-01T00:44:04.000,2025-01-01T00:46:01.000,1,0.6,1,N,141,141,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0,0.0,Manhattan,Lenox Hill West,Yellow Zone,Manhattan,Lenox Hill West,Yellow Zone
2,2025-01-01T00:14:27.000,2025-01-01T00:20:01.000,3,0.52,1,N,244,244,2,7.2,1.0,0.5,0.0,0.0,1.0,9.7,0.0,0.0,0.0,Manhattan,Washington Heights South,Boro Zone,Manhattan,Washington Heights South,Boro Zone
2,2025-01-01T00:21:34.000,2025-01-01T00:25:06.000,3,0.66,1,N,244,116,2,5.8,1.0,0.5,0.0,0.0,1.0,8.3,0.0,0.0,0.0,Manhattan,Washington Heights South,Boro Zone,Manhattan,Hamilton Heights,Boro Zone


In [0]:
from pyspark.sql import functions as F
avg_2019 = (
    df
    .select(
        F.avg("passenger_count").alias("avg_passenger_count_2019"),
        F.avg("trip_distance").alias("avg_trip_distance_2019"),
        F.avg("fare_amount").alias("avg_fare_amount_2019")
    )
)

avg_2025 = (
    df_2025
    .select(
        F.avg("passenger_count").alias("avg_passenger_count_2025"),
        F.avg("trip_distance").alias("avg_trip_distance_2025"),
        F.avg("fare_amount").alias("avg_fare_amount_2025")
    )
)


avg_2019.createOrReplaceTempView("avg2019")
avg_2025.createOrReplaceTempView("avg2025")

comparison = spark.sql("""
SELECT
    a.avg_passenger_count_2019,
    b.avg_passenger_count_2025,
    a.avg_trip_distance_2019,
    b.avg_trip_distance_2025,
    a.avg_fare_amount_2019,
    b.avg_fare_amount_2025
FROM avg2019 a
CROSS JOIN avg2025 b
""")

display(comparison)

avg_passenger_count_2019,avg_passenger_count_2025,avg_trip_distance_2019,avg_trip_distance_2025,avg_fare_amount_2019,avg_fare_amount_2025
1.5670317144945614,1.2978589658806226,2.8301461681151574,5.855126178844192,12.529676777472504,17.081802760453556
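To quantify the comparison, a plain-Python sketch (hypothetical `pct_change` helper) computes the relative change of each average from the values printed above: roughly -17% for passenger count, +107% for trip distance and +36% for fare. These are means over raw data with outliers included, so a large jump such as the doubling of average distance is worth re-checking with medians (for example `percentile_approx`) or after filtering the outliers identified in Part 1. The sample rows also show higher surcharges in 2025 (`extra` of 1.0 to 3.5 and `improvement_surcharge` of 1.0, versus 0.5 and 0.3 in 2019), which is consistent with a higher average fare.

```python
def pct_change(old: float, new: float) -> float:
    """Relative change from old to new, in percent."""
    if old == 0:
        raise ValueError("old value must be non-zero")
    return (new - old) / old * 100

# (January 2019 average, January 2025 average), copied from the comparison output above
averages = {
    "passenger_count": (1.5670317144945614, 1.2978589658806226),
    "trip_distance": (2.8301461681151574, 5.855126178844192),
    "fare_amount": (12.529676777472504, 17.081802760453556),
}

# Loop variables are named v2019/v2025 so they do not overwrite the avg_2019/avg_2025 DataFrames
for metric, (v2019, v2025) in averages.items():
    print(f"{metric}: {pct_change(v2019, v2025):+.1f}%")
```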


### Part 3

- Choose 3 questions from above and re-answer them using the language you did not use for the main notebook (i.e. if you completed the exercise in Python, redo 3 questions in pure SQL). At least one of the questions to be redone must involve a join.

In [0]:
%sql
-- Re-answer: which trip has the highest passenger count?
SELECT *
FROM TP
ORDER BY passenger_count DESC
LIMIT 1;

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_id
2,2019-01-05T13:12:29.000,2019-01-05T13:12:32.000,9.0,0.0,5.0,N,68,68,1,9.8,0.0,0.5,2.0,0.0,0.3,12.6,,,949956


In [0]:
%sql
-- Re-answer: what is the average passenger count?
SELECT avg(passenger_count)
FROM TP;

avg(passenger_count)
1.5670317144945614


In [0]:
%sql
-- Re-answer: what is the average trip distance by pickup borough?
-- The join to the zone lookup view (tbl_taxi_zones, registered in Part 2) is done in SQL here
SELECT
    z.Borough AS borough,
    ROUND(AVG(t.trip_distance), 2) AS avg_trip_distance
FROM TP t
LEFT JOIN tbl_taxi_zones z
    ON t.PULocationID = z.LocationID
WHERE z.Borough IS NOT NULL
GROUP BY z.Borough
ORDER BY avg_trip_distance DESC;


borough,avg_trip_distance
Staten Island,12.5
Queens,11.28
Bronx,7.23
Brooklyn,4.79
,3.19
EWR,2.64
Unknown,2.42
Manhattan,2.23



### Part 4

As of Spark 4, DataFrames have native visualization support. Choose at least 3 questions from above and provide visualizations.


In [0]:
df_pickups = (
    df_join.groupBy("PU_Borough")
           .count()
           .withColumnRenamed("count", "pickup_count")
           .orderBy(F.desc("pickup_count"))
)

display(df_pickups)


PU_Borough,pickup_count
Manhattan,6950965
Queens,471173
Unknown,159815
Brooklyn,91905
Bronx,18062
,3890
EWR,446
Staten Island,361


Databricks visualization. Run in Databricks to view.

In [0]:
df_avg_dist = (
    df_join.groupBy("PU_Borough")
           .agg(F.avg("trip_distance").alias("avg_trip_distance"))
           .orderBy(F.desc("avg_trip_distance"))
)

display(df_avg_dist)


PU_Borough,avg_trip_distance
Staten Island,12.503601108033246
Queens,11.283218499361992
Bronx,7.233194552098303
Brooklyn,4.787677275447492
,3.193850899742941
EWR,2.641098654708519
Unknown,2.415464130400774
Manhattan,2.22866933584026


Databricks visualization. Run in Databricks to view.

In [0]:
df_avg_fare = (
    df_join.groupBy("PU_Borough")
           .agg(F.avg("fare_amount").alias("avg_fare_amount"))
           .orderBy(F.desc("avg_fare_amount"))
)

display(df_avg_fare)


PU_Borough,avg_fare_amount
EWR,76.24024663677126
,59.5731593830335
Staten Island,45.289861495844896
Queens,35.14462651722029
Bronx,26.26890543682963
Brooklyn,18.64913280017229
Unknown,14.944423051653525
Manhattan,10.792468572351568


Databricks visualization. Run in Databricks to view.

# Where to go from here

- Continue building the dataset by loading more data: start by completing the data for 2019 and calculating the busiest season (fall, winter, spring, summer).
- Explore a dataset or datasets of your choosing.
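For the busiest-season question, a month-to-season mapping is needed. The sketch below assumes meteorological seasons (December to February is winter, and so on) and uses made-up monthly counts purely to exercise the code; `SEASON_BY_MONTH` and `busiest_season` are hypothetical names. With real data, the monthly counts would come from grouping the combined 2019 trips by `month("tpep_pickup_datetime")`.

```python
# Meteorological seasons (an assumption; astronomical dates would split months)
SEASON_BY_MONTH = {
    12: "winter", 1: "winter", 2: "winter",
    3: "spring", 4: "spring", 5: "spring",
    6: "summer", 7: "summer", 8: "summer",
    9: "fall", 10: "fall", 11: "fall",
}

def busiest_season(monthly_counts: dict) -> str:
    """Sum trip counts per season and return the season with the largest total."""
    totals = {}
    for month, count in monthly_counts.items():
        season = SEASON_BY_MONTH[month]
        totals[season] = totals.get(season, 0) + count
    return max(totals, key=totals.get)

# Made-up counts for illustration only (not real TLC figures)
example_counts = {1: 100, 2: 90, 3: 120, 4: 130, 5: 140, 6: 110,
                  7: 100, 8: 95, 9: 125, 10: 135, 11: 115, 12: 105}
print(busiest_season(example_counts))  # spring
```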