In [1]:
import os
import numpy as np
import pandas as pd
import h5py


In [2]:
def import_hdf5_sequences(file_path,):
    sample = {}
    with h5py.File(file_path, 'r') as f:
        
        data_dict = f["data"]
        for key in data_dict:
            sample[key] = data_dict[key][:]  
                    
        sample["participant"] = os.path.basename(file_path).split("_")[1]
        sample["session"] = os.path.basename(file_path).split("_")[2].replace(".hdf5", "")
        sample["label"] = [el.split("_")[0] for el in f.keys() if el != "data"]
        sample["label_times"] = [(f[key]["start_time"][()], f[key]["end_time"][()]) for key in f.keys() if key != "data"]
        sample["label_indices"] = [(f[key]["start_index"][()], f[key]["end_index"][()]) for key in f.keys() if key != "data"]
        sample["timestamp"] = [el.split("_")[1] for el in f.keys() if el != "data"]
    return sample

def save_hd5f_sequence(file_path, sample, overwrite=False):
    if not overwrite:
        assert not os.path.exists(file_path), "File already exists"
    with h5py.File(file_path, 'w') as f:
        data_dict = f.create_group("data")
        for key in sample:
            if key == "participant" or key == "session" or key == "label" or key == "label_times" or key == "label_indices" or key == "timestamp":
                continue
            data_dict.create_dataset(key, data=sample[key])
        for i, label in enumerate(sample["label"]):
            label_group = f.create_group(label + "_{}".format(sample["timestamp"][i]))
            label_group.create_dataset("start_time", data=sample["label_times"][i][0])
            label_group.create_dataset("end_time", data=sample["label_times"][i][1])
            label_group.create_dataset("start_index", data=sample["label_indices"][i][0])
            label_group.create_dataset("end_index", data=sample["label_indices"][i][1])
            label_group.create_dataset("label", data=label)
            
    print("Saved to {}".format(file_path))

In [52]:
data_path = r"C:\Users\lhauptmann\Code\WristPPG2\data\dataset"

for participant_folder in os.listdir(data_path):
    participant_folder_path = os.path.join(data_path, participant_folder)
    if not os.path.isdir(participant_folder_path):
        continue
    for session_file in os.listdir(participant_folder_path):
        if not session_file.endswith(".hdf5"):
            continue
        print(session_file)
        session_file_path = os.path.join(participant_folder_path, session_file)
        session = import_hdf5_sequences(session_file_path)
        assert len(session["label_indices"]) == len(session["label_times"]) == len(session["label"])
        session = add_neg_labels( session)
        assert len(session["label_indices"]) == len(session["label_times"]) == len(session["label"])
        save_path_session = session_file_path.replace("dataset", "dataset_corr")
        os.makedirs(os.path.split(save_path_session)[0], exist_ok=True)
        save_hd5f_sequence(session_file_path, session, overwrite=True)
        

participant_amran_1.hdf5
Added 42 empty labels
Saved to C:\Users\lhauptmann\Code\WristPPG2\data\dataset\participant_amran\participant_amran_1.hdf5
participant_amran_2.hdf5
Added 29 empty labels
Saved to C:\Users\lhauptmann\Code\WristPPG2\data\dataset\participant_amran\participant_amran_2.hdf5
participant_amran_4.hdf5
Added 21 empty labels
Saved to C:\Users\lhauptmann\Code\WristPPG2\data\dataset\participant_amran\participant_amran_4.hdf5
participant_anusha_1.hdf5
Added 0 empty labels
Saved to C:\Users\lhauptmann\Code\WristPPG2\data\dataset\participant_anusha\participant_anusha_1.hdf5
participant_anusha_2.hdf5
Added 0 empty labels
Saved to C:\Users\lhauptmann\Code\WristPPG2\data\dataset\participant_anusha\participant_anusha_2.hdf5
participant_anusha_3.hdf5
Added 0 empty labels
Saved to C:\Users\lhauptmann\Code\WristPPG2\data\dataset\participant_anusha\participant_anusha_3.hdf5
participant_anusha_4.hdf5
Added 0 empty labels
Saved to C:\Users\lhauptmann\Code\WristPPG2\data\dataset\particip

In [49]:
def add_neg_labels(session):

    longest_label_list = np.argsort([el[1] - el[0] for el in session["label_indices"]])[::-1]
    longest_label = session["label_indices"][longest_label_list[0]]
    if longest_label[1] - longest_label[0] > 500 and session["label"][longest_label_list[0]] == "o":
        neg_start_time = session["label_times"][longest_label_list[0]][0]
        neg_end_time = session["label_times"][longest_label_list[0]][1]
        neg_start_index = session["label_indices"][longest_label_list[0]][0]
        neg_end_index = session["label_indices"][longest_label_list[0]][1]
        session["label"].pop(longest_label_list[0])
        session["label_indices"].pop(longest_label_list[0])
        session["label_times"].pop(longest_label_list[0])
    else:
        labels_time_sorted = np.argsort([el[0] for el in session["label_indices"]])
        
        neg_start_time = np.array(session["label_times"])[labels_time_sorted[-2]][1]
        neg_end_time = np.array(session["label_times"])[labels_time_sorted[-1]][0]
        neg_start_index = np.array(session["label_indices"])[labels_time_sorted[-2]][1]
        neg_end_index = np.array(session["label_indices"])[labels_time_sorted[-1]][0]
        if neg_end_index  - neg_start_index > 500:
            return session
        
    sampling_rate_ppg = 112.3
    window_size, step_size = 180, 100
    time_window_size = window_size / sampling_rate_ppg
    s_indices = np.arange(neg_start_index, neg_end_index - window_size, step_size)
    s_times = np.linspace(neg_start_time, neg_end_time - time_window_size, len(s_indices))
    
    i = 0
    for s_index, s_time in zip(s_indices, s_times): 
        e_index = s_index + window_size
        e_time = s_time + time_window_size

        if e_index > len(session["acc_x"]):
            print(e_index)
            continue
        i = i + 1
        session["label_indices"].append((s_index, e_index))
        session["label_times"].append((s_time, e_time))
        session["label"].append("o")
        session["timestamp"].append(int(s_time*1000))
    print(f"Added {i} empty labels")
    return session
session = add_neg_labels(session)







Added 0 empty labels
