# Notebook using SparkGEO to create "silver" data of the NYC taxi data.

You can download the NYC taxi data from [here](https://www.kaggle.com/c/nyc-taxi-trip-duration/data).

Here, we preprocess additional table columns value such as:
- Hour of day.
- Day of week.
- Pick up borough.
- Dropoff airport.

Make sure to install these additional modules:
```shell
uv pip install -U duckdb folium matplotlib mapclassify xyzservices
```

In [None]:
import os

import duckdb
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
import sparkgeo.functions as S
import sparkgeo.processors as P

In [None]:
spark

In [None]:
# import warnings
# warnings.filterwarnings("ignore", category=UserWarning, message="to_numpy")

# import logging
# logging.getLogger('py4j').setLevel(logging.ERROR)

In [None]:
#  Adjust to where you downloaded the data.
base = os.path.expanduser("~/data/nyc-taxi-trip-duration")

### Read Airport Polygons.

You can download them from [here](https://data.cityofnewyork.us/City-Government/Airport-Polygon/xfhz-rhsk).

In [None]:
schema = ",".join(
    [
        "`the_geom` string",
        "`NAME` string",
        "`GEOSERVER_` string",
        "`URL` string",
        "`SHAPE_AREA` string",
        "`SHAPE_LEN` string",
    ]
)

airports = (
    spark.read.csv(
        os.path.join(base, "AIRPORT_POLYGON_20240519.csv"),
        schema=schema,
        header=True,
        mode="DROPMALFORMED",
    )
    .select(
        S.st_wgs84(
            S.st_buffer(
                S.st_mercator(S.st_from_text("the_geom")),
                dist=600.0,
                wkid=3857,
                max_vertices=16,
            )
        ),
        (
            F.when(F.col("NAME") == "La Guardia Airport", "LGA")
            .when(F.col("NAME") == "John F. Kennedy International Airport", "JFK")
            .otherwise(F.col("NAME"))
            .alias("airport")
        ),
        # F.col("NAME").alias("airport_name"),
    )
    .cache()
)

In [None]:
airports.select("airport").show(vertical=True, truncate=False)

In [None]:
# gdf = airports.st_project().toGeoPandas()
# gdf.explore(tiles=xyz.Esri.WorldGrayCanvas)

### Read Borough Polygons.

You can download them from [here](https://data.cityofnewyork.us/City-Government/Borough-Boundaries/tqmj-j8zm).

In [None]:
boro = (
    spark.read.format("shp")
    .select(
        S.st_dump_explode("shape"),
        F.col("boro_name").alias("boro"),
    )
)

In [None]:
boro.select("boro").distinct().show(truncate=False)

In [None]:
# gdf = boro.st_project().toGeoPandas()
# gdf.explore(tiles=xyz.Esri.WorldGrayCanvas)

### Define dataframe schema when reading CSV file.

In [None]:
schema = ",".join(
    [
        "`id` string",
        "`vendor_id` string",
        "`pickup_datetime` timestamp",
        "`dropoff_datetime` timestamp",
        "`passenger_count` int",
        "`pickup_longitude` double",
        "`pickup_latitude` double",
        "`dropoff_longitude` double",
        "`dropoff_latitude` double",
        "`store_and_fwd_flag` string",
        "`trip_duration` double",
    ]
)

### Define spatial extent and aggregation cell dimension.

In [None]:
xmin, xmax, ymin, ymax = (
    -74.28268900920182,
    -73.04544707296462,
    40.49710075083063,
    41.18590742469119,
)

cell = 100.0

In [None]:
df = (
    spark.read.csv(
        os.path.join(base, "train.csv"),
        schema=schema,
        header=True,
        mode="DROPMALFORMED",
    )
    # get trips in the spatial extent.
    .filter(
        F.col("pickup_longitude").between(xmin, xmax)
        & F.col("pickup_latitude").between(ymin, ymax)
        & F.col("dropoff_longitude").between(xmin, xmax)
        & F.col("dropoff_latitude").between(ymin, ymax)
    )
    .withColumnRenamed("pickup_datetime", "pickup_timestamp")
    .withColumnRenamed("dropoff_datetime", "dropoff_timestamp")
    .withColumnRenamed("trip_duration", "seconds")
    # Calculate trip distance.
    .withColumn(
        "meters",
        S.haversine(
            "pickup_longitude",
            "pickup_latitude",
            "dropoff_longitude",
            "dropoff_latitude",
        ),
    )
    .drop(
        "vendor_id",
        "id",
        "store_and_fwd_flag",
    )
    # Calculate pickup/dropff Q/R values.
    .withColumn("pickup_q", S.lon_to_q("pickup_longitude", cell))
    .withColumn("pickup_r", S.lat_to_r("pickup_latitude", cell))
    .withColumn("dropoff_q", S.lon_to_q("dropoff_longitude", cell))
    .withColumn("dropoff_r", S.lat_to_r("dropoff_latitude", cell))
    # Calculate temporal columns.
    .withColumn("day_of_week", F.dayofweek("pickup_timestamp"))
    .withColumn("hour_of_day", F.hour("pickup_timestamp"))
    .withColumn("minutes", F.col("seconds") / F.lit(60.0))
    # Finally, filter by distance and time.
    .filter(F.col("meters").between(10, 15_000) & F.col("seconds").between(1, 3600))
    .cache()
)

### Get trip distance and duration statistics

In [None]:
df.select("meters", "minutes").summary().show()

### Let's plot the distributions.

In [None]:
pdf = df.select(
    "meters",
    "minutes",
).toPandas()

In [None]:
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.hist(pdf["meters"], bins=30, alpha=0.7, color="blue", edgecolor="black")
plt.title("Meter Series Distribution")
plt.xlabel("Meter")
plt.ylabel("Frequency")

plt.subplot(1, 2, 2)
plt.hist(pdf["minutes"], bins=30, alpha=0.7, color="green", edgecolor="black")
plt.title("Minute Series Distribution")
plt.xlabel("Minute")
plt.ylabel("Frequency")

plt.tight_layout()
plt.show()

### Precalculate Airport and Borough columns.

In [None]:
df = (
    df.xy_in_polygon(
        airports.withColumnRenamed("airport", "pickup_airport"),
        point_x="pickup_longitude",
        point_y="pickup_latitude",
        keep_all_points=True,
        wkid=4326,
    )
    .xy_in_polygon(
        airports.withColumnRenamed("airport", "dropoff_airport"),
        point_x="dropoff_longitude",
        point_y="dropoff_latitude",
        keep_all_points=True,
        wkid=4326,
    )
    .xy_in_polygon(
        boro.withColumnRenamed("boro", "dropoff_boro"),
        point_x="dropoff_longitude",
        point_y="dropoff_latitude",
        keep_all_points=True,
        wkid=4326,
    )
    .xy_in_polygon(
        boro.withColumnRenamed("boro", "pickup_boro"),
        point_x="pickup_longitude",
        point_y="pickup_latitude",
        keep_all_points=True,
        wkid=4326,
    )
)

### Perform final clean up.

In [None]:
df = (
    df.drop(
        "pickup_longitude",
        "pickup_latitude",
        "dropoff_longitude",
        "dropoff_latitude",
        "seconds",
    )
    .withColumnRenamed("minutes", "trip_duration_in_minutes")
    .withColumnRenamed("meters", "trip_distance_in_meters")
)

### Persist dataframe as a parquet file.

**Note**: Typically, multiple parquet files should be created. But here, we are creating 1 output parquet file for ease of use with DuckDB Web Shell.

In [None]:
# df.printSchema()

In [None]:
parquet = os.path.join(base, "train.prq")

In [None]:
(
    df
    # .repartition(1)
    .write.parquet(parquet, mode="overwrite")
)

### Create DuckDB database.

In [None]:
def create_ijw_table(conn) -> None:
    conn.execute(
        """
create or replace table ijw as (
select x.i,y.j,if(i==0 and j==0,1.0,0.5) w
from
range(-1,2) as x(i),
range(-1,2) as y(j))
    """.strip()
    )


def create_hotspot_macro(conn) -> None:
    conn.sql(
        """
create or replace macro hotspot(_where) as table
with
t1 as (select pickup_q q,pickup_r r,count(*) qr_count from trips where _where group by q,r),
t2 as (select q+i q, r+j r, qr_count*w qr_count from t1,ijw),
t3 as (select q,r,sum(qr_count) qr_sum from t2 group by q,r),
t4 as (select t3.q q,t3.r r,t3.qr_sum qr_sum,t1.qr_count qr_count from t3 join t1 using (q,r)),
t5 as (select mean(qr_sum) mu,stddev(qr_sum) sd from t4)
select
st_aswkb(st_makeenvelope(t4.q*100.0,t4.r*100.0,t4.q*100.0+100.0,t4.r*100.0+100.0)) as geometry,
t4.qr_count qr_count,
(t4.qr_sum-t5.mu)/t5.sd z_score
from t4,t5
order by z_score
    """.strip()
    )


def create_heatmap_macro(conn) -> None:
    conn.sql(
        """
create or replace macro heatmap(_where) as table
with
t1 as (select pickup_q q,pickup_r r,count(*) c from trips where _where group by q,r),
t2 as (select mean(c) mu,stddev(c) sd from t1)
select
st_aswkb(st_makeenvelope(t1.q*100.0,t1.r*100.0,t1.q*100.0+100.0,t1.r*100.0+100.0)) as geometry,
t1.c count,
(t1.c - t2.mu) / t2.sd z_score
from t1,t2
order by z_score
    """.strip()
    )


with duckdb.connect(database=os.path.join(base, "trips.db")) as conn:
    # create_ijw_table(conn)
    # create_hotspot_macro(conn)
    create_heatmap_macro(conn)
    conn.execute(
        f"create or replace table trips as select * from '{parquet}/*.parquet'"
    )