Install the required Python libraries and frameworks (Torch is assumed to be already installed in your virtual environment)

In [None]:
%pip install sentencepiece
%pip install transformers
%pip install rich[jupyter]

This is the script that gathers data from the different folders.

In [None]:
import csv
import os
import pandas as pd

# Step 1: Generate captions.txt file with token filtering

base_path = "data"
output_file = os.path.join(base_path, "captions.txt")

# Captions whose whitespace-token count falls outside this range are skipped
MIN_CAPTION_TOKENS = 10
MAX_CAPTION_TOKENS = 512

# Open the output file to write the image paths and captions
with open(output_file, 'w', encoding='utf-8') as f_out:
    # sorted() makes the row order reproducible; os.listdir() returns entries
    # in arbitrary order, which would change which rows end up in df[:500].
    for subdir in sorted(os.listdir(base_path)):
        subfolder_path = os.path.join(base_path, subdir)

        # Only sub-directories hold samples; plain files (including the
        # captions.txt currently being written) are skipped.
        if not os.path.isdir(subfolder_path):
            continue

        print(f"Processing subfolder: {subfolder_path}")

        # List files in the current subfolder for debugging
        files_in_subfolder = os.listdir(subfolder_path)
        print(f"Files in {subfolder_path}: {files_in_subfolder}")

        image_path = os.path.join(subfolder_path, 'img.jpg')
        caption_path = os.path.join(subfolder_path, 'captions.txt')

        print(f"Looking for image: {image_path}")
        print(f"Looking for caption: {caption_path}")

        # Both files must exist; otherwise report what is missing and move on
        has_image = os.path.exists(image_path)
        has_caption = os.path.exists(caption_path)
        if not (has_image and has_caption):
            if not has_image:
                print(f"Image file does not exist: {image_path}")
            if not has_caption:
                print(f"Caption file does not exist: {caption_path}")
            continue

        try:
            with open(caption_path, 'r', encoding='utf-8') as f_caption:
                # Collapse every whitespace run (newlines, carriage returns and
                # tabs) into one space. A tab left inside the caption would add
                # an extra field to the tab-separated output line.
                caption = ' '.join(f_caption.read().split())
        except UnicodeDecodeError as e:
            print(f"Error reading caption file {caption_path}: {e}")
            continue

        # Tokenize caption and check token length
        tokens = caption.split()
        if MIN_CAPTION_TOKENS <= len(tokens) <= MAX_CAPTION_TOKENS:
            # Write image path and valid caption to output file
            f_out.write(f"{image_path}\t{caption}\n")
            print(f"Found image and caption. Writing to file: {image_path}\t{caption}\n")
        else:
            print(f"Caption in {caption_path} does not meet token length requirements (10-512 tokens). SKIP THIS FILE.")

print(f"Captions file created at {output_file}")

# Step 2: Load the captions.txt into a DataFrame

input_file = output_file
try:
    # QUOTE_NONE: captions are free text, so a '"' inside one must be kept
    # literally instead of being treated as a CSV quoting character.
    df = pd.read_csv(
        input_file,
        delimiter='\t',
        header=None,
        names=['image_path', 'caption'],
        quoting=csv.QUOTE_NONE,
    )
except pd.errors.EmptyDataError:
    # Nothing passed the filter: read_csv raises on a zero-byte file, so build
    # the empty frame explicitly and let the check in Step 3 report it.
    df = pd.DataFrame(columns=['image_path', 'caption'])

# Check the DataFrame structure and contents
print("DataFrame structure:")
df.info()  # info() prints the column names and types itself (it returns None)
print("DataFrame contents:")
print(df.head())

# Step 3: Sample rows from the DataFrame if not empty

if not df.empty:
    sampled_df = df.sample(n=1)  # Sample a single random row
    print("Sampled DataFrame row:")
    print(sampled_df)
else:
    print("The DataFrame is empty. Check if captions.txt has valid entries.")


In [None]:
# Dump the generated captions file so its contents can be inspected by eye
output_file = "data/captions.txt"

if not os.path.exists(output_file):
    print("File does not exist.")
else:
    # Read the whole file, then print it to confirm it was populated
    with open(output_file, 'r', encoding='utf-8') as f:
        contents = f.read()
    print(contents)

Confirmation that the data was changed correctly

In [None]:
# Sanity check: confirm the captions were actually loaded into the DataFrame
print(f"Shape of DataFrame: {df.shape}")
print(df.head())  # First few rows

In [None]:
# sample(0) draws zero rows, so this displays an empty frame (column headers only).
# NOTE(review): pass a positive n (e.g. df.sample(5)) if random rows were intended.
df.sample(0)

In [None]:
# Show the column labels, then a preview of the rows stored under them
print(f"DataFrame Columns: {df.columns}")
print("First few rows of DataFrame:", df.head(), sep="\n")

In [None]:
# Print every (image path, caption) pair, one line per row
for _row_label, record in df.iterrows():
    print("Image Path: {}, Caption: {}".format(record['image_path'], record['caption']))

In [14]:
# No-op: the caption column is assigned to itself, so df is left unchanged.
# NOTE(review): presumably a placeholder for a caption-preprocessing step — confirm.
df["caption"] = df["caption"]

In [None]:
# Preview the first rows of the captions DataFrame (rendered as the cell output)
df.head()

Now we use PyTorch to set up the model

In [None]:
# Importing necessary libraries
import os  # For operating system interactions
import numpy as np  # For numerical operations
import pandas as pd  # For data manipulation and analysis
import torch  # Core PyTorch library
import torch.nn.functional as F  # For various activation functions and neural network utilities
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler  # For data loading utilities

# Importing T5 model and tokenizer from Hugging Face transformers library
from transformers import T5Tokenizer, T5ForConditionalGeneration

# Importing rich library for better console output formatting
from rich.table import Column, Table  # For displaying tables in the console
from rich import box  # For setting table borders
from rich.console import Console  # Console class for logging

# Module-level rich console used for logging by the training code below.
# record=True makes it keep a copy of everything it prints, which is what
# allows console.save_text(...) to write the log to a file later on.
console = Console(record=True)

def display_df(df):
    """Display the first two columns of a DataFrame as an ASCII table via rich.

    Args:
        df: DataFrame to show. The first column is rendered under the
            "source_text" header and the second under "target_text"; any
            further columns are ignored.

    Cell values that are not strings (for example a float NaN from a missing
    caption, or a number) are converted with ``str()`` before being added,
    because rich only accepts strings or other renderables as table cells.
    ``None`` is passed through unchanged, which rich renders as a blank cell.
    """

    # A fresh, non-recording Console is used here, so this table is not part of
    # what the module-level recording console later saves to disk.
    console = Console()

    # Define the table structure with two columns, aligning text in the center
    table = Table(Column("source_text", justify="center"), Column("target_text", justify="center"), title="Sample Data", pad_edge=False, box=box.ASCII)

    def _as_cell(value):
        """Return value in a form rich can render as a table cell."""
        if value is None or isinstance(value, str):
            return value
        return str(value)

    # Add one table row per DataFrame row
    for row in df.values.tolist():
        table.add_row(_as_cell(row[0]), _as_cell(row[1]))

    # Print the table to the console
    console.print(table)

# Shared rich table that accumulates one row per logged training step.
# Every column (epoch, step index, loss) is centre-aligned.
_TRAINING_LOG_HEADERS = ("Epoch", "Steps", "Loss")
training_logger = Table(
    *(Column(header, justify="center") for header in _TRAINING_LOG_HEADERS),
    title="Training Status",
    pad_edge=False,
    box=box.ASCII,
)


Check for CUDA (running on the CPU is not recommended unless you have a gaming laptop)

In [None]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU
from torch import cuda

if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

In [None]:
class YourDataSetClass(Dataset):
    """Dataset that tokenizes (source, target) text pairs for fine-tuning.

    Each item is a dict of 1-D ``torch.long`` tensors:

    * ``source_ids`` / ``source_mask``: token IDs and attention mask of the
      source text, padded or truncated to ``source_len``.
    * ``target_ids`` / ``target_ids_y``: token IDs of the target text, padded
      or truncated to ``target_len`` (both keys hold the same values).

    Args:
        dataframe: Table holding the text columns (indexed by column name).
        tokenizer: Tokenizer called as ``tokenizer([text], max_length=...,
            truncation=True, padding="max_length", return_tensors='pt')`` and
            expected to return ``input_ids`` and ``attention_mask``.
        source_len: Fixed length of the encoded source sequence.
        target_len: Fixed length of the encoded target sequence.
        source_text: Name of the source-text column.
        target_text: Name of the target-text column.
    """

    def __init__(self, dataframe, tokenizer, source_len, target_len, source_text, target_text):
        # Initialize tokenizer and dataset attributes
        self.tokenizer = tokenizer
        self.data = dataframe
        self.source_len = source_len
        self.summ_len = target_len
        self.target_text = self.data[target_text]  # Target text column in the dataset
        self.source_text = self.data[source_text]  # Source text column in the dataset

    def __len__(self):
        # Return the length of the dataset (number of samples)
        return len(self.target_text)

    @staticmethod
    def _value_at(column, position):
        """Return the position-th value of column, ignoring index labels."""
        # pandas objects are read positionally so a frame whose index was not
        # reset (e.g. after sampling or filtering) still works; plain
        # sequences fall back to normal indexing.
        if hasattr(column, "iloc"):
            return column.iloc[position]
        return column[position]

    def _encode(self, text, max_length):
        """Tokenize one text into a fixed-length batch of size one."""
        # padding="max_length" already pads to max_length, so the deprecated
        # pad_to_max_length flag is not passed.
        return self.tokenizer(
            [text],
            max_length=max_length,
            truncation=True,
            padding="max_length",
            return_tensors='pt',
        )

    def __getitem__(self, index):
        # Get source and target text for the given position as strings
        source_text = str(self._value_at(self.source_text, index))
        target_text = str(self._value_at(self.target_text, index))

        # Normalize whitespace: collapse runs and strip both ends
        source_text = ' '.join(source_text.split())
        target_text = ' '.join(target_text.split())

        source = self._encode(source_text, self.source_len)
        target = self._encode(target_text, self.summ_len)

        # Drop only the leading batch dimension. A bare squeeze() would also
        # collapse a length-1 sequence into a 0-d tensor.
        source_ids = source['input_ids'].squeeze(0)
        source_mask = source['attention_mask'].squeeze(0)
        target_ids = target['input_ids'].squeeze(0)

        # Return a dictionary of tensors with source and target data
        return {
            'source_ids': source_ids.to(dtype=torch.long),
            'source_mask': source_mask.to(dtype=torch.long),
            'target_ids': target_ids.to(dtype=torch.long),
            'target_ids_y': target_ids.to(dtype=torch.long)  # Same values as target_ids
        }


In [None]:
def train(epoch, tokenizer, model, device, loader, optimizer):
    """Run one epoch of fine-tuning over ``loader``.

    Args:
        epoch: Epoch number, used only for the progress log.
        tokenizer: Tokenizer whose ``pad_token_id`` marks padding in the targets.
        model: Sequence-to-sequence model called with ``input_ids``,
            ``attention_mask`` and ``labels``; its first output is the loss.
        device: Device the batch tensors are moved to.
        loader: Iterable of batches with ``source_ids``, ``source_mask`` and
            ``target_ids`` tensors.
        optimizer: Optimizer stepped once per batch.

    Every 10th batch a row (epoch, step, loss) is appended to the module-level
    ``training_logger`` table, which is then printed on ``console``.
    """
    # Set the model to training mode (enables dropout, etc.)
    model.train()

    for step, data in enumerate(loader, 0):
        # Move source IDs and attention masks to the specified device
        ids = data['source_ids'].to(device, dtype=torch.long)
        mask = data['source_mask'].to(device, dtype=torch.long)

        # The full target sequence is the label. Only `labels` is passed: the
        # model builds the decoder inputs itself by shifting the labels right
        # behind its start token. Slicing the targets by hand instead meant the
        # first target token was never predicted and the decoder never saw the
        # start token that generation begins with.
        # clone() keeps the batch tensor from being modified in place.
        labels = data['target_ids'].to(device, dtype=torch.long).clone()
        labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding in the loss

        # Forward pass through the model to compute the output and loss
        outputs = model(input_ids=ids, attention_mask=mask, labels=labels)
        loss = outputs[0]  # Extract the loss from the model output

        # Print training progress every 10 batches
        if step % 10 == 0:
            # Log the scalar value rather than the tensor repr (device, grad_fn, ...)
            training_logger.add_row(str(epoch), str(step), f"{loss.item():.4f}")
            console.print(training_logger)

        # Backpropagation: reset gradients, compute gradients, and update model parameters
        optimizer.zero_grad()  # Clear existing gradients
        loss.backward()        # Backpropagate the loss
        optimizer.step()       # Update model parameters based on gradients


In [None]:
def validate(epoch, tokenizer, model, device, loader):
    """Generate predictions for every batch in ``loader``.

    The model is switched to evaluation mode and run under ``torch.no_grad()``.
    Generated sequences and the reference ``target_ids`` are both decoded to
    text with special tokens removed.

    Returns:
        A ``(predictions, actuals)`` pair of lists of strings, in loader order.
    """
    # Evaluation mode (disables dropout, etc.)
    model.eval()

    def _to_text(sequences):
        """Decode each ID sequence into a cleaned-up string."""
        return [
            tokenizer.decode(seq, skip_special_tokens=True, clean_up_tokenization_spaces=True)
            for seq in sequences
        ]

    # Fixed beam-search settings used for every batch
    generation_settings = dict(
        max_length=150,
        num_beams=2,
        repetition_penalty=2.5,
        length_penalty=1.0,
        early_stopping=True,
    )

    predictions, actuals = [], []

    # No gradients are needed while generating
    with torch.no_grad():
        for step, batch in enumerate(loader):
            # Move the reference targets and the source inputs to the device
            y = batch['target_ids'].to(device, dtype=torch.long)
            ids = batch['source_ids'].to(device, dtype=torch.long)
            mask = batch['source_mask'].to(device, dtype=torch.long)

            generated_ids = model.generate(
                input_ids=ids, attention_mask=mask, **generation_settings
            )

            preds = _to_text(generated_ids)
            target = _to_text(y)

            # Report progress on every 10th batch
            if step % 10 == 0:
                console.print(f'Completed {step}')

            predictions += preds
            actuals += target

    return predictions, actuals


In [None]:
def T5Trainer(
    dataframe, source_text, target_text, model_params, output_dir="./outputs/"
):
    """Fine-tune a T5 model on ``dataframe`` and write the results to ``output_dir``.

    Args:
        dataframe: DataFrame containing the source and target text columns.
        source_text: Name of the column used as model input.
        target_text: Name of the column used as generation target.
        model_params: Dict read for the keys ``SEED``, ``MODEL``,
            ``MAX_SOURCE_TEXT_LENGTH``, ``MAX_TARGET_TEXT_LENGTH``,
            ``TRAIN_BATCH_SIZE``, ``VALID_BATCH_SIZE``, ``LEARNING_RATE``,
            ``TRAIN_EPOCHS`` and ``VAL_EPOCHS``.
        output_dir: Directory that receives ``model_files/`` (model and
            tokenizer), ``predictions.csv`` and ``logs.txt``.

    The data is split 80/20 into training and validation rows (seeded by
    ``SEED``). The model runs on the module-level ``device``. Returns None.
    """

    # Create the output directory up front so an unusable path fails before
    # the fine-tuning run rather than after it.
    os.makedirs(output_dir, exist_ok=True)

    # Set random seeds and deterministic pytorch for reproducibility
    torch.manual_seed(model_params["SEED"])  # pytorch random seed
    np.random.seed(model_params["SEED"])  # numpy random seed
    torch.backends.cudnn.deterministic = True

    # logging
    console.log(f"""[Model]: Loading {model_params["MODEL"]}...\n""")

    # tokenizer for encoding the text
    tokenizer = T5Tokenizer.from_pretrained(model_params["MODEL"])

    # Load the checkpoint named by model_params["MODEL"] and move it to the
    # module-level device.
    model = T5ForConditionalGeneration.from_pretrained(model_params["MODEL"])
    model = model.to(device)

    # logging
    console.log("[Data]: Reading data...\n")

    # Keep only the two text columns and show a small sample
    dataframe = dataframe[[source_text, target_text]]
    display_df(dataframe.head(2))

    # 80% of the rows are used for training and the rest for validation.
    train_size = 0.8
    train_dataset = dataframe.sample(frac=train_size, random_state=model_params["SEED"])
    val_dataset = dataframe.drop(train_dataset.index).reset_index(drop=True)
    train_dataset = train_dataset.reset_index(drop=True)

    console.print(f"FULL Dataset: {dataframe.shape}")
    console.print(f"TRAIN Dataset: {train_dataset.shape}")
    console.print(f"TEST Dataset: {val_dataset.shape}\n")

    # Wrap both splits in the tokenizing dataset class
    training_set = YourDataSetClass(
        train_dataset,
        tokenizer,
        model_params["MAX_SOURCE_TEXT_LENGTH"],
        model_params["MAX_TARGET_TEXT_LENGTH"],
        source_text,
        target_text,
    )
    val_set = YourDataSetClass(
        val_dataset,
        tokenizer,
        model_params["MAX_SOURCE_TEXT_LENGTH"],
        model_params["MAX_TARGET_TEXT_LENGTH"],
        source_text,
        target_text,
    )

    # Dataloader parameters: training batches are shuffled, validation is not
    train_params = {
        "batch_size": model_params["TRAIN_BATCH_SIZE"],
        "shuffle": True,
        "num_workers": 0,
    }

    val_params = {
        "batch_size": model_params["VALID_BATCH_SIZE"],
        "shuffle": False,
        "num_workers": 0,
    }

    training_loader = DataLoader(training_set, **train_params)
    val_loader = DataLoader(val_set, **val_params)

    # Optimizer used to tune the weights of the network during training
    optimizer = torch.optim.Adam(
        params=model.parameters(), lr=model_params["LEARNING_RATE"]
    )

    # Training loop
    console.log("[Initiating Fine Tuning]...\n")

    for epoch in range(model_params["TRAIN_EPOCHS"]):
        train(epoch, tokenizer, model, device, training_loader, optimizer)

    console.log("[Saving Model]...\n")
    # Saving the model after training
    path = os.path.join(output_dir, "model_files")
    model.save_pretrained(path)
    tokenizer.save_pretrained(path)

    # Evaluate on the held-out rows; each pass overwrites predictions.csv
    console.log("[Initiating Validation]...\n")
    for epoch in range(model_params["VAL_EPOCHS"]):
        predictions, actuals = validate(epoch, tokenizer, model, device, val_loader)
        final_df = pd.DataFrame({"Generated Text": predictions, "Actual Text": actuals})
        final_df.to_csv(os.path.join(output_dir, "predictions.csv"))

    console.log("[Validation Completed.]\n")
    console.print(
        f"""[Model] Model saved @ {os.path.join(output_dir, "model_files")}\n"""
    )
    console.print(
        f"""[Validation] Generation on Validation data saved @ {os.path.join(output_dir,'predictions.csv')}\n"""
    )

    # Write the recorded console output only after the status messages above,
    # so that they are part of logs.txt as well.
    console.save_text(os.path.join(output_dir, "logs.txt"))
    console.print(f"""[Logs] Logs saved @ {os.path.join(output_dir,'logs.txt')}\n""")

If you want to train t5-base instead of t5-large, change the model name to t5-base and change both batch sizes to 8. The code uses a batch size of 1 so that CUDA can allocate enough memory for t5-large.

In [None]:
# Hyper-parameters consumed by T5Trainer.
# Batch sizes: use 1 or 2 with t5-large; with t5-base they should be 8.
model_params = dict(
    MODEL="t5-large",            # model_type: t5-base/t5-large
    TRAIN_BATCH_SIZE=1,          # training batch size
    VALID_BATCH_SIZE=1,          # validation batch size
    TRAIN_EPOCHS=3,              # number of training epochs
    VAL_EPOCHS=1,                # number of validation epochs
    LEARNING_RATE=1e-4,          # learning rate
    MAX_SOURCE_TEXT_LENGTH=512,  # max length of source text
    MAX_TARGET_TEXT_LENGTH=50,   # max length of target text
    SEED=42,                     # set seed for reproducibility
)

More confirmation that the data was changed correctly

In [None]:
# Show the current column labels so the rename in the next cell can be verified
print(df.columns)

In [None]:
# Rename the columns in place to the names passed to T5Trainer further down:
# image_path -> text (used as source), caption -> headlines (used as target)
column_aliases = {'image_path': 'text', 'caption': 'headlines'}
df.rename(columns=column_aliases, inplace=True)

In [None]:
# Confirm the rename took effect by printing the new column labels
print("Renamed DataFrame columns:", df.columns, sep="\n")

This is the line that fine tunes the model.

In [None]:
# Fine-tune on at most the first 500 rows; results are written under "outputs"
training_subset = df[:500]
T5Trainer(
    dataframe=training_subset,
    source_text="text",
    target_text="headlines",
    model_params=model_params,
    output_dir="outputs",
)

More confirmation that the data was changed correctly

In [None]:
# The columns printed are those of df (df_predictions is only created in a
# later cell), so the label now names the frame that is actually inspected.
print("Column names in df:", df.columns.tolist())

In [None]:
# Print the column labels of df once more as a sanity check
print(df.columns)

In [None]:
# Build the prompt column by prefixing every caption with the task instruction;
# report the problem instead of failing when the caption column is missing.
if 'headlines' not in df.columns:
    print("Caption column not found. Check the DataFrame structure.")
else:
    df['input'] = "Summarize this: " + df['headlines']

In [None]:
# Unguarded form of the assignment made in the previous cell: prefixes every
# caption with the instruction text (raises KeyError if 'headlines' is missing).
df['input'] = "Summarize this: " + df['headlines']

In [None]:
# Character length of every prompt string
input_lengths = df['input'].apply(len)
print(input_lengths)

In [None]:
# Print the whole DataFrame to check that the data was parsed and changed correctly.
# NOTE(review): df.head() would keep the output short for large frames.
print(df)

In [None]:
# Show the captions next to the prompts built from them
for column_name in ('headlines', 'input'):
    print(df[column_name])

Since the model prediction and generated text are in captions.txt (which was created by the system), we get outputs from the text file

In [None]:
import pandas as pd

# Read the tab-separated captions file into its own DataFrame.
# NOTE(review): this is an absolute Google Drive path, unlike the relative
# "data/captions.txt" used earlier in the notebook — confirm it is the intended file.
input_file = "/content/drive/My Drive/LaVi-Bridge/captions.txt"
caption_columns = ['image_path', 'caption']
df_predictions = pd.read_csv(input_file, sep='\t', header=None, names=caption_columns)

# Show the first rows to confirm the load worked
print("DataFrame loaded. Contents:", df_predictions.head(), sep="\n")

This is the script to print the generated output alongside the actual output that was stored in the text file

In [None]:
import os
import re
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# T5Trainer keeps its model and tokenizer as local variables, so after a fresh
# "Restart & Run All" neither name exists here. In that case reload the
# fine-tuned weights from where the training cell saved them
# (output_dir="outputs" -> outputs/model_files). Objects that are already
# defined in the notebook are used as they are.
if "model" not in globals() or "tokenizer" not in globals():
    model_dir = os.path.join("outputs", "model_files")
    tokenizer = T5Tokenizer.from_pretrained(model_dir)
    model = T5ForConditionalGeneration.from_pretrained(model_dir)

model = model.to(device)  # Move model to device
model.eval()  # Inference only: make sure dropout is disabled

# The caption itself is used as the model input
df_predictions['input'] = df_predictions['caption']

def generate_summary(text):
    """Generate a short, cleaned-up summary of ``text`` with the global model.

    Uses the module-level ``tokenizer``, ``model`` and ``device``. The decoded
    output is post-processed in three steps:

    1. immediately repeated words or phrases are collapsed to one occurrence;
    2. the text is cut at the first "!" or "..." (the marker itself is dropped);
    3. at most the first three sentences are kept.

    Returns:
        The cleaned summary, or "" when anything raises (the error is printed).
    """
    try:
        inputs = tokenizer.encode(text, return_tensors='pt').to(device)
        summary_ids = model.generate(inputs, max_length=150, num_beams=4, early_stopping=True)
        generated_text = tokenizer.decode(summary_ids[0], skip_special_tokens=True)

        # Collapse a word or phrase that is immediately repeated ("the cat the
        # cat sat" -> "the cat sat"). The previous pattern matched any span
        # between two occurrences of a word and replaced it with that one word,
        # which deleted text that was not duplicated at all.
        unique_summary = re.sub(
            r'\b(\w+(?:\s+\w+)*?)(?:\s+\1\b)+', r'\1', generated_text.strip()
        )

        # Truncate at the first occurrence of "!" or "..." -> this fixed the problem where the model would repeat itself over and over again about some generated text
        truncated_summary = re.split(r'(!|\.\.\.)', unique_summary, maxsplit=1)[0]

        # Keep at most the first three sentences (all of them if there are fewer)
        sentences = re.split(r'(?<=[.!?]) +', truncated_summary)
        final_summary = ' '.join(sentences[:3])

        return final_summary.strip()

    except Exception as e:
        # Best effort: report the failure and keep processing the other rows
        print(f"Error generating summary for text: {text}")
        print(f"Error: {e}")
        return ""  # Return an empty string in case of error

# Run the model over every input row and store the cleaned-up summary
df_predictions['Generated Text'] = df_predictions['input'].apply(generate_summary)

# Print each generated summary next to the caption it was produced from
print("Generated Summaries and Actual Texts:")
has_generated_column = 'Generated Text' in df_predictions.columns
for index, row in df_predictions.iterrows():
    if has_generated_column:
        generated_text = row['Generated Text']
    else:
        generated_text = "Not Generated"
    actual_text = row['caption']  # or row['Actual Text'] if that was in your previous DataFrame
    print(
        f"Entry {index + 1}:",
        f"Generated Summary: {generated_text}",
        f"Actual Summary: {actual_text}\n",
        sep="\n",
    )
