# HR + SpO2 Calibration
Using MAX30102 raw IR/red signals

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, find_peaks
from scipy.optimize import curve_fit

plt.style.use("seaborn-v0_8")

# Load raw calibration data
FILE = "hr_spo2_calibration_data.csv"
df = pd.read_csv(FILE)

df.head()

## Preprocessing and Cleaning

In [None]:
# Basic cleaning
df = df.dropna()
df = df[(df.ir_raw > 0) & (df.red_raw > 0)]

# Create window labels based on time (2 seconds each)
df["timestamp"] = pd.to_datetime(df["timestamp"])
df = df.sort_values("timestamp")

window_length = 2.0   # seconds per calibration set
t0 = df.timestamp.iloc[0]

df["window"] = ((df.timestamp - t0).dt.total_seconds() // window_length).astype(int)

print("Number of windows:", df.window.nunique())

print("Samples:", len(df))
df.head()


## Data Visualization

In [None]:
plt.figure(figsize=(12,5))
plt.subplot(1,2,1)
df.true_hr.hist(bins=15)
plt.title("True HR Distribution")

plt.subplot(1,2,2)
df.true_spo2.hist(bins=15)
plt.title("True SpO2 Distribution")

plt.show()


## Bandpass Filter (0.5-5 Hz) for HR extraction

In [None]:
fs = 100  # sampling rate used during calibration
low, high = 0.5, 5

def butter_bandpass(low, high, fs, order=3):
    nyq = fs / 2
    b, a = butter(order, [low/nyq, high/nyq], btype="band")
    return b, a

def apply_filter(sig):
    b, a = butter_bandpass(low, high, fs)
    return filtfilt(b, a, sig)


## Filter Windows

In [None]:
filtered = []

for w in df.window.unique():
    chunk = df[df.window == w]
    ir = chunk.ir_raw.values
    red = chunk.red_raw.values
    
    if len(ir) <= 25:
        print(f"Skipping window {w}: too few samples ({len(ir)})")
        continue

    ir_f = apply_filter(ir)
    red_f = apply_filter(red)

    chunk = chunk.copy()
    chunk["ir_filt"] = ir_f
    chunk["red_filt"] = red_f
    filtered.append(chunk)

df_filt = pd.concat(filtered)
df_filt.head()


## Compute AC/DC + R Ratio

In [None]:
rows = []

for w in df_filt.window.unique():
    chunk = df_filt[df_filt.window == w]

    ir = chunk.ir_raw.values
    red = chunk.red_raw.values
    ir_f = chunk.ir_filt.values
    red_f = chunk.red_filt.values
    
    ir_dc = np.mean(ir)
    red_dc = np.mean(red)
    ir_ac = np.ptp(ir_f)  # peak-to-peak
    red_ac = np.ptp(red_f)

    R = (red_ac/red_dc) / (ir_ac/ir_dc)

    # true labels for this window
    hr = chunk.true_hr.values[0]
    spo2 = chunk.true_spo2.values[0]

    rows.append([w, ir_dc, red_dc, ir_ac, red_ac, R, hr, spo2])

features = pd.DataFrame(rows, columns=[
    "window", "ir_dc", "red_dc", "ir_ac", "red_ac", "R", "true_hr", "true_spo2"
])

features.head()


## HR Calibration

In [None]:
estimated_hr = []

for w in df_filt.window.unique():
    chunk = df_filt[df_filt.window == w]
    sig = chunk.ir_filt.values

    # Peak detection
    peaks, _ = find_peaks(sig, distance=fs*0.4)  # avoid detecting noise

    hr_est = len(peaks) * 60 / 2  # 2-second window
    true_hr = chunk.true_hr.values[0]

    estimated_hr.append([w, hr_est, true_hr])

hr_df = pd.DataFrame(estimated_hr, columns=["window", "hr_est", "true_hr"])
hr_df.head()


In [None]:
def linear_hr(x, a, b):
    return a*x + b

params_hr, _ = curve_fit(linear_hr, hr_df.hr_est, hr_df.true_hr)

a_hr, b_hr = params_hr
print("HR calibration model: true_hr = a*est + b")
print("a =", a_hr, "b =", b_hr)


## HR Calibration Plot

In [None]:
plt.figure(figsize=(6,6))
plt.scatter(hr_df.hr_est, hr_df.true_hr, label="Raw", alpha=0.6)
plt.plot(hr_df.hr_est, linear_hr(hr_df.hr_est, a_hr, b_hr), 'r', label="Fit")

plt.xlabel("Estimated HR from Peaks")
plt.ylabel("True HR")
plt.title("Heart Rate Calibration Curve")
plt.legend()
plt.grid()
plt.show()


## HR Curve Fit Evaluation

In [None]:
hr_pred = linear_hr(hr_df.hr_est, a_hr, b_hr)
hr_true = hr_df.true_hr

HR_MAE = np.mean(np.abs(hr_pred - hr_true))
HR_RMSE = np.sqrt(np.mean((hr_pred - hr_true)**2))
HR_R2 = 1 - np.sum((hr_pred - hr_true)**2) / np.sum((hr_true - np.mean(hr_true))**2)

print("HR Calibration Performance")
print(f"MAE:  {HR_MAE:.2f} bpm")
print(f"RMSE: {HR_RMSE:.2f} bpm")
print(f"R²:   {HR_R2:.3f}")


## SpO2 Calibration Using Standard Ratio-of-Ratios Model

In [None]:
def spo2_model(R, A, B):
    return A - B*R

params_spo2, _ = curve_fit(spo2_model, features.R, features.true_spo2)
A, B = params_spo2

print("SpO2 calibration model: SpO2 = A - B*R")
print("A =", A, "B =", B)


## SpO2 Calibration Plot

In [None]:
plt.figure(figsize=(6,6))
plt.scatter(features.R, features.true_spo2, label="Data", alpha=0.6)
plt.plot(features.R, spo2_model(features.R, A, B), 'r', label="Fit")

plt.xlabel("R Ratio")
plt.ylabel("True SpO2")
plt.title("SpO2 Calibration Curve")
plt.legend()
plt.grid()
plt.show()


## SpO2 Curve Fit Evaluation

In [None]:
spo2_pred = spo2_model(features.R, A, B)
spo2_true = features.true_spo2

SpO2_MAE = np.mean(np.abs(spo2_pred - spo2_true))
SpO2_RMSE = np.sqrt(np.mean((spo2_pred - spo2_true)**2))
SpO2_R2 = 1 - np.sum((spo2_pred - spo2_true)**2) / np.sum((spo2_true - np.mean(spo2_true))**2)

print("SpO2 Calibration Performance")
print(f"MAE:  {SpO2_MAE:.2f} %")
print(f"RMSE: {SpO2_RMSE:.2f} %")
print(f"R²:   {SpO2_R2:.3f}")


## Final Equations

In [None]:
print(f"// Heart Rate")
print(f"true_hr = {a_hr:.4f} * estimated_hr + {b_hr:.4f};")

print(f"\n// SpO₂")
print(f"spo2 = {A:.4f} - {B:.4f} * R;")

## Save Calibration Parameters

In [None]:
import json

cal = {
    "hr": {"a": float(a_hr), "b": float(b_hr)},
    "spo2": {"A": float(A), "B": float(B)}
}

with open("ppg_calibration_constants.json", "w") as f:
    json.dump(cal, f, indent=2)

print("Saved calibration constants → ppg_calibration_constants.json")
