In [38]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

/kaggle/input/heartbeat/ptbdb_abnormal.csv
/kaggle/input/heartbeat/ptbdb_normal.csv
/kaggle/input/heartbeat/mitbih_test.csv
/kaggle/input/heartbeat/mitbih_train.csv


In [39]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

In [40]:
ptbdb_abnormal = '/kaggle/input/heartbeat/ptbdb_abnormal.csv'
ptbdb_normal = '/kaggle/input/heartbeat/ptbdb_normal.csv'
mitbih_test = '/kaggle/input/heartbeat/mitbih_test.csv'
mitbih_train = '/kaggle/input/heartbeat/mitbih_train.csv'

In [41]:
ptbdb_normal = pd.read_csv(ptbdb_normal, header=None)
ptbdb_abnormal = pd.read_csv(ptbdb_abnormal, header=None)
mitbih_train = pd.read_csv(mitbih_train, header=None)
mitbih_test = pd.read_csv(mitbih_test, header=None)

In [42]:
def preprocess_data(df, label):
    X = df.iloc[:, 1:].values.astype('float32')
    y = np.full(X.shape[0], label)
    return X, y

In [43]:
#Preprocess PTB-DB data
X_normal, y_normal = preprocess_data(ptbdb_normal, 'Normal')
X_abnormal, y_abnormal = preprocess_data(ptbdb_abnormal, 'Abnormal')

In [44]:
#Preprocess MIT-BIH data
X_mitbih_train = mitbih_train.iloc[:, :-1].values.astype('float32')
y_mitbih_train = mitbih_train.iloc[:, -1].values
X_mitbih_test = mitbih_test.iloc[:, :-1].values.astype('float32')
y_mitbih_test = mitbih_test.iloc[:, -1].values

In [45]:
# Combine all data
X = np.vstack((X_normal, X_abnormal, X_mitbih_train, X_mitbih_test))
y = np.concatenate((y_normal, y_abnormal, y_mitbih_train, y_mitbih_test))


In [46]:
from sklearn.preprocessing import LabelEncoder

label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

In [47]:
#printing unique labels
print("unique labels: ", label_encoder.classes_)
print("Number of classes: ", len(label_encoder.classes_))

unique labels:  ['0.0' '1.0' '2.0' '3.0' '4.0' 'Abnormal' 'Normal']
Number of classes:  7


In [48]:
X = X.reshape(X.shape[0], X.shape[1], 1)

In [49]:
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size = 0.2, random_state = 42)

In [50]:
print("shape of X_train: ", X_train.shape)
print("shape of X_test: ", X_test.shape)

shape of X_train:  (99198, 187, 1)
shape of X_test:  (24800, 187, 1)


In [51]:
# Convert to PyTorch tensors
X_train_tensor = torch.Tensor(X_train)
y_train_tensor = torch.LongTensor(y_train)
X_test_tensor = torch.Tensor(X_test)
y_test_tensor = torch.LongTensor(y_test)

In [52]:
# Create TensorDatasets
from torch.utils.data import TensorDataset, DataLoader 

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

In [53]:
# Creating DataLoaders
train_loader = DataLoader(train_dataset, batch_size = 32, shuffle = True)
test_loader = DataLoader(test_dataset, batch_size = 32, shuffle = False)

In [54]:
import torch.nn as nn
class ECG_CNN(nn.Module):
    def __init__(self, num_classes):
        super(ECG_CNN, self).__init__()
        self.conv1 = nn.Conv1d(1, 64, kernel_size = 5, stride=1, padding = 2)
        self.conv2 = nn.Conv1d(64, 128, kernel_size = 5, stride = 1, padding = 2)
        self.conv3 = nn.Conv1d(128, 256, kernel_size = 5, stride = 1, padding = 2)
        self.pool  = nn.MaxPool1d(kernel_size = 5, stride =2)
        self.fc1 = nn.Linear(256 * 20, 128)
        self.fc2 = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(256)
        
    def forward(self, x):
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.pool(x)
        x = self.relu(self.bn2(self.conv2(x)))
        x = self.pool(x)
        x = self.relu(self.bn3(self.conv3(x)))
        x = self.pool(x)
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x
num_classes = len(label_encoder.classes_)
model = ECG_CNN(num_classes)

In [55]:
import torch.optim as optim
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = 0.001)

In [56]:
for inputs, labels in train_loader:
    print("input shape: ", inputs.shape)
    print("Label shape: ", labels.shape)
    break

input shape:  torch.Size([32, 187, 1])
Label shape:  torch.Size([32])


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Check if we need to add a channel dimension
        if inputs.dim() == 2:
            inputs = inputs.unsqueeze(1)
        
        # Now transpose for Conv1d
        inputs = inputs.transpose(1, 2)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

print('Training completed.')

Epoch [1/50], Loss: 0.1562
Epoch [2/50], Loss: 0.0901
Epoch [3/50], Loss: 0.0736
Epoch [4/50], Loss: 0.0643
Epoch [5/50], Loss: 0.0570
Epoch [6/50], Loss: 0.0508
Epoch [7/50], Loss: 0.0472
Epoch [8/50], Loss: 0.0439
Epoch [9/50], Loss: 0.0398
Epoch [10/50], Loss: 0.0382


In [None]:
model.eval()
correct = 0
total = 0

all_preds = 0
all_labels = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        if inputs.dim() ==2 :
            inputs = inputs.unsqueeze(1)
            
        inputs = inputs.transpose(1, 2)
        
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct +=(predicted == labels).sum().item()
        
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        
accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}')

In [None]:
torch.save(model.state_dict(), 'ecg_model.pth')

loaded_model = ECG_CNN(num_classes)
loaded_model.load_state_dict(torch.load('ecg_model.pth'))
loaded_model.to(device)