In [None]:
# Multiclass Arrhythmia Classification from Photoplethysmography Signals based on VGGNet

In [None]:
import argparse
import os
import operator as op #
import random
import math
from scipy import signal as sig
import scipy.io as sio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.autograd import Variable
from sklearn.metrics import classification

In [None]:
parser = argparse.ArgumentParser()
parser.add_argument('--dataroot', required=True, help='path to dataset')
parser.add_argument('--workers', type=int, help='number of data loading workers', default=8)
parser.add_argument('--multiLabel', type=bool, default=False, help='enable multiple label')
parser.add_argument('--batchSize', type=int, default=128, help='input batch size')
parser.add_argument('--recordLength', type=int, default=2500, help='the length of input record')
parser.add_argument('--niter', type=int, default=200, help='number of epochs to train for')
parser.add_argument('--lr', type=float, default=0.001, help='learning rate')
parser.add_argument('--beta1', type=float, default=0.9, help='beta1 for adam. ')
parser.add_argument('--cuda', action='store_true', help='enables cuda')
parser.add_argument('--ngpu', type=int, default=1, help='number of GPUs to use')
parser.add_argument('--outf', default='.', help='folder to output model checkpoints')
parser.add_argument('--manualSeed', type=int, help='manual seed')
os.environ['CUDA_VISIBLE_DEVICES'] = '8'
opt = parser.parse_args(['--dataroot','./PPGArrhythmiaClassification/','--cuda'])
print(opt)

In [None]:
try:
    os.makedirs(opt.outf)
except OSError:
    pass
if opt.manualSeed is None:
    opt.manualSeed = random.randint(1, 10000)
print("Random Seed: ", opt.manualSeed)
random.seed(opt.manualSeed)
torch.manual_seed(opt.manualSeed)
if opt.cuda:
    torch.cuda.manual_seed_all(opt.manualSeed)
cudnn.benchmark = True
if torch.cuda.is_available() and not opt.cuda:
    print("WARNING: You have a CUDA device, so you should probably run with --cuda")

In [None]:
# Mean filtering function
def MeanFilter(ppg,M):
    if M%2 ==0:
        M = M+1;
    Len = math.floor((M-1)/2)
    
    x = np.zeros((1,Len+len(ppg)+Len))
    x = x[0,:]
    x[0:Len] = ppg[0:Len]
    x[Len:Len+len(ppg)] = ppg
    x[Len+len(ppg):len(x)] = ppg[-Len:,]
    
    ppgs = ppg.astype(float)
    for i in range(3,len(x)-Len):
        ppgs[i-Len] = np.mean(x[i-Len:i+Len])
    return ppgs

In [None]:
class MyDataset(Dataset):
    
    def __init__(self, dir, ext='train', transform=None):
        """
        Args:
            dir (string): Directory
            ext (string): the extension used to claim training or testing data

        """
        labels = np.zeros((1,1))
        ppgseg = np.zeros((1,1000))
        with open(ext+'_list.txt','r') as f:
            for line in f.readlines():
                line = line.strip('\n')
                path1 = os.path.join('./NewPPGData1103',line[1:-1])
                path2 = os.path.join('./NewPPGData1207',line[1:-1])
                if os.path.exists(path1):
                    data = sio.loadmat(path1)
                else:
                    data = sio.loadmat(path2)
                labels = np.vstack((labels,data['labels']))
                ppgseg = np.vstack((ppgseg,data['ppgseg']))       
        
        labels = labels[1:len(labels),:]
        ppgseg = ppgseg[1:len(ppgseg),:]
        # labels
        # 0 sinus rhythm
        # 1 premature ventricular contraction
        # 2 premature atrial contraction
        # 3 ventricular tachycardia
        # 4 supraventricular tachycardia
        # 5 atrial fibrillation.
        print(ppgseg.shape)
        
        self.labels = labels
        self.ppgseg = ppgseg
        self.transform = transform
        
    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):

        ppgseg = self.ppgseg[idx]
        labels = self.labels[idx]
    
        sample = { 'ppgseg': ppgseg, 'labels': labels}
        
        if self.transform:
            sample = self.transform(sample)

        return sample
    
class CropAverage(object):
    """
    Crop the signal to certain length, and provide the average sbp as the target

    Args:
        length (int): the length to be cropped to
    """
    def __init__(self, length):
        self.length = length

    def __call__(self, sample):

        ppgseg, labels= sample['ppgseg'], sample['labels']
        signal = np.array([ppgseg])

        return {'signal': signal, 'labels': labels}
    
class ToTensor(object):
    """Convert ndarrays in sample to Tensors."""

    def __call__(self, sample):
        signal = sample['signal']
        labels = sample['labels']

        return {'signal': torch.from_numpy(signal).float(),
                'labels': torch.from_numpy(labels).float()
               }

In [None]:
# Loading training set
train_dataset = MyDataset(dir=opt.dataroot,
                                 ext='train',
                                 transform=transforms.Compose([
                                     CropAverage(opt.recordLength),
                                     ToTensor()
                                 ]))
train_loader = DataLoader(train_dataset, batch_size=opt.batchSize,
                          shuffle=True,  num_workers=int(opt.workers))

In [None]:
# Loading validation set
valid_dataset = MyDataset(dir=opt.dataroot,
                                 ext='valid',
                                 transform=transforms.Compose([
                                     CropAverage(opt.recordLength),
                                     ToTensor()
                                 ]))
valid_loader = DataLoader(valid_dataset, batch_size=opt.batchSize,
                          shuffle=False,  num_workers=int(opt.workers))

In [None]:
class VGG(nn.Module):

    def __init__(self, features, ngpu, num_classes=4, init_weights=True):
        super(VGG, self).__init__()
        self.features = features
        self.ngpu = ngpu
        
        self.classifier = nn.Sequential(
            nn.Linear(1024, 256), 
            nn.ReLU(True),        
            nn.Dropout(),        
            nn.Linear(256, num_classes)
        )
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        if isinstance(x.data, torch.cuda.FloatTensor) and self.ngpu > 1:
            x = nn.parallel.data_parallel(self.features, x, range(self.ngpu))
            x = x.view(x.size(0), -1)
            x = nn.parallel.data_parallel(self.classifier, x, range(self.ngpu))
        else:
            x = self.features(x)
            x = x.view(x.size(0), -1)
            x = self.classifier(x)
        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv1d): 
                nn.init.kaiming_normal_(m.weight.data,mode='fan_out')
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm1d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight.data,mode='fan_out')
                m.bias.data.zero_()

def make_layers(cfg, batch_norm=True):
    layers = []
    in_channels = 1 
    for v in cfg:
        if v == 'M':
            layers += [nn.MaxPool1d(kernel_size=3, stride=3)]
        else:
            conv1d = nn.Conv1d(in_channels, v, kernel_size=3, padding=1)
            if batch_norm:
                layers += [conv1d, nn.BatchNorm1d(v), nn.ReLU(inplace=True)]
            else:
                layers += [conv1d, nn.ReLU(inplace=True)]
            in_channels = v
    return nn.Sequential(*layers)

cfg = {
    'A': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'B': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'D': [32, 32, 'M', 64, 64, 'M', 128, 128, 128, 'M', 256, 256, 256, 'M', 256, 256, 256, 'M'],
    'E': [32, 32, 'M', 64, 64, 'M', 128, 128, 128, 128, 'M', 256, 256, 256, 256, 'M', 256, 256, 256, 256, 'M'],
}

def vgg16_bn(**kwargs):
    """VGG 16-layer model (configuration "D") with batch normalization
    """
    model = VGG(make_layers(cfg['D'], batch_norm=True), **kwargs)
    return model

In [None]:
ngpu = int(opt.ngpu)
num_types = 1
model = vgg16_bn(ngpu=ngpu,num_classes=6)
print(model)

In [None]:

ngpu = int(opt.ngpu)
in_x = Variable(torch.randn(20,1,500*2))
in_y = Variable(torch.randn(20,1))
out_x = model(in_x)
#fc_fea = resnet_cifar(model, in_x)
#print(fc_fea.shape)
print(in_x.shape)
print(out_x.shape)

In [None]:
loss_fun = nn.CrossEntropyLoss()
if opt.cuda:
    model.cuda()
    loss_fun.cuda()
optimizer = optim.Adam(list(model.parameters()), lr = opt.lr, betas=(opt.beta1, 0.999),weight_decay=0.004)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

In [None]:
for epoch in range(opt.niter):
    # Model training
    losses = []
    predictions = []
    labelsall = []
    model.train()
    for i, train_data in enumerate(train_loader, 0):
        model.zero_grad()
        inputs, labels = train_data['signal'], train_data['labels']
        if opt.cuda:
            inputs = Variable(inputs).cuda()
            labels = Variable(labels).cuda()
                   
        output = model(inputs)
        labels = labels[:,0].long()
        loss = loss_fun(output, labels)

        loss.backward()
        optimizer.step()
        losses.append(loss.item())  
        _, predicted = torch.max(output.data, 1)
        predicted = predicted.cpu()
        labels = labels.cpu()
        predictions.extend(predicted.numpy())
        labelsall.extend(labels.numpy())
        
    scheduler.step()
    print('[%d/%d] Loss: %.4f Training Accuracy: %.4f' %(epoch, opt.niter, np.average(losses), 
                                                         classification.accuracy_score(labelsall, predictions))) 
    EpochAcc = np.vstack((EpochAcc,np.array((epoch, np.average(losses)))))
    # Model validation
    model.eval()
    if (epoch+1) % 5 == 0:
        # validate
        predictions = []
        labelsall = []
        losses = []
        for i, valid_data in enumerate(valid_loader, 0):
            inputs, labels = valid_data['signal'], valid_data['labels']
            if opt.cuda:
                inputs = Variable(inputs).cuda()
                labels = Variable(labels).cuda()
                
            output = model(inputs)
            labels = labels[:,0].long()
            loss = loss_fun(output, labels)
            losses.append(loss.item())  
            _,predicted = torch.max(output.data, 1)
            predicted = predicted.cpu()
            labels = labels.cpu()
            
            predictions.extend(predicted.numpy())
            labelsall.extend(labels.numpy())
        print('[%d/%d] Validation Accuracy: %.4f' %(epoch, opt.niter, classification.accuracy_score(labelsall, predictions)))
        EpochValidAcc = np.vstack((EpochValidAcc,np.array((epoch, np.average(losses)))))
        
        if classification.accuracy_score(labelsall, predictions) > ValidAcc:
            ValidAcc = classification.accuracy_score(labelsall, predictions)
            labelsAll = labelsall
            predictionsAll = predictions
            torch.save(model, 'modelPPG.pkl')

In [None]:
# Loading test set
test_dataset = MyDataset(dir=opt.dataroot,
                                 ext='test',
                                 transform=transforms.Compose([
                                     CropAverage(opt.recordLength),
                                     ToTensor()
                                 ]))
test_loader = DataLoader(test_dataset, batch_size=opt.batchSize,
                          shuffle=False,  num_workers=int(opt.workers))

In [None]:
model = torch.load('modelPPG.pkl')
predictions = []
labelsall = []
Fea_all = np.zeros((1,256), dtype = float)
pred_score = np.zeros((1,6), dtype = float)
for i, test_data in enumerate(test_loader, 0):
    inputs, labels = test_data['signal'], test_data['labels']
    if opt.cuda:
        inputs = Variable(inputs).cuda()
        labels = Variable(labels).cuda()
        
    output = model(inputs)
    labels = labels[:,0].long()
    score = output.data.cpu().numpy()
    pred_score = np.vstack((pred_score,score))       

    _,predicted = torch.max(output.data, 1)
    predicted = predicted.cpu()
    labels = labels.cpu()  
    predictions.extend(predicted.numpy())
    labelsall.extend(labels.numpy())
    
labelsAll = labelsall
predictionsAll = predictions
pred_score = pred_score[1:len(pred_score),:]

In [None]:
print(classification.classification_report(labelsAll,predictionsAll,digits=4))
confusion = classification.confusion_matrix(labelsAll,predictionsAll)
classes = ['SR', 'PVC', 'PAC','VT','SVT','AF']
indices = range(len(confusion))
plt.xticks(indices, classes)
plt.yticks(indices, classes)
plt.imshow(confusion,cmap=plt.cm.OrRd)
plt.colorbar()
plt.xlabel('Predicted label')
plt.ylabel('True label')
for first_index in range(len(confusion)):
    for second_index in range(len(confusion[first_index])):
        plt.text(first_index-0.25, second_index, confusion[second_index][first_index])
        
print('Sensitivity, Specificity, Positive predictive value,Negative predictive values,Accuracy')
for i in range(len(confusion)):
    TP = confusion[i,i]
    FN = sum(confusion[i,:])-TP
    FP = sum(confusion[:,i])-TP
    TN = sum(sum(confusion))-TP-FP-FN
    
    print(np.array([TP/(TP+FN), TN/(FP+TN), TP/(TP+FP), TN/(FN+TN), (TP+TN)/(TP+FP+FN+TN)]))

In [None]:
from itertools import cycle
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
from scipy import interp

y_true = label_binarize(np.array(labelsAll), classes=[0, 1, 2, 3, 4, 5])
n_classes = 6
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_true[:, i], pred_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
    
# Compute macro-average ROC curve and ROC area（方法一）
# First aggregate all false positive rates
all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
# Then interpolate all ROC curves at this points
mean_tpr = np.zeros_like(all_fpr)
for i in range(n_classes):
    mean_tpr += interp(all_fpr, fpr[i], tpr[i])
# Finally average it and compute AUC
mean_tpr /= n_classes
fpr["macro"] = all_fpr
tpr["macro"] = mean_tpr
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

# Plot all ROC curves
plt.figure()
plt.plot(fpr["macro"], tpr["macro"],
         label='Average ROC curve (AUC = {0:0.2f})'
               ''.format(roc_auc["macro"]),
         color='navy', linewidth=2)
   
colors = cycle(['aqua', 'darkorange', 'blue','red','blue','red'])
for i, color in zip(range(n_classes), colors):
    plt.plot(fpr[i], tpr[i], color=color, lw=1.5,
             label='ROC curve of class {0} (AUC = {1:0.2f})'
             ''.format(i, roc_auc[i]))

plt.plot([0, 1], [0, 1], 'k--', lw=1)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC to multi-class')
plt.legend(loc="lower right")
plt.show()