This notebook will be used to test helper functions

In [1]:
import numpy as np
import pandas as pd
import cv2
import h5py
from PIL import Image
import csv
from glob import iglob
import os
from pathlib import Path

In [2]:
import matplotlib.pyplot as plt
%matplotlib inline

### Reference: https://realpython.com/storing-images-in-python/#reading-many-images (Accessed 18/03/2022)

In [3]:
def display_frame(frame, title):
    plt.figure(figsize=(10, 10))
    plt.title(title)
    plt.imshow(frame)

In [4]:
def read_many_disk(num_images, imagesPath, gtPath):
    images, labels = [], []


     # For each frame
    for imagePath in imagesPath:
        
        # Store each frame 
        # print(f'[INFO] Working on Image: {image}')

        # Read and resize the image
        # Reference: https://pillow.readthedocs.io/en/stable/reference/Image.html (Accessed 21/03/2022)

        with Image.open(imagePath) as image:
            image_resized = image.resize((36, 36))
            images.append(np.array(image_resized))
        
        # print(f'[INFO] images list contains: {len(images)} elements  of type {type(images[0])}')

    with open(gtPath, "r") as csvfile:
        reader = csv.reader(
            csvfile, delimiter=","
        )

        for idx, row in enumerate(reader):
            
            # Skip the title row
            if idx > 0:
                
                # Skip the ppg recording for the last frame as it doesn't have a successor for normalization. 
                # This frame will only be used to normalize the 2nd last frame.
                if len(labels) < num_images - 1:        
                    ppg = float(row[2])                 # row[2] is the column containing ppg signal (label)
                    # print(f'[INFO] ppg: {ppg}')
                    labels.append(ppg)  

    # print(f'[INFO] labels list contains: {len(labels)} elements  of type {type(labels[0])}')

    # List containing the images with normailzed frames added in the 3rd dimension
    expanded_images = []

    # Perform frame normalization using every two adjacent frames as (c(t + 1) - c(t))/(c(t) + c(t + 1))
    # where c is the channel of the frame.
    for idx, image in enumerate(images):
        if idx < len(images) - 1:
            for i in range(3):

                # print(f'[INFO] Shape of Frame {idx}: {(images[idx][:, :, i]).shape}')

                # Displaying the frame at channel i
                # display_frame(images[idx][:, :, i], f'Frame {idx} Channel {i}')   

                # Normalized frame calculated by the formula above
                normalizedFrame = (images[idx + 1][:, :, i] - images[idx][:, :, i]
                ) / (images[idx][:, :, i] + images[idx + 1][:, :, i])

                # Displaying the normalized frame at channel i
                # display_frame(normalizedFrame, f'Normalized Frame {idx} Channel {i}')

                # print(f'[INFO] Shape of Normalized Frame {idx}: {normalizedFrame.shape}')

                # Adding an extra dimension to the normalized frame to make it possible to append to original image
                normalizedFrame = np.expand_dims(normalizedFrame, axis=2)

                image = np.append(image, normalizedFrame, axis=2)
            
            #     print(f'shape of normalizedFrame: {normalizedFrame.shape}')
            #     print(f'shape of image: {image.shape}')

            # print(f'shape of image after going through each channel: {image.shape}')
            
            # Storing the expanded images 
            expanded_images.append(image)
            # break


    return np.array(expanded_images), np.array(labels)

    # # Loop over all IDs and read each image in one by one
    # for image_id in range(num_images):
    #     images.append(np.array(Image.open(disk_dir / f"{image_id}.png")))


In [5]:
def store_many_hdf5(target_dir, subID, images, labels):
    """ Stores an array of images to HDF5.
        Parameters:
        ---------------
        target_dir:  path to the directory where the HDF5 file will be stored.
        subID:       subject ID.
        images       images array, (N, W, H, NC) to be stored (where N: number of images, W: width, H: height, NC: number of channels).
        labels       labels array, (N, 1) to be stored

        Returns:
        ----------
        pathToTarget    path to the HDF5 file.
    """

    pathToTarget = os.path.join(target_dir, f"{subID}.h5")

    # Create a new HDF5 file
    file = h5py.File(pathToTarget, "w")

    # Create a dataset in the file
    dataset = file.create_dataset(
        "images", data=images
    )
    meta_set = file.create_dataset(
        "labels", data=labels
    )
    file.close()
    
    return pathToTarget

In [6]:
def read_many_hdf5(file_path):
    """ Reads image from HDF5.
        Parameters:
        ---------------
        path   path to file

        Returns:
        ----------
        images       images array, (N, W, H, NC) to be stored (where N: number of images, W: width, H: height, NC: number of channels).
        labels       labels array, (N, 1) to be stored
    """
    images, labels = [], []

    # Open the HDF5 file
    file = h5py.File(file_path, "r+")

    images = np.array(file["/images"]).astype("float64")
    labels = np.array(file["/labels"]).astype("float64")

    return images, labels


In [15]:
# Get iterator over different subjects
imageDirs = iglob("D:\\OneDrive\\Documents\\rPPG-Projects\\Datasets-Preprocessed\\UBFC2\\DATASET_2\\[0-9]*\\subject[0-9]*")

# Lists that will contain the images, labels and the subject IDs
images, labels, subjects = [], [], []

# For each subject
for idx, path_ in enumerate(imageDirs):
    # print(f"[INFO] Working in {path_}")

    if idx > 9:
        break  # Testing only for first 10 subjects (for now)
  
    # Get the number of frames
    num_images = len(os.listdir(path_))

    # print(f'[INFO] Number of images: {num_images}')
    
    # Get the path to the frames
    imagesPath = iglob(os.path.join(path_, "*.png"))

    # Get subject number from path
    subID = path_.split("\\")[-1]

    # Add the subject ID to subjects list
    subjects.append(subID)

    # Get the path to the csv file
    gtPath = path_.replace(subID, r"0\phys.csv")    

    images, labels = read_many_disk(num_images, imagesPath, gtPath) 

     # Create a new directory for the hdf5 file of subject
    hdf5_dir = Path(r'D:\OneDrive\Documents\rPPG-Projects\Datasets-Preprocessed\hdf5\UBFC2\DATASET_2')
    if not os.path.exists(hdf5_dir): 
        hdf5_dir.mkdir(parents=True, exist_ok=True)

    # Store the images and labels at the target path
    storedFilePath = store_many_hdf5(hdf5_dir, subID, images, labels)

    # Reading the stored data for each subject
    images_, labels_ = read_many_hdf5(storedFilePath)

    # Checking if the read_image function works by comparing stored images and labels with the images and labels 
    # read from the file 
    assert(images.all()==images_.all())
    assert(labels.all()==labels_.all())

# print(f'shape of images: {np.shape(images)}, type: {type(images)}')
# print(f'shape of labels: {np.sha  pe(labels)}, type: {type(labels)}')


  normalizedFrame = (images[idx + 1][:, :, i] - images[idx][:, :, i]
  normalizedFrame = (images[idx + 1][:, :, i] - images[idx][:, :, i]
