In [3]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import sklearn 
from sklearn import linear_model
from sklearn.model_selection import train_test_split
from gensim.models.keyedvectors import KeyedVectors
import matplotlib.pyplot as plt
import nltk
import torch.nn.functional as F

In [1]:
def proba_to_label(proba_batch):
    """
    Convert a batch of per-class scores into one-hot label rows.

    For every row, the position holding the largest score is set to 1 and all
    other positions are set to 0. Because only the arg-max matters, the scores
    may be probabilities, log-probabilities or raw logits. Ties resolve to the
    first (lowest-index) maximum, following ``np.argmax``.

    Args:
        proba_batch (torch.Tensor): 2-D tensor with one row per example and
            one column per class. It may require grad and may live on any
            device; it is detached and copied to the CPU before conversion.

    Returns:
        np.ndarray: Array with the same shape and dtype as the input, with
        exactly one 1 per row.
    """
    # .cpu() is a no-op for CPU tensors and avoids the error that .numpy()
    # raises for tensors stored on an accelerator.
    scores = proba_batch.detach().cpu().numpy()

    # Column index of the winning class in each row
    winners = np.argmax(scores, axis=1)

    # Start from all zeros and switch on one entry per row in a single
    # vectorized assignment.
    one_hot = np.zeros_like(scores)
    one_hot[np.arange(scores.shape[0]), winners] = 1

    return one_hot

In [10]:
# Build architecture

# Distilled Dual-task Deep Averaging Net
class DistilledDAN(nn.Module):
    """
    Deep Averaging Network (DAN) classifier trained with knowledge distillation.

    The network averages the word embeddings of each input sequence, passes the
    average through two hidden layers (LeakyReLU followed by dropout) and a
    final linear layer, and outputs log-probabilities over ``num_classes``.
    """

    def __init__(self, num_classes: int,
                       embedding_dim: int,
                       hidden_dim1: int,
                       hidden_dim2: int,
                       leaky_relu_negative_slope: float,
                       dropout_probability: float
                ):
        """
        Create the network architecture.
        In our sentiment analysis, we have three classes: 0, 1, 2

        Args:
            num_classes: Number of output classes (width of the final layer).
            embedding_dim: Size of each input word embedding.
            hidden_dim1: Width of the first hidden layer.
            hidden_dim2: Width of the second hidden layer.
            leaky_relu_negative_slope: Negative slope used by both LeakyReLU
                activations.
            dropout_probability: Dropout probability applied after each
                hidden activation.
        """

        super().__init__()
        self.num_classes = num_classes

        self.embedding_dim = embedding_dim
        self.hidden_dim1 = hidden_dim1
        self.hidden_dim2 = hidden_dim2
        self.leaky_relu_negative_slope = leaky_relu_negative_slope
        self.dropout_probability = dropout_probability

        self.hidden1 = nn.Linear(self.embedding_dim, self.hidden_dim1)
        self.hidden2 = nn.Linear(self.hidden_dim1, self.hidden_dim2)
        self.theta = nn.Linear(self.hidden_dim2, self.num_classes)

        # LogSoftmax is computed along dim=1, i.e. across the class scores of each example
        self.log_softmax = nn.LogSoftmax(dim=1)
        self.apply_dropout = nn.Dropout(self.dropout_probability)

    def forward(self, x):
        """
        Define the forward pass of the network.

        Args:
            x (torch.Tensor): Input tensor containing embedded word vectors.
                              Shape: (batch_size, sequence_length, embedding_dim)

        Returns:
            torch.Tensor: Log probability of each class. Shape: (batch_size, num_classes)
        """

        # Average the input word embeddings
        x = x.mean(dim=1)

        # Pass through the shared layers
        x = self.hidden1(x)
        x = F.leaky_relu(x, negative_slope=self.leaky_relu_negative_slope)
        x = self.apply_dropout(x)

        x = self.hidden2(x)
        x = F.leaky_relu(x, negative_slope=self.leaky_relu_negative_slope)
        x = self.apply_dropout(x)

        # Pass through final layer
        x = self.theta(x)

        # Apply the LogSoftmax activation function
        x = self.log_softmax(x)

        return x

    def train_model(self,
                    X_train,
                    Y_train,
                    X_dev,
                    Y_dev,
                    soft_labels,
                    optimizer,
                    num_iterations,
                    soft_label_weight=0.5,
                    loss_fn=nn.CrossEntropyLoss(),
                    batch_size=500,
                    check_every=10,
                    verbose=False):
        """
        Train the model with a weighted mix of hard-label and soft-label loss.

        Each iteration draws one mini-batch (sampled with replacement, or the
        whole training set when ``batch_size`` is at least its size), computes

            (1 - soft_label_weight) * loss_fn(log_probs, Y_batch)
                + soft_label_weight * loss_fn(log_probs, soft_labels_batch)

        and takes one optimizer step. soft_labels are only available for the
        training set.

        Args:
            X_train (torch.Tensor): Training inputs accepted by ``forward``.
            Y_train (torch.Tensor): Hard training labels, in a format accepted
                by ``loss_fn`` as a target.
            X_dev (torch.Tensor): Dev inputs accepted by ``forward``.
            Y_dev (torch.Tensor): Hard dev labels.
            soft_labels (torch.Tensor): Soft (teacher) targets for the
                training set, aligned with ``X_train`` along the first axis.
            optimizer: Optimizer over this model's parameters.
            num_iterations (int): Number of mini-batch updates.
            soft_label_weight (float): Weight of the soft-label loss term; the
                hard-label term gets ``1 - soft_label_weight``.
            loss_fn: Callable ``loss_fn(log_probs, target)``. The default
                ``nn.CrossEntropyLoss`` applies log-softmax to its input; on
                values that are already log-probabilities that is a no-op up
                to rounding, so feeding it the output of ``forward`` is valid.
            batch_size (int): Mini-batch size.
            check_every (int): Record loss and accuracies every this many
                iterations (including iteration 0).
            verbose (bool): Print the loss at every check.

        Returns:
            tuple[list, list, list]: ``(loss_history, train_accuracy,
            dev_accuracy)``, each with one entry per check.

        Note:
            The model is left in eval mode when this method returns.
        """

        loss_history = []
        train_accuracy = []
        dev_accuracy = []

        num_examples = X_train.shape[0]

        # The hard labels never change during training, so convert them to
        # NumPy once instead of on every check. .cpu() keeps this working when
        # the labels live on an accelerator.
        y_train_np = Y_train.detach().cpu().numpy()
        y_dev_np = Y_dev.detach().cpu().numpy()

        for t in range(num_iterations):
            # predict() (used for the periodic checks below) switches the
            # model to eval mode. Re-enter training mode on every iteration so
            # that dropout stays active for all updates, not just the first.
            self.train()

            if batch_size >= num_examples:
                X_batch = X_train
                Y_batch = Y_train
                soft_labels_batch = soft_labels
            else:
                batch_indices = np.random.randint(num_examples, size=batch_size)
                X_batch = X_train[batch_indices]
                Y_batch = Y_train[batch_indices]
                soft_labels_batch = soft_labels[batch_indices]

            # Forward pass
            log_probs_batch = self(X_batch)

            # Distillation loss: weighted sum of the loss against the hard
            # labels and the loss against the soft labels
            hard_loss = loss_fn(log_probs_batch, Y_batch)
            soft_loss = loss_fn(log_probs_batch, soft_labels_batch)
            loss = (1 - soft_label_weight) * hard_loss + soft_label_weight * soft_loss

            # Backprop
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if t % check_every == 0:
                loss_value = loss.item()
                loss_history.append(loss_value)

                # Check train accuracy (entire set, not just batch)
                train_y_pred = self.predict(X_train)
                train_accuracy.append(self.accuracy(train_y_pred, y_train_np))

                # Check dev accuracy (entire set, not just batch)
                dev_y_pred = self.predict(X_dev)
                dev_accuracy.append(self.accuracy(dev_y_pred, y_dev_np))

                if verbose: print(f"Iteration={t}, Loss={loss_value}")

        # Leave the trained model ready for inference (dropout off)
        self.eval()

        return loss_history, train_accuracy, dev_accuracy

    def predict(self, X, proba_mode=False):
        """
        Make predictions with the model in eval mode.

        The model is switched to eval mode (dropout disabled) and stays in
        eval mode afterwards. The forward pass runs without gradient tracking.

        Args:
            X (torch.Tensor): Inputs accepted by ``forward``.
            proba_mode (bool): If True, return the log-probabilities produced
                by ``forward``; otherwise return one-hot labels.

        Returns:
            torch.Tensor | np.ndarray: Log-probabilities of shape
            (batch_size, num_classes) when ``proba_mode`` is True, otherwise
            the array returned by ``proba_to_label`` for those
            log-probabilities.
        """
        self.eval()

        # Inference only: skip building the autograd graph to save memory
        with torch.no_grad():
            log_probs_batch = self(X)

        if proba_mode:
            return log_probs_batch

        # Convert log probabilities to labels
        return proba_to_label(log_probs_batch)

    @staticmethod
    def accuracy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """
        Calculate the fraction of examples that are predicted correctly.

        Supported layouts:
          * ``y_pred`` 2-D one-hot and ``y_true`` 1-D class indices: the
            arg-max column of each prediction row is compared with the index.
          * both 2-D (e.g. one-hot rows): an example counts as correct only
            when its whole row matches.
          * anything else: element-wise comparison.

        Args:
            y_pred: Predicted labels.
            y_true: Ground-truth labels.

        Returns:
            float: Mean of the per-example correctness indicators.
        """
        y_pred = np.asarray(y_pred)
        y_true = np.asarray(y_true)

        if y_pred.ndim == 2 and y_true.ndim == 1:
            # One-hot predictions against integer class labels
            return float(np.mean(np.argmax(y_pred, axis=1) == y_true))

        if y_pred.ndim == 2 and y_true.ndim == 2:
            # Score whole rows; comparing element-wise would give partial
            # credit to wrong predictions (their zero entries still match).
            return float(np.mean(np.all(y_pred == y_true, axis=1)))

        return float(np.mean(y_pred == y_true))