# Lab07 - ECG Anomaly Detection System

In this lab, your task is to build a deep learning system that detects anomalous ECG signals. An abnormal heartbeat has the label `y = 1` and a normal heartbeat has the label `y = 0`. Each ECG signal is a time series of 140 time steps corresponding to a single heartbeat of a patient. The dataset contains a total of 4998 ECG samples collected from different patients.


In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
cd "/content/gdrive/MyDrive/UCCD3074_Labs/UCCD3074_Lab7"

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset

Helper functions

In [None]:
def display(x, y, idx=None):
    # Plot a single ECG sample; `idx` is an optional sample id shown in the legend
    plt.figure(figsize=(5, 2))
    label = 'Abnormal' if y == 1 else 'Normal'
    color = 'red' if y == 1 else 'blue'
    plt.plot(x.squeeze(), color=color, label=None if idx is None else 'id=' + str(idx))
    plt.title(label, fontsize=18)
    if idx is not None:
        plt.legend(loc="lower right")
    plt.show()

In [None]:
def display_dataset(X, y, num_rows=3):
    fig, axs = plt.subplots(num_rows, 2, sharex=True, sharey=True, figsize=(8, 5))
    fig.suptitle('ECG signal samples', fontsize=16)

    for r in range(num_rows):
        pos_id = np.random.choice(np.where(y == 1)[0])
        axs[r, 0].plot(X[pos_id].squeeze(), color="red", label="id=" + str(pos_id))
        axs[r, 0].legend(loc="lower right")

        neg_id = np.random.choice(np.where(y == 0)[0])
        axs[r, 1].plot(X[neg_id].squeeze(), color="blue", label="id=" + str(neg_id))
        axs[r, 1].legend(loc="lower right")

    axs[0,0].set_title('Abnormal')
    axs[0,1].set_title('Normal')
    fig.tight_layout(rect=[0, 0.03, 1, 0.95])

    plt.show()

---
# Explore the dataset

**Task**: Load the dataset from `ecg.csv`. Then extract the input matrix `X` and the output vector `y`. Verify that `X` is a numpy array of shape `(4998, 140)` and that `y` is a numpy array of shape `(4998,)`. The label is stored in the last column of the CSV file.
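
One possible way to approach the split between signal columns and the label column is sketched below. It does not read `ecg.csv`; instead it builds a small synthetic CSV in memory with the same layout (140 signal values followed by one label). The assumption that the file has no header row (`header=None`) is ours and should be checked against the real file.

```python
import io

import numpy as np
import pandas as pd

# Synthetic stand-in for ecg.csv: 6 rows, 140 signal values plus a 0/1 label.
# Assumption: the real file has no header row, hence header=None below.
rng = np.random.default_rng(0)
signals = rng.normal(size=(6, 140))
labels = rng.integers(0, 2, size=(6, 1))
csv_text = pd.DataFrame(np.hstack([signals, labels])).to_csv(header=False, index=False)

df = pd.read_csv(io.StringIO(csv_text), header=None)
data = df.to_numpy()

X = data[:, :-1].astype(np.float32)  # every column except the last
y = data[:, -1].astype(np.int64)     # the last column holds the label

print(X.shape, y.shape)
```

With the real file, the same slicing should yield shapes `(4998, 140)` and `(4998,)`.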

In [None]:
# ...your code here ...

In [None]:
print(f'X is a {type(X)} with dtype {X.dtype} and shape {tuple(X.shape)}')
print(f'y is a {type(y)} with dtype {y.dtype} and shape {tuple(y.shape)}')

In [None]:
display_dataset(X, y)

**Task**: Check the class distribution and confirm that the dataset is not imbalanced.
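
A minimal sketch of a class-count check, using a hypothetical label vector `y_demo` in place of the real `y`. `np.unique` with `return_counts=True` returns the distinct labels and how often each occurs.

```python
import numpy as np

# Hypothetical label vector standing in for the real `y`.
y_demo = np.array([1, 0, 1, 1, 0, 1, 0, 1])

classes, counts = np.unique(y_demo, return_counts=True)
fractions = counts / counts.sum()

for c, n, f in zip(classes, counts, fractions):
    print(f'class {c}: {n} samples ({f:.1%})')
```

What ratio counts as "imbalanced" is a judgment call; the sketch only reports the proportions.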

In [None]:
# ... your code here ...

## Input preprocessing

Since the data is a time series, we shall use 1-D convolutional (`nn.Conv1d`) layers rather than 2-D to construct the network. The input to a 1-D convolutional layer is a tensor of shape `(B, C, L)` where `B` is the batch size, `C` is the number of channels, and `L` is the length of the 1-D input vector. The raw input matrix has a dimensionality of `(B, L)`. Hence, it is necessary to insert the channel dimension into `X`.

**Task**: Add the channel dimension `C` to the input matrix so that the input matrix has a dimension of `(B, C, L)` = `(4998, 1, 140)`.
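
As an illustration of inserting a singleton channel axis, here are three equivalent NumPy options applied to a hypothetical `(B, L)` array `X_demo` (not the real data).

```python
import numpy as np

X_demo = np.zeros((8, 140), dtype=np.float32)  # hypothetical (B, L) matrix

X_a = X_demo[:, np.newaxis, :]        # index with np.newaxis
X_b = np.expand_dims(X_demo, axis=1)  # equivalent function call
X_c = X_demo.reshape(-1, 1, 140)      # equivalent reshape

print(X_a.shape, X_b.shape, X_c.shape)
```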

In [None]:
# ...your code here ...
print(X.shape)

## Create custom dataset

**Task**: Create a custom dataset class (inheriting from `torch.utils.data.Dataset`) for the ECG dataset.

In [None]:
class Dataset(Dataset):
    
    def __init__(self, filename):        
        # ...your code here ...

    def __getitem__(self, idx):
        # ...your code here ...
        return x, y
    
    def __len__(self):
        # ...your code here ...
        return num_items

Test your implementation here by displaying the ECG signal for one sample.

In [None]:
dataset = Dataset('ecg.csv')

i = np.random.choice(len(dataset))
x, y = dataset[i]

display(x, y)

## Split dataset

**Task**: Split your dataset so that 80% of the samples are used for training (`trainset`) and 20% for testing (`testset`). You may use the function [`torch.utils.data.random_split`](https://pytorch.org/docs/stable/data.html#torch.utils.data.random_split).
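
A sketch of an 80/20 split with `random_split`, shown on a hypothetical `TensorDataset` of 100 random samples rather than the ECG dataset. The fixed seed is an optional choice of ours that makes the split reproducible.

```python
import torch
from torch.utils.data import TensorDataset, random_split

# Hypothetical stand-in dataset: 100 signals of shape (1, 140) with 0/1 labels.
demo_set = TensorDataset(torch.randn(100, 1, 140),
                         torch.randint(0, 2, (100,)).float())

n_train = int(0.8 * len(demo_set))
n_test = len(demo_set) - n_train  # remainder, so the sizes always sum to len(demo_set)

generator = torch.Generator().manual_seed(0)  # fixed seed for a reproducible split
train_demo, test_demo = random_split(demo_set, [n_train, n_test], generator=generator)

print(len(train_demo), len(test_demo))
```

Computing the test size as a remainder avoids a length mismatch when 80% of the dataset size is not an integer.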

In [None]:
trainset, testset = # ...your code here ...

## Implement the Data Loader

**Task**: Implement the data loaders `trainloader` and `testloader`.

In [None]:
trainloader = # ... your code here...
testloader  = # ... your code here...

Test your implementation by loading a batch of data from `trainloader`.
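
One way to run such a check is to pull a single batch with `next(iter(...))` and inspect its shapes. The sketch below uses a hypothetical stand-in dataset and an arbitrary batch size of 16.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Hypothetical stand-in dataset: 100 signals of shape (1, 140) with 0/1 labels.
demo_set = TensorDataset(torch.randn(100, 1, 140),
                         torch.randint(0, 2, (100,)).float())

# batch_size=16 is an arbitrary choice for illustration.
demo_loader = DataLoader(demo_set, batch_size=16, shuffle=True)

inputs, labels = next(iter(demo_loader))  # fetch one batch
print(inputs.shape, labels.shape)
```

The inputs should have shape `(B, 1, 140)` and the labels shape `(B,)`, matching what `nn.Conv1d` and the training loop expect.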

---
# Build the network

**Task**: Design and implement a CNN that can detect anomalous ECG signals. Since the data is one-dimensional, you need to use 1-D convolutional layers (`torch.nn.Conv1d`) instead of the 2-D version.
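
When sizing the first fully connected layer, it helps to track how the signal length changes through each layer. The helper below (`conv1d_out_len` is a hypothetical name) applies the output-length formula documented for `nn.Conv1d`, which also holds for `nn.MaxPool1d` with `dilation=1`. The layer settings are arbitrary examples, not a recommended architecture.

```python
def conv1d_out_len(length, kernel_size, stride=1, padding=0, dilation=1):
    """Output length of a 1-D convolution or pooling layer."""
    return (length + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1


length = 140
length = conv1d_out_len(length, kernel_size=5, padding=2)  # conv keeps the length
after_conv1 = length
length = conv1d_out_len(length, kernel_size=2, stride=2)   # pooling halves it
after_pool1 = length
length = conv1d_out_len(length, kernel_size=5, padding=2)  # conv keeps the length
length = conv1d_out_len(length, kernel_size=2, stride=2)   # pooling halves it again
after_pool2 = length

print(after_conv1, after_pool1, after_pool2)
```

The flattened feature size fed to a linear layer is then the final length multiplied by the number of output channels of the last convolution.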

In [None]:
class Network(nn.Module):
    # ... your code here...

In [None]:
from torchsummary import summary
summary(Network(), input_size=(1, 140), batch_size=4, device="cpu")

Test your implementation by performing inference on a batch of data.
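
A sketch of such a shape check, using a deliberately tiny hypothetical network (`TinyNet`) in place of your `Network`. It ends with a sigmoid because the training loop in this lab applies `F.binary_cross_entropy`, which expects probabilities in `[0, 1]`.

```python
import torch
import torch.nn as nn


class TinyNet(nn.Module):
    """Hypothetical minimal network, used only to illustrate the shape check."""

    def __init__(self):
        super().__init__()
        self.conv = nn.Conv1d(1, 4, kernel_size=5, padding=2)
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(4, 1)

    def forward(self, x):
        x = torch.relu(self.conv(x))      # (B, 4, 140)
        x = self.pool(x).flatten(1)       # (B, 4)
        return torch.sigmoid(self.fc(x))  # (B, 1), probabilities


demo_net = TinyNet().eval()
batch = torch.randn(4, 1, 140)  # random stand-in for a real batch

with torch.no_grad():
    out = demo_net(batch)

print(out.shape)
```

The output has one probability per sample, which the training loop flattens with `view(-1)` before computing the loss.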

---
# Train the model

**Task**: Train your model. 

In [None]:
from torch.optim.lr_scheduler import StepLR

In [None]:
def train(net, trainloader, num_epochs=10, lr=0.01, momentum=0.9):

    optimizer = optim.SGD(net.parameters(), lr=lr, momentum=momentum)
    scheduler = StepLR(optimizer, 10, 0.1)

    net.train()
    
    for e in range(num_epochs):

        train_loss = 0.0

        for i, (inputs, labels) in enumerate(trainloader):

            optimizer.zero_grad()
        
            outs = net(inputs)   
            outs = outs.view(-1)
            loss = F.binary_cross_entropy(outs, labels.float())  # BCE expects float targets
            loss.backward()
            
            train_loss += loss.item()*len(inputs)
        
            optimizer.step()  

        scheduler.step() 

        train_loss /= len(trainloader.dataset)
        print(f'Epoch {e+1:2d}: train_loss = {train_loss:.4f}')

---
# Test the model

**Task**: Evaluate your model by computing the test accuracy. This is a relatively easy problem. It is possible to achieve a test accuracy of 98%.

In [None]:
def evaluate(net, dataloader):
    net.eval()

    running_corrects = 0    
    count = 0
    for inputs, targets in dataloader:
        with torch.no_grad():
            outputs = net(inputs).view(-1)
            predicted =  (outputs >= 0.5).int()
            running_corrects += (targets == predicted).double().sum()
            count += len(targets)
    
    print('Accuracy = {:.2f}%'.format(100 * running_corrects.item() / count))

In [None]:
evaluate(net, testloader)

<center> --- End of LabTest --- </center>