# Model Processing

## Imports & General Settings 

In [1]:
import unittest
import os
import sys
import pathlib
import urllib
import shutil
import re
import zipfile
import numpy as np
import torch
import matplotlib.pyplot as plt
from torch import nn
from IPython.display import display
from torchvision import transforms
import matplotlib.pyplot as plt
import numpy as np
import os
import shutil
import posixpath
import wfdb
import pycwt as wavelet
from data import WaveletTransform, AFECGDataset
from PIL import Image
import dsp
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import time

%matplotlib inline
%load_ext autoreload
%autoreload 2

In [2]:
# Shared TestCase instance used for lightweight inline sanity checks.
test = unittest.TestCase()
plt.rcParams.update({'font.size': 12})

# Seed the random number generators so that weight initialisation (and hence
# training) is repeatable after a kernel restart.
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

# Prefer CUDA when PyTorch can see a GPU; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: cpu


## Dataset creation

In [3]:
# Build the ECG dataset wrapper from the dataset name and the local files directory.
dataset_name = 'afdb'
data_root = '../data/files/'
dataset = AFECGDataset(dataset_name, data_root)

In [4]:
# Number of samples reported by the dataset wrapper; reused below as the
# upper bound when iterating over the dataset.
total_data_size = dataset.get_len()
print("Total data size: ", total_data_size)

Total data size:  1397


In [5]:
# Fetch every record exactly once: each dataset[i] lookup yields a pair whose
# first element is the sample and whose second element is its label, so
# indexing the dataset separately for each would double the loading work.
records = [dataset[i] for i in range(total_data_size)]
data = [record[0] for record in records]
labels = [record[1] for record in records]

### Example of one ECG sample

In [6]:
# samples, label = dataset[0]
# print('P-signal: ', samples)
# print('Has AF: ', 'Yes' if label == 1 else 'No')

In [7]:
# to_wavelet = WaveletTransform(wavelet.Morlet(6), size=(256, 256))
# image_test = to_wavelet(data[0][0])
# transforms.ToPILImage()(image_test.permute(2, 1, 0)).show()

## Wavelet Transform

In [49]:
# Number of samples to load from the saved wavelet tensors.
# The full dataset contains 1397 samples; keep this small for quick experiments.
data_size = 5

In [50]:
# Path template for the saved tensors: the first placeholder receives the
# sample index, the second the index of the signal window within that sample.
fmt = '../data/images/sample_{}_win_{}.pt'
# to_wavelet = WaveletTransform(wavelet.Morlet(6), size=(256, 256))
# start = time.time()

# for sample_idx in range(150, data_size):
#     sample = data[sample_idx]
#     for signal_idx, signal in enumerate(sample):
#         new_sample = to_wavelet(signal)
#         torch.save(new_sample, fmt.format(sample_idx, signal_idx))
    
# end = time.time()
# print('Elapsed time: {} ms'.format(1000 * (end - start)))

In [51]:
# Every sample was saved as a fixed number of per-window tensors on disk.
windows_per_sample = 20

# Reload the saved tensors: one inner list of windows per sample.
transformed_data = [
    [
        torch.load(fmt.format(sample_idx, signal_idx))
        for signal_idx in range(windows_per_sample)
    ]
    for sample_idx in range(data_size)
]

# Labels for exactly the samples loaded above, in the same order.
transformed_labels = [labels[sample_idx] for sample_idx in range(data_size)]

## Train & Test set creation

In [52]:
# Hold out 20% of the samples for testing; the fixed random_state keeps the
# split identical between runs.
x_train, x_test, y_train, y_test = train_test_split(
    transformed_data,
    transformed_labels,
    test_size=0.2,
    random_state=1,
)

## CNN

In [53]:
class ConvNet(nn.Module):
    """Four-stage convolutional encoder followed by one linear projection.

    Every convolution uses stride 1 and no padding, so each stage shrinks the
    feature map. The only pooling step (2x2, stride 2) closes the second stage.

    Args:
        in_channels: Number of channels of the input image batch.
        out_features: Length of the vector produced for each image.
        input_size: ``(height, width)`` of the images the network will be fed.
            It determines the input width of the linear layer; the default
            ``(256, 256)`` yields 81600 features, the value that used to be
            hard-coded.

    Raises:
        ValueError: If ``input_size`` is too small to leave any feature map
            after the convolution/pooling stack.
    """

    def __init__(self, in_channels=3, out_features=50, input_size=(256, 256)):
        super(ConvNet, self).__init__()

        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels, 10, kernel_size=(3, 21)),
            nn.ReLU(),
        )

        self.layer2 = nn.Sequential(
            nn.Conv2d(10, 10, kernel_size=(3, 21)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2), stride=2),
        )

        self.layer3 = nn.Sequential(
            nn.Conv2d(10, 10, kernel_size=(4, 21)),
            nn.ReLU(),
        )

        self.layer4 = nn.Sequential(
            nn.Conv2d(10, 10, kernel_size=(4, 21)),
            nn.ReLU(),
        )

        self.fc = nn.Linear(self._flattened_size(input_size), out_features)

    @staticmethod
    def _flattened_size(input_size, channels=10):
        """Return the number of features left after ``layer4`` for one image.

        Mirrors the layer stack above: an unpadded stride-1 convolution with a
        kernel of size ``k`` removes ``k - 1`` pixels along that axis, and the
        2x2/stride-2 max-pool halves each axis (rounding down).
        """
        height, width = input_size
        # layer1 and the convolution of layer2: two (3, 21) kernels.
        height, width = height - 2 * (3 - 1), width - 2 * (21 - 1)
        # Max-pool at the end of layer2.
        height, width = height // 2, width // 2
        # layer3 and layer4: two (4, 21) kernels.
        height, width = height - 2 * (4 - 1), width - 2 * (21 - 1)

        # Every step only shrinks the map, so a non-positive final size also
        # covers the case where an intermediate stage already collapsed.
        if height <= 0 or width <= 0:
            raise ValueError(
                'input_size {} is too small for the ConvNet layer stack'
                .format(tuple(input_size))
            )
        return channels * height * width

    def forward(self, x):
        """Encode a ``(batch, in_channels, height, width)`` tensor.

        Returns:
            Tensor of shape ``(batch, out_features)``.
        """
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)

        # Collapse channel and spatial axes into one feature axis per image.
        out = out.reshape(out.size(0), -1)
        return self.fc(out)

In [54]:
# Instantiate a throwaway network purely to show its layer layout.
display(ConvNet(in_channels=3))

ConvNet(
  (layer1): Sequential(
    (0): Conv2d(3, 10, kernel_size=(3, 21), stride=(1, 1))
    (1): ReLU()
  )
  (layer2): Sequential(
    (0): Conv2d(10, 10, kernel_size=(3, 21), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer3): Sequential(
    (0): Conv2d(10, 10, kernel_size=(4, 21), stride=(1, 1))
    (1): ReLU()
  )
  (layer4): Sequential(
    (0): Conv2d(10, 10, kernel_size=(4, 21), stride=(1, 1))
    (1): ReLU()
  )
  (fc): Linear(in_features=81600, out_features=50, bias=True)
)

In [55]:
# Smoke test: push the first window of the first training sample through a
# freshly initialised encoder and check the shape of what comes out.
encoder_cnn = ConvNet()
x0 = x_train[0][0].float()

# Move the last axis to the front and add a batch axis of size one.
# NOTE(review): assumes the saved windows are stored channels-last - confirm.
x0_batch = x0.permute(2, 0, 1).unsqueeze(0)
h = encoder_cnn(x0_batch)
print(h.shape)

test.assertEqual(h.dim(), 2)
test.assertSequenceEqual(h.shape, (1, 50))

torch.Size([1, 50])


In [56]:
# Train the CNN one window at a time (batch size 1) with Adam.
# The model and every tensor are placed on `device`, so a GPU is actually used
# when one was detected.
# NOTE(review): ConvNet emits 50 outputs by default while the labels look
# binary; CrossEntropyLoss accepts that, but confirm the intended class count.
model = ConvNet().to(device)
num_epochs = 100
total_size = len(x_train)
test.assertEqual(total_size, len(y_train))

# Loss and optimizer
learning_rate = 0.001
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
total_step = len(x_train)
loss_list = []  # one loss value per optimisation step
acc_list = []   # one accuracy value per epoch

for epoch in range(num_epochs):
    correct_windows = 0
    seen_windows = 0
    for samples, label in zip(x_train, y_train):
        # Every window of a sample shares that sample's label.
        target = torch.tensor([label]).long().to(device)
        for image in samples:

            # Run the forward pass
            batch = image.float().permute(2, 0, 1).unsqueeze(0).to(device)
            output = model(batch)
            loss = criterion(output, target)
            loss_list.append(loss.item())

            # Backprop and perform Adam optimisation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Track the accuracy
            predicted = output.detach().argmax(dim=1)
            correct_windows += (predicted == target).sum().item()
            seen_windows += 1

    # Normalise by the windows actually processed rather than by an assumed
    # fixed number of windows per sample; guard against an empty training set.
    acc = correct_windows / seen_windows if seen_windows else 0.0
    acc_list.append(acc)
    print('Epoch [{}/{}], Accuracy: {:.2f}%'
          .format(epoch + 1, num_epochs, acc * 100))

Epoch [1/100], Accuracy: 86.25%
Epoch [2/100], Accuracy: 68.75%
Epoch [3/100], Accuracy: 52.50%
Epoch [4/100], Accuracy: 73.75%
Epoch [5/100], Accuracy: 75.00%


KeyboardInterrupt: 

## Random Forest (RF)

In [None]:
# Random forest with class re-weighting; the fixed random_state makes the
# fitted forest repeatable and n_jobs=-1 uses every available core.
rnd_clf = RandomForestClassifier(
    n_estimators=225,
    criterion='gini',
    min_samples_split=5,
    min_samples_leaf=1,
    class_weight='balanced',
    random_state=42,
    n_jobs=-1,
)

In [None]:
def _to_feature_matrix(samples):
    """Flatten each sample's window tensors into one row of a 2-D float array.

    scikit-learn estimators need input of shape (n_samples, n_features),
    whereas each sample here is a list of multi-dimensional tensors.
    NOTE(review): assumes every sample flattens to the same number of values -
    confirm against the saved tensors.
    """
    return np.stack([
        torch.cat([window.float().reshape(-1) for window in sample])
        .detach().cpu().numpy()
        for sample in samples
    ])


rf_train = _to_feature_matrix(x_train)
rf_test = _to_feature_matrix(x_test)

rnd_clf.fit(rf_train, y_train)

print("")
print("Test results: ")
print("--------------------------------------------")

# Evaluate on the held-out split with the classifier that was just fitted.
y_pred = rnd_clf.predict(rf_test)
print(rnd_clf.__class__.__name__, accuracy_score(y_test, y_pred))