## Packages

In [None]:
import random
import numpy as np
import os
import torch
import torch.nn as nn
import re
import pandas as pd 
import json
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, SubsetRandomSampler
import pickle
from sklearn import metrics
import matplotlib.pyplot as plt
from collections import Counter
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

In [None]:
from tqdm import tqdm_notebook, trange

def seed_everything(seed = 42):
    """Seed every random number generator this notebook relies on.

    Args:
        seed: Integer seed applied to Python's ``random``, NumPy and PyTorch
            (CPU and all CUDA devices).

    Note:
        ``PYTHONHASHSEED`` is exported for the benefit of subprocesses; it
        does not change hashing in the interpreter that is already running.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not only the current one. The call is silently
    # ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick different kernels from run to run, which
    # defeats the deterministic flag above.
    torch.backends.cudnn.benchmark = False

# For reproducible results
seed_everything()

In [None]:
import matplotlib as mpl
# The bundled seaborn styles were renamed to 'seaborn-v0_8*' in matplotlib 3.6
# and the old names were removed afterwards, so pick whichever one this
# installation actually provides.
mpl.style.use('seaborn' if 'seaborn' in mpl.style.available else 'seaborn-v0_8')

In [None]:
from google.colab import drive
drive.mount('/content/gdrive/')

In [None]:
%cd /content/gdrive/My Drive/seq

## Data Preprocessing

In [None]:
class MyDataset(Dataset):
    """Dataset pairing each input sample with its row of labels.

    Args:
        X: Indexable collection of input samples.
        Y: Indexable collection of label rows; ``Y[i]`` holds one label per
            prediction task for sample ``i``.
    """

    def __init__(self, X, Y):
        self.data = X
        self.target = Y

    def __getitem__(self, index):
        """Return ``(sample, labels)`` where ``labels`` is a list with one
        entry per label column of the target row."""
        x = self.data[index]
        # Take the label count from the row itself rather than from the
        # module-level ``types`` variable, so the dataset also works before
        # that cell has been executed.
        s = list(self.target[index])
        return x, s

    def __len__(self):
        return len(self.data)

### Reading Files

In [None]:
%ls data/

In [None]:
npzfile = np.load('data/mic_sst_comb_cap.npz')

In [None]:
X, Y = npzfile['arr_0'], npzfile['arr_1']

In [None]:
X, Y = shuffle(X, Y, random_state=0)

In [None]:
# Number of label columns per sample. The dataset, the training helper and the
# CNN forward pass all loop over range(types), i.e. one output per column.
types = len(Y[0])

In [None]:
# Number of distinct class ids, assuming ids are 0-based and contiguous.
# Take the maximum over *all* label columns: looking only at columns 0 and 1
# under-counts when a later column contains a larger class id.
classes = int(Y.max()) + 1

#### Partition

In [None]:
# Per-class hold-out quota: 20% of the number of rows whose first label column
# is 0. The value is a float, so the `cap > 0` countdown in the partition
# loops below effectively rounds it up.
test_size = Counter(Y[:,0])[0]*0.2
# The same quota is applied to label 0 (cap1) and label 1 (cap2).
cap1, cap2 = test_size, test_size

In [None]:
# Build a class-balanced test set: take rows until the quota for label 0
# (cap1) and for label 1 (cap2) is used up. idx_L records every held-out row.
testX, testY, idx_L = [], [], []
for idx, y in enumerate(Y):
  # `y` is a whole label row, so `y == 0` would be an array whose truth value
  # is ambiguous. Stratify on the first column, the same column the quota
  # (test_size) was computed from.
  first_label = y[0]
  if first_label == 0 and cap1 > 0:
    testY.append(y)
    testX.append(X[idx])
    idx_L.append(idx)
    cap1 -= 1
  elif first_label == 1 and cap2 > 0:
    testY.append(y)
    testX.append(X[idx])
    idx_L.append(idx)
    cap2 -= 1

In [None]:
# Build a class-balanced validation set from the rows not already held out.
validX, validY = [],[]
# The test-partition loop counts cap1/cap2 down to zero; restore the quota,
# otherwise nothing would ever be added to the validation set.
cap1, cap2 = test_size, test_size
# Set membership is O(1); each row is visited once, so a snapshot suffices.
already_held_out = set(idx_L)
for idx, y in enumerate(Y):
  if idx in already_held_out:
    continue
  # Compare the first label column; comparing the whole row is ambiguous.
  first_label = y[0]
  if first_label == 0 and cap1 > 0:
    validY.append(y)
    validX.append(X[idx])
    idx_L.append(idx)
    cap1 -= 1
  elif first_label == 1 and cap2 > 0:
    validY.append(y)
    validX.append(X[idx])
    idx_L.append(idx)
    cap2 -= 1

In [None]:
# Everything that was not held out (indices in idx_L) becomes training data.
trainX, trainY = [], []
held_out = set(idx_L)  # O(1) membership instead of scanning the list per row
for idx, y in enumerate(Y):
  if idx in held_out:
    continue
  trainX.append(X[idx])
  trainY.append(y)

### Divide Data

In [None]:
# Positional 60% / 20% / 20% split into train / validation / test.
# These assignments replace whatever trainX/validX/testX (and the matching Y
# variables) held before this cell was run.
n_samples = len(Y)
train_end = int(n_samples*0.6)
valid_end = int(n_samples*0.8)
trainX, trainY = X[:train_end], Y[:train_end]
validX, validY = X[train_end:valid_end], Y[train_end:valid_end]
testX, testY = X[valid_end:], Y[valid_end:]

In [None]:
trainX, trainY = shuffle(trainX, trainY, random_state=0)
validX, validY = shuffle(validX, validY, random_state=0)
testX, testY = shuffle(testX, testY, random_state=0)

### Convert to Torch

In [None]:
train_X = torch.from_numpy(trainX)
train_y = torch.from_numpy(trainY)
valid_X  = torch.from_numpy(validX)
valid_y = torch.from_numpy(validY)
test_X = torch.from_numpy(testX)
test_y = torch.from_numpy(testY)

In [None]:
train_dataset = MyDataset(train_X, train_y)
valid_dataset = MyDataset(valid_X, valid_y)
test_dataset = MyDataset(test_X, test_y)

## Helper Functions

In [None]:
# Lowest validation loss seen so far, keyed by save_model_time (one entry per
# checkpoint directory). Re-run this cell to reset it before a fresh training run.
_BEST_VALID_LOSS = {}

def bestmodel(model_name,save_model_time,valid_loss):
    """Save the model when its validation loss is the lowest seen so far.

    The best loss is remembered between calls. Previously it was reset to a
    constant on every call, so every epoch overwrote the "best" checkpoint.

    Args:
        model_name: The ``nn.Module`` to checkpoint.
        save_model_time: Suffix of the checkpoint directory
            (``model/model<save_model_time>/``), which must already exist.
        valid_loss: Validation loss of ``model_name`` for the current epoch.

    Returns:
        Always ``True`` (kept for backward compatibility).
    """
    previous_best = _BEST_VALID_LOSS.get(save_model_time, float('inf'))
    if valid_loss < previous_best:
        torch.save(model_name, 'model/model{save_model_time}/bestmodel.pkl'.format(save_model_time=save_model_time))
        torch.save(model_name.state_dict(), 'model/model{save_model_time}/net_params_bestmodel.pkl'.format(save_model_time=save_model_time))
        # Record the new best only after both files were written successfully.
        _BEST_VALID_LOSS[save_model_time] = valid_loss
    return True

In [None]:
def onehot(y, num_classes=None):
    """One-hot encode a 1-D sequence of integer class ids.

    Args:
        y: 1-D sequence (list, NumPy array or CPU tensor) of class ids.
        num_classes: Width of the encoding. Defaults to the module-level
            ``classes`` value when omitted.

    Returns:
        ``np.float32`` array of shape ``(len(y), num_classes)`` with a single
        1 per row.

    Raises:
        ValueError: If a label lies outside ``[0, num_classes)``.
    """
    if num_classes is None:
        num_classes = classes
    labels = np.asarray(y).astype(np.int64)
    # Reject out-of-range ids explicitly: negative indices would otherwise
    # wrap around silently in the fancy-indexed assignment below.
    if labels.size and (labels.min() < 0 or labels.max() >= num_classes):
        raise ValueError('label outside [0, {})'.format(num_classes))

    y_onehot = np.zeros((len(labels), num_classes), dtype=np.float32)
    y_onehot[np.arange(len(labels)), labels] = 1
    return y_onehot

## Training and Validating

In [None]:
save_model_time = '0'
mkpath = 'model/model%s'% save_model_time
# Every checkpoint is written into this directory, so create it up front.
# exist_ok=True makes the cell safe to re-run.
os.makedirs(mkpath, exist_ok=True)

In [None]:
class TrainHelper():
    '''
    Helper class that bundles the model, optimizer, loss function and data
    loaders so the training routine is easier and cleaner to define.

    Args:
        model: Network to train. Its forward pass must return one output
            tensor per label column.
        train_set: Dataset used for training.
        test_set: Dataset used for the per-epoch validation pass.
        opts: Dict with keys 'epochs', 'lr', 'batch_size' and 'loss_fxn'
            ('c' selects CrossEntropyLoss, anything else BCEWithLogitsLoss).

    Besides its arguments the class relies on these module-level names:
    ``types``, ``save_model_time``, ``bestmodel``, ``onehot`` and the lists
    ``train_loss`` / ``test_loss`` that collect the per-epoch mean losses.
    '''

    def __init__(self,model,train_set,test_set,opts):
        self.model = model  # neural net

        # device agnostic code snippet
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

        self.epochs = opts['epochs']
        # Keep the loss mode on the instance so train()/test() do not depend
        # on a global `opts` that may have changed since construction.
        self.loss_fxn = opts['loss_fxn']
        self.optimizer = torch.optim.Adam(model.parameters(), opts['lr']) # optimizer method for gradient descent
        if self.loss_fxn == 'c':
            self.criterion = torch.nn.CrossEntropyLoss()      # loss function
        else:
            self.criterion = torch.nn.BCEWithLogitsLoss()     # loss function used in papers

        # Use the datasets that were passed in (they used to be ignored in
        # favour of the module-level train_dataset / valid_dataset).
        self.train_loader = torch.utils.data.DataLoader(dataset=train_set,
                                                        batch_size=opts['batch_size'],
                                                        shuffle=True)
        self.valid_loader = torch.utils.data.DataLoader(dataset=test_set,
                                                        batch_size=opts['batch_size'],
                                                        shuffle=True)

    def _batch_loss(self, outputs, labels):
        '''Sum the criterion over all output heads for one batch.

        ``labels`` is the per-head list of label tensors as yielded by the
        loader. For the BCE mode each head's labels are one-hot encoded.
        '''
        loss = 0
        for head, output in enumerate(outputs):
            if self.loss_fxn == 'b':
                target = torch.from_numpy(onehot(labels[head])).to(self.device)
            else:
                target = labels[head].to(self.device)
            loss += self.criterion(output, target)
        return loss

    def train(self):
        '''Train for ``self.epochs`` epochs, validating after every epoch.'''
        for epoch in range(self.epochs):
            # test() switches the model to eval mode at the end of every
            # epoch, so training mode has to be re-enabled here each time.
            self.model.train()
            self.tr_loss = []
            for data, labels in tqdm_notebook(self.train_loader,
                                              total = len(self.train_loader)):
                data = data.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(data)
                loss = self._batch_loss(outputs, labels)
                loss.backward()
                self.optimizer.step()
                self.tr_loss.append(loss.item())

            if (epoch+1) % 5 == 0 or epoch == 0: # save the model every 5th epoch (and after the first)
                # Checkpoint number k is written after epoch 5*k; number 0
                # is the snapshot taken after the very first epoch.
                checkpoint = int((epoch+1)/5)
                torch.save(self.model, 'model/model{save_model_time}/net_{epoch}.pkl'.format(save_model_time=save_model_time,epoch=checkpoint))
                torch.save(self.model.state_dict(), 'model/model{save_model_time}/net_params_{epoch}.pkl'.format(save_model_time=save_model_time,epoch=checkpoint))

            self.test(epoch) # run through the validation set

    def test(self,epoch):
        '''Run one pass over the validation loader and report the metrics.

        Appends the mean validation and training loss of this epoch to the
        module-level ``test_loss`` / ``train_loss`` lists, offers the model to
        ``bestmodel`` and prints a one-line summary. Accuracy is the
        unweighted mean of the per-batch accuracies of each head.
        '''
        self.model.eval()    # puts model in eval mode
        self.test_loss = []
        self.test_accuracy_L = [[] for _ in range(types)]

        for data, labels in self.valid_loader:
            data = data.to(self.device)
            # turn off gradient calculation to speed up calcs and reduce memory
            with torch.no_grad():
                outputs = self.model(data)
                loss = self._batch_loss(outputs, labels)
            self.test_loss.append(loss.item())

            for head, output in enumerate(outputs):
                _, predicted = torch.max(output.data, 1)
                target = labels[head].to(self.device)
                self.test_accuracy_L[head].append((predicted == target).sum().item() / predicted.size(0))

        test_loss.append(np.mean(self.test_loss))
        train_loss.append(np.mean(self.tr_loss))
        av = [np.mean(self.test_accuracy_L[i]) for i in range(types)]
        bestmodel(self.model,save_model_time,np.mean(self.test_loss)) # find best model
        print('epoch: {}, train loss: {}, test loss: {}, test accuracy: {}'.format(
              epoch+1, np.mean(self.tr_loss), np.mean(self.test_loss), av))

## Testing

In [None]:
# Re-shuffle the training tensors and keep the first 2000 samples as a fixed
# subsample; test_result(model, 'sub') evaluates on it.
# NOTE(review): this relies on sklearn's shuffle accepting torch tensors - confirm.
train_X, train_y = shuffle(train_X, train_y, random_state=0) 
train_X_sub = train_X[:2000]
train_y_sub = train_y[:2000]
sub_dataset = MyDataset(train_X_sub, train_y_sub)
# Module-level loader over the test set; confusion() iterates this global.
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100, shuffle=True)

In [None]:
def test_result(model, datatype):
    """Return the mean per-batch accuracy of every output head.

    Args:
        model: Network whose forward pass returns one output tensor per
            label column.
        datatype: ``'sub'`` evaluates on the module-level ``sub_dataset``;
            any other value evaluates on the module-level ``test_dataset``.

    Returns:
        List with ``types`` floats: for each head, the unweighted mean of the
        per-batch accuracies.
    """
    dataset = sub_dataset if datatype == 'sub' else test_dataset
    loader = torch.utils.data.DataLoader(dataset, batch_size=100, shuffle=True)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # Make sure the model sits on the same device as the batches, and
    # evaluate in eval mode; the previous mode is restored afterwards.
    model.to(device)
    was_training = model.training
    model.eval()

    test_accuracy_L = [[] for _ in range(types)]
    try:
        # turn off gradient calculation to speed up calcs and reduce memory
        with torch.no_grad():
            for data, labels in loader:
                outputs = model(data.to(device))
                for head, output in enumerate(outputs):
                    _, predicted = torch.max(output.data, 1)
                    target = labels[head].to(device)
                    test_accuracy_L[head].append((predicted == target).sum().item() / predicted.size(0))
    finally:
        model.train(was_training)

    return [np.mean(test_accuracy_L[i]) for i in range(types)]

In [None]:
def pltloss(train_loss, test_loss, epoch):
    """Plot the training and testing loss curves over ``epoch`` epochs.

    Args:
        train_loss: Sequence with one training-loss value per epoch.
        test_loss: Sequence with one testing-loss value per epoch.
        epoch: Number of epochs; the x axis runs over 0 .. epoch-1.
    """
    x_axis = list(range(epoch))
    plt.figure()
    curves = (
        (train_loss, 'g', 'Training loss'),
        (test_loss, 'b', 'Testing loss'),
    )
    for values, fmt, name in curves:
        plt.plot(x_axis, values, fmt, label=name)
    plt.title('Training and Testing Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

Need better graph

In [None]:
 def pltacc(tr_acc_1, ts_acc_1, tr_acc_2, ts_acc_2, tr_acc_3, ts_acc_3, tr_acc_4, ts_acc_4, epoch):
     """Plot training (dashed) and testing (solid) accuracy for four cells.

     Each cell gets its own colour. The x axis holds the multiples of 5 from
     5 up to ``epoch``, so every accuracy series needs one value per tick.
     """
     ticks = list(range(5, epoch + 1, 5))
     per_cell = (
         ('g', tr_acc_1, 'Training Accuracy for cell 1', ts_acc_1, 'Testing Accuracy for cell 1'),
         ('b', tr_acc_2, 'Training Accuracy for cell 2', ts_acc_2, 'Testing Accuracy for cell 2'),
         ('r', tr_acc_3, 'Training Accuracy for cell 3', ts_acc_3, 'Testing Accuracy for cell 3'),
         ('y', tr_acc_4, 'Training Accuracy for cell 4', ts_acc_4, 'Testing Accuracy for cell 4'),
     )
     plt.figure()
     for colour, train_acc, train_label, test_acc, test_label in per_cell:
         plt.plot(ticks, train_acc, colour, linestyle='dashed', label=train_label)
         plt.plot(ticks, test_acc, colour, label=test_label)

     plt.title('Accuracy over Epochs')
     plt.xlabel('Epochs')
     plt.ylabel('Accuracy')
     plt.legend()
     plt.show()

In [None]:
 def pltacc(tr_acc_1, ts_acc_1, tr_acc_2, ts_acc_2, *more, epoch=None):
     """Plot training (dashed) and testing (solid) accuracy over the epochs.

     Because this definition replaces any earlier ``pltacc``, it accepts both
     call shapes used in the notebook:

     * ``pltacc(tr1, ts1, tr2, ts2, epoch)`` - two cells, original colours;
     * ``pltacc(tr1, ts1, ..., trN, tsN, epoch)`` - any number of additional
       (train, test) pairs, one colour per cell.

     Args:
         tr_acc_1, ts_acc_1, tr_acc_2, ts_acc_2: Accuracy series of the
             first two cells.
         *more: Optional further (train, test) series, followed by ``epoch``
             when it is passed positionally.
         epoch: Number of training epochs; the x axis holds the multiples of
             5 from 5 up to ``epoch``.

     Raises:
         TypeError: If ``epoch`` is missing or the extra series are not paired.
     """
     if epoch is None:
         if not more:
             raise TypeError("pltacc() missing required argument: 'epoch'")
         # Positional call: the last argument is the epoch count.
         *more, epoch = more
     if len(more) % 2:
         raise TypeError('extra accuracy series must come in (train, test) pairs')

     epochs = list(range(5, epoch + 1, 5))
     series = [tr_acc_1, ts_acc_1, tr_acc_2, ts_acc_2, *more]
     if not more:
         # Two-cell plot: keep the original one-colour-per-line scheme.
         colours = ['g', 'b', 'r', 'y']
     else:
         # More cells: train and test curves of a cell share one colour.
         palette = 'gbrycmk'
         colours = [palette[(k // 2) % len(palette)] for k in range(len(series))]

     plt.figure()
     for k, (values, colour) in enumerate(zip(series, colours)):
         cell = k // 2 + 1
         if k % 2 == 0:
             plt.plot(epochs, values, colour, linestyle='dashed',
                      label='Training Accuracy for cell {}'.format(cell))
         else:
             plt.plot(epochs, values, colour,
                      label='Testing Accuracy for cell {}'.format(cell))
     plt.title('Accuracy over Epochs')
     plt.xlabel('Epochs')
     plt.ylabel('Accuracy')
     plt.legend()
     plt.show()

In [None]:
def confusion(test_data, classifier, num):
    """Compute the confusion matrix of one output head.

    Args:
        test_data: Iterable of ``(data, labels)`` batches (e.g. a DataLoader),
            where ``labels[num]`` holds the labels of head ``num``.
        classifier: Model returning one output tensor per head; it is
            expected to already live on the evaluation device.
        num: Index of the head to evaluate.

    Returns:
        ``classes x classes`` confusion matrix (rows: true label, columns:
        predicted label) over the label ids ``0 .. classes-1``.
    """
    pred, label = [], []
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Iterate the loader that was passed in; the argument used to be ignored
    # in favour of the module-level test_loader.
    for data, labels in test_data:
        data = data.to(device)
        label.extend(labels[num].tolist())
        # turn off gradient calculation to speed up calcs and reduce memory
        with torch.no_grad():
            outputs = classifier(data)
        _, predicted = torch.max(outputs[num].data, 1)
        pred.extend(predicted.tolist())

    return confusion_matrix(label, pred, labels = list(range(classes)))

def visualize_confusion(M):
    """Draw confusion matrix ``M`` on a new 5x5 inch figure.

    Tick labels are the class ids ``0 .. classes-1``; cell values are shown
    as integers on a blue colour map.
    """
    figure = plt.figure(figsize = (5, 5))
    axes = figure.add_subplot(1, 1, 1)
    class_ids = list(range(classes))
    display = ConfusionMatrixDisplay(M, display_labels = class_ids)
    display.plot(values_format = 'd', cmap = 'Blues', ax = axes)

## Model

In [None]:
class CNN(nn.Module):
    """1-D CNN with a shared convolutional trunk and one linear head per task.

    Changes relative to the earlier version of this class:

    * every task has its own linear layer (``self.heads``); previously a
      single ``fc1`` was applied repeatedly, so all heads returned the same
      tensor. State dicts saved by the old class are not loadable here.
    * the heads return raw logits, as ``CrossEntropyLoss`` and
      ``BCEWithLogitsLoss`` expect; apply ``torch.sigmoid``/``softmax`` to
      the outputs if probabilities are needed. Argmax predictions are not
      affected by this.
    * the flattened feature size is derived from the input length instead
      of being hard-coded to 2304 (the value for length-601 inputs).
    """

    def __init__(self, input_size, num_classes, num_heads=None):
        """
        init convolution and head layers
        Args:
            input_size: (channels, length) of one sample, e.g. (4, 601)
            num_classes: number of classes predicted by every head
            num_heads: number of output heads; defaults to the module-level
                ``types`` value when omitted
        """
        super(CNN, self).__init__()
        if num_heads is None:
            num_heads = types

        in_channels, seq_len = int(input_size[0]), int(input_size[1])
        self.conv1 = torch.nn.Conv1d(in_channels, 32, 3)
        self.relu = torch.nn.ReLU()
        self.conv2 = torch.nn.Conv1d(32, 64, 3)
        self.pool = torch.nn.MaxPool1d(4)

        # Each conv (kernel 3, no padding) shortens the sequence by 2 and
        # each pool (kernel 4) divides it by 4, rounding down.
        pooled_len = ((seq_len - 2) // 4 - 2) // 4
        if pooled_len < 1:
            raise ValueError('input length {} is too short for this network'.format(seq_len))
        self.heads = nn.ModuleList(
            [nn.Linear(64 * pooled_len, int(num_classes)) for _ in range(num_heads)]
        )

    def forward(self, x):
        """
        forward function describes how input tensor is transformed to output tensor
        Args:
            x: batch of shape (N, channels, length)
        Returns:
            list with one (N, num_classes) logit tensor per head
        """
        # shared layers
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = torch.flatten(x, 1)

        # task-specific heads
        return [head(x) for head in self.heads]

In [None]:
cnn = CNN(train_X.shape[1:], classes)
cnn

In [None]:
opts = {
    'lr': 5e-4,
    'epochs': 50,
    'batch_size': 100,
    'loss_fxn': 'c'
}

In [None]:
test_loss, train_loss = [], []
CNNTrainer = TrainHelper(model = cnn,
                      train_set = train_dataset,
                      test_set = valid_dataset, opts = opts)

In [None]:
CNNTrainer.train()

#### Check for Output

In [None]:
test_result(cnn,'test')

In [None]:
train_acc1, train_acc2, train_acc3, train_acc4, test_acc1, test_acc2, test_acc3, test_acc4 = [], [], [], [], [], [], [], []

In [None]:
# Evaluate every 5-epoch checkpoint on the training subsample and the test set.
# Checkpoint k (k >= 1) is written after epoch 5*k, which matches the x axis
# used by pltacc (5, 10, ..., epochs). Checkpoint 0 is the snapshot after the
# first epoch and is deliberately skipped so the curves line up with that axis.
eval_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
for num in range(1, opts['epochs']//5 + 1):
  cnn.load_state_dict(torch.load('model/model'+save_model_time+'/net_params_'+str(num)+'.pkl', map_location=eval_device))
  cnn.to(eval_device)  # works with or without a GPU, unlike cnn.cuda()
  tmp_train = test_result(cnn, 'sub')
  tmp_test = test_result(cnn, 'test')
  # One accuracy list per head; heads beyond the model's outputs stay empty.
  for history, value in zip((train_acc1, train_acc2, train_acc3, train_acc4), tmp_train):
    history.append(value)
  for history, value in zip((test_acc1, test_acc2, test_acc3, test_acc4), tmp_test):
    history.append(value)
  print(tmp_train)
  print(tmp_test)

In [None]:
max(test_acc4)

In [None]:
cnn.load_state_dict(torch.load('model/model'+save_model_time+'/net_params_10.pkl'))

In [None]:
# Confusion matrix of each of the four output heads on the test loader:
# compute all four first, then draw them in order.
M1, M2, M3, M4 = [confusion(test_loader, cnn, head) for head in range(4)]
for matrix in (M1, M2, M3, M4):
  visualize_confusion(matrix)

In [None]:
pltloss(train_loss, test_loss, opts['epochs'])

In [None]:
pltacc(train_acc1, test_acc1, train_acc2, test_acc2, train_acc3, test_acc3, train_acc4, test_acc4, opts['epochs'])