In [2]:
import sys
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", module="IPython")

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

print(f"Root dir: {root_dir}")

# Add the root directory to the `PYTHONPATH` 
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Root dir: c:\Users\20609\Documents\KTHP5P6\scablemachine\lab1\311-service-resolution-prediction
HopsworksSettings initialized!


In [3]:
import datetime
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util
import datetime
from pathlib import Path
import json
import re
import os
import warnings
warnings.filterwarnings("ignore")


project = hopsworks.login(
    api_key_value="YOUR_API_KEY",  
    host="c.app.hopsworks.ai", 
    project="Lab1_xiaotong",  
    engine="python"  
)

2026-01-10 14:17:40,853 INFO: Initializing external client
2026-01-10 14:17:40,853 INFO: Base URL: https://c.app.hopsworks.ai:443






2026-01-10 14:17:42,309 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1286339


In [4]:
import requests
import pandas as pd
from tqdm import tqdm


START_DATE = "2025-12-18"
END_DATE   = "2026-01-09"

BASE = "https://data.cityofnewyork.us/resource/erm2-nwe9.json"

APP_TOKEN = "" 
headers = {}
if APP_TOKEN:
    headers["X-App-Token"] = APP_TOKEN


where = (
    f"created_date >= '{START_DATE}T00:00:00.000' "
    f"AND created_date < '{END_DATE}T00:00:00.000'"
)

limit = 500       
offset = 0
max_pages = 100

all_rows = []

for _ in tqdm(range(max_pages), desc="Downloading NYC 311"):
    params = {
        "$where": where,
        "$limit": limit,
        "$offset": offset,
        "$order": "created_date ASC, unique_key ASC",
    }

    r = requests.get(BASE, params=params, headers=headers, timeout=120)
    r.raise_for_status()
    batch = r.json()

    if not batch:
        break

    all_rows.extend(batch)
    offset += limit

df_request_append = pd.DataFrame(all_rows)

print("df_request_append shape:", df_request_append.shape)
df_request_append.head()


Downloading NYC 311:   0%|          | 0/100 [00:00<?, ?it/s]

Downloading NYC 311: 100%|██████████| 100/100 [01:35<00:00,  1.04it/s]


df_request_append shape: (50000, 44)


Unnamed: 0,unique_key,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,incident_zip,incident_address,...,descriptor_2,facility_type,due_date,vehicle_type,taxi_pick_up_location,bridge_highway_name,bridge_highway_segment,bridge_highway_direction,road_ramp,taxi_company_borough
0,67216735,2025-12-18T00:00:02.000,2025-12-18T01:05:22.000,NYPD,New York City Police Department,Noise - Residential,Banging/Pounding,Residential Building/House,10019,105 WEST 55 STREET,...,,,,,,,,,,
1,67220025,2025-12-18T00:00:15.000,2025-12-18T03:42:30.000,NYPD,New York City Police Department,Blocked Driveway,Partial Access,Street/Sidewalk,11234,1138 EAST 57 STREET,...,,,,,,,,,,
2,67216223,2025-12-18T00:00:47.000,2025-12-18T00:35:13.000,DSNY,Department of Sanitation,Vendor Enforcement,Food Vendor,Street,11220,5108 4 AVENUE,...,In Prohibited Area,,,,,,,,,
3,67234348,2025-12-18T00:00:50.000,2025-12-19T00:00:00.000,DOB,Department of Buildings,Elevator,Elevator - Single Device On Property/No Altern...,,10032,522 WEST 157 STREET,...,,,,,,,,,,
4,67221913,2025-12-18T00:01:01.000,2025-12-18T00:45:32.000,NYPD,New York City Police Department,Illegal Parking,Blocked Sidewalk,Street/Sidewalk,11220,514 60 STREET,...,,,,,,,,,,


In [6]:
selected_cols = [
    "created_date",
    "closed_date",
    "agency",
    "agency_name",
    "complaint_type",
    "descriptor",
    "location_type",
    "borough",
    # "latitude",
    # "longitude"
]

df_selected = df_request_append[selected_cols].copy()
df_selected["created_date"] = pd.to_datetime(df_selected["created_date"])
df_selected["closed_date"] = pd.to_datetime(df_selected["closed_date"])
df_clean = df_selected.dropna()
df_clean

Unnamed: 0,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,borough
0,2025-12-18 00:00:02,2025-12-18 01:05:22,NYPD,New York City Police Department,Noise - Residential,Banging/Pounding,Residential Building/House,MANHATTAN
1,2025-12-18 00:00:15,2025-12-18 03:42:30,NYPD,New York City Police Department,Blocked Driveway,Partial Access,Street/Sidewalk,BROOKLYN
2,2025-12-18 00:00:47,2025-12-18 00:35:13,DSNY,Department of Sanitation,Vendor Enforcement,Food Vendor,Street,BROOKLYN
4,2025-12-18 00:01:01,2025-12-18 00:45:32,NYPD,New York City Police Department,Illegal Parking,Blocked Sidewalk,Street/Sidewalk,BROOKLYN
5,2025-12-18 00:01:15,2025-12-18 11:27:48,NYPD,New York City Police Department,Blocked Driveway,No Access,Street/Sidewalk,QUEENS
...,...,...,...,...,...,...,...,...
49991,2025-12-23 12:29:36,2025-12-23 12:30:12,NYPD,New York City Police Department,Traffic,Truck Route Violation,Street/Sidewalk,MANHATTAN
49992,2025-12-23 12:29:39,2025-12-23 13:24:42,NYPD,New York City Police Department,Illegal Parking,Blocked Hydrant,Street/Sidewalk,QUEENS
49993,2025-12-23 12:29:45,2025-12-23 12:31:59,NYPD,New York City Police Department,Noise - Residential,Loud Music/Party,Residential Building/House,BRONX
49994,2025-12-23 12:29:51,2025-12-23 14:34:30,NYPD,New York City Police Department,Urinating in Public,,Store/Commercial,BRONX


In [19]:
import holidays
import pytz


us_holidays = holidays.US()
ny_tz = pytz.timezone("America/New_York")
df_clean = df_clean.copy()
df_clean["created_date"] = pd.to_datetime(df_clean["created_date"])
df_clean["created_date_ny"] = df_clean["created_date"].dt.tz_localize("UTC").dt.tz_convert(ny_tz)

df_clean["date"] = df_clean["created_date_ny"].dt.date
df_clean["hour"] = df_clean["created_date_ny"].dt.hour
df_clean["weekday"] = df_clean["created_date_ny"].dt.weekday  # Monday=0

df_clean["is_holiday"] = df_clean["date"].apply(
    lambda x: 1 if x in us_holidays else 0
).astype("int64")

df_clean["is_work_day"] = (
    (df_clean["weekday"] < 5) & (df_clean["is_holiday"] == 0)
).astype("int64")

df_clean["is_work_hours"] = (
    (df_clean["is_work_day"] == 1) &
    (df_clean["hour"] >= 9) &
    (df_clean["hour"] < 17)
).astype("int64")

cols_to_drop = [
    "created_date_ny",
    "date",
    "hour",
    "weekday",
    "is_holiday"
]

df_clean = df_clean.drop(columns=cols_to_drop)

df_clean["is_work_day"] = df_clean["is_work_day"].astype("int64")
df_clean["is_work_hours"] = df_clean["is_work_hours"].astype("int64")

print("Column types:")
print(df_clean[["is_work_day", "is_work_hours"]].dtypes)
print("\nFirst few rows:")
df_clean.head()

Column types:
is_work_day      int64
is_work_hours    int64
dtype: object

First few rows:


Unnamed: 0,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,borough,is_work_day,is_work_hours
0,2025-12-18 00:00:02,2025-12-18 01:05:22,NYPD,New York City Police Department,Noise - Residential,Banging/Pounding,Residential Building/House,MANHATTAN,1,0
1,2025-12-18 00:00:15,2025-12-18 03:42:30,NYPD,New York City Police Department,Blocked Driveway,Partial Access,Street/Sidewalk,BROOKLYN,1,0
2,2025-12-18 00:00:47,2025-12-18 00:35:13,DSNY,Department of Sanitation,Vendor Enforcement,Food Vendor,Street,BROOKLYN,1,0
4,2025-12-18 00:01:01,2025-12-18 00:45:32,NYPD,New York City Police Department,Illegal Parking,Blocked Sidewalk,Street/Sidewalk,BROOKLYN,1,0
5,2025-12-18 00:01:15,2025-12-18 11:27:48,NYPD,New York City Police Department,Blocked Driveway,No Access,Street/Sidewalk,QUEENS,1,0


In [7]:
import pandas as pd

BOROUGH_COORDS = {
    "MANHATTAN": (40.7829, -73.9654),
    "BROOKLYN": (40.6928, -73.9903),
    "QUEENS": (40.7769, -73.8740),
    "BRONX": (40.8506, -73.8769),
    "STATEN ISLAND": (40.6437, -74.0736),
}


In [8]:
import requests
import pandas as pd

# - temperature (average over the day)
# - precipitation (the total over the day)
# - wind speed (average over the day)

TZ = "America/New_York"
START_DATE = "2025-12-18"
END_DATE = "2026-01-09"
HOURLY_VARS = ["temperature_2m", "precipitation", "wind_speed_10m"]


def _fetch_open_meteo_hourly(lat: float, lon: float) -> pd.DataFrame:
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": float(lat),
        "longitude": float(lon),
        "start_date": START_DATE,
        "end_date": END_DATE,
        "hourly": ",".join(HOURLY_VARS),
        "timezone": TZ,
    }
    r = requests.get(url, params=params, timeout=120)
    r.raise_for_status()
    data = r.json()
    hourly = (data or {}).get("hourly") or {}
    if "time" not in hourly:
        return pd.DataFrame(columns=["datetime"] + HOURLY_VARS)

    dfh = pd.DataFrame(hourly)
    dfh["datetime"] = pd.to_datetime(dfh["time"], errors="coerce")
    return dfh.drop(columns=["time"])


_frames = []
for borough, (lat, lon) in BOROUGH_COORDS.items():
    dfh = _fetch_open_meteo_hourly(lat, lon)
    if dfh.empty:
        continue

    dfh["borough"] = borough
    dfh["date"] = dfh["datetime"].dt.date

    daily = (
        dfh.groupby(["borough", "date"], as_index=False)
        .agg(
            temperature_mean=("temperature_2m", "mean"),
            precipitation_sum=("precipitation", "sum"),
            wind_speed_mean=("wind_speed_10m", "mean"),
        )
    )
    _frames.append(daily)

borough_weather_daily = (
    pd.concat(_frames, ignore_index=True)
    if _frames
    else pd.DataFrame(columns=["borough", "date", "temperature_mean", "precipitation_sum", "wind_speed_mean"])
)

_all_dates = pd.date_range(START_DATE, END_DATE, freq="D").date
_grid = pd.MultiIndex.from_product(
    [list(BOROUGH_COORDS.keys()), _all_dates],
    names=["borough", "date"],
).to_frame(index=False)

borough_weather_daily = _grid.merge(borough_weather_daily, on=["borough", "date"], how="left")
borough_weather_daily["latitude"] = borough_weather_daily["borough"].map(lambda b: BOROUGH_COORDS[b][0])
borough_weather_daily["longitude"] = borough_weather_daily["borough"].map(lambda b: BOROUGH_COORDS[b][1])
borough_weather_daily = borough_weather_daily.sort_values(["date", "borough"]).reset_index(drop=True)

borough_weather_daily.head(200)

Unnamed: 0,borough,date,temperature_mean,precipitation_sum,wind_speed_mean,latitude,longitude
0,BRONX,2025-12-18,3.179167,0.0,7.700000,40.8506,-73.8769
1,BROOKLYN,2025-12-18,3.237500,0.0,8.637500,40.6928,-73.9903
2,MANHATTAN,2025-12-18,2.066667,0.0,6.137500,40.7829,-73.9654
3,QUEENS,2025-12-18,2.641667,0.0,6.641667,40.7769,-73.8740
4,STATEN ISLAND,2025-12-18,3.387500,0.0,9.454167,40.6437,-74.0736
...,...,...,...,...,...,...,...
110,BRONX,2026-01-09,5.458333,2.9,11.791667,40.8506,-73.8769
111,BROOKLYN,2026-01-09,6.550000,2.0,11.987500,40.6928,-73.9903
112,MANHATTAN,2026-01-09,5.429167,3.0,10.183333,40.7829,-73.9654
113,QUEENS,2026-01-09,5.775000,4.5,11.370833,40.7769,-73.8740


In [9]:
fs = project.get_feature_store() 

In [10]:
weather_fg = fs.get_or_create_feature_group(
    name="nyc_weather_daily",
    version=1,
    primary_key=["borough", "date"],
    event_time="date",
    description="Daily borough-level weather features for NYC (mean temperature, total precipitation, mean wind speed)."
)
weather_fg.insert(
    borough_weather_daily,
    write_options={
        "wait_for_job": True
    }
)


Uploading Dataframe: 100.00% |██████████| Rows 115/115 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: nyc_weather_daily_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286339/jobs/named/nyc_weather_daily_1_offline_fg_materialization/executions
2026-01-10 14:39:25,201 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-10 14:39:31,589 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-10 14:40:51,561 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-10 14:40:51,734 INFO: Waiting for log aggregation to finish.
2026-01-10 14:41:00,397 INFO: Execution finished successfully.


(Job('nyc_weather_daily_1_offline_fg_materialization', 'SPARK'), None)

In [20]:
service_request_fg = fs.get_or_create_feature_group(
    name="nyc_311_service_request",
    version=1,
    description="NYC 311 service requests",
    primary_key=[
        "created_date",
        "agency",
        "complaint_type",
        "borough"
    ]
)
service_request_fg.insert(
    df_clean,
    write_options={
        "wait_for_job": True
    }
)


Uploading Dataframe: 100.00% |██████████| Rows 39016/39016 | Elapsed Time: 00:07 | Remaining Time: 00:00


Launching job: nyc_311_service_request_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286339/jobs/named/nyc_311_service_request_1_offline_fg_materialization/executions
2026-01-10 15:01:15,392 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-10 15:01:21,778 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-10 15:03:07,195 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-10 15:03:07,362 INFO: Waiting for log aggregation to finish.
2026-01-10 15:03:19,433 INFO: Execution finished successfully.


(Job('nyc_311_service_request_1_offline_fg_materialization', 'SPARK'), None)