In [None]:
 !pip install hopsworks[python] xgboost requests matplotlib pandas numpy scikit-learn


Collecting hopsworks[python]
  Downloading hopsworks-4.3.1-py3-none-any.whl.metadata (11 kB)
Collecting pyhumps==1.6.1 (from hopsworks[python])
  Downloading pyhumps-1.6.1-py3-none-any.whl.metadata (3.7 kB)
Collecting furl (from hopsworks[python])
  Downloading furl-2.1.4-py2.py3-none-any.whl.metadata (25 kB)
Collecting boto3 (from hopsworks[python])
  Downloading boto3-1.40.11-py3-none-any.whl.metadata (6.7 kB)
Collecting numpy
  Downloading numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pyjks (from hopsworks[python])
  Downloading pyjks-20.0.0-py2.py3-none-any.whl.metadata (1.7 kB)
Collecting mock (from hopsworks[python])
  Downloading mock-5.2.0-py3-none-any.whl.metadata (3.1 kB)
Collecting avro==1.11.3 (from hopsworks[python])
  Downloading avro-1.11.3.tar.gz (90 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import os
import sys
import json
import math
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# XGBoost
try:
    import xgboost as xgb
except ImportError:
    raise ImportError("xgboost is not installed. Please `pip install xgboost` and retry.")

# Hopsworks
try:
    import hopsworks
except ImportError:
    raise ImportError("hopsworks is not installed. Please `pip install hopsworks[python]` and retry.")

from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

In [None]:
# --------------
# Configuration
# --------------
FEATURE_GROUP_NAME = "aqi_weather_features"
FEATURE_GROUP_VER  = 2

# Location (Islamabad example)
LATITUDE  = 33.5973
LONGITUDE = 73.0479

# Forecast horizon in hours
HORIZON_H = 72

# Timezone for plotting & API alignment
TZ = "Asia/Karachi"

# Max lag window (hours) used in features
MAX_LAG_H = 120  # 5 days

# Output paths
ARTIFACT_DIR = "xgb_aqi_artifacts"
PLOTS_DIR    = os.path.join(ARTIFACT_DIR, "plots")
os.makedirs(PLOTS_DIR, exist_ok=True)
os.makedirs(ARTIFACT_DIR, exist_ok=True)

In [None]:
def create_lag_features(df: pd.DataFrame, feat_cols, lags=None):
    """Create lag & rolling features up to 5 days for given feature columns.
    Expects df sorted by time ascending and contains feat_cols.
    """
    if lags is None:
        lags = [1, 2, 3, 6, 12, 24, 48, 72, 96, 120]
    out = df.copy()
    for f in feat_cols:
        for lag in lags:
            out[f"{f}_lag_{lag}"] = out[f].shift(lag)
        out[f"{f}_roll_mean_24"] = out[f].rolling(24, min_periods=24).mean()
        out[f"{f}_roll_std_24"]  = out[f].rolling(24, min_periods=24).std()
        out[f"{f}_roll_mean_72"] = out[f].rolling(72, min_periods=72).mean()
        out[f"{f}_roll_std_72"]  = out[f].rolling(72, min_periods=72).std()
    return out

In [None]:
def ensure_utc(ts_series: pd.Series) -> pd.Series:
    s = pd.to_datetime(ts_series)
    try:
        if s.dt.tz is None:
            return s.dt.tz_localize("UTC")
        else:
            return s.dt.tz_convert("UTC")
    except AttributeError:
        # if series has NaT or not datetime-like
        s = pd.to_datetime(s, errors="coerce")
        s = s.dt.tz_localize("UTC")
        return s


def utc_to_tz(ts_series: pd.Series, tz: str) -> pd.Series:
    s = ensure_utc(ts_series)
    return s.dt.tz_convert(tz)


def largest_lag_from_cols(cols) -> int:
    max_lag = 0
    for c in cols:
        if "_lag_" in c:
            try:
                max_lag = max(max_lag, int(c.split("_lag_")[-1]))
            except Exception:
                pass
    return max_lag


def metrics(y_true, y_pred):
    mae = mean_absolute_error(y_true, y_pred)
    rmse = mean_squared_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    return mae, rmse, r2

In [None]:
# ------------------------
# 1) Load data (Hopsworks)
# ------------------------
print("[1/7] Logging into Hopsworks and reading Feature Group...")
project = hopsworks.login()
fs = project.get_feature_store()
fg = fs.get_feature_group(name=FEATURE_GROUP_NAME, version=FEATURE_GROUP_VER)
df_raw = fg.read()

# Sort & select columns
print("[info] Rows fetched:", len(df_raw))
df_raw = df_raw.sort_values("time", ascending=True).reset_index(drop=True)

cols_needed = [
    "time", "pm_10", "pm_25", "carbon_monoxidegm", "nitrogen_dioxide",
    "sulphur_dioxide", "ozone", "us_aqi"
]

missing = [c for c in cols_needed if c not in df_raw.columns]
if missing:
    raise ValueError(f"Missing required columns in Feature Group: {missing}")

# Prepare base dataframe
df = df_raw[cols_needed].copy()
df["time"] = pd.to_datetime(df["time"])  # may be tz-naive or tz-aware

# Keep a tz-aware UTC index internally
df["time_utc"] = ensure_utc(df["time"])  # tz-aware UTC

[1/7] Logging into Hopsworks and reading Feature Group...
Copy your Api Key (first register/login): https://c.app.hopsworks.ai/account/api/generated

Paste it here: ··········




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'



Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1239199
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.03s) 
[info] Rows fetched: 21479


In [None]:
# -----------------------------
# 2) Feature engineering (lags)
# -----------------------------
print("[2/7] Building lag & rolling features (up to 120h)...")
features = ["pm_10", "pm_25", "carbon_monoxidegm", "nitrogen_dioxide", "sulphur_dioxide", "ozone"]

work = df.set_index("time_utc")[features + ["us_aqi"]].copy()
work = create_lag_features(work, features)
work.dropna(inplace=True)

all_features = [c for c in work.columns if c != "us_aqi"]
X = work[all_features]
y = work["us_aqi"]

# ---------------------------------------
# 3) Train/test split (time-aware split)
# ---------------------------------------
print("[3/7] Splitting train/test by time (80/20)...")
split_idx = int(len(work) * 0.8)
X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:]

[2/7] Building lag & rolling features (up to 120h)...
[3/7] Splitting train/test by time (80/20)...


In [None]:
 #-----------------------
# 4) Train XGBoost model
# -----------------------
print("[4/7] Training XGBoost regressor...")

model = xgb.XGBRegressor(
    n_estimators=800,
    learning_rate=0.05,
    max_depth=8,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_lambda=1.0,
    objective="reg:squarederror",
    random_state=42,
    n_jobs=-1,
    tree_method="hist",
)
model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)

[4/7] Training XGBoost regressor...


In [None]:
# ----------------------
# 5) Evaluate on test set
# ----------------------
print("[5/7] Evaluating on held-out test set...")
y_pred = model.predict(X_test)
mae, rmse, r2 = metrics(y_test, y_pred)
print(f"MAE:  {mae:.2f}\nRMSE: {rmse:.2f}\nR²:   {r2:.4f}")

# Plot: Predicted vs Actual (test)
plt.figure(figsize=(12,6))
plt.scatter(y_test.values, y_pred, alpha=0.6)
lims = [min(y_test.min(), y_pred.min()), max(y_test.max(), y_pred.max())]
plt.plot(lims, lims, "--")
plt.xlabel("Actual US AQI (test)")
plt.ylabel("Predicted US AQI")
plt.title(f"XGBoost: Predicted vs Actual (R²={r2:.4f})")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, "test_pred_vs_actual.png"), dpi=140)
plt.close()

# Feature importance (gain)
imp = pd.DataFrame({
    "feature": all_features,
    "importance": model.feature_importances_
}).sort_values("importance", ascending=False)
imp.to_csv(os.path.join(ARTIFACT_DIR, "xgb_feature_importance.csv"), index=False)

[5/7] Evaluating on held-out test set...
MAE:  2.56
RMSE: 17.27
R²:   0.9858


In [None]:
plt.figure(figsize=(12,7))
plt.barh(imp.head(20)["feature"], imp.head(20)["importance"])
plt.gca().invert_yaxis()
plt.title("Top 20 Feature Importances (XGBoost)")
plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, "feature_importance_top20.png"), dpi=140)
plt.close()

In [None]:
# Save artifacts
import joblib
joblib.dump(model, os.path.join(ARTIFACT_DIR, "xgb_model.pkl"))
joblib.dump(all_features, os.path.join(ARTIFACT_DIR, "xgb_features.pkl"))
print(f"[info] Artifacts saved to: {ARTIFACT_DIR}")

[info] Artifacts saved to: xgb_aqi_artifacts


In [None]:
# -------------------------------------------------
# 6) Build 72h forecast from last known timestamp
# -------------------------------------------------
print("[6/7] Preparing 72h forecast from last known time...")
last_known_utc = df["time_utc"].iloc[-1]

# Request future pollutant forecasts in UTC to avoid DST glitches
start_utc = (last_known_utc + pd.Timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
end_utc   = (last_known_utc + pd.Timedelta(hours=HORIZON_H)).strftime("%Y-%m-%dT%H:%M:%SZ")

air_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
air_params = {
    "latitude": LATITUDE,
    "longitude": LONGITUDE,
    "hourly": "pm10,pm2_5,carbon_monoxide,nitrogen_dioxide,sulphur_dioxide,ozone",
    "start": start_utc,
    "end": end_utc,
    "timezone": "UTC",
}

resp = requests.get(air_url, params=air_params)
resp.raise_for_status()
raw = resp.json()

# Future df (UTC)
df_future = pd.DataFrame({
    "time_utc": pd.to_datetime(raw["hourly"]["time"]),
    "pm_10": raw["hourly"]["pm10"],
    "pm_25": raw["hourly"]["pm2_5"],
    "carbon_monoxidegm": raw["hourly"]["carbon_monoxide"],
    "nitrogen_dioxide": raw["hourly"]["nitrogen_dioxide"],
    "sulphur_dioxide": raw["hourly"]["sulphur_dioxide"],
    "ozone": raw["hourly"]["ozone"],
})

# Enforce desired horizon and sort
if len(df_future) > HORIZON_H:
    df_future = df_future.sort_values("time_utc").iloc[:HORIZON_H].reset_index(drop=True)
else:
    df_future = df_future.sort_values("time_utc").reset_index(drop=True)

# Combine last MAX_LAG_H hours of history + future for feature generation
history_block = work[features].tail(MAX_LAG_H).copy()
if len(history_block) < MAX_LAG_H:
    print(f"[warn] Only {len(history_block)} hours of history available; expected {MAX_LAG_H}.")

combined_vals = pd.concat([
    history_block.reset_index(drop=True),
    df_future[features].reset_index(drop=True)
], axis=0).reset_index(drop=True)

# Create lag/rolling features on combined sequence
combined = create_lag_features(combined_vals, features)

# Slice the future part
future_block = combined.iloc[len(history_block): len(history_block) + len(df_future)].copy()

# Align columns to model expectation
for c in all_features:
    if c not in future_block.columns:
        future_block[c] = 0.0
future_block = future_block[all_features]

# Predict future AQI
future_pred = model.predict(future_block)

# Build prediction dataframe with timestamps in TZ
future_times_utc = df_future["time_utc"].copy()
future_times_tz  = utc_to_tz(future_times_utc, TZ)

forecast_df = pd.DataFrame({
    "datetime": future_times_tz,              # tz-aware in TZ
    "datetime_utc": future_times_utc.dt.tz_localize("UTC") if future_times_utc.dt.tz is None else future_times_utc.dt.tz_convert("UTC"),
    "predicted_us_aqi": future_pred,
})

forecast_path = os.path.join(ARTIFACT_DIR, "xgb_72h_forecast.csv")
forecast_df.to_csv(forecast_path, index=False)
print(f"[info] Saved 72h forecast to: {forecast_path}")

[6/7] Preparing 72h forecast from last known time...
[info] Saved 72h forecast to: xgb_aqi_artifacts/xgb_72h_forecast.csv


In [None]:
# -------------------------------------------------------
# 7) Fetch US AQI for same timestamps & align (same TZ)
# -------------------------------------------------------
print("[7/7] Fetching US AQI for the exact forecast window (aligned TZ)...")
start_date_local = forecast_df["datetime"].min().strftime("%Y-%m-%d")
end_date_local   = forecast_df["datetime"].max().strftime("%Y-%m-%d")

# Use timezone=TZ so API returns local-time stamps matching `forecast_df.datetime`
aqi_params = {
    "latitude": LATITUDE,
    "longitude": LONGITUDE,
    "hourly": "us_aqi",
    "timezone": TZ,
    "start_date": start_date_local,
    "end_date": end_date_local,
}

resp2 = requests.get(air_url, params=aqi_params)
resp2.raise_for_status()
raw2 = resp2.json()

actual_df = pd.DataFrame({
    "datetime": pd.to_datetime(raw2["hourly"]["time"]).tz_localize(TZ),
    "us_aqi_actual": raw2["hourly"]["us_aqi"],
})

# Inner join ensures perfect hour alignment on shared timestamps
merged = pd.merge(forecast_df[["datetime", "predicted_us_aqi"]], actual_df, on="datetime", how="inner").sort_values("datetime")
merged_path = os.path.join(ARTIFACT_DIR, "xgb_forecast_vs_us_aqi.csv")
merged.to_csv(merged_path, index=False)
print(f"[info] Saved aligned forecast-vs-actual to: {merged_path}")

[7/7] Fetching US AQI for the exact forecast window (aligned TZ)...
[info] Saved aligned forecast-vs-actual to: xgb_aqi_artifacts/xgb_forecast_vs_us_aqi.csv


In [None]:
# ------------------
# Plot 72h alignment
# ------------------
plt.figure(figsize=(12,6))
plt.plot(merged["datetime"], merged["predicted_us_aqi"], label="Predicted US AQI (XGB)")
plt.plot(merged["datetime"], merged["us_aqi_actual"], label="US AQI from API", linestyle="--")
plt.title("Next 72 Hours: Predicted vs US AQI (Aligned in {0})".format(TZ))
plt.xlabel("Datetime ({0})".format(TZ))
plt.ylabel("US AQI")
plt.grid(True, alpha=0.3)
plt.legend()
plt.tight_layout()
plot_path = os.path.join(PLOTS_DIR, "xgb_72h_pred_vs_actual.png")
plt.savefig(plot_path, dpi=140)
plt.close()
print(f"[info] Plot saved to: {plot_path}")

# ----------------------
# Quick accuracy summary
# ----------------------
# Note: This is not a true evaluation (future vs future), but useful to see drift.
if not merged.empty:
    mae_f = mean_absolute_error(merged["us_aqi_actual"], merged["predicted_us_aqi"])
    rmse_f = mean_squared_error(merged["us_aqi_actual"], merged["predicted_us_aqi"])
    corr_f = np.corrcoef(merged["us_aqi_actual"].values, merged["predicted_us_aqi"].values)[0,1] if len(merged) > 1 else np.nan
    print(f"\n[summary] 72h window alignment metrics (API us_aqi vs XGB prediction):\n  MAE:  {mae_f:.2f}\n  RMSE: {rmse_f:.2f}\n  Corr: {corr_f:.4f}")
else:
    print("[summary] No overlapping rows after merge; check timezone and date range.")

print("\n✅ Done. Artifacts are in:", os.path.abspath(ARTIFACT_DIR))

[info] Plot saved to: xgb_aqi_artifacts/plots/xgb_72h_pred_vs_actual.png

[summary] 72h window alignment metrics (API us_aqi vs XGB prediction):
  MAE:  3.93
  RMSE: 32.82
  Corr: 0.9436

✅ Done. Artifacts are in: /content/xgb_aqi_artifacts


In [None]:
from hsml.model import ModelSchema, Schema
import joblib
import os
import shutil

# --- Create schema ---
input_schema = Schema(X_train)
output_schema = Schema(y_train)
model_schema = ModelSchema(input_schema=input_schema, output_schema=output_schema)

# --- Model registry ---
mr = project.get_model_registry()

model_meta = mr.python.create_model(
    name="xgb_aqi_forecaster",
    metrics={"mae": mae, "rmse": rmse, "r2": r2},
    model_schema=model_schema,
    description="XGBoost model for AQI forecasting using weather & pollutant lags"
)

# --- Create artifact folder ---
ARTIFACT_DIR = "xgb_artifacts"
os.makedirs(ARTIFACT_DIR, exist_ok=True)

# Save model
joblib.dump(model, os.path.join(ARTIFACT_DIR, "xgb_model.pkl"))

# Save features list
joblib.dump(all_features, os.path.join(ARTIFACT_DIR, "xgb_features.pkl"))

# --- Save to Model Registry ---
model_meta.save(ARTIFACT_DIR)


  0%|          | 0/6 [00:00<?, ?it/s]

Uploading /content/xgb_artifacts/xgb_features.pkl: 0.000%|          | 0/1933 elapsed<00:00 remaining<?

Uploading /content/xgb_artifacts/xgb_model.pkl: 0.000%|          | 0/9070397 elapsed<00:00 remaining<?

Uploading /content/model_schema.json: 0.000%|          | 0/7493 elapsed<00:00 remaining<?

Model created, explore it at https://c.app.hopsworks.ai:443/p/1239199/models/xgb_aqi_forecaster/2


Model(name: 'xgb_aqi_forecaster', version: 2)