In [1]:
# Colab-only setup: mount Google Drive at /content/drive so that the .npy files
# read later from /content/drive/MyDrive/... are reachable.
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


Transformer_1 code

In [2]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import torch.utils.data 
import numpy as np
from torch.utils.data import TensorDataset, DataLoader,Dataset
import os
import csv
from torch.utils.tensorboard import SummaryWriter
from sklearn.model_selection import train_test_split
import math
import matplotlib as plt
import pandas as pd
# Module-level softmax used by scaled_dot_product. The axis must be explicit:
# with dim=None PyTorch falls back to a legacy implicit choice (dim=1 for 4-D
# input), which for [Batch, Head, SeqLen, SeqLen] attention logits normalises
# across heads instead of across keys.
s = nn.Softmax(dim=-1)
class PositionalEncoding(nn.Module):
    """Sinusoidal positional-encoding table whose forward pass is a pass-through.

    The table is built and registered as the buffer ``pe`` with shape
    ``[1, max_len, d_model]``, but ``forward`` returns its input unchanged: the
    line that added ``pe`` to the input was commented out in this variant.
    NOTE(review): confirm that disabling the encoding here is intentional.
    """

    def __init__(self, d_model, max_len=900):
        """
        Inputs
            d_model - Hidden dimensionality of the input.
            max_len - Maximum length of a sequence to expect.
        """
        super().__init__()

        # Create matrix of [SeqLen, HiddenDim] representing the positional encoding for max_len inputs
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / (d_model)))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns, so
        # trim div_term to match; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)

        # register_buffer => Tensor which is not a parameter, but should be part of the modules state.
        # Used for tensors that need to be on the same device as the module.
        # persistent=False tells PyTorch to not add the buffer to the state dict (e.g. when we save the model)
        self.register_buffer('pe', pe, persistent=False)

    def forward(self, x):
        """Return ``x`` unchanged (the positional table is not applied)."""
        return x
def scaled_dot_product(q, k, v, mask=None):
    """Scaled dot-product attention.

    Inputs:
        q, k, v - Query, key and value tensors; the last two axes of each are
                  treated as [SeqLen, Dims], any leading axes are batched over.
        mask - Optional tensor broadcastable to the attention logits; positions
               where ``mask == 0`` are filled with a large negative value so
               they receive (numerically) zero attention weight.
    Returns:
        (values, attention) - the attention-weighted sum of ``v`` and the
        attention weights, which sum to 1 over the last (key) axis.
    """
    d_k = q.size()[-1]

    attn_logits = torch.matmul(q, k.transpose(-2, -1))
    attn_logits = attn_logits / math.sqrt(d_k)
    if mask is not None:
        attn_logits = attn_logits.masked_fill(mask == 0, -9e15)
    # Normalise over the key axis explicitly. A dim-less nn.Softmax() picks
    # dim=1 for 4-D input, i.e. the head axis of [Batch, Head, SeqLen, SeqLen].
    attention = torch.softmax(attn_logits, dim=-1)
    values = torch.matmul(attention, v)
    return values, attention
class MultiheadAttention(nn.Module):
    """Multi-head self-attention with one fused Q/K/V projection.

    Inputs (constructor):
        input_dim - Feature width of the incoming sequence.
        embed_dim - Total width of the attention output; must be divisible by num_heads.
        num_heads - Number of attention heads.
    """

    def __init__(self, input_dim, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be 0 modulo number of heads."

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        # Stack all weight matrices 1...h together for efficiency
        # Note that in many implementations you see "bias=False" which is optional
        self.qkv_proj = nn.Linear(input_dim, 3*embed_dim)
        self.o_proj = nn.Linear(embed_dim, embed_dim)

        self._reset_parameters()

    def _reset_parameters(self):
        """Xavier-uniform weights and zero biases for both projections."""
        # Original Transformer initialization, see PyTorch documentation
        nn.init.xavier_uniform_(self.qkv_proj.weight)
        self.qkv_proj.bias.data.fill_(0)
        nn.init.xavier_uniform_(self.o_proj.weight)
        self.o_proj.bias.data.fill_(0)

    def forward(self, x, mask=None, return_attention=False):
        """Apply self-attention to ``x`` of shape [Batch, SeqLen, input_dim].

        Returns the projected output of shape [Batch, SeqLen, embed_dim], plus
        the attention weights when ``return_attention`` is true.
        """
        batch_size, seq_length, _ = x.size()
        qkv = self.qkv_proj(x)

        # Separate Q, K, V from linear output
        qkv = qkv.reshape(batch_size, seq_length, self.num_heads, 3*self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3) # [Batch, Head, SeqLen, Dims]
        q, k, v = qkv.chunk(3, dim=-1)

        # Determine value outputs
        values, attention = scaled_dot_product(q, k, v, mask=mask)
        values = values.permute(0, 2, 1, 3) # [Batch, SeqLen, Head, Dims]
        # Merge heads back to the attention width (self.embed_dim), not the input
        # width: the two differ whenever input_dim != embed_dim.
        values = values.reshape(batch_size, seq_length, self.embed_dim)
        o = self.o_proj(values)

        if return_attention:
            return o, attention
        return o
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention, then a two-layer MLP.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``.
    """

    def __init__(self, input_dim, num_heads, dim_feedforward, dropout=0.2):
        """
        Inputs:
            input_dim - Dimensionality of the input
            num_heads - Number of heads to use in the attention block
            dim_feedforward - Dimensionality of the hidden layer in the MLP
            dropout - Dropout probability to use in the dropout layers
        """
        super().__init__()
        self.input_dim = input_dim

        # Self-attention keeps the feature width (embed_dim == input_dim).
        self.self_attn = MultiheadAttention(input_dim, input_dim, num_heads)

        # Feed-forward network; the position of each layer in the Sequential
        # determines its state_dict key, so the order must stay as is.
        feed_forward_layers = [
            nn.Linear(input_dim, dim_feedforward),
            nn.Dropout(dropout),
            nn.ReLU(inplace=True),
            nn.Linear(dim_feedforward, input_dim),
        ]
        self.linear_net = nn.Sequential(*feed_forward_layers)

        # Normalisation and dropout shared by the two residual connections.
        self.norm1 = nn.LayerNorm(input_dim)
        self.norm2 = nn.LayerNorm(input_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the attention and MLP sub-layers on ``x`` and return the result."""
        # Residual + norm around self-attention.
        attended = self.norm1(x + self.dropout(self.self_attn(x, mask=mask)))
        # Residual + norm around the MLP.
        return self.norm2(attended + self.dropout(self.linear_net(attended)))
class TransformerEncoder(nn.Module):
    """Stack of EncoderBlock layers with a prepended all-ones token and a sigmoid head.

    Inputs (constructor):
        num_layers - Number of EncoderBlock layers to stack.
        block_args - Keyword arguments forwarded to every EncoderBlock; must
                     contain "input_dim".
    """

    def __init__(self, num_layers, **block_args):
        super().__init__()
        self.layers = nn.ModuleList([EncoderBlock(**block_args) for _ in range(num_layers)])
        self.positional_encoding = PositionalEncoding(d_model=block_args["input_dim"])
        self.fc = nn.Linear(block_args["input_dim"], 1)

    def forward(self, x, mask=None):
        """Encode ``x`` of shape [Batch, SeqLen, input_dim].

        Returns ``(features, probs)``: the encoded sequence of shape
        [Batch, SeqLen + 1, input_dim] (index 0 along axis 1 is the prepended
        token) and ``sigmoid(fc(features))`` of shape [Batch, SeqLen + 1, 1].
        """
        # Prepend an all-ones token. new_ones inherits x's device and dtype, so
        # the model also runs on CPU instead of requiring a hard-coded 'cuda'.
        X0 = x.new_ones(x.shape[0], 1, x.shape[2])
        x = torch.cat((X0, x), 1)

        x = self.positional_encoding(x)
        for l in self.layers:
            x = l(x, mask=mask)

        return x, torch.sigmoid(self.fc(x))

    def get_attention_maps(self, x, mask=None):
        """Return the attention weights of the last layer (None if there are no layers).

        NOTE(review): unlike ``forward``, this does not prepend the all-ones
        token or apply ``positional_encoding`` to ``x``; callers presumably pass
        an already-prepared sequence - confirm.
        """
        attn_map = None
        for l in self.layers:
            _, attn_map = l.self_attn(x, mask=mask, return_attention=True)
            # Advance with the same mask so later layers see what forward() would produce.
            x = l(x, mask=mask)
        return attn_map



Transformer_2 code

In [3]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import torch.utils.data 
import numpy as np
from torch.utils.data import TensorDataset, DataLoader,Dataset
import os
import csv
from torch.utils.tensorboard import SummaryWriter
from sklearn.model_selection import train_test_split
import math
import matplotlib as plt
# Module-level softmax used by scaled_dot_product. The axis must be explicit:
# with dim=None PyTorch falls back to a legacy implicit choice (dim=1 for 4-D
# input), which for [Batch, Head, SeqLen, SeqLen] attention logits normalises
# across heads instead of across keys.
s = nn.Softmax(dim=-1)
class PositionalEncoding_1(nn.Module):
    """Adds a fixed sinusoidal positional encoding to a [Batch, SeqLen, d_model] input."""

    def __init__(self, d_model, max_len=900):
        """
        Inputs
            d_model - Hidden dimensionality of the input.
            max_len - Maximum length of a sequence to expect.
        """
        super().__init__()

        # Create matrix of [SeqLen, HiddenDim] representing the positional encoding for max_len inputs
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / (d_model)))

        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns, so
        # trim div_term to match; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)

        # register_buffer => Tensor which is not a parameter, but should be part of the modules state.
        # Used for tensors that need to be on the same device as the module.
        # persistent=False tells PyTorch to not add the buffer to the state dict (e.g. when we save the model)
        self.register_buffer('pe', pe, persistent=False)

    def forward(self, x):
        """Return ``x`` plus the first ``x.size(1)`` rows of the encoding table.

        ``x.size(1)`` must not exceed the ``max_len`` given to the constructor,
        otherwise the addition fails to broadcast.
        """
        x = x + self.pe[:, :x.size(1)]
        return x
def scaled_dot_product(q, k, v, mask=None):
    """Scaled dot-product attention.

    Inputs:
        q, k, v - Query, key and value tensors; the last two axes of each are
                  treated as [SeqLen, Dims], any leading axes are batched over.
        mask - Optional tensor broadcastable to the attention logits; positions
               where ``mask == 0`` are filled with a large negative value so
               they receive (numerically) zero attention weight.
    Returns:
        (values, attention) - the attention-weighted sum of ``v`` and the
        attention weights, which sum to 1 over the last (key) axis.
    """
    d_k = q.size()[-1]

    attn_logits = torch.matmul(q, k.transpose(-2, -1))
    attn_logits = attn_logits / math.sqrt(d_k)
    if mask is not None:
        attn_logits = attn_logits.masked_fill(mask == 0, -9e15)
    # Normalise over the key axis explicitly. A dim-less nn.Softmax() picks
    # dim=1 for 4-D input, i.e. the head axis of [Batch, Head, SeqLen, SeqLen].
    attention = torch.softmax(attn_logits, dim=-1)
    values = torch.matmul(attention, v)
    return values, attention
class MultiheadAttention_1(nn.Module):
    """Multi-head self-attention with one fused Q/K/V projection.

    Inputs (constructor):
        input_dim - Feature width of the incoming sequence.
        embed_dim - Total width of the attention output; must be divisible by num_heads.
        num_heads - Number of attention heads.
    """

    def __init__(self, input_dim, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be 0 modulo number of heads."

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        # Stack all weight matrices 1...h together for efficiency
        # Note that in many implementations you see "bias=False" which is optional
        self.qkv_proj = nn.Linear(input_dim, 3*embed_dim)
        self.o_proj = nn.Linear(embed_dim, embed_dim)

        self._reset_parameters()

    def _reset_parameters(self):
        """Xavier-uniform weights and zero biases for both projections."""
        # Original Transformer initialization, see PyTorch documentation
        nn.init.xavier_uniform_(self.qkv_proj.weight)
        self.qkv_proj.bias.data.fill_(0)
        nn.init.xavier_uniform_(self.o_proj.weight)
        self.o_proj.bias.data.fill_(0)

    def forward(self, x, mask=None, return_attention=False):
        """Apply self-attention to ``x`` of shape [Batch, SeqLen, input_dim].

        Returns the projected output of shape [Batch, SeqLen, embed_dim], plus
        the attention weights when ``return_attention`` is true.
        """
        batch_size, seq_length, _ = x.size()
        qkv = self.qkv_proj(x)

        # Separate Q, K, V from linear output
        qkv = qkv.reshape(batch_size, seq_length, self.num_heads, 3*self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3) # [Batch, Head, SeqLen, Dims]
        q, k, v = qkv.chunk(3, dim=-1)

        # Determine value outputs
        values, attention = scaled_dot_product(q, k, v, mask=mask)
        values = values.permute(0, 2, 1, 3) # [Batch, SeqLen, Head, Dims]
        # Merge heads back to the attention width (self.embed_dim), not the input
        # width: the two differ whenever input_dim != embed_dim.
        values = values.reshape(batch_size, seq_length, self.embed_dim)
        o = self.o_proj(values)

        if return_attention:
            return o, attention
        return o
class EncoderBlock_1(nn.Module):
    """One encoder layer: self-attention, then a two-layer MLP.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``.
    """

    def __init__(self, input_dim, num_heads, dim_feedforward, dropout=0.15):
        """
        Inputs:
            input_dim - Dimensionality of the input
            num_heads - Number of heads to use in the attention block
            dim_feedforward - Dimensionality of the hidden layer in the MLP
            dropout - Dropout probability to use in the dropout layers
        """
        super().__init__()
        self.input_dim = input_dim

        # Self-attention keeps the feature width (embed_dim == input_dim).
        self.self_attn = MultiheadAttention_1(input_dim, input_dim, num_heads)

        # Feed-forward network; the position of each layer in the Sequential
        # determines its state_dict key, so the order must stay as is.
        feed_forward_layers = [
            nn.Linear(input_dim, dim_feedforward),
            nn.Dropout(dropout),
            nn.ReLU(inplace=True),
            nn.Linear(dim_feedforward, input_dim),
        ]
        self.linear_net = nn.Sequential(*feed_forward_layers)

        # Normalisation and dropout shared by the two residual connections.
        self.norm1 = nn.LayerNorm(input_dim)
        self.norm2 = nn.LayerNorm(input_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the attention and MLP sub-layers on ``x`` and return the result."""
        # Residual + norm around self-attention.
        attended = self.norm1(x + self.dropout(self.self_attn(x, mask=mask)))
        # Residual + norm around the MLP.
        return self.norm2(attended + self.dropout(self.linear_net(attended)))
class TransformerEncoder_1(nn.Module):
    """Stack of EncoderBlock_1 layers with a prepended all-ones token and a sigmoid head.

    Inputs (constructor):
        num_layers - Number of EncoderBlock_1 layers to stack.
        block_args - Keyword arguments forwarded to every EncoderBlock_1; must
                     contain "input_dim".
    """

    def __init__(self, num_layers, **block_args):
        super().__init__()
        self.layers = nn.ModuleList([EncoderBlock_1(**block_args) for _ in range(num_layers)])
        self.positional_encoding = PositionalEncoding_1(d_model=block_args["input_dim"])
        self.fc = nn.Linear(block_args["input_dim"], 1)

    def forward(self, x, mask=None):
        """Encode ``x`` of shape [Batch, SeqLen, input_dim].

        Returns ``(features, probs)``: the encoded sequence of shape
        [Batch, SeqLen + 1, input_dim] (index 0 along axis 1 is the prepended
        token) and ``sigmoid(fc(features))`` of shape [Batch, SeqLen + 1, 1].
        """
        # Prepend an all-ones token. new_ones inherits x's device and dtype, so
        # the model also runs on CPU instead of requiring a hard-coded 'cuda'.
        X0 = x.new_ones(x.shape[0], 1, x.shape[2])
        x = torch.cat((X0, x), 1)

        x = self.positional_encoding(x)
        for l in self.layers:
            x = l(x, mask=mask)

        return x, torch.sigmoid(self.fc(x))

    def get_attention_maps(self, x, mask=None):
        """Return the attention weights of the last layer (None if there are no layers).

        NOTE(review): unlike ``forward``, this does not prepend the all-ones
        token or apply ``positional_encoding`` to ``x``; callers presumably pass
        an already-prepared sequence - confirm.
        """
        attn_map = None
        for l in self.layers:
            _, attn_map = l.self_attn(x, mask=mask, return_attention=True)
            # Advance with the same mask so later layers see what forward() would produce.
            x = l(x, mask=mask)
        return attn_map



All the imports

In [4]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
import torch
import torch.nn as nn

from scipy import sparse
import sys
from torch.optim.lr_scheduler import LambdaLR
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
import os
import csv
from torch.utils.tensorboard import SummaryWriter
from sklearn.model_selection import train_test_split
import math
from torch.utils.data import Dataset, DataLoader
from os.path import exists
import random

import time

seed=42
# Seed every RNG imported in this cell, not just torch, so that any use of
# `random` or NumPy randomness is reproducible as well.
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

<torch._C.Generator at 0x7fd810dc4930>

Load the train and validation data, or the test data, depending on the `train_value` and `test_value` flags

In [5]:


batch_size=64

# Flags selecting which splits this cell loads:
#   train_value == 1 -> training data plus the validation data evaluated every epoch
#   test_value  == 1 -> test data used to evaluate the trained model
train_value = 0
test_value = 1

# Directory on the mounted Drive that holds all .npy feature/label files.
DATA_DIR = '/content/drive/MyDrive/explicit_npy'


def _frame_differences(array):
    """Return the absolute difference between consecutive steps along dim 1.

    ``array`` is converted with ``torch.Tensor`` and each position t becomes
    ``|x[t+1] - x[t]|``.
    NOTE(review): torch.roll is circular, so the last position is differenced
    against the first one - confirm this wrap-around is intended.
    """
    frames = torch.Tensor(array)
    return (torch.roll(frames, shifts=-1, dims=1) - frames).abs()


def _load_motion_features(face_file, pose_file):
    """Load a face and a pose feature file and concatenate their frame differences on dim 2."""
    face = _frame_differences(np.load(os.path.join(DATA_DIR, face_file)))
    pose = _frame_differences(np.load(os.path.join(DATA_DIR, pose_file)))
    return torch.cat((face, pose), dim=2)


if train_value == 1:
  # Training split.
  features_tensor = _load_motion_features('features_face_674_90.npy',
                                          'features_pose_76_90.npy')
  labels = np.load(os.path.join(DATA_DIR, 'labels_face_674_90.npy'))
  labels_tensor = torch.Tensor(labels.astype(np.float64))
  train_dataset = TensorDataset(features_tensor,labels_tensor)
  my_dataloader = torch.utils.data.DataLoader(
                  train_dataset,batch_size=batch_size, shuffle=True)
  print(features_tensor.shape)
  print(labels_tensor.shape)

  # Validation split (stored in the "*_test_*" files), evaluated after every epoch.
  features_tensor_val = _load_motion_features('features_test_face_674_90.npy',
                                              'features_test_pose_76_90.npy')
  labels_val = np.load(os.path.join(DATA_DIR, 'labels_test_face_674_90.npy'))
  labels_tensor_val = torch.Tensor(labels_val.astype(np.float64))

  print(features_tensor_val.shape)
  print(labels_tensor_val.shape)

  train_dataset_val = TensorDataset(features_tensor_val,labels_tensor_val)
  my_dataloader_val = torch.utils.data.DataLoader(
                  train_dataset_val,batch_size=batch_size)

if test_value ==1 :
  # Test split: no labels are used, so zero placeholders are created to satisfy
  # TensorDataset. Their count follows the loaded features instead of being
  # hard-coded to 4898, so a test file of any size works.
  features_tensor_test = _load_motion_features('features_test_face_674_90_test.npy',
                                               'features_test_pose_76_90_test.npy')
  labels_tensor_test = torch.zeros(features_tensor_test.shape[0], 1)

  print(features_tensor_test.shape)
  print(labels_tensor_test.shape)

  train_dataset_test = TensorDataset(features_tensor_test,labels_tensor_test)
  my_dataloader_test = torch.utils.data.DataLoader(
                  train_dataset_test,batch_size=batch_size)




torch.Size([4898, 90, 750])
torch.Size([4898, 1])


Parameters for training

In [6]:

learning_rate =0.0005
num_epochs = 350
# Use the GPU when one is available, otherwise fall back to the CPU instead of
# failing at the first .to(device) call.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Each sample is reshaped to (seq_length, input_shape) before entering the model.
input_shape  = 750
seq_length = 90
#learning_rate_decay = 0.99
# Weight of the first-stage loss; the second-stage loss gets (1 - alpha).
alpha = 0.5


def count_parameters(model):
    """Return the total element count of the model's parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

Training loop

In [7]:

model = TransformerEncoder_1(num_layers = 2,input_dim =750,num_heads = 10, dim_feedforward = 1000).to(device)
model_2 = TransformerEncoder(num_layers = 1,input_dim =90,num_heads =5, dim_feedforward = 9000).to(device)
#optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate,weight_decay = 0.0005)
# Both stages must be handed to the optimizer. With only model.parameters(),
# model_2 would stay at its random initialisation even though loss_2 depends
# on it and its checkpoint is saved below.
optimizer = torch.optim.Adam(
    list(model.parameters()) + list(model_2.parameters()),
    lr=learning_rate, weight_decay = 0.0005)
criterion = nn.BCELoss()
ll = count_parameters(model)
print(ll)

ll2 = count_parameters(model_2)
print(ll2)


def _two_stage_forward(images):
    """Run both stages on a batch and return their index-0 token probabilities.

    `model` encodes the batch; its encoded sequence (without the prepended
    token at index 0) is permuted so the last two axes swap and is then fed to
    `model_2`. Returns ``(outputs_1, outputs_2)``, the sigmoid outputs at
    sequence index 0 of `model` and `model_2` respectively.
    """
    inpp, outputs = model(images)
    outputs_1 = outputs[:,0,:]
    _, outputs_2 = model_2(torch.permute(inpp[:,1:,:], (0,2,1)))
    return outputs_1, outputs_2[:,0,:]


best_val_acc = 0
for epoch in range(num_epochs):
    correct = 0
    num_samples = 0
    model.train()
    model_2.train()
    for i, (images, labels) in enumerate(my_dataloader):
        images = images.reshape(-1, seq_length, input_shape ).to(device)
        labels = labels.to(device)
        num_samples+=labels.size(0)

        # Forward pass through both stages
        outputs_1, outputs_2 = _two_stage_forward(images)

        # Weighted sum of the two binary cross-entropy losses
        targets = labels.unsqueeze(1)
        loss_1 = criterion(outputs_1, targets)
        loss_2 = criterion(outputs_2, targets)
        loss = alpha*loss_1 +  (1-alpha)*loss_2

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accuracy is measured on the second-stage prediction only.
        predicted = (outputs_2 > 0.5).long()
        correct += (predicted.squeeze(1)== labels).sum().item()

    # Reports the last batch index, the epoch, the last batch's loss and the
    # running training accuracy of this epoch.
    print('[%d/%d] loss: %.3f, accuracy: %.3f' %
          (i , epoch, loss.item(), 100 * correct /num_samples))

    # Validate the model
    # In this phase, we don't need to compute gradients (for memory efficiency)
    num_samples_val = 0
    correct_val = 0
    model.eval()
    model_2.eval()
    with torch.no_grad():
        for images, labels in my_dataloader_val:
            images = images.reshape(-1, seq_length, input_shape).to(device)
            labels = labels.to(device)
            num_samples_val +=labels.size(0)

            _, outputs_2 = _two_stage_forward(images)
            predicted = (outputs_2 > 0.5).long()
            correct_val += (predicted.squeeze(1)== labels).sum().item()

    val_acc = 100 * correct_val / num_samples_val
    print(f'Accuracy of the network on the validation: {val_acc} %')

    # Keep the checkpoints of the epoch with the best validation accuracy so far.
    if(val_acc> best_val_acc):
        best_val_acc = val_acc
        torch.save(model.state_dict(),'./best_model'+'.ckpt')
        torch.save(model_2.state_dict(),'./best_model_2'+'.ckpt')
        print("best model with val acc "+ str(best_val_acc)+ "is saved")


Test loop

In [9]:
model = TransformerEncoder_1(num_layers = 2,input_dim =750,num_heads =10, dim_feedforward = 1000).to(device)
model_2 = TransformerEncoder(num_layers = 1,input_dim =90,num_heads =5, dim_feedforward = 9000).to(device)
# map_location makes checkpoints that were saved on a GPU loadable on whatever
# `device` currently is (including CPU).
model.load_state_dict(torch.load('/content/best_model.ckpt', map_location=device))
model_2.load_state_dict(torch.load('/content/best_model_2.ckpt', map_location=device))
criterion = nn.BCELoss()
model.eval()
model_2.eval()
def test():
    """Predict a label for every sample in ``my_dataloader_test`` and write them to my_file.csv.

    Each batch goes through `model`; its encoded sequence (without the
    prepended token at index 0) is permuted and fed to `model_2`, whose index-0
    output is thresholded at 0.5. One line is written per sample.

    The file is opened once in write mode, so re-running this function replaces
    the previous predictions instead of appending duplicates to them.
    NOTE(review): each row is the repr of a one-element list (e.g. "[1]"), as in
    the original code - confirm that downstream consumers expect this format.
    """
    with open('my_file.csv', 'w') as f, torch.no_grad():
        # The dataloader's labels are zero placeholders and are ignored here.
        for images, _ in my_dataloader_test:
            images = images.reshape(-1, seq_length, input_shape).to(device)

            # Stage 1, then stage 2 on the permuted stage-1 features
            inpp, _ = model(images)
            _, outputs_2 = model_2(torch.permute(inpp[:,1:,:], (0,2,1)))

            predicted = (outputs_2[:,0,:] > 0.5).long().tolist()
            for line in predicted:
                f.write(f"{line}\n")

        
                
    

The function call below runs the model on the test data and writes the predicted labels to `my_file.csv`.

In [None]:
# Run inference over my_dataloader_test; test() writes its predictions to my_file.csv.
test()

