In [1]:
import hopsworks
import pandas as pd
import os
from zoneinfo import ZoneInfo
import requests
import tqdm as notebook_tqdm

  from .autonotebook import tqdm as notebook_tqdm


Login and create a hopworks project with api_key

In [2]:

PROJECT_NAME = "Air_Pollution_AQI_2026"
HOPSWORKS_API_KEY = "TdfX0A4MLqJC0IIn.sAY3ZbpVnGwgbRwCnjibbiEhsVy5hrlFPyIg6Rr2yHW8ptEKRhJ3OEx0an3itvtw"

project = hopsworks.login(
    project=PROJECT_NAME,
    api_key_value=HOPSWORKS_API_KEY
)

print(f"Connected to Hopsworks project: {project.name}")

2026-01-05 23:38:20,341 INFO: Initializing external client
2026-01-05 23:38:20,341 INFO: Base URL: https://c.app.hopsworks.ai:443




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'


2026-01-05 23:38:21,963 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1301658
Connected to Hopsworks project: Air_Pollution_AQI_2026


In [3]:
TZ = ZoneInfo("America/New_York")

In [4]:
path_dataset = "data/10_2024.csv"
df = pd.read_csv(path_dataset, low_memory=False)

# Filter to JFK only
df = df[df["ORIGIN"] == "JFK"].copy()

print("Rows after JFK filter:", len(df))
print("Columns available:", len(df.columns))


Rows after JFK filter: 10732
Columns available: 16


In [5]:
# 2) Drop cancelled/diverted flights (recommended)
df["CANCELLED"] = pd.to_numeric(df["CANCELLED"], errors="coerce").fillna(0)
df["DIVERTED"] = pd.to_numeric(df["DIVERTED"], errors="coerce").fillna(0)
df = df[(df["CANCELLED"] == 0) & (df["DIVERTED"] == 0)].copy()
print("Rows after removing cancelled/diverted:", len(df))

Rows after removing cancelled/diverted: 10651


In [6]:
keep_cols = [
    "QUARTER", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK",
    "FL_DATE", "CRS_DEP_TIME",
    "OP_UNIQUE_CARRIER", "DEST", "DISTANCE",
    "DEP_DELAY"
]
missing = [c for c in keep_cols if c not in df.columns]
if missing:
    raise ValueError(f"Missing required columns: {missing}")

df = df[keep_cols].copy()

In [7]:
# ----------------------------
# TIME CLEANING
# ----------------------------
# CRS_DEP_TIME -> 4-digit string "HHMM"
df["CRS_DEP_TIME"] = pd.to_numeric(df["CRS_DEP_TIME"], errors="coerce")
df = df.dropna(subset=["FL_DATE", "CRS_DEP_TIME"])

df["CRS_DEP_TIME"] = df["CRS_DEP_TIME"].astype(int).astype(str).str.zfill(4)

# FL_DATE -> date (ignore time portion)
df["FL_DATE"] = pd.to_datetime(df["FL_DATE"], errors="coerce")
df = df.dropna(subset=["FL_DATE"])

# Build scheduled departure local timestamp
df["sched_dep_local"] = pd.to_datetime(
    df["FL_DATE"].dt.strftime("%Y-%m-%d") + " " + df["CRS_DEP_TIME"],
    format="%Y-%m-%d %H%M",
    errors="coerce"
)
df = df.dropna(subset=["sched_dep_local"])
df["sched_dep_local"] = df["sched_dep_local"].dt.tz_localize(TZ)



In [8]:
# ----------------------------
# TARGET CLEANING (keep raw dep_delay as requested)
# ----------------------------
df["DEP_DELAY"] = pd.to_numeric(df["DEP_DELAY"], errors="coerce")

In [9]:
# ----------------------------
# PRIMARY KEY
# ----------------------------
df["flight_id"] = (
    "JFK_" +
    df["DEST"].astype(str) + "_" +
    df["OP_UNIQUE_CARRIER"].astype(str) + "_" +
    df["sched_dep_local"].astype(str)
)

In [10]:
# ----------------------------
# FINAL DATAFRAME (rename to clean names)
# ----------------------------
df_fg = df.rename(columns={
    "QUARTER": "quarter",
    "MONTH": "month",
    "DAY_OF_MONTH": "day_of_month",
    "DAY_OF_WEEK": "day_of_week",
    "CRS_DEP_TIME": "crs_dep_time",
    "OP_UNIQUE_CARRIER": "reporting_airline",
    "DEST": "dest",
    "DISTANCE": "distance",
    "DEP_DELAY": "dep_delay",
})

df_fg = df_fg[
    [
        "flight_id",
        "sched_dep_local",
        "quarter",
        "month",
        "day_of_month",
        "day_of_week",
        "crs_dep_time",
        "reporting_airline",
        "dest",
        "distance",
        "dep_delay",
    ]
].copy()

print(df_fg.head())
print("Final rows:", len(df_fg))
print("Unique flight_id:", df_fg["flight_id"].nunique())

                               flight_id           sched_dep_local  quarter  \
85  JFK_BGR_9E_2024-10-01 22:15:00-04:00 2024-10-01 22:15:00-04:00        4   
86  JFK_BTV_9E_2024-10-01 09:59:00-04:00 2024-10-01 09:59:00-04:00        4   
87  JFK_BTV_9E_2024-10-01 22:45:00-04:00 2024-10-01 22:45:00-04:00        4   
88  JFK_BUF_9E_2024-10-01 09:29:00-04:00 2024-10-01 09:29:00-04:00        4   
89  JFK_BUF_9E_2024-10-01 12:59:00-04:00 2024-10-01 12:59:00-04:00        4   

    month  day_of_month  day_of_week crs_dep_time reporting_airline dest  \
85     10             1            2         2215                9E  BGR   
86     10             1            2         0959                9E  BTV   
87     10             1            2         2245                9E  BTV   
88     10             1            2         0929                9E  BUF   
89     10             1            2         1259                9E  BUF   

    distance  dep_delay  
85     382.0      -12.0  
86     266.0    

In [14]:
# ----------------------------
# HOPSWORKS
# ----------------------------
project = hopsworks.login(project=PROJECT_NAME, api_key_value=HOPSWORKS_API_KEY)
fs = project.get_feature_store()

bts_fg = fs.get_or_create_feature_group(
    name="bts_jfk_selected_features_fg",
    version=1,
    primary_key=["flight_id"],
    event_time="sched_dep_local",
    description="BTS JFK departures: selected predictors + dep_delay target"
)

bts_fg.insert(df_fg, write_options={"wait_for_job": True})

print("Inserted month into bts_jfk_selected_features_fg v1")

2026-01-05 23:47:11,920 INFO: Closing external client and cleaning up certificates.
2026-01-05 23:47:11,923 INFO: Connection closed.
2026-01-05 23:47:11,925 INFO: Initializing external client
2026-01-05 23:47:11,925 INFO: Base URL: https://c.app.hopsworks.ai:443




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'


2026-01-05 23:47:13,324 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1301658


FeatureStoreException: Failed to write to delta table in external cluster. Make sure datanode load balancer has been setup on the cluster.

Get Weather data from Open Mateo

In [None]:
"""# ----------------------------
# 1) Derive START_DATE / END_DATE from BTS df_fg
# ----------------------------
TZ_NAME = "America/New_York"
TZ = ZoneInfo(TZ_NAME)

# df_fg should already exist from your BTS cleaning cell
# Ensure sched_dep_local is datetime
df_fg["sched_dep_local"] = pd.to_datetime(df_fg["sched_dep_local"], errors="coerce")
df_fg = df_fg.dropna(subset=["sched_dep_local"])

min_ts = df_fg["sched_dep_local"].min()
max_ts = df_fg["sched_dep_local"].max()

START_DATE = min_ts.date().isoformat()
END_DATE = max_ts.date().isoformat()

print("Derived weather range from BTS:")
print("START_DATE =", START_DATE)
print("END_DATE   =", END_DATE)

In [None]:
"""# ----------------------------
# 2) Fetch Open-Meteo hourly historical weather for JFK
#    (exact variables you requested)
# ----------------------------
LAT = 40.6413
LON = -73.7781

OPEN_METEO_HIST_URL = "https://archive-api.open-meteo.com/v1/archive"

HOURLY_VARS = [
    "weathercode",
    "windspeed_10m",
    "windgusts_10m",
    "temperature_2m",
    "precipitation",
    "snowfall",
    "visibility",
]

params = {
    "latitude": LAT,
    "longitude": LON,
    "start_date": START_DATE,
    "end_date": END_DATE,
    "hourly": ",".join(HOURLY_VARS),
    "timezone": TZ_NAME,  # return timestamps in NY local time
}

resp = requests.get(OPEN_METEO_HIST_URL, params=params, timeout=60)
resp.raise_for_status()
data = resp.json()

if "hourly" not in data or "time" not in data["hourly"]:
    raise ValueError(f"Unexpected Open-Meteo response structure. Top-level keys: {list(data.keys())}")

hourly = data["hourly"]
print("✅ Open-Meteo response received. Hours:", len(hourly["time"]))

In [None]:
"""# ----------------------------
# 3) Build weather dataframe (hourly) with clean schema
# ----------------------------
times = pd.to_datetime(hourly["time"], errors="coerce")

# Open-Meteo returns local timestamps when timezone is specified, usually as naive strings.
# Localize to America/New_York safely around DST.
weather_hour_local = times.dt.tz_localize(
    TZ,
    ambiguous="infer",
    nonexistent="shift_forward"
)

df_weather = pd.DataFrame({
    "weather_hour_local": weather_hour_local
})

# Rename to our stored feature names
df_weather["weather_code"]   = hourly.get("weathercode")        # WMO code
df_weather["wind_speed_ms"]  = hourly.get("windspeed_10m")
df_weather["wind_gust_ms"]   = hourly.get("windgusts_10m")
df_weather["temp_c"]         = hourly.get("temperature_2m")
df_weather["precip_mm"]      = hourly.get("precipitation")
df_weather["snowfall_cm"]    = hourly.get("snowfall")
df_weather["visibility_m"]   = hourly.get("visibility")

# Primary key for each weather hour
df_weather["weather_id"] = df_weather["weather_hour_local"].astype(str)

# Optional metadata (handy for debugging)
df_weather["station"] = "JFK"
df_weather["latitude"] = float(LAT)
df_weather["longitude"] = float(LON)

# Force numeric types where appropriate
num_cols = [
    "weather_code",
    "wind_speed_ms",
    "wind_gust_ms",
    "temp_c",
    "precip_mm",
    "snowfall_cm",
    "visibility_m",
]
for c in num_cols:
    df_weather[c] = pd.to_numeric(df_weather[c], errors="coerce")

df_weather = df_weather.dropna(subset=["weather_hour_local"])

# Final column order for Hopsworks
df_weather_fg = df_weather[
    [
        "weather_id",
        "weather_hour_local",
        "station",
        "latitude",
        "longitude",
        "weather_code",
        "wind_speed_ms",
        "wind_gust_ms",
        "temp_c",
        "precip_mm",
        "snowfall_cm",
        "visibility_m",
    ]
].copy()

print(df_weather_fg.head())
print("Rows:", len(df_weather_fg))
print("Min hour:", df_weather_fg["weather_hour_local"].min())
print("Max hour:", df_weather_fg["weather_hour_local"].max())

In [None]:
"""# ----------------------------
# 4) Create/Insert into Hopsworks Weather Feature Group
# ----------------------------
# Assumes you already have: project, fs from earlier BTS code
# If not, uncomment below:
# import hopsworks
# project = hopsworks.login(project=PROJECT_NAME, api_key_value=API_KEY)
# fs = project.get_feature_store()

weather_fg = fs.get_or_create_feature_group(
    name="weather_jfk_hourly_fg",
    version=1,
    primary_key=["weather_id"],
    event_time="weather_hour_local",
    description="Hourly JFK weather from Open-Meteo (weathercode, wind, temp, precip, snowfall, visibility)"
)

weather_fg.insert(df_weather_fg, write_options={"wait_for_job": True})

print("✅ Feature group created/updated: weather_jfk_hourly_fg v1")