In [None]:
# Handle data path
from pathlib import Path

# Read and display data from Physionet
import wfdb
import pprint
import collections
from IPython.display import clear_output

# Data manipulation and plotting
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import resample
from scipy.signal import butter,filtfilt

# Divide data into train and test set and save to HDF5
import h5py
import os
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

# Over and undersampling
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

from tqdm.notebook import tqdm
import cv2
from tqdm.notebook import tqdm

In [None]:
# Root folder of the 2-D black-and-white dataset.
data_dir = Path('./data/2D_BW')
# Images are read from <data_dir>/img/<record>/ and labels from
# <data_dir>/label/<record>.csv by the loading cells.
img_dir = data_dir / 'img'
label_dir = data_dir / 'label'

# Create the folders up front so a fresh checkout does not fail on a
# missing directory.
for _folder in (data_dir, img_dir, label_dir):
    os.makedirs(_folder, exist_ok=True)



In [None]:
# Convert training images to nparray
# Convert png files in a directory into the destination nparray
def pngConverter (destination, image_dir, if_red_dim):
    """Load every ``.png`` under ``image_dir`` and append it to ``destination``.

    Args:
        destination: list extended in place with one array per image.
        image_dir: directory walked recursively with ``os.walk``.
        if_red_dim: when true, average over axis 2 of the array returned by
            ``cv2.imread`` so each stored image is 2-D.

    Raises:
        IOError: if ``cv2.imread`` returns ``None`` for a ``.png`` file,
            i.e. the file could not be read as an image.
    """
    # NOTE(review): images are appended in os.walk order, which is not sorted;
    # presumably the rows of the label CSV follow the same order -- confirm.
    for root, dirs, files in os.walk(image_dir):
        for file in files:
            # Only PNG files are images. Previously the reduce/append steps
            # also ran for other files, re-processing the previous image or
            # failing because no image had been read yet.
            if not file.endswith('.png'):
                continue
            file_path = os.path.join(root, file)
            img = cv2.imread(file_path)
            if img is None:
                raise IOError('Could not read image file: %s' % file_path)
            if if_red_dim:
                img = np.mean(img, axis = 2)
            destination.append(img)
            
# Record numbers whose images/labels form the training (and validation) data.
train_record_list = [
    101, 106, 108, 109, 112, 114, 115, 116, 118, 119, 122,
    124, 201, 203, 205, 207, 208, 209, 215, 220, 223, 230,
]
# Record numbers held out for testing; no record appears in both lists.
test_record_list = [
    100, 103, 105, 111, 113, 117, 121, 123, 200, 202, 210,
    212, 213, 214, 219, 221, 222, 228, 231, 232, 233, 234,
]

In [None]:
# Accumulators: one array per image and one label array per record.
train_image, train_label = [], []
test_image, test_label = [], []
train_or_test = True

for record_number in tqdm(train_record_list, total = len(train_record_list)):
    # Images of a record live in <img_dir>/<record_number>/ ...
    image_dir = img_dir / str(record_number)
    pngConverter(train_image, image_dir, True)
    # ... and its labels in <label_dir>/<record_number>.csv
    label_str = '%d.csv' % record_number
    label_path = label_dir / label_str
    train_label.append(np.genfromtxt(label_path, delimiter = ','))



In [None]:
# Ratio Split Helper Function
import math
def train_val_split (data_set, label_set, val_ratio = 0.2, shuffle = True):
    """Split samples into train and validation parts, class by class.

    Samples labelled 4 are dropped. For each of the classes 0..3 the first
    ``ceil(count * val_ratio)`` samples (after the optional shuffle) go to
    the validation part and the rest to the training part.

    Args:
        data_set: indexable collection of samples.
        label_set: labels aligned with ``data_set``.
        val_ratio: fraction of every class reserved for validation.
        shuffle: permute the samples with ``np.random`` before splitting.

    Returns:
        ``(train_data, train_labels, val_data, val_labels)`` as four lists,
        each ordered by class.
    """
    total = len(data_set)
    order = np.random.permutation(total) if shuffle else np.arange(total)

    data_set = [data_set[k] for k in order]
    label_set = [label_set[k] for k in order]

    # Positions (in the reordered lists) of the members of every class.
    members_by_class = [[] for _ in range(4)]
    for position, label in enumerate(label_set):
        if label == 4:
            continue
        members_by_class[int(label)].append(position)

    train_idx, val_idx = [], []
    for class_id, members in enumerate(members_by_class):
        current_count = len(members)
        print("current label is %d with count %d" % (class_id, current_count))
        cut = math.ceil(current_count * val_ratio)
        val_idx.extend(members[:cut])
        train_idx.extend(members[cut:])

    def _take(sequence, positions):
        return [sequence[p] for p in positions]

    return (_take(data_set, train_idx), _take(label_set, train_idx),
            _take(data_set, val_idx), _take(label_set, val_idx))


# Concatenate the per-record label arrays into one flat list, in record order.
train_label_flattened = [
    label for record_labels in train_label for label in record_labels
]

# Hold out part of every class for validation (default val_ratio and shuffle).
(exp_train_image, exp_train_label,
 exp_val_image, exp_val_label) = train_val_split(train_image, train_label_flattened)


In [None]:
from skimage.transform import rotate
from skimage.util import random_noise

# Class-dependent augmentation: images of class 0 are not augmented, class 1
# gets one extra copy per image, class 2 three, and any other class four.

def data_aug_2 (train_image, train_label):
    """Return the inputs followed by transformed copies of non-class-0 images.

    Copies appended per image, by label:
        0             -> none
        1             -> 90-degree rotation
        2             -> 90- and 180-degree rotations plus an up/down flip
        anything else -> the three above plus a left/right flip

    The inputs are concatenated with ``+`` and are not modified. The
    originals come first in the returned pair, then the augmented samples
    in input order.
    """
    extra_img, extra_label = [], []
    for position, image in enumerate(train_image):
        label = train_label[position]
        if label == 0:
            continue
        copies = [rotate(image, 90)]
        if label != 1:
            copies.append(rotate(image, 180))
            copies.append(np.flipud(image))
            if label != 2:
                copies.append(np.fliplr(image))
        extra_img.extend(copies)
        extra_label.extend([label] * len(copies))

    return train_image + extra_img, train_label + extra_label


In [None]:
print("Initial Train set length is %d" % (len(exp_train_image)))
print("Validation set length is %d" % (len(exp_val_image)))

# Only the training split is augmented; the validation split is left as is.
aug_img, aug_label = data_aug_2(exp_train_image, exp_train_label)

# Shuffle images and labels with one shared permutation so pairs stay aligned.
index = np.random.permutation(len(aug_img))
aug_img, aug_label = (
    [samples[idx] for idx in index] for samples in (aug_img, aug_label)
)

print(len(aug_img))
print(len(aug_label))


# Prepare model

In [None]:
from torch import nn
from scipy.stats import truncnorm
from torch.nn.parameter import Parameter
import torch.optim as optim
from torch.autograd import Variable
import torch
import h5py
from pathlib import Path

In [None]:


# Select the compute device once: the CUDA GPU when torch reports one as
# available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def DataBatch(data, label, batchsize, shuffle=True):
    """Yield ``(data_batch, label_batch)`` list pairs of at most ``batchsize`` items.

    With ``shuffle=True`` the samples are visited in a fresh random order
    drawn from ``np.random``; otherwise in their original order. The last
    batch is shorter when ``len(data)`` is not a multiple of ``batchsize``.
    """
    total = len(data)
    order = np.random.permutation(total) if shuffle else np.arange(total)
    num_batches = int(np.ceil(total / batchsize))
    for batch_no in range(num_batches):
        start = batch_no * batchsize
        # Slicing past the end is clamped, so the last batch may be short.
        chosen = order[start : start + batchsize]
        yield [data[k] for k in chosen], [label[k] for k in chosen]

# evaluate method for cnn
def eval_cnn(model, test_data, test_label, minibatch = 100, n=4):
    """Evaluate ``model`` and return a row-normalised confusion matrix and accuracy.

    Args:
        model: network called on float batches with a channel axis inserted
            at position 1. Its train/eval mode is left unchanged.
        test_data: sequence of equally shaped arrays, one per sample.
        test_label: sequence of class indices in ``[0, n)``, one per sample.
        minibatch: number of samples per forward pass.
        n: number of classes (side length of the confusion matrix).

    Returns:
        ``(M, acc)``: ``M[t, p]`` is the fraction of samples with true class
        ``t`` that were predicted as ``p`` (rows without samples stay zero)
        and ``acc`` is the overall accuracy in percent (NaN for empty input).
    """
    # Run on whatever device the model already lives on instead of assuming
    # that a CUDA device exists.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    correct = 0.
    M = np.zeros((n, n))

    for data, label in DataBatch(test_data, test_label, minibatch, shuffle=False):
        batch = torch.FloatTensor(np.asarray(data)).unsqueeze(1).to(run_device)
        # The forward pass itself must run under no_grad: .numpy() is refused
        # on a tensor that requires grad, and no graph is needed here.
        with torch.no_grad():
            prediction = model(batch)
        batch_pred = np.argmax(prediction.cpu().numpy(), axis=1)
        truth = np.asarray(label).astype(int)
        correct += np.sum(batch_pred == truth)

        for true_cls, pred_cls in zip(truth, batch_pred):
            M[true_cls, int(pred_cls)] += 1

    for i in range(n):
        row_total = np.sum(M[i, :])
        # A class without samples would otherwise become a row of NaN (0/0).
        if row_total > 0:
            M[i, :] /= row_total

    acc = correct / len(test_data) * 100 if len(test_data) else float('nan')
    print('Test accuracy is %f' % (acc))
    return M, acc


# helper function to initialize weight variable
def weight_variable(shape):
    """Create a trainable weight tensor of the given shape.

    Values are drawn from a normal distribution with standard deviation 0.01,
    truncated at +/- 100 standard deviations (i.e. to the interval [-1, 1]).
    """
    std = 0.01
    bound = 1 / std
    samples = truncnorm.rvs(-bound, bound, scale=std, size=shape)
    return Parameter(torch.Tensor(samples), requires_grad=True)

# helper function to initialize bias variable
def bias_variable(shape):
    """Create a trainable bias tensor of the given shape, filled with 0.1."""
    values = 0.1 * np.ones(shape)
    return Parameter(torch.Tensor(values), requires_grad=True)


In [None]:
def train_net(model, trainData, trainLabels, valData, valLabels, epochs=10, learnRate = 1e-4, batchSize=50, weights = None):
    """Train ``model`` with Adam and cross-entropy, tracking the validation loss.

    Args:
        model: network called on float batches with a channel axis inserted
            at position 1. It is moved to the GPU when one is available,
            otherwise to the CPU.
        trainData, trainLabels: training samples and their class indices.
        valData, valLabels: validation samples and their class indices.
        epochs: number of passes over the training data.
        learnRate: Adam learning rate.
        batchSize: mini-batch size used for training and validation.
        weights: optional per-class loss weights (list or array); ``None``
            or an empty sequence means an unweighted loss.

    Returns:
        ``(train_loss, val_loss)``: lists with the mean batch loss of every
        epoch (NaN for an epoch without batches).
    """
    # Fall back to the CPU instead of failing when no CUDA device is present.
    run_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(run_device)

    # Explicit None/length test: the truth value of a numpy array of weights
    # is ambiguous and would raise.
    if weights is not None and len(weights) > 0:
        class_weights = torch.FloatTensor(weights).to(run_device)
        criterion = nn.CrossEntropyLoss(weight=class_weights)
    else:
        criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr = learnRate)

    train_loss = []
    val_loss = []

    for epoch in tqdm(range(epochs)):
        model.train()  # set network in training mode
        epoch_train_loss = []
        epoch_val_loss = []

        for data, labels in DataBatch(trainData, trainLabels, batchSize, shuffle=True):
            data = torch.FloatTensor(np.asarray(data)).unsqueeze(1).to(run_device)
            labels = torch.LongTensor(np.asarray(labels)).to(run_device)

            prediction = model(data)
            loss = criterion(prediction, labels)
            epoch_train_loss.append(loss.item())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()  # set network in evaluation mode
        with torch.no_grad():
            for val_data, val_labels in DataBatch(valData, valLabels, batchSize, shuffle=False):
                val_data = torch.FloatTensor(np.asarray(val_data)).unsqueeze(1).to(run_device)
                val_labels = torch.LongTensor(np.asarray(val_labels)).to(run_device)
                loss = criterion(model(val_data), val_labels)
                epoch_val_loss.append(loss.item())

        epoch_mean_val_loss = np.mean(epoch_val_loss) if epoch_val_loss else float('nan')
        val_loss.append(epoch_mean_val_loss)

        epoch_mean_train_loss = np.mean(epoch_train_loss) if epoch_train_loss else float('nan')
        train_loss.append(epoch_mean_train_loss)

        # Report inside the loop: once per epoch, and never with undefined
        # values when epochs == 0.
        print ('Epoch:%d train loss: %f val loss: %f'%(epoch+1, epoch_mean_train_loss, epoch_mean_val_loss))

    return train_loss, val_loss

In [None]:
import torchvision.models as models
# ResNet-18 with pretrained weights as the starting point.
resnet18 = models.resnet18(pretrained=True)
# Swap the input convolution for a single-channel one; this layer is newly
# created, so it does not carry pretrained weights.
resnet18.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=7,
                           stride=2, padding=3, bias=False)
# Four output units, one per class.
resnet18.fc = nn.Linear(in_features=512, out_features=4)


In [None]:
# Print the module tree to check the replaced conv1 and fc layers.
print(resnet18)

In [None]:
# Create the checkpoint folder before training: torch.save does not create
# missing parent directories, and failing after training would lose the model.
model_path = Path('./model/exp_resnet18_pretrained_aug.pt')
model_path.parent.mkdir(parents=True, exist_ok=True)

train_loss, val_loss = train_net(resnet18, aug_img, aug_label, exp_val_image, exp_val_label, epochs=10, batchSize=128)
torch.save(resnet18.state_dict(), model_path)

# Test model



In [None]:
# Handle data path
from pathlib import Path

# Read and display data from Physionet
import wfdb
import pprint
import collections
from IPython.display import clear_output

# Data manipulation and plotting
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import resample
from scipy.signal import butter,filtfilt

# Divide data into train and test set and save to HDF5
import h5py
import os
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

# Over and undersampling
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

from tqdm.notebook import tqdm
import cv2
from tqdm.notebook import tqdm

from torch import nn
from scipy.stats import truncnorm
from torch.nn.parameter import Parameter
import torch.optim as optim
from torch.autograd import Variable
import torch

In [None]:
# Root folder of the 2-D black-and-white dataset.
data_dir = Path('./data/2D_BW')
# Images are read from <data_dir>/img/<record>/ and labels from
# <data_dir>/label/<record>.csv by the loading cells.
img_dir = data_dir / 'img'
label_dir = data_dir / 'label'

# Create the folders up front so a fresh checkout does not fail on a
# missing directory.
for _folder in (data_dir, img_dir, label_dir):
    os.makedirs(_folder, exist_ok=True)

# Convert training images to nparray
# Convert png files in a directory into the destination nparray
def pngConverter (destination, image_dir, if_red_dim):
    """Load every ``.png`` under ``image_dir`` and append it to ``destination``.

    Args:
        destination: list extended in place with one array per image.
        image_dir: directory walked recursively with ``os.walk``.
        if_red_dim: when true, average over axis 2 of the array returned by
            ``cv2.imread`` so each stored image is 2-D.

    Raises:
        IOError: if ``cv2.imread`` returns ``None`` for a ``.png`` file,
            i.e. the file could not be read as an image.
    """
    # NOTE(review): images are appended in os.walk order, which is not sorted;
    # presumably the rows of the label CSV follow the same order -- confirm.
    for root, dirs, files in os.walk(image_dir):
        for file in files:
            # Only PNG files are images. Previously the reduce/append steps
            # also ran for other files, re-processing the previous image or
            # failing because no image had been read yet.
            if not file.endswith('.png'):
                continue
            file_path = os.path.join(root, file)
            img = cv2.imread(file_path)
            if img is None:
                raise IOError('Could not read image file: %s' % file_path)
            if if_red_dim:
                img = np.mean(img, axis = 2)
            destination.append(img)
            
# Record numbers whose images/labels form the training (and validation) data.
train_record_list = [
    101, 106, 108, 109, 112, 114, 115, 116, 118, 119, 122,
    124, 201, 203, 205, 207, 208, 209, 215, 220, 223, 230,
]
# Record numbers held out for testing; no record appears in both lists.
test_record_list = [
    100, 103, 105, 111, 113, 117, 121, 123, 200, 202, 210,
    212, 213, 214, 219, 221, 222, 228, 231, 232, 233, 234,
]

In [None]:
# Accumulators: one label array per record, one array per image.
test_label, test_image = [], []

for record_number in tqdm(test_record_list, total = len(test_record_list)):
    # Images of a record live in <img_dir>/<record_number>/ ...
    image_dir = img_dir / str(record_number)
    pngConverter(test_image, image_dir, True)
    # ... and its labels in <label_dir>/<record_number>.csv
    label_str = '%d.csv' % record_number
    label_path = label_dir / label_str
    test_label.append(np.genfromtxt(label_path, delimiter = ','))

# Concatenate the per-record label arrays into one flat list, in record order.
test_label_flattened = [
    label for record_labels in test_label for label in record_labels
]

In [None]:
# Positions of the test samples labelled 4 (the label that train_val_split
# also leaves out of the training/validation data).
label_arr = np.array(test_label_flattened)
index = np.where(label_arr == 4)[0]

In [None]:
# Drop every test sample labelled 4. The positions are recomputed from the
# current labels and kept in a set: membership tests are O(1) (a numpy array
# is scanned linearly for every `in`), and re-running this cell is harmless
# because it no longer depends on positions computed before the filtering.
excluded = {pos for pos, label in enumerate(test_label_flattened) if label == 4}
test_image = [img for pos, img in enumerate(test_image) if pos not in excluded]
test_label_flattened = [label for pos, label in enumerate(test_label_flattened) if pos not in excluded]

In [None]:
# Sanity check: list the distinct labels left after the filtering cell.
print(np.unique(test_label_flattened))

In [None]:
# Sanity check: number of test images and the shape of the first one.
print(len(test_image), test_image[0].shape)

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
import torchvision.models as models

# Compute device: the CUDA GPU when torch reports one as available, else CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def DataBatch(data, label, batchsize, shuffle=True):
    """Yield ``(data_batch, label_batch)`` list pairs of at most ``batchsize`` items.

    With ``shuffle=True`` the samples are visited in a fresh random order
    drawn from ``np.random``; otherwise in their original order. The last
    batch is shorter when ``len(data)`` is not a multiple of ``batchsize``.
    """
    total = len(data)
    order = np.random.permutation(total) if shuffle else np.arange(total)
    num_batches = int(np.ceil(total / batchsize))
    for batch_no in range(num_batches):
        start = batch_no * batchsize
        # Slicing past the end is clamped, so the last batch may be short.
        chosen = order[start : start + batchsize]
        yield [data[k] for k in chosen], [label[k] for k in chosen]

def test_model(model, test_data, test_label, batch_size=64, n=4):
    """
    This function will run test of the model on the test dataset and return 
        - classification report string (for display purpose)
        - dictionary of classification report (for query purpose)
        - confusion matrix
    """
    
    predictions = []
    labels = []
    model.to(device)
    
    for i, (data,label) in enumerate(DataBatch(test_data,test_label,batch_size,shuffle=False)):
        data = Variable(torch.FloatTensor(np.asarray(data)))
        data = data.unsqueeze(1)
        data = data.to(torch.device("cuda"))
        prediction = model.forward(data)
        with torch.no_grad():
            data, label = data.to(device), label
            predictions += list(np.argmax(model(data).cpu().numpy(), axis=1))
            labels += list(label)
            
    predictions = np.array(predictions)
    labels = np.array(labels)
        
    target_names = ['N', 'S', 'V', 'F']
    report = classification_report(labels, predictions, target_names=target_names, digits=3)
    report_dict = classification_report(labels, predictions, target_names=target_names, output_dict=True)
    c_matrix = confusion_matrix(labels, predictions)
    return report, report_dict, c_matrix

# Rebuild the architecture used for training (single-channel input, four
# outputs) so the saved state dict fits.
resnet18 = models.resnet18(pretrained=False)
resnet18.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3,bias=False)
resnet18.fc = nn.Linear(512, 4)
# map_location='cpu': the checkpoint may hold CUDA tensors, which cannot be
# deserialised on a machine without a GPU unless they are remapped.
# test_model moves the model to the compute device afterwards.
state_dict = torch.load(Path('./model/exp_resnet18_pretrained_aug.pt'), map_location='cpu')
resnet18.load_state_dict(state_dict)
resnet18.eval()

report, report_dict, c_matrix = test_model(resnet18, test_image, test_label_flattened)

In [None]:
# The report comes from test_model on the held-out test records, so label it
# as a test result (it was previously printed as "Training result").
print('Test result:\n', report)

In [None]:
import itertools

## display confusion matrix
# Class names in label order 0..3, used as tick labels on both axes; the same
# names and order as target_names in test_model.
display_labels = ['N', 'S', 'V', 'F']

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Print a short header and plot the confusion matrix as an annotated heat map.

    Args:
        cm: 2-D array of counts; rows are true labels, columns predictions.
        classes: tick labels, one per row/column.
        normalize: when true, every row is divided by its sum before
            plotting; rows that sum to zero are shown as zeros.
        title: figure title.
        cmap: colormap passed to ``plt.imshow``.
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Divide empty rows by 1 so they stay 0 instead of becoming NaN (0/0).
        cm = cm.astype('float') / np.where(row_totals == 0, 1, row_totals)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # 'd' only formats integers; use two decimals for any float matrix.
    fmt = 'd' if (not normalize and cm.dtype.kind in 'iu') else '.2f'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Lay out after the axis labels exist so they are included in the fit.
    plt.tight_layout()
    plt.show()
    plt.clf()
    
# Row-normalised view: each row shows how the samples of one true class are
# distributed over the predicted classes.
plot_confusion_matrix(c_matrix, display_labels ,
                      title='Normalized Confusion Matrix', normalize=True, cmap='Greys')
