#PYVHR STEP BY STEP

Import everything we will need

In [None]:
from pyVHR.extraction.sig_processing import *
from pyVHR.BVP.BVP import *
from pyVHR.BPM.BPM import *
from pyVHR.BVP.methods import *
from pyVHR.BVP.filters import *
from pyVHR.plot.visualize import *

# PLOTTING
# 'colab' or 'notebook'
VisualizeParams.renderer = 'colab'

First create a dataset object by using the datasetFactory. LGI_PPGI is inside the pyVHR package, but you can use also custom dataset class!

In [None]:
from pyVHR.datasets.dataset import datasetFactory
from pyVHR.utils.errors import *

dataset_name = 'pure'
video_DIR = '/var/datasets/VHR1/' 
BVP_DIR = '/var/datasets/VHR1/'

dataset = datasetFactory(dataset_name, videodataDIR=video_DIR, BVPdataDIR=BVP_DIR)
allvideo = dataset.videoFilenames

# print the list of video with the progressive index (idx)
for v in range(len(allvideo)):
  print(v, allvideo[v])

The window size *wsize* will be used for extracting both the ground truth BPM, and estimated BPM

In [None]:
wsize = 8 #seconds

Ground Truth extraction

In [None]:
video_idx = 19
fname = dataset.getSigFilename(video_idx)
sigGT = dataset.readSigfile(fname)
test_bvp = sigGT.data
bpmGT, timesGT = sigGT.getBPM(wsize)
videoFileName = dataset.getVideoFilename(video_idx)
#videoFileName = "/home/devel/mortara_realtime/video/me.mp4"
print(videoFileName)
fps = get_fps(videoFileName)
print(fps)

In [None]:
#display_video(videoFileName)

**pyVHR** is structured in a simple way. 

There are 3 main steps for obtaining the estimated BPM:


1.   Signal extraction
2.   BVP extraction
3.   BPM extraction

The first step is carried out by the *SignalProcessing* class. This class can be accelerated by using a CUDA device.

*SignalProcessing* compute the signal in two ways:


*   Holistic mean: the whole skin is considered as a single estimator
*   Patches mean: facial regions, each one with a center called landmark. Each region is an estimator.



In [None]:
sig_extractor = SignalProcessing()
sig_extractor.display_cuda_device()
sig_extractor.choose_cuda_device(0)

The SignalProcessing class can use different skin extractors:


*   Convex Hull skin extractor
*   Face Parsing skin extractor

Both work with CPU and GPU.

In [None]:
sig_extractor.set_skin_extractor(SkinExtractionFaceParsing('GPU'))
sig_extractor.set_skin_extractor(SkinExtractionConvexHull('GPU'))

We can choose to process a specific number of frames of the video

In [None]:
# To precess the whole video pass 0
sig_extractor.set_total_frames(0*fps)

Both signal extraction and skin extraction have a color-threshold filter for removing unwanted RGB colors.
We can set the RGB threshold interval using theese classes:

In [None]:
SkinProcessingParams.RGB_LOW_TH = 2
SkinProcessingParams.RGB_HIGH_TH = 254

SignalProcessingParams.RGB_LOW_TH = 2
SignalProcessingParams.RGB_HIGH_TH = 254

We can save intermediate results by calling the 
"*set_visualize_skin_and_landmarks*" method. 

We can retrieve any intermediate result by calling the methods:


*   get_visualize_skin
*   get_visualize_patches



In [None]:
sig_extractor.set_visualize_skin_and_landmarks(
    visualize_skin=False, visualize_landmarks=False, visualize_landmarks_number=False, visualize_patch=False)

In [None]:
sig_extractor.set_visualize_skin_and_landmarks(
    visualize_skin=True, visualize_landmarks=True, visualize_landmarks_number=True, visualize_patch=True)

The SignalProcessing class allows you to extract the signal using customizable facial regions; the center of a region is called landmark. There are 468 landmarks, and we can use any combination of these.

This script is very useful for viewing all the possible facial landmarks.

In [None]:
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.forehead_center)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.forehead_left)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.forehoead_right)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.eye_left)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.eye_right)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.nose)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.mounth_down)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.mounth_up)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.chin)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.cheek_left_bottom)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.cheek_left_top)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.cheek_right_bottom)
# visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", MagicLandmarks.cheek_right_top)
visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", [_ for _ in range(468)])

In [None]:
#filter_landmarks = [i for i in range(1,468)]
filter_landmarks = MagicLandmarks.cheek_left_top +\
                   MagicLandmarks.cheek_right_top +\
                   MagicLandmarks.forehead_center +\
                   MagicLandmarks.forehoead_right +\
                   MagicLandmarks.forehead_left +\
                   MagicLandmarks.mounth_up 
filter_landmarks = [8, 9, 266, 10, 142, 151, 279, 280, 411, 285, 417, 290, 36, 423, 168, 425, 426, 427, 297, 299, 298, 48, 50, 436, 437, 55, 440, 187, 67, 293, 69, 420, 455, 329, 330, 203, 205, 206, 333, 336, 337, 338, 207, 216,122,4,197,195,193,5,126, 346, 347, 351, 101, 107, 108, 109, 371, 118]
filter_landmarks = list(dict.fromkeys(filter_landmarks))
print(len(filter_landmarks))
visualize_landmarks_list("/home/devel/pyVHR_pack/example.jpg", filter_landmarks)


Once we have chosen the list of landmarks, we can pass it to a SignalProcessing object by calling the following method

In [None]:
sig_extractor.set_landmarks(filter_landmarks)

Here are some examples of signal extraction.


1.   Holistic mean
2.   Rectangular Patches mean
3.   Square Patches mean

For Rectangular Patches we can choose the xy_dimension of each region.


In [None]:
# HOLISTIC EXTRACTION
hol_sig = sig_extractor.extract_holistic(videoFileName,1.0)

In [None]:
hol_sig.shape

In [None]:
# SQUARE EXTRACTION
sig_extractor.set_square_patches_side(24.0)
sig = sig_extractor.extract_patches(videoFileName, 1.0,"squares", "mean")

In [None]:
sig.shape

In [None]:
# RECTS EXTRACTION
# rects_list = MagicLandmarks.cheek_left_top + MagicLandmarks.cheek_right_top
# rects_sizes = []
# for _ in rects_list:
#    rects_sizes.append([30, 25])
# sig_extractor.set_landmarks(rects_list)
# sig_extractor.set_rect_patches_sides(
#     np.array(rects_sizes, dtype=np.float32))
# sig = sig_extractor.extract_patches(videoFileName, 1.0,"rects", "mean")
# print(sig.shape)

*SignalProcessing* class offers others usefull methods:


*   extract_raw_holistic: returns an np.ndarray with shape (num_frames, 1, N x M x rgb_channels), where NxM is the aspect ratio of the video. This method extract the whole skin in each frame and set to RGB (0,0,0) the non-skin pixels.

You can use Raw signal with the SSR method.


In [None]:
## RAW HOLISTIC and SSR method
# raw_sig = sig_extractor.extract_raw_holistic(videoFileName,1.0)
# print(raw_sig.shape)
# windowed_raw_sig, raw_times = raw_windowing(raw_sig, wsize, 1, fps)
# ssr_bvps = RGB_sig_to_BVP(windowed_raw_sig, fps, device_type='cpu', method=cpu_SSR, params={'fps':fps})
# ssr_bpm = BVP_to_BPM(ssr_bvps, fps)
# RMSE, MAE, MAX, PCC = getErrors(np.expand_dims(ssr_bpm, axis=0), bpmGT, raw_times, timesGT)
# printErrors(RMSE, MAE, MAX, PCC)

Let's plot extracted skin and patches!

In [None]:
visualize_skin_coll = sig_extractor.get_visualize_skin()
visualize_patches_coll = sig_extractor.get_visualize_patches()
print(len(visualize_skin_coll))
print(len(visualize_patches_coll))

In [None]:
interactive_image_plot(visualize_skin_coll,.3)

In [None]:
interactive_image_plot(visualize_patches_coll,0.3)

Now we are ready to pass to the next step, but first we have to transform the whole signal in 
a windowed signal. A window has shape (num_estimators, rgb_channels, num_frames)

Each window will become a single BPM estimate.

In [None]:
# [usage] sig_windowing(sig, wsize, stride, fps)
windowed_sig, timesES = sig_windowing(sig, wsize, 1, fps)
print(len(windowed_sig))
print(windowed_sig[0].shape)

In [None]:
visualize_windowed_sig(windowed_sig, 1)

Pre Filterings


*   "rgb_filter_ths": color threshold filter that filters out signals that, in at least one frame of the window, are outside the rgb colors interval [(LOW, LOW, LOW), (HIGH, HIGH, HIGH)]; where LOW is the dictionary parameter RGB_LOW_TH, and HIGH is RGB_HIGH_TH. We suggest to always use this filter before applying a BVP method.
*   "detrend": apply detrend to the signal
*   "zscore": apply z-score to the signal
*   "BPfilter": apply Butterworth filter to the signal

In [None]:
# -- color threshold filtering
filtered_windowed_sig = apply_filter(
    windowed_sig, rgb_filter_th, params={'RGB_LOW_TH': 2, 'RGB_HIGH_TH': 254})
print(filtered_windowed_sig[0].shape)

In [None]:
filtered_windowed_sig = apply_filter(filtered_windowed_sig,BPfilter, params={'order':6,'minHz':0.65,'maxHz':4.0,'fps':fps})
#filtered_windowed_sig = apply_filter(filtered_windowed_sig, detrend)
#filtered_windowed_sig = apply_filter(filtered_windowed_sig, zscore)
#filtered_windowed_sig = apply_filter(filtered_windowed_sig, zeromean)

In [None]:
visualize_windowed_sig(filtered_windowed_sig,0)

The second step is the BVP extraction.

We only need to call the function "*RGB_sig_to_BVP*" and pass the following parameters:


*   windowed signal
*   fps
*   method_type: 'cuda', 'cpu', 'torch'
*   method: method function that supports method_type device
*   params: dictionary of parameters needed by the method; default is {}.

pyVHR contains many methods, but you can also use a custom method. Remember that it must accept a numpy.ndarray with shape (num_estimators, channels, num_frames) and return a numpy.ndarray with shape (num_estimators, num_frames)


In [None]:
bvps = RGB_sig_to_BVP(filtered_windowed_sig, fps, device_type='cuda', method=cupy_CHROM)

*bvps* is a list of length num_windows of numpy.ndarray with shape (num_estimators,num_frames)

In [None]:
print(len(bvps))
print(bvps[0].shape)

Post Filtering

We can apply all the filters showed before also to the *BVP*. 

In [None]:
bvps = apply_filter(bvps, BPfilter, params={'order':6,'minHz':0.65,'maxHz':4.0,'fps':fps})

In [None]:
visualize_BVPs(bvps, 1)

Finally we need to extract the BPM. This function process all the windows and all the estimators, and returns a list of length num_windows of numpy.ndarray with shape (num_estimators, ).

There is also a CUDA version of this function!

In [None]:
bpmES = BVP_to_BPM(bvps, fps)

In [None]:
bpmES = BVP_to_BPM_cuda(bvps, fps)

At this point we can have many BPM estimates for each time window. 

The function below compute for each window the median of the numpy.ndarray (num_estimators, )

In [None]:
median_bpmES = multi_est_BPM_median(bpmES)

In [None]:
visualize_multi_est_BPM_vs_BPMs_list([bpmES,timesES], [[median_bpmES,timesES,"medianES"],[bpmGT,timesGT,"GT"]])

Here the same steps, but for the holistic case

In [None]:
# HOLISTIC analysis
windowed_hol_sig, hol_timesES = sig_windowing(hol_sig, wsize, 1, fps)
#windowed_hol_sig = apply_filter(windowed_hol_sig, detrend, params={})
hol_bvps = RGB_sig_to_BVP(windowed_hol_sig, fps, device_type='cuda', method=cupy_CHROM)
hol_bvps = apply_filter(hol_bvps, BPfilter, params={'minHz':0.65, 'maxHz':4.0, 'fps':fps, 'order':6})
hol_bpmES = BVP_to_BPM(hol_bvps, fps)
RMSE, MAE, MAX, PCC = getErrors(np.expand_dims(hol_bpmES, axis=0), bpmGT, hol_timesES, timesGT)
printErrors(RMSE, MAE, MAX, PCC)

This function plot a list of couple [BPMs, times, tag_Name]

In [None]:
#visualize_BPMs([[median_bpmES, timesES, "medianES"], [hol_bpmES, hol_timesES, "holistic"]])
#visualize_BPMs([[median_bpmES, timesES, "medianES"], [bpmGT, timesGT, "GT"]])
visualize_BPMs([[median_bpmES, timesES, "medianES"], [hol_bpmES, hol_timesES, "holistic"], [bpmGT, timesGT, "GT"]])

We are done!

Let's plot errors!

In [None]:
from pyVHR.utils.errors import getErrors, printErrors, displayErrors
RMSE, MAE, MAX, PCC = getErrors(np.expand_dims(median_bpmES, axis=0), bpmGT, timesES, timesGT)
printErrors(RMSE, MAE, MAX, PCC)

In [None]:
configure_plotly_browser_state()
displayErrors(np.expand_dims(median_bpmES, axis=0), bpmGT, timesES, timesGT)

In [None]:
configure_plotly_browser_state()
displayErrors(np.expand_dims(hol_bpmES, axis=0), bpmGT, hol_timesES, timesGT)



---



Here some usefull plotting functions for PSD and Spectrum

In [None]:
visualize_BVPs_PSD(bvps, 0, fps)

In [None]:
#display bvp spectr
configure_plotly_browser_state()
from pyVHR.BPM.BPM import BVPsignal
obj = BVPsignal(bvps[0][0], fps)
obj.spectrogram(winsize=wsize)
obj.displaySpectrum()



---

# SNR ranking

In [None]:
def compute_ranking(snr_peaks,bvp, perc):
  ranking = []
  i = 0
  bvps_t = []
  for window in snr_peaks:
    temp = []
    for est in window:
      ratio = est[-1] / (est[-2] + 0.00000001)
      temp.append(ratio)
    ranking.append(temp)
    top_half = np.argsort(ranking[i])[::-1]
    top_half = top_half[:int(len(top_half)*perc)]
    bvps_t.append( bvp[i][top_half] )
    i+=1
  return bvps_t

In [None]:
# bvps_pos = RGB_sig_to_BVP(filtered_windowed_sig, fps, device_type='cuda', method=cupy_POS, params={'fps':'adaptive'})
# bvps_pca = RGB_sig_to_BVP(filtered_windowed_sig, fps, device_type='cpu', method=cpu_PCA, params={'component':'second_comp'})

# snr_peaks = BVP_SNR(bvps,fps)
# snr_peaks_pos = BVP_SNR(bvps_pos,fps)
# snr_peaks_pca = BVP_SNR(bvps_pca,fps)

In [None]:
# bvps_top = compute_ranking(snr_peaks,bvps, 0.30)
# bvps_top_pos = compute_ranking(snr_peaks_pos,bvps_pos, 0.30)
# bvps_top_pca = compute_ranking(snr_peaks_pca,bvps_pca, 0.30)

In [None]:
# bvps_final = []
# for w in zip(bvps_top, bvps_top_pca, bvps_top_pos):
#   bvps_final.append(np.concatenate([w[0], w[1], w[2]], axis=0))

In [None]:
# bpmES_top = BVP_to_BPM(bvps_final, fps)
# median_bpmES_top = multi_est_BPM_median(bpmES_top)
# from pyVHR.utils.errors import getErrors, printErrors, displayErrors
# RMSE, MAE, MAX, PCC = getErrors(np.expand_dims(median_bpmES_top, axis=0), bpmGT, timesES, timesGT)
# printErrors(RMSE, MAE, MAX, PCC)

In [None]:
# visualize_BPMs([[median_bpmES, timesES, "medianES"], [median_bpmES_top, timesES, "top_patches"], [hol_bpmES, hol_timesES, "holistic"], [bpmGT, timesGT, "GT"]])

In [None]:
# visualize_BVPs_PSD(bvps_final, 30, fps)