## INIT

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import json
from PIL import Image
import os

# Check if GPU is available, otherwise use CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Paths
image_folder_path = "LaTex_data/generated_png_images"
mapping_path = "image_formula_mapping.json"
label_to_index_path = "LaTex_data/230k.json"

folders = ["split_1", "split_2", "split_3", "split_4", "split_5", 
		   "split_6", "split_7", "split_8", "split_9", "split_10"]

with open(mapping_path, 'r') as f:
	image_formula_mapping = json.load(f)
keys = list(image_formula_mapping.keys())


KeyboardInterrupt: 

In [12]:
print(torch.cuda.get_device_name(0))
# print(len(os.listdir('LaTex_data/split_1')))
# print(image_formula_mapping['0002475406d9932.png'])

NVIDIA GeForce RTX 3050 Ti Laptop GPU


## Load Data


In [4]:
class LaTeXDataset(Dataset):
    def __init__(self, image_folder, mapping_file, label_to_index_file, transform=None, max_images=None):
        self.image_folder = image_folder
        self.transform = transform

        # Load mappings and label-to-index dictionary
        with open(mapping_file, 'r') as f:
            self.image_formula_mapping = json.load(f)
        with open(label_to_index_file, 'r') as f:
            self.label_to_index = json.load(f)

        # Apply the image count limit if specified
        # if max_images:
        #     self.image_formula_mapping = dict(list(self.image_formula_mapping.items())[:max_images])

        self.index_to_label = {v: k for k, v in self.label_to_index.items()}
        self.vocab_size = len(self.label_to_index)
        self.formulas = list(self.image_formula_mapping.values())
        self.image_files = [f for f in os.listdir(image_folder) ]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_name = self.image_files[idx]
        formula = self.image_formula_mapping[str(image_name)]

        # Load image
        image_path = os.path.join(self.image_folder, image_name)
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Tokenize the formula into indices
        formula_tokens = '<S> ' + formula + ' <E>'
        formula_tokens = formula.split()  # Splitting the formula string by whitespace
        
        formula_indices = []
        for token in formula_tokens:
            # Map each token to its index; if not found, use a default index (e.g., 0)
            index = self.label_to_index.get(token, 0)  # Assuming 0 is for unknown tokens
            formula_indices.append(int(index))
        
        # Convert the list of indices to a 1D tensor
        return image, torch.tensor(formula_indices, dtype=torch.long)


## Encoder / Decoder

In [None]:
class EncoderCNN(nn.Module):
	def __init__(self):
		super(EncoderCNN, self).__init__()
		conv_tiny = models.convnext_tiny(pretrained=True)

		unlock_section = 'features.4'
		for name, param in conv_tiny.named_parameters():
			if name.startswith(unlock_section):
				break
			
			param.requires_grad = False

		# Remove the final classification layer
		self.conv_tiny = nn.Sequential(*list(conv_tiny.children())[:-1])
		
		# connect directly to embed part
		# Add a fully connected layer to match the desired feature_dim
		# self.fc = nn.Linear(efficientnet.classifier[1].in_features, feature_dim)

	def forward(self, images):
		# Shape: [batch_size, feature_dim, 1, 1] = torch.Size([32, 768, 1, 1])
		features = self.conv_tiny(images)

		features = features.view(features.size(0), -1)  # Flatten to [batch_size, feature_dim]
		# features = self.fc(features)
		return features

class DecoderRNN(nn.Module):
	def __init__(self, embedding_dim, hidden_dim, vocab_size):
		super(DecoderRNN, self).__init__()
		self.embedding = nn.Embedding(vocab_size, embedding_dim)
		self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
		self.fc = nn.Linear(hidden_dim, vocab_size)

	def forward(self, features, formulas):
		# Embed the input formula tokens
		embeddings = self.embedding(formulas)

		# Concatenate features and embeddings along the sequence dimension
		embeddings = torch.cat((features.unsqueeze(1), embeddings), dim=1)
		
		# Pass through GRU and then through the final linear layer
		gru_out, _ = self.gru(embeddings)
		outputs = self.fc(gru_out)
		return outputs

class ImageToLaTeXModel(nn.Module):
	def __init__(self, encoder, decoder):
		super(ImageToLaTeXModel, self).__init__()
		self.encoder = encoder
		self.decoder = decoder

	def forward(self, images, formulas):
		# Encode the images
		features = self.encoder(images)  # Shape: [batch_size, feature_dim]
		
		# Decode to generate the LaTeX expression
		outputs = self.decoder(features, formulas[:, :-1])  # Skip the end token
		return outputs


In [6]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    # Separate images and formulas from the batch
    images, formulas = zip(*batch)

    # Stack images (assumes images are already the same size after transforms)
    images = torch.stack(images)

    # Pad formulas to the length of the longest formula in the batch
    formulas = pad_sequence(formulas, batch_first=True, padding_value=2)  

    return images.to(device), formulas.to(device)


## Save / Load model

In [7]:
import torch

# Assuming 'model' is your model and 'optimizer' is your optimizer
def save_model(model, optimizer, epoch, loss, filename='model.pth'):
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
    }, filename)

def load_model(model, optimizer, filename='model.pth'):
    checkpoint = torch.load(filename)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    return epoch, loss

In [None]:
# Hyperparameters
EMBED_SIZE = 768 # direct output dim from cv_tiny

hidden_size = 1024
num_epochs = 5
learning_rate = 0.003
batch_size = 32

# Image preprocessing
transform = transforms.Compose([
	transforms.Resize((224, 224)), # input dim of conv_next_tiny
	transforms.ToTensor(),
	transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Load dataset and dataloader


dataset = LaTeXDataset("LaTex_data/split_1" , mapping_path, label_to_index_path, transform)

# Model, loss, and optimizer
encoder = EncoderCNN().to(device)
decoder = DecoderRNN(EMBED_SIZE, hidden_size, dataset.vocab_size).to(device)
model = ImageToLaTeXModel(encoder, decoder).to(device)
criterion = nn.CTCLoss(blank=1, reduction='mean', zero_infinity=True)
optimizer = optim.Adam(model.parameters())
start_epoch = 0

# Function to save model state
def save_training_state(model, optimizer, epoch, folder_idx, loss):
	state = {
		'model_state_dict': model.state_dict(),
		'optimizer_state_dict': optimizer.state_dict(),
		'epoch': epoch,
		'folder_idx': folder_idx,
		'loss': loss
	}
	torch.save(state, 'model_checkpoint.pth')

# Function to load model state
def load_training_state(model, optimizer):
	checkpoint = torch.load('model_checkpoint.pth')
	model.load_state_dict(checkpoint['model_state_dict'])
	optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
	return checkpoint['epoch'], checkpoint['folder_idx'], checkpoint['loss']

# Try to resume from a checkpoint
try:
	start_epoch, start_folder_idx, last_loss = load_training_state(model, optimizer)
	print(f"Resuming training from folder {start_folder_idx+1}, epoch {start_epoch}, with loss {last_loss:.4f}")
except FileNotFoundError:
	print("No saved model found, starting fresh.")
	start_epoch = 0
	start_folder_idx = 0

# Training loop
for i in range(6):
	for folder_idx in range(start_folder_idx, len(folders)):
		print(f"Training on folder: {folders[folder_idx]}")
		dataset = LaTeXDataset("LaTex_data/" + folders[folder_idx], mapping_path, label_to_index_path, transform)
		dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

		for epoch in range(start_epoch, num_epochs):
			for i, data in enumerate(dataloader):
				images, formulas = data
				targets = formulas[:, 1:]

				outputs = model(images, formulas[:, :-1])
				outputs = outputs.log_softmax(2)  # Apply log_softmax for CTC Loss

				loss = criterion(outputs, targets, input_lengths, target_lengths)
				# loss = criterion(outputs.view(-1, dataset.vocab_size), targets.contiguous().view(-1))

				optimizer.zero_grad()
				loss.backward()
				optimizer.step()

				if i % 100 == 0:
					print(f"Folder [{folder_idx+1}/{len(folders)}], Epoch [{epoch+1}/{num_epochs}], Step [{i}/{len(dataloader)}], Loss: {loss.item():.4f}")

				# Save model periodically and at the end of each folder
				if i % 200 == 0 or (i == len(dataloader) - 1):
					save_training_state(model, optimizer, epoch, folder_idx, loss.item())
			
			# Reset start_epoch for next folder
			start_epoch = 0
		start_folder_idx = 0
		start_epoch = 0



  checkpoint = torch.load('model_checkpoint.pth')


Resuming training from folder 1, epoch 0, with loss 1.0686
Training on folder: split_1
Folder [1/10], Epoch [1/5], Step [0/733], Loss: 0.9892
Folder [1/10], Epoch [1/5], Step [100/733], Loss: 0.5882
Folder [1/10], Epoch [1/5], Step [200/733], Loss: 1.1851
Folder [1/10], Epoch [1/5], Step [300/733], Loss: 1.1410
Folder [1/10], Epoch [1/5], Step [400/733], Loss: 0.8273
Folder [1/10], Epoch [1/5], Step [500/733], Loss: 1.0690
Folder [1/10], Epoch [1/5], Step [600/733], Loss: 0.8181
Folder [1/10], Epoch [1/5], Step [700/733], Loss: 0.4615
Folder [1/10], Epoch [2/5], Step [0/733], Loss: 0.8759
Folder [1/10], Epoch [2/5], Step [100/733], Loss: 0.6576
Folder [1/10], Epoch [2/5], Step [200/733], Loss: 0.6862
Folder [1/10], Epoch [2/5], Step [300/733], Loss: 0.7525
Folder [1/10], Epoch [2/5], Step [400/733], Loss: 0.6072
Folder [1/10], Epoch [2/5], Step [500/733], Loss: 0.4458
Folder [1/10], Epoch [2/5], Step [600/733], Loss: 0.7897
Folder [1/10], Epoch [2/5], Step [700/733], Loss: 1.0050
Folde

KeyboardInterrupt: 

In [14]:
def decode_formula(indices, index_to_label):
    return ' '.join([index_to_label[str(i.item())] for i in indices if i.item() and str(i.item()) != '2'])  # Skip padding


def validate_model(model, dataloader, criterion, device, index_to_label):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():  # Disable gradient calculation
        for images, formulas in dataloader:
            images, formulas = images.to(device), formulas.to(device)
            outputs = model(images, formulas[:, :-1])  # Pass images and input sequence

            # Calculate loss
            loss = criterion(outputs.view(-1, outputs.size(-1)), formulas[:, 1:].contiguous().view(-1))
            total_loss += loss.item()

            # Calculate accuracy (if applicable)
            predicted_indices = torch.argmax(outputs, dim=2)  # Get the index of the max log-probability
            correct_predictions += (predicted_indices == formulas[:, 1:].contiguous()).sum().item()
            total_samples += formulas[:, 1:].numel()  # Total number of tokens in the validation batch

            # Print images and predictions
            for i in range(len(images)):
                # Decode the actual and predicted formulas
                actual_formula = decode_formula(formulas[i, 1:], index_to_label)  # Skip <S> token
                predicted_formula = decode_formula(predicted_indices[i, 1:], index_to_label)  # Skip <S> token
                # print(f'Image: {images[i]}')  # This will print the tensor, consider using visualization instead
                print(f'Actual Formula: {actual_formula}')
                print(f'Predicted Formula: {predicted_formula}')
                print('-' * 50)

    avg_loss = total_loss / len(dataloader)
    accuracy = correct_predictions / total_samples if total_samples > 0 else 0.0

    return avg_loss, accuracy
# Assuming you have your model, dataloader, criterion, and device set up
# Assuming 230k.json is loaded as label_to_index













# model = load_model(model, optimizer)

val_dataset = LaTeXDataset('LaTex_data/split_1', mapping_path, label_to_index_path, transform)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)  # Set shuffle to False for validation
label_to_index = json.load(open(label_to_index_path, 'r'))
index_to_label = {v: k for k, v in label_to_index.items()}  # Reverse the mapping

val_loss, val_accuracy = validate_model(model, val_dataloader, criterion, device, index_to_label)
print(val_accuracy)
save_model(model, optimizer, epoch + 1, val_loss, 'model.pth')

Actual Formula: _ { a } \left( z \right) = \int _ { 0 } ^ { \infty } \mathrm { d } t \, \mathrm { c o s h } \left( a \, t \right) \mathrm { e } ^ { - z \, \mathrm { c o s h } \left( t \right) } ,
Predicted Formula: { i } } { \right) = \left( _ { 0 } ^ { \infty } d z d } a { \left( { t r s s } t { { { { { { d } ^ { - s } } } d } s } } } } \right) \right)
--------------------------------------------------
Actual Formula: [ A ] = { \frac { k } { 4 \pi } } \int _ { \Sigma } \mathrm { T r } \left[ A \wedge d A + \frac { 2 } { 3 } A \wedge A \wedge A \right]
Predicted Formula: { _ { \int d { 1 } { 2 \pi } } \int d { 0 } d ^ d r } \Bigl { d { A + { { 1 } { 3 } A A { A { A A
--------------------------------------------------
Actual Formula: _ { Q } = \int d ^ { 2 } x \, [ - \nabla ^ { m } \phi _ { + } \nabla _ { m } \phi _ { + } + \nabla ^ { m } \phi _ { - } \nabla _ { m } \phi _ { - } - m ^ { 2 } ( \phi _ { + } + \phi _ { - } ) ^ { 2 } ] .
Predicted Formula: { \mathrm } ( { d ^ { 2 } x \sqrt 

KeyboardInterrupt: 