<a href="https://colab.research.google.com/github/poojakedia/EnglishToPython/blob/main/EnglishToPythonTransformerModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Importing libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from timeit import default_timer as timer

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import torchtext
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import tokenize
import io
from sklearn.model_selection import train_test_split
import re
import nltk
from nltk.stem import PorterStemmer

# Setting the device for model: use the GPU when CUDA is available, otherwise fall back to the CPU.
# NOTE(review): the later cells in this notebook use the upper-case DEVICE defined in the model cell, not this variable.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

%matplotlib inline

In [None]:
# Retrieving the data: downloading the English/Python dataset from Google Drive
# and saving it in the working directory as english_python_data.txt
!wget "https://drive.google.com/u/0/uc?id=1rHb0FQ5z5ZpaY2HpyFGY6CeyDG0kTLoO&export=download" -O english_python_data.txt

--2024-01-16 03:44:20--  https://drive.google.com/u/0/uc?id=1rHb0FQ5z5ZpaY2HpyFGY6CeyDG0kTLoO&export=download
Resolving drive.google.com (drive.google.com)... 74.125.137.102, 74.125.137.113, 74.125.137.138, ...
Connecting to drive.google.com (drive.google.com)|74.125.137.102|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://drive.google.com/uc?id=1rHb0FQ5z5ZpaY2HpyFGY6CeyDG0kTLoO&export=download [following]
--2024-01-16 03:44:20--  https://drive.google.com/uc?id=1rHb0FQ5z5ZpaY2HpyFGY6CeyDG0kTLoO&export=download
Reusing existing connection to drive.google.com:443.
HTTP request sent, awaiting response... 303 See Other
Location: https://drive.usercontent.google.com/download?id=1rHb0FQ5z5ZpaY2HpyFGY6CeyDG0kTLoO&export=download [following]
--2024-01-16 03:44:20--  https://drive.usercontent.google.com/download?id=1rHb0FQ5z5ZpaY2HpyFGY6CeyDG0kTLoO&export=download
Resolving drive.usercontent.google.com (drive.usercontent.google.com)... 142.250.101.132, 2607

In [None]:
# Examining the dataset: reading the raw file and showing its first 5 lines
with open('english_python_data.txt',"r") as data_file:
  first_lines = data_file.readlines()[:5]
print(first_lines)

['# write a python program to add two numbers \n', 'num1 = 1.5\n', 'num2 = 6.3\n', 'sum = num1 + num2\n', "print(f'Sum: {sum}')\n"]


In [None]:
# Creating a Regular Expression (Regex) pattern of urls to remove them
url_pattern = re.compile(r"https?://\S+")


def _clean_question(line):
  """Turn a raw '#'-prefixed line into the normalised question text."""
  question = line[1:].strip("\n ") # Dropping the leading '#' and any surrounding newlines/spaces
  question = re.sub(r'^\d+ ', "", question) # If the question starts with numbers, remove them
  question = url_pattern.sub('', question) # Removing any urls
  question = question.lower() # Lowercasing the question
  return re.sub(r"([.!?])", "", question) # Removing '.', '!' and '?'


def _parse_examples(lines):
  """Group raw file lines into a list of {'question', 'solution'} dictionaries.

  A line whose first character is '#' starts a new example; every following
  line is appended to that example's solution until the next '#' line.
  NOTE(review): a solution line that itself starts with '#' at column 0 is
  therefore treated as a new question -- confirm this matches the data format.
  """
  examples = []
  current = None # The problem/solution pair currently being collected

  def _flush():
    if current is not None:
      examples.append({"question": current["question"],
                       "solution": ''.join(current["solution"])})

  for line in lines:
    if line.startswith("#"):
      _flush()
      current = {"question": _clean_question(line), "solution": []}
    elif current is not None:
      current["solution"].append(line)
    # Lines that appear before the first question have nothing to attach to and are skipped

  # The last example has no following '#' line to trigger a flush, so emit it here
  _flush()
  return examples


# Making a dataset
with open('english_python_data.txt', "r", encoding="utf-8") as data_file:
  data_lines = data_file.readlines()

dps = _parse_examples(data_lines) # List of dictionaries

# converting the data to a table for easier viewing
dataset = pd.DataFrame(dps, columns=["question", "solution"])
dataset

Unnamed: 0,question,solution
0,write a python program to add two numbers,num1 = 1.5\nnum2 = 6.3\nsum = num1 + num2\npri...
1,write a python function to add two user provid...,"def add_two_numbers(num1, num2):\n sum = nu..."
2,write a program to find and print the largest ...,\nnum1 = 10\nnum2 = 12\nnum3 = 14\nif (num1 >=...
3,write a program to find and print the smallest...,num1 = 10\nnum2 = 12\nnum3 = 14\nif (num1 <= n...
4,write a python function to merge two given lis...,"def merge_lists(l1, l2):\n return l1 + l2\n..."
...,...,...
4952,write a program to print bit wise and of two n...,a = 60 # 60 = 0011 1100\nb = 13 ...
4953,write a program to print bit wise or of two nu...,"a = 60\nb = 13\n\nc = a | b\nprint(""OR"", c)\n\n\n"
4954,write a program to print bit wise xor of two n...,"a = 60\nb = 13\n\nc = a ^ b\nprint(""XOR"", c)\n..."
4955,write a program to calculate binary ones compl...,"a = 60\n\nc = ~a\nprint(""Binary Ones Complemen..."


In [None]:
# Looking at the first question and the corresponding solution
first_example = dataset.loc[0]
print(first_example['question'])
print(first_example['solution'])

write a python program to add two numbers
num1 = 1.5
num2 = 6.3
sum = num1 + num2
print(f'Sum: {sum}')





In [None]:
# Language keys used to index the tokenizer / vocabulary dictionaries
SRC_LANGUAGE = 'en'
TGT_LANGUAGE = 'python'

# Both languages are tokenized with the spaCy English pipeline
tokenizers = {
  SRC_LANGUAGE: get_tokenizer('spacy',language='en_core_web_sm'),
  TGT_LANGUAGE: get_tokenizer('spacy',language='en_core_web_sm'),
}
vocabularies = {} # Filled in once the vocabularies are built

# Special tokens for Unknown, Padding, beginning of sentence, end of sentence.
# Their indices are simply their positions in this list.
special_symbols = ['<unk>','<pad>','<bos>','<eos>']
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = range(len(special_symbols))

# Generating the tokens
def yield_tokens(data_iter, language):
  """Yield the token list of every sample in data_iter.

  data_iter: iterable of raw text samples.
  language:  key into the `tokenizers` dictionary selecting which tokenizer to apply
             (previously ignored -- the source-language tokenizer was always used).
  """
  tokenize_sample = tokenizers[language]
  for data_sample in data_iter:
    yield tokenize_sample(data_sample)

# Building one vocabulary per language from the tokens of its dataset column
text_columns = {SRC_LANGUAGE: 'question', TGT_LANGUAGE: 'solution'}
for language, column in text_columns.items():
  vocabularies[language] = build_vocab_from_iterator(
    yield_tokens(dataset[column],language),
    min_freq=1,
    specials=special_symbols,
    special_first=True,
  )

# Tokens that are not in a vocabulary map to the unknown index
for vocabulary in vocabularies.values():
  vocabulary.set_default_index(UNK_IDX)

In [None]:
# Viewing how big our vocabularies are (source first, then target)
for language in (SRC_LANGUAGE, TGT_LANGUAGE):
  print(len(vocabularies[language]))

2387
11880


In [None]:
# Looking up a word in the vocabulary: indexing with a token returns its integer index
print(vocabularies[SRC_LANGUAGE]['write'])

6


In [None]:
from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math
# Device used by the model, the masks and the batches in the cells below (GPU when available)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Positional Encoding module -> adds sinusoidal position information to the token embeddings
class PositionalEncoding(nn.Module):
  """Add fixed sine/cosine positional encodings to token embeddings, then apply dropout.

  emb_size:    size of each embedding vector (odd sizes are supported).
  dropout:     dropout probability applied after the encodings are added.
  maxlen:      number of positions precomputed; longer sequences are not supported.
  batch_first: True  -> inputs are (batch, seq_len, emb_size), positions run along dim 1;
               False -> inputs are (seq_len, batch, emb_size), positions run along dim 0.
               The default matches the batch_first=True transformer used in this notebook.
               Previously the table was always sliced by dim 0, so batch-first inputs
               received an encoding that depended on the batch index instead of the position.
  """
  def __init__(self,emb_size:int, dropout:float, maxlen:int = 5000, batch_first:bool = True):
    super(PositionalEncoding,self).__init__()
    den = torch.exp(-torch.arange(0,emb_size,2)*math.log(10000) / emb_size)
    pos = torch.arange(0,maxlen).reshape(maxlen,1)
    pos_embedding = torch.zeros((maxlen,emb_size))
    pos_embedding[:,0::2] = torch.sin(pos * den)
    # With an odd emb_size there is one fewer cosine column than sine column
    pos_embedding[:,1::2] = torch.cos(pos * den)[:, :emb_size // 2]
    pos_embedding = pos_embedding.unsqueeze(-2) # (maxlen, 1, emb_size)

    self.dropout = nn.Dropout(dropout)
    self.batch_first = batch_first

    # Saving the positional encoding in the model state dict, but making sure PyTorch doesn't "train"
    # these parameters because they don't need to be trained
    self.register_buffer('pos_embedding',pos_embedding)

  def forward(self,token_embedding: Tensor):
    if self.batch_first:
      seq_len = token_embedding.size(1)
      # (seq_len, 1, emb) -> (1, seq_len, emb) so it broadcasts over the batch dimension
      positions = self.pos_embedding[:seq_len].transpose(0, 1)
    else:
      positions = self.pos_embedding[:token_embedding.size(0)]
    return self.dropout(token_embedding + positions)

# Converting the tokens into embeddings
class TokenEmbedding(nn.Module):
  """Look up token indices in an embedding table and scale the result by sqrt(emb_size)."""

  def __init__(self,vocab_size: int, emb_size):
    super().__init__()
    self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=emb_size)
    self.embed_size = emb_size

  def forward(self,tokens:Tensor):
    # The Transformer paper scales the embeddings by the square root of the embedding size
    scale = math.sqrt(self.embed_size)
    vectors = self.embedding(tokens.long())
    return vectors * scale

# The Actual Model
class Seq2SeqTransformer(nn.Module):
  """Encoder-decoder transformer that maps source token ids to logits over the target vocabulary.

  The wrapped nn.Transformer is created with batch_first=True, so token tensors are
  (batch, seq_len). Source and target have separate embedding tables and share one
  positional-encoding module.
  """

  def __init__(self, num_encoder_layers:int, num_decoder_layers:int, emb_size:int, nhead:int, src_vocab_size:int, tgt_vocab_size:int, dim_feedforward: int=512, dropout:float = 0.1):
    super().__init__()
    self.transformer = nn.Transformer(
      d_model=emb_size,
      nhead=nhead,
      num_encoder_layers=num_encoder_layers,
      num_decoder_layers=num_decoder_layers,
      dim_feedforward=dim_feedforward,
      dropout=dropout,
      batch_first=True,
    )
    # Projects every decoder output vector (emb_size) onto the target vocabulary (tgt_vocab_size)
    self.generator = nn.Linear(emb_size,tgt_vocab_size)
    self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
    self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size,emb_size)

    # Positional encodings shared by the source and the target side
    self.positional_encoding = PositionalEncoding(emb_size,dropout=dropout)

  def forward(self, src:Tensor, trg: Tensor, src_mask: Tensor, tgt_mask: Tensor, src_padding_mask: Tensor, tgt_padding_mask: Tensor,
              memory_key_padding_mask: Tensor):
    """Run the full encoder-decoder pass and return the vocabulary logits."""
    encoder_input = self.positional_encoding(self.src_tok_emb(src))
    decoder_input = self.positional_encoding(self.tgt_tok_emb(trg))

    hidden = self.transformer(
      encoder_input,
      decoder_input,
      src_mask=src_mask,
      tgt_mask=tgt_mask,
      memory_mask=None,
      src_key_padding_mask=src_padding_mask,
      tgt_key_padding_mask=tgt_padding_mask,
      memory_key_padding_mask=memory_key_padding_mask,
    )
    return self.generator(hidden)

  # Encoding the input
  def encode(self, src: Tensor, src_mask: Tensor):
    """Embed the source tokens and run only the encoder stack."""
    return self.transformer.encoder(self.positional_encoding(self.src_tok_emb(src)), src_mask)

  # Decoding the output
  def decode(self,tgt:Tensor, memory: Tensor, tgt_mask:Tensor):
    """Embed the target tokens and run only the decoder stack against the encoder memory."""
    decoder_input = self.positional_encoding(self.tgt_tok_emb(tgt))
    return self.transformer.decoder(decoder_input, memory, tgt_mask)


In [None]:
# Defining the lookahead mask that prevents the model from looking at future tokens during training.
# We also need masks for the padding tokens: if the padding is not masked,
# the model would take the padding values into account when making its predictions.

# Generating the lookahead mask
def generate_square_subsequent_mask(sz):
  """Return a (sz, sz) float mask: 0.0 on and below the diagonal, -inf above it."""
  blocked = torch.full((sz, sz), float('-inf'), device=DEVICE)
  # triu with diagonal=1 keeps the -inf entries strictly above the diagonal and zeroes the rest
  return torch.triu(blocked, diagonal=1)

# Creating the other masks
def create_mask(src, tgt):
  """Build the attention and padding masks for one (src, tgt) batch.

  src, tgt: token-id tensors whose second dimension is the sequence length.
  Returns (src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
    src_mask          all-False (src_len, src_len) bool mask, i.e. nothing is blocked
    tgt_mask          (tgt_len, tgt_len) lookahead mask from generate_square_subsequent_mask
    src_padding_mask  bool tensor shaped like src, True where src == PAD_IDX
    tgt_padding_mask  bool tensor shaped like tgt, True where tgt == PAD_IDX
  """
  src_seq_len = src.shape[1]
  tgt_seq_len = tgt.shape[1]

  # Keep every mask on the device of the tensor it masks instead of relying on a global device
  tgt_mask = generate_square_subsequent_mask(tgt_seq_len).to(tgt.device)
  src_mask = torch.zeros((src_seq_len, src_seq_len), dtype=torch.bool, device=src.device)

  src_padding_mask = (src == PAD_IDX)
  tgt_padding_mask = (tgt == PAD_IDX)
  return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

In [None]:
# Splitting the data into training and testing: a shuffled 80/20 split,
# with a fixed random_state so the same split is produced on every run
training, testing = train_test_split(dataset,test_size=0.2,random_state=42,shuffle=True)

# Running the data through a pipeline to get the transformed and prepared dataset
# helper function to club together sequential operations
def sequential_transforms(*transforms):
  """Compose the given callables into one function that applies them left to right."""
  def func(txt_input):
    result = txt_input
    for step in transforms:
      result = step(result)
    return result
  return func

# Function to add BOS/EOS and create tensor for input sequence indices
def tensor_transform(token_ids):
  """Return a 1-D long tensor [BOS_IDX, *token_ids, EOS_IDX].

  The dtype is set explicitly: torch.tensor([]) would otherwise be a float tensor,
  so an empty token list no longer mixes float and integer tensors in the concatenation.
  """
  return torch.cat((torch.tensor([BOS_IDX], dtype=torch.long),
                    torch.tensor(token_ids, dtype=torch.long),
                    torch.tensor([EOS_IDX], dtype=torch.long)))

# One pipeline per language: Tokenize, Convert to Indices, then Add Special Tokens
text_transform = {
  ln: sequential_transforms(tokenizers[ln],vocabularies[ln],tensor_transform)
  for ln in [SRC_LANGUAGE, TGT_LANGUAGE]
}

# function to put all the data samples into batches
def collate_fn(batch):
  """Encode and pad the 'question' and 'solution' columns of a DataFrame.

  Every text is stripped of leading/trailing newlines and tabs, run through the
  language's text_transform, and padded with PAD_IDX to the longest sequence.
  Returns (src, tgt), each with one row per sample.
  """
  encoded_questions = [
    text_transform[SRC_LANGUAGE](question.strip('\n\t'))
    for question in batch['question'].values
  ]
  encoded_solutions = [
    text_transform[TGT_LANGUAGE](solution.strip('\n\t'))
    for solution in batch['solution'].values
  ]

  # pad_sequence stacks as (longest_len, n_samples); transposing puts the samples first
  padded_src = pad_sequence(encoded_questions, padding_value=PAD_IDX)
  padded_tgt = pad_sequence(encoded_solutions, padding_value=PAD_IDX)
  return padded_src.T, padded_tgt.T

In [None]:
# Defining the model, loss function, and optimizer
torch.manual_seed(10) # Fixed seed so the weight initialisation below is repeatable

# Hyperparameters
SRC_VOCAB_SIZE = len(vocabularies[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocabularies[TGT_LANGUAGE])
EMB_SIZE = 128
NHEAD = 4
FFN_HID_DIM = 128
BATCH_SIZE = 15
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

# Defining the model
transformer = Seq2SeqTransformer(
  num_encoder_layers=NUM_ENCODER_LAYERS,
  num_decoder_layers=NUM_DECODER_LAYERS,
  emb_size=EMB_SIZE,
  nhead=NHEAD,
  src_vocab_size=SRC_VOCAB_SIZE,
  tgt_vocab_size=TGT_VOCAB_SIZE,
  dim_feedforward=FFN_HID_DIM,
)

# Re-initialising every parameter with more than one dimension (weight matrices, embedding tables)
# from the xavier uniform distribution; 1-D parameters keep their default initialisation
for weight in transformer.parameters():
  if weight.dim() <= 1:
    continue
  nn.init.xavier_uniform_(weight)

# Putting the model on the selected device
transformer = transformer.to(DEVICE)

# Defining the loss function; ignore_index keeps padding positions out of the loss
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# Defining the optimizer
optimizer = torch.optim.Adam(transformer.parameters(), lr=1e-4)

In [None]:
def train_epoch(model,optimizer):
  """Train `model` for one pass over the training split and return the mean loss per batch.

  The training DataFrame is encoded and padded with collate_fn, then iterated in
  batches of BATCH_SIZE. For every batch the decoder receives the target without its
  last position and is trained to predict the target shifted left by one position.
  """
  # Setting the model to training mode
  model.train()
  losses = 0

  # Preparing the data
  X,y = collate_fn(training)
  training_dataset = TensorDataset(X,y)
  train_dataloader = DataLoader(training_dataset, batch_size=BATCH_SIZE)

  # Iterating through the data
  for src, tgt in train_dataloader:
    src = src.to(DEVICE)
    tgt = tgt.to(DEVICE)

    tgt_input = tgt[:,:-1] # Decoder input: every position except the last one

    # Getting the masks
    src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
    logits = model(src,tgt_input, src_mask, tgt_mask, src_padding_mask,tgt_padding_mask,src_padding_mask) # the encoder memory reuses the source padding mask

    optimizer.zero_grad()
    tgt_out = tgt[:,1:] # Expected output: the target shifted left by one position
    # Flatten to (batch * seq_len, vocab) vs (batch * seq_len,), exactly as evaluate() does.
    # The previous reshape(batch, -1, 1827) hard-coded one padded length and, being a reshape
    # rather than a transpose, paired targets with the wrong logits.
    loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
    loss.backward() # Back propagation, calculating the gradients

    optimizer.step()
    losses += loss.item()

  # len() of a DataLoader is its number of batches; no need to iterate it a second time
  return losses / len(train_dataloader) # Getting the average loss per batch

# Evaluation Loop
def evaluate(model):
  """Return the mean loss per batch of `model` on the testing split.

  Mirrors train_epoch without the optimisation step. The model is put in eval mode
  and gradient tracking is switched off, since no backward pass is performed here.
  """
  model.eval()
  losses = 0

  # Preparing the data
  X,y = collate_fn(testing)
  testing_data = TensorDataset(X,y)
  val_dataloader = DataLoader(testing_data, batch_size=BATCH_SIZE)

  # Iterating through the data
  with torch.no_grad():
    for src, tgt in val_dataloader:
      src = src.to(DEVICE)
      tgt = tgt.to(DEVICE)

      tgt_input = tgt[:,:-1] # Decoder input: every position except the last one

      # Getting the masks
      src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
      logits = model(src,tgt_input, src_mask, tgt_mask, src_padding_mask,tgt_padding_mask,src_padding_mask) # the encoder memory reuses the source padding mask
      tgt_out = tgt[:,1:] # Expected output: the target shifted left by one position
      loss = loss_fn(logits.reshape(-1,logits.shape[-1]),tgt_out.reshape(-1))
      losses += loss.item()

  # len() of a DataLoader is its number of batches; no need to iterate it a second time
  return losses / len(val_dataloader) # Getting the average loss per batch

In [None]:
# Training the model: one training pass and one evaluation pass per epoch
NUM_EPOCHS = 5

for epoch in range(1,NUM_EPOCHS+1):
  start_time = timer()
  train_loss = train_epoch(transformer, optimizer)
  end_time = timer() # Taken before evaluate(), so the reported epoch time covers training only
  val_loss = evaluate(transformer)
  print(f'Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}')
  print(f'Epoch time: {(end_time - start_time):.3f}s')



Epoch: 1, Train loss: 8.633, Val loss: 8.743
Epoch time: 132.208s
Epoch: 2, Train loss: 7.844, Val loss: 8.927
Epoch time: 131.146s
Epoch: 3, Train loss: 6.660, Val loss: 9.081
Epoch time: 130.989s
Epoch: 4, Train loss: 5.932, Val loss: 9.217
Epoch time: 131.456s
Epoch: 5, Train loss: 5.661, Val loss: 9.231
Epoch time: 130.988s


In [None]:
# A function to generate the output sequence autoregressively using the greedy decoder algorithm
def greedy_decode(model, src, max_len, start_symbol):
  """Greedily decode one source sequence.

  model:        object exposing encode(src, src_mask), decode(tgt, memory, tgt_mask) and generator(hidden).
  src:          token ids of a single source sentence; it is reshaped to (1, src_len).
  max_len:      maximum length of the returned sequence, including the start symbol.
  start_symbol: token id the output starts with.
  Returns a (generated_len, 1) long tensor that starts with start_symbol and stops right
  after the first EOS_IDX, or after max_len tokens.
  """
  src = src.to(DEVICE).view(1, -1)
  ys = torch.full((1, 1), start_symbol, dtype=torch.long, device=DEVICE)

  # No gradients are needed while generating
  with torch.no_grad():
    memory = model.encode(src, None)
    for _ in range(max_len-1):
      # Use the same lookahead mask as during training so that earlier positions cannot
      # attend to later ones (previously no mask was passed here)
      tgt_mask = generate_square_subsequent_mask(ys.size(1)).to(DEVICE)
      out = model.decode(ys, memory, tgt_mask)
      # The arg-max of the logits equals the arg-max of their softmax, so no softmax is needed
      next_word = model.generator(out[:, -1]).argmax(dim=1).item()

      next_token = torch.full((1, 1), next_word, dtype=torch.long, device=DEVICE)
      ys = torch.cat([ys, next_token], dim=1)
      if next_word == EOS_IDX:
        break

  # Callers receive one token per row
  return ys.reshape(-1, 1)

# Function for translation
def translate(model: torch.nn.Module, src_sentence: str, max_len=None):
  """Translate an English request into target-language tokens with greedy decoding.

  model:        the trained sequence-to-sequence model.
  src_sentence: the English request. It is lower-cased and stripped of '.', '!' and '?',
                the same normalisation applied to the training questions, so that its
                tokens match the source vocabulary.
  max_len:      optional cap on the number of generated tokens; defaults to the number
                of source tokens (including BOS/EOS) plus 5.
  Returns the generated tokens joined by single spaces, with the "<bos>" and "<eos>" markers removed.
  """
  model.eval()
  cleaned_sentence = re.sub(r"([.!?])", "", src_sentence.strip().lower())
  src = text_transform[SRC_LANGUAGE](cleaned_sentence).view(-1, 1)
  num_tokens = src.shape[0]
  if max_len is None:
    max_len = num_tokens + 5
  tgt_tokens = greedy_decode(model,  src, max_len=max_len, start_symbol=BOS_IDX).flatten()
  return " ".join(vocabularies[TGT_LANGUAGE].lookup_tokens(tgt_tokens.cpu().tolist())).replace("<bos>", "").replace("<eos>", "")

In [None]:
# Trying to translate from English to Python: printing the tokens generated for one sample request
print(translate(transformer, "Give me a function to add 3 numbers."))

 isLucky(next_position isLucky(next_position isLucky(next_position zip(*test_dict.values y':5874 y':5874 y':5874 rel_tol rel_tol rel_tol d.append(a king king king king
