In [10]:
#bootstrapping the environment - repo root, .env, settings

import sys
import os
import hopsworks
import pandas as pd
import numpy as np
from pathlib import Path
from dotenv import load_dotenv

def find_repo_root(start: Path | None = None) -> Path:
    p = (start or Path.cwd()).resolve()
    for _ in range(25):
        if (p / ".env").exists() or (p / ".git").exists() or (p / "pyproject.toml").exists():
            return p
        if p.parent == p:
            break
        p = p.parent
    return (start or Path.cwd()).resolve()

# Resolve the project root once so later paths do not depend on the notebook's cwd.
root_dir = find_repo_root()
print("Repo root:", root_dir)

# Append the root to sys.path (the printed label says PYTHONPATH, but only
# this process's sys.path is changed). This must run before the `mlfs` import below.
if str(root_dir) not in sys.path:
    sys.path.append(str(root_dir))
    print("Added to PYTHONPATH:", root_dir)

# Load variables from <root>/.env into the process environment.
load_dotenv(root_dir / ".env")

# Fail fast when a required variable is unset or empty. os.getenv reads the
# whole process environment, so a value exported outside .env also passes.
assert os.getenv("HOPSWORKS_API_KEY"), "Missing HOPSWORKS_API_KEY in .env"
assert os.getenv("DATA_PATH"), \
    "Missing data path in .env (set DATA_PATH=... recommended)"

print("Loaded .env successfully")

# NOTE(review): presumably imported this late because `mlfs` is only importable
# after the sys.path change above and reads the variables loaded from .env — confirm.
from mlfs.mcphases.config import settings
print("DATA_PATH resolved to:", settings.DATA_PATH)

Repo root: /Users/sreenijaveladri/Downloads/llm_project_starter/scalable-ml-project
Loaded .env successfully
DATA_PATH resolved to: /Users/sreenijaveladri/Downloads/llm_project_starter/scalable-ml-project/data/mcphases/raw


In [12]:
# Authenticate against Hopsworks and open the project's feature store.
project = hopsworks.login(engine="python")
fs = project.get_feature_store()

# Handle to the raw daily feature group, then an offline read into pandas.
fg_raw = fs.get_feature_group("mcphases_daily_fg", version=1)
df = fg_raw.read()

print("Loaded from Hopsworks Feature Group: mcphases_daily_fg v1")
print("Shape:", df.shape)
display(df.head(3))

# Fixing a column-name mismatch: the raw FG uses a single underscore, while the
# rest of this notebook refers to the double-underscore name. The single-underscore
# column is kept; the double-underscore one is added as a copy.
if "resting_heart_rate_value" in df.columns and "resting_heart_rate__value" not in df.columns:
    df["resting_heart_rate__value"] = df["resting_heart_rate_value"]

2026-01-11 18:19:03,930 INFO: Closing external client and cleaning up certificates.
2026-01-11 18:19:03,934 INFO: Connection closed.
2026-01-11 18:19:03,936 INFO: Initializing external client
2026-01-11 18:19:03,937 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2026-01-11 18:19:05,325 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/3208
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.38s) 
Loaded from Hopsworks Feature Group: mcphases_daily_fg v1
Shape: (5659, 75)


Unnamed: 0,subject_id,study_interval,is_weekend,day_in_study,phase,lh,estrogen,pdg,flow_volume,flow_color,...,subject_info_birth_year,subject_info_gender,subject_info_ethnicity,subject_info_education,subject_info_sexually_active,subject_info_self_report_menstrual_health_literacy,subject_info_age_of_first_menarche,sleep_duration_minutes,sleep_duration_hours,event_time
0,1,2022,True,1,Follicular,2.9,94.2,,Not at all,Not at all,...,1999,Woman,White,"Some university/ post-secondary, no degree",Yes,,14,617.0,10.283333,2020-01-02
1,1,2022,False,2,Follicular,1.2,226.3,,Not at all,Not at all,...,1999,Woman,White,"Some university/ post-secondary, no degree",Yes,,14,258.5,4.308333,2020-01-03
2,1,2022,False,3,Follicular,3.5,276.8,,Not at all,Not at all,...,1999,Woman,White,"Some university/ post-secondary, no degree",Yes,,14,530.0,8.833333,2020-01-04


In [13]:
#avro safe python types
def make_avro_safe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` whose typed columns hold plain Python scalars.

    Boolean, integer, float, datetime and object columns are rebuilt as
    ``object`` columns containing ``bool`` / ``int`` / ``float`` /
    ``datetime.datetime`` values (``np.str_`` entries in object columns become
    ``str``). Every scalar missing value (``None``, ``NaN``, ``pd.NA``,
    ``NaT``) is replaced by ``None``. Columns of any other dtype are left
    untouched and the input frame is never modified.

    Compared with a ``DataFrame.where(..., None)`` + ``Series.apply`` approach,
    this also works for nullable ``Int64`` / ``boolean`` columns containing
    ``pd.NA`` and guarantees that missing floats and timestamps really end up
    as ``None``.

    Args:
        df: Frame to convert.

    Returns:
        A new DataFrame with the same columns, order and index.
    """

    def _is_missing(value) -> bool:
        # Only scalars can be "missing"; list-like cell values are passed through.
        return pd.api.types.is_scalar(value) and bool(pd.isna(value))

    def _as_python(series: pd.Series, cast) -> pd.Series:
        converted = [None if _is_missing(v) else cast(v) for v in series]
        # dtype=object is explicit so pandas does not re-infer a numeric or
        # datetime dtype and silently turn the None entries back into NaN/NaT.
        return pd.Series(converted, index=series.index, dtype=object)

    out = df.copy()

    for col in out.columns:
        s = out[col]

        if pd.api.types.is_bool_dtype(s) or str(s.dtype) == "boolean":
            out[col] = _as_python(s, bool)

        elif pd.api.types.is_integer_dtype(s):
            out[col] = _as_python(s, int)

        elif pd.api.types.is_float_dtype(s):
            out[col] = _as_python(s, float)

        elif pd.api.types.is_datetime64_any_dtype(s):
            out[col] = _as_python(s, lambda v: pd.Timestamp(v).to_pydatetime())

        elif s.dtype == object:
            out[col] = _as_python(s, lambda v: str(v) if isinstance(v, np.str_) else v)

    return out

In [14]:
# Primary-key columns (one row per subject per study day); sort rows by them.

KEYS = ["subject_id", "day_in_study"]
df = df.sort_values(by=KEYS, ignore_index=True)

# Every row must carry both keys, and no subject-day pair may repeat.
assert not df[KEYS].isna().values.any(), "Missing keys"
assert not df.duplicated(subset=KEYS).any(), "Duplicate subject-day rows still exist"

Keys OK.


In [16]:
# Defining targets (mood + energy/fatigue) from existing columns:
# ordinal answer categories are mapped to numbers here and bucketed into 3 classes later.

LEVELS_6 = ["Not at all", "Very Low/Little", "Low", "Moderate", "High", "Very High"]
LEVEL_MAP_6 = dict(zip(LEVELS_6, range(len(LEVELS_6))))  # label -> 0..5

MAX6 = LEVEL_MAP_6[LEVELS_6[-1]]  # highest ordinal code

def map_ordinal_6(series: pd.Series) -> pd.Series:
    """Translate 6-level ordinal labels into float codes 0.0-5.0.

    Values are cast to pandas strings, stripped, and inner whitespace runs are
    collapsed to a single space before the lookup in ``LEVEL_MAP_6``. Missing
    values and labels that are not in the map come back as NaN.
    """
    normalized = (
        series.astype("string")
        .str.strip()
        .str.replace(r"\s+", " ", regex=True)
    )
    codes = normalized.map(LEVEL_MAP_6)
    return codes.astype("float")


# Energy target: the fatigue code is flipped (MAX6 - code) so higher = better.
df["fatigue_num"] = map_ordinal_6(df["fatigue"])
df["y_energy_num"] = df["fatigue_num"].rsub(MAX6)

# Mood-stability target: the moodswing code is flipped the same way.
df["moodswing_num"] = map_ordinal_6(df["moodswing"])
df["y_mood_stability_num"] = df["moodswing_num"].rsub(MAX6)

def to_3class_from_6(x):
    """Bucket 0-5 ordinal codes into three float classes.

    Codes 0-1 become 0.0 (low), 2-3 become 1.0 (mid) and 4-5 become 2.0 (high);
    missing inputs stay NaN.
    """
    edges = [-0.1, 1.5, 3.5, 5.1]
    class_ids = [0, 1, 2]
    bucketed = pd.cut(x, bins=edges, labels=class_ids)
    return bucketed.astype("float")

df["y_energy_cls3"] = to_3class_from_6(df["y_energy_num"])
df["y_mood_stability_cls3"] = to_3class_from_6(df["y_mood_stability_num"])

# Share of rows without a ground-truth answer, for each source column and its derived target.
for _column in ("fatigue", "y_energy_cls3", "moodswing", "y_mood_stability_cls3"):
    print(f"Missing {_column}:", df[_column].isna().mean())

# When a target's missing share equals that of its source column, the mapping
# dropped no labelled rows.

Missing fatigue: 0.4113801024916063
Missing y_energy_cls3: 0.4113801024916063
Missing moodswing: 0.41332390881781234
Missing y_mood_stability_cls3: 0.41332390881781234


In [17]:
# Quantifying how many rows carry each target label.
n_total = len(df)
labeled = df[["y_energy_cls3", "y_mood_stability_cls3"]].notna()
n_energy = labeled["y_energy_cls3"].sum()
n_mood = labeled["y_mood_stability_cls3"].sum()
n_both = labeled.all(axis=1).sum()

print("Total rows:", n_total)
print("Energy labeled rows:", n_energy, f"({n_energy/n_total:.1%})")
print("Mood stability labeled rows:", n_mood, f"({n_mood/n_total:.1%})")
print("Both labeled rows:", n_both, f"({n_both/n_total:.1%})")

# The two targets are trained separately, so a row does not need both labels.
# DECISION POINT: 4 models = 2 targets (energy, mood stability) x 2 modes
#   (mode A = only wearables as inputs, mode B = additionally lagged targets);
#   3-class classification is used for higher accuracy given the small dataset.

Total rows: 5659
Energy labeled rows: 3331 (58.9%)
Mood stability labeled rows: 3320 (58.7%)
Both labeled rows: 3320 (58.7%)


In [18]:
# Mapping ordinal self-report columns to numeric "<name>_num" features.
# NOTE(review): every column is mapped with LEVEL_MAP_6; labels outside that
# scale become NaN — confirm flow_volume really uses the same six labels.
_candidates = ("cramps", "headaches", "sleepissue", "stress", "flow_volume")
ORDINAL_FEATURE_COLS = [name for name in _candidates if name in df.columns]

_numeric_names = [f"{name}_num" for name in ORDINAL_FEATURE_COLS]
for _source, _target in zip(ORDINAL_FEATURE_COLS, _numeric_names):
    df[_target] = map_ordinal_6(df[_source])

print("Created ordinal numeric features:", _numeric_names)

Created ordinal numeric features: ['cramps_num', 'headaches_num', 'sleepissue_num', 'stress_num', 'flow_volume_num']


In [19]:
# Defining the input features shared by all models.

KEYS = ["subject_id", "day_in_study"]

FEATURES_BASE = [
    # calendar
    "phase",
    "is_weekend",
    # strict wearables-friendly numeric features
    "sleep_duration_minutes",
    "resting_heart_rate__value",
]

# Self-report answers, already converted from ordinal labels to numbers.
FEATURES_SELFREPORT = [
    "cramps_num",
    "stress_num",
    "headaches_num",
    "sleepissue_num",
]

# Keep only columns that exist in df, never a key column, each name once
# (dict.fromkeys preserves first-seen order).
FEATURES_MODE_A = list(dict.fromkeys(
    name
    for name in (*FEATURES_BASE, *FEATURES_SELFREPORT)
    if name in df.columns and name not in KEYS
))

print("Mode A STRICT features:", FEATURES_MODE_A)

Mode A STRICT features: ['phase', 'is_weekend', 'sleep_duration_minutes', 'resting_heart_rate__value', 'cramps_num', 'stress_num', 'headaches_num', 'sleepissue_num']


In [20]:
# MODE A DATASET (FEATURES_MODE_A only, no lagged targets), less accurate

# Energy dataset: rows with an energy label, label stored as int.
df_energy_A = (
    df.loc[:, [*KEYS, *FEATURES_MODE_A, "y_energy_cls3"]]
    .dropna(subset=["y_energy_cls3"])
    .astype({"y_energy_cls3": int})
)

# Mood stability dataset (Mode A): rows with a mood label, label stored as int.
df_mood_A = (
    df.loc[:, [*KEYS, *FEATURES_MODE_A, "y_mood_stability_cls3"]]
    .dropna(subset=["y_mood_stability_cls3"])
    .astype({"y_mood_stability_cls3": int})
)

print("Mode A energy rows:", len(df_energy_A))
print("Mode A mood rows:", len(df_mood_A))

Mode A energy rows: 3331
Mode A mood rows: 3320


In [21]:
# MODE B DATASET (Mode A features & lagged targets as features), more accurate

LAGS = [1]


def _add_day_lags(frame: pd.DataFrame, target: str, prefix: str, lags: list[int]) -> pd.DataFrame:
    """Return ``frame`` sorted by KEYS with ``lag{n}_{prefix}`` columns added.

    Each lag column holds the same subject's ``target`` value from exactly
    ``n`` study days earlier, looked up by ``day_in_study - n``. A row-based
    ``groupby().shift(n)`` would instead take the value n *rows* earlier, which
    is an older day whenever a subject has a gap in ``day_in_study``. When a
    subject's days are consecutive both approaches give the same result.

    Rows whose lagged day does not exist (or has no label) get NaN.
    Requires (subject_id, day_in_study) to be unique in ``frame``.
    """
    out = frame.sort_values(KEYS).copy()
    # (subject_id, day_in_study) -> target value for exact-day lookups.
    by_key = out.set_index(KEYS)[target]
    for lag in lags:
        wanted = pd.MultiIndex.from_arrays(
            [out["subject_id"], out["day_in_study"] - lag]
        )
        # to_numpy(): assign positionally, keeping out's own index untouched.
        out[f"lag{lag}_{prefix}"] = by_key.reindex(wanted).to_numpy()
    return out


# Energy Mode B: adding lagged energy labels as features
SELECT_ENERGY = list(dict.fromkeys(KEYS + FEATURES_MODE_A + ["y_energy_cls3"]))
df_energy_B = _add_day_lags(df.loc[:, SELECT_ENERGY], "y_energy_cls3", "energy", LAGS)

# keeping rows where the current label exists and every lag exists
df_energy_B = df_energy_B.dropna(subset=["y_energy_cls3"] + [f"lag{l}_energy" for l in LAGS]).copy()
df_energy_B["y_energy_cls3"] = df_energy_B["y_energy_cls3"].astype(int)

# Mood Mode B: adding lagged mood stability labels as features
SELECT_MOOD = list(dict.fromkeys(KEYS + FEATURES_MODE_A + ["y_mood_stability_cls3"]))
df_mood_B = _add_day_lags(df.loc[:, SELECT_MOOD], "y_mood_stability_cls3", "mood", LAGS)

df_mood_B = df_mood_B.dropna(subset=["y_mood_stability_cls3"] + [f"lag{l}_mood" for l in LAGS]).copy()
df_mood_B["y_mood_stability_cls3"] = df_mood_B["y_mood_stability_cls3"].astype(int)

print("Mode B energy rows:", len(df_energy_B))
print("Mode B mood rows:", len(df_mood_B))

Mode B energy rows: 3085
Mode B mood rows: 3084


In [22]:
#one hotting only nominal columns, not ordinal

def one_hot_nominal_only(df_in: pd.DataFrame, nominal_cols: list[str]) -> pd.DataFrame:
    """One-hot encode the listed nominal columns that exist in ``df_in``.

    Names in ``nominal_cols`` that are not columns of ``df_in`` are ignored.
    Each encoded column also gets a ``<col>_nan`` indicator (``dummy_na=True``).
    A new frame is always returned; the input is left unchanged.
    """
    present = [name for name in nominal_cols if name in df_in.columns]
    if not present:
        return df_in.copy()
    return pd.get_dummies(df_in.copy(), columns=present, dummy_na=True)

NOMINAL_COLS = ["phase"]  # add more nominal columns here if there are any

# One-hot encode the nominal columns of each of the four datasets.
energy_A_ml, energy_B_ml, mood_A_ml, mood_B_ml = (
    one_hot_nominal_only(frame, NOMINAL_COLS)
    for frame in (df_energy_A, df_energy_B, df_mood_A, df_mood_B)
)


# Local parquet cache of the four ML-ready tables under <repo root>/data_cache.
OUT_DIR = Path(root_dir) / "data_cache"
OUT_DIR.mkdir(exist_ok=True)

for _file_name, _frame in (
    ("mcphases_energy_modeA.parquet", energy_A_ml),
    ("mcphases_energy_modeB.parquet", energy_B_ml),
    ("mcphases_mood_modeA.parquet", mood_A_ml),
    ("mcphases_mood_modeB.parquet", mood_B_ml),
):
    _frame.to_parquet(OUT_DIR / _file_name, index=False)

print("Saved 4 datasets to:", OUT_DIR)

Saved 4 datasets to: /Users/sreenijaveladri/Downloads/llm_project_starter/scalable-ml-project/data_cache


In [25]:
# Saving the engineered datasets (as feature groups) to the Hopsworks Feature Store.
KEYS = ["subject_id", "day_in_study"]
# Version 1 of these feature groups has a wrong schema, so version 2 is used.
FG_VER = 2

PHASE_DUMMIES = ["phase_Fertility","phase_Follicular","phase_Luteal","phase_Menstrual","phase_nan"]

def ensure_cols(df_in: pd.DataFrame, cols: list[str], fill_value=0):
    """Return a copy of ``df_in`` that contains every column named in ``cols``.

    Columns that are absent are appended (in the order given) and filled with
    ``fill_value``; existing columns are left as they are.

    NOTE(review): the default fill is the integer 0, while columns produced by
    ``pd.get_dummies`` may be boolean depending on the pandas version — confirm
    the resulting feature-group schema is the intended one.
    """
    out = df_in.copy()
    absent = [name for name in cols if name not in out.columns]
    for name in absent:
        out[name] = fill_value
    return out

# Give all 4 tables the full set of phase dummy columns (absent ones filled
# with 0) so every table exposes the columns the model expects.
energy_A_hs, energy_B_hs, mood_A_hs, mood_B_hs = (
    ensure_cols(table, PHASE_DUMMIES, 0)
    for table in (energy_A_ml, energy_B_ml, mood_A_ml, mood_B_ml)
)


# Store the resting-heart-rate column under its single-underscore name.
def fix_rhr_name(d):
    """Return a copy of ``d`` with the resting-heart-rate column renamed.

    When ``resting_heart_rate__value`` exists and ``resting_heart_rate_value``
    does not, the values are moved to ``resting_heart_rate_value`` (appended as
    the last column) and the double-underscore column is dropped. In every
    other case the copy is returned unchanged.
    """
    old_name, new_name = "resting_heart_rate__value", "resting_heart_rate_value"
    out = d.copy()
    if old_name not in out.columns or new_name in out.columns:
        return out
    out[new_name] = out[old_name]
    return out.drop(columns=[old_name])

# Use the single-underscore resting-heart-rate name in every table.
energy_A_hs, energy_B_hs, mood_A_hs, mood_B_hs = (
    fix_rhr_name(table)
    for table in (energy_A_hs, energy_B_hs, mood_A_hs, mood_B_hs)
)


# Adding event_time for the online-enabled feature groups: a synthetic
# timestamp computed as 2020-01-01 plus day_in_study days.
for table in (energy_A_hs, energy_B_hs, mood_A_hs, mood_B_hs):
    table["event_time"] = pd.to_timedelta(table["day_in_study"], unit="D") + pd.Timestamp("2020-01-01")

# Avro-safe conversion, needed because the feature groups use online_enabled=True.
energy_A_hs, energy_B_hs, mood_A_hs, mood_B_hs = (
    make_avro_safe(table)
    for table in (energy_A_hs, energy_B_hs, mood_A_hs, mood_B_hs)
)

# Creating/getting feature groups - one FG per dataset. All four share the same
# version, primary key, online flag and event-time column.
def _get_or_create_fg(name: str, description: str):
    """Get or create one engineered, online-enabled feature group."""
    return fs.get_or_create_feature_group(
        name=name,
        version=FG_VER,
        primary_key=KEYS,
        online_enabled=True,
        event_time="event_time",
        description=description,
    )

fg_energy_A = _get_or_create_fg(
    "mcphases_energy_modea_fg",
    "Energy Mode A dataset (features + y_energy_cls3)",
)
fg_energy_B = _get_or_create_fg(
    "mcphases_energy_modeb_fg",
    "Energy Mode B dataset (features + lag1_energy + y_energy_cls3)",
)
fg_mood_A = _get_or_create_fg(
    "mcphases_mood_modea_fg",
    "Mood Mode A dataset (features + y_mood_stability_cls3)",
)
fg_mood_B = _get_or_create_fg(
    "mcphases_mood_modeb_fg",
    "Mood Mode B dataset (features + lag1_mood + y_mood_stability_cls3)",
)

# Inserting/upserting to Hopsworks, one table at a time; each call waits for
# its job to finish ("wait_for_job": True) before the next one starts.
for _fg, _table in (
    (fg_energy_A, energy_A_hs),
    (fg_energy_B, energy_B_hs),
    (fg_mood_A, mood_A_hs),
    (fg_mood_B, mood_B_hs),
):
    _fg.insert(_table, operation="upsert", write_options={"wait_for_job": True})

print("Inserted 4 engineered Feature Groups into Hopsworks")


Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/fs/3152/fg/3424


Uploading Dataframe: 100.00% |█| Rows 3331/3331 | Elapsed Time: 00:00 | Remainin


Launching job: mcphases_energy_modea_fg_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/jobs/named/mcphases_energy_modea_fg_2_offline_fg_materialization/executions
2026-01-11 18:32:59,328 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-11 18:33:02,495 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-11 18:34:47,031 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-11 18:34:47,171 INFO: Waiting for log aggregation to finish.
2026-01-11 18:34:56,044 INFO: Execution finished successfully.
Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/fs/3152/fg/3426


Uploading Dataframe: 100.00% |█| Rows 3085/3085 | Elapsed Time: 00:00 | Remainin


Launching job: mcphases_energy_modeb_fg_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/jobs/named/mcphases_energy_modeb_fg_2_offline_fg_materialization/executions
2026-01-11 18:35:11,117 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-11 18:35:14,283 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-11 18:37:08,397 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-11 18:37:08,526 INFO: Waiting for log aggregation to finish.
2026-01-11 18:37:20,509 INFO: Execution finished successfully.
Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/fs/3152/fg/3429


Uploading Dataframe: 100.00% |█| Rows 3320/3320 | Elapsed Time: 00:00 | Remainin


Launching job: mcphases_mood_modea_fg_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/jobs/named/mcphases_mood_modea_fg_2_offline_fg_materialization/executions
2026-01-11 18:37:37,657 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-11 18:37:40,828 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-11 18:39:25,487 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-11 18:39:25,636 INFO: Waiting for log aggregation to finish.
2026-01-11 18:39:37,765 INFO: Execution finished successfully.
Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/fs/3152/fg/3432


Uploading Dataframe: 100.00% |█| Rows 3084/3084 | Elapsed Time: 00:00 | Remainin


Launching job: mcphases_mood_modeb_fg_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/jobs/named/mcphases_mood_modeb_fg_2_offline_fg_materialization/executions
2026-01-11 18:39:52,682 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-11 18:39:55,863 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-11 18:42:53,310 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-11 18:42:53,446 INFO: Waiting for log aggregation to finish.
2026-01-11 18:43:02,138 INFO: Execution finished successfully.
✅ Inserted 4 engineered Feature Groups into Hopsworks


In [26]:
# Creating Feature Views that match the model feature lists exactly.

FV_VERSION = 1

# Model feature lists (lower-case phase column names).
PHASE_COLS = ["phase_fertility", "phase_follicular", "phase_luteal", "phase_menstrual", "phase_nan"]
ENERGY_MODEA_FEATURES = [
  "is_weekend",
  "sleep_duration_minutes",
  "resting_heart_rate_value",
  "cramps_num",
  "stress_num",
  "headaches_num",
  "sleepissue_num",
  *PHASE_COLS,
]
# Mode B = Mode A inputs plus the lagged target of the same model.
ENERGY_MODEB_FEATURES = [*ENERGY_MODEA_FEATURES, "lag1_energy"]
# The mood models use the same inputs; this is an independent list, not an alias.
MOOD_MODEA_FEATURES = list(ENERGY_MODEA_FEATURES)
MOOD_MODEB_FEATURES = [*MOOD_MODEA_FEATURES, "lag1_mood"]

def create_fv(fv_name, fg, feature_cols, label_col):
    """Get or create a feature view over one feature group and report it.

    The view's query selects the key columns, ``event_time``, the given
    feature columns and the label column from ``fg``. ``label_col`` is
    registered as the label and ``subject_id`` as a training helper column.
    The view's name and version are printed; nothing is returned.
    """
    selected = ["subject_id", "day_in_study", "event_time", *feature_cols, label_col]
    feature_view = fs.get_or_create_feature_view(
        name=fv_name,
        version=FV_VERSION,
        query=fg.select(selected),
        labels=[label_col],
        description=f"{fv_name} (features aligned to model)",
        training_helper_columns=["subject_id"],
    )
    print("FV ready:", feature_view.name, feature_view.version)

# One feature view per (target, mode) combination, created in a fixed order.
for _fv_name, _fg, _features, _label in (
    ("mcphases_energy_modea_fv", fg_energy_A, ENERGY_MODEA_FEATURES, "y_energy_cls3"),
    ("mcphases_energy_modeb_fv", fg_energy_B, ENERGY_MODEB_FEATURES, "y_energy_cls3"),
    ("mcphases_mood_modea_fv", fg_mood_A, MOOD_MODEA_FEATURES, "y_mood_stability_cls3"),
    ("mcphases_mood_modeb_fv", fg_mood_B, MOOD_MODEB_FEATURES, "y_mood_stability_cls3"),
):
    create_fv(_fv_name, _fg, _features, _label)

Feature view created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/fs/3152/fv/mcphases_energy_modea_fv/version/1
FV ready: mcphases_energy_modea_fv 1
Feature view created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/fs/3152/fv/mcphases_energy_modeb_fv/version/1
FV ready: mcphases_energy_modeb_fv 1
Feature view created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/fs/3152/fv/mcphases_mood_modea_fv/version/1
FV ready: mcphases_mood_modea_fv 1
Feature view created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3208/fs/3152/fv/mcphases_mood_modeb_fv/version/1
FV ready: mcphases_mood_modeb_fv 1
