## Import the Libraries

In [1]:
import pandas
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from torchvision.transforms import Lambda, ToTensor

## Write the Datasets and Dataloaders

In [2]:
# Create a Custom Dataset from train_data.csv and eval_data.csv
class CustomDataset(Dataset):
    def __init__(self, csv_file, transform=None):
        self.data = pandas.read_csv(csv_file)
        self.transform = transform
        # Remove the first row from the Dataframe
        self.data = self.data.iloc[1:]

    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist() # Convert the tensor to a list
        # Get the data from the Dataframe
        data = self.data.iloc[idx]
        # Convert the data to a numpy array
        data = data[0] # First column of the Dataframe
        data_num = []
        for i in data:
            data_num.append(ord(i)- 96)
        data = np.array(data_num)
        # Convert the data to a tensor
        data = torch.from_numpy(data)
        label = self.data.iloc[idx]
        label = label[1]
        label_num = []
        for i in label:
            label_num.append(ord(i)- 96)
        # Append a 0 to the label_num list at the beginning
        label_num.insert(0, 0)
        label = np.array(label_num)
        if self.transform:
            data = self.transform(data) # Apply the transform on the data
        return data, label

# Write a DataLoader for the Custom Dataset
train_dataset = CustomDataset('./A3 files/train_data.csv', transform=None)

# Take only the first 200 examples
train_dataset.data = train_dataset.data.iloc[:200]

# Split the Dataset into Train and Validation Datasets
train_dataset, val_dataset = train_test_split(train_dataset, test_size=0.2)

# Create a DataLoader for the Train and Validation Datasets
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=True)

############################# TESTING CODE #############################

# Print out some data from the Train Dataset
for i, data in enumerate(train_dataloader):
    print(data)
    break

[tensor([[ 1,  3, 21, 14,  5, 15,  6,  2]]), tensor([[ 0, 17, 24,  9, 21, 25,  5, 18, 15]])]


## Write the Transformer Class

In [3]:
import math

class PositionalEncoding(nn.Module):
    # Injecting Some Information about the Relative or the Absolute Positioning of the tokens in the sequence
    def __init__(self, d_model, dropout = 0.1, max_len=8):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p = dropout)
        position = []
        pos_enc = torch.zeros(max_len, d_model) # Positional Encoding -> Max Length and the dimensions of the model
        for i in range(max_len):
            position.append(i)
        position = torch.tensor(position).unsqueeze(1) # Got the pos -> value
        # Now, you want to make the position term to be max_len, d_model
        position_stacked = [position] * d_model 
        position = torch.cat(position_stacked, dim=1)
        # Now to obtain the 10000^2i/d_model 
        div_term = torch.arange(0, d_model, 2) # The Value to be divided
        div_term = div_term/d_model
        div_term = div_term.type(torch.float64)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))   
        pos_enc[:, 0::2] = torch.sin(position[:, 0::2]*div_term)
        pos_enc[:, 1::2] = torch.cos(position[:, 1::2]*div_term)
        self.pos_enc = pos_enc
        return
    
    def forward(self, x):
        x = x + self.pos_enc # This is placeholder -> Modify to include only the len given -> Not Max Len
        #print("Positional Encoding Completed")
        return x

class PosEncoding(nn.Module):
    def __init__(self):
        super(PosEncoding, self).__init__()
        self.max_len = 1000
        self.embedding_dim = 400
        #self.embedding_layer = nn.Embedding(len(word2index), self.embedding_dim)
    def forward(self, embedding):
        # embeddings (b, s, d)
        # pos runs from 0, s-1
        # i runs from 0, d-1
        #embedding = self.embedding_layer(x)
        pos_max = embedding.shape[1]
        d_max = embedding.shape[2]
        # create pos encoding matrix of size (b, s, d)
        pos_encoding = torch.zeros((pos_max, d_max))
        for pos in range(pos_max):
            for i in range(0, d_max, 2):
                pos_encoding[pos, i] = np.sin(pos / (10000 ** ((2 * i)/d_max)))
                pos_encoding[pos, i+1] = np.cos(pos / (10000 ** ((2 * (i+1))/d_max)))
        pos_encoding = pos_encoding.unsqueeze(0)
        pos_encoding = pos_encoding.repeat(embedding.shape[0], 1, 1)
        #print(embedding.shape, pos_encoding.shape)
        embedding = embedding + pos_encoding.to(embedding.device)
        return embedding
# Create a Transformer Encoder
class TransformerEncoder(nn.Module):
    def __init__(self, d_model, nhead, num_layers, dim_feedforward, dropout=0.1, activation="relu"):
        super(TransformerEncoder, self).__init__()
        self.d_model = d_model
        self.nhead = nhead
        self.num_layers = num_layers
        self.dim_feedforward = dim_feedforward
        self.dropout = dropout
        self.activation = activation
        self.pos_enc = PositionalEncoding(d_model, dropout, max_len = 8)
        self.transformer_encoder = nn.TransformerEncoder(nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout, activation), num_layers)
        return
    
    def forward(self, x):
        x = self.pos_enc(x)
        x = self.transformer_encoder(x)
        return x

# Create a Transformer Decoder
class TransformerDecoder(nn.Module):
    def __init__(self, d_model, nhead, num_layers, dim_feedforward, dropout=0.1, activation="relu"):
        super(TransformerDecoder, self).__init__()
        self.d_model = d_model
        self.nhead = nhead
        self.num_layers = num_layers
        self.dim_feedforward = dim_feedforward
        self.dropout = dropout
        self.activation = activation
        self.pos_enc = PosEncoding()
        self.transformer_decoder = nn.TransformerDecoder(nn.TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout, activation), num_layers)
        self.linear = nn.Linear(d_model, 27)
        return
    
    def forward(self, target, memory):
        #print("Decoder")
        seq_len = target.shape[1]
        target = self.pos_enc(target)
        # create target mask
        # add batcg dim to target mask, batch dim = 1
        target_mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1)
        # add batch dim to target mask of 1
        #target_mask = target_mask.unsqueeze(0)
        # reshape target and memory
        target = target.permute(1, 0, 2)
        memory = memory.permute(1, 0, 2)
        x = self.transformer_decoder(target, memory, tgt_mask = target_mask)
        x = self.linear(x)
        return x

# Create a Transformer Model
class Transformer(nn.Module):
    def __init__(self, d_model, nhead, num_layers, dim_feedforward, dropout=0.1, activation="relu"):
        super(Transformer, self).__init__()
        self.embedding = nn.Embedding(27, d_model)
        self.embed_outputs = nn.Embedding(27, d_model)
        self.encoder = TransformerEncoder(d_model, nhead, num_layers, dim_feedforward, dropout, activation)
        self.decoder = TransformerDecoder(d_model, nhead, num_layers, dim_feedforward, dropout, activation)
        return
    
    def forward(self, x, tgt, train=True):
        if train:
            # Embed input sequence
            x = self.embedding(x)  # x: batch_size, seq_len, d_model
            x = self.encoder(x)  # Encoder output

            # Prepare variables for autoregressive generation in training
            outputs = []
            next_input = tgt[:, 0].unsqueeze(1)  # Start with the first input token

            # Loop through the sequence
            for i in range(1, tgt.size(1)):
                # Get embedding for the current input token
                input_emb = self.embed_outputs(next_input)
                output = self.decoder(input_emb, x)
                # Predict the next output token
                outputs.append(next_input)
                # Use the actual next token from the target sequence as the next input
                next_input = tgt[:, i].unsqueeze(1)
            # Stack all the outputs
            y = torch.cat(outputs, dim=1)
        else:
            x = self.embedding(x)
            x = self.encoder(x)
            # Start token is usually the first token in the target vocabulary
            start_token = tgt[:, 0].unsqueeze(1)
            # Initialize tensor for decoder outputs
            outputs = start_token
            # Autoregressively generate tokens using the decoder
            for i in range(1, tgt.size(1)):
                input_emb = self.embed_outputs(outputs)
                output = self.decoder(input_emb, x)
                next_token = torch.argmax(output[:, -1, :], dim=-1, keepdim=True)
                outputs = torch.cat((outputs, next_token), dim=1)

            y = outputs
        return y


### Train the Model

In [4]:
# Create a Transformer Model
Model = Transformer(300, 2, 2, 300, 0.1, "relu")

# Create a Loss Function
criterion = nn.CrossEntropyLoss()

# Create an Optimizer
optimizer = torch.optim.Adam(Model.parameters(), lr=0.0001)

# Create a Training Loop
acc_tot = 0
tot_cnt = 0
def train_loop(dataloader, model, loss_fn, optimizer):
    acc_tot = 0
    tot_cnt = 0
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        # Compute the prediction and the loss
        pred = model(X, y, train=True)
        print(pred.shape, y.shape)
        # reshape preds to 1 2 0
        pred = pred.permute(1, 2,0)
        #print(pred.shape, y.shape)
        loss = loss_fn(pred, y)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # calculate accuracy
        accuracy = (pred.argmax(1) == y).type(torch.float).sum().item()
        # divide by batch size and number of chars
        accuracy = accuracy/(y.shape[0]*y.shape[1])
        acc_tot += accuracy
        tot_cnt += 1
        acc_here = acc_tot/tot_cnt
        # decode the preds using argmax
        prediction = pred.argmax(1)
        # print("Predcition: ")
        # print(prediction)
        # print("Label: ")
        # print(y)
        # print("Input: ")
        # print(X)
        # print("------------------------------------------------------")
        acc_here = acc_tot/tot_cnt
        # Print the loss
        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            # print epoch, loss, batch, acc_here
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}] accuracy: {acc_here:>7f}")
            
# Train the Model
epochs = 10

for t in range(1):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, Model, criterion, optimizer)

Epoch 1
-------------------------------
torch.Size([1, 8]) torch.Size([1, 9])




RuntimeError: permute(sparse_coo): number of dimensions in the tensor input does not match the length of the desired ordering of dimensions i.e. input.dim() = 2 is not equal to len(dims) = 3

In [None]:
# Create a Validation Loop  
def val_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    acc_sum = 0
    cnt = 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X,X,train=False)
            # Remove the First Element
            pred = pred[:, 1:]
            print("Input", X)
            print("Prediction", pred)
            print("===========================================================")
            accuracy = (pred.argmax(1) == y).type(torch.float).sum().item()
            accuracy = accuracy/(y.shape[0]*y.shape[1])
            acc_sum += accuracy
            cnt += 1
            if cnt == 10:
                break
    test_loss /= num_batches
    acc_here = acc_sum/cnt
    print("Eval loop: loss: ", test_loss, " accuracy: ", acc_here)

Model = Transformer(300, 2, 2, 300, 0.1, "relu")

val_loop(val_dataloader, Model, criterion)