# Visual Analytics

## Assignment 3

**Instructor:** Dr. Marco D'Ambros  
**TAs:** Carmen Armenti, Mattia Giannaccari

**Contacts:** marco.dambros@usi.ch, carmen.armenti@usi.ch, mattia.giannaccari@usi.ch

**Due Date:** May 16, 2025 @ 23:55

---
The goal of this assignment is to use **Spark (PySpark)** and **Polars** in Jupyter notebooks.  
The files `trip_data.csv`, `trip_fare.csv`, and `nyc_boroughs.geojson` are available in the provided folder: [Assignment3-data](https://usi365-my.sharepoint.com/:f:/g/personal/armenc_usi_ch/Ejp7sb8QAMROoWe0XUDcAkMBoqUFk-w2Vgroup025NhAww?e=2I7SMC).

You may clean the data as needed; however, please note that specific data cleaning steps will be required in **Exercise 5**. If you choose to clean the data before Exercise 5, make sure to retain the **original dataset** for use with the Polars exercises.

- Use **Spark** to solve **Exercises 1–4**
- Use **Polars** to solve **Exercises 5–8**

You are encouraged to use [Spark window functions](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html) whenever appropriate.

Please name your notebook file `SurnameName_Assignment3.ipynb`.

## Spark

### Exercise 1
Join the `trip_data` and `trip_fare` dataframes into one and keep only the data for 2013-01-01. Please specify the number of rows obtained after joining the two datasets.
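
A plain-Python sketch of what the join and the date filter compute, on a few hypothetical toy records (not taken from the dataset). An inner join keeps only trips whose composite key appears in both files, and the date filter keeps those whose pickup timestamp starts with `2013-01-01`. The helper `inner_join` is illustrative only; in the notebook, Spark's `DataFrame.join` does this work.

```python
# Toy records (hypothetical values) with the same key columns as trip_data / trip_fare
trips = [
    {"medallion": "M1", "hack_license": "H1", "pickup_datetime": "2013-01-01 15:11:48", "trip_distance": 1.2},
    {"medallion": "M1", "hack_license": "H1", "pickup_datetime": "2013-01-02 09:00:00", "trip_distance": 3.4},
    {"medallion": "M2", "hack_license": "H2", "pickup_datetime": "2013-01-01 23:54:15", "trip_distance": 0.8},
]
fares = [
    {"medallion": "M1", "hack_license": "H1", "pickup_datetime": "2013-01-01 15:11:48", "fare_amount": 6.5},
    {"medallion": "M1", "hack_license": "H1", "pickup_datetime": "2013-01-02 09:00:00", "fare_amount": 12.0},
    {"medallion": "M3", "hack_license": "H3", "pickup_datetime": "2013-01-01 10:00:00", "fare_amount": 5.0},
]

KEY = ("medallion", "hack_license", "pickup_datetime")


def inner_join(left, right, key):
    """Inner join two lists of dicts on a composite key (a simple hash join)."""
    index = {}
    for row in right:
        index.setdefault(tuple(row[k] for k in key), []).append(row)
    joined = []
    for row in left:
        for match in index.get(tuple(row[k] for k in key), []):
            joined.append({**row, **match})
    return joined


joined = inner_join(trips, fares, KEY)
# Keep only trips picked up on 2013-01-01 (the date is the first 10 characters)
on_jan_1 = [r for r in joined if r["pickup_datetime"][:10] == "2013-01-01"]
print(len(joined), len(on_jan_1))  # 2 1
```

Because `pickup_datetime` is part of the key, filtering by date before the join gives the same count and usually reduces the amount of data the join has to shuffle.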

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DateType
import geopandas as gpd

In [47]:
session = SparkSession.builder.getOrCreate()
session.conf.set("spark.sql.shuffle.partitions", 400)
session.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [48]:
trip_data_df = session.read.option("inferSchema", "true").option("header", "true").csv("./datasets/trip_data.csv")
trip_fare_df = session.read.option("inferSchema", "true").option("header", "true").csv("./datasets/trip_fare.csv")


In [49]:
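# Strip stray whitespace from the column names (the trip_fare header has leading spaces)
trip_data_df = trip_data_df.select([col(c).alias(c.strip()) for c in trip_data_df.columns])
trip_fare_df = trip_fare_df.select([col(c).alias(c.strip()) for c in trip_fare_df.columns])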

In [50]:
trip_data_df = trip_data_df.select(
    "*",
    col("pickup_datetime").cast(DateType()).alias("pickup_date")
)

In [None]:
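# vendor_id is in both files; including it in the key avoids two vendor_id columns in the result
joined_df = trip_data_df.join(trip_fare_df, on=["medallion", "hack_license", "vendor_id", "pickup_datetime"])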

In [None]:
filtered_df = joined_df.filter(joined_df.pickup_date == "2013-01-01")

In [None]:
print("Number of records for 2013-01-01: ", filtered_df.count())

In [None]:
#print schema
filtered_df.printSchema()

### Exercise 2
Provide a graphical representation to compare the average fare amount for trips _within_ and _across_ all the boroughs. You may want to have a look at: https://docs.bokeh.org/en/latest/docs/user_guide/topics/categorical.html#categorical-heatmaps
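
The heatmap cells are averages per (pickup borough, dropoff borough) pair: diagonal cells are trips *within* a borough, off-diagonal cells are trips *across* boroughs. A plain-Python sketch of that matrix on hypothetical toy trips, mirroring what `groupBy(...).avg('fare_amount')` computes in the notebook:

```python
from collections import defaultdict

# Hypothetical (pickup_borough, dropoff_borough, fare_amount) triples
trips = [
    ("Manhattan", "Manhattan", 8.0),
    ("Manhattan", "Manhattan", 10.0),
    ("Manhattan", "Brooklyn", 25.0),
    ("Brooklyn", "Brooklyn", 12.0),
    ("Queens", "Manhattan", 45.0),
]


def average_fare_matrix(rows):
    """Average fare per (pickup, dropoff) pair; pairs with no trips are absent."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for pickup, dropoff, fare in rows:
        sums[(pickup, dropoff)] += fare
        counts[(pickup, dropoff)] += 1
    return {pair: sums[pair] / counts[pair] for pair in sums}


avg = average_fare_matrix(trips)
boroughs = sorted({b for pair in avg for b in pair})
# One row per pickup borough; None marks a pair with no trips
for pickup in boroughs:
    print(pickup, [avg.get((pickup, dropoff)) for dropoff in boroughs])
```

Leaving empty pairs blank rather than filling them with 0 keeps them from being mistaken for very cheap trips in the colour scale.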

In [None]:
nyc_df = gpd.read_file('./datasets/nyc-boroughs.geojson')

In [None]:
filtered_df_pandas = filtered_df.toPandas()
pickup_gdf = gpd.GeoDataFrame(
    filtered_df_pandas,
    geometry=gpd.points_from_xy(filtered_df_pandas['pickup_longitude'], filtered_df_pandas['pickup_latitude']),
    crs=nyc_df.crs
)

dropoff_gdf = gpd.GeoDataFrame(
    filtered_df_pandas,
    geometry=gpd.points_from_xy(filtered_df_pandas['dropoff_longitude'], filtered_df_pandas['dropoff_latitude']),
    crs=nyc_df.crs
)

In [None]:
# Spatial join to get borough names
pickup_boroughs = gpd.sjoin(pickup_gdf, nyc_df, how="left", predicate="within")
dropoff_boroughs = gpd.sjoin(dropoff_gdf, nyc_df, how="left", predicate="within")

In [None]:
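# Add the borough names to the original DataFrame. sjoin can return several rows for one
# point if polygons overlap, so keep the first match per original row before aligning.
pickup_boroughs = pickup_boroughs[~pickup_boroughs.index.duplicated(keep="first")]
dropoff_boroughs = dropoff_boroughs[~dropoff_boroughs.index.duplicated(keep="first")]
filtered_df_pandas["pickup_borough"] = pickup_boroughs["borough"].values
filtered_df_pandas["dropoff_borough"] = dropoff_boroughs["borough"].values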

In [None]:
filtered_df_pandas.head()

In [None]:
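# Back to Spark for aggregation. Trips outside every borough polygon get a NaN borough,
# which can break Spark's schema inference for a string column, so drop them first.
spark_df = session.createDataFrame(
    filtered_df_pandas.dropna(subset=["pickup_borough", "dropoff_borough"])
)

trip_group_df = spark_df \
    .groupBy(['pickup_borough', 'dropoff_borough']) \
    .avg('fare_amount') \
    .withColumnRenamed('avg(fare_amount)', 'avg_fare')

# Bokeh's categorical ranges expect a plain list of factor names
unique_borough = sorted(nyc_df['borough'].unique().tolist())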

In [None]:
from bokeh.io import output_notebook
from bokeh.models import BasicTicker, ColumnDataSource, PrintfTickFormatter
from bokeh.plotting import figure, show
from bokeh.transform import linear_cmap
from pyspark.sql.functions import max as spark_max, min as spark_min

output_notebook()  # render the plot inline in Jupyter

# Do not name these `min`/`max`: that would shadow the built-ins used later in the notebook
fare_min, fare_max = trip_group_df.agg(spark_min('avg_fare'), spark_max('avg_fare')).collect()[0]

colors = ["#03045e", "#023e8a", "#0077b6", "#0096c7", "#00b4d8", "#48cae4", "#90e0ef", "#ade8f4", "#caf0f8"]

TOOLS = "hover"
# Tooltip fields must be columns of the data source
TOOLTIPS = [
    ('Pickup Borough', '@pickup_borough'),
    ('Dropoff Borough', '@dropoff_borough'),
    ('Average Fare Amount', '@avg_fare{0.00}')
]

source = ColumnDataSource(trip_group_df.toPandas())

p = figure(title="Average Fare Amount for Pickup and Dropoff Boroughs",
           x_range=unique_borough, y_range=unique_borough,
           x_axis_location="above", width=900, height=400,
           tools=TOOLS, toolbar_location='below', tooltips=TOOLTIPS)

p.grid.grid_line_color = None
p.axis.axis_line_color = None
p.axis.major_tick_line_color = None
p.axis.major_label_text_font_size = "7px"
p.axis.major_label_standoff = 0
p.xaxis.axis_label = "Pickup borough"
p.yaxis.axis_label = "Dropoff borough"

# Diagonal cells are trips within a borough, off-diagonal cells are trips across boroughs
r = p.rect(x="pickup_borough", y="dropoff_borough", width=1, height=1, source=source,
           fill_color=linear_cmap("avg_fare", colors[::-1], low=fare_min, high=fare_max),
           line_color=None)

p.add_layout(r.construct_color_bar(
    major_label_text_font_size="7px",
    ticker=BasicTicker(desired_num_ticks=len(colors)),
    formatter=PrintfTickFormatter(format="$%.2f"),  # fares are dollar amounts, not percentages
    label_standoff=6,
    border_line_color=None,
    padding=5,
), 'right')

show(p)

### Exercise 3
Consider only the Manhattan, Bronx, and Brooklyn boroughs. Then create a dataframe that shows the total number of trips with three or more passengers *within* the same borough and *across* the other boroughs mentioned above (Manhattan, Bronx, and Brooklyn).

For example, for Manhattan borough you should consider the total number of the following trips:
- Manhattan → Manhattan
- Manhattan → Bronx
- Manhattan → Brooklyn

You should then do the same for Bronx and Brooklyn boroughs.
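
A plain-Python sketch of the counting rule on hypothetical toy trips: keep only trips whose pickup and dropoff are both among the three boroughs and that carry at least 3 passengers, then count per pickup borough and trip type. Keying by pickup borough follows the Manhattan example above; it is one reading of the task, not the only one.

```python
from collections import Counter

BOROUGHS = {"Manhattan", "Bronx", "Brooklyn"}

# Hypothetical (pickup_borough, dropoff_borough, passenger_count) triples
trips = [
    ("Manhattan", "Manhattan", 3),
    ("Manhattan", "Bronx", 4),
    ("Manhattan", "Brooklyn", 2),   # dropped: fewer than 3 passengers
    ("Bronx", "Bronx", 5),
    ("Brooklyn", "Queens", 6),      # dropped: Queens is not considered
    ("Brooklyn", "Manhattan", 3),
]

counts = Counter()
for pickup, dropoff, passengers in trips:
    if pickup in BOROUGHS and dropoff in BOROUGHS and passengers >= 3:
        trip_type = "within" if pickup == dropoff else "across"
        counts[(pickup, trip_type)] += 1

for (borough, trip_type), n in sorted(counts.items()):
    print(borough, trip_type, n)
```

Summing the two counts of a borough gives its total over the three destinations listed in the example.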

In [None]:
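from pyspark.sql.functions import col, when

boroughs = ["Manhattan", "Bronx", "Brooklyn"]

# Keep trips between the three boroughs with at least 3 passengers
# (new name, so that Exercise 1's filtered_df is not overwritten)
three_boroughs_df = spark_df.filter(
    (col("pickup_borough").isin(boroughs)) &
    (col("dropoff_borough").isin(boroughs)) &
    (col("passenger_count") >= 3)
)

In [None]:
# Label each trip as 'within' (same borough) or 'across' (different boroughs)
labeled_df = three_boroughs_df.withColumn(
    "trip_type",
    when(col("pickup_borough") == col("dropoff_borough"), "within")
    .otherwise("across")
)

# Count trips per pickup borough and trip type (e.g. Manhattan → Manhattan vs. Manhattan → Bronx/Brooklyn)
result_df = labeled_df.groupBy("pickup_borough", "trip_type").count() \
    .orderBy("pickup_borough", "trip_type")
result_df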

### Exercise 4
Create a dataframe where each row represents a driver, and there is one column per borough.
For each driver-borough pair, the dataframe gives the maximum number of consecutive trips
by that driver within that borough. Please consider only trips that were paid by card.

For example, if for driver A we have (sorted by time):
- Trip 1: Bronx → Bronx
- Trip 2: Bronx → Bronx
- Trip 3: Bronx → Manhattan
- Trip 4: Manhattan → Bronx

The maximum number of consecutive trips for Bronx is 2.
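
A plain-Python sketch of the "consecutive runs" logic (a gaps-and-islands pattern) on hypothetical card trips. Each trip gets a label: its borough if pickup and dropoff match, otherwise `None`. Consecutive identical labels then form runs, and an across-borough trip breaks a run instead of being skipped. In Spark, one way to get the same result (an assumption about the approach, not the reference solution) is a window partitioned by `hack_license` and ordered by `pickup_datetime`: `lag` flags rows where the label changes, a running `sum` of those flags numbers each run, and a `groupBy(...).count()` followed by a max and a `pivot` on the borough gives one column per borough.

```python
from itertools import groupby

# Hypothetical card trips: (hack_license, pickup_datetime, pickup_borough, dropoff_borough)
trips = [
    ("A", "2013-01-01 08:00:00", "Bronx", "Bronx"),
    ("A", "2013-01-01 08:30:00", "Bronx", "Bronx"),
    ("A", "2013-01-01 09:00:00", "Bronx", "Manhattan"),
    ("A", "2013-01-01 09:40:00", "Manhattan", "Bronx"),
    ("B", "2013-01-01 10:00:00", "Manhattan", "Manhattan"),
]


def max_consecutive_within(trips):
    """Return {driver: {borough: longest run of consecutive within-borough trips}}."""
    result = {}
    ordered = sorted(trips, key=lambda t: (t[0], t[1]))  # by driver, then pickup time
    for driver, driver_trips in groupby(ordered, key=lambda t: t[0]):
        best = {}
        # Borough for within-borough trips, None for across trips (which break a run)
        labels = [p if p == d else None for _, _, p, d in driver_trips]
        # groupby yields maximal runs of identical consecutive labels
        for borough, run in groupby(labels):
            if borough is not None:
                best[borough] = max(best.get(borough, 0), len(list(run)))
        result[driver] = best
    return result


print(max_consecutive_within(trips))
```

Driver A reproduces the example above: two Bronx → Bronx trips in a row, then the run is broken, so the maximum for Bronx is 2.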

In [None]:
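# Keep only card payments. Across-borough trips are NOT filtered out: they break a run of
# consecutive within-borough trips, so they must stay in each driver's sequence.
# Window functions carry their own ordering (partition by hack_license, order by
# pickup_datetime), so a global orderBy is not needed.
card_trips = spark_df.filter(col("payment_type") == "CRD")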

## Polars

### Exercise 5

Please work on the merged dataset of trips and fares and perform the following data cleaning tasks:

1. Remove trips with invalid locations (i.e., not in New York City);
2. Remove trips with invalid amounts:
    - Total amount must be greater than zero;
    - Total amount must correspond to the sum of all the other amounts.
3. Remove trips with invalid time:
    - Pick-up before drop-off;
    - Valid duration.

After each data cleaning task, report how many rows were removed. Finally, report:
- Are there **duplicate trips**?
- How many trips remain after cleaning?

In [51]:
import polars as pl

In [52]:
trip_data_df = pl.read_csv(
    "./datasets/trip_data.csv",
    infer_schema_length=100,
    has_header=True,
    encoding="us-ascii"
)
trip_fare_df = pl.read_csv(
    "./datasets/trip_fare.csv",
    infer_schema_length=100,
    has_header=True,
    schema_overrides={" tip_amount": pl.Float64},
    encoding="us-ascii"
)

In [53]:
trip_fare_df.head(5)

| medallion | hack_license | vendor_id | pickup_datetime | payment_type | fare_amount | surcharge | mta_tax | tip_amount | tolls_amount | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|
| str | str | str | str | str | f64 | f64 | f64 | f64 | f64 | f64 |
| "89D227B655E5C82AECF13C3F540D4C…" | "BA96DE419E711691B9445D6A6307C1…" | "CMT" | "2013-01-01 15:11:48" | "CSH" | 6.5 | 0.0 | 0.5 | 0.0 | 0.0 | 7.0 |
| "0BD7C8F5BA12B88E0B67BED28BEA73…" | "9FD8F69F0804BDB5549F40E9DA1BE4…" | "CMT" | "2013-01-06 00:18:35" | "CSH" | 6.0 | 0.5 | 0.5 | 0.0 | 0.0 | 7.0 |
| "0BD7C8F5BA12B88E0B67BED28BEA73…" | "9FD8F69F0804BDB5549F40E9DA1BE4…" | "CMT" | "2013-01-05 18:49:41" | "CSH" | 5.5 | 1.0 | 0.5 | 0.0 | 0.0 | 7.0 |
| "DFD2202EE08F7A8DC9A57B02ACB81F…" | "51EE87E3205C985EF8431D850C7863…" | "CMT" | "2013-01-07 23:54:15" | "CSH" | 5.0 | 0.5 | 0.5 | 0.0 | 0.0 | 6.0 |
| "DFD2202EE08F7A8DC9A57B02ACB81F…" | "51EE87E3205C985EF8431D850C7863…" | "CMT" | "2013-01-07 23:25:03" | "CSH" | 9.5 | 0.5 | 0.5 | 0.0 | 0.0 | 10.5 |


In [54]:
print("Trip data columns: ", trip_data_df.columns)
print("Trip fare columns: ", trip_fare_df.columns)

Trip data columns:  ['medallion', 'hack_license', 'vendor_id', 'rate_code', 'store_and_fwd_flag', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_time_in_secs', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']
Trip fare columns:  ['medallion', ' hack_license', ' vendor_id', ' pickup_datetime', ' payment_type', ' fare_amount', ' surcharge', ' mta_tax', ' tip_amount', ' tolls_amount', ' total_amount']


In [55]:
trip_data_df = trip_data_df.rename({name: name.strip() for name in trip_data_df.columns})
trip_fare_df = trip_fare_df.rename({name: name.strip() for name in trip_fare_df.columns})

In [56]:
joined_df = trip_data_df.join(trip_fare_df, on=["medallion", "hack_license", "pickup_datetime"], how="inner")

In [None]:
joined_df.head(5)

In [None]:
import json
from pathlib import Path
from typing import Any


def get_min_max_coordinates(
    geojson: dict[str, Any],
) -> tuple[float, float, float, float]:
    """
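    Get the min/max coordinates from a parsed GeoJSON FeatureCollection.
    Only the exterior ring of each polygon is used; Polygon and MultiPolygon are supported.
    """
    # get all the exterior-ring coordinates from the geojson features
    coordinates: list[tuple[float, float]] = []
    for feature in geojson["features"]:
        geometry = feature["geometry"]
        # a MultiPolygon is a list of polygons; wrap a single Polygon to handle both alike
        polygons = geometry["coordinates"] if geometry["type"] == "MultiPolygon" else [geometry["coordinates"]]
        for polygon in polygons:
            coordinates.extend(polygon[0])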

    # get the min/max coordinates
    min_lon = min([point[0] for point in coordinates])
    max_lon = max([point[0] for point in coordinates])
    min_lat = min([point[1] for point in coordinates])
    max_lat = max([point[1] for point in coordinates])

    return min_lon, max_lon, min_lat, max_lat

nyc_boroughs_geojson_path = Path("./datasets/nyc-boroughs.geojson")

with nyc_boroughs_geojson_path.open("r") as f:
    json_data = f.read()
    nyc_boroughs_geo_data: dict[str, Any] = json.loads(json_data)
# get the min/max coordinates
min_lon, max_lon, min_lat, max_lat = get_min_max_coordinates(nyc_boroughs_geo_data)

print(f"Min lon: {min_lon}, Max lon: {max_lon}")
print(f"Min lat: {min_lat}, Max lat: {max_lat}")

In [None]:
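# Keep trips whose pick-up and drop-off fall inside the bounding box of the borough polygons
# (a coarse approximation of NYC: the box also covers some areas outside the city)
filtered_ny_df = joined_df.filter(
    pl.col("pickup_longitude") > min_lon,
    pl.col("pickup_longitude") < max_lon,
    pl.col("pickup_latitude") > min_lat,
    pl.col("pickup_latitude") < max_lat,
    pl.col("dropoff_longitude") > min_lon,
    pl.col("dropoff_longitude") < max_lon,
    pl.col("dropoff_latitude") > min_lat,
    pl.col("dropoff_latitude") < max_lat,
)
print("Removed (invalid location):", joined_df.height - filtered_ny_df.height)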

In [None]:
filtered_ny_df.columns

In [None]:
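# Parse the datetime strings (read_csv keeps them as str) so they can be compared and subtracted
filtered_ny_df = filtered_ny_df.with_columns(pl.col("pickup_datetime", "dropoff_datetime").str.to_datetime("%Y-%m-%d %H:%M:%S"))
amount_sum = pl.sum_horizontal("fare_amount", "surcharge", "mta_tax", "tip_amount", "tolls_amount")
duration = (pl.col("dropoff_datetime") - pl.col("pickup_datetime")).dt.total_seconds()
checks = {  # each step is applied in turn so that its removed-row count can be reported
    "invalid amounts": (pl.col("total_amount") > 0) & ((pl.col("total_amount") - amount_sum).abs() < 0.01),
    "invalid time": (pl.col("pickup_datetime") < pl.col("dropoff_datetime")) & (duration == pl.col("trip_time_in_secs")),
}
for name, predicate in checks.items():
    before = filtered_ny_df.height
    filtered_ny_df = filtered_ny_df.filter(predicate)
    print(f"Removed ({name}): {before - filtered_ny_df.height}")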

In [None]:
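print("Number of records before filtering: ", joined_df.shape[0])
print("Number of records after filtering: ", filtered_ny_df.shape[0])
print("Deleted records: ", joined_df.shape[0] - filtered_ny_df.shape[0])
# Duplicate trips: rows that are exact copies of another row
print("Duplicate records: ", filtered_ny_df.height - filtered_ny_df.unique().height)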

### Exercise 6

Compute the **total revenue** (total_amount) grouped by:
- Pick-up hour of the day (0–23)
- Passenger count (group >=6 into “6+”)

Create a heatmap where:
- X-axis = hour
- Y-axis = passenger count group
- Cell value = average revenue per trip
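
A plain-Python sketch of the aggregation behind the heatmap, on hypothetical toy records. It keeps both the total revenue per (hour, passenger group), which the task asks to compute, and the average revenue per trip, which the heatmap cells show. The hour is read from the `YYYY-MM-DD HH:MM:SS` string format seen in the data; `passenger_group` is an illustrative helper.

```python
from collections import defaultdict

# Hypothetical (pickup_datetime, passenger_count, total_amount) records
trips = [
    ("2013-01-01 00:15:00", 1, 10.0),
    ("2013-01-01 00:45:00", 1, 20.0),
    ("2013-01-01 13:05:00", 6, 30.0),
    ("2013-01-01 13:30:00", 7, 50.0),
]


def passenger_group(n):
    """Collapse 6 or more passengers into a single '6+' group."""
    return "6+" if n >= 6 else str(n)


total = defaultdict(float)
count = defaultdict(int)
for ts, passengers, amount in trips:
    hour = int(ts[11:13])  # hour of the day, 0-23
    key = (hour, passenger_group(passengers))
    total[key] += amount
    count[key] += 1

# Heatmap cell value: average revenue per trip (total revenue / number of trips)
avg_per_trip = {key: total[key] / count[key] for key in total}
print(dict(total))
print(avg_per_trip)
```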

### Exercise 7

Define an "anomalous trip" as one that satisfies at least two of the following:
- Fare per mile is above the 95th percentile
- Tip amount > 100% of fare
- `trip_time_in_secs` is less than 60 seconds but `trip_distance` is more than 1 mile

Create a dataframe of anomalous trips and:
- Report how many such trips exist
- Create a scatterplot to visualize the anomaly metrics
- Describe the visualization identifying groups and outliers
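
A plain-Python sketch of the "at least two of three" rule on hypothetical toy trips: each criterion becomes a boolean flag, and a trip is anomalous when the flags sum to 2 or more. The percentile uses the nearest-rank definition, which is an assumption; other definitions (Polars' `quantile` also lets you choose an interpolation method) can shift the cut-off slightly. Zero-distance trips are left out of the fare-per-mile computation to avoid division by zero.

```python
import math


def percentile(values, p):
    """Nearest-rank percentile (one of several common definitions)."""
    ordered = sorted(values)
    rank = math.ceil(p * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


# Hypothetical trips: (trip_id, fare_amount, tip_amount, trip_distance, trip_time_in_secs)
trips = [(i, 6.0, 1.0, 2.0, 600) for i in range(18)]  # ordinary trips
trips += [
    ("A", 30.0, 40.0, 1.5, 45),   # high fare per mile, big tip, fast long trip
    ("B", 5.0, 6.0, 2.0, 50),     # big tip, fast long trip
    ("C", 6.0, 7.0, 2.0, 600),    # big tip only: one flag, not anomalous
]

fare_per_mile = {t[0]: t[1] / t[3] for t in trips if t[3] > 0}
threshold = percentile(fare_per_mile.values(), 95)


def anomaly_flags(trip):
    trip_id, fare, tip, distance, secs = trip
    return [
        fare_per_mile.get(trip_id, 0.0) > threshold,  # fare per mile above the 95th percentile
        tip > fare,                                    # tip more than 100% of the fare
        secs < 60 and distance > 1,                    # under a minute but over a mile
    ]


anomalous = [t for t in trips if sum(anomaly_flags(t)) >= 2]
print(threshold, [t[0] for t in anomalous])
```

Keeping the three flags as separate values also gives natural axes for the requested scatterplot, e.g. fare per mile against the tip-to-fare ratio.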

### Exercise 8
For each driver (hack_license), calculate the **total profit per hour worked**, where:
> profit = 0.7 * (fare_amount + tip_amount) when the trip starts between 7:01 AM and 7:00 PM\
> profit = 0.8 * (fare_amount + tip_amount) when the trip starts between 7:01 PM and 7:00 AM

Estimate "hours worked" by summing trip_time_in_secs.

Plot a line chart showing the distribution of average profit per hour **for the top 10% of drivers** in terms of total trips.

Which time of day offers **best earning efficiency**?
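
A plain-Python sketch of the profit-per-hour computation on hypothetical toy trips. The day/night boundaries are interpreted at minute resolution (an assumption): a trip starting at 19:00:30 counts as daytime (0.7), one starting at 07:00:59 as night-time (0.8). Hours worked are the summed `trip_time_in_secs` divided by 3600, so idle time between trips is not counted. The top 10% of drivers by trip count is rounded up to at least one driver, and ties keep first-seen order, as `Counter.most_common` does.

```python
import math
from collections import Counter, defaultdict
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def profit_rate(pickup):
    """Share of fare + tip counted as profit, by start time (minute resolution)."""
    t = datetime.strptime(pickup, FMT)
    minute_of_day = t.hour * 60 + t.minute
    # 7:01 AM (minute 421) to 7:00 PM (minute 1140) inclusive -> 0.7, otherwise 0.8
    return 0.7 if 7 * 60 + 1 <= minute_of_day <= 19 * 60 else 0.8


# Hypothetical trips: (hack_license, pickup_datetime, fare_amount, tip_amount, trip_time_in_secs)
trips = [
    ("D1", "2013-01-01 08:00:00", 10.0, 0.0, 1800),
    ("D1", "2013-01-01 20:00:00", 10.0, 0.0, 1800),
    ("D2", "2013-01-01 07:00:00", 20.0, 5.0, 3600),
]

profit = defaultdict(float)
seconds = defaultdict(int)
for driver, pickup, fare, tip, secs in trips:
    profit[driver] += profit_rate(pickup) * (fare + tip)
    seconds[driver] += secs

# Hours worked = summed trip_time_in_secs / 3600
profit_per_hour = {d: profit[d] / (seconds[d] / 3600) for d in profit if seconds[d] > 0}

# Top 10% of drivers by number of trips (at least one driver)
trip_counts = Counter(driver for driver, *_ in trips)
n_top = max(1, math.ceil(0.10 * len(trip_counts)))
top_drivers = [driver for driver, _ in trip_counts.most_common(n_top)]
print(profit_per_hour, top_drivers)
```

Grouping the same profit and time sums by pickup hour instead of by driver gives the per-time-of-day comparison the last question asks about.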