#### Convert the original dataset files (extension .dat) to CSV format.
 - The dataset comprises 20 DAT files.

#### Import libraries.

In [1]:

import wfdb
import pandas as pd


#### Functions for converting the list of DAT files to CSV format.
- The labeled records from all DAT files are merged into a single CSV file.

In [2]:

# Dictionary keys naming each task of the experiment protocol.
FIRST_RELAX_TASK = "first_relax_task"
PHYS_STRESS_TASK = "physical_stress_task"
SECOND_RELAX_TASK = "second_relax_task"
COGN_STRESS_TASK = "cognitive_stress_task"
THIRD_RELAX_TASK = "third_relax_task"
EMOT_STRESS_TASK = "emotional_stress_task"
FOURTH_RELAX_TASK = "fourth_relax_task"

# Dictionary keys for the lower and upper limits of a task's timestamp range.
I_RANGE_VALUE = "initial_range_value"
F_RANGE_VALUE = "final_range_value"

# Position of each task inside the list of annotation locations
# (annotation locations are sample offsets counted from the start of the record).
(
    FIRST_RELAX_TASK_IDX,       # 0
    PHYS_STRESS_TASK_IDX,       # 1
    SECOND_RELAX_TASK_IDX,      # 2
    MINI_EMOT_STRESS_TASK_IDX,  # 3
    COGN_STRESS_TASK_IDX,       # 4
    THIRD_RELAX_TASK_IDX,       # 5
    EMOT_STRESS_TASK_IDX,       # 6
    FOURTH_RELAX_TASK_IDX,      # 7
) = range(8)

def get_range_indexes(annotation_sample, task_seq, *, samples_per_row=8, task_length=2400):
    # Returns the initial and final row indexes of one task.
    # The annotation location of the task is converted to a row index by integer
    # division by samples_per_row; the final index is the location plus task_length,
    # converted the same way. With the defaults the range spans 2400 // 8 = 300 rows.
    # NOTE(review): presumably the annotation locations are expressed at 8 samples per
    # data row and a task lasts 2400 annotation samples - confirm against the dataset.
    # Parameters:
    #    annotation_sample (list): list containing the initial indexes of the annotation locations for each task.
    #    task_seq (int): position of the task inside annotation_sample.
    #    samples_per_row (int): keyword-only; annotation samples per data row (default 8).
    #    task_length (int): keyword-only; task length in annotation samples (default 2400).
    # Return:
    #    Dictionary with keys "initial_range_value" and "final_range_value", or None when
    #    annotation_sample is missing/empty or the task location cannot be read.
    # Raises:
    #    ValueError: if samples_per_row is not positive.
    # len() is used on purpose instead of truthiness: annotation_sample may be a NumPy
    # array, whose truth value is ambiguous.
    if annotation_sample is None or len(annotation_sample) == 0:
        return None
    if samples_per_row <= 0:
        raise ValueError("samples_per_row must be a positive integer")
    try:
        task_start = annotation_sample[task_seq]
        return {
            I_RANGE_VALUE: task_start // samples_per_row,
            F_RANGE_VALUE: (task_start + task_length) // samples_per_row,
        }
    except (IndexError, KeyError, TypeError):
        # Missing annotation for this task, or a value that is not a number.
        return None

def extract_timestamps_ranges(annotation_sample):
    # Builds the timestamp range (initial and final index) of every labeled task.
    # The "mini emotional stress" annotation is not part of the result.
    # Parameters:
    #    annotation_sample (list): list containing the initial indexes of the annotation locations for each task.
    # Return:
    #    Dictionary with one key per task, each holding the value returned by
    #    get_range_indexes; None when annotation_sample is missing/empty or on error.
    print("\nInit extract_timestamps_ranges function...")
    if annotation_sample is None or len(annotation_sample) == 0:
        return None
    try:
        # (task key, annotation position) pairs, listed in the order the keys
        # must appear in the resulting dictionary.
        task_positions = (
            (FIRST_RELAX_TASK, FIRST_RELAX_TASK_IDX),
            (PHYS_STRESS_TASK, PHYS_STRESS_TASK_IDX),
            (SECOND_RELAX_TASK, SECOND_RELAX_TASK_IDX),
            (COGN_STRESS_TASK, COGN_STRESS_TASK_IDX),
            (THIRD_RELAX_TASK, THIRD_RELAX_TASK_IDX),
            (EMOT_STRESS_TASK, EMOT_STRESS_TASK_IDX),
            (FOURTH_RELAX_TASK, FOURTH_RELAX_TASK_IDX),
        )
        ranges_by_task = {
            task_name: get_range_indexes(annotation_sample, position)
            for task_name, position in task_positions
        }
    except Exception:
        return None
    print("Finish extract_timestamps_ranges function...")
    return ranges_by_task

def is_valid_timestamps_ranges_list(tasks_timestamps_range_dict):
    # Tells whether the timestamp ranges were loaded for every task:
    # the dictionary must hold exactly 7 entries and none of them may be None.
    # Parameters:
    #    tasks_timestamps_range_dict (dict): initial and final value settings of timestamps for each task.
    # Return:
    #    True or False
    has_all_tasks = len(tasks_timestamps_range_dict) == 7
    return has_all_tasks and all(
        task_range is not None
        for task_range in tasks_timestamps_range_dict.values()
    )

def join_files(files_list):
    # Generate a CSV file (dataset) that groups the contents of the DAT files.
    # For every record the "hr" signal is kept, each row is labeled with the class of
    # the task it belongs to, and rows that belong to no labeled task are discarded.
    # The merged result is written to ../dataset/original-noneeg-dataset.csv using
    # "|" as the separator and without the index.
    # Parameters:
    #    files_list (list): list of files to be processed (record names, no extension).
    # Return:
    #    None. Any error while loading is printed and aborts the conversion
    #    before the CSV file is written.
    data_dir = "../dataset/datafiles/"
    # Placeholder class given to every row before the real labels are applied.
    unlabeled = 5
    # Class of each task, applied in this order:
    #    relaxation tasks  - class 0 - 5 minutes / 300 seconds each.
    #    physical stress   - class 1 - 5 minutes / 300 seconds.
    #    cognitive stress  - class 2 - 5 minutes / 300 seconds.
    #    emotional stress  - class 3 - 5 minutes / 300 seconds.
    task_labels = (
        (FIRST_RELAX_TASK, 0),
        (PHYS_STRESS_TASK, 1),
        (SECOND_RELAX_TASK, 0),
        (COGN_STRESS_TASK, 2),
        (THIRD_RELAX_TASK, 0),
        (EMOT_STRESS_TASK, 3),
        (FOURTH_RELAX_TASK, 0),
    )
    print("\nInit join_files function...")
    try:
        # Check the structure of DAT files. The structure can be verified in the MIT Header file (.hea)
        record = wfdb.rdrecord(data_dir + files_list[0])
        print("\nDAT files structure:")
        print(record.__dict__)
        # Labeled rows of each file; concatenated once after the loop instead of
        # growing a dataframe on every iteration.
        labeled_frames = []
        # Process files list.
        for file_name in files_list:
            # Load the DAT file.
            print("\nInit loading {} file...".format(file_name))
            record = wfdb.rdrecord(data_dir + file_name)
            df = record.to_dataframe()
            # Get timestamps ranges for each task and load its annotations.
            annotation = wfdb.rdann(data_dir + file_name, "atr")
            range_dict = extract_timestamps_ranges(annotation.sample)
            if range_dict is None or not is_valid_timestamps_ranges_list(range_dict):
                print("\nFail to load {} file annotations.".format(file_name))
                continue
            print("\nTimestamp ranges:")
            print(range_dict)
            # Add a target to the dataframe with a fake class.
            df["label"] = unlabeled
            # Locate the target column by name rather than by a fixed position.
            label_col = df.columns.get_loc("label")
            # Set correct annotations to the target.
            for task_name, class_label in task_labels:
                task_range = range_dict[task_name]
                first_row = task_range[I_RANGE_VALUE]
                last_row = task_range[F_RANGE_VALUE]
                df.iloc[first_row:last_row, label_col] = class_label
            # Keep only the HR feature in the dataset. The SpO2 feature will not be used for this study.
            df = df.drop(columns=["SpO2"])
            # Records from the "mini emotional stress" task and surplus records will be removed from the dataset.
            labeled_frames.append(df[df["label"] != unlabeled])
            print("\nFinish loading {} file...".format(file_name))
        if labeled_frames:
            df_final = pd.concat(labeled_frames)
        else:
            # No file could be labeled: still produce a CSV holding only the header.
            df_final = pd.DataFrame(columns=["hr", "label"])
    except Exception as e:
        print("\nFail to load DAT files.")
        print("Error: {}".format(e))
        return None
    print("\n")
    # describe must be called; printing the bound method shows no statistics.
    print(df_final.describe())
    print("\nStart writing original-noneeg-dataset.csv file...")
    df_final.to_csv('../dataset/original-noneeg-dataset.csv', index = False,  sep='|')
    print("Finish writing CSV file...")


#### Generate the list with the names of the DAT files to be converted to CSV and call the conversion function.
- A file named original-noneeg-dataset.csv will be generated.

In [3]:

# Record names follow the pattern Subject[X]_SpO2HR, with X going from 1 to 20.
files_list = ["Subject{}_SpO2HR".format(subject_number) for subject_number in range(1, 21)]

join_files(files_list)



Init join_files function...

DAT files structure:
{'record_name': 'Subject1_SpO2HR', 'n_sig': 2, 'fs': 1, 'counter_freq': None, 'base_counter': None, 'sig_len': 2299, 'base_time': None, 'base_date': None, 'comments': ['age: 30', 'gender: M', 'height/cm: 177', 'weight/kg: 94'], 'sig_name': ['SpO2', 'hr'], 'p_signal': array([[96.99996948, 89.00076296],
       [96.99996948, 88.00013733],
       [96.99996948, 87.00044252],
       ...,
       [94.00006104, 76.0000763 ],
       [95.00003052, 75.00038148],
       [95.00003052, 74.00068667]]), 'd_signal': None, 'e_p_signal': None, 'e_d_signal': None, 'file_name': ['Subject1_SpO2HR.dat', 'Subject1_SpO2HR.dat'], 'fmt': ['16', '16'], 'samps_per_frame': [1, 1], 'skew': [None, None], 'byte_offset': [None, None], 'adc_gain': [10922.3333333, 1074.32786885], 'baseline': [-1048544, -103672], 'units': ['%', 'bpm'], 'adc_res': [16, 16], 'adc_zero': [0, 0], 'init_value': [10922, -8056], 'checksum': [54860, 19971], 'block_size': [0, 0]}

Init loading Subj


Init extract_timestamps_ranges function...
Finish extract_timestamps_ranges function...

Timestamp ranges:
{'first_relax_task': {'initial_range_value': 0, 'final_range_value': 300}, 'physical_stress_task': {'initial_range_value': 300, 'final_range_value': 600}, 'second_relax_task': {'initial_range_value': 628, 'final_range_value': 928}, 'cognitive_stress_task': {'initial_range_value': 968, 'final_range_value': 1268}, 'third_relax_task': {'initial_range_value': 1322, 'final_range_value': 1622}, 'emotional_stress_task': {'initial_range_value': 1622, 'final_range_value': 1922}, 'fourth_relax_task': {'initial_range_value': 1981, 'final_range_value': 2281}}

Finish loading Subject18_SpO2HR file...

Init loading Subject19_SpO2HR file...

Init extract_timestamps_ranges function...
Finish extract_timestamps_ranges function...

Timestamp ranges:
{'first_relax_task': {'initial_range_value': 0, 'final_range_value': 300}, 'physical_stress_task': {'initial_range_value': 300, 'final_range_value': 6