This notebook demonstrates how to generate synthetic PPG signals.

In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib widget

import os
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal

import sys
sys.path.append('../')
from src import synthetic_ppg, model_parameters, utils

plt.style.use('./article.mplstyle')

Generate a large number of random PPG signals.

In [None]:
# Output folder and filename.
output_folder = '../data'
filename = 'synthetic_ppg.npy'

In [None]:
# Default model parameters.
model_params = model_parameters.ModelParameters()
model_params.n = 100 # Demonstrate with a small number of signals.

synts, labels_feet, labels_peaks, noises = \
    synthetic_ppg.gen_rand_ppg_signals_parallel(model_params)

# Generate realistic noisy signals.
ppgs_raw = synts + noises

# Filter and normalize the signals.
ppgs = np.zeros(ppgs_raw.shape)
for i in range(len(ppgs)):
    ppgs[i] = utils.min_max_normalize(signal.sosfiltfilt(model_params.sos, ppgs_raw[i]), -1, 1)

In [None]:
def fix_feet_labels(s, labels, label_width, w):
    """
    Fix pulse waveform feet labels.
    """
    label_ones = np.argwhere(labels == 1.0).flatten()
    label_ones_idxs = np.argwhere(np.diff(label_ones) > 1).flatten() + 1
    label_ones_idxs = np.insert(label_ones_idxs, 0, 0)
    chr_points = label_ones[label_ones_idxs]

    labels_fixed = np.zeros(len(labels))
    for c in chr_points:
        s_i = max(0, c - w)
        s_e = min(len(s), c + w + label_width)
        loc_mins = signal.argrelmin(s[s_i:s_e])[0] + s_i
        if len(loc_mins) == 1:
            labels_fixed[loc_mins[0] - label_width // 2:loc_mins[0] + label_width // 2 + 1] = 1
        elif len(loc_mins) > 1:
            labels_fixed[loc_mins[-1] - label_width // 2:loc_mins[-1] + label_width // 2 + 1] = 1
        else:
            # Keep the original location as there is no idea of a better one.
            labels_fixed[c - label_width // 2:c + label_width // 2 + 1] = 1

    return labels_fixed

# The labels might not be accurate anymore due to addedd noise
# and filtering so we try to fix them.
labels_feet_fixed = np.zeros(ppgs.shape)
for i in range(len(ppgs)):
    labels_feet_fixed[i] = fix_feet_labels(ppgs[i], labels_feet[i], model_params.label_width, 9)

def create_labels(labels, label_width):
    """
    Creates new label array with a given number of ones
    marking a foot.
    """
    labels_new = np.zeros(labels.shape)
    for lf, lf_new in zip(labels, labels_new):
        for idx in np.where(lf == 1)[0]:
            s_idx = max(0, idx - label_width // 2)
            e_idx = min(len(lf), idx + label_width // 2 + 1)
            lf_new[s_idx:e_idx] = 1.0
    
    return labels_new

# Create label arrays with 5 ones marking a foot.
labels = create_labels(labels_feet, 5)
labels_fixed = create_labels(labels_feet_fixed, 5)

# Save the generated data.
data = np.array([synts, ppgs_raw, ppgs, labels, labels_fixed, noises, model_params], dtype=object)
if not os.path.isdir(output_folder):
        os.mkdir(output_folder)
with open(f'{output_folder}/{filename}', 'wb') as f:
    np.save(f, data, allow_pickle=True)

Plot a random signal.

In [None]:
synts, ppgs_raw, ppgs, labels, labels_fixed, noises, model_params = np.load(f'{output_folder}/{filename}', allow_pickle=True)

In [None]:
idx = np.random.randint(0, len(synts))

fig, axes = plt.subplots(3, 1, sharex=True)
fig.set_size_inches(6.5 / 2.54, 9 / 2.54)

axes[0].plot(synts[idx], linewidth=0.75)
axes[0].plot(labels[idx], c='tab:orange', linewidth=0.75)
axes[1].plot(noises[idx], linewidth=0.75)
axes[2].plot(utils.min_max_normalize(synts[idx] + noises[idx], -1, 1), linewidth=0.75)

title_pad = 0
axes[0].set_title('Clean PPG signal with label', 
    pad=title_pad)
axes[1].set_title('Noise', pad=title_pad)
axes[2].set_title('Realistic PPG signal', pad=title_pad)

axes[-1].set_xlabel('Sample')

for i in range(3):
    axes[i].spines[['right', 'top']].set_visible(False)

fig.tight_layout()

Create longer example signals.

In [None]:
# Create one synthetic signal. We will use only the clean signal, i.e. res_example[0].
model_params_example = model_parameters.ModelParameters()
model_params_example.n = 1
model_params_example.s_len = 2000
model_params_example.n_pulse_widths = 160
res_example = synthetic_ppg.gen_rand_ppg_signals(model_params_example)
# Create an array of timestamps [seconds].
ts_s = np.arange(model_params_example.s_len) / model_params_example.fs

# Create the noise signals with the same random seed to guarantee similar
# noise PSD sampling.
np.random.seed(0)
noises_example = np.zeros((3, model_params_example.s_len))
for i in range(3):
    noises_example[i], _, _ = synthetic_ppg.psd_to_time(model_params_example.s_len, 
                                model_params_example.fs, np.load(model_params_example.psd_interpolator_filenames[i], 
                                allow_pickle=True)[()])

# Scale the noises relative to the noise with the largest amplitude.
noises_amps = [np.max(s_noise) - np.min(s_noise) for s_noise in noises_example]
signals_example = np.zeros((4, model_params_example.s_len))
signals_example[0] = res_example[0][0]
for i in range(3):
    noises_example[i] = utils.min_max_normalize(noises_example[i], 0, noises_amps[i] / np.max(noises_amps))
    signals_example[i + 1] = res_example[0][0] + noises_example[i]

# Feet and peaks locations.
feet = np.argwhere(res_example[1][0]).flatten()
peaks = np.argwhere(res_example[2][0]).flatten()

# Plot the signals together with the feet and peaks.
titles = ['Clean', 'Sitting', 'Walking', 'Hand movement']
fig, axes = plt.subplots(4, 1, sharex=True)
fig.set_size_inches(12 / 2.54, 9 / 2.54)
for i in range(4):
    axes[i].plot(ts_s, signals_example[i], linewidth=1)
    axes[i].plot(ts_s[feet], signals_example[i][feet], marker='o', markeredgecolor='tab:red', markerfacecolor='none', linestyle='none')
    # Set y-tick to 1.
    y_min, y_max = 0, int(np.round(np.max(signals_example[i]) / 1, 0) * 1)
    axes[i].set_yticks(np.arange(y_min, y_max + 1, 1), np.arange(y_min, y_max + 1, 1))
    axes[i].set_title(titles[i], pad=0)
    axes[i].spines[['right', 'top']].set_visible(False)

axes[-1].set_xlabel('Time [s]')
fig.subplots_adjust(hspace=0.35)