In [None]:
import pandas as pd
import numpy as np
import torch
from sklearn.model_selection import train_test_split

In [None]:
# Load each subject's raw CSV (subjects 1..28), aggregate it to hourly means and
# stack the per-subject frames into one long table, `full_dataset`.
date_format = '%d.%m.%y %H:%M'
dfs = []
for i in range(1, 29):
    file_path = 'data/raw/subjectID_' + str(i) + '.csv'
    df = pd.read_csv(file_path)

    # These columns may be absent from some files; errors='ignore' skips the
    # ones a given file does not have.
    df = df.drop(columns=['ActivityClass', 'GalvanicSkinResponse'], errors='ignore')
    # rename() leaves the frame untouched when the old label is not present.
    df = df.rename(columns={'SkinTemperature.Value': 'SkinTemperature'})

    df['Timestamp'] = pd.to_datetime(df['Timestamp'], format=date_format)
    df = df.set_index('Timestamp')

    # Resample to the mean of each hour. The lowercase alias 'h' is used because
    # the uppercase 'H' alias is deprecated since pandas 2.2.
    df = df.resample('h').mean()
    df = df.reset_index()

    df['SubjectID'] = i
    dfs.append(df)

full_dataset = pd.concat(dfs, ignore_index=True)

In [None]:
# Display the concatenated hourly dataset (one row per subject and hour).
full_dataset

In [None]:
# Load the questionnaire (PRO) answers and discard every timestamp that has at
# least one unanswered question.
fatiguePROs = pd.read_csv('data/fatiguePROs_processed.csv').reset_index(drop=True)

# Parse the DateTime column using the same format as the sensor timestamps.
fatiguePROs['DateTime'] = pd.to_datetime(fatiguePROs['DateTime'], format=date_format)

# Timestamps for which any PROanswer_value is missing.
# NOTE(review): the match is on DateTime only, not on (SubjectID, DateTime) —
# confirm that two subjects can never share a timestamp.
answer_is_missing = fatiguePROs['PROanswer_value'].isnull()
missing_proanswers = fatiguePROs[answer_is_missing].DateTime.unique()

# Keep only rows whose timestamp is not in that set, then renumber the rows.
has_missing_timestamp = fatiguePROs['DateTime'].isin(missing_proanswers)
fatiguePROs = fatiguePROs[~has_missing_timestamp].reset_index(drop=True)
fatiguePROs


In [None]:
# Cut one 5-day window of hourly sensor rows ending at each questionnaire
# timestamp, z-score it, and keep it together with its four PRO answers only
# when it is complete enough.
segments_fulldataset = []
segments_subjectIDs = []
PROlabels_PhF = []
PROlabels_MF = []
PROlabels_VAS = []
PROlabels_RelP = []

# Question code -> list that receives that question's answer for each kept segment.
label_lists = {
    'PhF': PROlabels_PhF,
    'MF': PROlabels_MF,
    'VAS': PROlabels_VAS,
    'RelP': PROlabels_RelP,
}

for i in range(1, 29):
    subject_data = full_dataset[full_dataset['SubjectID'] == i]
    subject_labels = fatiguePROs[fatiguePROs['SubjectID'] == i]
    unique_date_time = subject_labels['DateTime'].unique()
    timestamps = subject_data['Timestamp']

    for end_date_time in unique_date_time:
        # The window start is clamped to the subject's earliest sensor timestamp.
        start_date_time = max(end_date_time - pd.Timedelta(days=5), timestamps.min())

        # Half-open window (start, end], with the bookkeeping columns removed.
        in_window = (timestamps > start_date_time) & (timestamps <= end_date_time)
        segment = subject_data[in_window].drop(columns=['Timestamp', 'SubjectID'])

        # Standardise every column with this window's own mean and std.
        segment = (segment - segment.mean()) / segment.std()

        # Keep only windows that are non-empty, at least 80% non-null and
        # exactly 120 rows long (5 days of hourly rows).
        if segment.size == 0:
            continue
        if not (segment.notnull().sum().sum() / segment.size >= 0.8 and segment.shape[0] == 120):
            continue

        segments_fulldataset.append(segment.to_numpy())
        segments_subjectIDs.append(i)

        # Answers given at the window's end timestamp, one per question code.
        PROlabel = subject_labels[subject_labels['DateTime'] == end_date_time]
        for question, answers in label_lists.items():
            answers.append(PROlabel[PROlabel['question'] == question].PROanswer_value.values[0])

len(segments_fulldataset)

In [None]:
# Number of kept segments per subject ID. np.unique returns the unique IDs in
# ascending order, so counts[k] belongs to the k-th smallest subject ID present.
_, counts = np.unique(segments_subjectIDs, return_counts=True)
counts

In [None]:
def split_into_groups(nums, num_groups):
    """Greedily distribute values over groups so the group totals stay balanced.

    Values are visited from largest to smallest and each one is placed in the
    group whose running total is currently smallest (the lowest-numbered group
    wins ties).

    Args:
        nums: Sized, indexable collection of numbers.
        num_groups: Number of groups to fill.

    Returns:
        A tuple ``(group_assignments, group_sums)``: ``group_assignments[k]``
        is the group index given to ``nums[k]`` and ``group_sums[g]`` is the
        total of the values placed in group ``g``.
    """
    # Largest first; sorting is stable, so equal values keep their input order.
    by_size = sorted(enumerate(nums), key=lambda pair: pair[1], reverse=True)

    group_assignments = [0] * len(nums)
    group_sums = [0] * num_groups

    for position, value in by_size:
        # index() returns the first group holding the current minimum total.
        lightest = group_sums.index(min(group_sums))
        group_sums[lightest] += value
        group_assignments[position] = lightest

    return group_assignments, group_sums

# Balance the per-subject segment counts across 5 groups and report the result.
num_groups = 5

group_assignments, group_sums = split_into_groups(counts, num_groups)
print(group_assignments)

# Group numbers are printed 1-based.
for group_number, group_sum in enumerate(group_sums, start=1):
    print(f"Group {group_number} sum: {group_sum}")



In [None]:
# Expand the per-subject group index to one entry per segment by repeating each
# subject's group as many times as that subject has segments.
full_group_assignemnt = [
    group
    for group, count in zip(group_assignments, counts)
    for _ in range(count)
]

print(full_group_assignemnt)

In [None]:
# Select the first CUDA GPU when one is available, otherwise the CPU.
# NOTE(review): none of the visible cells moves a model or tensor to `device` —
# confirm whether GPU execution is intended.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, mean_squared_error
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt

class AttentionLSTM(nn.Module):
    """LSTM whose per-step outputs are pooled with softmax weights before a linear head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        output_size: Number of values produced by the final linear layer.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        # Normalises over dim=1, which is the sequence axis for batched,
        # batch_first LSTM output.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return the linear head applied to the softmax-weighted sum of LSTM outputs."""
        hidden_states, _ = self.lstm(x)
        # The weights are derived from the LSTM outputs themselves, independently
        # for every hidden unit.
        weights = self.softmax(hidden_states)
        pooled = (weights * hidden_states).sum(dim=1)
        return self.fc(pooled)

In [None]:
# Hold-out experiment: for every PRO score, train a regressor on a random 80% of
# the segments, predict the remaining 20%, plot predictions against the ground
# truth and report the test MSE.
PROlabels = [PROlabels_PhF, PROlabels_MF, PROlabels_VAS, PROlabels_RelP]
PROlabels_name = ['PhF', 'MF', 'VAS', 'RelP']

# One array of test predictions / targets per PRO score.
lst_outputs = []
lst_targets = []

for i in range(len(PROlabels)):
    X_train, X_test, y_train, y_test = train_test_split(segments_fulldataset, PROlabels[i], test_size=0.2, random_state=42)
    # Convert data to PyTorch tensors. The splits are lists of arrays; stacking
    # them with numpy first avoids torch's slow list-of-arrays conversion.
    X_train_tensor = torch.tensor(np.asarray(X_train), dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
    X_test_tensor = torch.tensor(np.asarray(X_test), dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

    # Replace NaNs with a large negative sentinel value.
    X_train_tensor = torch.nan_to_num(X_train_tensor, nan=-10000.0)
    X_test_tensor = torch.nan_to_num(X_test_tensor, nan=-10000.0)

    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    batch_size = 64
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Initialize the model, loss function, and optimizer
    input_size = 10
    hidden_size = 64
    output_size = 1
    model = AttentionLSTM(input_size, hidden_size, output_size)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train the model
    num_epochs = 30
    train_losses = []

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            # squeeze(-1) drops only the output dimension, so a final batch of
            # size 1 keeps shape (1,) and still matches `labels`.
            loss = criterion(outputs.squeeze(-1), labels)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        avg_epoch_loss = epoch_loss / len(train_loader)
        train_losses.append(avg_epoch_loss)
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}')

    # Evaluate the model
    model.eval()
    with torch.no_grad():
        test_outputs = []
        for inputs, labels in test_loader:
            outputs = model(inputs)
            # Always a list, even for a batch of size 1 (a bare squeeze() would
            # give a 0-d tensor whose tolist() is a float and break extend()).
            test_outputs.extend(outputs.squeeze(-1).tolist())

    test_outputs = np.array(test_outputs)
    y_test = np.array(y_test)
    lst_outputs.append(test_outputs)
    lst_targets.append(y_test)

    # Axis limits depend on the score being plotted.
    if PROlabels_name[i] == 'VAS':
        plt.xlim(0.5, 10.5)
        plt.ylim(0.5, 10.5)
    elif PROlabels_name[i] == 'RelP':
        plt.xlim(-1.25, 1.25)
        plt.ylim(-1.25, 1.25)
    else:
        plt.xlim(-0.5, 4.5)
        plt.ylim(-0.5, 4.5)

    plt.gca().set_aspect('equal', adjustable='box')
    # Plot scatter plot
    plt.scatter(y_test, test_outputs, alpha=0.5)
    plt.title(PROlabels_name[i], y=-0.2)
    plt.xlabel('Ground Truth')
    plt.ylabel('Predictions')
    plt.show()
    plt.clf()

    # Evaluate the performance
    mse = mean_squared_error(y_test, test_outputs)
    print(f'Mean Squared Error on Test Data: {mse}')

In [None]:
# data-based kfold CV
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score, mean_squared_error
from sklearn.model_selection import KFold
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np
import torch.nn as nn
from sklearn.model_selection import train_test_split

# Data-based 5-fold CV: segments are split at random, so segments of the same
# subject can appear in both the training and the test fold.
PROlabels = [PROlabels_PhF, PROlabels_MF, PROlabels_VAS, PROlabels_RelP]
PROlabels_name = ['PhF', 'MF', 'VAS', 'RelP']

lst_outputs = []
lst_targets = []

# Number of folds for cross-validation
num_folds = 5
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)
# Per-score list of fold MSEs.
mse_values = {label: [] for label in PROlabels_name}

for i, label in enumerate(PROlabels_name):
    X = np.array(segments_fulldataset)
    y = np.array(PROlabels[i])

    for fold, (train_index, test_index) in enumerate(kf.split(X), start=1):
        print(f'Fold {fold}/{num_folds}')

        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # Convert data to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
        y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

        # Replace NaNs with a large negative sentinel value.
        X_train_tensor = torch.nan_to_num(X_train_tensor, nan=-10000.0)
        X_test_tensor = torch.nan_to_num(X_test_tensor, nan=-10000.0)
        # Create DataLoader
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        batch_size = 64
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        # Initialize the model, loss function, and optimizer
        input_size = 10
        hidden_size = 64
        output_size = 1
        model = AttentionLSTM(input_size, hidden_size, output_size)
        criterion = nn.MSELoss()

        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Train the model
        num_epochs = 30
        train_losses = []

        for epoch in range(num_epochs):
            model.train()
            epoch_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                # squeeze(-1) drops only the output dimension, so a final batch
                # of size 1 keeps shape (1,) and still matches `labels`.
                loss = criterion(outputs.squeeze(-1), labels)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            avg_epoch_loss = epoch_loss / len(train_loader)
            train_losses.append(avg_epoch_loss)
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}')

        # Evaluate the model
        model.eval()
        with torch.no_grad():
            test_outputs = []
            for inputs, labels in test_loader:
                outputs = model(inputs).squeeze(-1)
                # Always a list, even for a batch of size 1 (a bare squeeze()
                # would give a float from tolist() and break extend()).
                test_outputs.extend(outputs.tolist())

        mse = mean_squared_error(y_test, test_outputs)
        mse_values[label].append(mse)
        print(f'Mean Squared Error on Test Data (Fold {fold}): {mse}')

# Calculate average and standard deviation for each PROlabel
for label in PROlabels_name:
    mse_array = np.array(mse_values[label])
    avg_mse = np.mean(mse_array)
    sd_mse = np.std(mse_array)
    print(f'PROlabel: {label}, Average MSE: {avg_mse}, SD MSE: {sd_mse}')

In [None]:
# subject-based kfold CV
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score, mean_squared_error
from sklearn.model_selection import KFold
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np
import torch.nn as nn
from sklearn.model_selection import train_test_split

# Subject-based CV: each fold tests on one subject group, so every segment of a
# subject is either in the training set or in the test set, never in both.
PROlabels = [PROlabels_PhF, PROlabels_MF, PROlabels_VAS, PROlabels_RelP]
PROlabels_name = ['PhF', 'MF', 'VAS', 'RelP']

lst_outputs = []
lst_targets = []
# Number of folds for cross-validation
num_folds = 5
# Not used by the loop below (folds come from the subject groups); kept so the
# name stays defined for other cells.
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)
# Per-score list of fold MSEs.
mse_values = {label: [] for label in PROlabels_name}

# Per-segment group index as an ndarray, so `new_array == group` is an explicit
# element-wise comparison instead of relying on list-vs-numpy-scalar coercion.
new_array = np.asarray(full_group_assignemnt)
for i, label in enumerate(PROlabels_name):
    X = np.array(segments_fulldataset)
    y = np.array(PROlabels[i])

    for fold, group in enumerate(np.unique(new_array), start=1):
        print(f'Fold {fold}/{num_folds}')

        group_indices = np.where(new_array == group)[0]

        # Split data based on group indices
        X_test = X[group_indices]
        y_test = y[group_indices]

        train_indices = np.setdiff1d(np.arange(len(X)), group_indices)
        X_train = X[train_indices]
        y_train = y[train_indices]

        # Convert data to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
        y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

        # Replace NaNs with a large negative sentinel value.
        X_train_tensor = torch.nan_to_num(X_train_tensor, nan=-10000.0)
        X_test_tensor = torch.nan_to_num(X_test_tensor, nan=-10000.0)
        # Create DataLoader
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        batch_size = 64
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        # Initialize the model, loss function, and optimizer
        input_size = 10
        hidden_size = 64
        output_size = 1
        model = AttentionLSTM(input_size, hidden_size, output_size)
        criterion = nn.MSELoss()

        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Train the model
        num_epochs = 30
        train_losses = []

        for epoch in range(num_epochs):
            model.train()
            epoch_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                # squeeze(-1) drops only the output dimension, so a final batch
                # of size 1 keeps shape (1,) and still matches `labels`.
                loss = criterion(outputs.squeeze(-1), labels)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            avg_epoch_loss = epoch_loss / len(train_loader)
            train_losses.append(avg_epoch_loss)
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}')

        # Evaluate the model
        model.eval()
        with torch.no_grad():
            test_outputs = []
            for inputs, labels in test_loader:
                # squeeze(-1) keeps the batch dimension, so tolist() is always a
                # list of floats and no special case for size-1 batches is needed.
                outputs = model(inputs).squeeze(-1)
                test_outputs.extend(outputs.tolist())

        mse = mean_squared_error(y_test, test_outputs)
        mse_values[label].append(mse)
        print(f'Mean Squared Error on Test Data (Fold {fold}): {mse}')

# Calculate average and standard deviation for each PROlabel
for label in PROlabels_name:
    mse_array = np.array(mse_values[label])
    avg_mse = np.mean(mse_array)
    sd_mse = np.std(mse_array)
    print(f'PROlabel: {label}, Average MSE: {avg_mse}, SD MSE: {sd_mse}')

In [None]:
# 1 FOLD
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score, mean_squared_error
from sklearn.model_selection import KFold
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np
import torch.nn as nn
from sklearn.model_selection import train_test_split

# Single subject-based fold: only the third subject group (slice [2:3]) is used
# as the test set; its predictions and targets are stored per PRO score.
PROlabels = [PROlabels_PhF, PROlabels_MF, PROlabels_VAS, PROlabels_RelP]
PROlabels_name = ['PhF', 'MF', 'VAS', 'RelP']

# One array of test predictions / targets per PRO score.
lst_outputs = []
lst_targets = []
# Number of folds for cross-validation
num_folds = 5
# Not used by the loop below (the fold comes from the subject groups); kept so
# the name stays defined for other cells.
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)
mse_values = {label: [] for label in PROlabels_name}

# Per-segment group index as an ndarray, so `new_array == group` is an explicit
# element-wise comparison instead of relying on list-vs-numpy-scalar coercion.
new_array = np.asarray(full_group_assignemnt)
for i, label in enumerate(PROlabels_name):
    X = np.array(segments_fulldataset)
    y = np.array(PROlabels[i])

    for fold, group in enumerate(np.unique(new_array)[2:3], start=1):
        print(f'Fold {fold}/{num_folds}')

        group_indices = np.where(new_array == group)[0]

        # Split data based on group indices
        X_test = X[group_indices]
        y_test = y[group_indices]

        train_indices = np.setdiff1d(np.arange(len(X)), group_indices)
        X_train = X[train_indices]
        y_train = y[train_indices]

        # Convert data to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
        y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

        # Replace NaNs with a large negative sentinel value.
        X_train_tensor = torch.nan_to_num(X_train_tensor, nan=-10000.0)
        X_test_tensor = torch.nan_to_num(X_test_tensor, nan=-10000.0)
        # Create DataLoader
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        batch_size = 64
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        # Initialize the model, loss function, and optimizer
        input_size = 10
        hidden_size = 64
        output_size = 1
        model = AttentionLSTM(input_size, hidden_size, output_size)
        criterion = nn.MSELoss()

        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Train the model
        num_epochs = 30
        train_losses = []

        for epoch in range(num_epochs):
            model.train()
            epoch_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                # squeeze(-1) drops only the output dimension, so a final batch
                # of size 1 keeps shape (1,) and still matches `labels`.
                loss = criterion(outputs.squeeze(-1), labels)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            avg_epoch_loss = epoch_loss / len(train_loader)
            train_losses.append(avg_epoch_loss)
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}')

        # Evaluate the model
        model.eval()
        with torch.no_grad():
            test_outputs = []
            for inputs, labels in test_loader:
                # squeeze(-1) keeps the batch dimension, so tolist() is always a
                # list of floats and no special case for size-1 batches is needed.
                outputs = model(inputs).squeeze(-1)
                test_outputs.extend(outputs.tolist())

        mse = mean_squared_error(y_test, test_outputs)
        mse_values[label].append(mse)
        test_outputs = np.array(test_outputs)
        y_test = np.array(y_test)
        lst_outputs.append(test_outputs)
        lst_targets.append(y_test)
        print(f'Mean Squared Error on Test Data (Fold {fold}): {mse}')

# Calculate average and standard deviation for each PROlabel
for label in PROlabels_name:
    mse_array = np.array(mse_values[label])
    avg_mse = np.mean(mse_array)
    sd_mse = np.std(mse_array)
    print(f'PROlabel: {label}, Average MSE: {avg_mse}, SD MSE: {sd_mse}')

In [None]:
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
# Scatter predictions against ground truth for every stored PRO score, with a
# least-squares line and the Pearson correlation annotated on each panel.
num_plots = len(lst_outputs)

# Create a 2x2 subplot layout
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(10, 10))

for i in range(num_plots):
    # Panels are filled row by row.
    ax = axes[i // 2, i % 2]
    targets = lst_targets[i]
    predictions = lst_outputs[i]

    ax.scatter(targets, predictions, alpha=0.3)
    # Slope and intercept of the first-degree least-squares fit.
    m, b = np.polyfit(targets, predictions, 1)
    # pearsonr returns both the correlation coefficient and its p-value, so a
    # separate np.corrcoef call is not needed.
    r_value, p_value = pearsonr(targets, predictions)
    ax.annotate(f'r={r_value:.2f}', xy=(0.05, 0.9), xycoords='axes fraction')
    ax.annotate(f'p={p_value:.4f}', xy=(0.05, 0.85), xycoords='axes fraction')
    ax.plot(targets, m*targets + b, color='black', linewidth=1, alpha=0.6)

    # Axis limits depend on the score being plotted.
    if PROlabels_name[i] == 'VAS':
        ax.set_xlim(0.5, 10.5)
        ax.set_ylim(0.5, 10.5)
    elif PROlabels_name[i] == 'RelP':
        ax.set_xlim(-1.25, 1.25)
        ax.set_ylim(-1.25, 1.25)
    else:
        ax.set_xlim(-0.5, 4.5)
        ax.set_ylim(-0.5, 4.5)
    ax.set_xlabel('Ground Truth')
    ax.set_ylabel('Predictions')

    ax.set_aspect('equal', adjustable='box')
    # Panel letters: (a), (b), (c), ...
    ax.set_title(f'({chr(97 + i)}) {PROlabels_name[i]}', fontsize=12, fontweight='bold', loc='left')

plt.tight_layout()
plt.show()

In [None]:
# Binarise rounded predictions and targets for every stored score except the
# last one: the first two entries use threshold 1, any later entry threshold 5.
binarized_predictions = []
binarized_targets = []
for i, outputs in enumerate(lst_outputs[:-1]):
    threshold = 1 if i < 2 else 5
    binarized_predictions.append(np.where(np.round(outputs) >= threshold, 1, 0))
    binarized_targets.append(np.where(np.round(lst_targets[i]) >= threshold, 1, 0))

In [None]:
#find the accuracy of the predictions
from sklearn.metrics import accuracy_score
# Print each score's name followed by the accuracy of its binarised predictions.
for i, predictions in enumerate(binarized_predictions):
    print(PROlabels_name[i])
    print(accuracy_score(binarized_targets[i], predictions))
    print('')

In [None]:
# # save lst_outputs_adjusted, lst_outputs and lst_targets to csv files
# import csv
# with open('data/lst_outputs.csv', 'w') as f:
#     writer = csv.writer(f)
#     writer.writerows(lst_outputs)
# with open('data/lst_targets.csv', 'w') as f:
#     writer = csv.writer(f)
#     writer.writerows(lst_targets)

Train the models using binary labels (PhF >= 1, MF >= 1 and VAS >= 5 are mapped to 1, everything else to 0)

In [None]:
import torch.nn as nn
class AttentionLSTM(nn.Module):
    """LSTM whose per-step outputs are pooled with softmax weights before a linear head.

    ``forward`` returns the raw output of the linear layer (no sigmoid applied).
    NOTE(review): this redefines a class of the same name declared earlier in
    the notebook; the only difference is the extra, unused ``sigmoid`` attribute.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        output_size: Number of values produced by the final linear layer.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        # Normalises over dim=1, which is the sequence axis for batched,
        # batch_first LSTM output.
        self.softmax = nn.Softmax(dim=1)
        # Not applied in forward(); the output stays un-squashed.
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the linear head applied to the softmax-weighted sum of LSTM outputs."""
        hidden_states, _ = self.lstm(x)
        weights = self.softmax(hidden_states)
        pooled = (weights * hidden_states).sum(dim=1)
        return self.fc(pooled)
    

In [None]:
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim

# Hold-out experiment with binary targets: PhF >= 1, MF >= 1 and VAS >= 5 are
# mapped to class 1, everything else to class 0.
PROlabels = [np.where(np.array(PROlabels_PhF) >= 1 , 1, 0),
                    np.where(np.array(PROlabels_MF) >= 1 , 1, 0),
                    np.where(np.array(PROlabels_VAS) >= 5 , 1, 0)]

PROlabels_name = ['PhF', 'MF', 'VAS']

lst_outputs = []
lst_targets = []

for i in range(len(PROlabels)):
    X_train, X_test, y_train, y_test = train_test_split(segments_fulldataset, PROlabels[i], test_size=0.2, random_state=42)
    # Convert data to PyTorch tensors. The feature splits are lists of arrays;
    # stacking them with numpy first avoids torch's slow list-of-arrays conversion.
    X_train_tensor = torch.tensor(np.asarray(X_train), dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
    X_test_tensor = torch.tensor(np.asarray(X_test), dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

    # Replace NaNs with a large negative sentinel value.
    X_train_tensor = torch.nan_to_num(X_train_tensor, nan=-10000.0)
    X_test_tensor = torch.nan_to_num(X_test_tensor, nan=-10000.0)
    # Create DataLoader
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    batch_size = 64
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Initialize the model, loss function, and optimizer
    input_size = 10
    hidden_size = 64
    output_size = 1
    model = AttentionLSTM(input_size, hidden_size, output_size)
    # Expects raw logits; the sigmoid is applied inside the loss.
    criterion = nn.BCEWithLogitsLoss()

    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train the model
    num_epochs = 30
    train_losses = []

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            # squeeze(-1) drops only the output dimension, so a final batch of
            # size 1 keeps shape (1,), which BCEWithLogitsLoss requires to
            # match `labels`.
            loss = criterion(outputs.squeeze(-1), labels)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        avg_epoch_loss = epoch_loss / len(train_loader)
        train_losses.append(avg_epoch_loss)
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}')

    # NOTE(review): saves the whole pickled module and requires the `models/`
    # directory to exist already.
    torch.save(model, f'models/LSTMWithSelfAttention_{PROlabels_name[i]}_binary.pth')

    # Evaluate the model
    model.eval()
    with torch.no_grad():
        test_outputs = []
        for inputs, labels in test_loader:
            logits = model(inputs).squeeze(-1)
            # The model emits logits, so convert to probabilities before
            # applying the 0.5 decision threshold.
            predictions = (torch.sigmoid(logits) >= 0.5).long()
            test_outputs.extend(predictions.tolist())

    # Compute evaluation metrics
    accuracy = accuracy_score(y_test, test_outputs)
    f1 = f1_score(y_test, test_outputs, average='weighted')
    precision = precision_score(y_test, test_outputs, average='weighted', zero_division=1)
    recall = recall_score(y_test, test_outputs, average='weighted')
    # NOTE(review): "sensitivity" is reported as the same weighted recall —
    # confirm whether the positive-class recall was intended instead.
    sensitivity = recall
    print(f'Test Accuracy: {accuracy:.4f}'
          f'\nTest Precision: {precision:.4f}'
          f'\nTest Recall: {recall:.4f}'
          f'\nTest F1 Score: {f1:.4f}'
          f'\nTest Sensitivity: {sensitivity:.4f}')

In [None]:
# data-based k fold CV
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score
from sklearn.model_selection import KFold
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np
import torch.nn as nn
from sklearn.model_selection import train_test_split

# Data-based 5-fold CV with binary targets: PhF >= 1, MF >= 1 and VAS >= 5 are
# mapped to class 1, everything else to class 0. Segments are split at random,
# so segments of the same subject can appear in both train and test folds.
PROlabels = [np.where(np.array(PROlabels_PhF) >= 1, 1, 0),
             np.where(np.array(PROlabels_MF) >= 1, 1, 0),
             np.where(np.array(PROlabels_VAS) >= 5, 1, 0)]

PROlabels_name = ['PhF', 'MF', 'VAS']

# Number of folds for cross-validation
num_folds = 5
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)

# Per-label list of per-fold metric values.
all_accuracies = {label: [] for label in PROlabels_name}
all_precisions = {label: [] for label in PROlabels_name}
all_recalls = {label: [] for label in PROlabels_name}
all_f1_scores = {label: [] for label in PROlabels_name}

for i, label in enumerate(PROlabels_name):
    X = np.array(segments_fulldataset)
    y = PROlabels[i]

    for fold, (train_index, test_index) in enumerate(kf.split(X), start=1):
        print(f'Fold {fold}/{num_folds}')

        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # Convert data to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
        y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

        # Replace NaNs with a large negative sentinel value.
        X_train_tensor = torch.nan_to_num(X_train_tensor, nan=-10000.0)
        X_test_tensor = torch.nan_to_num(X_test_tensor, nan=-10000.0)
        # Create DataLoader
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        batch_size = 64
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        # Initialize the model, loss function, and optimizer
        input_size = 10
        hidden_size = 64
        output_size = 1
        model = AttentionLSTM(input_size, hidden_size, output_size)
        # Expects raw logits; the sigmoid is applied inside the loss.
        criterion = nn.BCEWithLogitsLoss()

        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Train the model
        num_epochs = 30
        train_losses = []

        for epoch in range(num_epochs):
            model.train()
            epoch_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                # squeeze(-1) drops only the output dimension, so a final batch
                # of size 1 keeps shape (1,), which BCEWithLogitsLoss requires
                # to match `labels`.
                loss = criterion(outputs.squeeze(-1), labels)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            avg_epoch_loss = epoch_loss / len(train_loader)
            train_losses.append(avg_epoch_loss)

        # Evaluate the model
        model.eval()
        with torch.no_grad():
            test_outputs = []
            for inputs, labels in test_loader:
                logits = model(inputs).squeeze(-1)
                # The model emits logits, so convert to probabilities before
                # applying the 0.5 decision threshold.
                predictions = (torch.sigmoid(logits) >= 0.5).long()
                test_outputs.extend(predictions.tolist())

        # Compute evaluation metrics
        accuracy = accuracy_score(y_test, test_outputs)
        f1 = f1_score(y_test, test_outputs, average='weighted')
        precision = precision_score(y_test, test_outputs, average='weighted', zero_division=1)
        recall = recall_score(y_test, test_outputs, average='weighted')
        # NOTE(review): "sensitivity" is reported as the same weighted recall —
        # confirm whether the positive-class recall was intended instead.
        sensitivity = recall
        print(f'Test Accuracy: {accuracy:.4f}'
            f'\nTest Precision: {precision:.4f}'
            f'\nTest Recall: {recall:.4f}'
            f'\nTest F1 Score: {f1:.4f}'
            f'\nTest Sensitivity: {sensitivity:.4f}')

        # Append metrics to lists
        all_accuracies[label].append(accuracy)
        all_precisions[label].append(precision)
        all_recalls[label].append(recall)
        all_f1_scores[label].append(f1)

# Print average and standard deviation of metrics for each label
for label in PROlabels_name:
    print(f'\nLabel: {label}')
    print(f'Average Accuracy: {np.mean(all_accuracies[label]):.4f} ± {np.std(all_accuracies[label]):.4f}')
    print(f'Average Precision: {np.mean(all_precisions[label]):.4f} ± {np.std(all_precisions[label]):.4f}')
    print(f'Average Recall: {np.mean(all_recalls[label]):.4f} ± {np.std(all_recalls[label]):.4f}')
    print(f'Average F1 Score: {np.mean(all_f1_scores[label]):.4f} ± {np.std(all_f1_scores[label]):.4f}')

In [None]:
# subject-based k fold CV
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score
from sklearn.model_selection import KFold
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np
import torch.nn as nn
from sklearn.model_selection import train_test_split

# Binarise the PRO scores: PhF and MF are positive from 1 upwards, VAS from 5 upwards.
PROlabels = [np.where(np.array(PROlabels_PhF) >= 1, 1, 0),
             np.where(np.array(PROlabels_MF) >= 1, 1, 0),
             np.where(np.array(PROlabels_VAS) >= 5, 1, 0)]

PROlabels_name = ['PhF', 'MF', 'VAS']

# Number of folds for cross-validation
num_folds = 5
# NOTE(review): `kf` is not used by the loop below, which holds out one group of
# `new_array` at a time; it is kept in case other cells reference it.
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)

# Seed torch so weight initialisation and batch shuffling are repeatable when the
# cell is re-run.
torch.manual_seed(42)

all_accuracies = {label: [] for label in PROlabels_name}
all_precisions = {label: [] for label in PROlabels_name}
all_recalls = {label: [] for label in PROlabels_name}
all_f1_scores = {label: [] for label in PROlabels_name}

# Hyperparameters (identical for every label and fold)
batch_size = 64
input_size = 10
hidden_size = 64
output_size = 1
num_epochs = 30

# The feature array does not depend on the label, so build it once.
X = np.array(segments_fulldataset)

# One group id per segment; every distinct id is held out once as the test set.
# presumably `new_array` maps each segment to a subject-based fold — verify where it is built
fold_assignment = np.asarray(new_array)
cv_groups = np.unique(fold_assignment)

for i, label in enumerate(PROlabels_name):
    y = PROlabels[i]

    fold = 1
    for group in cv_groups:
        # Report against the real number of held-out groups rather than `num_folds`,
        # which this loop never consults.
        print(f'Fold {fold}/{len(cv_groups)}')

        group_indices = np.where(fold_assignment == group)[0]

        # Split data based on group indices
        X_test = X[group_indices]
        y_test = y[group_indices]

        train_indices = np.setdiff1d(np.arange(len(X)), group_indices)
        X_train = X[train_indices]
        y_train = y[train_indices]

        # Convert data to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
        y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

        # Missing sensor readings are replaced by a sentinel value.
        X_train_tensor = torch.nan_to_num(X_train_tensor, nan=-10000.0)
        X_test_tensor = torch.nan_to_num(X_test_tensor, nan=-10000.0)

        # Create DataLoader
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        # Initialize the model, loss function, and optimizer
        model = AttentionLSTM(input_size, hidden_size, output_size)
        # BCEWithLogitsLoss applies the sigmoid internally, so the model output is
        # treated as raw logits.
        # NOTE(review): assumes AttentionLSTM does not end in a sigmoid — confirm.
        criterion = nn.BCEWithLogitsLoss()

        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Train the model
        train_losses = []

        for epoch in range(num_epochs):
            model.train()
            epoch_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                # reshape(-1) keeps a 1-D tensor even for a final batch of size 1;
                # squeeze() would collapse it to 0-d and no longer match `labels`.
                loss = criterion(outputs.reshape(-1), labels)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            avg_epoch_loss = epoch_loss / len(train_loader)
            train_losses.append(avg_epoch_loss)
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}')

        # torch.save(model, f'models/LSTMWithSelfAttention_{PROlabels_name[i]}_binary.pth')

        # Evaluate the model
        model.eval()
        with torch.no_grad():
            test_outputs = []
            for inputs, labels in test_loader:
                outputs = model(inputs).reshape(-1)
                # Convert logits to probabilities before thresholding at 0.5, so the
                # decision rule matches the loss the model was trained with.
                predictions = (torch.sigmoid(outputs) >= 0.5).long()
                test_outputs.extend(predictions.tolist())

            # Compute evaluation metrics
            accuracy = accuracy_score(y_test, test_outputs)
            f1 = f1_score(y_test, test_outputs, average='weighted')
            precision = precision_score(y_test, test_outputs, average='weighted', zero_division=1)
            recall = recall_score(y_test, test_outputs, average='weighted')
            # Sensitivity is reported as the same support-weighted recall.
            # NOTE(review): support-weighted recall is numerically the accuracy; if
            # positive-class sensitivity is intended, use average='binary' instead.
            sensitivity = recall
            print(f'Test Accuracy: {accuracy:.4f}'
                f'\nTest Precision: {precision:.4f}'
                f'\nTest Recall: {recall:.4f}'
                f'\nTest F1 Score: {f1:.4f}'
                f'\nTest Sensitivity: {sensitivity:.4f}')

        # Append metrics to lists
        all_accuracies[label].append(accuracy)
        all_precisions[label].append(precision)
        all_recalls[label].append(recall)
        all_f1_scores[label].append(f1)

        fold += 1

# Report, per PRO label, the mean ± standard deviation of each metric over all folds
metric_tables = (
    ('Accuracy', all_accuracies),
    ('Precision', all_precisions),
    ('Recall', all_recalls),
    ('F1 Score', all_f1_scores),
)
for label in PROlabels_name:
    print(f'\nLabel: {label}')
    for metric_name, per_label in metric_tables:
        fold_values = per_label[label]
        print(f'Average {metric_name}: {np.mean(fold_values):.4f} ± {np.std(fold_values):.4f}')