In [7]:
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy
from scipy import signal
import os


# Functions

## Read ppg signal csv

In [8]:
def funcReadSignal(path_ppg):
  df_ppg = pd.read_csv(path_ppg)
  signal_ppg = df_ppg['PPG'] # PPG value
  return signal_ppg

## Graph PPG Signal

In [27]:
def funcVisualizeSignal(ppg_signal, fs, label, scatterX = 0, scatterY = 0, scatter = False): # scatterX in samples --> generates times array = samples/fs
  L = len(ppg_signal)
  t = np.linspace(0,L/fs,L)
  plt.figure(figsize=(50,10))
  plt.plot(t,ppg_signal, 'y')
  if (scatter == True):
    plt.scatter(scatterX/fs, scatterY)
  plt.ylabel('Amplitude', fontsize=14)
  plt.xlabel('Time [s]', fontsize=14)
  plt.title(label, fontsize=14)
  plt.show()

## Filters

### BP Filter

In [10]:
def funcBPFilter(ppg_signal, order, f1, f2, fs):
  fc = [f1,f2]
  [b,a] = signal.butter(order, fc, btype = 'bandpass', analog = False, output = 'ba', fs = fs) # Butterworth Nth-order digital IIR filter design: returns the filter coefficients (Numerator (b) and denominator (a)) of the polynomials of the IIR filter
  signalBP = signal.filtfilt(b, a, ppg_signal) # Applies a linear digital filter forward and backward to the signal: the combined filter has zero phase and a filter order twice that of the original
  return signalBP

### LP Filter

In [11]:
def funcLPFilter(ppg_signal, order, fc, fs):
  [b,a] = signal.butter(order, fc, btype = 'lowpass', analog = False, output = 'ba', fs = fs)
  signalLP = signal.filtfilt(b, a, ppg_signal)
  return signalLP

### HP Filter

In [12]:
def funcHPFilter(ppg_signal, order, fc, fs):
  [b,a] = signal.butter(order, fc, btype = 'highpass', analog = False, output = 'ba', fs = fs)
  signalHP = signal.filtfilt(b, a, ppg_signal)
  return signalHP

### Adaptative Filter

In [29]:
def funcFilter(ppg_signal, fs, HR, order=3):
  if HR < 60:
    fmin = 0.1
    fmax = 1.5
  elif HR < 100:
    fmin = 0.35
    fmax = 2.5
  elif HR < 180:
    fmin = 0.5
    fmax = 3.5
  else:
    fmin = 1
    fmax = 4.5
  ppg_signal_HP = funcHPFilter(ppg_signal = ppg_signal, order = order, fc = fmin, fs = fs)
  ppg_signal_LP = funcLPFilter(ppg_signal = ppg_signal_HP, order = order, fc = fmax, fs = fs)
  return ppg_signal_LP

## Estimate HR

In [46]:
def funcFindPosition(array, value):
  for i, element in enumerate(array):
    if element >= value:
      return i

def funcEstimateHR(ppg_signal, f1, f2, fs):
  ppg_signal_BP = funcBPFilter(ppg_signal=ppg_signal, order=3, f1=f1, f2=f2, fs=fs)
  # FFT
  fft_result = np.fft.fft(ppg_signal_BP)
  fft_freqs = np.fft.fftfreq(len(ppg_signal_BP), 1/fs)
  # Find dominant frequency in the FFT that estimates the heart frequency --> 30bpm to 240 bpm = 0.5 to 4 Hz
  x_min = funcFindPosition(fft_freqs, 0.5)
  x_max = funcFindPosition(fft_freqs, 4)
  positive_freqs = fft_freqs[:len(fft_freqs)//2][x_min:x_max]
  positive_result = fft_result[:len(fft_result)//2][x_min:x_max]
  magnitude_spectrum = np.abs(positive_result)
  max_peak_index = np.argmax(magnitude_spectrum)
  dominant_freq = positive_freqs[max_peak_index]
  heart_rate = dominant_freq * 60  # Transform Hz to bpm

  # Plot FFT and dominant frequency
  plt.figure(figsize=(10,3))
  plt.plot(positive_freqs, magnitude_spectrum)
  plt.plot(positive_freqs[max_peak_index], magnitude_spectrum[max_peak_index], 'ro')
  plt.title(f"FFT - HR: {heart_rate:.2f} bpm")
  plt.xlabel("Frequency (Hz)")
  plt.grid()
  plt.xlim(0.5, 5)
  plt.tight_layout()
  plt.show()

  print(f"Estimated HR: {heart_rate:.2f} bpm")
  return heart_rate


## Find Pulse Peaks

In [15]:
def funcFindPeaks(ppg_signal):
  peaks = signal.find_peaks(ppg_signal)[0]
  return peaks

### Remove artifact and diastolic peaks:

In [38]:
def funRemoveUpStatePeaks(ppg_signal, locs_peaks_ppg, HR, fs):
  thd = int((60/HR)*fs/4)
  correct_locs_peaks_ppg = []
  correct_locs_peaks_ppg.append(locs_peaks_ppg[0])
  for loc in range(1, len(locs_peaks_ppg)):
    if (locs_peaks_ppg[loc]+thd) < len(ppg_signal):
        if ppg_signal[locs_peaks_ppg[loc]+thd] < ppg_signal[locs_peaks_ppg[loc]]:
            correct_locs_peaks_ppg.append(locs_peaks_ppg[loc])
    else:
        correct_locs_peaks_ppg.append(locs_peaks_ppg[loc])
  return np.array(correct_locs_peaks_ppg)

In [39]:
def funcRemoveDiastolicPeaks(ppg_signal, locs_peaks_ppg, HR, fs): # "funRemoveDownStatePeaks"
  thd = int((60/HR)*fs/4)
  locs_systolic_peaks_ppg = []
  locs_systolic_peaks_ppg.append(locs_peaks_ppg[0])
  for loc in range(1, len(locs_peaks_ppg)):
    if ppg_signal[locs_peaks_ppg[loc]-thd] < ppg_signal[locs_peaks_ppg[loc]]:
      locs_systolic_peaks_ppg.append(locs_peaks_ppg[loc])
  return np.array(locs_systolic_peaks_ppg)

In [34]:
def funcFindSystolicPeaks(ppg_signal, fs, HR):
  locs_peaks_ppg = funcFindPeaks(ppg_signal)
  locs_peaks_ppg = funRemoveUpStatePeaks(ppg_signal, locs_peaks_ppg, HR, fs)
  locs_systolic_peaks_ppg = funcRemoveDiastolicPeaks(ppg_signal, locs_peaks_ppg, HR, fs)
  return locs_systolic_peaks_ppg

## Find Pulse Onsets

In [32]:
def funcDetectPeakOnsets(ppg_signal, locs_peaks_ppg):
  locs_onsets_ppg = []
  for i in range(len(locs_peaks_ppg)-1):
    locs_onsets_ppg.append(np.argmin(ppg_signal[locs_peaks_ppg[i]:locs_peaks_ppg[i+1]])+locs_peaks_ppg[i])
  return np.array(locs_onsets_ppg)

## Measure HBI (Heart Beat Interval)

In [20]:
def funcHBI(locs_peaks_ppg):
  HBI = [j-i for i,j in zip(locs_peaks_ppg[0:-1], locs_peaks_ppg[1:])]
  return HBI

## Measure PPGA (PPG Amplitude)

In [44]:
def funcPPGA(ppg_signal, locs_peaks_ppg, locs_onsets_ppg):
  if locs_onsets_ppg[0]>locs_peaks_ppg[0]:
    locs_peaks_ppg = locs_peaks_ppg[1:]
  PPGA = [int(ppg_signal[peak]-ppg_signal[onset]) for peak,onset in zip(locs_peaks_ppg, locs_onsets_ppg)]
  return PPGA

## Graph Time Series

In [36]:
def funcPlotTimeSeries(array, label, title):
  plt.figure(figsize=(10,3))
  plt.plot(array)
  plt.title(title)
  plt.xlabel("Pulse Number")
  plt.ylabel(label)
  plt.grid()
  plt.tight_layout()
  plt.show()

# Signals

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
dataset_path = '/content/drive/MyDrive/PPG_Signal_Dataset/dataset'

In [None]:
fs = 500

for filename in os.listdir(dataset_path):
  subjectcode = filename.split('.')[0]
  print(subjectcode)

  # Load the signal form the csv
  path_ppg = os.path.join(dataset_path, filename)
  ppg_signal = funcReadSignal(path_ppg)

  # Plot the signal
  funcVisualizeSignal(ppg_signal = ppg_signal, fs = 500, label = subjectcode, scatterX = 0, scatterY = 0, scatter = False)

  # Estimate the heart rate based on the signal's spectrum
  HR = funcEstimateHR(ppg_signal=ppg_signal, f1=0.1, f2=5, fs=fs) # 30bpm to 240 bpm = 0.5 to 4 Hz --> filter from 0.1 to 5 Hz

  # HP Filter to remove the drift and LP Filter to remove high frequency noise (IIR Butterworth order 3 filters with cut off frequencies adaptative to the subject's heart rate)
  ppg_signal_filtered = funcFilter(ppg_signal=ppg_signal, fs=fs, HR=HR, order=3)
  funcVisualizeSignal(ppg_signal=ppg_signal_filtered, fs=fs, label=f"Filtered {subjectcode}")

  # Find systolic peaks
  locs_systolic_peaks_ppg = funcFindSystolicPeaks(ppg_signal=ppg_signal_filtered, fs=fs, HR=HR)
  funcVisualizeSignal(ppg_signal=ppg_signal_filtered, fs=fs, label=f"Systolic Peaks {subjectcode}", scatterX = locs_systolic_peaks_ppg, scatterY = ppg_signal_filtered[locs_systolic_peaks_ppg], scatter = True)

  # Find systolic peaks onsets
  locs_onsets_ppg = funcDetectPeakOnsets(ppg_signal=ppg_signal_filtered, locs_peaks_ppg=locs_systolic_peaks_ppg)
  funcVisualizeSignal(ppg_signal=ppg_signal_filtered, fs=fs, label=f"Systolic Peaks Onsets {subjectcode}", scatterX = locs_onsets_ppg, scatterY = ppg_signal_filtered[locs_onsets_ppg], scatter = True)

  # Obtain pulse amplitudes and heart beat intervals
  PPGA = funcPPGA(ppg_signal=ppg_signal_filtered, locs_peaks_ppg=locs_systolic_peaks_ppg, locs_onsets_ppg=locs_onsets_ppg)
  HBI = funcHBI(locs_peaks_ppg=locs_systolic_peaks_ppg)

  # Plot time series
  funcPlotTimeSeries(PPGA, label="PPGA", title=f"PPGA Time Series {subjectcode}")
  funcPlotTimeSeries(HBI, label="HBI", title=f"HBI Time Series {subjectcode}")

  print()