In [34]:
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib qt


# Load data
data = pd.read_excel("data.xlsx", header=None)  # Replace with your file path
signal = data.iloc[1 :, 0]  # Assuming signal is in the first column

# Plot signal
plt.plot(signal)
plt.title("Signal Plot")
plt.xlabel("Sample Index")
plt.ylabel("Amplitude")
plt.show()


In [31]:
import numpy as np
from scipy.signal import find_peaks, butter, filtfilt

def bandpass_filter(data, lowcut, highcut, fs, order=4):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, data)

ppg_signal = signal  
fs = 50  
filtered_signal = bandpass_filter(ppg_signal, 0.67, 4, fs)

peaks, _ = find_peaks(filtered_signal, distance=fs*0.5)  # Minimum 0.5s apart

intervals = np.diff(peaks) / fs  
heart_rate = 60 / np.mean(intervals)

print(f"Estimated Heart Rate: {heart_rate:.2f} bpm")


Estimated Heart Rate: 82.87 bpm


In [30]:
plt.figure(figsize=(12,4))
plt.plot(filtered_signal)
plt.title("Signal Plot")
plt.xlabel("Sample Index")
plt.ylabel("Amplitude")
plt.show()

In [33]:
import heartpy as hp

wd, m = hp.process(filtered_signal, 50)

#plot
plt.figure(figsize=(12,4))
hp.plotter(wd, m)

#display measures computed
for measure in m.keys():
    print('%s: %f' %(measure, m[measure]))

bpm: 86.474501
ibi: 693.846154
sdnn: 66.587724
sdsd: 70.492097
rmssd: 83.931189
pnn20: 0.361111
pnn50: 0.222222
hr_mad: 20.000000
sd1: 59.160798
sd2: 62.858846
s: 11682.890365
sd1/sd2: 0.941169
breathingrate: 0.258684


In [43]:
import numpy as np
from scipy.signal import firwin, filtfilt
from scipy.interpolate import interp1d
import scipy.fft as fft

def preprocess_ppg(ppg_signal, sampling_rate):
    """Apply FIR bandpass filter to remove noise."""
    nyquist = 0.5 * sampling_rate
    low_cutoff = 0.5 / nyquist
    high_cutoff = 3.0 / nyquist
    fir_coeff = firwin(200, [low_cutoff, high_cutoff], pass_zero=False)
    filtered_signal = filtfilt(fir_coeff, [1.0], ppg_signal)
    return filtered_signal

def incremental_merge_segmentation(ppg_signal, segment_size):
    """Extract RIAV using Incremental Merge Segmentation."""
    slopes = []
    line_segments = []
    for i in range(0, len(ppg_signal) - segment_size, segment_size):
        start = i
        end = i + segment_size
        slope = (ppg_signal[end - 1] - ppg_signal[start]) / segment_size
        if not slopes or np.sign(slope) == np.sign(slopes[-1]):
            if line_segments:
                line_segments[-1].append(end)
            else:
                line_segments.append([start, end])
            slopes.append(slope)
        else:
            slopes.append(slope)
            line_segments.append([start, end])
    return line_segments

def adaptive_thresholding(line_segments, ppg_signal, amp_thresh_low=None, amp_thresh_high=None):
    """Apply adaptive thresholding to remove artifacts."""
    if amp_thresh_low is None or amp_thresh_high is None:
        amplitudes = [np.abs(ppg_signal[seg[-1]] - ppg_signal[seg[0]]) for seg in line_segments]
        median_amp = np.median(amplitudes)
        amp_thresh_low = 0.5 * median_amp
        amp_thresh_high = 1.5 * median_amp

    true_lines = []
    for seg in line_segments:
        start, end = seg[0], seg[-1]
        amp = np.abs(ppg_signal[end] - ppg_signal[start])
        if amp_thresh_low < amp < amp_thresh_high:
            true_lines.append((start, end))
    return true_lines

def extract_riav(true_lines, ppg_signal):
    """Extract RIAV signal from the thresholded segments."""
    riav_signal = [ppg_signal[end] - ppg_signal[start] for start, end in true_lines]
    times = [end for _, end in true_lines]
    return riav_signal, times

def uniform_interval_interpolation(riav_signal, times, new_sampling_rate):
    """Interpolate RIAV signal to a uniform interval."""
    total_duration = times[-1]
    uniform_times = np.linspace(0, total_duration, int(total_duration * new_sampling_rate))
    interpolator = interp1d(times, riav_signal, kind='linear', fill_value="extrapolate")
    uniform_signal = interpolator(uniform_times)
    return uniform_signal, uniform_times

def estimate_respiration_rate(interpolated_signal, sampling_rate):
    """Estimate RR using FFT on the interpolated RIAV signal."""
    fft_result = np.abs(fft.fft(interpolated_signal))
    freq = fft.fftfreq(len(interpolated_signal), d=1/sampling_rate)
    # Only consider positive frequencies within the respiratory range
    freq_range = (freq > 0.1) & (freq < 0.6)
    dominant_freq = freq[freq_range][np.argmax(fft_result[freq_range])]
    respiration_rate = dominant_freq * 60  # Convert to bpm
    return respiration_rate

# Example usage
if __name__ == "__main__":
    # Simulate a PPG signal (replace with actual data)
    t = np.linspace(0, 10, 500)  # 10 seconds at 50 Hz
    ppg_signal = signal  # Simulated PPG signal
    sampling_rate = 50  # Hz

    # Pipeline
    filtered_signal = preprocess_ppg(ppg_signal, sampling_rate)
    segments = incremental_merge_segmentation(filtered_signal, segment_size=20)
    thresholded_lines = adaptive_thresholding(segments, filtered_signal)
    riav_signal, times = extract_riav(thresholded_lines, filtered_signal)
    interpolated_signal, uniform_times = uniform_interval_interpolation(riav_signal, times, new_sampling_rate=10)
    rr = estimate_respiration_rate(interpolated_signal, sampling_rate=10)

    print(f"Estimated Respiration Rate: {rr:.2f} bpm")


Estimated Respiration Rate: 7.38 bpm
