# imports

In [12]:
!pip install neurokit2 pyhrv
!pip install peakutils


Collecting peakutils
  Downloading PeakUtils-1.3.5-py3-none-any.whl.metadata (1.6 kB)
Downloading PeakUtils-1.3.5-py3-none-any.whl (7.7 kB)
Installing collected packages: peakutils
Successfully installed peakutils-1.3.5


In [13]:
import pandas as pd
import numpy as np
from google.colab import drive
# Make the Drive folder reachable from this Colab runtime.
drive.mount('/content/drive')

# Read the ECG/emotion table from the mounted Drive into `df`.
dataset_csv = "/content/drive/MyDrive/project/ecg_to_emotion.csv"
df = pd.read_csv(dataset_csv)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
# Display the column index of the loaded table to see which fields are available.
df.columns

Index(['Participant Id', 'Session ID', 'Video ID', 'Name', 'Age', 'Male',
       'Valence level', 'Arousal level', 'Dominance level', 'Happy', 'Sad',
       'Fear', 'Anger', 'Neutral', 'Disgust', 'Surprised', 'Familiarity Score',
       'Emotion', 'Valence', 'Arousal', 'Four_Label', 'raw data', 'Gender',
       'V_Label', 'A_Label', 'Four_Labels', 'R_Peaks', 'RR_Intervals',
       'RR_Intervals_ms', 'Cleaned_RR_Intervals_ms', 'Mean RR (ms)',
       'SDNN (ms)', 'Min RR (ms)', 'Max RR (ms)', 'Mean HR (bpm)',
       'STD HR (bpm)', 'Min HR (bpm)', 'Max HR (bpm)', 'RMSSD (ms)',
       'NN50 count', 'pNN50 (%)', 'LF power (ms^2)', 'HF power (ms^2)',
       'LF/HF ratio', 'Total Power (ms^2)', 'SD1 (ms)', 'SD2 (ms)', 'SD1/SD2',
       'Sample Entropy'],
      dtype='object')

In [27]:
import ast

# Keep only the four emotion ratings plus the raw ECG trace.
# `.copy()` detaches the selection from the frame it was sliced from, so the
# column assignment below does not trigger pandas' SettingWithCopyWarning.
df = df[['Happy', 'Sad', 'Anger', 'Neutral', 'raw data']].copy()

# Each "raw data" cell read from the CSV is text holding a Python literal;
# parse it back into a real object. Cells that are no longer strings are
# passed through untouched, so re-running this cell does not raise.
df["raw data"] = df["raw data"].apply(
    lambda cell: ast.literal_eval(cell) if isinstance(cell, str) else cell
)


# features

In [28]:
import numpy as np
import neurokit2 as nk
from pyhrv.frequency_domain import welch_psd
from pyhrv.nonlinear import sample_entropy

def _shape_moments(values):
    """Return ``(excess_kurtosis, skewness)`` of ``values`` from population z-scores.

    Both are reported as 0.0 when ``values`` is empty or has no spread, because
    the z-score is undefined there and would otherwise evaluate to NaN (0/0).
    """
    values = np.asarray(values, dtype=np.float64)
    if values.size == 0:
        return 0.0, 0.0
    spread = np.std(values, ddof=0)
    if not spread > 0:  # also catches a NaN spread
        return 0.0, 0.0
    z = (values - np.mean(values)) / spread
    return float(np.mean(z**4) - 3), float(np.mean(z**3))


def extract_hrv_features(ecg_signal, sampling_rate=250, datasetId=None):
    """Compute a flat list of HRV features from one ECG recording.

    The signal is cleaned and its R-peaks located with neurokit2; successive
    peak distances are converted to RR intervals in milliseconds and intervals
    outside 300-2000 ms are discarded before any feature is computed.

    Parameters
    ----------
    ecg_signal : sequence of numbers
        ECG samples; converted to a float64 array.
    sampling_rate : int, default 250
        Samples per second, used to convert peak distances to milliseconds.
    datasetId : any, default None
        Passed through unchanged as the last element of the result.

    Returns
    -------
    list
        35 entries in this order: MEAN_RR, MEDIAN_RR, SDRR, RMSSD, SDSD,
        SDRR/RMSSD, HR, pNN25, pNN50, SD1, SD2, KURT, SKEW, the same eight
        statistics (mean, median, SD, RMSSD, SDSD, SD/RMSSD, kurtosis,
        skewness) of the relative RR series, VLF, VLF_PCT, LF, LF_PCT, LF_NU,
        HF, HF_PCT, HF_NU, TP, LF/HF, HF/LF, sample entropy, Higuchi fractal
        dimension, and finally ``datasetId``.

    Notes
    -----
    Statistics that are undefined for the data at hand (too few intervals,
    zero variance, zero denominator) are reported as 0.0. The exceptions are
    MEAN_RR, MEDIAN_RR and SDRR, which are NaN when no RR interval survives the
    filter, and HF/LF, which is ``inf`` when LF power is zero. If the
    frequency-domain or nonlinear estimators raise, the affected features are
    set to 0.0 and a ``RuntimeWarning`` is emitted instead of failing silently.
    """
    import warnings

    ecg_signal = np.asarray(ecg_signal, dtype=np.float64)

    # --- 1. Pre-clean the ECG signal ---
    cleaned = nk.ecg_clean(ecg_signal, sampling_rate=sampling_rate, method="biosppy")

    # --- 2. R-peak detection using cleaned signal ---
    _, rpeaks_info = nk.ecg_peaks(cleaned, sampling_rate=sampling_rate)
    rpeaks = rpeaks_info["ECG_R_Peaks"]

    # --- 3. Compute RR intervals in ms, keeping only 300-2000 ms ---
    rr_intervals = np.diff(rpeaks) / sampling_rate * 1000.0
    rr_intervals = rr_intervals[(rr_intervals >= 300) & (rr_intervals <= 2000)]

    # --- 4. Time-domain features ---
    if rr_intervals.size > 0:
        mean_rr = float(np.mean(rr_intervals))
        median_rr = float(np.median(rr_intervals))
        sdrr = float(np.std(rr_intervals, ddof=0))
    else:
        # No usable beat-to-beat interval: mark the central statistics as missing.
        mean_rr = median_rr = sdrr = float('nan')
    diff_rr = np.diff(rr_intervals)
    rmssd = float(np.sqrt(np.mean(diff_rr**2))) if diff_rr.size > 0 else 0.0
    sdsd = float(np.std(diff_rr, ddof=0)) if diff_rr.size > 0 else 0.0
    sdrr_rmssd = float(sdrr / rmssd) if rmssd != 0 else 0.0
    hr = float(60000.0 / mean_rr) if mean_rr > 0 else 0.0
    abs_diff_rr = np.abs(diff_rr)
    pnn25 = float(100.0 * np.sum(abs_diff_rr > 25) / abs_diff_rr.size) if abs_diff_rr.size > 0 else 0.0
    pnn50 = float(100.0 * np.sum(abs_diff_rr > 50) / abs_diff_rr.size) if abs_diff_rr.size > 0 else 0.0

    # --- 5. Poincaré & shape ---
    sd1 = float(sdsd / np.sqrt(2.0))
    # On short series the radicand can come out negative (or NaN when SDRR is
    # NaN); report 0.0 in those cases rather than taking an invalid square root.
    sd2_squared = 2.0 * (sdrr**2) - 0.5 * (sdsd**2)
    sd2 = float(np.sqrt(sd2_squared)) if sd2_squared > 0 else 0.0
    kurt, skew = _shape_moments(rr_intervals)

    # --- 6. Relative RR (ratio of each interval to its predecessor) ---
    if rr_intervals.size > 1:
        rel_rr = rr_intervals[1:] / rr_intervals[:-1]
    else:
        rel_rr = np.array([])

    if rel_rr.size > 0:
        mean_rel_rr = float(np.mean(rel_rr))
        median_rel_rr = float(np.median(rel_rr))
        sdrr_rel_rr = float(np.std(rel_rr, ddof=0))
        diff_rel_rr = np.diff(rel_rr)
        rmssd_rel_rr = float(np.sqrt(np.mean(diff_rel_rr**2))) if diff_rel_rr.size > 0 else 0.0
        sdsd_rel_rr = float(np.std(diff_rel_rr, ddof=0)) if diff_rel_rr.size > 0 else 0.0
        sdrr_rmssd_rel_rr = float(sdrr_rel_rr / rmssd_rel_rr) if rmssd_rel_rr != 0 else 0.0
    else:
        mean_rel_rr = median_rel_rr = sdrr_rel_rr = rmssd_rel_rr = sdsd_rel_rr = 0.0
        sdrr_rmssd_rel_rr = 0.0
    kurt_rel_rr, skew_rel_rr = _shape_moments(rel_rr)

    # --- 7. Frequency domain ---
    try:
        fd_res = welch_psd(nni=rr_intervals, show=False, show_param=False)
        vlf, lf, hf = fd_res['fft_abs']
        vlf_pct, lf_pct, hf_pct = fd_res['fft_rel']
        lf_nu, hf_nu = fd_res['fft_norm']
        tp = fd_res['fft_total']
        lf_hf = fd_res['fft_ratio']
        # NOTE(review): inf passes through dropna() downstream; confirm it is wanted.
        hf_lf = float(hf / lf) if lf > 0 else float('inf')
    except Exception as exc:
        warnings.warn(
            f"welch_psd failed ({exc!r}); frequency-domain features set to 0.0",
            RuntimeWarning,
            stacklevel=2,
        )
        vlf = lf = hf = vlf_pct = lf_pct = hf_pct = lf_nu = hf_nu = tp = lf_hf = hf_lf = 0.0

    # --- 8. Nonlinear ---
    # `except Exception` (not a bare `except`) so that interrupting the kernel
    # during a long extraction loop is not swallowed here.
    try:
        sampen_val = float(sample_entropy(nni=rr_intervals)['sampen'])
    except Exception as exc:
        warnings.warn(
            f"sample_entropy failed ({exc!r}); sampen set to 0.0",
            RuntimeWarning,
            stacklevel=2,
        )
        sampen_val = 0.0
    try:
        hfd_val, _ = nk.fractal_higuchi(rr_intervals, show=False)
        hfd_val = float(hfd_val)
    except Exception as exc:
        warnings.warn(
            f"fractal_higuchi failed ({exc!r}); Higuchi dimension set to 0.0",
            RuntimeWarning,
            stacklevel=2,
        )
        hfd_val = 0.0

    return [
        mean_rr, median_rr, sdrr, rmssd, sdsd, sdrr_rmssd, hr, pnn25, pnn50,
        sd1, sd2, kurt, skew,
        mean_rel_rr, median_rel_rr, sdrr_rel_rr, rmssd_rel_rr, sdsd_rel_rr,
        sdrr_rmssd_rel_rr, kurt_rel_rr, skew_rel_rr,
        vlf, vlf_pct, lf, lf_pct, lf_nu,
        hf, hf_pct, hf_nu, tp, lf_hf, hf_lf,
        sampen_val, hfd_val, datasetId
    ]


In [30]:
# Names of the HRV features, in the order extract_hrv_features returns them
# (its trailing datasetId slot is not a feature and is dropped below).
hrv_feature_names = [
    "MEAN_RR", "MEDIAN_RR", "SDRR", "RMSSD", "SDSD", "SDRR_RMSSD", "HR",
    "pNN25", "pNN50", "SD1", "SD2", "KURT", "SKEW",
    "MEAN_REL_RR", "MEDIAN_REL_RR", "SDRR_REL_RR", "RMSSD_REL_RR", "SDSD_REL_RR",
    "SDRR_RMSSD_REL_RR", "KURT_REL_RR", "SKEW_REL_RR",
    "VLF", "VLF_PCT", "LF", "LF_PCT", "LF_NU",
    "HF", "HF_PCT", "HF_NU", "TP", "LF_HF", "HF_LF",
    "sampen", "higuci"
]

# Extract the features one recording at a time. Iterating the column directly
# avoids materialising a full row Series per record just to read one field.
hrv_features_all = []
for ecg in df["raw data"]:
    features = extract_hrv_features(ecg_signal=ecg, sampling_rate=250, datasetId=None)
    hrv_features_all.append(features[:-1])  # exclude datasetId

# One row of features per recording, in the same order as `df`.
hrv_features_df = pd.DataFrame(hrv_features_all, columns=hrv_feature_names)

# Drop HRV columns left behind by an earlier run of this cell before joining,
# so re-running the cell replaces them instead of appending duplicates.
df = pd.concat(
    [
        df.drop(columns=hrv_feature_names, errors="ignore").reset_index(drop=True),
        hrv_features_df,
    ],
    axis=1,
)

# View the updated DataFrame
print(df.head())


      Happy       Sad     Anger   Neutral  \
0  Moderate   VeryLow   VeryLow  VeryHigh   
1   VeryLow      High  VeryHigh   VeryLow   
2  Moderate  Moderate       Low  Moderate   
3  Moderate  Moderate   VeryLow       Low   
4   VeryLow   VeryLow      High   VeryLow   

                                            raw data  length     MEAN_RR  \
0  [16.51, 16.496, 16.5, 16.492, 16.492, 16.494, ...    5000  652.137931   
1  [16.51, 16.496, 16.5, 16.492, 16.492, 16.494, ...    5000  652.137931   
2  [11.273, 11.265, 11.274, 11.272, 11.269, 11.27...    5000  664.571429   
3  [10.049, 10.065, 10.042, 10.008, 10.065, 10.07...    5000  712.153846   
4  [9.8221, 9.8296, 9.8246, 9.8414, 9.8504, 9.820...    5000  688.857143   

   MEDIAN_RR       SDRR      RMSSD  ...  LF_PCT  LF_NU   HF  HF_PCT  HF_NU  \
0      656.0  31.679128  18.973666  ...     0.0    0.0  0.0     0.0    0.0   
1      656.0  31.679128  18.973666  ...     0.0    0.0  0.0     0.0    0.0   
2      660.0  37.731598  21.895374  ..

# augmentation

In [39]:
import pandas as pd
import numpy as np


# Ordinal encoding of the five rating labels.
emotion_mapping = {
    "VeryLow": 0,
    "Low": 1,
    "Moderate": 2,
    "High": 3,
    "VeryHigh": 4
}

# Target (emotion) columns and input (HRV feature) columns.
emotion_cols = ["Happy", "Sad", "Anger", "Neutral"]
feature_cols = [
    "MEAN_RR", "MEDIAN_RR", "SDRR", "RMSSD", "SDSD", "SDRR_RMSSD", "HR",
    "pNN25", "pNN50", "SD1", "SD2", "KURT", "SKEW",
    "MEAN_REL_RR", "MEDIAN_REL_RR", "SDRR_REL_RR", "RMSSD_REL_RR", "SDSD_REL_RR",
    "SDRR_RMSSD_REL_RR", "KURT_REL_RR", "SKEW_REL_RR",
    "VLF", "VLF_PCT", "LF", "LF_PCT", "LF_NU",
    "HF", "HF_PCT", "HF_NU", "TP", "LF_HF", "HF_LF",
    "sampen", "higuci"
]

# Drop rows with missing data
df_clean = df.dropna(subset=emotion_cols + feature_cols).copy()

# Convert emotion categories to integers
for col in emotion_cols:
    df_clean[col] = df_clean[col].map(emotion_mapping)

# `.map` turns any label missing from `emotion_mapping` into NaN; report that
# clearly here instead of letting the integer cast below fail on it.
unmapped_cols = [col for col in emotion_cols if df_clean[col].isna().any()]
if unmapped_cols:
    raise ValueError(f"Unrecognised emotion labels in columns: {unmapped_cols}")

# Convert features and labels to arrays
X_orig = df_clean[feature_cols].values.astype(float)
y_orig = df_clean[emotion_cols].values.astype(int)

# Augmentation parameters
n_augment = 3        # noisy copies generated per original row
noise_level = 0.02   # noise std as a fraction of each feature's own std
random_seed = 42     # fixed so the augmented set is reproducible
rng = np.random.default_rng(random_seed)

# Scale the jitter per feature. The features span very different magnitudes,
# so one absolute sigma would be negligible for some columns and dominant for
# others. Columns whose spread is not finite receive no noise.
if len(X_orig) > 0:
    with np.errstate(invalid="ignore"):
        feature_scale = X_orig.std(axis=0)
else:
    feature_scale = np.zeros(len(feature_cols))
feature_scale = np.where(np.isfinite(feature_scale), feature_scale, 0.0)

# Apply augmentation: each original row is repeated n_augment times in a row
# (copies of row 0 first, then row 1, ...) and independent Gaussian noise is added.
X_repeated = np.repeat(X_orig, n_augment, axis=0)
jitter = rng.normal(0.0, 1.0, size=X_repeated.shape) * (noise_level * feature_scale)
X_augmented = X_repeated + jitter
y_augmented = np.repeat(y_orig, n_augment, axis=0)

# Convert to DataFrame
X_aug_df = pd.DataFrame(X_augmented, columns=feature_cols)
y_aug_df = pd.DataFrame(y_augmented, columns=emotion_cols)

# Merge original and augmented
final_df = pd.concat([
    pd.concat([df_clean[feature_cols], df_clean[emotion_cols]], axis=1),
    pd.concat([X_aug_df, y_aug_df], axis=1)
], ignore_index=True)

# Show result
print("✅ Final augmented dataset shape:", final_df.shape)



✅ Final augmented dataset shape: (1624, 38)


In [40]:
from google.colab import files

# Write the current `df` to a CSV file in the working directory, without the index.
# NOTE(review): this exports `df`, not the augmented `final_df` - confirm that is intended.
export_name = "hrv_features_to_emotions.csv"
df.to_csv(export_name, index=False)

# Ask Colab to send the file just written to the browser as a download.
files.download(export_name)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>