In [1]:
import torch
from torch import nn
import torch.nn.functional as F

In [2]:
#define the self attention class: the complete layer of multiple heads
#whatever was vertically dimensioned in the theory, is now horizontal.

In [3]:
class SelfAttention(nn.Module):
    def __init__(self,k,heads):
        super().__init__()
        assert k%heads==0
        self.k,self.heads=k,heads
        #The linear function creates a transformation layer which returns a transformed output for an input. y = x(W.T)
        self.toKeys = nn.Linear(k,k,bias=False) #wk
        self.toQueries = nn.Linear(k,k,bias=False) #wq
        self.toValues = nn.Linear(k,k,bias=False) #wv
        #to concatenate the resultant chunks of each attention head
        self.unifyHeads = nn.Linear(k,k)
    def forward(self,x):
        #the input would be a 3-d vector of form (batch_size,seq_len,in_features) because the sequence of input vectors comes in batches.
        b,t,k=x.size()
        h=self.heads
        #but how does the matrix multiplication work on 3d matrix such as x: it also gives out a 3d matrix of form (batch_size,seq_len,out_features)
        #here, both in and out features are k
        queries=self.toQueries(x)
        keys = self.toKeys(x)
        values = self.toValues(x)

        headSize = self.k//self.heads

        #This simply reshapes the tensors to break the last dimension into two dimensions.
        #purpose: to divide the features of each input vector into h parts, so that each head receives one chunk of that input vector.
        #the chunks are of low dimensions and easier to compute individually.
        keys = keys.view(b,t,h,headSize)
        queries = queries.view(b,t,h,headSize)
        values = values.view(b,t,h,headSize)

        # - fold heads into the batch dimension=> needed to compute the dot product parallely
        keys = keys.transpose(1, 2).contiguous().view(b * h, t, headSize)
        queries = queries.transpose(1, 2).contiguous().view(b * h, t, headSize)
        values = values.transpose(1, 2).contiguous().view(b * h, t, headSize)
        # now the first dimension is size of the batch: each batch has sequences of mini-vectors(one vector for each head).
        # size of sequence is the block size.
        # for each sequence, we have corresponding outputs
        # how to concatenate those outputs once you have transformed the matrix? we un-transform it first before concatenating.

        #compute weights
        raw_weights = torch.bmm(queries,keys.transpose(1,2))
        #raw_weights is of dimension: b*h, t, t

        raw_weights /= headSize**(1/2)
        weights = F.softmax(raw_weights,dim=2)

        #apply self-attention to the input vectors
        out = torch.bmm(weights, values).view(b,h,t,headSize)
        out = out.transpose(1,2).contiguous().view(b,t,h*headSize)
        #unifyHeads is not really necessary once we do h*headSize
        return self.unifyHeads(out)

Now, if we combine self attention with some other mechanisms, we can build a transformer block, which we can repeat to achieve better results.
Still don't understand how connecting a bunch of components works out?

In [4]:
class TransformerBlock(nn.Module):
  def __init__(self,k,heads):
    super().__init__()

    self.attention=SelfAttention(k,heads=heads)

    self.norm1 = nn.LayerNorm(k)
    self.norm2 = nn.LayerNorm(k)

    self.ff = nn.Sequential(
        nn.Linear(k,4*k),
        nn.ReLU(),
        nn.Linear(4*k,k)
    )
    #but sequential's units will output a number, not a vector? yep and a vector is k numbers
  def forward(self,x):
    attended = self.attention(x)
    x=self.norm1(attended+x)

    fedForward = self.ff(x)
    return self.norm2(fedForward+x)


In [11]:
class CTransformer(nn.Module):
  def __init__(self,k,heads,depth,seq_length,num_tokens,num_classes):
    super().__init__()

    #layer for handling input
    self.num_tokens=num_tokens #size of vocabulary i.e. no of unique tokens that the transformer knows.
    self.token_emb = nn.Embedding(num_tokens,k) # map each token(integer) to a size k vector.
    self.pos_emb = nn.Embedding(seq_length,k) # map each position (0->seq_length-1) to a size k vector
    #these embedding layers will be initialized randomly, but trained with the input.

    #transformer blocks
    tblocks=[]
    for i in range(depth):
      tblocks.append(TransformerBlock(k,heads))
    self.tblocks=nn.Sequential(*tblocks)

    #layer for handling output: project to an array of size num_classes
    self.toProbs = nn.Linear(k,num_classes)

  def forward(self,x):
    # process the input before feeding to transformer blocks
    """
        :param x: A (b, t) tensor of integer values representing
                  words (in some predetermined vocabulary).
                  Each batch has t tokens. so each batch corresponds to 1 sequence?
        :return: A (b, c) tensor of log-probabilities over the
                 classes (where c is the nr. of classes).
                 Probability distribution over c classes for each batch
    """
    tokens = self.token_emb(x)
    b, t, k = tokens.size()
    positions = torch.arange(t,device=x.device)
    positions = self.pos_emb(positions)[None,:,:].expand(b,t,k)
    print(x.device, tokens.device, positions.device)
    '''
    [None,:,:]
    This adds a new dimension at the beginning of the tensor.
    It changes the shape from (t, k) to (1, t, k).
    expand(b,t,k)
    This expands the tensor to shape (b, t, k), where b is the batch size.
    It repeats the positional embeddings b times along the first dimension.
    This operation doesn't allocate new memory; it creates a view of the original tensor.
    '''
    x = tokens+positions

    x = self.tblocks(x)

    #process the output before returning
    x = x.mean(dim=1) # calculates mean over the second dimension ie t. Now x is of shape (b,k) because each batch only has one vector
    x = self.toProbs(x) # projects x to a shape (b,1,num_classes)
    return F.log_softmax(x,dim=1) # calculates log of softmax across the second dimension ie num_classes. log is easier to handle than actual probability calculation
    #done according to the task of the transformer.

In [6]:
# Load the IMDB dataset
from google.colab import drive
drive.mount('/content/drive')
import pandas as pd
%cd /content/drive/My Drive/Colab Notebooks/


Mounted at /content/drive
/content/drive/My Drive/Colab Notebooks


In [7]:
# Get cpu, gpu or mps device for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cuda device


In [8]:
import os
import torch
from transformers import AutoTokenizer
import torch.nn.functional as F
import pandas as pd

def handle_data(file_path,batch_size=10,seq_length=512):
      dataset = pd.read_csv("IMDB.csv")
      dataset = dataset.sample(frac=1).reset_index(drop=True)
      #making the length of df divisible by 10 so that suitable batches can be made
      while(len(dataset)%100!=0):
          dataset=dataset[:-1]
      reviews = dataset['review'].astype(str).tolist()
      label_mapping = {
          'positive': 1,
          'negative': 0,
          'neutral': 2
      }

      # Map the sentiment labels to numeric values
      dataset['sentiment'] = dataset['sentiment'].map(label_mapping)
      labels = torch.tensor(dataset['sentiment'])
      #Convert into vectors
      tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
      vocab_size = tokenizer.vocab_size
      encoded_reviews = tokenizer(
          reviews,
          return_tensors="pt", #retuning tensors are compatible with pytorch
          padding=True,
          truncation=True,
          max_length=seq_length
      )
      # Define the split ratio (e.g., 80% train, 20% test)
      train_size = int(0.5 * len(dataset))
      test_size = len(dataset) - train_size
      #divide data into train, test and then batches
      train_reviews =encoded_reviews['input_ids'][:train_size].view(train_size//batch_size,batch_size,seq_length)
      test_reviews =encoded_reviews['input_ids'][train_size:].view(test_size//batch_size,batch_size,seq_length)
      train_labels = labels[:train_size].view(train_size//batch_size,batch_size)
      test_labels = labels[train_size:].view(test_size//batch_size,batch_size)
      return (train_reviews.to(device), test_reviews.to(device),
        train_labels.to(device), test_labels.to(device),
        vocab_size)
def train(model,x_train,x_labels,epochs=20,learning_rate=0.0001,lr_warmup=10000):
  model.train(True)
  opt = torch.optim.Adam(lr=learning_rate, params=model.parameters())
  sch = torch.optim.lr_scheduler.LambdaLR(opt, lambda i: min(i / (lr_warmup / x_train.shape[1]), 1.0))
  for epoch in range(epochs):
    for batch_reviews,batch_labels in zip(x_train,x_labels):
      opt.zero_grad()
      out = model.forward(batch_reviews)
      loss = F.nll_loss(out,batch_labels)
      #log the loss
      print(f"Epoch {epoch}, loss: {loss}")
      loss.backward()
      #update parameters
      opt.step()
      sch.step()

def test(model,x_test,x_labels):
    model.eval(True)
    correct = 0
    for batch_reviews,batch_labels in zip(x_test,x_labels):
        out = model.forward(batch_reviews)
        for i,sentence in enumerate(out):
            if(torch.argmax(out[i])==batch_labels[i]):
                correct+=1
    print(f"Accuracy: {(correct/(x_test.shape[0]*x_test.shape[1]))*100}%")

In [9]:
#handle data
file_path="helper/IMDB.csv"
embedding_dim = 128
batch_size=5
seq_length=256
train_reviews,test_reviews,train_labels,test_labels,vocab_size=handle_data(file_path,batch_size=10,seq_length=512)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

In [15]:
if(os.path.exists("IMDBSentiment.pth")==False):
    #train and save the model
    """
    1. Load the datasets
    2. Pre-process the data:
        2a. Tokenization+ vectorization: Convert words to numbers in the input and labels to indices in the output.
        2b. Length of each input(no of words) should be equal to seq_length.
    3. Split into train and test
    4. Decide batch size for training
    5. Perform Training
    6. Perform Testing
    """
    #define the model
    model = CTransformer(k=embedding_dim,heads=8,depth=6,seq_length=seq_length,num_tokens=vocab_size,num_classes=2)
    model = model.to(device)

    #start the training
    train(model,x_train=train_reviews,x_labels=train_labels,epochs=80,learning_rate=0.0001,lr_warmup=10000)
    torch.save(model.state_dict(), "IMDBSentiment.pth")
else:
    #load the saved model
    model = CTransformer().to(device)
    model.load_state_dict(torch.load("IMDBSentiment.pth"),map_location=device)

test(model,test_reviews,test_labels)

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


# Future Scope
1. Incorporating larger sequence lengths during inference.
2. Adding attention mask to the architecture and then train on larger seq length.
3. Prompt the user for hyper parameters.