In [5]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
import numpy as np
import pandas as pd

In [3]:
train_df=pd.read_csv('/content/gdrive/MyDrive/ECGData/mitbih_train.csv',header=None)
test_df=pd.read_csv('/content/gdrive/MyDrive/ECGData/mitbih_test.csv',header=None)

In [4]:
train_df[187]=train_df[187].astype(int)
distribution=train_df[187].value_counts()
print(distribution)

0    72471
4     6431
2     5788
1     2223
3      641
Name: 187, dtype: int64


In [5]:
test_df[187]=test_df[187].astype(int)
distribution=test_df[187].value_counts()
print(distribution)

0    18118
4     1608
2     1448
1      556
3      162
Name: 187, dtype: int64


In [6]:
type(distribution)

pandas.core.series.Series

In [7]:
from sklearn.utils import resample
count1=10000
newtraindf=(train_df[train_df[187]==0]).sample(n=count1)
for i in range(1,5):
  df_i=train_df[train_df[187]==i]
  df_x=resample(df_i,replace=True,n_samples=count1)
  newtraindf=pd.concat([newtraindf,df_x])

newtestdf=pd.DataFrame()
newvaldf=pd.DataFrame()

for i in range(0,5):
  shuffled_df=test_df[test_df[187]==i].sample(frac=1, random_state=42)
  split_index = len(shuffled_df) // 2
  df_half1 = shuffled_df.iloc[:split_index, :]
  df_half2 = shuffled_df.iloc[split_index:, :]
  newtestdf=pd.concat([newtestdf,df_half1])
  newvaldf=pd.concat([newvaldf,df_half1])


In [8]:
def standardize(train,test,final_test):

    mean = np.mean(train, axis=0)
    std = np.std(train, axis=0)+0.000001

    X_train = (train - mean) / std
    X_test = (test-mean)/std
    X_final_test = (final_test-mean)/std
    return X_train,X_test,X_final_test

In [9]:

# Access the datasets in the HDF5 file
train_1_data = newtraindf[newtraindf.columns[:187]].values
train_1_labels = newtraindf[newtraindf.columns[187:188]].values
y_train = train_1_labels.reshape(train_1_labels.shape[0])
train_1_data = train_1_data.reshape((train_1_data.shape[0],1,train_1_data.shape[1]))
train_1_data = train_1_data.astype(float)


test_1_data = newvaldf[newvaldf.columns[:187]].values
test_1_labels = newvaldf[newvaldf.columns[187:188]].values
y_test = test_1_labels.reshape(test_1_labels.shape[0])
test_1_data = test_1_data.reshape((test_1_data.shape[0],1,test_1_data.shape[1]))
test_1_data = test_1_data.astype(float)


final_test_1_data = newtestdf[newtestdf.columns[:187]].values
final_test_1_labels = newtestdf[newtestdf.columns[187:188]].values
y_final_test = final_test_1_labels.reshape(final_test_1_labels.shape[0])
final_test_1_data = final_test_1_data.reshape((final_test_1_data.shape[0],1,final_test_1_data.shape[1]))
final_test_1_data = final_test_1_data.astype(float)

X_train,X_test,X_final_test=standardize(train_1_data,test_1_data,final_test_1_data)

In [10]:
X_train.shape

(50000, 1, 187)

In [11]:
y_train.shape

(50000,)

In [12]:
import torch

In [13]:
# Convert the NumPy arrays to PyTorch tensors
train_loaded_data = torch.from_numpy(X_train)
train_loaded_labels = torch.from_numpy(y_train)

test_loaded_data = torch.from_numpy(X_test)
test_loaded_labels = torch.from_numpy(y_test)

final_test_loaded_data = torch.from_numpy(X_final_test)
final_test_loaded_labels = torch.from_numpy(y_final_test)

In [12]:
from torch.nn import Module
from torch.nn import Conv2d
from torch.nn import Linear
from torch.nn import MaxPool2d
from torch.nn import ReLU
from torch.nn import LogSoftmax
from torch import flatten
import torch
import torch.nn as nn
import torch.optim as optim

In [15]:
X_train.shape

(50000, 1, 187)

In [13]:


# Define the CNN model
class HybridCNN(nn.Module):
  def __init__(self, num_classes=5):
      super(HybridCNN, self).__init__()

      self.hidden_size = 64
      self.num_layers = 2

      # Convolutional layers
      self.conv1 = nn.Conv1d(1, 100, kernel_size=3, stride=1, padding=1)
      self.relu1 = nn.ReLU()
      self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)

      self.conv2 = nn.Conv1d(100, 500, kernel_size=3, stride=1, padding=1)
      self.relu2 = nn.ReLU()
      self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)

      self.conv3 = nn.Conv1d(500, 500, kernel_size=3, stride=1, padding=1)
      self.relu3 = nn.ReLU()
      self.pool3 = nn.MaxPool1d(kernel_size=2, stride=2)


      self.lstm = nn.LSTM(11500, self.hidden_size, self.num_layers, batch_first=True)

      self.fc = nn.Linear(64, num_classes)

  def forward(self, x):
      x = self.pool1(self.relu1(self.conv1(x)))
      x = self.pool2(self.relu2(self.conv2(x)))
      x = self.pool3(self.relu3(self.conv3(x)))
      x = x.view(-1,1, 11500)  # Reshape before fully connected layer

      h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
      c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

      out, _ = self.lstm(x, (h0, c0))
      out = out[:, -1, :]
      out = self.fc(out)
      return out

In [17]:
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F

In [18]:
batch_size = 500  # Adjust as needed
train_dataset = TensorDataset(train_loaded_data, train_loaded_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(test_loaded_data, test_loaded_labels)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

final_test_dataset = TensorDataset(final_test_loaded_data, final_test_loaded_labels)
final_test_loader = DataLoader(final_test_dataset, batch_size=batch_size, shuffle=False)

In [19]:
def getAccuracy(neuralmodel,loader):
    neuralmodel.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.float()
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = neuralmodel(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Calculate test accuracy
    test_accuracy = correct / total
    return test_accuracy,(f'{test_accuracy * 100:.2f}%')

In [24]:
!pip install torchinfo
from torchinfo import summary
cnn_model = HybridCNN()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
cnn_model.to(device)
summary(cnn_model,(1,187))



Layer (type:depth-idx)                   Output Shape              Param #
HybridCNN                                [1, 5]                    --
├─Conv1d: 1-1                            [100, 187]                400
├─ReLU: 1-2                              [100, 187]                --
├─MaxPool1d: 1-3                         [100, 93]                 --
├─Conv1d: 1-4                            [500, 93]                 150,500
├─ReLU: 1-5                              [500, 93]                 --
├─MaxPool1d: 1-6                         [500, 46]                 --
├─Conv1d: 1-7                            [500, 46]                 750,500
├─ReLU: 1-8                              [500, 46]                 --
├─MaxPool1d: 1-9                         [500, 23]                 --
├─LSTM: 1-10                             [1, 1, 64]                2,994,176
├─Linear: 1-11                           [1, 5]                    325
Total params: 3,895,901
Trainable params: 3,895,901
Non-trainable 

In [20]:
plotdata=[]
num_epochs = 101
cnn_model = HybridCNN()
criterion = nn.CrossEntropyLoss()
learning_rate = 0.01
# cnn_model.train()
# optimizer = optim.SGD(cnn_model.parameters(),lr=10)
optimizer = optim.SGD(cnn_model.parameters(), lr=learning_rate,momentum=0.9)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
cnn_model.to(device)
for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        # print(inputs.shape)
        inputs, labels = inputs.to(device), labels.to(device)
        inputs = inputs.float()
        outputs = cnn_model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    if (epoch) % 10 == 0:
        # print(outputs.sum())
        # print(loss.numpy())


        train_val,trainacc=getAccuracy(cnn_model,train_loader)
        test_val,testacc=getAccuracy(cnn_model,test_loader)
        plotdata.append([epoch+1,train_val,test_val])
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
        print("Train Acc ",trainacc," Test Acc ",testacc)

        cnn_model.train()
print("Training complete!")

Epoch [1/101], Loss: 1.3791
Train Acc  57.62%  Test Acc  27.43%
Epoch [11/101], Loss: 0.1464
Train Acc  95.62%  Test Acc  89.79%
Epoch [21/101], Loss: 0.0521
Train Acc  98.73%  Test Acc  93.28%
Epoch [31/101], Loss: 0.0273
Train Acc  99.42%  Test Acc  95.79%
Epoch [41/101], Loss: 0.0137
Train Acc  99.87%  Test Acc  96.18%
Epoch [51/101], Loss: 0.0046
Train Acc  99.92%  Test Acc  96.17%
Epoch [61/101], Loss: 0.0059
Train Acc  99.84%  Test Acc  95.68%
Epoch [71/101], Loss: 0.0027
Train Acc  99.98%  Test Acc  96.07%
Epoch [81/101], Loss: 0.0011
Train Acc  99.99%  Test Acc  96.14%
Epoch [91/101], Loss: 0.0026
Train Acc  99.87%  Test Acc  96.38%
Epoch [101/101], Loss: 0.0012
Train Acc  100.00%  Test Acc  96.35%
Training complete!


In [21]:
plotdf = pd.DataFrame(columns=["Epoch","Train Acc","Test Acc"])

for dat in plotdata:
  plotdf.loc[plotdf.shape[0]]=dat

In [22]:
plotdf.to_csv("/content/gdrive/MyDrive/ECGData/PlotData/hybrid_plotdata.csv",index=False)

In [23]:
torch.save(cnn_model.state_dict(), "/content/gdrive/MyDrive/ECGData/Models/HYBRID_model.pth")

In [6]:
cnn_model = torch.load( "/content/gdrive/MyDrive/ECGData/Models/HYBRID_model.pth")

In [25]:
cnn_model.eval()
correct = 0
total = 0

total_labels=[]
total_predicted=[]
with torch.no_grad():
    for inputs, labels in final_test_loader:
        inputs = inputs.float()
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = cnn_model(inputs)
        total_labels = total_labels +labels.tolist()

        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        total_predicted = total_predicted+predicted.tolist()
        correct += (predicted == labels).sum().item()

# Calculate test accuracy
test_accuracy = correct / total
print(f'{test_accuracy * 100:.2f}%')

96.35%


In [26]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

In [27]:
y_true_np = np.array(total_labels)
y_pred_np = np.array(total_predicted)
# Calculate precision, recall, and F1 score
precision = precision_score(y_true_np, y_pred_np, average='weighted')
recall = recall_score(y_true_np, y_pred_np, average='weighted')
f1 = f1_score(y_true_np, y_pred_np, average='weighted')
accuracy = accuracy_score(y_true_np, y_pred_np)

print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Accuracy:", accuracy)

Precision: 0.9693109717367459
Recall: 0.9635483281564041
F1 Score: 0.9655741811950059
Accuracy: 0.9635483281564041


In [28]:
metrics_df = pd.DataFrame(columns=["Metric","Value"])
metrics_df.loc[0]=["Accuracy",accuracy]
metrics_df.loc[1]=["Precision",precision]
metrics_df.loc[2]=["Recall",recall]
metrics_df.loc[3]=["F1 Score",f1]
metrics_df.to_csv("/content/gdrive/MyDrive/ECGData/Metrics/HYBRID_metrics.csv",index=False)