First, I import the packages I need.



In [None]:
import json
import torch
import pandas as pd
import numpy as np
import sklearn

In [None]:
# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split # we'll use it to split the training set into training and validation data

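Here I define my dependent variables Y: the ground-truth valence and arousal (activation) labels that the model has to predict. I treat this as a multi-label classification problem (see https://machinelearningmastery.com/multi-label-classification-with-deep-learning/).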

In [None]:
def load_dataframe(filename):
    """Read a JSON file into a DataFrame and return its transpose.

    ``pd.read_json`` is called with its default options and the result is
    flipped with ``.T``, so whatever pandas parsed as columns becomes the
    rows of the returned frame.
    """
    return pd.read_json(filename).T

In [None]:
def loadgroundtruth(dataframe):
    """Return one float tensor ``(valence, activation)`` per dataframe row.

    The "valence" and "activation" columns are paired row by row, and every
    pair becomes its own tensor placed on the module-level ``device``.
    """
    label_pairs = zip(dataframe["valence"], dataframe["activation"])
    return [
        torch.tensor(pair, device=device, dtype=torch.float)
        for pair in label_pairs
    ]

In [None]:
def load_features(dataframe):
    """Return one float tensor per entry of the "features" column.

    Each entry is converted independently and placed on the module-level
    ``device``; the result is a plain Python list of tensors.
    """
    return [
        torch.tensor(sample, device=device, dtype=torch.float)
        for sample in dataframe["features"]
    ]

In [None]:
# Training data: features plus (valence, activation) labels.
train_df = load_dataframe('/content/drive/MyDrive/DeepLProject/train.json')
train_X = load_features(train_df)
train_Y = loadgroundtruth(train_df)

# Data read from test.json: only the features are loaded here.
dev_df = load_dataframe('/content/drive/MyDrive/DeepLProject/test.json')
dev_X = load_features(dev_df)

# Hold out 10% of the training samples for validation (fixed seed).
X_train, X_val, Y_train, Y_val = train_test_split(
    train_X,
    train_Y,
    test_size=0.1,
    random_state=0,
)

In [None]:
#print(X[0]) #list of tensors
#print(X[0].shape)

In [None]:
def analyze(Y):
    """Print label statistics for a stacked label tensor ``Y``.

    First prints the column-wise sum divided by the number of rows, then the
    distinct rows of ``Y`` together with how often each one occurs.
    """
    column_mean = Y.sum(dim=0) / len(Y)
    print(column_mean)
    distinct_rows_and_counts = torch.unique(Y, dim=0, return_counts=True)
    print(distinct_rows_and_counts)
# Stack the per-sample label tensors (along dim 0, the default) and summarise them.
analyze(torch.stack(train_Y))

tensor([0.4315, 0.7099], device='cuda:0')
(tensor([[0., 0.],
        [0., 1.],
        [1., 0.],
        [1., 1.]], device='cuda:0'), tensor([1240, 3194, 1023, 2343], device='cuda:0'))


The LSTM will only accept one entry of our sequence at a time,
so we need to iterate over the first dimension of the data matrix
and pass each entry to the LSTM while updating the freshly generated hidden and cell states.
One application of the LSTM gives something like: (out, (h1, c1)) = self.lstm(x[0], (h0, c0))

sequence_length, n_features = x.shape # the shape gives us the size of each dimension of the matrix

The schedule your learning rate follows is a major hyperparameter that you will want to tune. PyTorch supports learning-rate scheduling through its torch.optim.lr_scheduler module, which offers a variety of learning-rate schedules. The following example demonstrates one such schedule.

In [None]:
import math
import itertools
def evaluate(model, X, Y):
    """Grid-search per-class thresholds and report the best exact-match accuracy.

    Each sample in ``X`` is passed through ``model`` on its own. For every
    pair of thresholds (one for score index 0, one for score index 1) the
    thresholded scores are compared with ``y >= 0.5``; a sample counts as
    correct only when both entries match. The 3x3 accuracy table (in percent,
    rows = threshold for index 0, columns = threshold for index 1) is printed.

    Args:
        model: module called as ``model(x)``; its output must be indexable at
            positions 0 and 1.
        X: sequence of model inputs.
        Y: sequence of target tensors aligned with ``X``.

    Returns:
        ``(best_accuracy, (c0_th, c1_th))``: the highest accuracy in the table
        and the threshold pair that achieved it (first maximum wins on ties).

    The model is switched to eval mode for the duration of the call and is
    put back into whatever mode it was in before, even if the forward pass
    raises, so calling this on a model that is already in eval mode no longer
    flips it into training mode.
    """
    thresholds = [0.25, 0.5, 0.75]
    num_correct = np.zeros((len(thresholds), len(thresholds)), dtype=int)
    num_samples = len(X)

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for x, y in zip(X, Y):
                scores = model(x)
                ground_truth = y >= 0.5  # e.g. [False, True]
                for row, column in itertools.product(range(len(thresholds)), repeat=2):
                    # Allocate next to the scores instead of relying on a
                    # module-level device name.
                    predictions = torch.zeros(
                        scores.shape, dtype=torch.bool, device=scores.device
                    )
                    predictions[0] = scores[0] >= thresholds[row]
                    predictions[1] = scores[1] >= thresholds[column]
                    if torch.equal(predictions, ground_truth):
                        num_correct[row, column] += 1
    finally:
        model.train(was_training)

    accuracy = np.round(num_correct / num_samples * 100, 4)
    print(accuracy)

    # argmax works on the flattened table; unravel it back to (row, column).
    best_row, best_column = np.unravel_index(np.argmax(accuracy), accuracy.shape)
    return accuracy.max(), (thresholds[best_row], thresholds[best_column])


In [None]:
import random
def train_step(model, optimizer, X_train, Y_train, batch_size=32):
    """Run one optimisation step on a randomly drawn mini-batch.

    ``batch_size`` samples are drawn uniformly *with replacement* from
    ``X_train``/``Y_train``, each is passed through ``model`` individually,
    and the summed binary cross-entropy of every sample is averaged over the
    batch before back-propagation.

    Args:
        model: module called as ``model(x)``; its outputs are treated as
            probabilities by the loss.
        optimizer: optimizer holding ``model``'s parameters.
        X_train: sequence of model inputs.
        Y_train: sequence of target tensors aligned with ``X_train``.
        batch_size: number of samples drawn for this step (must be >= 1).

    Returns:
        The mean batch loss as a Python float.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1 or ``X_train`` is empty.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    if len(X_train) == 0:
        raise ValueError("X_train must contain at least one sample")

    eps = 1e-7  # keeps log() finite when a probability is exactly 0 or 1

    def binary_cross_entropy(probs, target):
        positive_term = -target * torch.log(probs + eps)
        negative_term = -(1 - target) * torch.log(1 - probs + eps)
        return (positive_term + negative_term).sum()

    # Start from clean gradients so nothing left over from earlier backward
    # passes leaks into this update.
    optimizer.zero_grad()

    losses = []
    for _ in range(batch_size):
        index = random.randint(0, len(X_train) - 1)
        losses.append(binary_cross_entropy(model(X_train[index]), Y_train[index]))

    mean_loss = torch.stack(losses).mean()
    mean_loss.backward()
    optimizer.step()
    # Cleared again so the gradients are not left populated after the call.
    optimizer.zero_grad()
    return mean_loss.detach().item()

In [None]:
#I create a class
class RNN(nn.Module):
    """LSTM classifier over a single (unbatched) feature sequence.

    Pipeline: per-time-step linear embedding + ReLU, an optional strided 2-D
    convolution over the (time, feature) plane, an LSTM, and finally a linear
    layer + sigmoid applied to the LSTM output of the last time step.

    Args:
        input_size: number of features per time step of the raw input.
        hidden_size: LSTM input and hidden width.
        num_layers: number of stacked LSTM layers.
        num_classes: number of independent sigmoid outputs.
        bidirectional: run the LSTM in both directions; the classifier input
            width is doubled accordingly.
        conv: insert the convolution between the embedding and the LSTM.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional=False, conv=False) -> None:
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.conv = conv
        if self.conv:
            # With "valid" padding the width-5 kernel removes 4 feature
            # columns, and the (7, 4) kernel/stride along the first axis
            # shortens the sequence.
            self.nconv = nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(7, 5), stride=(4, 1), padding="valid")
            # Embed 4 columns wider so the LSTM still receives hidden_size
            # features after the convolution.
            embed_size = hidden_size + 4
        else:
            # No convolution: the embedding feeds the LSTM directly and must
            # already have the LSTM's input width.
            embed_size = hidden_size
        self.embed = nn.Linear(input_size, embed_size)

        # nn.LSTM only applies dropout between stacked layers, so passing a
        # non-zero value for a single layer has no effect beyond a warning.
        lstm_dropout = 0.05 if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=False, bidirectional=bidirectional, dropout=lstm_dropout)

        # fc = fully connected; a bidirectional LSTM emits 2 * hidden_size features.
        fc_in_features = hidden_size * 2 if bidirectional else hidden_size
        self.fc = nn.Linear(fc_in_features, num_classes)

    def forward(self, x):
        """Return ``num_classes`` sigmoid scores for one sequence.

        Args:
            x: 2-D tensor whose last dimension has ``input_size`` features.

        Raises:
            ValueError: if ``x`` is not 2-D.
        """
        if x.dim() != 2:
            raise ValueError(f"expected a 2-D (sequence_length, n_features) tensor, got shape {tuple(x.shape)}")

        x = torch.relu(self.embed(x))
        if self.conv:
            # Add a leading size-1 dimension to serve as the single input
            # channel of the convolution, then drop it again afterwards.
            x = self.nconv(x.unsqueeze(0)).squeeze(0)
        out, _ = self.lstm(x)  # zero initial hidden/cell state by default
        # Classify from the LSTM output at the last time step.
        return torch.sigmoid(self.fc(out[-1]))

In [None]:
from pathlib import Path
result_dir = Path('/content/drive/MyDrive/DeepLProject/models-conv2')
result_dir.mkdir(exist_ok=True, parents=True)

# Build the model and optimizer.
model = RNN(input_size=26, hidden_size=64, num_layers=1, num_classes=2, conv=True, bidirectional=False).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
n_epochs = 7000
best_model = 0  # best validation accuracy seen so far
best_mod = 0  # best training accuracy seen so far

# Note: every "epoch" here is a single train_step call, i.e. one optimizer
# update on one randomly sampled mini-batch.
for epoch in range(n_epochs):
    loss = train_step(model, optimizer, X_train, Y_train, batch_size=128)
    if epoch % 50 != 0:
        continue

    print(f"Epoch {epoch}/{n_epochs} {loss=}")
    # Keep the training thresholds under their own names: previously they were
    # overwritten by the validation call below, so the "Train_model" checkpoint
    # was labelled with the validation score and validation thresholds.
    train_scores, (train_c0, train_c1) = evaluate(model, X_train, Y_train)
    print("train", train_scores)
    score, (c0, c1) = evaluate(model, X_val, Y_val)
    print(score, c0, c1)

    if train_scores > best_mod:
        best_mod = train_scores
        torch.save(model.state_dict(), result_dir / f"Train_model{train_scores:.3f}_{train_c0}_{train_c1}")
    if score > best_model:
        best_model = score
        torch.save(model.state_dict(), result_dir / f"Valid_model{score:.3f}_{c0}_{c1}")


# Final report for the last step.
print(f"Epoch {epoch}/{n_epochs} {loss=}")
evaluate(model, X_val, Y_val)

In [None]:
from pathlib import Path
def infer(model_name: str, th0, th1, X, file: Path):
    """Load a saved checkpoint, threshold its scores on ``X`` and write JSON.

    Args:
        model_name: checkpoint file name inside the ``models-conv2`` folder.
        th0: threshold applied to score index 0 (written as "valence").
        th1: threshold applied to score index 1 (written as "activation").
        X: iterable of model inputs, one per sample.
        file: output path; missing parent directories are created.

    The output maps the running sample index (as a string) to
    ``{"valence": 0 or 1, "activation": 0 or 1}``. Before predicting, the
    validation accuracy table of the loaded model is printed via ``evaluate``
    as a sanity check.
    """
    state_dict_file = Path('/content/drive/MyDrive/DeepLProject/models-conv2') / model_name
    model = RNN(input_size=26, hidden_size=64, num_layers=1, num_classes=2, conv=True, bidirectional=False)
    model.load_state_dict(torch.load(state_dict_file, map_location=device))
    model.to(device)

    evaluate(model, X_val, Y_val)
    # evaluate() can hand the model back in training mode, so switch to eval
    # mode only now, immediately before predicting.
    model.eval()

    results = {}
    with torch.no_grad():
        for count, features in enumerate(X):
            scores = model(features)
            results[f"{count}"] = {
                "valence": int(scores[0] >= th0),
                "activation": int(scores[1] >= th1),
            }

    file.parent.mkdir(exist_ok=True, parents=True)
    with open(file, "w") as outfile:
        json.dump(results, outfile)



# Write predictions for the test features using thresholds of 0.5 for both outputs.
infer(
    "model52.949_0.5_0.5",
    th0=0.5,
    th1=0.5,
    X=dev_X,
    file=Path('/content/drive/MyDrive/DeepLProject/submissions/test_final.json'),
)



[[45.     45.8974 42.0513]
 [47.6923 52.9487 50.1282]
 [39.6154 43.7179 44.359 ]]


In [None]:
# Mount Google Drive at /content/drive. The data, checkpoint and submission
# paths used in this notebook all live under /content/drive/MyDrive, so this
# cell has to be run before any cell that reads or writes those paths.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
