In [None]:
# Package installation check — currently DISABLED.
# Everything between the triple quotes below is a string literal, so running this cell only
# evaluates (and displays) that string; nothing is checked or installed.
# To re-enable it, delete the two `"""` lines (IPython/Jupyter only, because of `!pip`).
# Note: 'pathlib', 'datetime' and 'collections' are standard-library modules, so the loop
# would always report them as installed; they do not need to be in the list.
"""
import importlib.util, sys, subprocess
packages = [ 'pandas', 'pyarrow', 'requests', 'tqdm', 
             'geopandas', 'shapely', 'IPython',
            'fiona', 'pyproj', 'folium', 'meteostat',
           'pathlib', 'datetime', 'collections']
for package_name in packages:
    is_present = importlib.util.find_spec(package_name)
    if is_present is None:
        print(f"{package_name} is not installed")
        !pip install {package_name}
        print(f"{package_name} is now installed")
    else:
        print(f"{package_name} is installed")
        """

In [None]:
pip install pyspark

In [None]:
#create spark session

try:
    import pyspark
except ImportError:
    !pip install pyspark

from pyspark.sql import SparkSession

In [None]:
# Run Spark in-process on all local cores ("local[*]"); point MASTER at a cluster URL to scale out.
MASTER = "local[*]"

spark = SparkSession.builder.master(MASTER).appName("TLC-Notebook").getOrCreate()

spark  # last expression, so Jupyter displays the session

In [None]:
# Paths used by the cells below (relative to the notebook's working directory).

# Inputs (read only)
IN_TLC = "data/tlc"                  # TLC trip-record parquet, all months
IN_WX = "data/weather/hourly"        # monthly hourly-weather parquet files

# Outputs (each is rewritten with mode("overwrite") by the cell that produces it)
OUT_ZONE = "out/zone_hour_dropoff"   # zone × hour trip counts
OUT_OD = "out/od_flows"              # pickup → dropoff flows by hour
OUT_WXJ = "out/weather_join"         # citywide hourly trips joined with weather

In [None]:
# Handle column-name drift (the same field is spelled differently across TLC files)
from pyspark.sql import functions as F

def pick(cols, candidates):
    """Resolve the first candidate name that exists in ``cols``, ignoring case.

    Args:
        cols: Available column names (e.g. ``df.columns``).
        candidates: Names to try, in priority order.

    Returns:
        The column name exactly as spelled in ``cols`` for the first candidate that
        matches case-insensitively, or ``None`` when no candidate matches. If ``cols``
        holds names that differ only by case, the last such spelling is returned.
    """
    by_lower = dict((name.lower(), name) for name in cols)
    matches = (by_lower[cand.lower()] for cand in candidates if cand.lower() in by_lower)
    return next(matches, None)


In [None]:
# Zone × hour demand (pickup or dropoff)
# Read all TLC parquet, count trips per (zone, hour), and write the result as parquet.
df = spark.read.parquet(IN_TLC)

# Candidate spellings per field; pick() resolves whichever one the files actually use.
PU = ["PULocationID","PUlocationID","pu_location_id"]
DO = ["DOLocationID","DOlocationID","do_location_id"]
TP = ["tpep_pickup_datetime","lpep_pickup_datetime","pickup_datetime","request_datetime"]
TD = ["tpep_dropoff_datetime","lpep_dropoff_datetime","dropoff_datetime","dropOff_datetime"]

KIND = "dropoff"  # change to "pickup" if you want pickup hotspots
# Fail fast on typos: any value other than "dropoff" used to fall through to pickup columns.
assert KIND in ("pickup", "dropoff"), f"KIND must be 'pickup' or 'dropoff', got {KIND!r}"
# NOTE(review): OUT_ZONE's directory name says "dropoff"; consider renaming it when KIND == "pickup".
zone_col = pick(df.columns, DO if KIND == "dropoff" else PU)
time_col = pick(df.columns, TD if KIND == "dropoff" else TP)
assert zone_col and time_col, "Required TLC columns not found"

result = (
    df.withColumn("zone", F.col(zone_col).cast("int"))           # cast zone id to int
      .withColumn("ts", F.to_timestamp(F.col(time_col)))         # parse timestamp
      .withColumn("hour", F.date_trunc("hour", F.col("ts")))     # truncate to hour
      # Filter AFTER casting/parsing: ids that fail the int cast and null/unparseable
      # timestamps become NULL here and would otherwise form bogus NULL-zone / NULL-hour groups.
      .filter(F.col("zone").isNotNull() & F.col("hour").isNotNull())
      .groupBy("zone","hour").count()                            # aggregate counts
      .withColumnRenamed("count","trips")                        # name output col
)

result.write.mode("overwrite").parquet(OUT_ZONE)
print("Wrote:", OUT_ZONE)


In [None]:
# OD flows (PU → DO) by hour
# Uses the PU / DO / TP candidate lists defined in the zone × hour cell above.

df = spark.read.parquet(IN_TLC)

pu = pick(df.columns, PU)
do = pick(df.columns, DO)
tp = pick(df.columns, TP)
assert pu and do and tp, "Required TLC columns not found"

flows = (
    df.withColumn("pu", F.col(pu).cast("int"))                   # cast ids to int
      .withColumn("do", F.col(do).cast("int"))
      .withColumn("hour", F.date_trunc("hour", F.to_timestamp(F.col(tp))))  # hour key
      # Require both endpoints and a valid hour. Checking after the cast/parse also drops
      # ids that fail the int cast and null/unparseable pickup times (no NULL-key groups).
      .filter(F.col("pu").isNotNull() & F.col("do").isNotNull() & F.col("hour").isNotNull())
      .groupBy("pu","do","hour").count()                         # aggregate OD counts
      .withColumnRenamed("count","trips")
)

flows.write.mode("overwrite").parquet(OUT_OD)
print("Wrote:", OUT_OD)


In [None]:
# Weather join → citywide hourly series

# Load zone-hour demand and weather, join on 'hour', and save a citywide hourly series.
dem = spark.read.parquet(OUT_ZONE)     # written by the zone × hour cell
wx  = spark.read.parquet(IN_WX)        # your hourly weather parquet

# Detect the timestamp column in the weather parquet (depends on how it was saved)
ts_col = "time" if "time" in wx.columns else ("index" if "index" in wx.columns else None)
assert ts_col, "Weather parquet must have a 'time' (or 'index') column"
# NOTE(review): confirm the weather timestamps use the same timezone as the TLC timestamps;
# otherwise the hour-level join is silently misaligned.

WX_VARS = ["prcp", "temp"]                            # weather variables carried into the output
wx_present = [c for c in WX_VARS if c in wx.columns]  # the subset this parquet actually has

wx_keyed = (
    wx.withColumn("hour", F.date_trunc("hour", F.to_timestamp(F.col(ts_col))))  # normalize to hour
      .filter(F.col("hour").isNotNull())                                        # unparseable times can never join
)

# Collapse to exactly ONE weather row per hour (duplicates are averaged). Duplicate hours,
# e.g. from overlapping files or several stations, would otherwise fan out the join below
# and multiply the summed trip counts.
wx_dedup = (
    wx_keyed.groupBy("hour").agg(*[F.avg(c).alias(c) for c in wx_present])
    if wx_present
    else wx_keyed.select("hour").distinct()
)

# Always expose every WX_VARS column; missing ones become NULL so the aggregation below
# never references a column that does not exist.
wx_hour = wx_dedup.select(
    "hour",
    *[F.col(c) if c in wx_present else F.lit(None).cast("double").alias(c) for c in WX_VARS],
)

joined = dem.join(wx_hour, on="hour", how="left")    # left join demand to weather (no fan-out now)

city = (
    joined.groupBy("hour")                                                       # citywide hourly
          .agg(F.sum("trips").alias("trips"),
               # weather is constant within an hour after the dedupe, so first() is deterministic
               F.first("prcp").alias("prcp"),
               F.first("temp").alias("temp"))
)

city.write.mode("overwrite").parquet(OUT_WXJ)
print("Wrote:", OUT_WXJ)
