In [3]:
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn import metrics
from sklearn.preprocessing import MultiLabelBinarizer
import scipy  
import numpy as np
import pandas as pd 
import matplotlib 
from matplotlib import pyplot as plt
import time
import wfdb
import ast

# pytorch
import torch
import torch.nn as nn
from torch.autograd import Variable

In [4]:
def loadData(dfHeaders, path):
    """Read every record listed in ``dfHeaders['filename_lr']`` into one tensor.

    Each file name is appended to ``path`` by plain string concatenation (no
    separator is inserted) and read with ``wfdb.rdsamp``. Only the signal part
    of each ``(signal, meta)`` result is kept; the metadata is discarded.

    Returns:
        A ``torch.Tensor`` built from the stacked signals, in the row order of
        ``dfHeaders``.
    """
    signals = []
    for filename in dfHeaders['filename_lr']:
        signal, _meta = wfdb.rdsamp(path + filename)
        signals.append(signal)
    return torch.Tensor(np.array(signals))


def getLabel(codeDict, SCP):
    """Map one record's SCP codes to its distinct diagnostic classes.

    Args:
        codeDict: mapping whose keys are SCP statement codes (values are not
            used here).
        SCP: DataFrame indexed by SCP code with a ``diagnostic_class`` column.
            Codes missing from ``SCP.index`` are ignored.

    Returns:
        ``np.ndarray`` of the distinct diagnostic classes, in the order they
        are first encountered while iterating ``codeDict``. An empty array is
        returned when no code matches.
    """
    classes = [
        SCP.loc[code, 'diagnostic_class']
        for code in codeDict.keys()
        if code in SCP.index
    ]
    # dict.fromkeys de-duplicates while keeping insertion order; a plain set
    # would make the label order vary between interpreter runs.
    return np.asarray(list(dict.fromkeys(classes)))


def getOHE(header):
    """Multi-hot encode the ``scp_codes`` column of ``header``.

    A fresh ``MultiLabelBinarizer`` is fitted on the column, so the output
    columns are exactly the classes found in this particular ``header``.

    Returns:
        DataFrame sharing the index of ``header['scp_codes']``, with one
        column per class reported by the binarizer.
    """
    labels = header['scp_codes']
    binarizer = MultiLabelBinarizer()
    encoded = binarizer.fit_transform(labels)
    return pd.DataFrame(encoded, index=labels.index, columns=binarizer.classes_)


def get3LSignal(data, header):
    """Collapse each record to the mean of its first three channels.

    Every element of ``data`` is transposed and rows 0, 1 and 2 of the
    transpose are averaged element-wise; any further rows are ignored.
    (Presumably each record is laid out as (samples, leads) -- confirm
    against the loader.)

    Returns:
        DataFrame with one row per record, indexed by ``header.index``.
    """
    def _meanOfFirstThree(record):
        # After the transpose, row k holds channel k.
        leads = record.T
        return ((leads[0, :] + leads[1, :] + leads[2, :]) / 3).tolist()

    rows = [_meanOfFirstThree(record) for record in data]
    return pd.DataFrame(rows, index=header.index.to_numpy())


def getDataLoader(X_train, X_test, Y_train, Y_test, batch_size):
    """Wrap the train and test tensors in ``DataLoader`` objects.

    Args:
        X_train, Y_train: feature and label tensors for training.
        X_test, Y_test: feature and label tensors for evaluation.
        batch_size: number of samples per batch for both loaders.

    Returns:
        ``(trainLoad, testLoad)``. Both loaders drop the final partial batch,
        so every batch has exactly ``batch_size`` samples.
    """
    trainSet = torch.utils.data.TensorDataset(X_train, Y_train)
    testSet = torch.utils.data.TensorDataset(X_test, Y_test)
    trainLoad = torch.utils.data.DataLoader(
        trainSet, batch_size=batch_size, drop_last=True, shuffle=True)
    # The test loader is not shuffled: combined with drop_last=True, shuffling
    # would discard a different random subset of test samples on every pass
    # and make the reported metrics non-reproducible.
    testLoad = torch.utils.data.DataLoader(
        testSet, batch_size=batch_size, drop_last=True, shuffle=False)
    return trainLoad, testLoad

In [5]:
# Record metadata, indexed by ecg_id. Columns containing any missing value are
# dropped, then the textual scp_codes entries are parsed into Python literals.
dfHeaders = pd.read_csv('ptb-xl/ptbxl_database.csv', index_col='ecg_id').dropna(axis=1)
dfHeaders['scp_codes'] = dfHeaders['scp_codes'].apply(ast.literal_eval)

# SCP statement table, restricted to the rows flagged diagnostic == 1.
dfSCP = pd.read_csv('ptb-xl/scp_statements.csv', index_col=0)
dfSCP = dfSCP.loc[dfSCP['diagnostic'] == 1]
output_size = 5

# One averaged signal per record; afterwards every scp_codes entry is replaced
# by the array of diagnostic classes returned by getLabel.
data = get3LSignal(loadData(dfHeaders, 'ptb-xl/'), dfHeaders)
dfHeaders['scp_codes'] = dfHeaders['scp_codes'].apply(getLabel, args=(dfSCP,))

In [6]:
# use OHE for binary, tempOHE for multi-label binary
# The binary target is the NORM column of the multi-label encoding, so the
# encoder is fitted once instead of twice.
tempOHE = getOHE(dfHeaders)
OHE = tempOHE['NORM']
output_size = 1
dataWLabel = pd.merge(tempOHE, data, how='outer', left_index=True, right_index=True)


def _meanSignalForClass(labelled, className):
    """Column-wise mean over the rows of ``labelled`` where ``className`` is 1.

    ``.iloc[1, 4:]`` takes the second group (label value 1 when both 0 and 1
    occur) and skips the first four remaining columns -- presumably the other
    four label columns, which assumes five classes in total; TODO confirm.
    """
    # groupby's default axis is already 0; the explicit ``axis`` keyword is
    # deprecated in recent pandas releases, so it is not passed.
    return labelled.groupby(by=[className]).mean().iloc[1, 4:]


cdData = _meanSignalForClass(dataWLabel, 'CD')
hypData = _meanSignalForClass(dataWLabel, 'HYP')
miData = _meanSignalForClass(dataWLabel, 'MI')
normData = _meanSignalForClass(dataWLabel, 'NORM')
sttcData = _meanSignalForClass(dataWLabel, 'STTC')

# CD and MI look most like normData, so every record carrying either label is
# removed from both the signals and the binary target.
truncID = tempOHE[tempOHE['CD']==1].index.to_numpy()
truncID = np.append(truncID, tempOHE[tempOHE['MI']==1].index.to_numpy())
truncData = data[~data.index.isin(truncID)]
OHE = OHE[~OHE.index.isin(truncID)]
data = truncData
display(OHE.size)
display(truncData.size)

13245

13245000

In [7]:
def _toFloatTensor(frame):
    """Convert a pandas object to a float32 ``torch.Tensor``."""
    return torch.Tensor(frame.to_numpy()).float()


# Records with strat_fold below 10 form the training set. testID lists the
# fold-10 records, but the test split below is taken as "everything not in
# trainID" rather than from testID.
trainID = dfHeaders.loc[dfHeaders['strat_fold'] < 10].index.to_numpy()
testID = dfHeaders.loc[dfHeaders['strat_fold'] == 10].index.to_numpy()

_signalInTrain = data.index.isin(trainID)
_labelInTrain = OHE.index.isin(trainID)

# Features get a trailing axis of size 1: (records, samples) -> (records, samples, 1).
X_train = _toFloatTensor(data[_signalInTrain]).unsqueeze(dim=2)
X_test = _toFloatTensor(data[~_signalInTrain]).unsqueeze(dim=2)
Y_train = _toFloatTensor(OHE[_labelInTrain])
Y_test = _toFloatTensor(OHE[~_labelInTrain])

In [8]:
def getAcc(true, pred, classIdx=3):
    """Balanced accuracy, in percent, of ``pred`` against ``true``.

    Args:
        true: tensor of ground-truth labels.
        pred: tensor of hard (0/1) predictions, same layout as ``true``.
        classIdx: column scored when the inputs have more than one dimension.
            The default of 3 keeps the previous hard-coded behavior
            (presumably the NORM column of the five-class encoding -- confirm).

    Returns:
        ``sklearn.metrics.balanced_accuracy_score(...) * 100``.
    """
    if true.dim() > 1:
        true, pred = true[:, classIdx], pred[:, classIdx]
    else:
        # Also covers 0-dim tensors (a single squeezed sample), which cannot be
        # column-indexed; reshape(-1) is a no-op for 1-D input.
        true, pred = true.reshape(-1), pred.reshape(-1)
    return sklearn.metrics.balanced_accuracy_score(true, pred)*100


def analyzeList(labelList, predList, output_size, loss_eq, threshold=.5):
    """Threshold raw predictions and score them against the labels.

    Args:
        labelList: collected ground-truth labels.
        predList: collected raw model outputs.
        output_size: number of outputs per sample, used to reshape both lists
            to ``(-1, output_size)``.
        loss_eq: loss callable applied to the raw predictions and labels.
        threshold: outputs strictly greater than this become 1, otherwise 0.

    Returns:
        ``(acc, avg_loss, true, pred)`` where ``acc`` comes from ``getAcc``,
        ``avg_loss`` is ``loss_eq(...).item()`` on the un-thresholded values,
        and ``true`` / ``pred`` are the reshaped label and hard-prediction
        tensors (1-D when ``output_size`` is 1, otherwise 2-D).
    """
    hardPred = np.where(np.asarray(predList) > threshold, 1, 0)
    pred = torch.Tensor(hardPred).view(-1, output_size)
    true = torch.Tensor(np.asarray(labelList)).view(-1, output_size)
    if output_size == 1:
        # Squeeze only the output axis. An unconditional squeeze() would also
        # collapse the sample axis when a single sample is present and change
        # how getAcc interprets the tensors.
        pred, true = pred.squeeze(dim=1), true.squeeze(dim=1)

    acc = getAcc(true, pred)
    avg_loss = loss_eq(torch.Tensor(predList), torch.Tensor(labelList)).item()
    return acc, avg_loss, true, pred


def getGraph(trainloss, true, pred, classIdx=3):
    """Plot the training-loss curve, its second difference, and the ROC curve.

    Args:
        trainloss: sequence of average training losses, one per epoch.
        true: tensor of ground-truth labels.
        pred: tensor of prediction scores, same layout as ``true``.
        classIdx: column used when the inputs have more than one dimension.
            The default of 3 keeps the previous hard-coded behavior.

    Side effects:
        Shows three matplotlib figures and prints the AUC.
    """
    plt.title("Training Avg Loss")
    plt.plot(trainloss)
    plt.show()
    # np.diff applied twice yields the second difference, so the title says
    # so (it previously read "Derivative", which did not match the plot).
    plt.title("Second difference of Avg training Loss")
    plt.plot(np.diff(np.diff(trainloss)))
    plt.show()
    if true.dim() > 1:
        true = true[:, classIdx]
        pred = pred[:, classIdx]
    print("AUC: " + str(sklearn.metrics.roc_auc_score(y_true=true, y_score=pred)))
    plt.title("ROC")
    fpr, tpr, thresholds = metrics.roc_curve(y_true=true, y_score=pred)
    plt.plot(fpr, tpr)
    plt.show()

In [9]:
def test(testLoad, model, loss_eq, batch_size):
    predList, labelList = [], []
    
    with torch.no_grad():
        for i, (feature, label) in enumerate(testLoad):    
            hn, cn = model.initHid(batch_size), model.initHid(batch_size)
            pred, hn, cn = model(feature, hn, cn)
            pred = pred.squeeze().numpy().tolist()
            label = label.squeeze().numpy().tolist()
            if type(pred)!=list:
                predList.extend([pred])
                labelList.extend([label])
            else:
                predList.extend(pred)
                labelList.extend(label)       
                
    return labelList, predList

In [17]:
def paramOptim(trainLoad, testLoad, params, epochs, output_size=1):
    """Build, train and evaluate one RNN for a single hyper-parameter set.

    Args:
        trainLoad, testLoad: training and evaluation loaders.
        params: indexable as ``[hidden_size, num_layers, lr, batch_size]``.
        epochs: number of training epochs.
        output_size: forwarded to the model constructor and to ``analyzeList``.

    Returns:
        ``(trainloss, true, pred, RNNModel)``: the per-epoch training losses,
        the label and hard-prediction tensors from ``analyzeList``, and the
        trained model. Test accuracy, test loss and wall time are printed.
    """
    start_time = time.time()
    hidden_size, num_layers, lr, batch_size = params[0], params[1], params[2], params[3]

    # Fixed settings: one input feature per time step, dropout value .1,
    # unidirectional LSTM, mean-reduced MSE loss.
    network = RNN(1, hidden_size, num_layers, output_size, .1, bidi=False)
    criterion = nn.MSELoss(reduction='mean')

    trainloss = trainLoop(network, trainLoad, epochs, batch_size, criterion, lr)
    labels, scores = test(testLoad, network, criterion, batch_size)
    testacc, testloss, true, pred = analyzeList(labels, scores, output_size, criterion, .5)

    print("Testing Acc: " + str(testacc))
    print("Testing Loss: " + str(testloss))
    print("--- %s seconds ---" % (time.time() - start_time))
    return trainloss, true, pred, network


def trainLoop(model, trainLoad, epochs, batch_size, loss_eq, lr):
    """Train ``model`` with plain SGD for ``epochs`` passes over ``trainLoad``.

    The 1-based epoch number is printed before each pass.

    Returns:
        List with the value returned by ``train`` for every epoch, in order.
    """
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    # A learning-rate scheduler was tried here and left disabled:
    # scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma = .9)
    trainloss = []

    for epoch in range(1, epochs + 1):
        print("Epoch: " + str(epoch))
        trainloss.append(train(trainLoad, model, loss_eq, optimizer, batch_size))
    return trainloss

In [18]:
def train(trainLoad, model, loss_eq, optimizer, batch_size):
    """Run one optimisation pass over ``trainLoad``.

    Args:
        trainLoad: iterable yielding ``(feature, label)`` batches.
        model: object exposing ``initHid(n)`` and callable as
            ``model(feature, hn, cn) -> (pred, hn, cn)``.
        loss_eq: loss callable applied to the squeezed prediction and label.
        optimizer: optimizer stepping the model's parameters.
        batch_size: unused; the hidden state is sized from each batch itself
            so a final partial batch no longer causes a shape mismatch.

    Returns:
        Mean of the per-batch loss values.

    Raises:
        ZeroDivisionError: if ``trainLoad`` yields no batches.
    """
    tnlossList = []
    for feature, label in trainLoad:
        optimizer.zero_grad()

        currentBatch = feature.size(0)
        hn, cn = model.initHid(currentBatch), model.initHid(currentBatch)
        pred, hn, cn = model(feature, hn, cn)

        loss = loss_eq(pred.squeeze(), label.squeeze())
        tnlossList.append(loss.item())
        # The hidden state is rebuilt from zeros for every batch, so nothing is
        # back-propagated across batches and the graph need not be retained.
        loss.backward()
        optimizer.step()

    return sum(tnlossList)/len(tnlossList)

In [19]:
class RNN(nn.Module):
    """LSTM followed by a bias-free linear layer and a sigmoid.

    Args:
        input_size: number of features per time step.
        hidden_size: LSTM hidden units per direction.
        num_layers: number of stacked LSTM layers.
        output_size: number of sigmoid outputs per sample.
        drop: accepted for interface compatibility; not applied anywhere in
            this module.
        bidi: truthy for a bidirectional LSTM.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, drop, bidi):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidi = int(bidi)  # 0 or 1, used as a multiplier for layer/feature counts
        self.LSTM = nn.LSTM(input_size, hidden_size, num_layers, bias=True, batch_first=True, bidirectional=bidi)
        # The head honours output_size (it was previously fixed at one unit
        # regardless of the argument).
        self.fc = nn.Sequential(nn.Linear((self.bidi+1)*hidden_size, output_size, bias=False),
                                nn.Sigmoid())

    def forward(self, feature, hn, cn):
        """Classify a batch of sequences.

        Args:
            feature: tensor of shape (batch, seq_len, input_size).
            hn, cn: initial hidden and cell states, e.g. from ``initHid``.

        Returns:
            ``(out, hn, cn)`` where ``out`` has shape (batch, output_size) and
            ``hn`` / ``cn`` are the final LSTM states.
        """
        # Anomaly detection is a process-wide debugging switch; it is no longer
        # turned on here on every forward pass.
        out, (hn, cn) = self.LSTM(feature, (hn, cn))

        if self.bidi:
            # Last two entries of hn, concatenated: (batch, 2*hidden_size).
            cat = torch.cat((hn[-2, :, :], hn[-1, :, :]), dim=1)
        else:
            # Output at the last time step: (batch, hidden_size).
            cat = out[:, -1, :]

        out = self.fc(cat)
        return out, hn, cn

    def initHid(self, batch_size):
        """Return a zero state of shape ((bidi+1)*num_layers, batch_size, hidden_size)."""
        hn = torch.zeros((self.bidi+1)*self.num_layers, batch_size, self.hidden_size).float()
        return hn

In [None]:
# params = [nodes, layers, learning rate, batch size]
paramDict = {
    'T': [20, 2, .05, 128],
    # Disabled entry; note it lists only three values:
    #'C': [50, 3,  512]
}

# Train and evaluate one model per entry. `params` and `model` keep the values
# from the last entry after the loop finishes.
for key, params in paramDict.items():
    display(params)
    trainLoad, testLoad = getDataLoader(X_train, X_test, Y_train, Y_test, batch_size=params[3])
    trainloss, true, pred, model = paramOptim(trainLoad, testLoad, params, epochs=20, output_size=output_size)
    # Positives in the labels, positives in the predictions, samples scored.
    display(true.sum(), pred.sum(), len(pred))
    getGraph(trainloss, true, pred)

[20, 2, 0.05, 128]

Epoch: 1


In [None]:
# Importing Real Data
# The first row of the file is skipped and column 1 is used as the signal.
df_real = pd.read_csv('unlabeled.csv', dtype=np.float64, header=None, skiprows=1)
df_ecg = df_real[1]

# Min-max scale the signal to the [0, 1] range.
df_ecg = (df_ecg - df_ecg.min()) / (df_ecg.max() - df_ecg.min())

# Start the window at the largest value among the first 100 samples and keep
# up to 187 samples from there.
idx0 = df_ecg.iloc[:100].idxmax()
df_chop = df_ecg.iloc[idx0:idx0 + 187].reset_index(drop=True)
real_data = torch.Tensor(df_chop.to_numpy())

In [None]:
def testReal(sample, model):
    with torch.no_grad():
        hn, cn = model.initHid(batch_size=params[3]), model.initHid(batch_size=params[3])
        pred, hn, cn = model(sample, hn, cn)
        return pred

# Repeat the single real trace params[3] times to form a batch, then add the
# trailing feature axis: (batch, seq_len) -> (batch, seq_len, 1). The model is
# built with input_size=1 and batch_first=True, so the new axis must be last
# (dim=2); dim=1 would give (batch, 1, seq_len) and be rejected by the LSTM.
sample = real_data.tile(params[3], 1).unsqueeze(dim=2)
results = testReal(sample, model).squeeze().tolist()
display(results)
# Every row of the batch is the same trace; average the predictions to get
# one score.
results = sum(results)/len(results)


if results<.5:
    print("Negative")
else:
    print("Positive")