<a href="https://colab.research.google.com/github/sajjkavinda/blood-pressure-estimation/blob/main/blood_pressure_estimation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Notebook Preparation**

In [None]:
import os
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.io import wavfile
from scipy.signal import stft
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C, WhiteKernel
from sklearn.linear_model import Ridge, Lasso
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import learning_curve
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor

In [None]:
# openpyxl is required by pandas.read_excel for the .xlsx label and demographics sheets loaded below.
!pip install openpyxl

In [None]:
# Colab-only: mount Google Drive so the dataset archive under /content/drive/MyDrive is readable.
from google.colab import drive
drive.mount('/content/drive')

**Dataset Preparation**

In [None]:
# Unpack the dataset archive stored on Google Drive into the Colab workspace.
zip_path = "/content/drive/MyDrive/PCG+BP-Dataset/Dataset_PCG_signals.zip"
extract_dir = "/content/pcg_bp_dataset"

# Create the target folder first; exist_ok lets this cell be re-run safely.
os.makedirs(extract_dir, exist_ok=True)

with zipfile.ZipFile(zip_path) as zip_ref:
    zip_ref.extractall(path=extract_dir)

# Show what the archive produced so the folder name used below can be checked.
print("Directory contents:", os.listdir(extract_dir))

In [None]:
# Folder that holds the reference spreadsheets (extracted archive root).
data_dir = "/content/pcg_bp_dataset/Dataset_PCG_signals copy"

# Blood-pressure labels, one spreadsheet for all participants.
bp_file = os.path.join(data_dir, "Participant_BP_Label.xlsx")
df_bp = pd.read_excel(bp_file)
print("BP labels head:", df_bp.head(), sep="\n")

# Per-participant demographics.
info_file = os.path.join(data_dir, "Information_78_participant.xlsx")
df_info = pd.read_excel(info_file)
print("Demographics head:", df_info.head(), sep="\n")

In [None]:
# Column shared by both spreadsheets; used as the join key.
key_col = "filename"

# Inner join of labels and demographics on the shared key.
df_meta = df_bp.merge(df_info, on=key_col)
print(df_meta.head(), df_meta.shape, sep="\n")

# Locations of the signal folders inside the extracted dataset.
root_path = "/content/pcg_bp_dataset/Dataset_PCG_signals copy"
bandpass_path = os.path.join(root_path, "Bandpass_signal", "78_participants_BPF_7_segment")
original_signal_root = os.path.join(root_path, "Original_signal")
original_11seg_path = os.path.join(original_signal_root, "78_participants_11_segment")
original_7seg_path = os.path.join(original_signal_root, "78_participants_7_segment")
original_full_path = os.path.join(original_signal_root, "Original_78_participants")

In [None]:
# Load both reference spreadsheets and turn them into lookup dictionaries
# keyed by the lower-cased 'filename' value of each row.

# Blood-pressure labels
bp_file = os.path.join(root_path, "Participant_BP_Label.xlsx")
bp_df = pd.read_excel(bp_file, header=0)
# Column headers may carry stray whitespace; strip it before indexing by name.
bp_df.columns = bp_df.columns.str.strip()
print("BP columns:", bp_df.columns)

# Demographics
demo_file = os.path.join(root_path, "Information_78_participant.xlsx")
demo_df = pd.read_excel(demo_file, header=0)
demo_df.columns = demo_df.columns.str.strip()
print("Demographics columns:", demo_df.columns)

# filename -> {'systolic', 'diastolic'} (sheet columns 'SYS' and 'DAI')
bp_map = {}
for _, record in bp_df.iterrows():
    bp_map[record['filename'].lower()] = {
        'systolic': record['SYS'],
        'diastolic': record['DAI'],
    }

# filename -> demographic fields, copied under the sheet's own column names
demo_fields = ('age', 'gender', 'pluse', 'PtP', 'W', 'H')
demo_map = {}
for _, record in demo_df.iterrows():
    demo_map[record['filename'].lower()] = {field: record[field] for field in demo_fields}
#Find every signal file stored in WAV format
def get_wav_files(folder):
    """Return the paths of all WAV files found under ``folder``, recursively.

    The extension match is case-insensitive. ``os.walk`` yields entries in a
    filesystem-dependent order, so the result is sorted: the order of rows built
    from this list (and any seeded split of them) is then the same on every machine.

    Args:
        folder: Directory to search. A missing directory yields an empty list.

    Returns:
        Sorted list of path strings (``folder`` joined with the relative location).
    """
    wavs = [
        os.path.join(root, f)
        for root, _dirs, files in os.walk(folder)
        for f in files
        if f.lower().endswith('.wav')
    ]
    return sorted(wavs)

#Build a dataframe from WAV files and dictionaries
def build_df(wav_list, bp_map, demo_map, segment_type="Bandpass"):
    """Build one metadata row per WAV file, joined with BP labels and demographics.

    The participant is identified from the digits in the name of the folder that
    directly contains the WAV file; the lookup key is ``participant_<digits>.wav``.
    Both ``bp_map`` and ``demo_map`` are queried with that same key. (Previously the
    demographics were looked up with the WAV file's own name, which is not the key
    of ``demo_map``, so those columns came back empty.)

    Args:
        wav_list: Iterable of WAV file paths.
        bp_map: Mapping key -> {'systolic': ..., 'diastolic': ...}.
        demo_map: Mapping key -> {'age', 'gender', 'pluse', 'PtP', 'W', 'H'}.
        segment_type: Label stored in the 'segment_type' column of every row.

    Returns:
        pandas.DataFrame with columns filename, wav_path, bp_filename, systolic,
        diastolic, age, gender, pluse, PtP, W, H, segment_type. Values are None
        where no entry exists for the participant key. An empty ``wav_list`` gives
        an empty frame that still has these columns.
    """
    demo_fields = ("age", "gender", "pluse", "PtP", "W", "H")
    columns = ["filename", "wav_path", "bp_filename", "systolic", "diastolic",
               *demo_fields, "segment_type"]

    data = []
    for wav_path in wav_list:
        fname = os.path.basename(wav_path).lower()
        parent_folder = os.path.basename(os.path.dirname(wav_path)).lower()

        # NOTE(review): every digit in the parent folder name is used. If WAV files sit
        # directly inside a folder such as "Original_78_participants", they would all
        # resolve to participant 78 - verify against the dataset layout.
        participant_num = ''.join(filter(str.isdigit, parent_folder))
        excel_fname = f"participant_{participant_num}.wav"

        # Same participant key for both lookups; fall back to empty dicts when missing.
        bp_info = bp_map.get(excel_fname) or {}
        demo_info = demo_map.get(excel_fname) or {}

        row = {
            "filename": fname,
            "wav_path": wav_path,
            "bp_filename": excel_fname,
            "systolic": bp_info.get('systolic'),
            "diastolic": bp_info.get('diastolic'),
        }
        row.update({field: demo_info.get(field) for field in demo_fields})
        row["segment_type"] = segment_type
        data.append(row)

    return pd.DataFrame(data, columns=columns)

# Collect the WAV paths of each signal folder.
bandpass_wavs = get_wav_files(bandpass_path)
orig11_wavs = get_wav_files(original_11seg_path)
orig7_wavs = get_wav_files(original_7seg_path)
full_wavs = get_wav_files(original_full_path)

# One dataframe per folder, tagged with the folder's segment type.
df_bandpass = build_df(bandpass_wavs, bp_map, demo_map, segment_type="Bandpass")
df_orig11 = build_df(orig11_wavs, bp_map, demo_map, segment_type="Original_11")
df_orig7 = build_df(orig7_wavs, bp_map, demo_map, segment_type="Original_7")
df_full = build_df(full_wavs, bp_map, demo_map, segment_type="Original_Full")

# Stack everything into the single frame used by the rest of the pipeline.
combined_df = pd.concat([df_bandpass, df_orig11, df_orig7, df_full], ignore_index=True)

# Re-derive the demographic columns from the participant key ('bp_filename');
# participants without a demographics entry get None in every field.
demo_columns = ['age', 'gender', 'pluse', 'PtP', 'W', 'H']
missing_demo = dict.fromkeys(demo_columns)


def _demographics_for(row):
    """Return the demographics of one row's participant as a Series."""
    return pd.Series(demo_map.get(row['bp_filename'].lower(), missing_demo))


combined_df[demo_columns] = combined_df.apply(_demographics_for, axis=1)

print(combined_df.head(10))
print("Total WAV entries:", len(combined_df))

**Feature Extraction**

In [None]:
#Feature extraction using STFT
#Learning source: https://www.mathworks.com/help/signal/ref/stft.html
#Used the ChatGPT tool to understand and experiment with the best way to use STFT for this study
def extract_stft_features(wav_path, nperseg=256):
    """Summarise the STFT power of one WAV file with four scalar statistics.

    The signal is read with ``scipy.io.wavfile.read``, converted to float, and
    transformed with ``scipy.signal.stft``; the squared magnitude of the result is
    reduced to its mean, standard deviation, maximum and median over all
    time-frequency bins.

    Args:
        wav_path: Path of the WAV file to read.
        nperseg: Segment length passed to ``scipy.signal.stft``.

    Returns:
        dict with keys 'stft_mean', 'stft_std', 'stft_max', 'stft_median'.

    Raises:
        ValueError: If the file contains no samples.
    """
    sr, y = wavfile.read(wav_path)
    y = y.astype(float)

    if y.ndim > 1:
        # Multi-channel files are read as (n_samples, n_channels). stft works along
        # the last axis, which would be the channel axis here, so mix down to mono.
        y = y.mean(axis=1)

    if y.size == 0:
        raise ValueError(f"Empty audio signal: {wav_path}")

    _, _, Zxx = stft(y, fs=sr, nperseg=nperseg)
    power = np.abs(Zxx) ** 2

    return {
        'stft_mean': np.mean(power),
        'stft_std': np.std(power),
        'stft_max': np.max(power),
        'stft_median': np.median(power)
    }

# Run the STFT feature extractor over every WAV path: one feature dict per file,
# in the same order as the rows of combined_df.
stft_features_list = [extract_stft_features(path) for path in combined_df['wav_path']]
stft_df = pd.DataFrame(stft_features_list)

# Attach the STFT columns side by side; the index is reset so rows align by position.
final_df = pd.concat([combined_df.reset_index(drop=True), stft_df], axis=1)

print(final_df.head())
print("Final shape:", final_df.shape)

In [None]:
#Features and targets
feature_cols = ['age','gender','pluse','PtP','W','H','stft_mean','stft_std','stft_max','stft_median']
target_col = 'systolic'

# Rows without a label or with missing features cannot be cast or fitted; drop them
# explicitly (and report it) instead of failing later with a NaN error.
model_df = final_df.dropna(subset=feature_cols + [target_col])
n_dropped = len(final_df) - len(model_df)
if n_dropped:
    print(f"Dropped {n_dropped} rows with missing features or labels")

# astype returns a new frame, so nothing is assigned into a slice of final_df
# (the previous column assignment triggered SettingWithCopyWarning).
features = model_df[feature_cols].astype({'gender': int})

X = features.values
y = model_df[target_col].values

#Split the dataset into train and test
# NOTE(review): every segment of a participant carries the same demographics and label,
# so a row-level split can place one participant on both sides - consider a
# group-aware split keyed on 'bp_filename'.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

#Standardize features
# Fit the scaler on the training rows only so test statistics do not leak into training.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)

# Whole dataset in the same (train-fitted) scale, kept for code that uses X_scaled.
X_scaled = scaler.transform(X)

**Customize the models to be used in the pipeline**

In [None]:
# Regressors compared in this study; hyperparameters are fixed here in one place.

# Ridge Regression
ridge_reg = Ridge(alpha=3.0, solver='cholesky', random_state=42)

# Lasso Regression
lasso_reg = Lasso(alpha=0.1, selection="cyclic", max_iter=20000, random_state=42)

# Support Vector Regression
svr_reg = SVR(kernel='rbf', C=120, gamma="scale", epsilon=2.0)

# Decision Tree Regressor
dt_reg = DecisionTreeRegressor(
    max_depth=9, min_samples_split=8, min_samples_leaf=4,
    ccp_alpha=0.0005, random_state=42,
)

# Gaussian Process Regressor: (constant * RBF) kernel plus a white-noise term.
signal_kernel = C(1.0, (1e-2, 1e3)) * RBF(length_scale=0.5, length_scale_bounds=(1e-3, 1e3))
noise_kernel = WhiteKernel(noise_level=1e-3, noise_level_bounds=(1e-5, 1e1))
gpr_kernel = signal_kernel + noise_kernel

gpr_reg = GaussianProcessRegressor(kernel=gpr_kernel, alpha=0.0, normalize_y=True, random_state=42)

# Display name -> estimator, in the order they are trained and reported.
models = dict([
    ('Ridge Regression', ridge_reg),
    ('Lasso Regression', lasso_reg),
    ('Support Vector Regression', svr_reg),
    ('Decision Tree Regressor', dt_reg),
    ('Gaussian Process Regressor', gpr_reg),
])

**Train and test**

In [None]:
# Fit every model on the training split and score it on the held-out split.
results = []

for name, model in models.items():
    y_pred = model.fit(X_train, y_train).predict(X_test)
    mse, r2 = mean_squared_error(y_test, y_pred), r2_score(y_test, y_pred)
    results.append(dict(Model=name, MSE=mse, R2=r2))
    print(f"{name} --> MSE: {mse:.2f}, R2: {r2:.2f}")

# One row per model for a side-by-side comparison.
results_df = pd.DataFrame(results)
print("\nSummary of model performance:")
print(results_df)

**Learning Curve Analysis**

In [None]:
#Learning Curve Analysis
def plot_learning_curve(model, X, y, model_name, cv=5, scoring='neg_mean_squared_error'):
    """Plot training and validation error of ``model`` against training-set size.

    Scores come from ``sklearn.model_selection.learning_curve`` at six training
    sizes between 10% and 100%, with shuffling seeded at 42. The fold scores are
    averaged and negated before plotting, so ``scoring`` is expected to be a
    negated error metric such as the default.

    Args:
        model: Estimator passed to ``learning_curve``.
        X: Feature matrix.
        y: Target values.
        model_name: Name shown in the figure title.
        cv: Cross-validation setting forwarded to ``learning_curve``.
        scoring: Scoring name forwarded to ``learning_curve``.
    """
    sizes, fold_train_scores, fold_val_scores = learning_curve(
        model, X, y,
        cv=cv,
        scoring=scoring,
        train_sizes=np.linspace(0.1, 1.0, 6),
        n_jobs=-1,
        shuffle=True,
        random_state=42,
    )

    # Average over folds and flip the sign: negated errors become positive errors.
    train_mse = -fold_train_scores.mean(axis=1)
    val_mse = -fold_val_scores.mean(axis=1)

    fig, ax = plt.subplots(figsize=(8, 5))
    for curve, label in ((train_mse, "Training MSE"), (val_mse, "Validation MSE")):
        ax.plot(sizes, curve, 'o-', label=label)
    ax.set_title(f"Learning Curve: {model_name}")
    ax.set_xlabel("Training Examples")
    ax.set_ylabel("Mean Squared Error")
    ax.legend()
    ax.grid(True)
    plt.show()

for name, model in models.items():
    print(f"\n=== Learning Curve for {name} ===")
    # X holds the raw (unscaled) features, while the models above were trained on
    # standardized inputs. Wrapping each model with a scaler keeps the curves
    # comparable and fits the scaling inside every CV fold on its training part only.
    plot_learning_curve(make_pipeline(StandardScaler(), model), X, y, name)