In [12]:
import pandas as pd
import numpy as np
import heartpy as hp # Python Heart Rate Analysis Toolkit is a module for heart rate analysis in Python
import json
from scipy.signal import lfilter, butter
import matplotlib
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')


In [13]:
class SignalExtractor:
    """Reduce a sequence of video frames to a one-dimensional brightness signal."""

    def __init__(self, sample_rate=25):
        # Samples (frames) per second; used to turn the "seconds to skip"
        # option into a number of leading samples to drop.
        self.sample_rate = sample_rate

    def luma_component_mean(self, frames, **kwargs):
        """Return the mean luma (Y) value of every frame, minus a leading interval.

        Each frame is cast to float32, converted from BGR to YCrCb with
        ``cv2.cvtColor`` and reduced to the mean of channel 0.

        Note: ``cv2`` is looked up when this method is called; in this notebook
        it is imported in a later cell, so that cell must run before the call.

        Args:
            frames: Iterable of BGR images (anything ``np.array`` accepts).
            **kwargs:
                initial_skip_seconds: Seconds of signal to drop from the start
                    (the original comment attributes this to auto exposure).
                    Defaults to 1 when omitted or ``None``. ``0`` keeps every
                    sample. Fractional values are rounded to whole samples.

        Returns:
            1-D ``numpy.ndarray`` with one mean-luma value per remaining frame.

        Raises:
            ValueError: If ``initial_skip_seconds`` is negative.
        """
        skip_seconds = kwargs.get("initial_skip_seconds")
        if skip_seconds is None:
            # Only a missing/None option falls back to the default; an explicit
            # 0 is a valid request to keep the whole signal.
            skip_seconds = 1
        if skip_seconds < 0:
            raise ValueError("initial_skip_seconds must be non-negative")

        # Slice bounds must be integers even when the product is fractional.
        samples_to_skip = int(round(skip_seconds * self.sample_rate))

        per_frame_luma = []
        for frame_bgr in frames:
            frame_f32 = np.array(frame_bgr).astype(np.float32)
            img_ycrcb = cv2.cvtColor(frame_f32, cv2.COLOR_BGR2YCrCb)
            per_frame_luma.append(img_ycrcb[..., 0].mean())

        signal = np.array(per_frame_luma)
        return signal[samples_to_skip:]

    

class SignalPreprocessor:
    """Filtering helpers for a uniformly sampled one-dimensional signal."""

    def __init__(self, sample_rate):
        # Samples per second of the signals passed to the methods below.
        self.sample_rate = sample_rate
        # Short alias -> method name.
        # NOTE(review): only rolling_average, subtract and butter_lowpass_filter
        # are defined in the visible part of this class; the remaining targets
        # are presumably defined elsewhere — confirm before dispatching on them.
        self.shorter_names = {
            "hpf": "butter_highpass_filter",
            "lpf": "butter_lowpass_filter",
            "maf": "moving_average_flat",
            "diff_pad": "minus_with_pad",
            "fft": "fft",
            "roll_avg": "rolling_average",
            "sub": "subtract",
            "bandpass": "butter_bandpass_filter",
            "imf": "increase_main_freq",
            "cut_start": "cut_start",
            "bpf_bpm": "bandpass_bpm"
        }

    def rolling_average(self, signal, **kwargs):
        """Return a centred moving average with the same length as ``signal``.

        The window length is ``int(window_size_seconds * sample_rate)``, bumped
        to the next odd number so it can be centred. The averaged interior is
        extended to the input length by repeating its first and last values.

        Args:
            signal: 1-D sequence of samples.
            **kwargs:
                window_size_seconds: Window length in seconds (required).

        Returns:
            ``numpy.ndarray`` with as many samples as ``signal``.

        Raises:
            KeyError: If ``window_size_seconds`` is not supplied.
            ValueError: If ``signal`` has fewer samples than the window.
        """
        window_size = int(kwargs["window_size_seconds"] * self.sample_rate)
        if window_size % 2 == 0:
            window_size += 1

        n_samples = np.size(signal)
        if n_samples < window_size:
            # np.convolve(..., 'valid') swaps its operands when the kernel is
            # the longer one, which would yield an output of the wrong length.
            raise ValueError(
                "signal has %d samples but the rolling window needs at least %d"
                % (n_samples, window_size)
            )

        averaged = np.convolve(signal, np.ones(window_size), 'valid') / window_size
        half_window = (window_size - 1) // 2
        return np.pad(averaged, (half_window, half_window), mode='edge')

    def subtract(self, signal, **kwargs):
        """Return ``prev_x - signal`` element-wise.

        Args:
            signal: Array to subtract.
            **kwargs:
                prev_x: Array to subtract from (required); must have the same
                    shape as ``signal``.

        Returns:
            The element-wise difference ``prev_x - signal``.

        Raises:
            KeyError: If ``prev_x`` is not supplied.
            AssertionError: If the two shapes differ.
        """
        original_signal = kwargs["prev_x"]
        assert signal.shape == original_signal.shape
        return original_signal - signal

    def butter_lowpass_filter(self, signal, low, filter_order):
        """Apply a digital Butterworth low-pass filter to ``signal``.

        Args:
            signal: Samples to filter.
            low: Cutoff frequency, in the same units as ``sample_rate``.
            filter_order: Order passed to ``scipy.signal.butter``.

        Returns:
            The filtered samples as produced by ``scipy.signal.lfilter``.
        """
        nyquist = 0.5 * self.sample_rate
        # scipy expects the cutoff as a fraction of the Nyquist frequency.
        normal_cutoff = low / nyquist
        b, a = butter(filter_order, normal_cutoff, btype='low', analog=False)
        return lfilter(b, a, signal)

    

In [14]:
import cv2  #opencv
def convert_video_to_array(video_file):
    """Decode a video file into a list of rotated, resized frames.

    Every decoded frame is rotated 90 degrees counter-clockwise and resized
    with ``cv2.resize(frame, (480, 360))``. Progress and the final frame count
    are printed.

    Args:
        video_file: Path of the video to read.

    Returns:
        List of processed frames in decode order. The list is empty when the
        file cannot be opened or contains no frames.
    """
    print('Converting video to array - ')
    list_of_frames = []
    vidcap = cv2.VideoCapture(video_file)
    try:
        if not vidcap.isOpened():
            print('Warning - could not open video file: ' + str(video_file))

        # NOTE(review): these property writes do not appear to limit a
        # file-backed capture — the recorded run of this notebook decoded
        # 321 frames, not 300. Kept as in the original; confirm and remove.
        vidcap.set(cv2.CAP_PROP_FPS, 30)
        vidcap.set(cv2.CAP_PROP_FRAME_COUNT, 300)

        while vidcap.isOpened():
            success, frame = vidcap.read()
            if not success or frame is None:
                # Normal end of stream: stop quietly instead of letting
                # cv2.rotate fail on an empty frame.
                break

            try:
                new_frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
                new_frame = cv2.resize(new_frame, (480, 360))
            except cv2.error as e:
                # Best effort: keep the frames decoded so far.
                print('Warning - ' + str(e))
                break

            list_of_frames.append(new_frame)
    finally:
        # Always free the decoder / file handle, even if processing failed.
        vidcap.release()

    print(len(list_of_frames))
    return list_of_frames

# Cell-completion marker: printed once the helper above has been defined.
print("Done")

Done


In [15]:
def main_function(video_file_location, sample_rate=30):
    """Estimate heart-rate measures from a video and print them.

    Steps:
        1. Read the video file into a list of frames.
        2. Convert every frame from BGR to YCrCb and take the mean luma,
           dropping the first second of samples.
        3. Remove the slow baseline (signal minus its rolling average) and
           low-pass filter the remainder.
        4. Run ``heartpy.process`` on the filtered signal and print the
           resulting measures.

    Args:
        video_file_location: Path of the video to analyse.
        sample_rate: Frames per second assumed for the recording; used for
            signal extraction, filtering and the heartpy analysis.

    Returns:
        None. The measures dictionary is printed.
    """
    # 1. Read video file
    video_array = convert_video_to_array(video_file_location)

    # 2. Per-frame mean luma, skipping the first second.
    extractor = SignalExtractor(sample_rate)
    luma_mean = extractor.luma_component_mean(video_array, initial_skip_seconds=1)

    # 3. Detrend and smooth. At 30 samples/s a 1.01 s window is 30 samples,
    #    which rolling_average bumps to an odd 31-sample window.
    preprocessor = SignalPreprocessor(sample_rate)
    rolling_avg = preprocessor.rolling_average(luma_mean, window_size_seconds=1.01)
    sub_values = preprocessor.subtract(rolling_avg, prev_x=luma_mean)
    lpf_values = preprocessor.butter_lowpass_filter(sub_values, low=4, filter_order=2)
    lpf_series = pd.Series(lpf_values, name='LPF_Values')

    # 4. Heart-rate analysis.
    _working_data, measures = hp.process(lpf_series, sample_rate=sample_rate, windowsize=1)
    # NOTE(review): heartpy documents 'breathingrate' as a frequency in Hz, so
    # this reciprocal stores seconds per breath under the same key — confirm
    # that is the intended unit.
    measures['breathingrate'] = 1.0 / measures['breathingrate']

    print('Heart Rate Values - ', measures)
        

In [16]:
# Run the pipeline on the sample recording; the path is relative to the
# notebook's working directory.
main_function('../Data/PPG-80.mp4')

# Alternative input kept for reference (machine-specific absolute path):
# main_function('C:/Users/preetham/Desktop/HRV_Videos/Data/PPG-70.mp4')

Converting video to array - 

321
Heart Rate Values -  {'bpm': 71.1864406779661, 'ibi': 842.8571428571429, 'sdnn': 77.07816217256497, 'sdsd': 45.215533220835084, 'rmssd': 69.9205898780101, 'pnn20': 0.8, 'pnn50': 0.4, 'hr_mad': 66.66666666666663, 'sd1': 40.55175020198814, 'sd2': 90.18499505645786, 's': 11489.30507733394, 'sd1/sd2': 0.44965074485619055, 'breathingrate': 5.898}


In [14]:
# Scratch cell: start from an empty DataFrame that the next cell fills in.
x=pd.DataFrame()

In [16]:
# Scratch cell: assigning a four-element list to a new column of the empty
# frame `x` (created in the previous cell) gives it four rows; the bare `x`
# on the last line displays the result.
#xy = pd.DataFrame(columns=['luma_mean', 'r_ch_mean'])
x["a"]=[1,2,3,4]
x

Unnamed: 0,a
0,1
1,2
2,3
3,4
