<a href="https://colab.research.google.com/github/yuvanarvind/sleep-analysis/blob/main/Sleep_Analysis_Final_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sleep Analysis - Final Project

### -Yuvan

In [6]:
!pip install pandas numpy scikit-learn shap matplotlib



In [7]:
import os
import pandas as pd
import numpy as np
from datetime import timedelta
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import shap
import matplotlib.pyplot as plt
import joblib
pd.options.display.max_columns = 200


In [8]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Loading the dataset from Drive

In [9]:
import glob

DRIVE_BASE = "/content/drive/MyDrive/XAI"
RING_DATA_DIR = os.path.join(DRIVE_BASE, "ring_data")
OUTPUT_DIR = os.path.join(DRIVE_BASE, "sleep_xai_outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Get all CSV files inside ring_data folder
csv_files = glob.glob(os.path.join(RING_DATA_DIR, "*.csv"))

print("Found files:", len(csv_files))
csv_files[:5]    # show first 5


Found files: 70


['/content/drive/MyDrive/XAI/ring_data/ring_data_2025_15.csv',
 '/content/drive/MyDrive/XAI/ring_data/ring_data_2025_38.csv',
 '/content/drive/MyDrive/XAI/ring_data/ring_data_2025_3.csv',
 '/content/drive/MyDrive/XAI/ring_data/ring_data_2025_39.csv',
 '/content/drive/MyDrive/XAI/ring_data/ring_data_2025_35.csv']

Load and combine all CSVs


In [10]:

df_list = []

for file in csv_files:
    temp_df = pd.read_csv(file)
    df_list.append(temp_df)

df_all = pd.concat(df_list, ignore_index=True)

print(df_all.shape)
df_all.head()


KeyboardInterrupt: 

In [None]:
# Load and combine all CSVs from ring_data directory
df_list = []
for file in csv_files:
    df_list.append(pd.read_csv(file))

df = pd.concat(df_list, ignore_index=True)   # <-- rename combined dataset to df


In [None]:
df.info()
df.describe(include='all')
df.columns

Pivot from long → wide format

In [None]:
# Pivot long → wide
df_wide = df.pivot(index="timestamp_epoch", columns="data_type", values="value").reset_index()

# Convert epoch to datetime
df_wide["timestamp"] = pd.to_datetime(df_wide["timestamp_epoch"], unit="s")

# Sort
df_wide = df_wide.sort_values("timestamp").reset_index(drop=True)

df_wide.head()


Sort & Resample to 1-minute intervals

In [None]:
# Pivot long → wide
df_wide = df.pivot(index="timestamp_epoch", columns="data_type", values="value").reset_index()

# Convert epoch to datetime
df_wide["timestamp"] = pd.to_datetime(df_wide["timestamp_epoch"], unit="s")

# Sort
df_wide = df_wide.sort_values("timestamp").reset_index(drop=True)

df_wide.head()

# Use timestamp as index
df_wide = df_wide.set_index("timestamp")

# Resample to 1-minute intervals (fill missing values with forward-fill)
df_resampled = df_wide.resample("1T").mean().ffill().bfill()

df_resampled.head()


In [None]:
# Rolling features (5-minute window)
df_resampled["hr_rolling"]   = df_resampled["raw_hr"].rolling(5).mean()
df_resampled["hrv_rolling"]  = df_resampled["raw_hrv_2"].rolling(5).mean()
df_resampled["motion_roll"]  = df_resampled["raw_motion"].rolling(5).mean()
df_resampled["temp_roll"]    = df_resampled["temp"].rolling(5).mean()
df_resampled["rr_roll"]      = df_resampled["respiratory_rate"].rolling(5).mean()


In [None]:
df_resampled["stillness_index"] = 1 / (1 + df_resampled["raw_motion"])


In [None]:
df_resampled["slowing_score"] = (
    (df_resampled["hr_rolling"].max() - df_resampled["hr_rolling"]) +
    (df_resampled["hrv_rolling"] - df_resampled["hrv_rolling"].min())
)


In [None]:
df_resampled["awake_flag"] = (df_resampled["steps"] > 0).astype(int)


In [None]:
df_resampled.tail()


## Sleep Score Formula
Normalize features (Z-score)

In [None]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()

features_to_scale = [
    "hr_rolling", "hrv_rolling", "motion_roll",
    "rr_roll", "temp_roll"
]

df_scaled = df_resampled.copy()
df_scaled[features_to_scale] = scaler.fit_transform(df_scaled[features_to_scale])


Build a weighted sleep score

In [None]:
df_scaled["sleep_score_raw"] = (
    (-df_scaled["hr_rolling"] * 0.20) +
    ( df_scaled["hrv_rolling"] * 0.25) +
    (-df_scaled["motion_roll"] * 0.25) +
    (-df_scaled["rr_roll"] * 0.15) +
    (-df_scaled["temp_roll"] * 0.15)
)


Convert to a clean 0–100 scale

In [None]:
# Min-max scale to 0–100
score_min = df_scaled["sleep_score_raw"].min()
score_max = df_scaled["sleep_score_raw"].max()

df_scaled["sleep_score"] = (
    (df_scaled["sleep_score_raw"] - score_min) /
    (score_max - score_min)
) * 100


Check sleep_score

In [None]:
df_scaled[["sleep_score"]].describe()


Select Features & Prepare Train/Test Data

In [None]:
#Select features and target
feature_cols = ["hr_rolling", "hrv_rolling", "motion_roll", "rr_roll", "temp_roll"]
target_col = "sleep_score"

X = df_scaled[feature_cols]
y = df_scaled[target_col]

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

X.shape, y.shape


Train a Simple, Interpretable Model

In [None]:
#Train a Random Forest model
# Drop rows where sleep_score is NaN
df_model = df_scaled.dropna(subset=["sleep_score"])

# Then drop/fill remaining NaNs in features
df_model = df_model.fillna(method="ffill").fillna(method="bfill")

# Redefine X and y cleanly
X = df_model[feature_cols]
y = df_model[target_col]

# Train/test split again
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

X.shape, y.shape



In [None]:
#Train a Random Forest model
model = RandomForestRegressor(
    n_estimators=200,
    max_depth=6,
    random_state=42
)

model.fit(X_train, y_train)

print("Sleep Model trained!")


Evaluate the Model (RMSE + R²)

In [None]:
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np

# Predictions
y_pred = model.predict(X_test)

# RMSE (manual)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))

# R2
r2 = r2_score(y_test, y_pred)

print("Model Performance:")
print(f"RMSE: {rmse:.3f}")
print(f"R²:   {r2:.3f}")


Generate SHAP values

In [None]:
#Initialize SHAP explainer
explainer = shap.TreeExplainer(model)

# Compute SHAP values for the test set
shap_values = explainer.shap_values(X_test)

print("SHAP values computed!")


SHAP Summary Plot

In [None]:
shap.summary_plot(shap_values, X_test)


Convert SHAP values into confidence percentages


In [None]:

feature_names = X_test.columns.tolist()
shap_abs = np.abs(shap_single)

# Normalize to percentages
confidence_scores = shap_abs / shap_abs.sum() * 100

# Create a readable table
explanation_df = pd.DataFrame({
    "feature": feature_names,
    "shap_value": shap_single,
    "confidence_percent": confidence_scores
}).sort_values("confidence_percent", ascending=False)

explanation_df


Final Human interpretable text

In [None]:
#Build a natural language explanation

# Get predicted sleep score for this sample
predicted_score = model.predict(x_single)[0]

explanation_lines = []
for idx, row in explanation_df.iterrows():
    feature = row["feature"]
    direction = "increased" if row["shap_value"] > 0 else "decreased"
    percent = round(row["confidence_percent"], 1)

    explanation_lines.append(
        f"- {percent}% confidence: {feature} {direction} your sleep score"
    )

final_explanation = (
    f"Predicted Sleep Score for this night: {predicted_score:.1f}/100\n\n"
    "What affected your sleep the most:\n" +
    "\n".join(explanation_lines)
)

print(final_explanation)


Compute Baselines & Bedtime proxy

In [None]:
#Compute baseline HR and HRV for comparisons
baseline_hr = df_scaled["hr_rolling"].median()
baseline_hrv = df_scaled["hrv_rolling"].median()


In [None]:
# Late bedtime proxy
df_scaled["bedtime_proxy"] = df_scaled.index.hour

Caffeine proxy (evening HR + motion)

In [None]:
# Caffeine proxy (evening HR + motion)
df_scaled["caffeine_proxy"] = (
    df_scaled["hr_rolling"] * (df_scaled.index.hour >= 18)
    +
    df_scaled["motion_roll"] * (df_scaled.index.hour >= 18)
)


Alcohol Proxy

In [None]:
# Alcohol proxy
df_scaled["alcohol_proxy"] = (
    (df_scaled["temp_roll"] - df_scaled["temp_roll"].median()) +
    (df_scaled["hr_rolling"] - baseline_hr) -
    (df_scaled["hrv_rolling"] - baseline_hrv)
)


Stress Proxy

In [None]:
# Stress proxy
df_scaled["stress_proxy"] = (
    (df_scaled["hr_rolling"] / baseline_hr) -
    (df_scaled["hrv_rolling"] / baseline_hrv)
)


Update the Model to Include the 5 Lifestyle Proxies -- Defining new features set :)

In [None]:
df_scaled.columns


In [None]:
# Recreate activity proxy safely
if "steps" in df_scaled.columns:
    df_scaled["activity_proxy"] = df_scaled["steps"].fillna(0)
else:
    print("WARNING: 'steps' column missing!")

In [None]:
"activity_proxy" in df_scaled.columns


In [None]:
# STEP 8A: Updated feature list (physiology + lifestyle)
feature_cols = [
    "hr_rolling",
    "hrv_rolling",
    "motion_roll",
    "rr_roll",
    "temp_roll",
    "bedtime_proxy",
    "caffeine_proxy",
    "alcohol_proxy",
    "stress_proxy",
    "activity_proxy"
]

# Prepare data
X = df_scaled[feature_cols]
y = df_scaled["sleep_score"]

# Clean missing values again (safety)
X = X.fillna(method="ffill").fillna(method="bfill")
y = y.fillna(method="ffill").fillna(method="bfill")

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

X.shape, y.shape


In [None]:
feature_cols = [
    "hr_rolling",
    "hrv_rolling",
    "motion_roll",
    "rr_roll",
    "temp_roll",
    "bedtime_proxy",
    "caffeine_proxy",
    "alcohol_proxy",
    "stress_proxy",
    "activity_proxy"
]

X = df_scaled[feature_cols]
y = df_scaled["sleep_score"]

X = X.fillna(method="ffill").fillna(method="bfill")
y = y.fillna(method="ffill").fillna(method="bfill")

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

X.shape, y.shape


Retrain model with new physiology + lifestyle features

In [None]:
# Retrain model with new physiology + lifestyle features
model = RandomForestRegressor(
    n_estimators=300,
    max_depth=7,
    random_state=42
)

model.fit(X_train, y_train)

print("Model retrained with lifestyle features!")


Compute SHAP Values for the New Model

In [None]:
# New SHAP explainer for lifestyle model
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test)

print("SHAP values updated for lifestyle model!")


New SHAP summary plot

In [None]:
shap.summary_plot(shap_values, X_test)


Local SHAP explanations

In [None]:
# # Local SHAP explanation with confidence %

# # Pick one sample from the test set to explain
# # i = 19   # you can change this number to inspect different nights
# x_single = X_test.iloc[[i]]

# # SHAP values for this sample
# shap_single = explainer.shap_values(x_single)[0]

# # Absolute shap values → determine confidence
# shap_abs = np.abs(shap_single)
# confidence_scores = shap_abs / shap_abs.sum() * 100

# # Build explanation dataframe
# explanation_df = pd.DataFrame({
#     "feature": X_test.columns,
#     "shap_value": shap_single,
#     "confidence_percent": confidence_scores
# }).sort_values("confidence_percent", ascending=False)

# explanation_df


Narrative Explanation

In [None]:
# Choose a date you want to explain
date_str = "2025-11-06"   # <-- change this to any date in your dataset

target_date = pd.to_datetime(date_str).date()

# Filter rows in X_test that belong to that calendar date
mask = X_test.index.date == target_date
X_test_for_date = X_test[mask]

X_test_for_date.head()


In [None]:
# Pick first row from that date to explain
if len(X_test_for_date) == 0:
    print("No data in X_test for that date. Try another date.")
else:
    x_single = X_test_for_date.iloc[[0]]  # first row for that date
    print("Using this timestamp for explanation:", x_single.index[0])


In [None]:
# SHAP values for the selected date's row

shap_single = explainer.shap_values(x_single)[0]

# absolute shap values → determine confidence
shap_abs = np.abs(shap_single)
confidence_scores = shap_abs / shap_abs.sum() * 100

# Build the explanation dataframe
explanation_df = pd.DataFrame({
    "feature": X_test.columns,
    "shap_value": shap_single,
    "confidence_percent": confidence_scores
}).sort_values("confidence_percent", ascending=False)

explanation_df


In [None]:
# User-friendly narrative explanation for the selected date

# Predicted sleep score for this date
pred_score = model.predict(x_single)[0]

# Map features to friendly names
name_map = {
    "bedtime_proxy": "your bedtime",
    "caffeine_proxy": "caffeine or stimulation before sleep",
    "alcohol_proxy": "alcohol-like physiological effects",
    "stress_proxy": "your stress level",
    "activity_proxy": "your physical activity",

    "hr_rolling": "your nighttime heart rate",
    "hrv_rolling": "your HRV",
    "motion_roll": "your restlessness",
    "rr_roll": "your breathing rate",
    "temp_roll": "your body temperature"
}

# Build explanation lines
user_lines = []
for _, row in explanation_df.iterrows():
    feature = row["feature"]
    percent = round(row["confidence_percent"], 1)
    direction = "improved" if row["shap_value"] > 0 else "reduced"
    readable = name_map.get(feature, feature.replace("_", " "))

    user_lines.append(f"- {percent}% confidence → {readable} **{direction}** your sleep score")

# Build the final narrative
narrative = f"""
### Sleep Score Analysis for {date_str}

Your predicted sleep score for this night is **{pred_score:.1f}/100**.

Here’s what influenced your sleep the most:

{chr(10).join(user_lines)}

---

### Interpretation

These percentages show how strongly each factor contributed to your sleep score.
Higher percentages mean greater influence — positive or negative.

Lifestyle factors and physiological responses combine to form your nightly recovery:
- Stress raises heart rate & lowers HRV
- Caffeine elevates evening HR & delays sleep onset
- Alcohol elevates temperature & suppresses HRV
- Late bedtime shifts circadian alignment
- Activity improves sleep drive
- Physiological signals (HR, HRV, temperature, motion, breathing) show how your body responded


### Personalized Suggestions

"""

# Auto-suggestions based on negative contributors
for _, row in explanation_df.iterrows():
    feature = row["feature"]
    shap_val = row["shap_value"]

    if shap_val < 0:
        if feature == "stress_proxy":
            narrative += "- Try calming down pre-bedtime — your body showed nighttime stress.\n"
        elif feature == "caffeine_proxy":
            narrative += "- Reduce caffeine 6–8 hours before sleep — stimulant patterns were detected.\n"
        elif feature == "alcohol_proxy":
            narrative += "- Alcohol-like physiological markers appeared; these often hurt HRV.\n"
        elif feature == "bedtime_proxy":
            narrative += "- Your bedtime was later than your ideal rhythm.\n"
        elif feature == "activity_proxy":
            narrative += "- Low daytime activity reduced your sleep drive.\n"
        elif feature == "hr_rolling":
            narrative += "- Elevated heart rate reduced deep sleep potential.\n"
        elif feature == "hrv_rolling":
            narrative += "- Lower HRV indicates your recovery was impaired.\n"
        elif feature == "motion_roll":
            narrative += "- You experienced restlessness during sleep.\n"
        elif feature == "temp_roll":
            narrative += "- Higher temperature indicates stress, alcohol, or late meals.\n"

print(narrative)

Saving outputs to a file in the sleep_xai_outputs folder

In [None]:
df_scaled.to_csv(
    "/content/drive/MyDrive/XAI/sleep_xai_outputs/df_scaled.csv"
)

print("Saved df_scaled.csv")

In [None]:
joblib.dump(
    model,
    "/content/drive/MyDrive/XAI/sleep_xai_outputs/model.pkl"
)

print("Saved model.pkl")

In [None]:
joblib.dump(
    explainer,
    "/content/drive/MyDrive/XAI/sleep_xai_outputs/explainer.pkl"
)

print("Saved explainer.pkl")

In [None]:
import json

with open("/content/drive/MyDrive/XAI/sleep_xai_outputs/feature_cols.json", "w") as f:
    json.dump(feature_cols, f)

print("Saved feature_cols.json")

In [None]:
df_scaled.columns
