**Team Name**: <br>
AlgoWizards <br><br>
**Team Members**: <br>
Momina Iffat Iftikhar 2412443 <br>
Muhammad Junaid Raza 2409917

In [75]:
import pickle
import numpy as npa
import matplotlib.pyplot as plt
import scipy.signal
from scipy.ndimage import median_filter
import os

# --- 1. Load ECG Data with HR Labels ---

In [78]:
file_path = os.path.join("data", "ecg_data_with_hr_labels.pkl")  

# Load ECG data
with open(file_path, "rb") as f:
    data = pickle.load(f)
    
# Extract signals and ground truth HR values
signals = data["signals"]
ground_truth_hr = data["hr_values"]

print(f"Loaded {len(signals)} ECG signals.")

Loaded 200 ECG signals.


# --- 2. Implement Your HR Extraction Algorithm Here ---

**Instruction:**

***Your algorithm should return a list of HR values where each HR value corresponds to an ECG signal. Ensure the length of the list is 200 (equal to number of signals). The list can include np.nan if your algorithm is not able to calculate HR for a signal.***

In [118]:
# References:
### https://www.ijariit.com/manuscripts/v4i4/V4I4-1527.pdf
### https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html
### https://stackoverflow.com/questions/67001293/savitzky-golay-filter-in-python-wrong-window-size

In [109]:
def extract_hr(ecg_signals, fs=200):
    detected_hr_values = [npa.nan] * len(ecg_signals)

    for i, signal in enumerate(ecg_signals):
        try:
            # === Step 1: Noise Reduction ===
            signal = median_filter(signal, size=3)  # Reduces baseline wander and noise with median filtering

            # Applying Notch filter (50 Hz) with a Q-factor of 35
            b_notch, a_notch = scipy.signal.iirnotch(50, 35, fs)
            signal = scipy.signal.filtfilt(b_notch, a_notch, signal)  # Removes powerline interference at 50 Hz

            # Bandpass filter (0.5 - 40 Hz)
            b, a = scipy.signal.butter(4, [0.5/(fs/2), 40/(fs/2)], btype='band')
            filtered_signal = scipy.signal.filtfilt(b, a, signal)  # Filters the signal to include only relevant frequencies

            # Smoothing using Savitzky-Golay filter
            smooth_signal = scipy.signal.savgol_filter(filtered_signal, window_length=15, polyorder=3)  # Smooths the signal for better peak detection

            # === Step 2: QRS Peak Detection ===
            diff_signal = npa.diff(smooth_signal, n=1)
            squared_signal = diff_signal ** 2  # Enhances the signal by squaring the derivative of the smoothed signal

            # Moving window integration with an optimized window size of 0.11s
            window_size = int(0.11 * fs)
            integrated_signal = npa.convolve(squared_signal, npa.ones(window_size)/window_size, mode='same')  # Integrates the squared signal for peak detection

            # Fine-tuned adaptive threshold
            threshold = npa.median(integrated_signal) + 0.65 * npa.std(integrated_signal)  # Dynamic threshold based on median and standard deviation
            peaks, _ = scipy.signal.find_peaks(integrated_signal, height=threshold, distance=int(fs // 2.5))  # Detect QRS peaks

            if len(peaks) < 2:
                detected_hr_values[i] = npa.nan
                continue

            # === Step 3: Compute RR Intervals ===
            rr_intervals = npa.diff(peaks) / fs  # Convert peak-to-peak distances into RR intervals in seconds

            # IQR Filtering: Tighter upper bound: `min(1.2, q3 + 1.4 * iqr)`
            q1, q3 = npa.percentile(rr_intervals, [25, 75])
            iqr = q3 - q1
            lower_bound = max(0.4, q1 - 1.5 * iqr)  # Exclude intervals that are too short (lower bound)
            upper_bound = min(1.2, q3 + 1.4 * iqr)  # Exclude intervals that are too long (upper bound)
            
            valid_rr = rr_intervals[(rr_intervals >= lower_bound) & (rr_intervals <= upper_bound)]  # Filter out outliers based on IQR

            if len(valid_rr) == 0:
                detected_hr_values[i] = npa.nan
                continue

            # Using trimmed Mean here instead of Median**
            valid_rr_sorted = npa.sort(valid_rr)
            trimmed_rr = valid_rr_sorted[int(0.1 * len(valid_rr)) : int(0.9 * len(valid_rr))]  # Remove extreme values by trimming 10% from both ends

            if len(trimmed_rr) == 0:
                detected_hr_values[i] = npa.nan
                continue

            avg_rr = npa.mean(trimmed_rr)  # Compute the mean RR interval after trimming for stability
            detected_hr_values[i] = int(60 / avg_rr)  # Calculate heart rate in beats per minute (BPM)

        except Exception as e:
            print(f"Error processing signal {i}: {e}")
            detected_hr_values[i] = npa.nan

    return detected_hr_values

# Run HR extraction
detected_hr_values = extract_hr(signals)

# --- 3. Evaluation of HR Extraction Algorithm ---

**Description:**

***Evaluates the performance of your HR extraction method using Mean Absolute Error (MAE).***

In [112]:
def evaluate_hr_extraction(detected_hr_values, ground_truth_hr):
    valid_indices = np.where(~np.isnan(detected_hr_values))[0].tolist()
    
    if len(valid_indices) == 0:
        return {"Mean Absolute Error": np.nan}
    
    absolute_errors = np.abs(np.array(ground_truth_hr)[valid_indices] - np.array(detected_hr_values)[valid_indices])
    mae = np.sum(absolute_errors) / len(absolute_errors)
    return {"Mean Absolute Error": mae}

In [114]:
# Evaluate performance of your HR extraction method
mae = evaluate_hr_extraction(detected_hr_values, ground_truth_hr)

print(mae)

{'Mean Absolute Error': 5.248226672141202}


***Run the above cells and check your evaluation score.***

***The final assessment is based on the MAE, the lowest MAE is the first rank in competition!***