In [1]:
import os
import warnings

import numpy as np
import pandas as pd

In [2]:
# Split manifests: one CSV file name per trial, kept in the same trial order
# across the three splits.
train_files, validation_files, test_files = (
    ["aleppo_train.csv"],
    ["aleppo_validation.csv"],
    ["aleppo_test.csv"],
)
# Containers that will hold the DataFrames read from the manifests above.
train_datas, validation_datas, test_datas = [], [], []

In [3]:
# Read every split's CSV into a DataFrame. The lists are rebuilt from scratch
# (instead of appended to) so that re-running this cell does not duplicate
# the loaded frames.
train_datas = [pd.read_csv(path) for path in train_files]
validation_datas = [pd.read_csv(path) for path in validation_files]
test_datas = [pd.read_csv(path) for path in test_files]

In [5]:
folder_paths = ["Aleppo2017"]
# One directory listing per folder, index-aligned with folder_paths.
# os.listdir returns entries in arbitrary order, so each listing is sorted to
# keep everything derived from the file order reproducible across runs.
file_names = [sorted(os.listdir(folder_path)) for folder_path in folder_paths]

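Before running the code below, check that the file names are consistent with `PtID`: the part of each file name before the first `_` must equal the subject's `PtID` value.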

In [6]:
# Some datasets need special preprocessing of their subject IDs before this runs.
# The train/validation/test data, folder_paths and file_names (one file listing
# per folder) must all be in the same trial order.
def creating_folder(
    new_folder_name,
    datas,
    sample_number=50,
    replace=False,
    seed=None,
    trial_files=None,
):
    """Sample rows from each subject's .npy files and save each row as its own file.

    For every subject ID in ``data["PtID"]``, the files in ``file_names[i]``
    whose text before the first ``_`` equals ``str(id)`` are loaded from
    ``folder_paths[i]`` and concatenated along axis 0. ``sample_number`` rows
    are then drawn at random, flattened to 1-D and written to
    ``new_folder_name`` as ``<trial_id>_<running index>.npy``.

    Reads the module-level ``file_names``, ``folder_paths`` and (by default)
    ``train_files``.

    Args:
        new_folder_name: Output directory; created if it does not exist.
        datas: Sequence of tables, each indexable by ``"PtID"``, one per trial.
        sample_number: Number of rows drawn per subject.
        replace: Whether rows are drawn with replacement.
        seed: Optional seed for reproducible sampling. ``None`` (default)
            keeps the previous behaviour of drawing from NumPy's global
            random state.
        trial_files: Optional list of file names, one per trial, whose first
            character becomes the output prefix. Defaults to ``train_files``.

    Returns:
        None. The result is the set of files written to ``new_folder_name``.
    """
    if trial_files is None:
        trial_files = train_files
    # Both the legacy np.random module and a Generator expose .choice().
    sampler = np.random if seed is None else np.random.default_rng(seed)

    os.makedirs(new_folder_name, exist_ok=True)
    counting = 0  # running index shared by all subjects and trials

    for i, data in enumerate(datas):
        # The output prefix is the first character of the trial's file name.
        trial_id = trial_files[i][0]

        for pt_id in data["PtID"]:
            pt_key = str(pt_id)
            # Sorted so the concatenation order does not depend on listing order.
            matching_files = sorted(
                f for f in file_names[i] if f.split("_")[0] == pt_key
            )
            if not matching_files:
                # np.concatenate raises on an empty list; report and move on.
                warnings.warn(
                    f"No files found for PtID {pt_id} in {folder_paths[i]}; skipping."
                )
                continue

            # Stack every segment of this subject into one array.
            all_segments = [
                np.load(os.path.join(folder_paths[i], matching_file))
                for matching_file in matching_files
            ]
            concatenated_data = np.concatenate(all_segments, axis=0)

            num_rows = concatenated_data.shape[0]
            if num_rows < sample_number:
                # NOTE(review): subjects with fewer than sample_number rows are
                # dropped entirely (unchanged from the previous behaviour).
                # Confirm whether they should be saved in full instead.
                continue

            random_indices = sampler.choice(
                num_rows, size=sample_number, replace=replace
            )
            # Each sampled row becomes one flattened sample on disk.
            for sample in concatenated_data[random_indices]:
                new_file_path = os.path.join(
                    new_folder_name, f"{trial_id}_{counting}.npy"
                )
                np.save(new_file_path, sample.flatten())
                counting += 1

In [7]:
# Write 5 samples per subject, drawn without replacement, into "train".
creating_folder("train", train_datas, sample_number=5, replace=False)

In [8]:
# Write 5 samples per subject, drawn without replacement, into "validation".
creating_folder("validation", validation_datas, sample_number=5, replace=False)

In [9]:
# One value per trial, indexed in step with test_datas; it is written into each
# test file name as the synthetic_id field.
# Presumably 0 marks real (non-synthetic) data — TODO confirm.
synthetic = [0]

In [10]:
# Name of the label column read from each test DataFrame, indexed in step with
# test_datas.
label_columns = ["label"]

In [11]:
def calculate_tir(cgm_array, lower=70, upper=180):
    """
    Time in range: percentage of readings that lie within [lower, upper].

    All readings in cgm_array are pooled, so one percentage is returned for
    the whole array (not one value per day).

    cgm_array: numpy array of glucose readings, e.g. shape (7, 1440, 1)
    lower, upper: inclusive bounds of the target range (mg/dL)
    Returns: share of readings inside the range, on a 0-100 scale
    """
    readings = cgm_array.reshape(-1)  # pool every reading into one 1-D vector
    within = (readings >= lower) & (readings <= upper)
    return within.sum() / within.size * 100

In [12]:
def tir_group(tir):
    """Map a TIR percentage to a bucket index from 0 to 3.

    Thresholds are strict lower bounds: above 90 -> 0, above 70 -> 1,
    above 50 -> 2, and everything else (50 or below) -> 3.
    """
    for bucket, floor in enumerate((90, 70, 50)):
        if tir > floor:
            return bucket
    return 3

In [13]:
# Build the test set: each subject's recordings are cut into overlapping
# windows along axis 0, and every window is saved as its own .npy file whose
# name carries the trial, synthetic flag, subject, segment index and label.
WINDOW_DAYS = 7  # rows (days, per the original naming) in each saved window
STRIDE_DAYS = 3  # rows between the starts of consecutive windows

os.makedirs("test", exist_ok=True)

for i, test_data in enumerate(test_datas):
    # The trial name is the first character of the split's CSV file name.
    trial_id = test_files[i][0]
    synthetic_id = synthetic[i]

    for test_id, test_label in zip(test_data["PtID"], test_data[label_columns[i]]):
        pt_key = str(test_id)
        # Files whose prefix before the first "_" equals the subject ID; sorted
        # so segment numbering does not depend on directory listing order.
        matching_files = sorted(
            f for f in file_names[i] if f.split("_")[0] == pt_key
        )

        # Zero the label's last digit; the TIR bucket is added in its place.
        # (The original note says this removes the HbA1c part of the label —
        # TODO confirm the label encoding.)
        label_base = test_label // 10 * 10

        # Counts the windows saved for this subject across all of its files.
        test_counting = 0
        for matching_file in matching_files:
            cont_data = np.load(os.path.join(folder_paths[i], matching_file))

            for j in range(0, len(cont_data) - WINDOW_DAYS + 1, STRIDE_DAYS):
                seven_days_data_squeeze = cont_data[j : j + WINDOW_DAYS].squeeze()

                # The window's TIR bucket becomes the last digit of its label.
                tir_index = tir_group(calculate_tir(seven_days_data_squeeze))
                segment_test_label = label_base + tir_index

                new_file_path = os.path.join(
                    "test",
                    f"{trial_id}_{synthetic_id}_{test_id}_{test_counting}_{segment_test_label}.npy",
                )
                np.save(new_file_path, seven_days_data_squeeze)
                test_counting += 1