In [11]:
#!/usr/bin/env python3
"""
ingest_to_hopsworks.py

Loads hourly counts (always), optional predictions (if present),
and the best MLflow model, then pushes them into Hopsworks
Feature Store and Model Registry.
"""

import os
import shutil
import pandas as pd
import hopsworks
import mlflow
from mlflow.tracking import MlflowClient

# ──────────────────────────────────────────────────────────────────────────────
# LOCAL PATHS & CONFIG
PARQUET_PATH       = "/Users/kaushalshivaprakash/Desktop/project3/data/processed/cleaned_citibike/citibike_2023_top3.parquet"
PREDICTIONS_PATH   = "/Users/kaushalshivaprakash/Desktop/project3/data/predictions.csv"
EXPERIMENT_NAME    = "CitiBike_Remote_Experiment"

# Feature Group names & versions
FG_COUNTS_NAME     = "citi_bike_hourly_counts"
FG_COUNTS_VERSION  = 1
FG_PRED_NAME       = "citi_bike_hourly_predictions"
FG_PRED_VERSION    = 1

# Hopsworks Model Registry name
HOPS_MODEL_NAME    = "CitiBikeForecasting"

# Local folder for downloading the best model artifact
MODEL_LOCAL_PATH   = "best_model_artifact"
# ──────────────────────────────────────────────────────────────────────────────

def main():
    # 1. Log into Hopsworks
    project = hopsworks.login()
    fs      = project.get_feature_store()
    mr      = project.get_model_registry()

    # 2. Load & aggregate your time-series counts
    df = pd.read_parquet(PARQUET_PATH)
    df["datetime"] = df["started_at"].dt.floor("H")
    counts = (
        df.groupby("datetime")
          .size()
          .reset_index(name="count")
          .sort_values("datetime")
    )

    # 3. Create or get a Feature Group for counts
    fg_counts = fs.get_feature_group(FG_COUNTS_NAME, FG_COUNTS_VERSION)
    if fg_counts is None:
        fg_counts = fs.create_feature_group(
            name=FG_COUNTS_NAME,
            version=FG_COUNTS_VERSION,
            primary_key=["datetime"],
            event_time="datetime",
            description="Hourly ride counts for top-3 stations",
        )
    fg_counts.insert(counts, write_options={"wait_for_job": False})
    print(f"→ Inserted {len(counts)} rows into Feature Group '{FG_COUNTS_NAME}'")

    # 4. (Optional) Load & ingest your predictions CSV if it exists
    if os.path.exists(PREDICTIONS_PATH):
        preds = pd.read_csv(PREDICTIONS_PATH, parse_dates=["datetime"])
        fg_preds = fs.get_feature_group(FG_PRED_NAME, FG_PRED_VERSION)
        if fg_preds is None:
            fg_preds = fs.create_feature_group(
                name=FG_PRED_NAME,
                version=FG_PRED_VERSION,
                primary_key=["datetime"],
                event_time="datetime",
                description="Model predictions for hourly ride counts",
            )
        fg_preds.insert(preds, write_options={"wait_for_job": False})
        print(f"→ Inserted {len(preds)} rows into Feature Group '{FG_PRED_NAME}'")
    else:
        print(f"⚠️  No predictions file at '{PREDICTIONS_PATH}', skipped.")

    # 5. Find the best MLflow run (lowest MAE) in your experiment
    client = MlflowClient()
    exp    = client.get_experiment_by_name(EXPERIMENT_NAME)
    runs   = client.search_runs(
        experiment_ids=[exp.experiment_id],
        run_view_type=1,
        max_results=1,
        order_by=["metrics.mae ASC"],
    )
    best   = runs[0]
    run_id = best.info.run_id
    best_mae = best.data.metrics["mae"]
    print(f"→ Best run {run_id} with MAE={best_mae:.2f}")

    # 6. Download the model artifact from that run
    if os.path.exists(MODEL_LOCAL_PATH):
        shutil.rmtree(MODEL_LOCAL_PATH)
    os.makedirs(MODEL_LOCAL_PATH, exist_ok=True)
    client.download_artifacts(run_id, "model", dst_path=MODEL_LOCAL_PATH)
    print(f"→ Downloaded artifacts to ./{MODEL_LOCAL_PATH}/")

    # 7. Register in Hopsworks Model Registry
    hs_model = mr.python.create_model(HOPS_MODEL_NAME)
    hs_model.save(MODEL_LOCAL_PATH)
    print(f"→ Registered model '{HOPS_MODEL_NAME}' version {hs_model.version} in Hopsworks")

    # If you wish to promote to production, you can do so via:
    #   - The Hopsworks UI under Model Registry
    #   - or via the Hopsworks SDK, e.g. mr.get_model(...).set_stage("production")

if __name__ == "__main__":
    main()

2025-05-10 17:11:26,108 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-10 17:11:26,112 INFO: Initializing external client
2025-05-10 17:11:26,112 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-10 17:11:26,829 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1213683


Uploading Dataframe: 100.00% |█| Rows 8256/8256 | Elapsed Time: 00:00 | Remainin


Launching job: citi_bike_hourly_counts_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1213683/jobs/named/citi_bike_hourly_counts_1_offline_fg_materialization/executions
→ Inserted 8256 rows into Feature Group 'citi_bike_hourly_counts'


Uploading Dataframe: 100.00% |█| Rows 8228/8228 | Elapsed Time: 00:01 | Remainin
Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/Applied_ML_Project1/Resources/jobs/citi_bike_hourly_predictions_1_offline_fg_materialization/config_1746911103706) to trigger the materialization job again.


→ Inserted 8228 rows into Feature Group 'citi_bike_hourly_predictions'
→ Best run 305b68c9cd9c463fb359e5b715a94872 with MAE=8.22


Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

→ Downloaded artifacts to ./best_model_artifact/




  0%|          | 0/6 [00:00<?, ?it/s]

Uploading /Users/kaushalshivaprakash/Desktop/project3/pipelines/best_model_artifact/model/python_env.yaml: 0.0…

Uploading /Users/kaushalshivaprakash/Desktop/project3/pipelines/best_model_artifact/model/requirements.txt: 0.…

Uploading /Users/kaushalshivaprakash/Desktop/project3/pipelines/best_model_artifact/model/MLmodel: 0.000%|    …

Uploading /Users/kaushalshivaprakash/Desktop/project3/pipelines/best_model_artifact/model/model.pkl: 0.000%|  …

Uploading /Users/kaushalshivaprakash/Desktop/project3/pipelines/best_model_artifact/model/conda.yaml: 0.000%| …

Model created, explore it at https://c.app.hopsworks.ai:443/p/1213683/models/CitiBikeForecasting/2
→ Registered model 'CitiBikeForecasting' version 2 in Hopsworks


In [6]:
#!/usr/bin/env python3
"""
inference_run_uri.py

Finds your best MLflow run (by MAE), loads its model artifact directly via run URI,
makes forecasts on the cleaned Parquet, and writes out predictions.csv.
"""

import os
import pandas as pd
import mlflow
from mlflow.tracking import MlflowClient

# ──────────────────────────────────────────────────────────────────────────────
# CONFIG
PARQUET_PATH    = "/Users/kaushalshivaprakash/Desktop/project3/data/processed/cleaned_citibike/citibike_2023_top3.parquet"
EXPERIMENT_NAME = "CitiBike_Remote_Experiment"
OUTPUT_CSV      = "/Users/kaushalshivaprakash/Desktop/project3/data/predictions.csv"

# If you’re using DagsHub remote tracking, set these (otherwise unset)
os.environ["MLFLOW_TRACKING_USERNAME"] = "kaushal-shivaprakashan"
os.environ["MLFLOW_TRACKING_PASSWORD"] = "b01d7b8c94b982d47d0224ea469bbfe4b8870ff6"
mlflow.set_tracking_uri("https://dagshub.com/kaushal-shivaprakashan/final_project.mlflow")
# ──────────────────────────────────────────────────────────────────────────────

def load_hourly_counts(path):
    df = pd.read_parquet(path)
    df["datetime"] = df["started_at"].dt.floor("H")
    agg = (
        df.groupby("datetime")
          .size()
          .reset_index(name="count")
          .sort_values("datetime")
          .reset_index(drop=True)
    )
    return agg

def get_best_run_id(experiment_name: str) -> str:
    client = MlflowClient()
    exp = client.get_experiment_by_name(experiment_name)
    if exp is None:
        raise ValueError(f"Experiment '{experiment_name}' not found")
    runs = client.search_runs(
        experiment_ids=[exp.experiment_id],
        filter_string="",
        run_view_type=1,
        max_results=1,
        order_by=["metrics.mae ASC"]
    )
    best = runs[0]
    print(f"Best run: {best.info.run_id} (MAE={best.data.metrics['mae']:.2f})")
    return best.info.run_id

def load_model_from_run(run_id: str):
    uri = f"runs:/{run_id}/model"
    print(f"Loading model from {uri}")
    return mlflow.pyfunc.load_model(uri)

def build_lag_features(df, max_lag=28):
    df = df.copy()
    for lag in range(1, max_lag+1):
        df[f"lag_{lag}"] = df["count"].shift(lag)
    return df.dropna().reset_index(drop=True)

def main():
    # 1) load counts
    hourly = load_hourly_counts(PARQUET_PATH)

    # 2) find best run
    run_id = get_best_run_id(EXPERIMENT_NAME)

    # 3) load model
    model = load_model_from_run(run_id)

    # 4) build features & predict
    data = build_lag_features(hourly, max_lag=28)
    feature_cols = [f"lag_{i}" for i in range(1, 29)]
    data["prediction"] = model.predict(data[feature_cols])

    # 5) write CSV
    out = data[["datetime", "prediction"]]
    os.makedirs(os.path.dirname(OUTPUT_CSV), exist_ok=True)
    out.to_csv(OUTPUT_CSV, index=False)
    print(f"Wrote {len(out)} rows to {OUTPUT_CSV}")

if __name__ == "__main__":
    main()

Best run: 305b68c9cd9c463fb359e5b715a94872 (MAE=8.22)
Loading model from runs:/305b68c9cd9c463fb359e5b715a94872/model


Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

Wrote 8228 rows to /Users/kaushalshivaprakash/Desktop/project3/data/predictions.csv


In [37]:
#!/usr/bin/env python3
"""
register_and_deploy_model.py

1) Defines unpickle‐time helpers
2) Logs into Hopsworks
3) Loads preprocessing pipeline & 28-lag model
4) Registers model via the sklearn API (exporting only the .pkl)
5) Generates 28-lag features, runs batch inference, writes predictions
6) Deploys the model endpoint (single .pkl artifact)
"""

import os
import shutil
import pandas as pd
import joblib
import hopsworks

# ─── 1) UNPICKLE HELPERS ────────────────────────────────────────────────
def select_hour_bucket(df): return df[["hour_bucket"]]
def extract_datetime_parts(df):
    return pd.DataFrame({
        "year":  df["hour_bucket"].dt.year,
        "month": df["hour_bucket"].dt.month,
        "day":   df["hour_bucket"].dt.day,
        "hour":  df["hour_bucket"].dt.hour
    })
def select_station_id(df): return df[["start_station_id"]]

# 28-lag generator
def make_28_lag_features(df):
    counts = df.groupby(["start_station_id","hour_bucket"]).size().rename("trips").reset_index()
    frames = []
    for station, grp in counts.groupby("start_station_id"):
        ts = grp.set_index("hour_bucket")["trips"]
        full_idx = pd.date_range(ts.index.min(), ts.index.max(), freq="H")
        ts = ts.reindex(full_idx, fill_value=0)
        ts.index.name = "hour_bucket"
        lags = {f"lag_{i}": ts.shift(i) for i in range(1,29)}
        lag_df = pd.DataFrame(lags).dropna()
        lag_df["start_station_id"] = station
        frames.append(lag_df.reset_index())
    return pd.concat(frames, ignore_index=True)

# ─── 2) CONFIGURATION ───────────────────────────────────────────────────
HOST    = "c.app.hopsworks.ai"
PROJECT = "final_project_7"
API_KEY = "QfxZ4kUDSLb9E9qs.xCFq8U27Q5jfdYDNVLMuwmj5e90Ru3cUC1oXpbzl5WNCXZIJ3SJ1DGM3uaiVHYUV"

PIPELINE_PATH = "/Users/kaushalshivaprakash/Desktop/project3/pipelines/models/feature_pipeline.pkl"
MODEL_PATH    = "/Users/kaushalshivaprakash/Desktop/project3/models/lgbm_28lag.pkl"
PARQUET_PATH  = "/Users/kaushalshivaprakash/Desktop/project3/data/processed/cleaned_citibike/citibike_2023_top3.parquet"

FG_NAME    = "citi_bike_inference"
FG_VERSION = 1
EP_NAME    = "citiBikePredictor"  # alphanumeric only

# ─── 3) LOGIN & LOAD ARTIFACTS ────────────────────────────────────────────
project = hopsworks.login(host=HOST, project=PROJECT, api_key_value=API_KEY)
print(f"✅ Logged into Hopsworks: {project.name}")

preprocessor = joblib.load(PIPELINE_PATH)
model        = joblib.load(MODEL_PATH)

# ─── 4) REGISTER MODEL ───────────────────────────────────────────────────
mr = project.get_model_registry()
sk_model = mr.sklearn.create_model(
    name="citi_bike_demand_model",
    metrics={"validation_mae": 5.534},
    description="LGBM 28-lag model"
)
export_dir = "/tmp/hopsworks_model_export"
shutil.rmtree(export_dir, ignore_errors=True)
os.makedirs(export_dir)
# Only export the single model file for serving:
shutil.copy(MODEL_PATH, os.path.join(export_dir, "model.pkl"))
sk_model.save(export_dir)
print(f"✅ Registered model version: {sk_model.version}")

# ─── 5) BATCH INFERENCE & STORE ─────────────────────────────────────────
df = pd.read_parquet(PARQUET_PATH)
df["started_at"]  = pd.to_datetime(df["started_at"])
df["hour_bucket"] = df["started_at"].dt.floor("H")

lags_df = make_28_lag_features(df)
lag_cols = [f"lag_{i}" for i in range(1,29)]
lags_df["predicted_trips"] = model.predict(lags_df[lag_cols])

fs = project.get_feature_store()
fg = fs.get_or_create_feature_group(
    name=FG_NAME,
    version=FG_VERSION,
    primary_key=["start_station_id","hour_bucket"],
    event_time="hour_bucket",
    description="28-lag batch predictions"
)
fg.insert(lags_df[["start_station_id","hour_bucket","predicted_trips"]],
          write_options={"wait_for_job":True})
print("✅ Stored inference results in Feature Store.")

# ─── 6) DEPLOY MODEL ENDPOINT ────────────────────────────────────────────
endpoint = sk_model.deploy(name=EP_NAME)
print(f"🚀 Deployed endpoint: {endpoint.name} at {endpoint.url}")

2025-05-11 00:35:28,444 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-11 00:35:28,448 INFO: Initializing external client
2025-05-11 00:35:28,448 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-11 00:35:29,043 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1228970
✅ Logged into Hopsworks: final_project_7




  0%|          | 0/6 [00:00<?, ?it/s]

Uploading /tmp/hopsworks_model_export/model.pkl: 0.000%|          | 0/276812 elapsed<00:00 remaining<?

Model created, explore it at https://c.app.hopsworks.ai:443/p/1228970/models/citi_bike_demand_model/6
✅ Registered model version: 6


Uploading Dataframe: 100.00% |█| Rows 26294/26294 | Elapsed Time: 00:00 | Remain


Launching job: citi_bike_inference_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1228970/jobs/named/citi_bike_inference_1_offline_fg_materialization/executions
2025-05-11 00:35:51,106 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-11 00:35:54,198 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-11 00:37:30,185 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-11 00:37:30,251 INFO: Waiting for log aggregation to finish.
2025-05-11 00:37:41,688 INFO: Execution finished successfully.
✅ Stored inference results in Feature Store.


RestAPIError: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/1228970/serving). Server response: 
HTTP code: 422, HTTP reason: CUSTOM, body: b'{"errorCode":120001,"usrMsg":"Serving name must consist of lower case alphanumeric characters, \'-\' or \'.\', and start and end with an alphanumeric character","errorMsg":"An argument was not provided or it was malformed."}', error code: 120001, error msg: An argument was not provided or it was malformed., user msg: Serving name must consist of lower case alphanumeric characters, '-' or '.', and start and end with an alphanumeric character