In [None]:
'''
Mount Google Drive, copy the compressed datasets and the label file to the
Colab runtime, and unzip the image folders.

Make sure to add a shortcut to the "EC 523 Project" folder in the root of your
Google Drive ("My Drive")!
'''

from google.colab import drive
drive.mount('/content/drive')

# Copy the three image archives and the label file from Drive into /content.
# The source paths will differ depending on where the files are saved in Google Drive.
! cp /content/drive/'My Drive'/'EC 523 Project'/CompressedData/train.zip /content
! cp /content/drive/'My Drive'/'EC 523 Project'/CompressedData/test.zip /content
! cp /content/drive/'My Drive'/'EC 523 Project'/CompressedData/val.zip /content
! cp /content/drive/'My Drive'/'EC 523 Project'/data/math.txt /content

# Extract each archive into /content/.
# -q runs quietly; -DD (per unzip's documentation) skips restoring timestamps.
! unzip -DD -q  /content/train.zip -d  /content/
! unzip -DD -q  /content/test.zip -d  /content/
! unzip -DD -q  /content/val.zip -d  /content/



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
'''
Count the entries in each dataset folder (train / test / val)
'''
# `name = !cmd` captures the shell output as a list of lines; the first line of
# `ls <dir> | wc -l` is the number of directory entries.
# NOTE(review): the folder names are relative, so this presumably relies on the
# working directory being /content — confirm.
num_train_str = !ls train | wc -l
num_test_str = !ls test | wc -l
num_val_str = !ls val | wc -l
num_train = int(num_train_str[0])
num_test = int(num_test_str[0])
num_val = int(num_val_str[0])

print(f'Number of train images: {num_train}\nNumber of test images: {num_test}\nNumber of validation images: {num_val}\nTotal images: {num_train+num_test+num_val}')

Number of train images: 158480
Number of test images: 30637
Number of validation images: 6765
Total images: 195882


In [None]:
# Directories holding the extracted images of each split
train_root = "/content/train/"
test_root = "/content/test/"
val_root = "/content/val/"
# Text file with the LaTeX labels; it is read line by line by LatexDataset and LatexDict
label_file = "/content/math.txt"

In [None]:
import os
import cv2
import torch.utils.data
from PIL import Image


class LatexDataset(torch.utils.data.Dataset):
    '''
    Dataset of formula images paired with their LaTeX source strings.

    Line i (0-based) of the label file is paired with the image named
    "<i zero-padded to 7 digits>.png" inside `dataroot`. Lines whose image is
    not present in `dataroot`, or whose image fails PIL verification, are
    skipped, so one label file can serve the train, test and validation splits.

    Args:
        transform: optional callable applied to the thresholded greyscale PIL
            image. When None, the PIL image itself is returned.
        dataroot: directory containing the images of one split
            (train_root, test_root, or val_root).
    '''

    def __init__(self, transform=None, dataroot=train_root): # can change dataroot to be either train_root, test_root, val_root
        '''Initialize the dataset.'''
        self.transform = transform
        self.dataroot = dataroot
        self.labels_txt = label_file
        self._parse()

    def _verified_image_path(self, line_idx):
        '''
        Return the path of the image belonging to label line `line_idx`
        (0-based), or None if the file is missing from this split or cannot
        be verified.
        '''
        imname = str(line_idx).zfill(7) + '.png'
        # os.path.join works whether or not dataroot ends with a separator
        impath = os.path.join(self.dataroot, imname)
        if not os.path.exists(impath):
            return None

        try:
            # The context manager closes the file handle after verification
            with Image.open(impath) as im:
                im.verify()
        except Exception:
            # Some images can't be opened; they are deliberately skipped
            return None

        return impath

    def _parse(self):
        '''
        Parse the math.txt file.
        Populates the following private variables:
        - self.im_paths: A list of strings storing the associated image paths
        - self.labels: A list of strings, where each string is the latex code for an image
        '''
        self.im_paths = []
        self.labels = []

        with open(self.labels_txt) as f:
            for line_idx, line in enumerate(f):
                impath = self._verified_image_path(line_idx)
                if impath is None:
                    continue
                self.im_paths.append(impath)            # Image path
                self.labels.append(line.strip('\n'))    # String of latex code

    def __len__(self):
        '''Return length of the dataset.'''
        assert len(self.labels) == len(self.im_paths)
        return len(self.labels)

    def _load_image(self, index):
        '''Load image `index` as a thresholded greyscale image and apply the transform, if any.'''
        with Image.open(self.im_paths[index]) as imraw:
            # convert() returns a new, fully loaded image, so the file can be closed
            imgray = imraw.convert('L')
        # Pixels brighter than 240 become 255 (white background); all others become 0
        imthresh = imgray.point(lambda p: 255 if p > 240 else 0)
        if self.transform is None:
            return imthresh
        return self.transform(imthresh)

    def __getitem__(self, index):
        '''
        Return the (image, latex string) tuple.
        This function gets called when you index the dataset.
        '''
        target = self.labels[index]
        return self._load_image(index), target

In [None]:
'''
Dictionary block: converts a LaTeX string to a dictionary of latex tokens, where
each unique token has its own entry and integer value assigned to it

'''
class LatexDict():
    '''
    Vocabulary that maps whitespace-separated LaTeX tokens to integer ids.

    Ids 0 and 1 are reserved for '<UKN>' (unknown token) and '<PAD>' (padding).
    Every other token receives the next free id, in order of first appearance
    in the label file.

    Args:
        num_tokens: width of the padded id tensors produced by `map_tokens`.
        labels_txt: path of the label file to scan. Defaults to the
            module-level `label_file`.
    '''

    def __init__(self, num_tokens=256, labels_txt=None):
        self.labels_txt = label_file if labels_txt is None else labels_txt
        self.num_tokens = num_tokens
        self.latex_dict = {'<UKN>':0, '<PAD>':1} # Initialize with token for unknown and for padding
        self.latex_dict_inverse = {0:'<UKN>', 1:'<PAD>'} # Initialize inverse dict for quicker reverse lookups
        self.create_dict()

    def create_dict(self):
        '''Go through the entire label file and assign an id to every unseen token.'''
        with open(self.labels_txt) as f:
            for line in f:
                for token in line.split():
                    if token in self.latex_dict:
                        continue
                    # Ids are dense, so the next free id equals the current size
                    new_id = len(self.latex_dict)
                    self.latex_dict[token] = new_id
                    self.latex_dict_inverse[new_id] = token

    def map_tokens(self, tex_str_list, batch_size=None):
        '''
        Convert a batch of LaTeX strings to a padded tensor of token ids.

        Args:
            tex_str_list: sequence of LaTeX strings with whitespace-separated tokens.
            batch_size: number of rows in the result. Defaults to
                len(tex_str_list). Rows without a string stay fully padded.

        Returns:
            float32 tensor of shape (batch_size, num_tokens). Tokens missing
            from the dictionary map to the '<UKN>' id, strings longer than
            num_tokens are truncated, and unused positions hold the '<PAD>' id.
        '''
        if batch_size is None:
            batch_size = len(tex_str_list)

        pad_id = self.latex_dict['<PAD>']
        unk_id = self.latex_dict['<UKN>']
        ids_tensor = torch.full((batch_size, self.num_tokens), pad_id, dtype=torch.float32)

        for row, tex_str in enumerate(tex_str_list):
            tokens = tex_str.split()[:self.num_tokens]
            ids = [self.latex_dict.get(token, unk_id) for token in tokens]
            if ids:
                ids_tensor[row, :len(ids)] = torch.tensor(ids, dtype=torch.float32)

        return ids_tensor

    def tokens_to_tex(self, token_vec):
        '''
        Convert a 1-D tensor of token ids back to a LaTeX string.

        Ids not in the dictionary, '<PAD>' and '<UKN>' are dropped. The result
        starts with a space and every kept token is followed by a space.
        '''
        tex_str = ' '
        for token_id in token_vec.tolist():
            token = self.latex_dict_inverse.get(token_id)
            if token is not None and token != '<PAD>' and token != '<UKN>':
                tex_str += token + ' '

        return tex_str

    # NOTE(review): this method shadows the instance attribute `__dict__`, so
    # `obj.__dict__` yields this bound method instead of the attribute dict.
    # It is kept only so existing `latex_dict.__dict__()` calls keep working;
    # prefer reading `latex_dict.latex_dict` directly.
    def __dict__(self):
        return self.latex_dict

    def __len__(self):
        '''Return the number of entries in the dictionary (special tokens included).'''
        return len(self.latex_dict)

# latex_dict = LatexDict()


Creating the CNN Block

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torch.optim as optim
import math

class CNN_Block(nn.Module):
    '''
    Convolutional encoder: four (3x3 conv -> ReLU -> 2x2 max-pool) stages
    followed by two fully connected layers.

    Args:
        output_dims: size of the output feature vector.
        input_size: (height, width) of the single-channel input images. It
            determines the input width of the first linear layer. Defaults to
            (32, 128), which gives the 256 * 2 * 8 features used so far.
    '''

    def __init__(self, output_dims, input_size=(32, 128)):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 64, 3, padding=1)    # Images are originally one channel, added padding as well
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(128, 256, 3, padding=1)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.conv4 = nn.Conv2d(256, 256, 3, padding=1)
        self.pool4 = nn.MaxPool2d(2, 2)

        # The padded convolutions keep the spatial size and each pooling halves
        # it (rounding down), so four poolings divide both sides by 16.
        feat_h, feat_w = input_size[0] // 16, input_size[1] // 16
        if feat_h < 1 or feat_w < 1:
            raise ValueError(f'input_size {input_size} is too small for four 2x2 poolings')

        self.fc1 = nn.Linear(256 * feat_h * feat_w, 1024)
        self.fc2 = nn.Linear(1024, output_dims)

    def forward(self, x):
        '''Encode a batch of images (B, 1, H, W) into features of shape (B, output_dims).'''
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.pool3(F.relu(self.conv3(x)))
        x = self.pool4(F.relu(self.conv4(x)))

        x = torch.flatten(x, 1)   # Flatten so this can be used in linear layers

        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x


In [None]:
class LSTM(nn.Module):
    '''
    Hand-written LSTM cell that advances one time step per `forward` call.

    The hidden and cell states are kept on the module between calls. Call
    `reset_LSTM_states` to zero them, e.g. at the start of a new sequence.

    Each of the `num_layers` slices has its own weights and receives its own
    slice of the input, so `x` must have shape
    (num_layers, batch, input_size); the slices are not stacked on top of
    each other.

    Args:
        input_size: number of input features per step.
        hidden_size: number of hidden units.
        num_layers: number of independent weight slices.
        batch_size: batch size used to allocate the initial states.
    '''

    def __init__(self, input_size, hidden_size, num_layers, batch_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.batch_size = batch_size

        # Non-persistent buffers: they follow .to(device) / .cuda() together
        # with the parameters but are not stored in the state_dict.
        self.register_buffer('cell_state', None, persistent=False)
        self.register_buffer('hidden_state', None, persistent=False)

        #Input-gate parameters
        self.W_i = nn.Parameter(torch.zeros(self.num_layers, hidden_size, hidden_size + input_size, dtype=torch.double))
        self.b_i = nn.Parameter(torch.zeros(self.num_layers, hidden_size, 1, dtype=torch.double))
        #forget_gate parameters
        self.w_f = nn.Parameter(torch.zeros(self.num_layers, hidden_size, hidden_size + input_size, dtype=torch.double))
        self.b_f = nn.Parameter(torch.zeros(self.num_layers, hidden_size, 1, dtype=torch.double))
        #candidate parameters
        self.w_c = nn.Parameter(torch.zeros(self.num_layers, hidden_size, hidden_size + input_size, dtype=torch.double))
        self.b_c = nn.Parameter(torch.zeros(self.num_layers, hidden_size, 1, dtype=torch.double))
        #output gate parameters
        self.w_o = nn.Parameter(torch.zeros(self.num_layers, hidden_size, hidden_size + input_size, dtype=torch.double))
        self.b_o = nn.Parameter(torch.zeros(self.num_layers, hidden_size, 1, dtype=torch.double))

        self.init_weights()
        self.reset_LSTM_states(batch_size)

    def init_weights(self):
        '''Initialize every parameter uniformly in [-1/sqrt(hidden_size), 1/sqrt(hidden_size)].'''
        stdv = 1.0 / math.sqrt(self.hidden_size)
        for weight in self.parameters():
            weight.data.uniform_(-stdv, stdv)

    def reset_LSTM_states(self, batch_size):
        '''Zero the hidden and cell states for a batch of `batch_size` sequences.'''
        device = self.W_i.device  # keep the states on the same device as the weights
        self.cell_state = torch.zeros(self.num_layers, batch_size, self.hidden_size, dtype=torch.double, device=device)
        self.hidden_state = torch.zeros(self.num_layers, batch_size, self.hidden_size, dtype=torch.double, device=device)

    def forward(self, x):
        '''
        Advance the cell by one time step.

        Args:
            x: input of shape (num_layers, batch, input_size).

        Returns:
            The new hidden state, shape (num_layers, batch, hidden_size).
        '''
        # (L, B, I+H) -> (L, I+H, B) so that every gate is W @ [x; h] + b
        X_H = torch.cat((x, self.hidden_state), dim=2).transpose(1, 2)

        def gate(weight, bias, activation):
            # (L, H, I+H) @ (L, I+H, B) -> (L, H, B), then back to (L, B, H)
            return activation(torch.matmul(weight, X_H) + bias).transpose(1, 2)

        input_update = gate(self.W_i, self.b_i, torch.sigmoid)
        forget_update = gate(self.w_f, self.b_f, torch.sigmoid)
        candidate_update = gate(self.w_c, self.b_c, torch.tanh)
        output_update = gate(self.w_o, self.b_o, torch.sigmoid)

        self.cell_state = forget_update * self.cell_state + input_update * candidate_update
        self.hidden_state = output_update * torch.tanh(self.cell_state)

        return self.hidden_state

In [None]:
class attention(nn.Module):
    '''
    Additive attention between a hidden state and a set of feature vectors.

    Scores are computed as sum(tanh(w_h(h_t) + w_v(V)) * w_beta) over the last
    dimension, normalised with a softmax over dim 1, and used to form a
    weighted sum of the feature vectors.

    Args:
        beta_size: size of the shared projection space.
        hidden_size: size of the hidden-state vectors.
        v_length: size of each feature vector.
    '''

    def __init__(self, beta_size, hidden_size, v_length):
        super().__init__()
        # Projection of the hidden state
        self.w_h = nn.Linear(hidden_size, beta_size, bias=False)
        # Projection of the encoded image features
        self.w_v = nn.Linear(v_length, beta_size, bias=False)
        # Scoring vector, drawn uniformly from [-1e-2, 1e-2]
        self.w_beta = nn.Parameter(torch.empty(beta_size).uniform_(-1e-2, 1e-2))

        self.init_weights()

    def init_weights(self):
        '''Xavier-initialize both projection matrices.'''
        for projection in (self.w_h, self.w_v):
            nn.init.xavier_uniform_(projection.weight)

    def forward(self, V_new, h_t):
        '''
        Args:
            V_new: feature vectors; assumed to be (B, N, v_length) — TODO confirm against caller.
            h_t: hidden state; assumed to be (B, hidden_size) — TODO confirm against caller.

        Returns:
            (context, weights): the attention-weighted sum of `V_new` with
            dim 1 squeezed out, and the attention weights with a singleton
            dim 1 inserted.
        '''
        hidden_proj = self.w_h(h_t).unsqueeze(1)
        feature_proj = self.w_v(V_new)

        # Score every feature vector against the hidden state
        scores = (torch.tanh(hidden_proj + feature_proj) * self.w_beta).sum(dim=-1)

        # Normalise the scores into attention weights
        weights = scores.softmax(dim=1).unsqueeze(1)

        # Weighted sum of the feature vectors
        context = (weights @ V_new).squeeze(1)

        return context, weights

In [None]:
def PositionalEmbedding2D(D_model,height,width):
    '''
    Build a 2D sinusoidal positional encoding.

    The first half of the channels encodes the column (width) position and the
    second half encodes the row (height) position. Within each half, even
    channels hold sines and odd channels hold cosines.

    Args:
        D_model: number of channels; must be divisible by 4.
        height: number of rows.
        width: number of columns.

    Returns:
        Tensor of shape (D_model, height, width).

    Raises:
        ValueError: if D_model is not divisible by 4.
    '''
    if D_model % 4 != 0:
        raise ValueError("Cannot use 2D sin/cos positional encoding with a "
                         "dimension that is not divisible by 4 (got dim={:d})".format(D_model))
    pe = torch.zeros(D_model, height, width)
    # Each spatial axis gets half of the channels
    d_model = D_model // 2
    div_term = torch.exp(torch.arange(0., d_model, 2) * -(math.log(10000.0) / d_model))
    pos_w = torch.arange(0., width).unsqueeze(1)
    pos_h = torch.arange(0., height).unsqueeze(1)

    # (d_model/2, 1, width): identical for every row, broadcast over height on assignment
    w_angles = (pos_w * div_term).transpose(0, 1).unsqueeze(1)
    # (d_model/2, height, 1): identical for every column, broadcast over width on assignment
    h_angles = (pos_h * div_term).transpose(0, 1).unsqueeze(2)

    pe[0:d_model:2, :, :] = torch.sin(w_angles)
    pe[1:d_model:2, :, :] = torch.cos(w_angles)
    pe[d_model::2, :, :] = torch.sin(h_angles)
    pe[d_model + 1::2, :, :] = torch.cos(h_angles)

    return pe

In [None]:
'''
Overall Model Class
'''

class Model(nn.Module):
    '''
    Image-to-token model: CNN encoder -> LSTM -> attention.

    Args:
        max_tokens: length of the predicted token vector. It is also used as
            the feature, hidden and attention size of every sub-module.
    '''

    def __init__(self, max_tokens):
        super().__init__()
        self.cnn = CNN_Block(output_dims=max_tokens)
        self.lstm = nn.LSTM(input_size=max_tokens, hidden_size=max_tokens, num_layers=1)
        self.attention = attention(beta_size=max_tokens, hidden_size=max_tokens, v_length=max_tokens)
        # NOTE(review): `fc` is created (and handed to the optimizer) but is not
        # applied in forward(). Without it the output is a weighted sum of LSTM
        # outputs and so stays within (-1, 1) — confirm whether it should be used.
        self.fc = nn.Linear(max_tokens, max_tokens)

    def forward(self, x):
        '''
        Args:
            x: batch of images, shape (B, 1, H, W).

        Returns:
            Tensor of shape (B, max_tokens): one prediction row per image.
        '''
        features = self.cnn(x)                                    # (B, max_tokens)

        # nn.LSTM (batch_first=False) reads a 2-D input as ONE unbatched
        # sequence, which would treat the images of a batch as time steps.
        # Add an explicit sequence dimension of length 1 so that every image
        # is processed independently: (1, B, max_tokens).
        seq_out, (h_n, c_n) = self.lstm(features.unsqueeze(0))

        # attention expects (B, N, v_length) features and a (B, hidden) state
        context, attn_weights = self.attention(seq_out.transpose(0, 1), h_n[-1])

        return context

In [None]:
'''
Initialize dataset and image preprocessing

NOTE:
    Some of the images in the dataset are corrupted. To deal with this,
    there is a check for each image to ensure that it can be loaded.
'''
import torchvision.transforms as transforms

# (height, width) every image is resized to. This has to match the input size
# that CNN_Block's first linear layer is sized for.
reduced_imsize = (32, 128)  # Images are reduced to this size

# Define the transform pipeline: resize, then convert the PIL image to a tensor.
# TODO: decide whether normalization should be added.
transform = transforms.Compose([
    transforms.Resize(reduced_imsize),
    transforms.ToTensor(),
])

# One dataset per split; each one scans the label file and keeps the entries
# whose image exists in its own folder.
train_dataset = LatexDataset(transform=transform, dataroot=train_root)
test_dataset = LatexDataset(transform=transform, dataroot=test_root)
val_dataset = LatexDataset(transform=transform, dataroot=val_root)

# Device settings: use the first GPU when available, otherwise the CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')

Device: cuda:0


In [None]:
'''
Initialize hyperparameters, trainloader, and dictionary of LaTeX token mappings
'''

# Hyperparameters
batch_size = 64
learning_rate = 0.001
# weight_decay = 0.00001  # (L2 penalty)

# Also passed to Model and LatexDict, so it sets the width of both the
# predictions and the padded target tensors.
max_tokens = 1784        # Maximum number of tokens in a latex string

# Builds the token -> id mapping by scanning the whole label file
latex_dict = LatexDict(num_tokens=max_tokens)

# drop_last=True discards a short final batch, so every batch has exactly batch_size items
trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True, num_workers=8)

# print(f'Dictionary length: {latex_dict.__len__()}')
# print(f'Dictionary: {latex_dict.__dict__()}')



In [None]:
'''
Initialize Model

Initialize Loss Functions:
1. Mean-squared-error loss between prediction and label
   (a cross-entropy loss is kept below as a commented-out alternative)
2. LaTeX compile test (planned, not created in this cell):
    - Custom function, returns True if code can compile into LaTeX, False if not

Initialize Optimizer:
1. Adam Optimizer
'''
model = Model(max_tokens=max_tokens).to(device)
# criterion = nn.CrossEntropyLoss()    # Alternative loss; may need to switch to this
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# FOR DEBUG: request synchronous CUDA kernel launches so that errors are
# reported at the call that caused them.
# NOTE(review): this variable is normally only honoured when it is set before
# CUDA is initialized, and the model was already moved to the device in an
# earlier cell — confirm that it takes effect here.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

In [None]:
import random
from tqdm import tqdm


'''
Training Loop
'''

num_epoch = 10

# Make sure the model is in training mode before optimizing
model.train()

for epoch in range(num_epoch):
    print('epoch:', epoch)
    pbar = tqdm(trainloader)
    for images, y in pbar:

        images = images.to(device)              # Send to gpu

        # Size the target tensor by the actual number of labels in this batch
        # instead of the global batch_size, so a short batch cannot break it.
        y_vec = latex_dict.map_tokens(y, batch_size=len(y))
        y_vec = y_vec.to(device)                # Send to gpu

        predictions = model(images)             # Get predictions

        loss1 = criterion(predictions, y_vec)   # Calculate first loss function

        # loss2 = can_compile(predictions)        # Check if output can compile

        optimizer.zero_grad()

        loss1.backward()

        optimizer.step()

        # Show the current loss on the progress bar so training can be monitored
        pbar.set_postfix(loss=f'{loss1.item():.4f}')

    # After each epoch, print five of the output strings - throws cuda side assert error
    # random_indeces = [random.randint(1, batch_size) for _ in range(5)]
    # print(predictions[random_indeces])
    # token_ints = torch.floor(predictions).to(torch.int32)
    # for i, token_str in enumerate(token_ints[random_indeces]):
    #     print(f'prediction: {latex_dict.tokens_to_tex(token_str)}\nlabel: {y_vec[i]}\n\n')




epoch: 0


  return F.mse_loss(input, target, reduction=self.reduction)
 40%|███▉      | 980/2474 [01:15<01:34, 15.88it/s]

In [None]:
# print(latex_dict.tokens_to_tex(predictions))
# import random
# print(predictions)

# random_indeces = [random.randint(1, batch_size) for _ in range(5)]
# token_ints = torch.floor(predictions).to(torch.int32)
# for token_str in token_ints[random_indeces]:
#     print(latex_dict.tokens_to_tex(token_str))

# print(token_ints[0])
# # print(token_ints[1])
# # print(token_ints[2])
# # print(token_ints[3])
# # print(token_ints[4])

# a = latex_dict.tokens_to_tex(token_ints[0])
# print(a )

####Test outputs here

$$
 B \lbrace r h q r \rbrace \alpha o \alpha \theta o \theta q o c 1 q 2 2 1 1 . s 0 s \} 0 = = t 4 ) 4 4 4 4 ) ) ( 3 \, + \{ \{ \{ \prime \frac + \frac \frac \frac \frac e \frac \prime \mathrm \prime \mathrm \zeta \zeta \mathrm e \mathrm \: d \: \: l d l \infty l l \infty d } ^ \infty ^ ^ ^ \infty ^ } ^ \epsilon ^ } \epsilon } \epsilon \epsilon } } \epsilon - - - \epsilon \epsilon - \epsilon - { - { - { - { - - { { { { { - { _ _ { { \int - \int _ { _ _ _ _ { _ _ _ _ _ \int _ _ _ _ { \int _ _ \int _ _ _ \int \int \int _ { _ \int _ \int \int \int _ \int \int \int _ \int \int \int \int \int \int \int \int \int \int \int \int _ \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int \int
$$