# SpO2 Signal quality
Measure Spo2 quality by using cross corelation between two consecutive R PPG segment, and correlation between R and IR segment.

## import module and declare function

In [1]:

import numpy as np
import matplotlib.pyplot as plt
import os,sys
import scipy.signal as signal
from scipy.interpolate import interp1d
import re
import pandas as pd
import math
plt.style.use('ggplot')
sys.path.insert(0, os.path.abspath('../lib'))

import sig_proc as sp
import file_read

import csv


# Normalize to mv
def value_trans(data):
    data = data*1.2/2097151.0
    return data


# Root mean square error
def rmse_calc(target, prediction):
    error = []
    for i in range(len(target)):
        error.append(target[i] - prediction[i])

    squaredError = []

    for val in error:
        squaredError.append(math.sqrt(val**2))
    rmseError = np.mean(squaredError)
    print(rmseError)

# Mean absolute error
def mae_calc(target, prediction):
    error = []
    for i in range(len(target)):
        error.append(target[i] - prediction[i])

    absError = []
    for val in error:
        absError.append(abs(val))#誤差絕對值
    maeError = np.mean(absError)
    print(maeError)


def sqi_xcorr(x,y):
    x_norm = sp.z_score(x)
    y_norm = sp.z_score(y)
    pxy = max(np.correlate(x_norm, y_norm))
#     pyy = np.correlate(y_norm, y_norm)
    pyy = sum(y_norm*y_norm)
    return (2*pxy)/pyy

# for 2 seconds version 
def sqi_xcorr_2s(x,y):
    x_norm = sp.z_score(x)
    y_norm = sp.z_score(y)
    
    pxy = max(np.correlate(x_norm, y_norm))
#     pyy = np.correlate(y_norm, y_norm)
    pyy = sum(y_norm*y_norm)
    return pxy/pyy

def RMS_amp(data):
    squaredData = []
    for val in data:
        squaredData.append(val**2)
    
    ms = np.mean(squaredData)
    rms = math.sqrt(ms)
    return rms
    
def snr_calc(signal):
    return 20* math.log(np.abs(np.mean(signal)/np.std(signal)),10)


## Main function

In [None]:
folder = "../../Thor 2.0 BP收集 1220-0103"
subjects = ['Coco', 'Gary', 'Jack', 'Molly', 'Nicole', 'Rick', 'Shawn', 'Steven', 'Tanya', 'Vanessa']
day = '01'

file_name = "HealthPatchPlatform-PPG_0816_145709.csv"


G2_df = pd.read_csv(os.path.join(folder, target_folder, file_name))
ppg_df.head()

In [3]:

ppgR = ppg_df['Red (Counts)'].to_numpy()
ppgIR = ppg_df[' IR (Counts)'].to_numpy()

''' Select the duration of PPG segment in seconds'''
sr = 128
start_sec = 0
period = 2
overlap = 1

data_length = len(ppgR)/sr
hop_length = period - overlap
# num_watching_period = 200



vsTable = {
    'subject':[],
    't1':[],
    'spo2':[],
    'R_AC':[],
    'R_DC':[],
    'IR_AC':[],
    'IR_DC':[],
    'R_PI':[],
    'IR_PI':[],
    'R_value':[],
    'R_squared':[],
}

# K1 = 121.12
# K2 = -51.77
K1 = 99.813
K2 = -4.615

# Filter parameters
bp_b, bp_a = sp.bp_filter(0.8, 3, sr, 4)
lp_b, lp_a = sp.lp_filter(3, sr, 4)

coeficient = [-12.75886139, 1.39192419]
intercept = 104.14801275562009

num_watching_period = int((data_length - start_sec -overlap)/hop_length) -1
# num_watching_period = 10


subject = 'Jason'
for n in range(num_watching_period):
    
    t1 = start_sec +  (period-overlap) * n
    t2 = t1 + period
    s1 = int(t1*sr)
    s2 = int(t2*sr )

    '''''''''''''''''''''''''''''''''''''''
    Signal Preprocessing
    '''''''''''''''''''''''''''''''''''''''
    IR_segment = np.array(ppgIR[s1:s2])
    R_segment = np.array(ppgR[s1:s2])
    
    ''' AC component '''
    filt_IR = signal.filtfilt(bp_b,bp_a, IR_segment)
    filt_R = signal.filtfilt(bp_b, bp_a, R_segment)
    ''' DC component '''
    R_DC_component = signal.filtfilt(lp_b,lp_a, R_segment)
    IR_DC_component = signal.filtfilt(lp_b,lp_a, IR_segment)

    ''' signal quality '''
    # level = signalQuality(filt_R, filt_IR, sr)
    
    ''' Calculate R value by using RMS method'''
    R_AC = RMS_amp(filt_R)
    R_DC = RMS_amp(R_DC_component)
    IR_AC = RMS_amp(filt_IR)
    IR_DC = RMS_amp(IR_DC_component)
    
    
    R_ratio = RMS_amp(filt_R/R_DC_component)
    IR_ratio = RMS_amp(filt_IR/IR_DC_component)
    
    # R_divide = filt_R/R_DC_component
    # R_RMS = RMS_amp(R_divide)

    # IR_divide = filt_IR/IR_DC_component
    # IR_RMS = RMS_amp(IR_divide)
    
    R_value = R_ratio/ IR_ratio
    R_squared = R_value**2
    
    # coeficient = [ 17.41863831, -18.27211895]
    # intercept = 94.81588110236386

    spo2 = intercept + (coeficient[0]*R_value) + (coeficient[1]*R_squared)

    # simulator
    # spo2 = 119.4638 + (-37.8709*R_value) + (-0.1610*R_squared)
    vsTable['subject'].append(subject)
    vsTable['t1'].append(t1)
    vsTable['spo2'].append(spo2)
    vsTable['R_AC'].append(R_AC)
    vsTable['R_DC'].append(R_DC)
    vsTable['IR_AC'].append(IR_AC)
    vsTable['IR_DC'].append(IR_DC)
    
    vsTable['R_PI'].append(R_ratio*100)
    vsTable['IR_PI'].append(IR_ratio*100)
    vsTable['R_value'].append(R_value)
    vsTable['R_squared'].append(R_squared)
    
    '''''''''''''''''''''''''''''''''''
    plot filgure
    '''''''''''''''''''''''''''''''''''
    font_size = 14
    fig_size  = (6,4)
    
    # plt.figure(figsize=fig_size)
    # plot_title = "{},{}:{} \n".format(subject, t1,t2)
    # plot_title = plot_title + "R value: {}\n".format(np.round(R_value,4))
    # plot_title = plot_title + "IR PI: {}, R PI:{}\n".format(np.round(IR_ratio*100,4), np.round(R_ratio*100,4))
    # plt.title(plot_title)
    # plt.plot(filt_R, label='R')        
    # plt.plot(filt_IR, label='IR')




NameError: name 'load_ppg' is not defined

In [11]:
import pandas as pd

pd.DataFrame(SpO2_table)


Unnamed: 0,spo2,criteria,R_PI,IR_PI,R_value
0,97.878184,Luke_0708_AM,0.000133,0.000317,0.419245
1,97.984538,Luke_0708_AM,0.000125,0.000315,0.3962
2,97.826433,Luke_0708_AM,0.000137,0.000319,0.430459
3,98.033666,Luke_0708_AM,0.000121,0.000314,0.385555
4,97.991775,Luke_0708_AM,0.000109,0.000277,0.394632
5,97.892131,Luke_0708_AM,0.000131,0.000314,0.416223
6,97.502033,Luke_0708_AM,0.000212,0.000422,0.500751
7,97.274104,Luke_0708_AM,0.00025,0.000455,0.55014
8,97.664613,Luke_0708_AM,0.000146,0.000313,0.465523
9,97.525925,Luke_0708_AM,0.000156,0.000315,0.495574


In [9]:
0.000133/0.000317

0.4195583596214511

In [8]:
os.path.exists(full_path)

True