In [None]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
import scipy.signal as sps


# %matplotlib notebook
%matplotlib inline

In [None]:
df_2_1 = pd.read_csv('../data/FINGERTIP_PPG_FROM_HYPERTENSIVE_SUBJECTS/0_subject/2_1.txt', sep='\t', header=None)
df_2_1 = df_2_1.T
raw_signal = df_2_1.values.T[0]
raw_signal = raw_signal[:-1]

In [None]:
plt.plot(raw_signal)
plt.show()

In [None]:
# using FFT in scipy.fftpack to extract the wave frequency of raw signal 
from scipy import fftpack
f_s=1000

X = fftpack.fft(raw_signal)
freqs = fftpack.fftfreq(len(raw_signal)) * f_s

fig, ax = plt.subplots()

ax.stem(freqs, np.abs(X))
ax.set_title('frequency domain_PPGBP_0subject_2_1')
ax.set_xlabel('Frequency in Hertz [Hz]')
ax.set_ylabel('Frequency Domain (Spectrum) Magnitude')
ax.set_xlim(-20, 20)
ax.set_ylim(-5, 1e5)
plt.tight_layout()
# plt.savefig('frequency domain_PPGBP_0subject_2_1.png', dpi=150)

In [None]:
# using FFT in scipy.fft to extract the wave frequency of raw signal 
from scipy.fft import fft, fftfreq

f_s=1000
N = len(raw_signal)


X = fft(raw_signal)
freqs = fftfreq(N , 1 / f_s)

fig, ax = plt.subplots()

ax.stem(freqs, np.abs(X))
ax.set_title('frequency domain_PPGBP_0subject_2_1')
ax.set_xlabel('Frequency in Hertz [Hz]')
ax.set_ylabel('Frequency Domain (Spectrum) Magnitude')
ax.set_xlim(-20, 20)
ax.set_ylim(-5, 1e5)
plt.tight_layout()
# plt.savefig('frequency domain_PPGBP_0subject_2_1.png', dpi=150)

In [None]:
# using FFT in scipy.rfft to extract the wave frequency of raw signal 
from scipy.fft import rfft, rfftfreq

f_s=1000
N = len(raw_signal)


X = rfft(raw_signal)
freqs = rfftfreq(N , 1 / f_s)

fig, ax = plt.subplots()

ax.stem(freqs, np.abs(X))
ax.set_title('frequency domain_PPGBP_0subject_2_1')
ax.set_xlabel('Frequency in Hertz [Hz]')
ax.set_ylabel('Frequency Domain (Spectrum) Magnitude')
ax.set_xlim(0, 20)
ax.set_ylim(-5, 1e5)
plt.tight_layout()
# plt.savefig('frequency domain_PPGBP_0subject_2_1.png', dpi=150)

### Butter filtering 

In [None]:
fc = 11
m = 6 
sos = sps.butter(m, fc, 'lp', fs=1000, output='sos')
butter_filtered = sps.sosfilt(sos, raw_signal)

fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
ax1.plot(raw_signal)
ax1.set_title('raw signal with 1kHz sampling rate')

ax2.plot(raw_signal, label='raw signal')
ax2.plot(butter_filtered, label=f'butter(m={m}, cutoff={fc}Hz)')
ax2.set_title(f'after {fc} Hz Butter low-pass filter and Butter filter')
ax2.legend(bbox_to_anchor=(1, 0.5))
plt.show()

In [None]:
two_pulses = butter_filtered[1100:]

In [None]:
plt.plot(two_pulses)
plt.show()

In [None]:
# resample_butter_filtered = sps.resample(butter_filtered, 1000)

In [None]:
# plt.plot(resample_butter_filtered)
# plt.show()

In [None]:
len(two_pulses)

In [None]:
two_pulses_X = rfft(two_pulses)
two_pulses_N = len(two_pulses)

freqs = rfftfreq(two_pulses_N, 1 / f_s)

fig, ax = plt.subplots()

ax.stem(freqs, np.abs(two_pulses_X))
ax.set_title('frequency domain of butter_filtered_PPGBP_0subject_2_1')
ax.set_xlabel('Frequency in Hertz [Hz]')
ax.set_ylabel('Frequency Domain (Spectrum) Magnitude')
ax.set_xlim(0, 12)
# ax.set_ylim(-5, 3e5)
plt.tight_layout()

In [None]:
freqs[:20]

In [None]:
from scipy.fft import irfft 

new_signal = irfft(two_pulses_X)

plt.plot(new_signal)
plt.show()

In [None]:
one_pulses = butter_filtered[1100:1601]
plt.plot(one_pulses)
plt.show()

In [None]:
one_pulses_resample = sps.resample(one_pulses, 1000)

In [None]:
one_pulses_resample.shape

In [None]:
one_pulses_X = rfft(one_pulses_resample)
one_pulses_N = len(one_pulses_resample)

freqs = rfftfreq(one_pulses_N, 1 / 1000)

fig, ax = plt.subplots()

ax.stem(freqs, np.abs(one_pulses_X))
ax.set_title('frequency domain of butter_filtered_PPGBP_0subject_2_1')
ax.set_xlabel('Frequency in Hertz [Hz]')
ax.set_ylabel('Frequency Domain (Spectrum) Magnitude')
ax.set_xlim(0, 12)
# ax.set_ylim(-5, 3e5)
plt.tight_layout()

In [None]:
one_pulses_X.shape

### Transfer function implementation 

In [None]:
# amplication: Dimensionless
transfer_ammplication_dict = {
    '0': 1,
    '1': 1.2,
    '2': 1.5,
    '3': 2,
    '4': 3, 
    '5': 2.2, 
    '6': 1.7,
    '7': 1.8,
    '8': 1.6,
    '9': 1.5
}


# phase: radians
transfer_phase_dict = {
    '0': 0,
    '1': -0.3,
    '2': -0.8,
    '3': -1.2,
    '4': -2.5, 
    '5': -3.9, 
    '6': -4.6,
    '7': -5.1,
    '8': -6.1,
    '9': -6.5
}

In [None]:
len(freqs)

In [None]:
one_pulses_X[:15]

In [None]:
def Real2Polar(x):
    return np.abs(x), np.angle(x)

def Polar2Real(radii, angles):
    return radii * np.exp(1j*angles)

def transfer_function(radial_x):
    """
    radial_x is the fft results at different frequency
    """
    pass

#### test example

In [None]:
a = -1 + 5j 

In [None]:
a_moduli, a_phase = Real2Polar(a)

In [None]:
a_moduli

In [None]:
a_phase

In [None]:
Polar2Real(a_moduli, a_phase)

In [None]:
moduli, phase = Real2Polar(one_pulses_X[1])

In [None]:
Polar2Real(moduli, phase)

In [None]:
amplification = transfer_ammplication_dict[str(1)]
delta_phase = transfer_phase_dict[str(1)]

moduli, phase = Real2Polar(one_pulses_X[1])
amplified_moduli = moduli * amplification 
delay_phase = phase + delta_phase

transfer_comp = Polar2Real(amplified_moduli , delay_phase)

In [None]:
transfer_comp

#### try in top10 frequency components 

In [None]:
transfer_res = []
for i, complex_comp in enumerate(one_pulses_X[:10]):
    amplification = transfer_ammplication_dict[str(i)]
    delta_phase = transfer_phase_dict[str(i)]
    
    moduli, phase = Real2Polar(complex_comp)
    amplified_moduli = moduli * amplification 
    delay_phase = phase + delta_phase
    
    transfer_comp = Polar2Real(amplified_moduli, delay_phase)
    transfer_res.append(transfer_comp)

In [None]:
transfer_res = np.array(transfer_res)

In [None]:
transfer_res

In [None]:
one_pulses_X[:10]

In [None]:
transfer_butter_X = np.copy(one_pulses_X)

In [None]:
transfer_butter_X[:10] = transfer_res

In [None]:
len(transfer_butter_X)

In [None]:
from scipy.fft import irfft 

new_signal = irfft(transfer_butter_X)

fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
ax1.plot(one_pulses_resample, label='ppg')
ax1.set_title('ppg signal resampled in 1kHz')

ax2.plot(new_signal, label='transfer waveform')
ax2.set_title('pulse waveform after transfer function')
# ax2.legend()
plt.show()

In [None]:
from scipy.fft import irfft 

new_signal = irfft(transfer_res)

fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)

ax2.plot(new_signal, label='transfer waveform')
ax2.set_xlim([0, 20])
ax2.set_title('pulse waveform after transfer function')
# ax2.legend()
plt.show()

In [None]:
len(new_signal)

In [None]:
transfer_res = []
for i, complex_comp in enumerate(one_pulses_X[:10]):
    amplification = transfer_ammplication_dict[str(i)]
    delta_phase = transfer_phase_dict[str(i)]
    
    moduli, phase = Real2Polar(complex_comp)
    amplified_moduli = moduli * (1 / amplification )
    delay_phase = phase - delta_phase
    
    transfer_comp = Polar2Real(amplified_moduli, delay_phase)
    transfer_res.append(transfer_comp)
    
transfer_res = np.array(transfer_res)
transfer_butter_X = np.copy(one_pulses_X)
transfer_butter_X[:10] = transfer_res

In [None]:
from scipy.fft import irfft 

new_signal = irfft(transfer_butter_X)

fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
ax1.plot(one_pulses_resample, label='ppg')
ax1.set_title('ppg signal resampled in 1kHz')

ax2.plot(new_signal, label='transfer waveform')
ax2.set_title('pulse waveform after transfer function')
# ax2.legend()
plt.show()

In [None]:
from scipy.fft import irfft 

new_signal = irfft(transfer_res)

fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)

ax2.plot(new_signal, label='transfer waveform')
ax2.set_xlim([0, 20])
ax2.set_title('pulse waveform after transfer function')
# ax2.legend()
plt.show()