%pip install numpy==1.26.2

In [215]:
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

import logging

debug = logging.getLogger("Debug")
info  = print
plt.ion()   # interactive mode

<contextlib.ExitStack at 0x296803be0>

In [216]:
#check GPU
device = None
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("Running CUDA Mode:", device, torch.cuda.get_device_name(0))
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    print("Running MPS Mode:", device)
else:
    device = torch.device("cpu")
    print("Running CPU Mode:", device)



Running MPS Mode: mps


## Data and Classes
- Create Dataloader class

Note: Working on Part (a) as of now.  
Guiding light: https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html

In [217]:
START_TOKEN = "<START>"
END_TOKEN = "<END>"
UNK_TOKEN = "<UNK>"
PAD_TOKEN = "<PAD>"
# MAX_EXAMPLES = 100
class Vocabulary:
    def __init__(self, freq_dict, wd_to_id, id_to_wd):
        self.freq_dict = freq_dict
        self.wd_to_id = wd_to_id
        self.id_to_wd = id_to_wd
        self.N = len(freq_dict)
    
    def get_id(self, word):
        if word in self.wd_to_id:
            return self.wd_to_id[word]
        else:
            return self.wd_to_id[UNK_TOKEN]

class LatexFormulaDataset(Dataset):
    """Latex Formula Dataset: Image and Text"""
    
    def __init__(self, csv_file, root_dir, max_examples=None, transform = None):
        """
        Arguments:
            csv_file (string): Path to the csv file with image name and text
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        #@TODO: May want to preload images
        info("Loading Dataset...")
        self.df = pd.read_csv(csv_file)
        info("Loaded Dataset", self.df.info)
        
        #Slice the dataset if max_examples is not None
        if max_examples is not None:
            self.df = self.df.iloc[:max_examples, :]

        self.root_dir = root_dir
        self.transform = transform

        self.df['formula'] = self.df['formula'].apply(lambda x: x.split())
        self.df['formula'] = self.df['formula'].apply(lambda x: [START_TOKEN] + x + [END_TOKEN])

        self.maxlen = 0
        for formula in self.df['formula']:
            if len(formula) > self.maxlen:
                self.maxlen = len(formula)

        
        self.df['formula'] = self.df['formula'].apply(lambda x: x +[UNK_TOKEN]*(self.maxlen - len(x)))

        self.vocab= self.construct_vocab() 

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """
        Returns sample of type image, textformula
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir,
                                self.df.iloc[idx, 0])
        image = io.imread(img_name)
        formula = self.df.iloc[idx, 1]
        formula = np.array([formula], dtype=str).reshape(-1, 1)
        formula = [self.vocab.get_id(wd[0]) for wd in formula]
        sample = {'image': image, 'formula': torch.tensor(formula, dtype=torch.int64)}

        if self.transform:
            sample['image'] = self.transform(sample['image'])
            
        return sample 
    
    def construct_vocab(self):
        """
        Constructs vocabulary from the dataset formulas
        """
        #Split on spaces to tokenize

        freq_dict = {}
        for formula in self.df['formula']:
            for wd in formula:
                if wd not in freq_dict:
                    freq_dict[wd] = 1
                else:
                    freq_dict[wd] += 1
        freq_dict[START_TOKEN] = 1
        freq_dict[END_TOKEN] = 1
        freq_dict[UNK_TOKEN] = 1
        N = len(freq_dict)
        wd_to_id = {}
        for i, wd in enumerate(freq_dict):
            wd_to_id[wd] = i
        id_to_wd = {v: k for k, v in wd_to_id.items()}
    
        #pad the formulas with 
        return Vocabulary(freq_dict, wd_to_id, id_to_wd)      

def get_dataloader(csv_path, image_root, batch_size, transform = None, max_examples = None):
    """
    Returns dataloader,dataset for the dataset
    """
    dataset = LatexFormulaDataset(csv_path, image_root, max_examples=max_examples,transform=transform) #checked
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    return dataloader, dataset
     

### Encoder Network
- A CNN to encode image to more meaningful vector

In [218]:
class EncoderCNN(nn.Module):
    def __init__(self):
        super().__init__()
    
        #@TODO:reduce number of layers: eliminate pools and acts
        self.conv1 = nn.Conv2d(3, 32, (5, 5))
        self.act1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d((2, 2))
        
        self.conv2 = nn.Conv2d(32, 64, (5, 5))
        self.act2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d((2, 2))
        
        self.conv3 = nn.Conv2d(64, 128, (5, 5))
        self.act3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d((2, 2))
        
        self.conv4 = nn.Conv2d(128, 256, (5, 5))
        self.act4 = nn.ReLU()
        self.pool4 = nn.MaxPool2d((2, 2))
        
        self.conv5 = nn.Conv2d(256, 512, (5, 5))
        self.act5 = nn.ReLU()
        self.pool5 = nn.MaxPool2d((2, 2))
        
        self.avg_pool = nn.AvgPool2d((3, 3))
    
    def forward(self, x):
        x = self.act1(self.conv1(x))
        x = self.pool1(x)
        
        x = self.act2(self.conv2(x))
        x = self.pool2(x)
        
        x = self.act3(self.conv3(x))
        x = self.pool3(x)
        
        x = self.act4(self.conv4(x))
        x = self.pool4(x)
        
        x = self.act5(self.conv5(x))
        x = self.pool5(x)
        
        x = self.avg_pool(x)
        x = x.view(-1,512) 
        # info(f"Encoder Output Shape: {x.shape}")
        return x
    
class Decoder(nn.Module):
    """
    Inputs:
    (here M is whatever the batch size is passed)

    context_size : size of the context vector [shape: (1,M,context_size)]
    n_layers: number of layers [for our purposes, defaults to 1]
    hidden_size : size of the hidden state vectors [shape: (n_layers,M,hidden_size)]
    embed_size : size of the embedding vectors [shape: (1,M,embed_size)]
    vocab_size : size of the vocabulary
    max_length : maximum length of the formula
    """
    def __init__(self, context_size, vocab, n_layers = 1, hidden_size = 512, embed_size = 512,  max_length = 100):
        super().__init__()
        self.context_size = context_size
        self.vocab = vocab
        self.vocab_size = vocab.N
        self.n_layers = n_layers
        self.hidden_size = hidden_size
        self.embed_size = embed_size
        self.max_length = max_length
        self.input_size = context_size + embed_size

        self.embed = nn.Embedding(self.vocab_size, embed_size)
        self.lstm = nn.LSTMCell(self.input_size, self.hidden_size)
        self.linear = nn.Linear(hidden_size, self.vocab_size)
        self.softmax = nn.Softmax(dim = 1)
    
    def forward(self, context, target_tensor = None):
        """
        M: batch_size
        context is the context vector from the encoder [shape: (M,context_size)]
        target_tensor is the formula in tensor form [shape: (M,max_length)] (in the second dimension, it is sequence of indices of formula tokens)
            if target_tensor is not None, then we are in Teacher Forcing mode
            else normal jo bhi (last prediction ka embed is concatenated)
        """
        # info("Decoder Forward")
        # info(f"Context shape: {context.shape}")
        context.to(device)
        batch_size = context.shape[0]

        #initialize hidden state and cell state
            #@TODO: Some caveat in the size of the cell state. Should it be same as hidden_size? (check nn.LSTM documentation)
        hidden = context
        cell = torch.zeros((batch_size, self.hidden_size)).to(context.device)

        #initialize the input with embedding of the start token
        init_embed = self.embed(torch.tensor([self.vocab.wd_to_id[START_TOKEN]]).to(device)).reshape((1, self.embed_size))
        init_embed = torch.repeat_interleave(init_embed, batch_size, dim = 0).to(context.device)

        # info(f"Initial Embedding Shape: {init_embed.shape}")

        input = torch.cat([context, init_embed], dim = 1).to(context.device)

        #initialize the output_history and init_output
        outputs = []
        output = torch.zeros((batch_size, self.vocab_size)).to(context.device)
        
        
        for i in range(self.max_length):
            hidden, cell = self.lstm(input, (hidden, cell))
            output = self.linear(hidden)
            # output = self.softmax(output)
            outputs.append(output)
            
            #teacher forcing: 50% times
            r = torch.rand(1)
            if r>0.5 and target_tensor is not None:
                embedding = self.embed(target_tensor[:, i]).reshape((batch_size, self.embed_size)).to(context.device)
                input = torch.cat([context, embedding], dim = 1).to(context.device)              
            else:
                #add the embedding of the last prediction
                input = torch.cat([context, self.embed(torch.argmax(output, dim = 1))], dim = 1).to(context.device)
        # info(f"Outputs: {outputs}")
        return torch.stack(outputs).permute(1,0,2), hidden, cell

### Vocabulary
- https://github.com/harvardnlp/im2markup/blob/master

### Complete Network

In [219]:
class HandwritingToLatex1(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
    
    def forward(self, image, target_tensor = None):
        context = self.encoder(image)
        outputs, hidden, cell = self.decoder(context, target_tensor)
        return outputs

### Utility Functions

In [220]:
import time
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
from tqdm import tqdm

plt.switch_backend('agg')
def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)
def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))
def showPlot(points):
    plt.figure()
    fig, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    plt.plot(points)

### Training Code.
- Dataloader automatically loads in batches. The data need not be modified by us.

In [221]:
def train_epoch(dataloader,model, optimizer, criterion):
    total_loss = 0
    idx = 0
    for data in dataloader:
        idx+=1
        
        info(f"----Batch {idx}----")
        input_tensor, target_tensor = data['image'].to(device), data['formula'].to(device)
        optimizer.zero_grad()
        outputs = model(input_tensor, target_tensor)

        print(torch.argmax(outputs, dim=2).shape)

        train_dataset = dataloader.dataset

        if(train_dataset):
            generated_formula = [train_dataset.vocab.id_to_wd[token.item()] for token in torch.argmax(outputs, dim=2)[0]]
            required_formula = [train_dataset.vocab.id_to_wd[token.item()] for token in target_tensor[0]]
            print(f"Generated Formula: {' '.join(generated_formula)}")
            print(f"Required Formula: {' '.join(required_formula)}")

        #@TODO: Is the outputs thingy correct? because outputs are stacked outputs of the decoder??
        output_logits = outputs.permute(0,2,1)
        # print("BRO HI", output_logits.shape)
        loss = criterion(
            output_logits,
            target_tensor
        )
        loss.backward()
        optimizer.step() 

        total_loss += loss.item()

    return total_loss / len(dataloader)

def train(train_dataloader, model, n_epochs, learning_rate=0.001, print_every=1, plot_every=5):
    start = time.time()
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=train_dataloader.dataset.vocab.wd_to_id[UNK_TOKEN]).to(device) #as stated in assignment

    
    # Print model's device
    # print("Encoder's device:", next(encoder.parameters()).device)

    pb = tqdm(range(1, n_epochs + 1))
    for epoch in pb:
        loss = train_epoch(train_dataloader, model, optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if epoch % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            
        if epoch % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0
        
        pb.set_description('%s (%d %d%%) %.4f' % (timeSince(start, epoch / n_epochs), epoch, epoch / n_epochs * 100, print_loss_avg))
        
    showPlot(plot_losses)

## Training

In [222]:

batch_size = 32
vocab_size = 1000
CONTEXT_SIZE = 512
HIDDEN_SIZE = 512
EMBED_SIZE = 512
MAX_EXAMPLES = 1000
# image processing
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    transforms.Lambda(lambda x: x/255.0), #min-max normalisation
])

### Load Dataset and Dataloader

In [223]:
#part a
#train_csv_path = "/kaggle/input/converting-handwritten-equations-to-latex-code/col_774_A4_2023/SyntheticData/train.csv"
#image_root_path = "/kaggle/input/converting-handwritten-equations-to-latex-code/col_774_A4_2023/SyntheticData/images"
train_csv_path = "data/SyntheticData/train.csv"
image_root_path = "data/SyntheticData/images"
train_dataloader, train_dataset = get_dataloader(train_csv_path, image_root_path, batch_size, transform, max_examples=None)

Loading Dataset...
Loaded Dataset <bound method DataFrame.info of                 image                                            formula
0      74d337e8a0.png  $ \gamma _ { \Omega R , 5 } ^ { T } = - \gamma...
1      2d0f18f71d.png  $ l ^ { ( -- ) \underline { { m } } } u _ { \u...
2      6d9b9de88d.png  $ \left[ H , \gamma _ { i } ^ { \left( 2 \righ...
3      38c6d510bb.png  $ < a _ { i } > \; \propto \; \int _ { \omega ...
4      24537a86e3.png  $ \Psi ( \mu _ { 1 } , \ldots , \mu _ { K } ) ...
...               ...                                                ...
74995  1fa37e67d2.png  $ T _ { \theta } ^ { \theta } = - \frac { 1 } ...
74996  75518a26df.png  $ \alpha _ { + } = - 1 / \alpha _ { - } = \sqr...
74997  29f28cbc3a.png  $ d s ^ { 2 } = Z ^ { - 1 / 2 } \eta _ { \mu \...
74998  33ac7b385d.png  $ \tilde { H } _ { 0 } = \frac { 1 } { 2 } ( \...
74999  52672fbf76.png  $ \psi _ { \alpha \beta } = - g _ { \alpha \ga...

[75000 rows x 2 columns]>


### Create Model

In [224]:
#create a network instance
encoder = EncoderCNN().to(device)
decoder = Decoder(CONTEXT_SIZE, train_dataset.vocab, n_layers=1, hidden_size= HIDDEN_SIZE, embed_size=EMBED_SIZE,max_length=train_dataset.maxlen).to(device)
model = HandwritingToLatex1(encoder, decoder).to(device)

### Train

In [225]:
train(train_dataloader, model, 100)

  0%|          | 0/100 [00:00<?, ?it/s]

----Batch 1----
torch.Size([32, 629])
Target Tensor Shape: torch.Size([32, 629])
Dims:  torch.Size([32, 629, 549]) torch.Size([32, 629])
Generated Formula: \rightleftharpoons 6 6 \dagger \beta \hfill \hfill \Longrightarrow \hfill \pounds \right\Vert \left\{ 0.1 \bigr \omega \overwithdelims \framebox \framebox \framebox \left\vert \framebox \framebox \left\vert \cdots \ddag \framebox \d \end{array} \nu \d \d \end{array} \nu \ldots \footnotemark \acute \Rightarrow \bigr \omega \overwithdelims \framebox \circ \framebox \framebox \framebox \framebox \left\vert \cdots \framebox \varpi \mathbin \small \small \small \Downarrow P \left| \d \d \d \end{array} \d \d \end{array} \nu \d \d \end{array} \nu \ldots \footnotemark \acute \acute \d \end{array} \d \d \d \d \d \end{array} \d \d \end{array} \nu \d \end{array} \nu \ldots \d \d \d \d \d \d \framebox \framebox \framebox \framebox \varpi \left\vert \cdots \framebox \d \end{array} \d \end{array} \acute \right[ \d \end{array} \nu \d \end{array} Q