# Also known as dataloader part

In [1]:
import torch
import pandas as pd
import numpy as np
from scipy import signal

In [2]:
def Load_Dataset_A(data_path, model, subjects=(1, 2)):
    """
    Load Dataset A. Since the dataset does not provide code, we follow the original literature to preprocess the data.

    Each recording is read from three spreadsheets,
    ``<data_path>/S<sub>/s<sub><k>_hb.xls``, ``..._trial.xls`` and ``..._y.xls``.
    Subjects 1-3 have 3 recordings each, subjects >= 4 have 4.

    Args:
        data_path (str): dataset path.
        model (str): 'fNIRS-T', 'fNIRS-PreT', 'fNIRS_TTT' or 'fNIRS_TTT_bs16'.
            'fNIRS-PreT' uses raw data; every other option uses band-pass
            filtered (preprocessed) data.
        subjects (iterable of int): subject ids to load. Defaults to (1, 2),
            the two subjects the loader previously had hard-coded.

    Returns:
        feature : fNIRS signal data, shape (n_samples, 2, 52, 140) when at
            least one sample was loaded; axis 1 is (HbO, HbR).
        label : fNIRS labels, 0 for the task window that starts at an MA
            onset, 1 for the baseline window that follows it.

    Raises:
        AssertionError: if ``model`` is not one of the accepted names.
        ValueError: if a task or baseline window runs past the end of a
            recording, or the recording has fewer than 104 signal columns.
    """
    assert model in ['fNIRS-T', 'fNIRS-PreT', 'fNIRS_TTT', "fNIRS_TTT_bs16"], ''' The model parameter is 'fNIRS-T' or 'fNIRS-PreT' or 'fNIRS_TTT' '''

    n_channels = 52        # HbO columns 0-51, HbR columns 52-103
    window = 140           # sampling points per extracted window
    baseline_offset = 160  # baseline window covers onset+160 .. onset+300

    # The filter coefficients do not depend on the recording, so design them once.
    # No sampling rate is passed to butter(), so the cutoffs are normalized
    # frequencies (1.0 == Nyquist).
    use_filtered = model in ('fNIRS-T', 'fNIRS_TTT', 'fNIRS_TTT_bs16')
    if use_filtered:
        low_b, low_a = signal.butter(4, 0.018, 'lowpass')
        high_b, high_a = signal.butter(3, 0.002, 'highpass')

    feature = []
    label = []
    for sub in subjects:
        trial_num = 4 if sub >= 4 else 3
        for times in range(1, trial_num + 1):
            prefix = f"{data_path}/S{sub}/s{sub}{times}"
            hb = np.array(pd.read_excel(prefix + '_hb.xls', header=None))
            trial = np.array(pd.read_excel(prefix + '_trial.xls', header=None))
            # 1:MA, 2:REST
            y = np.array(pd.read_excel(prefix + '_y.xls', header=None))

            HbO = hb[:, 0:n_channels]
            HbR = hb[:, n_channels:2 * n_channels]

            # Zero-phase low-pass then high-pass along the time axis.
            if use_filtered:
                HbO = signal.filtfilt(low_b, low_a, HbO, axis=0)
                HbR = signal.filtfilt(low_b, low_a, HbR, axis=0)
                HbO = signal.filtfilt(high_b, high_a, HbO, axis=0)
                HbR = signal.filtfilt(high_b, high_a, HbR, axis=0)

            # (time, channel) -> (channel, time)
            HbO = HbO.transpose((1, 0))
            HbR = HbR.transpose((1, 0))

            # Only MA onsets are used: the baseline window is cut relative to
            # the MA onset, not from the REST markers.
            onsets = [trial[k, 0] for k in range(y.shape[0]) if y[k, 0] == 1]

            for onset in onsets:
                # Emit the task window (label 0) followed by its baseline (label 1).
                for offset, cls in ((0, 0), (baseline_offset, 1)):
                    begin = onset + offset
                    epoch = np.stack((HbO[:, begin: begin + window],
                                      HbR[:, begin: begin + window]))
                    if epoch.shape != (2, n_channels, window):
                        raise ValueError(
                            f"window starting at sample {begin} in {prefix}_hb.xls has shape "
                            f"{epoch.shape}, expected {(2, n_channels, window)}"
                        )
                    feature.append(epoch)
                    label.append(cls)

        print(str(sub) + '  OK')

    feature = np.array(feature)
    label = np.array(label)
    print('feature', feature.shape)
    print('label', label.shape)

    return feature, label

In [3]:
class Dataset(torch.utils.data.Dataset):
    """
    Load data for training

    Args:
        feature: input data; stored as a float32 tensor.
        label: class for input data; stored as a float32 tensor.
        transform: Z-score normalization is used to accelerate convergence (default:True).
    """
    def __init__(self, feature, label, transform=True):
        self.transform = transform
        self.feature = torch.tensor(feature, dtype=torch.float)
        self.label = torch.tensor(label, dtype=torch.float)
        print(self.feature.shape)
        print(self.label.shape)

    def __len__(self):
        return len(self.label)

    def __getitem__(self, item):
        """Return ``(sample, label)`` for ``item``; the stored data is never modified."""
        sample = self.feature[item]

        # z-score normalization over the whole sample. The result is a new
        # tensor: writing it back into self.feature would renormalize the
        # stored data on every access.
        if self.transform:
            mean, std = sample.mean(), sample.std()
            centered = sample - mean
            # A constant sample has std == 0; dividing would yield NaN, so
            # such a sample is only centred.
            sample = centered / std if std > 0 else centered

        return sample, self.label[item]

In [6]:
# Load the example recordings through the 'fNIRS-T' (filtered) preprocessing path.
feature, label = Load_Dataset_A(data_path="data_example", model="fNIRS-T")

1  OK
2  OK
feature (72, 2, 52, 140)
label (72,)


In [13]:
# feature is 4-D; only its last two axes (channels, sampling points) are needed later.
_, _, channels, sampling_points = feature.shape
print(f"{channels} {sampling_points}")

52 140


In [14]:
def Split_Dataset_A(sub, feature, label, channels):
    """
    LOSO-CV for Dataset A

    Samples are expected in subject order: subjects 1-3 contribute 36 samples
    each and subjects >= 4 contribute 48 samples each, so the held-out
    subject's samples form one contiguous slice of ``feature``.

    Args:
        sub: leave one subject out (1-based subject id).
        feature: input fNIRS signals.
        label: input fNIRS labels.
        channels: fNIRS channels.

    Returns:
        X_train: training set.
        y_train: labels for training set.
        X_test: test set.
        y_test: labels for test set.

    Raises:
        ValueError: if ``sub`` is smaller than 1 or its samples start beyond
            the end of ``feature``.
    """
    short_block = 36      # samples per subject for subjects 1-3
    long_block = 48       # samples per subject for subjects >= 4
    n_short_subjects = 3

    # One offset computation for every subject. A dedicated branch that
    # hard-codes the final subject's offset breaks whenever the number of
    # loaded subjects changes.
    if sub <= n_short_subjects:
        start = short_block * (sub - 1)
        end = short_block * sub
    else:
        start = short_block * n_short_subjects + long_block * (sub - n_short_subjects - 1)
        end = start + long_block

    if sub < 1 or start >= len(feature):
        raise ValueError(
            f"subject {sub} is not present: its samples would start at index {start}, "
            f"but only {len(feature)} samples were given"
        )

    X_test = feature[start: end]
    y_test = label[start: end]
    # Everything before and after the held-out slice is training data.
    X_train = np.concatenate((feature[:start], feature[end:]), axis=0)
    y_train = np.concatenate((label[:start], label[end:]), axis=0)

    X_train = X_train.reshape((X_train.shape[0], 2, channels, -1))
    X_test = X_test.reshape((X_test.shape[0], 2, channels, -1))

    return X_train, y_train, X_test, y_test

In [15]:
# Build leave-one-subject-out loaders for subjects 1 and 2. Each pass rebinds
# the same names, so only the last fold's loaders remain after the loop.
for sub in (1, 2):
    X_train, y_train, X_test, y_test = Split_Dataset_A(
        sub=sub, feature=feature, label=label, channels=channels
    )
    train_set = Dataset(feature=X_train, label=y_train, transform=True)
    test_set = Dataset(feature=X_test, label=y_test, transform=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_set, batch_size=128, shuffle=True)
    test_loader = torch.utils.data.DataLoader(dataset=test_set, batch_size=128, shuffle=True)

