# Victor Papin - TD 2 - Data & IA - ING 5

# Exploratory Data Analysis with PySpark and Spark SQL

This notebook uses New York City taxi data from [TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

## Instructions

- Load and explore NYC taxi data from January 2019. The exercises can be completed using PySpark or Spark SQL (a subset of the questions will be re-answered using the language not chosen for the main work).
- Load the zone lookup table to answer the questions about the NYC boroughs.
- Load NYC taxi data from January 2025 and compare the two datasets.
- With any remaining time, work on the "Where to go from here" section.
- Lab due date is TBD (due dates will be updated in the README of the class repo).

In [1]:
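# `spark` and `dbutils` are predefined in a Databricks notebook;
# they are not importable modules, so no import is needed here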

# Define the name of the new catalog
catalog = 'taxi_eda_db'

# define variables for the trips data
schema = 'yellow_taxi_trips'
volume = 'data'
file_name = 'yellow_tripdata_2019-01.parquet'
table_name = 'tbl_yellow_taxi_trips'
path_volume = '/Volumes/' + catalog + "/" + schema + '/' + volume
path_table =  catalog + "." + schema
download_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-01.parquet'

In [None]:
# create the catalog/schema/volume
spark.sql('create catalog if not exists ' + catalog)
spark.sql('create schema if not exists ' + catalog + '.' + schema)
spark.sql('create volume if not exists ' + catalog + '.' + schema + '.' + volume)

In [None]:
# Download the parquet file into the volume
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")

In [None]:
# Create the dataframe. Parquet files carry their own schema, so the
# CSV-only options (header, inferSchema, sep) are not needed.
df_trips = spark.read.parquet(f"{path_volume}/{file_name}")

In [None]:
# Show the dataframe
df_trips.show()

## Lab

### Part 1
This section can be completed using either PySpark commands or SQL commands. A later section re-answers a self-chosen subset of the questions in the language not used here (i.e. if PySpark is chosen for the main lab, SQL should be used to repeat some of the questions).

- Add a column that creates a unique key to identify each record, in order to answer questions about individual trips.
- Which trip has the highest passenger count?
- What is the average passenger count?
- What are the shortest and longest trips by distance? By time?
- What are the busiest and slowest single days?
- What are the busiest and slowest times of day? (You may want to bucket these by hour or create times such as morning, afternoon, evening, late night.)
- On average, which day of the week is the slowest? The busiest?
- Does trip distance or number of passengers affect the tip amount?
- What was the highest "extra" charge, and on which trip?
- Are there any data points that seem strange or look like outliers? (Make sure to explain your reasoning in a markdown cell.)
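
Two of the questions above (time-of-day buckets and outliers) rest on small rules that are easy to check in plain Python before applying them to a Spark column. The following is a minimal sketch, not the required solution: the bucket boundaries, the 1.5 × IQR rule, and the sample distances are all assumptions chosen for illustration.

```python
from statistics import quantiles


def time_of_day(hour: int) -> str:
    """Map an hour (0-23) to a coarse bucket; the boundaries are an assumption."""
    if 5 <= hour < 12:
        return "morning"
    if 12 <= hour < 17:
        return "afternoon"
    if 17 <= hour < 21:
        return "evening"
    return "late_night"


def iqr_bounds(values, k=1.5):
    """Return the (lower, upper) fences q1 - k*IQR and q3 + k*IQR."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Made-up trip distances in miles, for illustration only
distances = [0.5, 1.0, 1.2, 1.5, 2.0, 2.5, 3.0, 45.0]
lower, upper = iqr_bounds(distances)
outliers = [d for d in distances if d < lower or d > upper]
print(time_of_day(8), time_of_day(23), outliers)
```

A value outside the fences is only a candidate outlier; whether a 45-mile trip is an error or a genuine long ride still has to be argued from the other columns (duration, fare, pickup and dropoff zones).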

### Part 2

- Using the code for loading the first dataset as an example, load the taxi zone lookup table and answer the following questions.
- Which borough had the most pickups? The most dropoffs?
- What are the busy and slow times by borough?
- What are the busiest days of the week by borough?
- What is the average trip distance by borough?
- What is the average trip fare by borough?
- What are the highest and lowest fare amounts for a trip, and which borough is associated with each?
- Load the dataset from the most recently available January. Is there a change in any of the average metrics?
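
For the last question, one way to make "is there a change" concrete is to report the relative change of each average between the two months. The sketch below assumes the averages have already been collected into plain dictionaries; the helper name `percent_change` is hypothetical and the numbers are placeholders, not results from the data.

```python
def percent_change(old: float, new: float) -> float:
    """Relative change from old to new, in percent."""
    if old == 0:
        raise ValueError("old value must be non-zero")
    return (new - old) / old * 100.0


# Placeholder numbers for illustration only, NOT real results
avg_2019 = {"trip_distance": 2.0, "passenger_count": 1.5, "total_amount": 16.0}
avg_2025 = {"trip_distance": 2.5, "passenger_count": 1.2, "total_amount": 20.0}

changes = {
    metric: round(percent_change(avg_2019[metric], avg_2025[metric]), 1)
    for metric in avg_2019
}
for metric, pct in changes.items():
    print(f"{metric}: {pct:+.1f}%")
```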

### Part 3

- Choose 3 questions from above and re-answer them using the language you did not use for the main notebook (i.e. if you completed the exercise in Python, redo 3 questions in pure SQL). At least one of the redone questions must involve a join.
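
The shape of a join-based query can be tried on a tiny in-memory table before running it on the full dataset. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL so that it runs without a cluster; the rows and location IDs are made up, and only the column names follow the trip and zone lookup tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE trips (trip_id INTEGER, PULocationID INTEGER, trip_distance REAL);
    CREATE TABLE zones (LocationID INTEGER, Borough TEXT);
    -- made-up rows for illustration only
    INSERT INTO trips VALUES (1, 10, 1.0), (2, 10, 3.0), (3, 20, 8.0);
    INSERT INTO zones VALUES (10, 'Manhattan'), (20, 'Queens');
""")

# Pickups and average distance per pickup borough
query = """
    SELECT z.Borough AS pickup_borough,
           COUNT(*) AS pickups,
           AVG(t.trip_distance) AS avg_distance
    FROM trips t
    JOIN zones z ON t.PULocationID = z.LocationID
    GROUP BY z.Borough
    ORDER BY pickups DESC
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
conn.close()
```

With temp views named `trips` and `zones` registered in Spark, the same query string should be usable as `spark.sql(query).show()`, since it relies only on standard SQL.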


### Part 4

As of Spark 4, DataFrames have native visualization support. Choose at least 3 questions from above and provide visualizations.


# Where to go from here

- Continue building the dataset by loading in more data. Start by completing the data for 2019 and calculating the busiest season (fall, winter, spring, summer).
- Explore a dataset/datasets of your choosing
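
The busiest-season question needs a rule that assigns each pickup date to a season. A minimal sketch, assuming meteorological seasons for the Northern Hemisphere (December to February is winter, and so on); the `season` helper is hypothetical and the sample dates are made up.

```python
from collections import Counter
from datetime import date


def season(month: int) -> str:
    """Meteorological season for a month 1-12 (an assumed definition)."""
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    if month in (12, 1, 2):
        return "winter"
    if month in (3, 4, 5):
        return "spring"
    if month in (6, 7, 8):
        return "summer"
    return "fall"


# Made-up pickup dates, for illustration only
pickup_dates = [
    date(2019, 1, 15),
    date(2019, 4, 2),
    date(2019, 7, 4),
    date(2019, 7, 20),
    date(2019, 10, 31),
]
trips_per_season = Counter(season(d.month) for d in pickup_dates)
busiest, trip_count = trips_per_season.most_common(1)[0]
print(busiest, trip_count)
```

In PySpark the same mapping could be expressed on the pickup month with chained `F.when` conditions, followed by a `groupBy` on the resulting season column.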

In [None]:
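from pyspark.sql import SparkSession, functions as F

# Reuse the active session if one exists (as in Databricks), otherwise create one
spark = SparkSession.builder.appName("NYC Taxi EDA").getOrCreate()

# Read the copy stored in the volume; Spark generally cannot read parquet straight from an HTTPS URL
yellow_tripdata_path = f"{path_volume}/{file_name}"
zone_lookup_path = 'taxi_zone_lookup.csv'

df_trips = spark.read.parquet(yellow_tripdata_path)
df_zones = spark.read.csv(zone_lookup_path, header=True, inferSchema=True)

# Cache the trips and run an action so the cache is materialized
df_trips.cache()
df_trips.count()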

In [None]:
from pyspark.sql import functions as F

# Unique key per record (unique and increasing, but not consecutive)
df_trips = df_trips.withColumn("trip_id", F.monotonically_increasing_id())
# Trip with the highest passenger count, then the average passenger count
df_trips.orderBy(F.desc("passenger_count")).select("trip_id","passenger_count","trip_distance","tpep_pickup_datetime","tpep_dropoff_datetime").show(1)
df_trips.agg(F.avg("passenger_count")).show()
# Shortest and longest trip by distance
df_trips.orderBy("trip_distance").select("trip_id","trip_distance","tpep_pickup_datetime","tpep_dropoff_datetime").show(1)
df_trips.orderBy(F.desc("trip_distance")).select("trip_id","trip_distance","tpep_pickup_datetime","tpep_dropoff_datetime").show(1)
# Shortest and longest trip by duration, in minutes
df_trips = df_trips.withColumn("trip_duration_minutes",(F.unix_timestamp("tpep_dropoff_datetime")-F.unix_timestamp("tpep_pickup_datetime"))/60)
df_trips.orderBy("trip_duration_minutes").select("trip_id","trip_duration_minutes").show(1)
df_trips.orderBy(F.desc("trip_duration_minutes")).select("trip_id","trip_duration_minutes").show(1)
# Busiest and slowest single day
df_by_date = df_trips.groupBy(F.to_date("tpep_pickup_datetime").alias("date")).count()
df_by_date.orderBy(F.desc("count")).show(1)
df_by_date.orderBy("count").show(1)
# Busiest and slowest time of day, bucketed by pickup hour
df_trips = df_trips.withColumn("hour", F.hour("tpep_pickup_datetime"))
hour = F.col("hour")
df_trips = df_trips.withColumn(
    "time_of_day",
    F.when((hour >= 5) & (hour < 12), "morning")
    .when((hour >= 12) & (hour < 17), "afternoon")
    .when((hour >= 17) & (hour < 21), "evening")
    .otherwise("late_night"),
)
df_trips.groupBy("time_of_day").count().orderBy(F.desc("count")).show()
df_trips.groupBy("time_of_day").count().orderBy("count").show()
# Trips per day of the week
df_by_dow = df_trips.groupBy(F.date_format("tpep_pickup_datetime","E").alias("day_of_week")).count()
df_by_dow.orderBy(F.desc("count")).show()
# Correlation of tip amount with distance and with passenger count
# (printed explicitly: a bare expression in the middle of a cell is not displayed)
print(df_trips.stat.corr("trip_distance","tip_amount"))
print(df_trips.stat.corr("passenger_count","tip_amount"))
# Highest "extra" charge and the trip it belongs to
df_trips.orderBy(F.desc("extra")).select("trip_id","extra","tpep_pickup_datetime","tpep_dropoff_datetime").show(1)
# Candidate outliers in trip distance using the 1.5 * IQR rule
quantiles = df_trips.approxQuantile("trip_distance",[0.25,0.75],0.05)
iqr = quantiles[1] - quantiles[0]
lower_bound = quantiles[0] - 1.5*iqr
upper_bound = quantiles[1] + 1.5*iqr
df_outliers = df_trips.filter((df_trips.trip_distance < lower_bound) | (df_trips.trip_distance > upper_bound))
print(df_outliers.count())

# Register temp views for the Spark SQL queries
df_trips.createOrReplaceTempView("trips")
df_zones.createOrReplaceTempView("zones")

# Keep only the ID and borough from the lookup so the two joins do not produce duplicate column names
pickups = df_zones.select(F.col("LocationID").alias("PULocationID"), F.col("Borough").alias("pickup_borough"))
dropoffs = df_zones.select(F.col("LocationID").alias("DOLocationID"), F.col("Borough").alias("dropoff_borough"))
df_with_zones = df_trips.join(pickups, on="PULocationID", how="left").join(dropoffs, on="DOLocationID", how="left")

# Borough with the most pickups, then the most dropoffs
df_with_zones.groupBy("pickup_borough").count().orderBy(F.desc("count")).show(1)
df_with_zones.groupBy("dropoff_borough").count().orderBy(F.desc("count")).show(1)
# Busy/slow times and busiest days of the week by pickup borough
df_with_zones.groupBy("pickup_borough","time_of_day").count().orderBy(F.desc("count")).show()
df_with_zones.groupBy("pickup_borough", F.date_format("tpep_pickup_datetime","E").alias("day_of_week")).count().orderBy(F.desc("count")).show()
# Average trip distance and average total amount by pickup borough
df_with_zones.groupBy("pickup_borough").agg(F.avg("trip_distance").alias("avg_distance")).orderBy(F.desc("avg_distance")).show()
df_with_zones.groupBy("pickup_borough").agg(F.avg("total_amount").alias("avg_fare")).orderBy(F.desc("avg_fare")).show()
# Highest and lowest total amount for a single trip, with the pickup borough
highest_fare = df_with_zones.orderBy(F.desc("total_amount")).select("pickup_borough","total_amount","trip_id").limit(1)
lowest_fare = df_with_zones.orderBy("total_amount").select("pickup_borough","total_amount","trip_id").limit(1)
highest_fare.show()
lowest_fare.show()

recent_path = 'path/to/yellow_tripdata_2025-01.parquet'
df_recent = spark.read.parquet(recent_path)
df_recent = df_recent.withColumn("trip_duration_minutes",(F.unix_timestamp("tpep_dropoff_datetime")-F.unix_timestamp("tpep_pickup_datetime"))/60)
recent_avg_distance = df_recent.agg(F.avg("trip_distance")).first()[0]
recent_avg_passengers = df_recent.agg(F.avg("passenger_count")).first()[0]
recent_avg_fare = df_recent.agg(F.avg("total_amount")).first()[0]
recent_avg_distance, recent_avg_passengers, recent_avg_fare

In [None]:
spark.sql("SELECT z.Borough AS pickup_borough, COUNT(*) AS pickups FROM trips t JOIN zones z ON t.PULocationID = z.LocationID GROUP BY z.Borough ORDER BY pickups DESC LIMIT 1").show()
spark.sql("SELECT z.Borough AS pickup_borough, AVG(t.trip_distance) AS avg_distance FROM trips t JOIN zones z ON t.PULocationID = z.LocationID GROUP BY z.Borough ORDER BY avg_distance DESC").show()
spark.sql("SELECT t.trip_id, z.Borough AS pickup_borough, t.total_amount FROM trips t JOIN zones z ON t.PULocationID = z.LocationID ORDER BY t.total_amount DESC LIMIT 1").show()