In [1]:
################################################################################
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import random

from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedGroupKFold
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import precision_score, recall_score, f1_score, balanced_accuracy_score, accuracy_score, classification_report

import fusion_model

import torch.optim as optim
import torch
import torch.nn as nn

from collections import defaultdict

In [2]:
###############################################################################################
# Čitanje podataka; formiranje baze koja se sastoji od rečnika sa EKG, PPG, PCG i ACC signalima
###############################################################################################

ppg_data = pd.read_csv('ppg_beats_for_classification_fixed_length_20_hz.csv')
ppg_data = ppg_data[~np.isnan(ppg_data.iloc[:,-1])]

y = ppg_data.iloc[:,-1].to_numpy() # označene labele
y_bin = (y != 0).astype(int) # binarne labele
subjects = ppg_data.iloc[:,-2] # ovde imam sve ispitanike 

ppg_data = ppg_data.iloc[:,:-2].to_numpy()

ecg_data = pd.read_csv('ecg_beats_for_classification_fixed_length_500_hz.csv')
ecg_data= ecg_data[~np.isnan(ecg_data.iloc[:,-1])].iloc[:,:-2].to_numpy()

pcg_data = pd.read_csv('pcg_beats_for_classification_fixed_length_200_hz.csv')
pcg_data= pcg_data[~np.isnan(pcg_data.iloc[:,-1])].iloc[:,:-2].to_numpy()

acc_data = pd.read_csv('acc_beats_for_classification_fixed_length_50_hz.csv')
acc_data= acc_data[~np.isnan(acc_data.iloc[:,-1])].iloc[:,:-2].to_numpy()

In [3]:
ppg_data = torch.tensor(ppg_data, dtype=torch.float32)
pcg_data = torch.tensor(pcg_data, dtype=torch.float32)
ecg_data = torch.tensor(ecg_data, dtype=torch.float32)
acc_data = torch.tensor(acc_data, dtype=torch.float32)
y = torch.tensor(y_bin, dtype=torch.long)

In [4]:
# Mešanje podataka iz skupa (obučavajućeg)
def shuffle_data(ppg_data, pcg_data, acc_data, y):
    indices = np.arange(len(y))
    np.random.shuffle(indices)
    indices = torch.tensor(indices)
    return [ppg_data[indices], pcg_data[indices], acc_data[indices]], y[indices]

def train_val_split(X_train_val, y_train_val, groups_train_val, outer_fold):
    inner_kf = StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=outer_fold + 42)

    for inner_train_idx, inner_val_idx in inner_kf.split(X_train_val, y_train_val, groups_train_val):
        return inner_train_idx, inner_val_idx
    
def scaler_fitting(X_train):
    scaler_ppg = StandardScaler()
    scaler_pcg = StandardScaler()
    scaler_acc = StandardScaler()
    X_train_ppg = scaler_ppg.fit_transform(X_train[0]).astype(np.float32)
    X_train_pcg = scaler_pcg.fit_transform(X_train[1]).astype(np.float32)
    X_train_acc = scaler_acc.fit_transform(X_train[2]).astype(np.float32)

    X_train = [X_train_ppg, X_train_pcg, X_train_acc]

    return X_train, scaler_ppg, scaler_pcg, scaler_acc

In [13]:
def accuracy_estimation(model, criterion, X, y, groups):
    
    outputs = fusion_model.model_output(model, criterion, X, y)
    outputs = torch.sigmoid(outputs)
    sigmoids = defaultdict(list)
    predictions = defaultdict(float)
    true_labels = defaultdict(float)
    counter_true = 0
    counter = 0
    for i in range(0, np.shape(groups)[0]):
        sigmoids[groups.iloc[i]].append(np.round(outputs[i]))
        true_labels[groups.iloc[i]] = y[i]
    for subj in sigmoids:
        counter = counter + 1
        #predictions[subj] = np.round(np.mean(sigmoids[subj]))
        classes, counts = torch.unique(torch.tensor(sigmoids[subj]), return_counts=True)
        predictions[subj] = classes[torch.argmax(counts)]
        if predictions[subj] == true_labels[subj]:
            counter_true = counter_true + 1
    
    return counter_true/counter

def accuracy_estimation_multiclass(model, criterion, X, y, groups):

    outputs = fusion_model.model_output(model, criterion, X, y)
    outputs = torch.argmax(torch.softmax(outputs, dim=1), dim=1)
    softmaxes = defaultdict(list)
    predictions = defaultdict(float)
    true_labels = defaultdict(float)
    counter_true = 0
    counter = 0
    for i in range(0, np.shape(groups)[0]):
        softmaxes[groups.iloc[i]].append(outputs[i])
        true_labels[groups.iloc[i]] = y[i]
    for subj in softmaxes:
        counter = counter + 1
        classes, counts = torch.unique(torch.tensor(softmaxes[subj]), return_counts=True)
        predictions[subj] = classes[torch.argmax(counts)]
        if predictions[subj] == true_labels[subj]:
            counter_true = counter_true + 1
    return counter_true/counter

In [14]:
ecg_shape = (417, 1)
ppg_shape = (17, 1)
pcg_shape = (167,1)
acc_shape = (42, 1)
num_classes = 4
class_names = ["class0", "class1"] #"class2", "class3"]

precisions = []
f1scores = []
recalls = []

criterion = nn.BCEWithLogitsLoss()
#criterion = nn.CrossEntropyLoss()

groups = subjects

outer_kf = StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=42)
accuracies = []
estimated_accuracies = []
all_reports = {cls: {'precision': [], 'recall': [], 'f1-score': []} for cls in class_names}

for outer_fold, (train_val_idx, test_idx) in enumerate(outer_kf.split(ppg_data, y, groups)):
    print(f"\n=== Outer Fold {outer_fold+1} ===")
    model = fusion_model.MultiRateCNN()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=2, min_lr=1e-4)

    #X_train_val = [ppg_data[train_val_idx], pcg_data[train_val_idx], acc_data[train_val_idx]]
    X_test = [ppg_data[test_idx], pcg_data[test_idx], acc_data[test_idx]]
    y_train_val, y_test = y[train_val_idx], y[test_idx]
    
    groups_train_val = groups.iloc[train_val_idx]
    groups_test = groups.iloc[test_idx]
    train_idx, val_idx = train_val_split(train_val_idx, y_train_val, groups_train_val, outer_fold)

    X_train, y_train = [ppg_data[train_idx], pcg_data[train_idx], acc_data[train_idx]], y[train_idx]
    X_val, y_val = [ppg_data[val_idx], pcg_data[val_idx], acc_data[val_idx]], y[val_idx]

    # TODO: Standardization - must be added here
    X_train, scaler_ppg, scaler_pcg, scaler_acc = scaler_fitting(X_train)
    X_val = [scaler_ppg.transform(X_val[0]).astype(np.float32), scaler_pcg.transform(X_val[1]).astype(np.float32), scaler_acc.transform(X_val[2]).astype(np.float32)]
    X_test = [scaler_ppg.transform(X_test[0]).astype(np.float32), scaler_pcg.transform(X_test[1]).astype(np.float32), scaler_acc.transform(X_test[2]).astype(np.float32)]

    #X_train, y_train = shuffle_data(X_train[0], X_train[1], X_train[2], y_train)
    #X_val, y_val = shuffle_data(X_val[0], X_val[1], X_val[2], y_val)
    
    model = fusion_model.train_model(model, criterion, optimizer, scheduler, X_train, y_train, X_val, y_val, batch_size=32, epochs=50)
    
    acc = fusion_model.evaluate_model(model, criterion, X_test, y_test)
    outputs = fusion_model.model_output(model, criterion, X_test, y_test)
    #y_pred = torch.argmax(outputs, dim=1)
    y_pred = (torch.sigmoid(outputs) > 0.5)
    report = classification_report(y_test, y_pred, target_names=class_names, output_dict=True, zero_division=0)
    for cls in class_names:
        for metric in ['precision', 'recall', 'f1-score']:
            all_reports[cls][metric].append(report[cls][metric])

    acc_vote = accuracy_estimation(model, criterion, X_test, y_test, groups_test)

    precisions.append(precision_score(y_test, y_pred))
    recalls.append(recall_score(y_test, y_pred))
    f1scores.append(f1_score(y_test, y_pred))
    accuracies.append(acc)
    estimated_accuracies.append(acc_vote)
    print(f"Test Accuracy: {acc:.4f}")

print(f"Mean test accuracy: {np.mean(accuracies):.4f}, std: {np.std(accuracies):.4f}")
print(f"Mean test precision: {np.mean(precisions):.4f}, std: {np.std(precisions):.4f}")
print(f"Mean test recall: {np.mean(recalls):.4f}, std: {np.std(recalls):.4f}")
print(f"Mean test F1 score: {np.mean(f1scores):.4f}, std: {np.std(f1scores):.4f}")
print(f"Mean test voted accuracy: {np.mean(estimated_accuracies):.4f}, std: {np.std(estimated_accuracies):.4f}")



=== Outer Fold 1 ===
Epoch 1: Train Loss=0.4490, Train Acc=0.7868, Val Loss=0.9558, Val Acc=0.6169
Epoch 2: Train Loss=0.3458, Train Acc=0.8433, Val Loss=1.0770, Val Acc=0.5842
Epoch 3: Train Loss=0.3097, Train Acc=0.8647, Val Loss=1.4062, Val Acc=0.5329
Epoch 4: Train Loss=0.2780, Train Acc=0.8841, Val Loss=1.0547, Val Acc=0.6304
Early stopping triggered. Restoring best weights.
Test Accuracy: 0.8645

=== Outer Fold 2 ===
Epoch 1: Train Loss=0.5110, Train Acc=0.7534, Val Loss=0.5582, Val Acc=0.6943
Epoch 2: Train Loss=0.4168, Train Acc=0.8144, Val Loss=0.5562, Val Acc=0.6967
Epoch 3: Train Loss=0.3604, Train Acc=0.8539, Val Loss=0.5767, Val Acc=0.7343
Epoch 4: Train Loss=0.3258, Train Acc=0.8674, Val Loss=0.6511, Val Acc=0.7117
Epoch 5: Train Loss=0.3062, Train Acc=0.8800, Val Loss=0.5755, Val Acc=0.7381
Early stopping triggered. Restoring best weights.
Test Accuracy: 0.8374

=== Outer Fold 3 ===
Epoch 1: Train Loss=0.4933, Train Acc=0.7609, Val Loss=0.6368, Val Acc=0.6687
Epoch 2: T

In [None]:
summary = pd.DataFrame(index=class_names, columns=['Precision (mean ± std)', 'Recall (mean ± std)', 'F1 (mean ± std)'])

for cls in class_names:
    precision = np.array(all_reports[cls]['precision'])
    recall = np.array(all_reports[cls]['recall'])
    f1 = np.array(all_reports[cls]['f1-score'])

    summary.loc[cls, 'Precision (mean ± std)'] = f"{precision.mean():.2f} ± {precision.std():.2f}"
    summary.loc[cls, 'Recall (mean ± std)'] = f"{recall.mean():.2f} ± {recall.std():.2f}"
    summary.loc[cls, 'F1 (mean ± std)'] = f"{f1.mean():.2f} ± {f1.std():.2f}"

print(summary)

       Precision (mean ± std) Recall (mean ± std) F1 (mean ± std)
class0            0.77 ± 0.09         0.71 ± 0.16     0.72 ± 0.09
class1            0.48 ± 0.21         0.59 ± 0.11     0.51 ± 0.17
class2            0.32 ± 0.31         0.16 ± 0.16     0.21 ± 0.21
class3            0.61 ± 0.13         0.69 ± 0.19     0.64 ± 0.13


In [None]:
from torchinfo import summary

# Example input shapes
ppg_input = torch.randn(1, 17).float()     # [batch_size, features]
pcg_input = torch.randn(1, 167).float()
acc_input = torch.randn(1, 42).float()

# Create model instance
model = fusion_model.MultiRateCNN()

# Print summary
summary(model, input_data=(ppg_input, pcg_input, acc_input))

Layer (type:depth-idx)                   Output Shape              Param #
MultiRateCNN                             [1, 4]                    --
├─Sequential: 1-1                        [1, 16, 17]               --
│    └─Conv1d: 2-1                       [1, 8, 84]                48
│    └─BatchNorm1d: 2-2                  [1, 8, 84]                16
│    └─ReLU: 2-3                         [1, 8, 84]                --
│    └─MaxPool1d: 2-4                    [1, 8, 42]                --
│    └─Conv1d: 2-5                       [1, 16, 21]               400
│    └─BatchNorm1d: 2-6                  [1, 16, 21]               32
│    └─ReLU: 2-7                         [1, 16, 21]               --
│    └─AdaptiveAvgPool1d: 2-8            [1, 16, 17]               --
├─Sequential: 1-2                        [1, 16, 17]               --
│    └─Conv1d: 2-9                       [1, 8, 42]                48
│    └─BatchNorm1d: 2-10                 [1, 8, 42]                16
│    └─ReLU: 2