# Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
with open('metadata.txt','w') as f:
  f.write('asas')

In [None]:
import torch

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
torch.set_printoptions(edgeitems=2)

In [None]:
import tqdm.auto as tqdm

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


# 1D-CNN Model

In [None]:
class ResBlock(nn.Module):
    def __init__(self, n_filters_in, n_samples_in, n_filters_out, n_samples_out,
                 dropout_rate=0.8, kernel_size=17):
        
        super(ResBlock, self).__init__()
        self.padding=(kernel_size-1)//2 
        downsample= n_samples_in//n_samples_out 

        self.conv1 = nn.Conv1d(n_filters_in, n_filters_out, kernel_size=kernel_size, padding=self.padding) 
        self.bn1 = nn.BatchNorm1d(n_filters_out)
        self.relu1 = nn.ReLU() 
        self.dropout1 = nn.Dropout(p=dropout_rate)
        self.conv2 = nn.Conv1d(n_filters_out, n_filters_out, kernel_size=kernel_size, stride=downsample, padding=self.padding) 
        
        self.sk_max_pool= nn.MaxPool1d(downsample)
        self.sk_conv = nn.Conv1d(n_filters_in, n_filters_out, kernel_size=1) 
        
        self.bn2 = nn.BatchNorm1d(n_filters_out) 
        self.relu2 = nn.ReLU() 
        self.dropout2 = nn.Dropout(p=dropout_rate)
        
       

    def forward(self, inputs):
        x,y = inputs

        y = self.sk_max_pool(y)# skip connection (Max Pool -> 1dConv)
        y = self.sk_conv(y)
        
        x = self.conv1(x) #Conv1d
        x = self.bn1(x) #bn
        x = self.relu1(x) #ReLU
        x = self.dropout1(x) #dropout 

        x = self.conv2(x) #conv

        x = x+y

        y = x

        x = self.bn2(x) #bn
        x = self.relu2(x) #relu
        x = self.dropout2(x) #dropout 

        return x,y
    
class ECGNet(nn.Module):
    def __init__(self, input_channels=12, N_labels=2, kernel_size =17,  n_blocks=4):
        super().__init__()

        self.padding= (kernel_size-1)//2
        
        self.conv1 = nn.Conv1d(input_channels, 64, kernel_size=kernel_size, padding=self.padding) # input_channelsx4096 -> #64x4096
        self.bn1 = nn.BatchNorm1d(64) #64x4096
        self.relu1 = nn.ReLU() #64x4096

        self.resblock1 = ResBlock(64,4096,128,1024)
        self.resblock2 = ResBlock(128,1024,196,256)
        self.resblock3 = ResBlock(196,256, 256, 64)
        self.resblock4 = ResBlock(256,64, 320, 16)
        
        self.flatten = nn.Flatten()
        self.dense_final = nn.Linear(320*16, N_labels)
        self.sigmoid_final = nn.Sigmoid()
    
    def forward(self, x_in):
        
        x = self.conv1(x_in)
        x = self.bn1(x)
        x = self.relu1(x)

        x, y = self.resblock1((x,x))
        x, y = self.resblock2((x,y))
        x, y = self.resblock3((x,y))
        x, _ = self.resblock4((x,y))
        
        x = self.flatten(x)
        x = self.dense_final(x)
        x = self.sigmoid_final(x)

        return x 

# New Model

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class ECGNet(nn.Module):
      
    def __init__(self, input_channels=12, N_labels=2, kernel_size =17,  n_blocks=4):
        super().__init__()

        self.padding= (kernel_size-1)//2
        self.conv1 = nn.Conv1d(input_channels, 64, kernel_size=kernel_size, padding=self.padding) # input_channelsx4096 -> #64x4096
        self.bn1 = nn.BatchNorm1d(64) #64x4096
        self.relu1 = nn.ReLU() #64x4096

        self.resblock1 = self.ResBlock(64,4096,128,1024)
        self.resblock2 = self.ResBlock(128,1024,196,256)
        self.resblock3 = self.ResBlock(196,256, 256, 64)
        self.resblock4 = self.ResBlock(256,64, 320, 16)
        
        self.flatten = nn.Flatten()
        self.dense_final = nn.Linear(320*16, N_labels)
        self.sigmoid_final = nn.Sigmoid()
    
    def forward(self, x_in):
        
        x = self.conv1(x_in)
        x = self.bn1(x)
        x = self.relu1(x)

        x, y = self.resblock1((x,x))
        x, y = self.resblock2((x,y))
        x, y = self.resblock3((x,y))
        x, _ = self.resblock4((x,y))
        
        x = self.flatten(x)
        x = self.dense_final(x)
        x = self.sigmoid_final(x)

        return x 
  
    class ResBlock(nn.Module):
      def __init__(self, n_filters_in, n_samples_in, n_filters_out, n_samples_out,
                  dropout_rate=0.8, kernel_size=17):
          super(ECGNet.ResBlock, self).__init__()
          self.padding=(kernel_size-1)//2 
          downsample= n_samples_in//n_samples_out 

          self.conv1 = nn.Conv1d(n_filters_in, n_filters_out, kernel_size=kernel_size, padding=self.padding) 
          self.bn1 = nn.BatchNorm1d(n_filters_out)
          self.relu1 = nn.ReLU() 
          self.dropout1 = nn.Dropout(p=dropout_rate)
          self.conv2 = nn.Conv1d(n_filters_out, n_filters_out, kernel_size=kernel_size, stride=downsample, padding=self.padding) 
          
          self.sk_max_pool= nn.MaxPool1d(downsample)
          self.sk_conv = nn.Conv1d(n_filters_in, n_filters_out, kernel_size=1) 
          
          self.bn2 = nn.BatchNorm1d(n_filters_out) 
          self.relu2 = nn.ReLU() 
          self.dropout2 = nn.Dropout(p=dropout_rate)
          
        

      def forward(self, inputs):
          x,y = inputs
          y = self.sk_max_pool(y)# skip connection (Max Pool -> 1dConv)
          y = self.sk_conv(y)
          x = self.conv1(x) #Conv1d
          x = self.bn1(x) #bn
          x = self.relu1(x) #ReLU
          x = self.dropout1(x) #dropout 
          x = self.conv2(x) #conv
          x = x+y
          y = x
          x = self.bn2(x) #bn
          x = self.relu2(x) #relu
          x = self.dropout2(x) #dropout 
          return x,y

model = ECGNet(N_labels = len(label_list))

#Proceed

In [None]:
label_list = [
"39732003"	,#LAD
"164934002"	,#TAb
"164889003"	,#AF
"427084000"	,#STach
"270492004"	,#IAVB
]

# Load Parameters

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [None]:
filepath = '/content/gdrive/My Drive/TorchServer/original_model/weights.pt' #TODO Edit THIS

In [None]:
import torch
#Later to restore:
# model.load_state_dict(torch.load(filepath )) #On GPU
model.load_state_dict(torch.load(filepath, map_location=torch.device('cpu') ))

<All keys matched successfully>

In [None]:
torch.save(model.state_dict(), '/content/gdrive/My Drive/TorchServer/original_model/weights.pt')

# Inference Example from JSON File

In [None]:
import json

In [None]:
# Opening JSON file 
f = open('new_input.json',) #Sample input
  
# returns JSON object as a dictionary 
data = json.load(f) 

f.close() 

In [None]:
import numpy as np
inputs_arr = np.array(data,dtype=np.float32)

In [None]:
inputs_arr.shape

(12, 4096)

In [None]:
import cv2
cv2.imwrite('input.png',inputs_arr)

True

In [None]:
cv2.imread('input.png').shape

(12, 4096, 3)

In [None]:
inputs_tensor = torch.from_numpy(inputs_arr).unsqueeze(0)
#inputs_tensor = inputs_tensor.unsqueeze(0) #Add dimension for batch size

In [None]:
inputs_tensor.shape

torch.Size([1, 12, 4096])

In [None]:
output = model.forward(inputs_tensor)

In [None]:
dict={0:'LAD',1:'TAb',2:'AF',3:'STach',4:'iavb'}
label=int(torch.argmax(output))
dict[label]


'LAD'

In [None]:
output

In [None]:
for i in range(len(label_list)):
    print("%s %.3f"%(label_list[i], float(output[0][i])))

39732003 0.000
164934002 0.996
164889003 0.000
427084000 0.000
270492004 0.002


In [None]:
output.shape

torch.Size([1, 5])

In [None]:
output

tensor([[1.1893e-04, 9.9636e-01, 4.6840e-04, 3.8705e-05, 1.8320e-03]],
       grad_fn=<SigmoidBackward>)

# With Detailed Labels

In [None]:

def getDxMap(csv_name):
    csv_path = csv_name

    rows = open(csv_path).read().strip().split("\n")[1:]
    # random.shuffle(rows)

    print('Total labels: %d'%(len(rows)))

    dx_map ={}
    # for (i, row) in tqdm(enumerate(rows)):
    for row in rows:
        description, code, abbr= row.strip().split(",")
        dx_map[code] = (abbr,description)

    return dx_map

dx_map = getDxMap("Dx_map.csv")

Total labels: 111


In [None]:
dx_map

In [None]:
for i in range(len(label_list)):
    print("%s %.3f"%(dx_map[label_list[i]], float(output[0][i])))

('LAD', 'left axis deviation') 0.000
('TAb', 't wave abnormal') 0.996
('AF', 'atrial fibrillation') 0.000
('STach', 'sinus tachycardia') 0.000
('IAVB', '1st degree av block') 0.002
