In [63]:
import torch
import pandas as pd

In [64]:
df_normal = pd.read_csv('ptbdb_normal.csv', header=None)
df_abnormal = pd.read_csv('ptbdb_abnormal.csv', header=None)

# Split into train, validation, test
df_normal_train = df_normal[:int(0.8*len(df_normal))]
df_normal_test = df_normal[int(0.8*len(df_normal)):]

df_abnormal_train = df_abnormal[:int(0.8*len(df_abnormal))]
df_abnormal_test = df_abnormal[int(0.8*len(df_abnormal)):]

# Add normal and abnormal data together
df_train = pd.concat([df_normal_train, df_abnormal_train])
df_test = pd.concat([df_normal_test, df_abnormal_test])


In [65]:
display(df_train.head())

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,178,179,180,181,182,183,184,185,186,187
0,1.0,0.900324,0.35859,0.051459,0.046596,0.126823,0.133306,0.119125,0.110616,0.113047,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,1.0,0.794681,0.375387,0.116883,0.0,0.171923,0.283859,0.293754,0.325912,0.345083,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.909029,0.791482,0.423169,0.186712,0.0,0.007836,0.063032,0.077002,0.074957,0.077342,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,1.0,0.478893,0.05676,0.064176,0.081289,0.072732,0.055619,0.048774,0.054478,0.041643,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,1.0,0.867238,0.20136,0.099349,0.141336,0.120934,0.108516,0.096393,0.093436,0.100828,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [66]:
import torch
from torch import nn

class ECG1DCNN(nn.Module):
    def __init__(self):
        super(ECG1DCNN, self).__init__()
        self.conv1 = nn.Conv1d(1, 64, kernel_size=5, stride=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool1d(kernel_size=2)

        self.conv2 = nn.Conv1d(64, 128, kernel_size=5, stride=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool1d(kernel_size=2)

        self.fc1 = nn.Linear(128 * 43, 256)  # Adjust the first dimension based on the output of the last MaxPool1d layer
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(256, 64)
        self.relu4 = nn.ReLU()

        self.fc3 = nn.Linear(64, 2)
        self.relu5 = nn.ReLU()

        self.flatten = nn.Flatten()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.pool2(x)
        x = self.flatten(x)  # Flatten the tensor
        x = self.fc1(x)
        x = self.relu3(x)
        x = self.fc2(x)
        x = self.relu4(x)
        x = self.fc3(x)
        x = self.relu5(x)
        return x


In [67]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch import optim

# Convert dataframes to tensors
X_train = torch.tensor(df_train.iloc[:, :-1].values, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
y_train = torch.tensor(df_train.iloc[:, -1].values, dtype=torch.long)

# Create dataloaders
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Initialize model, loss function, and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
model = ECG1DCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
# optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

EPOCHS_NO = 20
# Training loop
best_model_wts = model.state_dict()
best_acc = 0.0
iteration = 0
for epoch in range(EPOCHS_NO):  # Number of epochs
    print('Epoch {}/{}'.format(epoch+1, EPOCHS_NO))
    print('-'*10)
    train_loss = 0.0
    train_corrects = 0.0
    model = model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        iter_loss = loss.item()
        train_loss += iter_loss
        iter_corrects = torch.sum(preds == labels.data).to(torch.float32)
        train_corrects += iter_corrects
        iteration += 1
    epoch_loss = train_loss / len(train_dataset)
    epoch_acc = train_corrects / len(train_dataset)
    epoch_acc = epoch_acc * 100

    if epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model_wts = model.state_dict()

    print('epoch train loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))


cuda
Epoch 1/20
----------
epoch train loss: 0.0125 Acc: 80.7388
Epoch 2/20
----------
epoch train loss: 0.0084 Acc: 88.4278
Epoch 3/20
----------
epoch train loss: 0.0055 Acc: 92.7835
Epoch 4/20
----------
epoch train loss: 0.0040 Acc: 95.3436
Epoch 5/20
----------
epoch train loss: 0.0030 Acc: 96.5292
Epoch 6/20
----------
epoch train loss: 0.0022 Acc: 97.4399
Epoch 7/20
----------
epoch train loss: 0.0017 Acc: 98.0584
Epoch 8/20
----------
epoch train loss: 0.0014 Acc: 98.4021
Epoch 9/20
----------
epoch train loss: 0.0014 Acc: 98.4622
Epoch 10/20
----------
epoch train loss: 0.0008 Acc: 99.1237
Epoch 11/20
----------
epoch train loss: 0.0007 Acc: 99.1065
Epoch 12/20
----------
epoch train loss: 0.0009 Acc: 98.8918
Epoch 13/20
----------
epoch train loss: 0.0005 Acc: 99.3900
Epoch 14/20
----------
epoch train loss: 0.0003 Acc: 99.6478
Epoch 15/20
----------
epoch train loss: 0.0005 Acc: 99.3814
Epoch 16/20
----------
epoch train loss: 0.0004 Acc: 99.6134
Epoch 17/20
----------
epoch

In [68]:
# Assuming that `model` is your model
model.load_state_dict(best_model_wts)

# Convert dataframes to tensors
X_test = torch.tensor(df_test.iloc[:, :-1].values, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
y_test = torch.tensor(df_test.iloc[:, -1].values, dtype=torch.long)

# Create dataloaders
test_dataset = TensorDataset(X_train, y_train)
test_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Then you can evaluate the model
model.eval()

test_dataset_size = len(test_dataset)
true_positive = 0
true_negative = 0
false_positive = 0
false_negative = 0
with torch.no_grad():
    for data in test_loader:
        images, labels = data
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        true_positive += (predicted * labels).sum().item()
        true_negative += ((1 - predicted) * (1 - labels)).sum().item()
        false_positive += (predicted * (1 - labels)).sum().item()
        false_negative += ((1 - predicted) * labels).sum().item()


In [69]:
# Accuracy
acc = (true_positive + true_negative) / test_dataset_size
print('Test Acc: {:.4f}'.format(acc))

# Precision
prec = true_positive / (true_positive + false_positive)
print('Precision: {:.4f}'.format(prec))
# Recall
rec = true_positive / (true_positive + false_negative)
print('Recall: {:.4f}'.format(rec))
# F1 Score
f1 = 2 * prec * rec / (prec + rec)
print('F1 Score: {:.4f}'.format(f1))

Test Acc: 0.9993
Precision: 0.9994
Recall: 0.9996
F1 Score: 0.9995


In [70]:
# MLP Classifier using pytorch
import torch
import torch.nn as nn
import torch.nn.functional as F

class MLP(nn.Module):
    def __init__(self, in_features, out_features):
        super(MLP, self).__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(in_features, 512)  # Input layer
        self.fc2 = nn.Linear(512, 256)  # Hidden layer
        self.fc3 = nn.Linear(256, 128)  # Hidden layer
        self.fc4 = nn.Linear(128, 64)  # Hidden layer
        self.fc5 = nn.Linear(64, out_features)   # Output layer

    def forward(self, x):
        x = self.flatten(x)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        x = self.fc3(x)
        x = F.relu(x)
        x = self.fc4(x)
        x = F.relu(x)
        x = self.fc5(x)
        return x




In [71]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch import optim

# Convert dataframes to tensors
X_train = torch.tensor(df_train.iloc[:, :-1].values, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
y_train = torch.tensor(df_train.iloc[:, -1].values, dtype=torch.long)

# Create dataloaders
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Initialize model, loss function, and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# Create an instance of the model
model = MLP(187, 2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
# optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

EPOCHS_NO = 20
# Training loop
best_model_wts = model.state_dict()
best_acc = 0.0
iteration = 0
for epoch in range(EPOCHS_NO):  # Number of epochs
    print('Epoch {}/{}'.format(epoch+1, EPOCHS_NO))
    print('-'*10)
    train_loss = 0.0
    train_corrects = 0.0
    model = model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        iter_loss = loss.item()
        train_loss += iter_loss
        iter_corrects = torch.sum(preds == labels.data).to(torch.float32)
        train_corrects += iter_corrects
        iteration += 1
    epoch_loss = train_loss / len(train_dataset)
    epoch_acc = train_corrects / len(train_dataset)
    epoch_acc = epoch_acc * 100

    if epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model_wts = model.state_dict()

    print('epoch train loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))


cuda
Epoch 1/20
----------
epoch train loss: 0.0254 Acc: 80.6186
Epoch 2/20
----------
epoch train loss: 0.0186 Acc: 86.5378
Epoch 3/20
----------
epoch train loss: 0.0147 Acc: 89.9313
Epoch 4/20
----------
epoch train loss: 0.0125 Acc: 91.3660
Epoch 5/20
----------
epoch train loss: 0.0112 Acc: 92.2337
Epoch 6/20
----------
epoch train loss: 0.0103 Acc: 93.2818
Epoch 7/20
----------
epoch train loss: 0.0088 Acc: 94.4244
Epoch 8/20
----------
epoch train loss: 0.0080 Acc: 95.0172
Epoch 9/20
----------
epoch train loss: 0.0070 Acc: 95.6014
Epoch 10/20
----------
epoch train loss: 0.0068 Acc: 95.7990
Epoch 11/20
----------
epoch train loss: 0.0062 Acc: 96.3574
Epoch 12/20
----------
epoch train loss: 0.0054 Acc: 96.6151
Epoch 13/20
----------
epoch train loss: 0.0055 Acc: 96.6151
Epoch 14/20
----------
epoch train loss: 0.0048 Acc: 97.2509
Epoch 15/20
----------
epoch train loss: 0.0045 Acc: 97.3110
Epoch 16/20
----------
epoch train loss: 0.0045 Acc: 97.3969
Epoch 17/20
----------
epoch

In [74]:
# Test the model

# Assuming that `model` is your model
model.load_state_dict(best_model_wts)

# Convert dataframes to tensors
X_test = torch.tensor(df_test.iloc[:, :-1].values, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
y_test = torch.tensor(df_test.iloc[:, -1].values, dtype=torch.long)

# Create dataloaders
test_dataset = TensorDataset(X_train, y_train)
test_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Then you can evaluate the model
model.eval()

test_dataset_size = len(test_dataset)
true_positive_avg = 0
true_negative_avg = 0
false_positive_avg = 0
false_negative_avg = 0

num_classes = len(torch.unique(y_test))
true_positive = [0] * num_classes
true_negative = [0] * num_classes
false_positive = [0] * num_classes
false_negative = [0] * num_classes

with torch.no_grad():
    for data in test_loader:
        images, labels = data
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        true_positive_avg += (predicted * labels).sum().item()
        true_negative_avg += ((1 - predicted) * (1 - labels)).sum().item()
        false_positive_avg += (predicted * (1 - labels)).sum().item()
        false_negative_avg += ((1 - predicted) * labels).sum().item()

        for i in range(num_classes):
            true_positive[i] += ((predicted == i) & (labels == i)).sum().item()
            true_negative[i] += ((predicted != i) & (labels != i)).sum().item()
            false_positive[i] += ((predicted == i) & (labels != i)).sum().item()
            false_negative[i] += ((predicted != i) & (labels == i)).sum().item()


In [75]:
# Accuracy
acc = (true_positive_avg + true_negative_avg) / test_dataset_size
print('Test Acc: {:.4f}'.format(acc))

# Precision
prec = true_positive_avg / (true_positive_avg + false_positive_avg)
print('Precision: {:.4f}'.format(prec))
# Recall
rec = true_positive_avg / (true_positive_avg + false_negative_avg)
print('Recall: {:.4f}'.format(rec))
# F1 Score
f1 = 2 * prec * rec / (prec + rec)
print('F1 Score: {:.4f}'.format(f1))

Test Acc: 0.9848
Precision: 0.9898
Recall: 0.9892
F1 Score: 0.9895


In [76]:
# Calculate metrics for each class
accuracy = [0] * num_classes
precision = [0] * num_classes
recall = [0] * num_classes
f1_score = [0] * num_classes

for i in range(num_classes):
    accuracy[i] = (true_positive[i] + true_negative[i]) / (true_positive[i] + true_negative[i] + false_positive[i] + false_negative[i])
    precision[i] = true_positive[i] / (true_positive[i] + false_positive[i]) if (true_positive[i] + false_positive[i]) != 0 else 0
    recall[i] = true_positive[i] / (true_positive[i] + false_negative[i]) if (true_positive[i] + false_negative[i]) != 0 else 0
    f1_score[i] = 2 * (precision[i] * recall[i]) / (precision[i] + recall[i]) if (precision[i] + recall[i]) != 0 else 0

    # Print metrics
for i in range(num_classes):
    print(f"Class {i} metrics:")
    print(f"  Accuracy: {accuracy[i]:.4f}")
    print(f"  Precision: {precision[i]:.4f}")
    print(f"  Recall: {recall[i]:.4f}")
    print(f"  F1 Score: {f1_score[i]:.4f}")

Class 0 metrics:
  Accuracy: 0.9848
  Precision: 0.9719
  Recall: 0.9734
  F1 Score: 0.9727
Class 1 metrics:
  Accuracy: 0.9848
  Precision: 0.9898
  Recall: 0.9892
  F1 Score: 0.9895
