## Generating Recipe Title from Food Image (BLIP)

## Import Libraries

In [3]:
import json
from PIL import Image
import numpy as np
import os
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoProcessor, BlipForConditionalGeneration
from difflib import SequenceMatcher
import random  
from tqdm import tqdm

### Dataset Class

In [4]:
class CustomDataset(Dataset):
    """Summary: Custom Dataset for loading images and titles together
    Args:
        data_list (list): list of dictionaries; each entry is read for the
            keys "id", "image" and "title"
        processor (transformers.Processor): Processor for encoding the data
        image_root (str): directory the images are stored under, laid out as
            <image_root>/<id>/<image>. Defaults to "train".
    """
    def __init__(self, data_list, processor, image_root="train"):
        """Summary: Constructor for CustomDataset
        Args:
            data_list (list): list of dictionaries containing image and title
            processor (transformers.Processor): Processor for encoding the data
            image_root (str): root directory of the image folders
        """
        self.data = data_list
        self.processor = processor
        self.image_root = image_root

    def __len__(self):
        """Summary: Returns the number of entries in the dataset"""
        return len(self.data)

    def __getitem__(self, index):
        """Summary: Returns the encoding of the image and title at the given index
        Args:
            index (int): index of the data
        Returns:
            encoding (dict): the processor's outputs for the image/title pair,
                each with its leading dimension squeezed away
        """
        item = self.data[index]
        image_path = f"{self.image_root}/{item['id']}/{item['image']}"

        # Image.open is lazy and keeps the file handle open; the context
        # manager closes it once the processor has read the pixels.
        with Image.open(image_path) as image_obj:
            encoding = self.processor(
                images=image_obj,
                text=item['title'],
                padding="max_length",
                return_tensors="pt",
            )

        # Drop only the leading dimension (presumably a batch axis of size 1
        # added by return_tensors="pt"); a bare squeeze() would also collapse
        # any other dimension that happens to have size 1.
        return {k: v.squeeze(0) for k, v in encoding.items()}
    


### Load Dataset

In [None]:
def load_dataset(dataset_path, processor, batch_size, shuffle=False):
    """Summary: Loads the dataset from the given path
    Args:
        dataset_path (str): path to the JSON file holding the list of samples
        processor (transformers.Processor): Processor for encoding the data
        batch_size (int): batch size for the dataloader
        shuffle (bool): whether the dataloader reshuffles the samples every
            epoch. Defaults to False, which keeps the file order.
    Returns:
        dataloader (torch.utils.data.DataLoader): dataloader for the dataset
    """
    # JSON is UTF-8 by specification; do not rely on the platform default.
    with open(dataset_path, "r", encoding="utf-8") as f:
        dataset_list = json.load(f)

    dataset = CustomDataset(dataset_list, processor)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

### String Similarity (difflib SequenceMatcher ratio)

In [None]:
def similar(a, b):
    """Summary: Score how alike two strings are using difflib
    Args:
        a (str): First string
        b (str): Second string
    Returns:
        float: SequenceMatcher ratio in [0, 1], i.e. 2 * M / T where M is the
            number of matched characters and T is len(a) + len(b);
            1.0 means the strings are identical
    """
    matcher = SequenceMatcher(isjunk=None, a=a, b=b)
    score = matcher.ratio()
    return score

### Evaluate the model based on string similarity

In [None]:
def evaluate(model, device, processor, dataset_path, num_samples=1000, image_root="test", max_length=10):
    """Summary: Evaluate the model on a random subset of the given dataset
    Args:
        model (transformers.Model): Model to be evaluated
        device (torch.device): Device to run the model on
        processor (transformers.Processor): Processor for encoding the data
        dataset_path (str): Path to the JSON file holding the list of samples
        num_samples (int | None): maximum number of randomly chosen samples to
            score; None scores every sample. Defaults to 1000.
        image_root (str): directory the images are stored under, laid out as
            <image_root>/<id>/<image>. Defaults to "test".
        max_length (int): max_length passed to model.generate. Defaults to 10.
    Returns:
        float: Average similarity between the original and predicted titles,
            or 0.0 when there is nothing to evaluate
    """
    with open(dataset_path, "r", encoding="utf-8") as f:
        dataset_list = json.load(f)

    # Shuffle so the evaluated subset is a random sample of the file.
    random.shuffle(dataset_list)
    if num_samples is None:
        samples = dataset_list
    else:
        samples = dataset_list[:max(num_samples, 0)]

    # Nothing to score: report 0.0 instead of dividing by zero below.
    if not samples:
        return 0.0

    model.to(device)
    model.eval()
    total_similarity = 0.0

    with torch.no_grad():
        for item in tqdm(samples):
            image_path = f"{image_root}/{item['id']}/{item['image']}"

            # Close the image file as soon as the processor has read it.
            with Image.open(image_path) as image_obj:
                inputs = processor(images=image_obj, return_tensors="pt").to(device)

            generated_ids = model.generate(pixel_values=inputs.pixel_values, max_length=max_length)
            generated_caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

            # Calculate string similarity between original and predicted titles
            total_similarity += similar(item["title"], generated_caption)

    return total_similarity / len(samples)


### Train the model

In [5]:
def train(model, train_dataloader, optimizer, processor, device, epochs=10, eval_dataset_path="dataset_test.json"):
    """Summary: Train the model on the given dataset and save the best models
    After every epoch the model is scored with evaluate(); the latest weights
    are written to "last_model_checkpoint.pt" and, whenever the score improves,
    also to "best_model_checkpoint.pt".
    Args:
        model (transformers.Model): Model to be trained
        train_dataloader (torch.utils.data.DataLoader): Dataloader for the training dataset
        optimizer (torch.optim.Optimizer): Optimizer for the model
        processor (transformers.Processor): Processor for encoding the data
        device (torch.device): Device to run the model on
        epochs (int): number of passes over the training data. Defaults to 10.
        eval_dataset_path (str): dataset file handed to evaluate() after each
            epoch. Defaults to "dataset_test.json".
    """
    max_eval_score = 0  # Best evaluation score seen so far (higher is better)

    for epoch in tqdm(range(epochs)):
        print("Epoch: ", epoch)

        # Training loop; evaluate() leaves the model in eval mode, so switch
        # back to training mode at the start of every epoch.
        model.train()
        for batch in train_dataloader:
            input_ids = batch.pop("input_ids").to(device)
            pixel_values = batch.pop("pixel_values").to(device)
            attention_mask = batch.pop("attention_mask", None)

            labels = input_ids
            if attention_mask is not None:
                attention_mask = attention_mask.to(device)
                # Exclude padded positions from the loss so the model is not
                # trained to predict padding.
                # NOTE(review): relies on the model's loss ignoring label -100
                # (the torch CrossEntropyLoss default ignore_index) - confirm
                # for the installed transformers version.
                labels = input_ids.masked_fill(attention_mask == 0, -100)

            optimizer.zero_grad()
            outputs = model(
                input_ids=input_ids,
                pixel_values=pixel_values,
                attention_mask=attention_mask,
                labels=labels,
            )
            outputs.loss.backward()
            optimizer.step()

        # Validation
        model.eval()  # Set the model in evaluation mode
        eval_score = evaluate(model, device, processor, eval_dataset_path)
        print("Validation Score:", eval_score)

        # Save the last model checkpoint
        torch.save(model.state_dict(), "last_model_checkpoint.pt")

        # Save the model if it has the best validation score
        if eval_score > max_eval_score:
            max_eval_score = eval_score
            torch.save(model.state_dict(), "best_model_checkpoint.pt")


### Load best model

In [None]:
def load_best(device, checkpoint_path="best_model_checkpoint.pt"):
    """Summary: Load the best model checkpoint
    Args:
        device (torch.device): Device to run the model on
        checkpoint_path (str): file containing the saved state dict.
            Defaults to "best_model_checkpoint.pt".
    Returns:
        model (transformers.Model): Model loaded from the best checkpoint,
            moved to `device` and set to evaluation mode
    """
    model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

    # map_location remaps the stored tensors onto `device`; without it a
    # checkpoint saved from a GPU cannot be loaded on a host lacking that GPU.
    state_dict = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(state_dict)

    model.to(device)
    model.eval()
    return model

### Main Function

In [None]:
def main():
    """Summary: Fine-tune the pretrained BLIP captioning model on dataset.json
    Picks a device, loads the processor and pretrained model, builds the
    training dataloader and runs train().
    """
    # Prefer the second GPU when there is one; fall back to the first GPU and
    # then to the CPU so single-GPU machines do not fail on "cuda:1".
    if torch.cuda.is_available():
        gpu_index = 1 if torch.cuda.device_count() > 1 else 0
        device = torch.device(f"cuda:{gpu_index}")
    else:
        device = torch.device("cpu")
    batch_size = 8
    print(device)

    processor = AutoProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
    model.to(device)

    train_dataloader = load_dataset("dataset.json", processor, batch_size)
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
    train(model, train_dataloader, optimizer, processor, device)


# Only start training when executed as a script/notebook cell, not on import.
if __name__ == "__main__":
    main()