In [None]:
"""
Imports all data required for the project.
Processes and saves parquets.
Requires Subway Data CSV and NY Boroughs folder.
"""

# Data Import (Landing Layer)

In [10]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import requests
import time
import geopandas as gpd
import pandas as pd
import urllib

In [11]:
folder_names = ["taxi_data", "weather_data", "event_data", "subway_data"]

# Create a folder to store the data
data_folder = "../data/landing"
if not os.path.exists(data_folder):
    os.makedirs(data_folder)

# Create the path for each type of dataset
output_dirs = []
# TLC (Yellow taxi), Weather, and Event Data
for target_dir in folder_names:
    dir_path = os.path.join(data_folder, target_dir)
    os.makedirs(dir_path, exist_ok=True)
    output_dirs.append(dir_path)


## Yellow Taxi Data

In [None]:
# Define our range of interest
YEARS = [2024]
MONTHS = range(1, 13)

# Base URL for Yellow Taxi data
BASE_URL = BASE = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_"

# Download Yellow Taxi data
for year in YEARS:
    for month in MONTHS:
        fname = f"{year}-{month:02d}.parquet"
        url = BASE_URL + fname
        output = os.path.join(output_dirs[0], fname)
        if not os.path.exists(output):
            print(f"Downloading {fname} to {output}")
            urllib.request.urlretrieve(url, output)
        else:
            continue

    print(f"Yellow Taxi data for {year} completed.")


Yellow Taxi data for 2024 completed.


#### Get borough coordinates

In [None]:
gdf = gpd.read_file("../data/ny_boroughs/nybb.shp")

# Project to a different coord reference system for better accuracy
gdf_proj = gdf.to_crs(epsg=2263)

# Compute controids to pull weather from
gdf_proj["centroid"] = gdf_proj.geometry.centroid

# Convert back to lat/lon (WGS84) for export
centroids_latlon = gdf_proj.set_geometry("centroid").to_crs(epsg=4326)

# Get lat long of centroids
centroids_latlon["lat"] = centroids_latlon.geometry.y
centroids_latlon["lon"] = centroids_latlon.geometry.x


centroids = centroids_latlon[["BoroName", "lat", "lon"]].copy()

# Add newark airport coords
# From https://www.latlong.net/place/newark-liberty-international-airport-nj-usa-33210.html

# There are very few centroids so I will use pandas for this task

ewr_row = pd.DataFrame([{"BoroName": "EWR", "lat": 40.689491, "lon": -74.174538}])
centroids = pd.concat([centroids, ewr_row], ignore_index=True)

print(centroids)


        BoroName        lat        lon
0  Staten Island  40.580833 -74.153405
1       Brooklyn  40.644746 -73.947855
2         Queens  40.707626 -73.818545
3      Manhattan  40.777239 -73.967200
4          Bronx  40.852635 -73.866508
5            EWR  40.689491 -74.174538


#### Import Taxi Zone Data

In [14]:
# Import taxi zone Shapefiles and find the centroid of each zone

# Add each centroid as a field in the lookup table

zone_lookup = pd.read_csv("../data/import_csvs/taxi_zones/taxi_zone_lookup.csv")

# Read shapefile
shapefile_path = "../data/import_csvs/taxi_zones/shapefiles/taxi_zones.shp"


def find_write_centroids(zone_lookup_df, shapefile_path):
    """
    Find the centroids of the taxi zones and write them to the lookup table.
    Adds both the centroid geometry and its coordinates (lat/lon).
    """
    gdf = gpd.read_file(shapefile_path)
    # Convert to WGS84 for lat/lon
    gdf = gdf.to_crs(epsg=4326)
    for idx, row in gdf.iterrows():
        centroid = row["geometry"].centroid
        zone_lookup_df.loc[
            zone_lookup_df["LocationID"] == row["LocationID"], "centroid"
        ] = centroid
        zone_lookup_df.loc[
            zone_lookup_df["LocationID"] == row["LocationID"], "centroid_lat"
        ] = centroid.y
        zone_lookup_df.loc[
            zone_lookup_df["LocationID"] == row["LocationID"], "centroid_lon"
        ] = centroid.x
    zone_lookup_df.to_csv(
        "../data/import_csvs/taxi_zones/taxi_zone_lookup.csv", index=False, header=True
    )
    return zone_lookup_df.head(3)


# Call the function
find_write_centroids(zone_lookup, shapefile_path)


Unnamed: 0,LocationID,Borough,Zone,service_zone,centroid,centroid_lat,centroid_lon
0,1,EWR,Newark Airport,EWR,POINT (-74.17400009529821 40.691831156065916),40.691831,-74.174
1,2,Queens,Jamaica Bay,Boro Zone,POINT (-73.83129879600592 40.61674466473727),40.616745,-73.831299
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone,POINT (-73.84742222112834 40.8644731447022),40.864473,-73.847422


## Weather Data

In [15]:
# Create session
spark = (
    SparkSession.builder.appName("Data Import")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "America/New_York")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

In [None]:
# Attributes we want
WEATHER_LIST = [
    "temperature_2m",
    "snow_depth",
    "snowfall",
    "rain",
    "precipitation",
    "relative_humidity_2m",
    "weather_code",
    "cloud_cover",
    "wind_speed_10m",
    "wind_gusts_10m",
    "direct_radiation",
    "shortwave_radiation",
    "apparent_temperature",
    "surface_pressure",
    "is_day",
    "sunshine_duration",
]


def get_weather_data(lat, lon, start="2024-01-01", end="2024-12-31"):
    """
    Pulls hourly weather data for given borough location from Open Meteo API.
    Returns a dataframe with timestamps and weather attributes

    """
    weather_url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start,
        "end_date": end,
        "hourly": ",".join(WEATHER_LIST),
        "timezone": "America/New_York",
    }

    # Make API request
    response = requests.get(weather_url, params=params)
    response.raise_for_status()
    data = response.json()

    # Extract timestamps
    timestamps = data["hourly"]["time"]
    n = len(timestamps)

    # Build list of dicts directly from JSON
    records = []
    for i in range(n):
        record = {"timestamp": timestamps[i]}
        for attribute in WEATHER_LIST:
            record[attribute] = data["hourly"].get(attribute, [None] * n)[i]
        records.append(record)

    sdf = spark.createDataFrame(records)

    return sdf

In [None]:
# Start with an empty Spark DataFrame
weather_sdf = None

# Fetch weather data for each borough
for idx, borough in centroids.iterrows():
    success = False
    attempts = 0

    while not success:
        try:
            curr_sdf = get_weather_data(borough["lat"], borough["lon"]).withColumn(
                "BoroName", lit(borough["BoroName"])
            )

            # Union (combine) new sdf into main sdf
            if weather_sdf is None:
                weather_sdf = curr_sdf
            else:
                weather_sdf = weather_sdf.unionByName(curr_sdf)

            success = True

        # Avoid being rate limited by sleeping
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                attempts += 1
                wait_time = min(10 * attempts, 60)
                print(f"Hit rate limit, sleeping for {wait_time} seconds")
                time.sleep(wait_time)
            else:
                print("HTTP error for borough")
                break
        # Non HTTP related error
        except Exception as e:
            print("Other error has occurred")
            break

# Save to Parquet after we read in data
weather_dir = os.path.join(data_folder, "weather_data")

print(f"Number of rows read in {weather_sdf.count()}")

if weather_sdf is not None:
    weather_sdf.write.mode("overwrite").parquet(weather_dir)


Number of rows read in 52704


                                                                                

## Subway (Metro) Data

In [18]:
SUB_PARQUET_DIR = "../data/landing/subway_data"

# Create directory if it doesn't exist (defensive as we should have already created it)
os.makedirs(SUB_PARQUET_DIR, exist_ok=True)

num_rows = 0

# Read only 2024's csv into landing/subway_data
for year in [2024]:
    sub_sdf = spark.read.csv(
        f"../data/import_csvs/subway_data/MTA_{year}.csv", header=True, inferSchema=True
    )
    num_rows += sub_sdf.count()
    sub_sdf.write.parquet(f"{SUB_PARQUET_DIR}/{year}", mode="overwrite")

print(f"Records read in: {num_rows}")



Records read in: 27023937


                                                                                

In [19]:
print(num_rows)

27023937


In [20]:
# End spark session
spark.stop()