%pip install numpy==1.26.2

In [47]:
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

import logging

debug = logging.getLogger("Debug")
info  = print
plt.ion()   # interactive mode

<contextlib.ExitStack at 0x2fadf4490>

In [48]:
#check GPU
device = None
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("Running CUDA Mode:", device, torch.cuda.get_device_name(0))
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    print("Running MPS Mode:", device)
else:
    device = torch.device("cpu")
    print("Running CPU Mode:", device)



Running MPS Mode: mps


## Data and Classes
- Create Dataloader class

Note: Working on Part (a) as of now.  
Guiding light: https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html

In [49]:
START_TOKEN = "<START>"
END_TOKEN = "<END>"
UNK_TOKEN = "<UNK>"
PAD_TOKEN = "<PAD>"

class Vocabulary:
    def __init__(self, freq_dict, wd_to_id, id_to_wd):
        self.freq_dict = freq_dict
        self.wd_to_id = wd_to_id
        self.id_to_wd = id_to_wd
        self.N = len(freq_dict)
    
    def get_id(self, word):
        if word in self.wd_to_id:
            return self.wd_to_id[word]
        else:
            return self.wd_to_id[UNK_TOKEN]
        
    def decode(self, formula):
        """
        Input shape: (seq_len,)
        Output Shape: seq_len -> python list
        """
        # decoded = []
        # for id in formula:
        #     idx = id.item()
        #     if idx in self.id_to_wd:
        #         decoded.append(self.id_to_wd[idx])
        #     else:
        #         decoded.append(UNK_TOKEN)
        return " ".join([self.id_to_wd[idx.item()] for idx in formula])

class LatexFormulaDataset(Dataset):
    """Latex Formula Dataset: Image and Text"""
    
    def __init__(self, csv_file, root_dir, transform = None, max_examples = None, vocab=None):
        """
        Arguments:
            csv_file (string): Path to the csv file with image name and text
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        info("Loading Dataset...")
        self.df = pd.read_csv(csv_file)
        info("Loaded.")
        #info("Loaded Dataset", self.df.info)
        self.externVocab = vocab
        #Slice the dataset if max_examples is not None
        if max_examples is not None:
            self.df = self.df.iloc[:max_examples, :]

        self.root_dir = root_dir
        self.transform = transform

        self.df['formula'] = self.df['formula'].apply(lambda x: x.split())
        self.df['formula'] = self.df['formula'].apply(lambda x: [START_TOKEN] + x + [END_TOKEN])

        self.maxlen = 0
        for formula in self.df['formula']:
            if len(formula) > self.maxlen:
                self.maxlen = len(formula)
        
        # def convert_to_ids(formula):
        #     form2 = [self.vocab.get_id(wd) for wd in formula]
        #     return torch.tensor(form2, dtype=torch.int64)
        
        self.df['formula'] = self.df['formula'].apply(lambda x: x +[PAD_TOKEN]*(self.maxlen - len(x)))
        self.vocab= self.construct_vocab() 
        # self.df['formula'] = self.df['formula'].apply(convert_to_ids)
        

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """
        Returns sample of type image, textformula
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir,
                                self.df.iloc[idx, 0])
        image = io.imread(img_name)
        formula = self.df.iloc[idx, 1]

        # formula = np.array([formula], dtype=str).reshape(-1, 1)
        # formula = [self.vocab.get_id(wd[0]) for wd in formula]
        def convert_to_ids(formula):
            vocab = self.vocab
            if self.externVocab is not None:
                vocab = self.externVocab
            else:
                vocab = self.vocab
            form2 = [vocab.get_id(wd) for wd in formula]
            return torch.tensor(form2, dtype=torch.int64)
        
        formula = convert_to_ids(formula)

        sample = {'image': image, 'formula': formula}

        if self.transform:
            sample['image'] = self.transform(sample['image'])
            
        return sample 
    
    def construct_vocab(self):
        """
        Constructs vocabulary from the dataset formulas
        """
        #Split on spaces to tokenize
        freq_dict = {}
        for formula in self.df['formula']:
            for wd in formula:
                if wd not in freq_dict:
                    freq_dict[wd] = 1
                else:
                    freq_dict[wd] += 1
        freq_dict[UNK_TOKEN] = 1
        N = len(freq_dict)
        wd_to_id = {}
        for i, wd in enumerate(freq_dict):
            wd_to_id[wd] = i
        #make id_to_wd a simple list
        id_to_wd = [None]*N
        for wd in wd_to_id:
            id_to_wd[wd_to_id[wd]] = wd
    
        #pad the formulas with 
        return Vocabulary(freq_dict, wd_to_id, id_to_wd)      

def get_dataloader(csv_path, image_root, batch_size, transform = None, max_examples = None, shuffle=True, externVocab=None):
    """
    Returns dataloader,dataset for the dataset
    """
    dataset = LatexFormulaDataset(csv_path, image_root, max_examples=max_examples,transform=transform, vocab=externVocab) #checked
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
    return dataloader, dataset
 

## Encoder Network
- A CNN to encode image to more meaningful vector

In [50]:
class EncoderCNN(nn.Module):
    def __init__(self):
        super().__init__()
    
        #@TODO:reduce number of layers: eliminate pools and acts
        self.conv1 = nn.Conv2d(3, 32, (5, 5))       
        self.conv2 = nn.Conv2d(32, 64, (5, 5))
        self.conv3 = nn.Conv2d(64, 128, (5, 5))        
        self.conv4 = nn.Conv2d(128, 256, (5, 5))        
        self.conv5 = nn.Conv2d(256, 512, (5, 5))
        
        self.pool = nn.MaxPool2d((2, 2))
        self.avg_pool = nn.AvgPool2d((3, 3))
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        
        x = F.relu(self.conv3(x))
        x = self.pool(x)
        
        x = F.relu(self.conv4(x))
        x = self.pool(x)
        
        x = F.relu(self.conv5(x))
        x = self.pool(x)
        
        x = self.avg_pool(x)
        x = x.view(-1,512) 
        # info(f"Encoder Output Shape: {x.shape}")
        return x
    
class DecoderLSTM(nn.Module):
    """
    Inputs:
    (here M is whatever the batch size is passed)

    context_size : size of the context vector [shape: (1,M,context_size)]
    n_layers: number of layers [for our purposes, defaults to 1]
    hidden_size : size of the hidden state vectors [shape: (n_layers,M,hidden_size)]
    embed_size : size of the embedding vectors [shape: (1,M,embed_size)]
    vocab_size : size of the vocabulary
    max_length : maximum length of the formula
    """
    def __init__(self, context_size, vocab, max_seq_len, n_layers = 1, hidden_size = 512, embed_size = 512):
        super().__init__()
        self.context_size = context_size
        self.vocab = vocab
        self.vocab_size = vocab.N
        self.n_layers = n_layers
        self.hidden_size = hidden_size
        self.embed_size = embed_size
        self.input_size = context_size + embed_size
        self.max_seq_len = max_seq_len

        self.embed = nn.Embedding(self.vocab_size, embed_size)
        self.lstm = nn.LSTMCell(self.input_size, self.hidden_size)
        self.linear = nn.Linear(hidden_size, self.vocab_size)
    
    def forward(self, context, target_tensor = None):
        """
        M: batch_size
        context is the context vector from the encoder [shape: (M,context_size)]
        target_tensor is the formula in tensor form [shape: (M,max_length)] (in the second dimension, it is sequence of indices of formula tokens)
            if target_tensor is not None, then we are in Teacher Forcing mode
            else normal jo bhi (last prediction ka embed is concatenated)
        """
        context.to(device)
        batch_size = context.shape[0]

        #initialize hidden state and cell state
        hidden = context
        cell = torch.zeros(batch_size, self.hidden_size).to(device)

        #initialize the input with embedding of the start token. Expand for batch size.
        init_embed = self.embed(torch.tensor([self.vocab.wd_to_id[START_TOKEN]]).to(device).expand(batch_size, -1)).squeeze()
        
        #initialize the output_history and init_output
        outputs = []
        output = torch.zeros((batch_size, self.vocab_size)).to(device)
        
        
        for i in range(self.max_seq_len):
            #teacher forcing: 50% times
            r = torch.rand(1)
            if r>0.5 and target_tensor is not None:
                if i==0 :
                    embedding = init_embed
                else: 
                    embedding = self.embed(target_tensor[:, i-1]).reshape((batch_size, self.embed_size)).to(device)            
            else:
                if i==0 :
                    embedding = init_embed
                else:
                    #create embedding from previous input
                    embedding = self.embed(torch.argmax(output, dim = 1))

            lstm_input = torch.cat([context, embedding], dim = 1).to(device)
    
            hidden, cell = self.lstm(lstm_input, (hidden, cell))
            output = self.linear(hidden)
            outputs.append(output)
            
        output_tensor = torch.stack(outputs).permute(1,0,2) #LBV - > BLV

        return output_tensor, hidden, cell

### Vocabulary
- https://github.com/harvardnlp/im2markup/blob/master

### Complete Network

In [51]:
class HandwritingToLatexModel(nn.Module):
    def __init__(self, context_size, vocab, max_seq_len, n_layers, hidden_size, embed_size):
        super().__init__()
        self.encoder = EncoderCNN()
        self.decoder = DecoderLSTM(context_size, vocab, max_seq_len, n_layers, hidden_size, embed_size)
    
    def forward(self, image, target_tensor = None):
        context = self.encoder(image)
        outputs, hidden, cell = self.decoder(context, target_tensor)
        return outputs

### Utility Functions

In [52]:
import time
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
from tqdm import tqdm

plt.switch_backend('agg')
def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)
def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))
    
def saveModel(save_path, model_state, optimiser_state, loss):
    torch.save({
            'model_state_dict': model_state,
            'optimizer_state_dict':optimiser_state,
            'loss': loss,  
    }, save_path)

### Training Code.
- Dataloader automatically loads in batches. The data need not be modified by us.

In [53]:
#part a
train_csv_path = "data/SyntheticData/train.csv"
image_root_path = "data/SyntheticData/images/"
train_dataloader, train_dataset = get_dataloader(train_csv_path, image_root_path, 32, transform, max_examples=None)

Loading Dataset...
Loaded.


In [54]:
def generated_formula(output, vocab):
    """
    output: [shape: (Max_length,vocab_size)]
    """
    output = torch.argmax(output, dim = 1)
    output = output.tolist()
    formula = ' '.join([vocab.id_to_wd[id] for id in output])
    return formula

In [61]:
def train_epoch(dataloader,model, optimizer, criterion, vocab=None):
    total_loss = 0
    idx = 0
    pb = tqdm(dataloader, desc="Batch")
    for data in pb:
        idx+=1
        input_tensor, target_tensor = data['image'].to(device), data['formula'].to(device)
        outputs = model(input_tensor, target_tensor)
        if(train_dataset and idx%150==0):
            generated_formula = [train_dataset.vocab.id_to_wd[token.item()] for token in torch.argmax(outputs, dim=2)[0]]
            required_formula = [train_dataset.vocab.id_to_wd[token.item()] for token in target_tensor[0]]
            print(f"Generated: {' '.join(generated_formula)}")
            print(f"Actual: {' '.join(required_formula)}")

        output_logits = outputs.permute(0,2,1)
        
        loss = criterion(
            output_logits,
            target_tensor
        )
        
        #backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step() 

        total_loss += loss.item()
        pb.set_description(f"Loss: {loss.item()}")

    return total_loss / len(dataloader)

def train(train_dataloader, model, n_epochs, optimizer = None, learning_rate=0.001, print_every=1, save_interval=2, save_prefix = 'model'):
    start = time.time()
    plot_losses = []
    print_loss_total = 0  # Reset every print_every

    if not optimizer: optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=train_dataloader.dataset.vocab.wd_to_id[PAD_TOKEN]).to(device) #as stated in assignment

    model.train()
    for epoch in range(1, n_epochs + 1):
        info(f"Epoch {epoch}")
        loss = train_epoch(train_dataloader, model, optimizer, criterion)
        print_loss_total += loss

        if epoch % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
    
        if epoch % save_interval == 0:
            saveModel(f'/kaggle/working/{save_prefix}_epoch_{epoch}.pt', model.state_dict(), optimizer.state_dict(), loss)    
                
        info('%s (%d %d%%) %.4f' % (timeSince(start, epoch / n_epochs), epoch, epoch / n_epochs * 100, print_loss_avg))

## Training

In [62]:

batch_size = 32
vocab_size = 1000
CONTEXT_SIZE = 512
HIDDEN_SIZE = 512
EMBED_SIZE = 512
MAX_EXAMPLES = 1000
# image processing
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
])

### Load Dataset and Dataloader

In [63]:
#part a
hw_train_csv_path = "data/HandwrittenData/train_hw.csv"
hw_image_root_path = "data/HandwrittenData/images/train/"
hw_train_dataloader, hw_train_dataset = get_dataloader(hw_train_csv_path, hw_image_root_path, batch_size, transform, max_examples=None, externVocab = train_dataset.vocab)

Loading Dataset...
Loaded.


### Create Model

In [64]:
#create a network instance
# model = HandwritingToLatexModel(CONTEXT_SIZE, train_dataset.vocab, n_layers=1, hidden_size= HIDDEN_SIZE, embed_size=EMBED_SIZE, max_seq_len=train_dataset.maxlen).to(device)

### Train

In [65]:
def load_from_checkpoint(checkpoint_path, max_len):
    model_dicts = torch.load(checkpoint_path, map_location=device)
    
    model = HandwritingToLatexModel(CONTEXT_SIZE, train_dataset.vocab, n_layers=1, hidden_size= HIDDEN_SIZE, embed_size=EMBED_SIZE, max_seq_len=max_len).to(device)
    model.load_state_dict(model_dicts['model_state_dict'])
    
    optims = torch.optim.Adam(model.parameters(), lr=0.001)
    optims.load_state_dict(model_dicts['optimizer_state_dict'])

    return model, optims

# model, optim = load_from_checkpoint("checkpoints/model2.0+12_epoch_6.pt")

#resume training from checkpoints
# losses = train(train_dataloader, model, 20, optimizer = optim, save_interval=2, save_prefix = 'model2.0+18')


In [66]:
model_load, optims_load = load_from_checkpoint("checkpoints/model3.0_epoch_24.pt", hw_train_dataset.maxlen)

## Fine Tuning

In [67]:
train(hw_train_dataloader, model_load, 30, optimizer = optims_load, save_interval=4, save_prefix = 'fineTuned')

Epoch 1


Loss: 0.788834273815155:   8%|▊         | 22/282 [00:25<05:02,  1.16s/it] 


KeyboardInterrupt: 

## BLEU

In [None]:
import math
from nltk import word_tokenize
from collections import Counter
from nltk.util import ngrams


class BLEU(object):
    @staticmethod
    def compute(candidate, references, weights):
        candidate = [c.lower() for c in candidate]
        references = [[r.lower() for r in reference] for reference in references]

        p_ns = (BLEU.modified_precision(candidate, references, i) for i, _ in enumerate(weights, start=1))
        s = math.fsum(w * math.log(p_n) for w, p_n in zip(weights, p_ns) if p_n)

        bp = BLEU.brevity_penalty(candidate, references)
        return bp * math.exp(s)

    @staticmethod
    def modified_precision(candidate, references, n):
        counts = Counter(ngrams(candidate, n))

        if not counts:
            return 0

        max_counts = {}
        for reference in references:
            reference_counts = Counter(ngrams(reference, n))
            for ngram in counts:
                max_counts[ngram] = max(max_counts.get(ngram, 0), reference_counts[ngram])

        clipped_counts = dict((ngram, min(count, max_counts[ngram])) for ngram, count in counts.items())

        return sum(clipped_counts.values()) / sum(counts.values())
    
    @staticmethod
    def brevity_penalty(candidate, references):
        c = len(candidate)
        # r = min(abs(len(r) - c) for r in references)
        r = min(len(r) for r in references)

        if c > r:
            return 1
        else:
            return math.exp(1 - r / c)
      

In [None]:
test_csv_path = "data/SyntheticData/test.csv"
image_root_path = "data/SyntheticData/images/"
test_dataloader, test_dataset = get_dataloader(test_csv_path, image_root_path, 32, transform, max_examples=None, shuffle=False)

Loading Dataset...
Loaded.


In [None]:
model_loaded, _ = load_from_checkpoint("checkpoints/model3.0+6_epoch_12.pt")

In [None]:
#testing the model
import time

model_loaded.eval()
predictions = []
gts = []
pb = tqdm(test_dataloader, total=len(test_dataloader))
with torch.no_grad():
    for batch in pb:
        input_tensor = batch['image'].to(device)
        target_tensor = batch['formula'].to(device)
        outputs = model_loaded(input_tensor)
        outputs_final = torch.argmax(outputs, dim=2)
        for j in range(len(outputs)):
            # generated_formula = [train_dataset.vocab.id_to_wd[token.item()] for token in outputs_final[j]]
            generated_formula = train_dataset.vocab.decode(outputs_final[j])
            # required_formula = [test_dataloader.dataset.vocab.id_to_wd[token.item()] for token in target_tensor[j]]
            required_formula = test_dataloader.dataset.vocab.decode(target_tensor[j])
            predictions.append(generated_formula)
            gts.append(required_formula)


  0%|          | 0/279 [00:04<?, ?it/s]


KeyboardInterrupt: 

In [None]:
N = len(predictions)
totalBleu = 0
for i in len(predictions):
    gen_formula = predictions[i]
    req_formula = gts[i]
    gen_trim = []
    for tken in gen_formula:
        if tken == END_TOKEN:
            break
        gen_trim.append(tken)
    req_trim = []
    for tken in req_formula:
        if tken == END_TOKEN:
            break
        req_trim.append(tken)
    gen_formula = gen_trim[1:]
    req_formula = req_trim[1:]
    bleu = BLEU.compute(gen_formula, req_formula, [1/4, 1/4, 1/4, 1/4])
    totalBleu += bleu

print(f"Macro Bleu Score: {totalBleu/N}")