In [None]:
import torch
from torch import nn
import torch.nn.functional as F

import os
import pickle

import numpy as np
import math

In [None]:
# Mount Google Drive into the Colab runtime so the files under
# /content/drive/ can be read by the cells below.
from google.colab import drive
drive.mount("/content/drive/")

Mounted at /content/drive/


In [None]:
# Root folder on the mounted Drive; the dataset directory name is appended
# to this string where the dataset is created.
path="/content/drive/My Drive/"

In [None]:
class DeapS2SDatasetClassification(torch.utils.data.Dataset):
    '''In-memory dataset built from every pickled file directly inside a directory.

    Each file is unpickled and must provide a ``'data'`` array and a 2-D
    ``'labels'`` array. The ``'data'`` arrays of all files are combined with
    ``np.vstack``; from ``'labels'`` only column 1 is kept (as a one-column
    2-D array) and combined the same way. Files are read in sorted name order.

    NOTE(review): the label column is used exactly as stored in the files.
    If those values are not already within [0, 1] they are not valid targets
    for ``nn.BCELoss`` -- confirm, and binarise them if necessary.
    '''
    def __init__(self, path):
        '''Load all files found directly under ``path``.

        Args:
            path: directory holding the pickled recordings.

        Raises:
            FileNotFoundError: if ``path`` is not an existing directory.
            ValueError: if the directory contains no files.
        '''
        if not os.path.isdir(path):
            raise FileNotFoundError('Dataset directory not found: {}'.format(path))

        _, _, filenames = next(os.walk(path))
        filenames = sorted(filenames)
        if not filenames:
            raise ValueError('No data files found in: {}'.format(path))

        all_data = []
        all_label = []

        for dat in filenames:
            # SECURITY: unpickling runs arbitrary code embedded in the file,
            # so only point this class at trusted data.
            # The context manager closes the handle even if loading fails.
            with open(os.path.join(path, dat), 'rb') as handle:
                temp = pickle.load(handle, encoding='latin1')

            all_data.append(temp['data'])
            # Slice 1:2 (rather than index 1) keeps the label column 2-D.
            all_label.append(temp['labels'][:, 1:2])

        self.data = np.vstack(all_data)
        self.label = np.vstack(all_label)

    def __len__(self):
        '''Number of samples, i.e. the size of the first axis of the stacked data.'''
        return self.data.shape[0]

    def __getitem__(self, idx):
        '''Return sample ``idx`` as a dict of float32 tensors.

        Returns:
            dict with keys ``'data'`` and ``'label'``.
        '''
        return {
            'data': torch.tensor(self.data[idx], dtype=torch.float32),
            'label': torch.tensor(self.label[idx], dtype=torch.float32),
        }

In [None]:
def calculate_classification_metrics(pred,actual,best_class_weights):
  '''Compute scaled accuracy, macro precision, macro recall and macro F1.

  Args:
    pred: sequence of per-batch prediction arrays; stacked and flattened.
    actual: sequence of per-batch ground-truth arrays; stacked and flattened.
    best_class_weights: indexable with at least four numbers; entries 0..3
      multiply accuracy, precision, recall and F1 respectively.

  Returns:
    Tuple ``(acc, precision, recall, f1score)``, each rounded to 3 decimals.

  NOTE(review): every score is multiplied by its ``best_class_weights`` entry
  before rounding, so the returned numbers equal the real metrics only when
  those entries are 1 -- confirm that this scaling is intended.
  '''
  # Flatten once instead of re-stacking both inputs for every metric.
  y_pred = np.vstack(pred).flatten()
  y_true = np.vstack(actual).flatten()

  # scikit-learn metrics take (y_true, y_pred); passing them the other way
  # round swaps macro precision and macro recall.
  acc = round(best_class_weights[0]*accuracy_score(y_true, y_pred),3)
  precision = round(best_class_weights[1]*precision_score(y_true, y_pred,average='macro'),3)
  recall = round(best_class_weights[2]*recall_score(y_true, y_pred,average='macro'),3)
  f1score = round(best_class_weights[3]*f1_score(y_true, y_pred,average='macro'),3)
  return acc,precision,recall,f1score

In [None]:
# Read every recording below the Drive folder into a single in-memory dataset.
dataset = DeapS2SDatasetClassification(path+'data_preprocessed_python')

# Seed the generator first so the shuffled split below is repeatable.
torch.manual_seed(1)
indices = torch.randperm(len(dataset)).tolist()

# The first 80% of the shuffled indices form the training split,
# the remainder the validation split.
train_ind = int(0.8 * len(dataset))
train_set, val_set = (
    torch.utils.data.Subset(dataset, chosen)
    for chosen in (indices[:train_ind], indices[train_ind:])
)
# Drop the notebook-level name; both subsets keep their own reference.
del dataset

for split in (train_set, val_set):
    print(len(split))
del split

# Only the training loader reshuffles; batch size and pinning are shared.
loader_options = dict(batch_size=12, pin_memory=True)
train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, **loader_options)
val_loader = torch.utils.data.DataLoader(val_set, shuffle=False, **loader_options)



1024
256


In [None]:
class Encoder(nn.Module):
    '''Bidirectional LSTM encoder whose two directions are summed.

    Args:
        input_size: number of features per time step.
        embed_size: hidden size of each LSTM direction (and of the output).
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability between stacked LSTM layers; only used
            when ``n_layers > 1``.
    '''
    def __init__(self, input_size, embed_size,
                 n_layers=1, dropout=0.5):
        super(Encoder, self).__init__()
        self.embed_size = embed_size
        # nn.LSTM applies dropout only between stacked layers. With a single
        # layer a non-zero value has no effect and makes PyTorch emit a
        # UserWarning, so pass 0 in that case.
        inter_layer_dropout = dropout if n_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size, embed_size, n_layers,
                          dropout=inter_layer_dropout, bidirectional=True)

    def forward(self, x):
        '''Encode a sequence-first batch.

        Args:
            x: 3-D tensor; the LSTM is built without ``batch_first``, so it
               is read as (seq_len, batch, input_size).

        Returns:
            Tuple ``(output, hn)``: ``output`` is the sum of the forward and
            backward halves of the LSTM output (last dimension
            ``embed_size``); ``hn`` is the LSTM's final hidden state.
        '''
        output, (hn, _) = self.lstm(x)

        # The LSTM concatenates both directions on the last axis; fold them
        # back to embed_size by adding the two halves.
        output = (output[:, :, :self.embed_size] +
                   output[:, :, self.embed_size:])
        return output, hn

In [None]:
class Layer1_Attention(nn.Module):
    '''GRU followed by tanh and a linear projection.

    Args:
        output_size: size of the last dimension of the result.
        hidden_dim: GRU input size and hidden size.
        n_layers: number of stacked GRU layers.
    '''
    def __init__(self,output_size, hidden_dim, n_layers=1):
        super(Layer1_Attention, self).__init__()

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        # nn.GRU applies dropout only between stacked layers. With a single
        # layer a non-zero value has no effect and makes PyTorch emit a
        # UserWarning, so pass 0 in that case.
        gru_dropout = 0.2 if n_layers > 1 else 0.0
        self.gru = nn.GRU(hidden_dim, hidden_dim, n_layers, batch_first=True, dropout=gru_dropout)
        self.fc = nn.Linear(hidden_dim, output_size).float()
        self.tanh = nn.Tanh()

    def forward(self, x,hidden_dim):
        '''Run the GRU over ``x`` and project every step.

        Args:
            x: batch-first input, (batch, seq_len, hidden_dim).
            hidden_dim: accepted for interface compatibility; not used.

        Returns:
            Tensor of shape (batch, seq_len, output_size).
        '''
        # The GRU starts from its default (zero) initial state.
        out, _ = self.gru(x)
        # Squash the GRU outputs with tanh before the linear projection.
        out = self.fc(self.tanh(out))
        return out

    def init_hidden(self, batch_size):
        '''Return an all-zero GRU hidden state of shape (n_layers, batch_size,
        hidden_dim), with the dtype and device of the module's parameters.'''
        reference = next(self.parameters())
        return reference.new_zeros(self.n_layers, batch_size, self.hidden_dim)

In [None]:
class Attention(nn.Module):
    '''GRU followed by ReLU and a linear projection, giving one score vector per step.

    No softmax or other normalisation is applied, so the values returned by
    ``forward`` are raw scores.

    Args:
        output_size: size of the last dimension of the result.
        hidden_dim: GRU input size and hidden size.
        n_layers: number of stacked GRU layers.
    '''
    def __init__(self,output_size, hidden_dim, n_layers=1):
        super(Attention, self).__init__()

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        # nn.GRU applies dropout only between stacked layers. With a single
        # layer a non-zero value has no effect and makes PyTorch emit a
        # UserWarning, so pass 0 in that case.
        gru_dropout = 0.2 if n_layers > 1 else 0.0
        self.gru = nn.GRU(hidden_dim, hidden_dim, n_layers, batch_first=True, dropout=gru_dropout)
        self.fc = nn.Linear(hidden_dim, output_size).float()
        self.relu = nn.ReLU()

    def forward(self, x,hidden_dim):
        '''Score every step of ``x``.

        Args:
            x: batch-first input, (batch, seq_len, hidden_dim).
            hidden_dim: accepted for interface compatibility; not used.

        Returns:
            Tensor of shape (batch, seq_len, output_size).
        '''
        # The GRU starts from its default (zero) initial state.
        out, _ = self.gru(x)
        out = self.fc(self.relu(out))

        return out

    def init_hidden(self, batch_size):
        '''Return an all-zero GRU hidden state of shape (n_layers, batch_size,
        hidden_dim), with the dtype and device of the module's parameters.

        The previous body referenced an undefined name ``hidden_dim`` and
        passed a weight tensor where ``Layer1_Attention`` expects an integer
        size, so it could never run. It now produces the same kind of state
        as ``Layer1_Attention.init_hidden``.
        '''
        reference = next(self.parameters())
        return reference.new_zeros(self.n_layers, batch_size, self.hidden_dim)

In [None]:
class Decoder(nn.Module):
    '''Maps the encoder's final bidirectional state to sigmoid outputs.

    Args:
        hidden_size: hidden size of one encoder direction.
        output_size: number of outputs per sample.
        dropout: accepted for interface compatibility; currently unused.
    '''
    def __init__(self, hidden_size, output_size,
                 dropout=0.2):
        super(Decoder, self).__init__()

        self.hidden_size = hidden_size
        self.output_size = output_size

        self.attention = Attention(output_size,hidden_size)

        # Projects the concatenated forward/backward state to hidden_size.
        self.fc = nn.Linear(hidden_size * 2, hidden_size)

        # Final projection; takes two hidden_size vectors side by side.
        self.out = nn.Linear(hidden_size * 2, output_size)

        self.sig = nn.Sigmoid()

    def forward(self, last_hidden, encoder_outputs):
        '''Produce one sigmoid output per sample.

        Args:
            last_hidden: final hidden state of a bidirectional RNN, shape
                (num_layers * 2, batch, hidden_size).
            encoder_outputs: per-step encoder outputs; passed unchanged to
                the attention module.

        Returns:
            Tuple ``(output, attn_weights)``: ``output`` has shape
            (batch, output_size) with values in (0, 1); ``attn_weights`` is
            whatever the attention module returned.
        '''
        attn_weights = self.attention(encoder_outputs,last_hidden[-1])

        # The last two entries of a bidirectional RNN's final state are the
        # forward and backward states of the top layer. Concatenate them per
        # sample; the former view(-1, 2*hidden_size) instead glued together
        # the states of neighbouring batch elements.
        final_state = torch.cat([last_hidden[-2], last_hidden[-1]], dim=1)
        output = self.fc(final_state)

        # NOTE(review): an attention context used to be computed here from
        # attn_weights and encoder_outputs, but it was never used. self.out
        # still receives `output` twice. Feeding it [output, context] instead
        # needs a decision on axis order first: in this notebook the encoder
        # output is sequence-first while the attention GRU is batch_first.
        output = self.out(torch.cat([output, output], 1))

        return self.sig(output), attn_weights

In [None]:
class Seq2Seq(nn.Module):
    '''Chains an encoder and a decoder into a single module.

    The encoder is called with the source batch and must return a pair
    ``(outputs, hidden)``. The decoder is then called as
    ``decoder(hidden, outputs)`` and must return a pair whose first element
    is the prediction.
    '''
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder

    def forward(self, src):
        '''Return the decoder's prediction for ``src``; the second value the
        decoder returns is discarded.'''
        memory, state = self.encoder(src)
        prediction, _ = self.decoder(state, memory)
        return prediction

In [None]:
# Choose the device before building anything, then move every module with
# .to(device). The unconditional .cuda() calls used before crashed on
# CPU-only runtimes although the fallback device was already being computed.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Encoder: 40 input features per step, hidden size 128, one LSTM layer.
# Decoder: hidden size 128, one output per sample.
enc = Encoder(40, 128, 1).to(device)
dec = Decoder(128, 1).to(device)
s2s = Seq2Seq(enc, dec).to(device)

# Number of training epochs.
EPOCH = 15

# Binary cross-entropy on the decoder's sigmoid outputs; targets must lie
# within [0, 1].
loss_fn = nn.BCELoss()

lr = 0.001

# NOTE(review): opt_weight multiplies the losses printed by the training
# cell, and best_class_weights multiplies the scores returned by
# calculate_classification_metrics. With values other than 1 the reported
# numbers are not the raw loss / metrics -- confirm these are intended.
opt_weight=-0.001
best_class_weights=[10,8,94,48]

optimizer = torch.optim.AdamW(s2s.parameters(), lr=lr)

  "num_layers={}".format(dropout, num_layers))
  "num_layers={}".format(dropout, num_layers))


In [None]:
# Train for EPOCH passes over train_loader and evaluate on val_loader after
# each pass. EPOCH and device come from the setup cell; the loop used to
# hard-code 15 epochs and .cuda() instead.
for epoch in range(EPOCH):

    s2s.train()
    train_loss = 0

    for batch in train_loader:
        # Move the last axis of the batch to the front before it enters the
        # model, and put inputs and targets on the model's device.
        data = batch['data'].permute(2, 0, 1).to(device)
        label = batch['label'].to(device)

        optimizer.zero_grad()
        output = s2s(data)
        loss = loss_fn(output, label)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Validation pass: eval mode and no gradient tracking.
    s2s.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:

            data = batch['data'].permute(2, 0, 1).to(device)
            label = batch['label'].to(device)
            output = s2s(data)
            loss = loss_fn(output, label)
            val_loss += loss.item()

    # NOTE(review): the printed numbers are the mean batch losses multiplied
    # by opt_weight, not the losses themselves. With the negative opt_weight
    # from the setup cell a positive printed value means the underlying loss
    # is negative -- confirm this scaling is intended.
    print('Epoch : {} train_loss : {} val_loss : {}'.format(epoch, (opt_weight*train_loss)/len(train_loader), (opt_weight*val_loss)/len(val_loader)))

Epoch : 0 train_loss : 0.20484101179380748 val_loss : 0.4085016208995473
Epoch : 1 train_loss : 0.4085767034486283 val_loss : 0.4229803494540128
Epoch : 2 train_loss : 0.4110479267918786 val_loss : 0.42109641890092325
Epoch : 3 train_loss : 0.41134371025617733 val_loss : 0.4261334450461648
Epoch : 4 train_loss : 0.4119191883885583 val_loss : 0.4277295629327948
Epoch : 5 train_loss : 0.4126953960684843 val_loss : 0.4277141362970526
Epoch : 6 train_loss : 0.41254129809002543 val_loss : 0.4277321472167969
Epoch : 7 train_loss : 0.4126560140210529 val_loss : 0.4277232041792436
Epoch : 8 train_loss : 0.411119616841161 val_loss : 0.4277377818714489
Epoch : 9 train_loss : 0.41420842831633814 val_loss : 0.4277445789683949
Epoch : 10 train_loss : 0.4134118332973747 val_loss : 0.4277445789683949
Epoch : 11 train_loss : 0.4131891025277071 val_loss : 0.4277738411643288
Epoch : 12 train_loss : 0.41377617343636447 val_loss : 0.4277738411643288
Epoch : 13 train_loss : 0.4122463212124137 val_loss : 0.

In [None]:
import warnings
# NOTE(review): this silences every warning for the rest of the session.
warnings.filterwarnings("ignore")
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score


fin_targets = []
fin_outputs = []

# Make sure dropout is disabled, whatever mode an earlier cell left the
# model in.
s2s.eval()

# NOTE(review): the scores below are computed on train_loader, i.e. on the
# data the model was fitted to. Use val_loader for held-out numbers --
# confirm which one is intended.
with torch.no_grad():
    for batch in train_loader:

        data = batch['data'].permute(2, 0, 1).to(device)
        label = batch['label']
        output = s2s(data)
        # np.int was removed from NumPy; the builtin int is the same dtype.
        fin_targets.append(label.numpy().astype(int))
        # Threshold the sigmoid outputs at 0.5 to get 0/1 predictions.
        fin_outputs.append((output.cpu().numpy() > 0.5).astype(int))
acc,precision,recall,f1score=calculate_classification_metrics(fin_outputs,fin_targets,best_class_weights)
print('Accuracy : {}'.format(acc))
print('Precision: {}'.format(precision))
print('Recall: {}'.format(recall))
print('F1score: {}'.format(f1score))

Accuracy : 0.811
Precision: 0.889
Recall: 0.847
F1score: 0.8


In [None]:
# Print the module tree of the assembled model (sub-modules and their sizes).
print(s2s)

Seq2Seq(
  (encoder): Encoder(
    (lstm): LSTM(40, 128, dropout=0.5, bidirectional=True)
  )
  (decoder): Decoder(
    (attention): Attention(
      (gru): GRU(128, 128, batch_first=True, dropout=0.2)
      (fc): Linear(in_features=128, out_features=1, bias=True)
      (relu): ReLU()
    )
    (fc): Linear(in_features=256, out_features=128, bias=True)
    (out): Linear(in_features=256, out_features=1, bias=True)
    (sig): Sigmoid()
  )
)


In [None]:
# Pull one batch from the training loader to check the tensor shapes.
dataiter = iter(train_loader)
# DataLoader iterators follow the iterator protocol; use the builtin next()
# because the .next() method is not available on current PyTorch versions.
data = next(dataiter)
images, labels = data['data'],data['label']
print(images.shape)
print(labels.shape)

torch.Size([12, 40, 8064])
torch.Size([12, 1])
