# Heart Rate Feature Extraction

In the previous section we cleaned the ECG data from WESAD, so that we can now extract the features of interest. Specifically, we are interested in extracting heart rate (HR) and heart rate variability features, in order to differentiate stressful sessions from baseline cardiac activity. Let us first build the algorithms using a sample ECG signal from WESAD:

In [None]:
import numpy as np, os

clean_path = "WESAD_clean/"
subject = "S10"
session = "baseline"

# Load the 1D np.array using os.path.join and np.load
filepath = os.path.join(clean_path, f"{subject}_{session}.npy")
sample_ecg = np.load(filepath)[10000:13000]

print(sample_ecg.shape)

Heart rate is the estimated number of heartbeats per minute, and it is computed simply by
1. locating the time points of R peaks
2. counting the number of R peaks in the signal
3. dividing this number by the signal length (in minutes -> BPM)

Let us first detect the R peaks in the given ECG signal using the ``scipy.signal`` library:

In [None]:
from scipy.signal import find_peaks

peaks, _ = find_peaks(
    sample_ecg,     # the ECG signal
    height=0.1,     # minimum peak height (safe value for normalized ECG is 0.1)
    threshold=0.01, # minimum peak height difference to neighbors (safe value is 0.01)
    distance=30     # minimum distance between each peak (30 samples (~0.3s) is safe for 100Hz)
)

# Visualize the peaks we calculated
import matplotlib.pyplot as plt

plt.figure(figsize=(20, 3))
plt.plot(sample_ecg)
plt.plot(peaks, sample_ecg[peaks], marker="o", color="red", ls="")
plt.title("ECG with R-peaks")
plt.xlabel("Sample number")
plt.ylabel("Amplitude")
plt.show()

We can now count the number of peaks in the interval we have and calculate the heart rate:

In [None]:
# TODO: Count the number of peaks in <sample_ecg>
num_peaks = ...
print(f"Number of peaks: {num_peaks}")

# TODO: Calculate the heart rate in beats per MINUTE (BPM)
signal_len = ...
heart_rate = ...
print(f"Heart rate: {heart_rate:.2f} BPM")

Let us now verify our algorithm by comparing what we got with a popular ECG library:

In [None]:
import neurokit2 as nk

# calculate the heart rate using the `nk.ecg_rate()` function
_, rpeaks = nk.ecg_peaks(sample_ecg, sampling_rate=100)
heart_rate = nk.ecg_rate(rpeaks, sampling_rate=100, desired_length=len(sample_ecg))
overall_heart_rate = np.mean(heart_rate)
print(f"Benchmark heart rate: {overall_heart_rate:.2f} BPM")

Your estimation should be pretty close to the benchmark one (+- 0.5 BPM). Any small differences are drawn from the library's behavior to estimate heart rate per individual sample. If you want, you can replicate that behavior by applying your algorithm with a sliding window over the ECG signal. This is however out of the scope of this assignment.

Now that we established the algorithm for heart rate extraction, let us build it in a separate function:

In [None]:
def calculate_heart_rate(signal, sampling_rate=100):
    # TODO: Write the function
    ...
    return heart_rate

Great, now let's build a feature extraction function for HRV. There are multiple HRV measures that we can use. One of the most popular is called RMSSD, i.e., the RMS value of the successive differences between heartbeats (R peaks). The name is self-explanatory, hence let's compute it based on what we know:

In [None]:
# TODO: Find the peaks in the ECG signal
peaks, _ = ...
# TODO: Calculate the RR interval duration in MILISECONDS using np.diff()
time_diff = ...
# TODO: calculate the difference between successive intervals
time_diff_intervals = ...
# TODO: calculate the RMS value of the differences
rmssd = ...

print(f"RMSSD: {rmssd:.2f} ms")

Let us now verify our algorithm by comparing what we got with Neurokit:

In [None]:
peaks_nk, _ = nk.ecg_peaks(sample_ecg, sampling_rate=100)
hrv = nk.hrv_time(peaks_nk, sampling_rate=100, show=False)
rmssd = hrv["HRV_RMSSD"].values[0]
print(f"Benchmark RMSSD: {rmssd:.2f} ms")

Your estimation should match the benchmark (acceptable error +- 0.2 ms). Let's now write the HRV extraction function:

In [None]:
def calculate_hrv(signal, sampling_rate=100):
    # TODO: Write the function
    ...
    return rmssd

We have successfully built the feature extraction functions for our task. To finish this assignment, we need to perform the feature extraction over the whole clean dataset. To increase the size for our experiments, we will slice each clean ECG into subsignals of 5 minutes, with 20% overlap, and compute the features on those intervals. Let us code a function for that:

In [None]:
def slice_signal(signal, slice_len, overlap):
    # calculate the number of slices we can take
    num_slices = (len(signal) - slice_len) // (slice_len - overlap) + 1
    # constrain the signal
    # create a 2D array to hold the slices
    slices = np.zeros((num_slices, slice_len))
    for i in range(num_slices):
        # calculate the start and end of the current slice
        start = i * (slice_len - overlap)
        end = start + slice_len
        # store the slice in the 2D array
        slices[i] = signal[start:end]
    return slices

# test the function on the whole sample ECG signal
sample_ecg = np.load(filepath) # see top of the notebook
slice_len = 5 * 60 * 100 # 5 minutes of ECG signal
overlap = 1 * 60 * 100 # 1 minute of overlap

slices = slice_signal(sample_ecg, slice_len, overlap)
print(slices.shape)

Hopefully the function works as expected. Now let's code the complete feature (and label) extraction process:

In [None]:
features, labels = [], []
for npy_file in os.listdir(clean_path):
    # TODO: Load the data file using np.load
    data = ...

    # TODO: Slice the data into 5-minute segments with 1-minute overlap
    slice_len = ...
    overlap = ...
    slices = slice_signal(data, slice_len, overlap)

    # TODO: Calculate the heart rate features and labels
    for i, slice in enumerate(slices):
        # feature information
        hr = ...
        rmssd = ...
        features.append([hr, rmssd])

        # label information
        pid = npy_file.split("_")[0]
        pid = int(pid[1:])
        session = npy_file.split("_")[1].split(".")[0]
        session = 0 if session == "baseline" else 1
        labels.append([pid, session])

# TODO: Save the features and labels to a file
np.save("final_features.npy", np.array(features))
np.save("final_labels.npy", np.array(labels))

print("Done!")

That is the end of this notebook. Make sure the generated file has the correct shape. You can then return and proceed to the next part of the assignment.

In [None]:
check_data = np.load("final_features.npy")
assert check_data.shape == (90, 2), "The shape of the data is incorrect :("
print("All good!")