# Names

- Mrassi, Yacine, 21962368, Informatique
- GUEYE, Cheikh Mouhamadou Moustapha, 22220071, MIDS

# New York taxi trips

This homework is about New York taxi trips. Here is something from [Todd Schneider](https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/):

> The New York City Taxi & Limousine Commission has released a  detailed historical dataset covering over 1 billion individual taxi trips in the city from January 2009 through December 2019. 
Taken as a whole, the detailed trip-level data is more than just a vast list of taxi pickup and drop off coordinates: it's a story of a City. 
How bad is the rush hour traffic from Midtown to JFK? 
Where does the Bridge and Tunnel crowd hang out on Saturday nights?
What time do investment bankers get to work? How has Uber changed the landscape for taxis?
The dataset addresses all of these questions and many more.

The dataset is available from New York City Government:

        https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

There is one parquet file for each NY taxi service (yellow, green, fhv) and each calendar
month. Each file is moderately large (up to hundreds of megabytes). The full dataset is
relatively large if it has to be handled on a laptop (several hundred gigabytes).

In [None]:
#!pip install nbconvert

In [None]:
#!pip install contextily

In [None]:
#!pip install geojson geopandas plotly

In [None]:
#!pip install ipyleaflet

## Packages

In [None]:
import os
import urllib.request

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import col, broadcast, dayofweek, hour, count, avg, unix_timestamp, expr,date_format, to_date
from pyspark.sql.catalog import Catalog
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [None]:
import pandas as pd

In [None]:
import geopandas as gpd
from matplotlib import pyplot as plt
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.express as px
import seaborn as sns

# Downloading data and Preparing spark

In [None]:
def download_taxi_data(year, month, data_type,
                       base_url="https://d37ci6vzurychx.cloudfront.net/trip-data/"):
    """Download one monthly TLC trip-record parquet file into the working directory.

    The file is skipped if it already exists locally. The download goes to a
    temporary ``.part`` file that is renamed only after the transfer succeeds.
    An interrupted download therefore never leaves a truncated file behind,
    which the existence check would otherwise accept on the next run.

    Args:
        year: calendar year, e.g. 2019.
        month: calendar month number (1-12).
        data_type: service prefix used in the file name, e.g. "yellow" or "fhv".
        base_url: URL prefix the file name is appended to.

    Returns:
        The local path of the parquet file (its bare file name).
    """
    filename = f"{data_type}_tripdata_{year}-{month:02d}.parquet"
    file_url = base_url + filename
    fpath = filename

    if os.path.exists(fpath):
        print(f"{filename} already exists!")
        return fpath

    print(f"Downloading {filename}...")
    tmp_path = fpath + ".part"
    try:
        urllib.request.urlretrieve(file_url, tmp_path)
        # Atomic rename: fpath appears only once the full file is on disk.
        os.replace(tmp_path, fpath)
    except BaseException:
        # Never leave a partial file behind (also on KeyboardInterrupt).
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    print(f"{filename} downloaded.")
    return fpath
        
        
# Study grid: every combination of these years, months and services is fetched.
years = list(range(2019, 2023))
months = [4, 12]
data_types = ["yellow", "fhv"]

for year in years:
    for month in months:
        for data_type in data_types:
            download_taxi_data(year, month, data_type)
            

In [None]:
# Informational notes about the machine used for this analysis. The two string
# literals below are evaluated and discarded; they have no runtime effect.
# The second one is `free -h` output with French labels (utilisé = used,
# libre = free, partagé = shared, tamp/cache = buff/cache, disponible =
# available; Partition d'échange = swap).
"""
I have a machine with 4 CPU cores and about 8 GB RAM.
I am using Intel(R) Core(TM) i5-6300U CPU @ 2.40GHz processor.
"""
"""
free -h
              total       utilisé      libre     partagé tamp/cache   disponible
Mem:          7,6Gi       4,4Gi       398Mi       512Mi       2,9Gi       2,5Gi
Partition d'échange:       2,0Gi        29Mi       2,0Gi

"""
# IPython shell magic: list the working directory with file sizes in MB.
%ls -l --block-size=MB

In [None]:
# Shared Spark session for the whole notebook, sized for a 4-core / 8 GB laptop.
# NOTE(review): no master is set here; in local mode spark.cores.max and
# spark.executor.memory may not take effect — confirm the intended deployment.
spark = (
    SparkSession.builder
    .appName("NY Taxi Data Analysis")
    .config("spark.driver.memory", "3g")
    .config("spark.executor.memory", "3g")
    .config("spark.cores.max", "3")
    .config("spark.sql.shuffle.partitions", "100")
    .getOrCreate()
)



In [None]:
# Open one Spark DataFrame per downloaded parquet file, keyed by
# (year, month, data_type).
# NOTE: `year`, `month`, `data_type`, `filename` and `df` stay bound as
# notebook globals after the loop; later cells reuse some of these names.
dfs = {}

for year in years:
    for month in months:
        for data_type in data_types:
            filename = f"{data_type}_tripdata_{year}-{month:02d}.parquet"
            df = spark.read.parquet(filename)
            dfs[(year, month, data_type)] = df


# Extracting Latitude and Longitude of Taxi Zones and adding it to the dataframe

In [None]:
import zipfile
import io


# Fetch the TLC taxi-zone shapefile archive (skipped if already present) and
# unpack it into the working directory.
file_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zones.zip"

fpath = "taxi_zones.zip"

if os.path.exists(fpath):
    # Report the zip itself (the old message printed a stale parquet `filename`).
    print(f"{fpath} already exists!")
else:
    print(f"Downloading {fpath}...")
    # Download to a temporary name and rename only on success, so an interrupted
    # transfer never leaves a truncated zip that the check above would accept.
    tmp_path = fpath + ".part"
    try:
        urllib.request.urlretrieve(file_url, tmp_path)
        os.replace(tmp_path, fpath)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    print(f"{fpath} downloaded.")


with zipfile.ZipFile(fpath, 'r') as zip_ref:
    zip_ref.extractall('./')


In [None]:
# Load the zone polygons and reproject them to EPSG:4326 (longitude/latitude degrees).
taxi_zones_gdf = gpd.read_file("taxi_zones.shp").to_crs(4326)
# Plain pandas view of the zone table; the geometry column is carried along.
taxi_zones_df = pd.DataFrame(taxi_zones_gdf)

In [None]:
# Inspect the column dtypes of the zone table.
taxi_zones_df.dtypes

In [None]:
# Representative point of each zone: the polygon centroid, split into lon/lat.
centroids = taxi_zones_df['geometry'].apply(lambda geom: geom.centroid)
taxi_zones_df['centroid'] = centroids
taxi_zones_df['longitude'] = centroids.apply(lambda point: point.x)
taxi_zones_df['latitude'] = centroids.apply(lambda point: point.y)

In [None]:
# Re-check the dtypes after adding the centroid, longitude and latitude columns.
taxi_zones_df.dtypes


In [None]:
# Spark copy of the zone lookup (LocationID -> centroid longitude/latitude) used in joins.
taxi_zones_spark_df = spark.createDataFrame(taxi_zones_df[['LocationID', 'longitude', 'latitude']])

In [None]:
# Preview the first three rows of the zone lookup.
taxi_zones_spark_df.show(3)


In [None]:
def associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(df, zones=None):
    """Attach zone-centroid coordinates for the pickup and dropoff zone of each trip.

    Adds ``pickup_longitude``/``pickup_latitude`` (looked up from ``PULocationID``)
    and ``dropoff_longitude``/``dropoff_latitude`` (from ``DOLocationID``). Each
    pair is added only when neither of its columns is already present. Both joins
    are left joins, so trips whose zone ID is missing from the lookup keep null
    coordinates.

    Args:
        df: Spark DataFrame with PULocationID and DOLocationID columns.
        zones: lookup DataFrame with LocationID, longitude and latitude columns.
            Defaults to the notebook-level ``taxi_zones_spark_df``.

    Returns:
        ``df`` with the coordinate columns appended after its original columns.
    """
    if zones is None:
        zones = taxi_zones_spark_df

    def _attach(frame, id_column, prefix):
        # Rename the lookup columns on a fresh projection rather than on the join
        # result. withColumnRenamed('longitude', ...) would also rename a
        # 'longitude' column already in `frame`, and reusing the same lookup
        # DataFrame in both joins is a self-join whose column references can be
        # ambiguous.
        key = f"_{prefix}_zone_id"
        lookup = zones.select(
            col("LocationID").alias(key),
            col("longitude").alias(f"{prefix}_longitude"),
            col("latitude").alias(f"{prefix}_latitude"),
        )
        return (
            frame.join(broadcast(lookup), frame[id_column] == lookup[key], "left")
            .drop(key)
        )

    if 'pickup_longitude' not in df.columns and 'pickup_latitude' not in df.columns:
        df = _attach(df, "PULocationID", "pickup")
    if 'dropoff_longitude' not in df.columns and 'dropoff_latitude' not in df.columns:
        df = _attach(df, "DOLocationID", "dropoff")
    return df

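The following longitudes and latitudes encompass JFK airport, Northern Manhattan and the
Verrazzano bridge. Note that Newark Airport lies west of `long_min` (its box below spans
longitudes −74.21 to −74.14), so trips to or from Newark fall outside this rectangle.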

In [None]:
# Study-area rectangle in degrees (EPSG:4326); trips outside it are filtered out.
long_min, long_max = -74.10, -73.70
lat_min, lat_max = 40.58, 40.90

# Investigate the data of April 2019 for yellow taxis

In [None]:
# April yellow-taxi trips for 2019 and 2020 (2020 is used for comparison).
df_04_19, df_04_20 = dfs[(2019, 4, 'yellow')], dfs[(2020, 4, 'yellow')]

Associating Taxi Ride Pickup and Dropoff Location IDs with Longitude and Latitude Coordinates

In [None]:
# Add pickup/dropoff zone-centroid coordinates to both April frames.
df_04_19, df_04_20 = (
    associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(frame)
    for frame in (df_04_19, df_04_20)
)

Using the boundaries defined previously, let's filter the April 2019 data (and April 2020 for comparison) on pickup and dropoff longitude and latitude, count the number of trips for each value of passenger_count, and plot the result.

In [None]:
def total_trips_by_passenger_count(df):
    """Count trips per passenger_count, keeping only trips inside the study box.

    Rows with a null passenger_count are discarded first. A trip is kept only if
    both its pickup and dropoff coordinates lie in
    [long_min, long_max] x [lat_min, lat_max] (bounds inclusive).

    Returns:
        Spark DataFrame with columns passenger_count and trip_counts.
    """
    inside_box = (
        col('pickup_longitude').between(long_min, long_max)
        & col('pickup_latitude').between(lat_min, lat_max)
        & col('dropoff_longitude').between(long_min, long_max)
        & col('dropoff_latitude').between(lat_min, lat_max)
    )
    with_passengers = df.na.drop(subset=['passenger_count'])
    return (
        with_passengers.filter(inside_box)
        .groupBy("passenger_count")
        .agg(F.count("*").alias("trip_counts"))
    )
def plot_total_trips_by_passenger_count(df,date):
    """Bar chart of trip counts per passenger count.

    Args:
        df: Spark DataFrame with passenger_count and trip_counts columns.
        date: period label inserted into the title, e.g. "April 2019".
    """
    counts = df.toPandas()
    plt.bar(counts["passenger_count"], counts["trip_counts"])
    plt.title(f"Total Trips by Passenger Count of {date}")
    plt.xlabel('Passenger Count')
    plt.ylabel('Number of Trips')
    plt.show()

In [None]:
# Trips per passenger count inside the study box, April 2019.
total_Trips_by_Passenger_Count_of_04_19 = total_trips_by_passenger_count(df_04_19)

In [None]:
# Trips per passenger count inside the study box, April 2020.
total_Trips_by_Passenger_Count_of_04_20 = total_trips_by_passenger_count(df_04_20)

In [None]:
# Print each month's passenger-count table, then draw its bar chart.
total_Trips_by_Passenger_Count_of_04_19.show()
plot_total_trips_by_passenger_count(total_Trips_by_Passenger_Count_of_04_19,"April 2019")
total_Trips_by_Passenger_Count_of_04_20.show()
plot_total_trips_by_passenger_count(total_Trips_by_Passenger_Count_of_04_20,"April 2020")

Trips with 0 or more than 7 passengers are pretty rare. We suspect these to be outliers. We
need to explore these trips further in order to understand what might be wrong with
them.
1. What’s special about trips with zero passengers?

In [None]:
# April 2019 trips recorded with zero passengers.
trips_zero_passengers = df_04_19.filter(col("passenger_count") == 0)

In [None]:
# Summary statistics (count, mean, stddev, min, max) for these trips.
trips_zero_passengers.describe().show()

2. What's special about trips with more than 6 passengers?

In [None]:
# April 2019 trips recorded with more than six passengers.
trips_more_6_passengers = df_04_19.filter(col("passenger_count") > 6)

In [None]:
# Summary statistics of every column for trips with more than six passengers.
# (Evaluating the bare DataFrame only displayed its schema, not any data.)
trips_more_6_passengers.describe().show()

3. What is the largest distance travelled during this month? 

In [None]:
# Largest trip_distance recorded in April 2019.
df_04_19.select(F.max('trip_distance')).show()

Is it the first taxi on the moon? so close!

In [None]:
# Same check for April 2020.
df_04_20.select(F.max('trip_distance')).show()

4. Plot the distribution of `trip_distance` (using a histogram, for instance) during 2019 (here, April 2019).
Focus on trips with a non-zero trip distance and a trip distance of less than 30 miles.

In [None]:
# Keep trips with 0 < trip_distance < 30 miles (both bounds exclusive).
trip_distance_df = df_04_19.where((col("trip_distance") > 0) & (col("trip_distance") < 30))

In [None]:
# Histogram of a 10% sample of the filtered trip distances. The fixed seed makes
# the sample, and therefore the figure, reproducible between runs.
sampled_df = trip_distance_df.sample(False, 0.1, seed=42)

# Collect only the trip_distance column straight to pandas instead of going
# through an RDD flatMap, which needs Python workers to unpack every Row.
trip_distances = sampled_df.select("trip_distance").toPandas()["trip_distance"]

plt.hist(trip_distances, bins=50, color="blue", edgecolor="black")
plt.title("Distribution of Trip Distances April 2019")
plt.xlabel("Trip Distance (miles)")
plt.ylabel("Frequency")
plt.show()

In [None]:
# Print the parsed, analyzed and optimized logical plans plus the physical plan.
trip_distance_df.explain(True)

Let’s look at what Spark does for these computations
• Parsed Logical Plan
• Analyzed Logical Plan
• Optimized Logical Plan
• Physical Plan

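Do the Analyzed Logical Plan and Optimized Logical Plan differ? Spot the differences, if any. How would an RDBMS proceed with such a query?

They are not identical. The analyzed plan is the query as written, resolved against the schema. The optimizer then rewrites it. Typically it adds an `isnotnull(trip_distance)` predicate inferred from the comparisons and folds constant casts. It also pushes the `trip_distance` filter below the zone joins and into the Parquet scan (as a pushed filter), and prunes unused columns. Compare the two plans printed by `explain(True)` above.
An RDBMS proceeds in much the same way:
- it parses the query;
- it rewrites it with similar rules (predicate pushdown, projection pruning);
- it picks a physical plan using table statistics and available indexes;
- it executes that plan.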

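Inspect the stages on the Spark UI. How many stages are necessary to complete the Spark
job? What are the roles of HashAggregate and Exchange hashpartitioning?

- `HashAggregate` computes aggregates using an in-memory hash table keyed by the grouping columns. Spark usually runs it twice: once per partition before the shuffle (partial aggregation) and once after it (final aggregation).
- `Exchange hashpartitioning` is the shuffle between those two steps. It redistributes rows so that all rows with the same grouping key land in the same partition.
- Each such exchange marks a stage boundary in the Spark UI.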


In [None]:
# URL of the Spark UI for this session; inspect jobs and stages there.
print(spark.sparkContext.uiWebUrl)

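Does the physical plan perform shuffle operations? If yes, how many?

Shuffles appear in the physical plan as `Exchange` operators (e.g. `Exchange hashpartitioning`). The plan of `trip_distance_df` only filters rows and joins the small zone table via broadcast. That join shows up as a `BroadcastExchange`, which copies the small table to every executor instead of shuffling the large one. We therefore expect no shuffle here; check the `explain(True)` output above for `Exchange` nodes to confirm.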

The average trip distance for each day of the week

In [None]:
# Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday.
day_dict = dict(enumerate(['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'], start=1))


In [None]:
# Mean trip distance per pickup weekday, collected to pandas with the day number
# replaced by its short name from day_dict.
df = (
    df_04_19
    .withColumn("day_of_week", dayofweek('tpep_pickup_datetime'))
    .groupBy('day_of_week')
    .agg({'trip_distance': 'mean'})
    .orderBy('day_of_week')
)
pd_df = df.toPandas().assign(
    day_of_week=lambda frame: frame['day_of_week'].replace(day_dict)
)


In [None]:
# Bar chart of the mean trip distance per weekday.
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x='day_of_week', y='avg(trip_distance)', data=pd_df, ax=ax)
ax.set_title('Average Trip Distance for Each Day of Week')
ax.set_xlabel('Day of Week')
ax.set_ylabel('Average Trip Distance')
plt.show()

The number of distinct pickup locations

In [None]:
# Number of distinct pickup zone IDs in April 2019 (a null ID, if any, counts as one value).
num_distinct_pickup_locations = df_04_19.select("PULocationID").distinct().count()
print(num_distinct_pickup_locations)

Compute and display tips and profits as a function of the pickup location

In [None]:
# Replace nulls with 0 in numeric columns so that a null fee does not null out the sums computed next.
df_profits = df_04_19.na.fill(0)

In [None]:
# Too many zones to chart legibly, so keep only the 20 busiest pickup zones.
top_locations = (
    df_profits.groupBy("PULocationID")
    .count()
    .orderBy(F.desc("count"))
    .limit(20)
)

# NOTE(review): Tip_amount is the only component missing from this sum, so
# "Profit" (Total_amount minus these fees) is essentially the tip by
# construction. Confirm this is the intended definition of profit.
fee_components = (
    F.col("Fare_amount")
    + F.col("Extra")
    + F.col("MTA_tax")
    + F.col("Improvement_surcharge")
    + F.col("Tolls_amount")
    + F.col("Congestion_Surcharge")
    + F.col("Airport_fee")
)

df_profits = (
    df_profits
    .withColumn("Profit", F.col("Total_amount") - fee_components)
    .groupBy("PULocationID")
    .agg(
        F.mean("Tip_amount").alias("Avg_tip"),
        F.mean("Profit").alias("Avg_profit"),
    )
    .filter(F.col("Avg_profit") >= 0)
    .join(top_locations, "PULocationID")
)
df_pandas = df_profits.toPandas()

# One bar chart per metric: average tip, then average profit.
for metric, title, ylabel in (
    ('Avg_tip', 'Average Tips by Pickup Location', 'Average Tip Amount (USD)'),
    ('Avg_profit', 'Average Profit by Pickup Location', 'Average Profit (USD)'),
):
    plt.figure(figsize=(10,5))
    sns.barplot(x='PULocationID', y=metric, data=df_pandas, ci=None)
    plt.title(title)
    plt.ylabel(ylabel)
    plt.xticks(rotation=90)
    plt.show()

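> We can conclude that the profits are essentially made up of tips. This is expected by construction: `Profit` is computed as `Total_amount` minus every other charge component except `Tip_amount`, so what remains is essentially the tip.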

# Investigate the trip data of April 2019, 2020, 2021 and 2022 for yellow and FHV taxis

## preparing data

We will create a dictionary containing DataFrames, where each DataFrame includes the original data along with columns for pickup and dropoff coordinates (longitudes and latitudes), as well as the associated day of the week and hour of the day for each pickup. To improve processing performance, we will also partition the data by day of the week and hour of the day, and use cache() to store intermediate results for quicker access.

In [None]:
def add_day_and_hour_columns(df, taxi_type):
    """Add pickup day_of_week and hour_of_day columns, then repartition and cache.

    Non-yellow services use different timestamp column names. These are first
    renamed to the yellow-taxi names (tpep_pickup_datetime / tpep_dropoff_datetime)
    so that the helpers in this notebook work on every service. Previously only
    the FHV pickup column was renamed, so duration-based helpers could not run on
    FHV data. Renaming a column that is absent is a no-op in Spark.

    Args:
        df: Spark DataFrame of trips for one service.
        taxi_type: "yellow", "fhv" or "green".

    Returns:
        The DataFrame with day_of_week (1 = Sunday ... 7 = Saturday) and
        hour_of_day columns, repartitioned by those two columns and marked for caching.
    """
    timestamp_renames = {
        "fhv": {
            "pickup_datetime": "tpep_pickup_datetime",
            "dropOff_datetime": "tpep_dropoff_datetime",
        },
        "green": {
            "lpep_pickup_datetime": "tpep_pickup_datetime",
            "lpep_dropoff_datetime": "tpep_dropoff_datetime",
        },
    }
    for old_name, new_name in timestamp_renames.get(taxi_type, {}).items():
        df = df.withColumnRenamed(old_name, new_name)
    df = df.withColumn("day_of_week", dayofweek('tpep_pickup_datetime')) \
           .withColumn("hour_of_day", hour('tpep_pickup_datetime'))
    df = df.repartition("day_of_week", "hour_of_day").cache()
    return df


In [None]:
# April yellow-taxi frames for 2019-2022, with zone coordinates, day_of_week and
# hour_of_day added (each repartitioned and cached by add_day_and_hour_columns).
df_yellow_04_2019 = add_day_and_hour_columns(associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(dfs[(2019,4,'yellow')]),'yellow')
df_yellow_04_2020 = add_day_and_hour_columns(associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(dfs[(2020,4,'yellow')]),'yellow')
df_yellow_04_2021 = add_day_and_hour_columns(associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(dfs[(2021,4,'yellow')]),'yellow')
df_yellow_04_2022 = add_day_and_hour_columns(associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(dfs[(2022,4,'yellow')]),'yellow')

In [None]:
# Same preparation for the April FHV frames of 2019-2022.
df_fhv_04_2019 = add_day_and_hour_columns(associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(dfs[(2019,4,'fhv')]),'fhv')
df_fhv_04_2020 = add_day_and_hour_columns(associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(dfs[(2020,4,'fhv')]),'fhv')
df_fhv_04_2021 = add_day_and_hour_columns(associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(dfs[(2021,4,'fhv')]),'fhv')
df_fhv_04_2022 = add_day_and_hour_columns(associating_Pickup_Dropoff_Location_IDs_with_Longitude_Latitude(dfs[(2022,4,'fhv')]),'fhv')

In [None]:
# Prepared April frames indexed as data[year][taxi_type].
data = {
    year: {'yellow': yellow_df, 'fhv': fhv_df}
    for year, yellow_df, fhv_df in (
        (2019, df_yellow_04_2019, df_fhv_04_2019),
        (2020, df_yellow_04_2020, df_fhv_04_2020),
        (2021, df_yellow_04_2021, df_fhv_04_2021),
        (2022, df_yellow_04_2022, df_fhv_04_2022),
    )
}

In [None]:
# Show the schema of a prepared FHV frame (note the renamed pickup timestamp column).
df_fhv_04_2022.printSchema()

## Assessing seasonalities and looking at time series

### The number of pickups

In [None]:
def pickups_by_dow_hour(df):
    """Count pickups per (day_of_week, hour_of_day), ordered by day then hour.

    Rows with a null tpep_pickup_datetime are not counted.
    """
    slot = ('day_of_week', 'hour_of_day')
    return (
        df.groupBy(*slot)
        .agg(count('tpep_pickup_datetime').alias('num_pickups'))
        .orderBy(*slot)
    )

In [None]:
def plot_pickups_by_day_and_hour(df,date,taxi_type):
    """Grouped bar chart of pickups per hour of day, one bar series per weekday.

    Args:
        df: Spark DataFrame with day_of_week, hour_of_day and num_pickups columns
            (as returned by pickups_by_dow_hour).
        date: period label used in the title, e.g. 'April 2019'.
        taxi_type: service label used in the title, e.g. 'yellow'.
    """
    # Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday.
    day_names = {1: 'Sun', 2: 'Mon', 3: 'Tue', 4: 'Wed', 5: 'Thu', 6: 'Fri', 7: 'Sat'}
    pdf = df.toPandas()
    # A numeric colour column gives plotly a continuous colour scale and a single
    # trace, so barmode='group' cannot separate the days. Categorical labels fix that.
    pdf['day_of_week'] = pdf['day_of_week'].map(day_names)
    fig = px.bar(
        pdf, x='hour_of_day', y='num_pickups', color='day_of_week', barmode='group',
        category_orders={'day_of_week': list(day_names.values())},
        labels={'hour_of_day':'Hour of the day', 'num_pickups':'Number of pickups', 'day_of_week':'Day of the week'},
        title=f'The number of pickups as a function of the day of the week and hour of day in {date} for {taxi_type} taxis',
    )
    fig.show()

In [None]:
# Pickups by weekday and hour: yellow taxis, April 2019.
plot_pickups_by_day_and_hour(pickups_by_dow_hour(data[2019]['yellow']),'April 2019','yellow')

In [None]:
# Pickups by weekday and hour: FHV, April 2019.
plot_pickups_by_day_and_hour(pickups_by_dow_hour(data[2019]['fhv']),'April 2019','fhv')

In [None]:
# Pickups by weekday and hour: yellow taxis, April 2020.
plot_pickups_by_day_and_hour(pickups_by_dow_hour(data[2020]['yellow']),'April 2020','yellow')

In [None]:
# Pickups by weekday and hour: FHV, April 2020.
plot_pickups_by_day_and_hour(pickups_by_dow_hour(data[2020]['fhv']),'April 2020','fhv')

In [None]:
# Pickups by weekday and hour: yellow taxis, April 2021.
plot_pickups_by_day_and_hour(pickups_by_dow_hour(data[2021]['yellow']),'April 2021','yellow')

In [None]:
# Pickups by weekday and hour: FHV, April 2021.
plot_pickups_by_day_and_hour(pickups_by_dow_hour(data[2021]['fhv']),'April 2021','fhv')

In [None]:
# Pickups by weekday and hour: yellow taxis, April 2022.
plot_pickups_by_day_and_hour(pickups_by_dow_hour(data[2022]['yellow']),'April 2022','yellow')


In [None]:
# Pickups by weekday and hour: FHV, April 2022.
plot_pickups_by_day_and_hour(pickups_by_dow_hour(data[2022]['fhv']),'April 2022','fhv')

### The average fare

*Missing data for FHV taxis: FHV trip records have no fare information, so only yellow taxis are shown in this section.*

In [None]:
def avg_fare_by_dow_hour(df):
    """Mean fare_amount per (day_of_week, hour_of_day), ordered by day then hour."""
    slot = ('day_of_week', 'hour_of_day')
    return (
        df.groupBy(*slot)
        .agg(avg('fare_amount').alias('average_fare'))
        .orderBy(*slot)
    )

In [None]:
def plot_avg_fare_by_dow_hour(df,date,taxi_type):
    """Grouped bar chart of the average fare per hour of day, one series per weekday.

    Args:
        df: Spark DataFrame with day_of_week, hour_of_day and average_fare columns
            (as returned by avg_fare_by_dow_hour).
        date: period label used in the title.
        taxi_type: service label used in the title.
    """
    # Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday.
    day_names = {1: 'Sun', 2: 'Mon', 3: 'Tue', 4: 'Wed', 5: 'Thu', 6: 'Fri', 7: 'Sat'}
    pdf = df.toPandas()
    # A numeric colour column makes plotly draw one continuous-scale trace, which
    # defeats barmode='group'. Map to categorical day names instead.
    pdf['day_of_week'] = pdf['day_of_week'].map(day_names)
    fig = px.bar(
        pdf, x='hour_of_day', y='average_fare', color='day_of_week', barmode='group',
        category_orders={'day_of_week': list(day_names.values())},
        labels={'hour_of_day':'Hour of the day', 'average_fare':'average fare', 'day_of_week':'Day of the week'},
        title=f'The avg fare as a function of the day of the week and hour of day in {date} for {taxi_type} taxis',
    )
    fig.show()

In [None]:
# Average fare by weekday and hour: yellow taxis, April 2019.
plot_avg_fare_by_dow_hour(avg_fare_by_dow_hour(data[2019]['yellow']),'April 2019','yellow')

In [None]:
# This cell repeated the April 2019 yellow-taxi average-fare plot from the cell
# above. FHV records carry no fare data, so there is no FHV counterpart to draw.

In [None]:
# Average fare by weekday and hour: yellow taxis, April 2020.
plot_avg_fare_by_dow_hour(avg_fare_by_dow_hour(data[2020]['yellow']),'April 2020','yellow')

In [None]:
# Average fare by weekday and hour: yellow taxis, April 2021.
plot_avg_fare_by_dow_hour(avg_fare_by_dow_hour(data[2021]['yellow']),'April 2021','yellow')

In [None]:
# Average fare by weekday and hour: yellow taxis, April 2022.
plot_avg_fare_by_dow_hour(avg_fare_by_dow_hour(data[2022]['yellow']),'April 2022','yellow')

### The average trip duration

In [None]:
def avg_duration_by_dow_hour(df):
    """Mean trip duration in minutes per (day_of_week, hour_of_day), ordered by day then hour.

    Duration is the dropoff timestamp minus the pickup timestamp, in minutes.
    """
    minutes = (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60
    slot = ('day_of_week', 'hour_of_day')
    return (
        df.withColumn("duration", minutes)
        .groupBy(*slot)
        .agg(avg('duration').alias('average_duration'))
        .orderBy(*slot)
    )

In [None]:
def plot_avg_duration_by_dow_hour(df,date,taxi_type):
    """Grouped bar chart of the average trip duration per hour, one series per weekday.

    Args:
        df: Spark DataFrame with day_of_week, hour_of_day and average_duration columns
            (as returned by avg_duration_by_dow_hour).
        date: period label used in the title.
        taxi_type: service label used in the title.
    """
    # Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday.
    day_names = {1: 'Sun', 2: 'Mon', 3: 'Tue', 4: 'Wed', 5: 'Thu', 6: 'Fri', 7: 'Sat'}
    pdf = df.toPandas()
    # Categorical day labels so plotly builds one trace per day and can group the bars.
    pdf['day_of_week'] = pdf['day_of_week'].map(day_names)
    fig = px.bar(
        pdf, x='hour_of_day', y='average_duration', color='day_of_week', barmode='group',
        category_orders={'day_of_week': list(day_names.values())},
        labels={'hour_of_day':'Hour of the day', 'average_duration':'average duration', 'day_of_week':'Day of the week'},
        title=f'The average duration as a function of the day of the week and hour of day in {date} for {taxi_type} taxis',
    )
    fig.show()

In [None]:
# Average trip duration by weekday and hour: yellow taxis, April 2019.
plot_avg_duration_by_dow_hour(avg_duration_by_dow_hour(data[2019]['yellow']),'April 2019','yellow')

In [None]:
# Average trip duration by weekday and hour: yellow taxis, April 2020.
plot_avg_duration_by_dow_hour(avg_duration_by_dow_hour(data[2020]['yellow']),'April 2020','yellow')

### The average of ongoing trips

In [None]:
# We didn't understand the question!

## Rides to the airports

In [None]:
# Rectangular areas (degrees) used to select trips by pickup/dropoff coordinates.
midtown = dict(lon_min=-74.01, lon_max=-73.96, lat_min=40.73, lat_max=40.78)
jfk = dict(lon_min=-73.84, lon_max=-73.75, lat_min=40.61, lat_max=40.67)
newark = dict(lon_min=-74.21, lon_max=-74.14, lat_min=40.66, lat_max=40.72)
laguardia = dict(lon_min=-73.90, lon_max=-73.85, lat_min=40.76, lat_max=40.79)

In [None]:

def median_trip_duration(df, pickup_bounds, dropoff_bounds):
    """Approximate median trip duration (minutes) per weekday and hour between two areas.

    Args:
        df: Spark DataFrame with pickup/dropoff longitude and latitude columns,
            tpep_pickup_datetime / tpep_dropoff_datetime, day_of_week and hour_of_day.
        pickup_bounds: dict with lon_min, lon_max, lat_min, lat_max for the pickup point.
        dropoff_bounds: dict with the same keys for the dropoff point.

    Returns:
        Spark DataFrame (day_of_week, hour_of_day, median_duration) ordered by day,
        then hour. The median comes from Spark's percentile_approx.
    """
    def within(lon_name, lat_name, box):
        # Inclusive bounds on every edge.
        return (
            col(lon_name).between(box["lon_min"], box["lon_max"])
            & col(lat_name).between(box["lat_min"], box["lat_max"])
        )

    trips = df.where(
        within("pickup_longitude", "pickup_latitude", pickup_bounds)
        & within("dropoff_longitude", "dropoff_latitude", dropoff_bounds)
    )
    minutes = (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60
    slot = ('day_of_week', 'hour_of_day')
    return (
        trips.withColumn('duration', minutes)
        .groupBy(*slot)
        .agg(F.expr("percentile_approx(duration, 0.5)").alias("median_duration"))
        .orderBy(*slot)
    )

###  Median duration of taxi trip leaving Midtown (Southern Manhattan) headed for JFK Airport

In [None]:
# Median duration of April 2019 yellow trips from Midtown to JFK, by weekday and hour.
d = median_trip_duration(data[2019]['yellow'], midtown, jfk)
df = d.toPandas()
# Numeric day_of_week gave a continuous colour scale in a single trace, with the
# default relative barmode stacking medians across days. Use named days and group them.
day_names = {1: 'Sun', 2: 'Mon', 3: 'Tue', 4: 'Wed', 5: 'Thu', 6: 'Fri', 7: 'Sat'}
df['day_of_week'] = df['day_of_week'].map(day_names)
fig = px.bar(df, x='hour_of_day', y='median_duration', color='day_of_week', barmode='group',
             category_orders={'day_of_week': list(day_names.values())},
             labels={'hour_of_day':'Hour of the day', 'median_duration':'median duration', 'day_of_week':'Day of the week'},
             title="Median duration of taxi trip leaving Midtown (Southern Manhattan) headed for JFK Airport")
fig.show()

### Median taxi duration of trip leaving from JFK Airport to Midtown (Southern Manhattan)

In [None]:
# Median duration of April 2019 yellow trips from JFK to Midtown, by weekday and hour.
d = median_trip_duration(data[2019]['yellow'], jfk, midtown)
df = d.toPandas()
# Categorical day labels and grouped bars, so medians are compared side by side
# instead of being stacked into a meaningless sum.
day_names = {1: 'Sun', 2: 'Mon', 3: 'Tue', 4: 'Wed', 5: 'Thu', 6: 'Fri', 7: 'Sat'}
df['day_of_week'] = df['day_of_week'].map(day_names)
fig = px.bar(df, x='hour_of_day', y='median_duration', color='day_of_week', barmode='group',
             category_orders={'day_of_week': list(day_names.values())},
             labels={'hour_of_day':'Hour of the day', 'median_duration':'median duration', 'day_of_week':'Day of the week'},
             title='Median taxi duration of trip leaving from JFK Airport to Midtown (Southern Manhattan)')
fig.show()

# Geographic information

In this section, we build the following maps:

1. a heatmap where color is a function of number of pickups, number of dropoffs and number of pickups with dropoff at some airport (JFK, LaGuardia, Newark)

In [None]:
def pickups_count(df):
    """Return a pandas DataFrame with the number of pickups (num_pickups) per PULocationID."""
    return (
        df.groupBy('PULocationID')
        .agg(count('*').alias('num_pickups'))
        .toPandas()
    )
def dropoff_count(df):
    """Return a pandas DataFrame with the number of dropoffs (num_dropoff) per DOLocationID."""
    return (
        df.groupBy('DOLocationID')
        .agg(count('*').alias('num_dropoff'))
        .toPandas()
    )
def pickups_dropoff_airport(df):
    """Count, per pickup zone, the trips whose dropoff lies in an airport box.

    The airport boxes are the notebook-level jfk, newark and laguardia dicts
    (inclusive bounds).

    Returns:
        pandas DataFrame with PULocationID and num_pickups_dropoff_airport.
    """
    lat = col("dropoff_latitude")
    lon = col("dropoff_longitude")

    def lands_in(box):
        return ((box['lat_min'] <= lat) & (lat <= box['lat_max'])
                & (box['lon_min'] <= lon) & (lon <= box['lon_max']))

    to_airport = lands_in(jfk) | lands_in(newark) | lands_in(laguardia)
    return (
        df.where(to_airport)
        .groupBy('PULocationID')
        .agg(count('*').alias('num_pickups_dropoff_airport'))
        .toPandas()
    )


In [None]:
# Per-zone counts for April 2019 yellow trips, collected to pandas.
pickups_df = pickups_count(data[2019]['yellow'])
dropoffs_df = dropoff_count(data[2019]['yellow'])
airport_pickups_df = pickups_dropoff_airport(data[2019]['yellow'])
# NOTE: not the last expression of the cell, so Jupyter does not display it.
pickups_df.dtypes
# Attach zone geometries; the default (inner) merge keeps only zones present in both tables.
pickups_map = taxi_zones_df.merge(pickups_df, left_on='LocationID', right_on='PULocationID')
dropoffs_map = taxi_zones_df.merge(dropoffs_df, left_on='LocationID', right_on='DOLocationID')
airport_pickups_map = taxi_zones_df.merge(airport_pickups_df, left_on='LocationID', right_on='PULocationID')

In [None]:
# Peek at one row of the airport-trip map table.
airport_pickups_map.head(1)

In [None]:
def plot(df, column, title):
    """Draw a static choropleth coloured by ``df[column]``.

    Args:
        df: pandas DataFrame converted with gpd.GeoDataFrame; expected to hold a
            'geometry' column of zone shapes.
        column: name of the column that drives the colour (viridis colormap).
        title: figure title.
    """
    _, ax = plt.subplots(figsize=(12, 8))
    zones = gpd.GeoDataFrame(df)
    zones.plot(ax=ax, column=column, cmap='viridis', legend=True)
    ax.set_aspect('equal')
    ax.set_title(title, fontsize=18)
    plt.show()

# Choropleths of pickup counts, dropoff counts, and counts of pickups whose
# dropoff falls inside one of the airport boxes.
plot(pickups_map, column='num_pickups', title='Pickups Heatmap')

plot(dropoffs_map, column='num_dropoff', title='Dropoffs Heatmap')

plot(airport_pickups_map, column='num_pickups_dropoff_airport', title='Pickups to Airport Heatmap')


2. Build a choropleth map where the colour is a function of the number of pickups in the area, the ratio of card payments to cash payments for pickups in the area, and the ratio of total fare to trip duration for dropoffs in the area.

In [None]:
# Per pickup zone: number of pickups, card payments (payment_type 1), cash
# payments (payment_type 2) and their ratio. This is computed in one aggregation
# pass instead of three scans joined back together. Zones without card or cash
# trips now get a count of 0 rather than null, so a zone with cash but no card
# trips gets a ratio of 0. The ratio stays null when there are no cash payments.
pickup_df = (
    data[2019]['yellow']
    .groupBy('PULocationID')
    .agg(
        F.count('*').alias('num_pickups'),
        # count() skips nulls, so this counts only rows where the condition holds.
        F.count(F.when(F.col('payment_type') == 1, True)).alias('card_payments'),
        F.count(F.when(F.col('payment_type') == 2, True)).alias('cash_payments'),
    )
    .withColumn(
        'card_cash_ratio',
        F.when(F.col('cash_payments') == 0, F.lit(None))
         .otherwise(F.col('card_payments') / F.col('cash_payments')),
    )
)

pickup_df.show()

In [None]:
# Trip duration in minutes (dropoff minus pickup timestamp).
df = data[2019]['yellow'].withColumn(
    "trip_duration_minutes",
    (F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime")) / 60,
)

# Per dropoff zone: total fare divided by total trip minutes (null when the
# minutes sum to 0). The intermediate sums are dropped from the result.
dropoff_df = (
    df.groupBy('DOLocationID')
    .agg(
        F.sum('fare_amount').alias('total_fare'),
        F.sum('trip_duration_minutes').alias('total_duration'),
    )
    .withColumn(
        'fare_duration_ratio',
        F.when(F.col('total_duration') == 0, F.lit(None))
         .otherwise(F.col('total_fare') / F.col('total_duration')),
    )
    .drop('total_fare', 'total_duration')
)

dropoff_df.show()


In [None]:
# Bring the per-zone aggregates to the driver as pandas DataFrames for mapping.
pickup_pd_df = pickup_df.toPandas()
dropoff_pd_df = dropoff_df.toPandas()

In [None]:
# Attach zone geometries. how='right' keeps every aggregated zone, including IDs
# absent from the shapefile (their geometry is left missing).
geo_pickup_df = taxi_zones_df.merge(pickup_pd_df, left_on='LocationID', right_on='PULocationID', how='right')
geo_dropoff_df = taxi_zones_df.merge(dropoff_pd_df, left_on='LocationID', right_on='DOLocationID', how='right')

In [None]:
# Card/cash ratio by pickup zone, then fare-per-minute ratio by dropoff zone.
plot(geo_pickup_df, column='card_cash_ratio', title='card_cash_ratio')

plot(geo_dropoff_df, column='fare_duration_ratio', title='fare_duration_ratio')


3. Build an interactive choropleth with a slider that lets the user select an hour of the day. The colour should be a function of the average number of dropoffs in the area during that hour of the day, and of the average ratio of tip to total fare amount for pickups in the area at that hour.

In [None]:

# Average number of dropoffs per dropoff zone for each hour of the day.
# Dropoffs are counted per (pickup date, pickup hour, dropoff zone), then those
# daily counts are averaged. The previous `.mean()` averaged every numeric
# column instead of counting dropoffs.
# NOTE: dates on which a zone had no dropoff at that hour are absent, not
# counted as zero.
avg_do_by_hour_DOLocationId = (
    data[2019]['yellow']
    .groupBy(to_date('tpep_pickup_datetime').alias('date'), 'hour_of_day', 'DOLocationID')
    .agg(count('*').alias('num_dropoffs'))
    .groupBy('hour_of_day', 'DOLocationID')
    .agg(avg('num_dropoffs').alias('avg_num_dropoffs'))
)

# Average tip/fare ratio per pickup zone and hour. The ratio is aggregated, not
# used as a grouping key (grouping by it produced one row per distinct ratio
# value). Trips with a null or zero fare are excluded to avoid dividing by zero.
avg_ratio_tip_over_fare_amount_by_hour_PULocationID = (
    data[2019]['yellow']
    .filter((col('fare_amount') != 0) & (~col('fare_amount').isNull()))
    .withColumn('tip_to_fare_ratio', col('tip_amount') / col('fare_amount'))
    .groupBy('hour_of_day', 'PULocationID')
    .agg(avg('tip_to_fare_ratio').alias('avg_tip_to_fare_ratio'))
)


4. Spot traffic imbalances. For each day, each hour and each zone, compute the number of trips arriving in and leaving the zone, compute the ratio between the two quantities, and build a choropleth spotting possible imbalances.

In [None]:

# Trips arriving in (DOLocationID) and leaving (PULocationID) each zone per weekday
# and hour. Both sides are keyed on a shared 'zone' column so the join matches the
# same zone directly, instead of pairing every zone with every other zone first.
df1 = (data[2019]['yellow']
       .groupBy('day_of_week', 'hour_of_day', col('DOLocationID').alias('zone'))
       .agg(count('*').alias('arriving_count')))
df2 = (data[2019]['yellow']
       .groupBy('day_of_week', 'hour_of_day', col('PULocationID').alias('zone'))
       .agg(count('*').alias('leaving_count')))
# Full outer join: a zone/slot with arrivals but no departures (or the reverse) is
# exactly the kind of imbalance we want to see, and an inner join would drop it.
df = (df1.join(df2, on=['day_of_week', 'hour_of_day', 'zone'], how='full_outer')
      .na.fill(0, subset=['arriving_count', 'leaving_count'])
      .withColumn('DOLocationID', col('zone'))
      .withColumn('PULocationID', col('zone'))
      .drop('zone')
      # Undefined (null) when nothing leaves the zone in that slot.
      .withColumn('imbalance_ratio',
                  F.when(col('leaving_count') == 0, F.lit(None))
                   .otherwise(col('arriving_count') / col('leaving_count'))))
df_pd=df.toPandas()
df_pd=taxi_zones_df.merge(df_pd, left_on='LocationID', right_on='DOLocationID', how='right')
df_pd['imbalance_ratio'] = df_pd['imbalance_ratio'].astype(float)
df_pd = gpd.GeoDataFrame(df_pd, geometry='geometry')

In [None]:
# df_pd holds one row per (day_of_week, hour_of_day, zone). Plotting it whole
# redraws every zone polygon once per time slot, so only the last-drawn slot is
# visible. Plot a single slot instead; change these values to explore others.
selected_day, selected_hour = 2, 8  # Spark dayofweek: 1 = Sunday, so 2 = Monday; 08:00
slot = df_pd[(df_pd['day_of_week'] == selected_day) & (df_pd['hour_of_day'] == selected_hour)]

fig, ax = plt.subplots(1, 1)
slot.plot(column='imbalance_ratio',
          ax=ax,
          legend=True,
          cmap='YlOrRd',
          legend_kwds={'label': "Imbalance Ratio", 'orientation': "horizontal"},
          missing_kwds={"color": "lightgrey", "label": "Missing values"})
ax.set_title(f"Arriving/leaving ratio, day_of_week={selected_day}, hour={selected_hour}")
plt.show()


# Covid impact

We are going to download the first 7 months of 2020.

In [None]:
# January through July 2020, yellow taxis only.
m = list(range(1, 8))
for month in m:
    download_taxi_data(2020, month, 'yellow')

In [None]:

# Register the January-July 2020 yellow files in `dfs` under (2020, month, 'yellow').
# NOTE: `month`, `filename` and `df` remain bound as notebook globals after the loop.
for month in m:
    filename = f"yellow_tripdata_2020-{month:02d}.parquet"
    df = spark.read.parquet(filename)
    dfs[(2020, month, 'yellow')] = df
    


In [None]:
# Stack the monthly 2020 yellow frames into one DataFrame. unionByName matches
# columns by name, not position, so files whose column order differs still line
# up. allowMissingColumns=True fills nulls if a month lacks a column instead of
# failing. Requires Spark >= 3.1.
union_df = dfs[(2020, m[0], 'yellow')]
for month in m[1:]:
    union_df = union_df.unionByName(dfs[(2020, month, 'yellow')], allowMissingColumns=True)

In [None]:
# Number of trips per pickup calendar day, sorted chronologically, then collected
# to pandas with the 'yyyy-MM-dd' strings parsed into datetimes.
df_by_day = (
    union_df
    .groupBy(date_format('tpep_pickup_datetime', 'yyyy-MM-dd').alias('date'))
    .agg(count('*').alias('num_trips'))
    .orderBy('date')
)

pdf_by_day = df_by_day.toPandas().assign(date=lambda frame: pd.to_datetime(frame["date"]))

In [None]:
# Daily trip counts for the first seven months of 2020, with the lockdown date marked.
ax = pdf_by_day.plot(x="date", y="num_trips", kind="line", figsize=(12, 6), title="Number of Trips by Day")
ax.axvline(x='2020-03-21', color='r', linestyle='--', label='Covid lockdown')
# Show all seven downloaded months (January through July; the old limit cut off
# July). The explicit bounds also keep any out-of-range pickup dates off the chart.
ax.set_xlim('2020-01-01', '2020-07-31')
ax.legend()
plt.show()


# Clean cache and stop spark

In [None]:
def list_all_active_dataframes():
    """Return every Spark DataFrame currently bound to a module-level (notebook) name."""
    frames = []
    for value in globals().values():
        if isinstance(value, DataFrame):
            frames.append(value)
    return frames

def uncache_all_dataframes():
    """Unpersist each global Spark DataFrame whose storage level uses memory."""
    for frame in list_all_active_dataframes():
        if not frame.storageLevel.useMemory:
            continue
        frame.unpersist()

uncache_all_dataframes()

In [None]:
# Shut down the Spark session at the end of the notebook.
spark.stop()