In [None]:
#This will mount Google drive to Colab VM
from google.colab import drive
drive.mount('/content/drive')

FolderName = '/rabeet/'
assert FolderName is not None, "[!] Enter the foldername"

import sys
sys.path.append('/content/drive/MyDrive/rabeet/data'.format(FolderName))

%cd /content/drive/My\ Drive/$FolderName

Mounted at /content/drive
/content/drive/My Drive/rabeet


In [None]:
# Repeats the mount from the first cell. When Drive is already mounted this call
# does nothing (see the recorded output below), so it only matters if that cell was skipped.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# **Import Libraries**

In [None]:
import os
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from PIL import Image
import nltk
from collections import Counter
from nltk.tokenize import word_tokenize

# **Define Hyperparameters**

In [None]:
# Download the NLTK tokenizer data needed by word_tokenize.
# Older NLTK releases read the 'punkt' package; newer releases read 'punkt_tab'.
# Requesting both keeps the notebook working on either (an unavailable package
# is reported by nltk.download but does not raise).
nltk.download('punkt')
nltk.download('punkt_tab')

# Hyperparameters
embed_size = 256       # size of the image feature / word embedding vectors
hidden_size = 512      # LSTM hidden state size
num_layers = 1         # number of stacked LSTM layers
learning_rate = 0.001  # Adam learning rate
num_epochs = 10        # passes over the training split
batch_size = 32        # samples per DataLoader batch

# Paths (relative to the working directory selected in the first cell)
data_dir = 'data'
captions_file = os.path.join(data_dir, 'captions.txt')
images_dir = os.path.join(data_dir, 'Images')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


# **Read Captions**

In [None]:
# Read the captions file into {image id: [caption, caption, ...]}.
# Every data line has the form "<image id>,<caption>"; only the first comma
# separates the two fields, so commas inside the caption text are kept.
captions_data = {}
# Use the configured captions_file rather than repeating an absolute Drive path.
with open(captions_file, 'r', encoding='utf-8') as f:
    for line in f:
        line = line.strip()
        if not line or ',' not in line:
            # Blank or malformed line: unpacking the split would raise ValueError.
            continue
        img_id, caption = line.split(',', 1)
        if (img_id, caption) == ('image', 'caption'):
            # Literal "image,caption" column-header row, if the file has one;
            # keeping it would register a non-existent image named "image".
            continue
        captions_data.setdefault(img_id, []).append(caption)

# **Vocabulary Class**

In [None]:
class Vocabulary:
    """Two-way mapping between words and integer ids.

    Ids 0-3 are reserved for the special tokens <PAD>, <SOS>, <EOS> and <UNK>;
    regular words receive consecutive ids in the order they qualify.
    """

    def __init__(self, freq_threshold):
        """Create a vocabulary holding only the special tokens.

        Args:
            freq_threshold: number of occurrences a word needs before it is
                given its own id.
        """
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {v: k for k, v in self.itos.items()}
        self.freq_threshold = freq_threshold

    def __len__(self):
        """Return the number of known tokens, special tokens included."""
        return len(self.itos)

    @staticmethod
    def tokenizer_eng(text):
        """Lower-case ``text`` and split it into tokens with NLTK's word_tokenize."""
        return word_tokenize(text.lower())

    def build_vocabulary(self, sentence_list):
        """Add every word that occurs at least ``freq_threshold`` times.

        A word is registered the moment its running count reaches the
        threshold. New ids continue after the ids already present, so calling
        this method again extends the vocabulary instead of overwriting
        existing entries, and a threshold of zero or less admits every word.

        Args:
            sentence_list: iterable of raw sentences (strings).
        """
        frequencies = Counter()

        for sentence in sentence_list:
            for word in self.tokenizer_eng(sentence):
                frequencies[word] += 1
                if frequencies[word] >= self.freq_threshold and word not in self.stoi:
                    # itos keys are contiguous from 0, so its size is the next free id.
                    idx = len(self.itos)
                    self.stoi[word] = idx
                    self.itos[idx] = word

    def numericalize(self, text):
        """Convert ``text`` to a list of ids, using the <UNK> id for unknown tokens."""
        unk_idx = self.stoi["<UNK>"]
        return [self.stoi.get(token, unk_idx) for token in self.tokenizer_eng(text)]

# **Dataset Class**

In [None]:
class CaptionDataset(Dataset):
    """Dataset yielding one (image, caption ids) pair per image.

    The vocabulary is built from every caption in ``captions_data``, while
    ``__getitem__`` only ever returns the first caption stored for an image.
    """

    def __init__(self, root_dir, captions_data, transform=None, freq_threshold=5):
        """Store the inputs and build the vocabulary.

        Args:
            root_dir: directory that contains the ``Images`` sub-folder.
            captions_data: mapping of image id to its list of captions.
            transform: optional callable applied to the loaded RGB image.
            freq_threshold: forwarded to ``Vocabulary``.
        """
        self.root_dir = root_dir
        self.captions_data = captions_data
        self.transform = transform
        self.imgs = list(captions_data)

        # Flatten the per-image caption lists into a single list of sentences.
        all_captions = []
        for caption_group in captions_data.values():
            all_captions.extend(caption_group)
        self.captions = all_captions

        self.vocab = Vocabulary(freq_threshold)
        self.vocab.build_vocabulary(self.captions)

    def __len__(self):
        """Number of images (not the number of captions)."""
        return len(self.imgs)

    def __getitem__(self, index):
        """Return ``(image, caption_tensor)`` for the image at ``index``.

        The caption tensor holds <SOS>, the ids of the first caption, then <EOS>.
        """
        image_name = self.imgs[index]
        first_caption = self.captions_data[image_name][0]
        image_file = os.path.join(self.root_dir, 'Images', image_name)
        image = Image.open(image_file).convert("RGB")

        if self.transform:
            image = self.transform(image)

        token_ids = [self.vocab.stoi["<SOS>"]]
        token_ids.extend(self.vocab.numericalize(first_caption))
        token_ids.append(self.vocab.stoi["<EOS>"])
        return image, torch.tensor(token_ids)

# Create data loaders for training, validation, and testing.

In [None]:
# Transformations
# Per-channel statistics handed to Normalize for the three RGB channels.
_CHANNEL_MEAN = (0.485, 0.456, 0.406)
_CHANNEL_STD = (0.229, 0.224, 0.225)

# Applied in order: resize to 224x224, convert to a tensor, normalize per channel.
_preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
]
transform = transforms.Compose(_preprocessing_steps)

# Custom collate function to pad captions
def collate_fn(batch):
    """Merge ``(image, caption)`` samples into one batch.

    Returns:
        A tuple ``(images, captions, lengths)``: the images stacked along a new
        first dimension, the captions right-padded with 0 to the longest one
        (batch first), and the list of original caption lengths.
    """
    image_group, caption_group = zip(*batch)
    stacked_images = torch.stack(image_group, 0)
    caption_lengths = list(map(len, caption_group))
    padded_captions = pad_sequence(caption_group, batch_first=True, padding_value=0)
    return stacked_images, padded_captions, caption_lengths

# Create dataset and dataloaders
dataset = CaptionDataset(root_dir=data_dir, captions_data=captions_data, transform=transform)

# 80% / 10% / remainder split; the test split absorbs rounding so the sizes sum to len(dataset).
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size

# Seed the split so that the same images land in train/val/test on every run.
# Without it, re-running the notebook (e.g. to evaluate a saved model) can put
# previously seen training images into the test split.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
# Validation only aggregates metrics, so a fixed batch order keeps them repeatable.
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
# Left shuffled: the test helper shows the first sample of the first batch,
# so shuffling gives a different example on each call.
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

# **Image Feature Extraction**

Define the CNN to extract image features.

In [None]:
class CNNModel(nn.Module):
    """Image encoder: frozen ResNet-50 (final fc layer removed) followed by a
    trainable linear projection and batch normalization to ``embed_size``."""

    def __init__(self, embed_size):
        """Build the encoder.

        Args:
            embed_size: size of the feature vector produced for each image.
        """
        super(CNNModel, self).__init__()
        resnet = models.resnet50(pretrained=True)
        for param in resnet.parameters():
            param.requires_grad_(False)
        modules = list(resnet.children())[:-1]  # drop the classification layer
        self.resnet = nn.Sequential(*modules)
        self.resnet.eval()
        self.linear = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def train(self, mode=True):
        """Switch the trainable head between train/eval; keep the backbone in eval.

        Freezing parameters does not stop the backbone's BatchNorm layers from
        updating their running statistics in training mode, which would
        silently alter the "frozen" feature extractor.
        """
        super(CNNModel, self).train(mode)
        self.resnet.eval()
        return self

    def forward(self, images):
        """Return one ``embed_size`` feature vector per image in the batch."""
        # No backbone parameter is trainable, so skip building its autograd graph.
        with torch.no_grad():
            features = self.resnet(images)
        features = features.view(features.size(0), -1)  # flatten to (batch, features)
        features = self.bn(self.linear(features))
        return features


# **Text Generator**

Define the decoder (RNN) to generate captions.

In [None]:
class RNNModel(nn.Module):
    """LSTM decoder that predicts caption tokens from an image feature vector."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        """Create the embedding, LSTM, output projection and dropout layers."""
        super(RNNModel, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(0.5)

    def forward(self, features, captions, lengths):
        """Teacher-forced pass over a batch of captions.

        Args:
            features: image features of shape (batch, embed_size).
            captions: padded token ids of shape (batch, T) whose first column
                is <SOS> (as produced by the dataset in this notebook).
            lengths: per-sample number of decoding steps. The training loops
                in this notebook pass the caption length minus one, matching
                their targets ``captions[:, 1:]``.

        Returns:
            Scores of shape (batch, max(lengths), vocab_size). Step ``t``
            predicts ``captions[:, t + 1]`` after seeing the image feature and
            ``captions[:, 1..t]``.
        """
        # The image feature stands in for <SOS> as the first input, so the
        # <SOS> column is dropped. Feeding it as well delayed every word by one
        # step relative to the targets, so step t had to predict token t + 1
        # without having seen token t, and disagreed with greedy decoding,
        # which feeds each prediction straight back in.
        token_inputs = self.dropout(self.embed(captions[:, 1:]))
        embeddings = torch.cat((features.unsqueeze(1), token_inputs), 1)
        packed_embeddings = pack_padded_sequence(embeddings, lengths, batch_first=True, enforce_sorted=False)
        packed_hiddens, _ = self.lstm(packed_embeddings)
        hiddens, _ = pad_packed_sequence(packed_hiddens, batch_first=True)
        outputs = self.linear(hiddens)
        return outputs

# **Caption Generator**

In [None]:
class CaptionGenerator(nn.Module):
    """Encoder-decoder captioning model: CNN image encoder plus LSTM decoder."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        """Create the encoder and decoder sub-modules."""
        super(CaptionGenerator, self).__init__()
        self.encoder = CNNModel(embed_size)
        self.decoder = RNNModel(embed_size, hidden_size, vocab_size, num_layers)

    def forward(self, images, captions, lengths):
        """Encode ``images`` and return the decoder's token scores for ``captions``."""
        features = self.encoder(images)
        outputs = self.decoder(features, captions, lengths)
        return outputs

    def caption_image(self, image, vocab, max_length=20):
        """Greedily decode a caption for a single image.

        Args:
            image: batch containing exactly one image (the prediction is read
                with ``.item()``).
            vocab: object exposing an ``itos`` id-to-token mapping.
            max_length: maximum number of tokens to generate.

        Returns:
            List of generated tokens; generation stops after "<EOS>", which is
            included in the result.
        """
        result_caption = []
        # Decode in eval mode: with one image, BatchNorm1d raises in training
        # mode and dropout would make the caption random. The previous mode is
        # restored afterwards.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                # (batch, embed) -> (batch, 1, embed): a one-step sequence for the batch_first LSTM.
                x = self.encoder(image).unsqueeze(1)
                states = None
                for _ in range(max_length):
                    hiddens, states = self.decoder.lstm(x, states)
                    output = self.decoder.linear(hiddens.squeeze(1))
                    predicted = output.argmax(1)
                    token_id = predicted.item()
                    result_caption.append(token_id)
                    if vocab.itos[token_id] == "<EOS>":
                        break
                    # Feed the predicted token back in as the next input.
                    x = self.decoder.embed(predicted).unsqueeze(1)
        finally:
            self.train(was_training)
        return [vocab.itos[idx] for idx in result_caption]

In [None]:
# Training and testing
# Run on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

vocab_size = len(dataset.vocab)

model = CaptionGenerator(
    embed_size=embed_size,
    hidden_size=hidden_size,
    vocab_size=vocab_size,
    num_layers=num_layers,
)
model = model.to(device)

# Padding positions are excluded from the loss.
_pad_index = dataset.vocab.stoi["<PAD>"]
criterion = nn.CrossEntropyLoss(ignore_index=_pad_index)
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:02<00:00, 34.8MB/s]


# **Training and Validation**

In [None]:
# Training function
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs, model_save_path):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    Prints the loss every 100 steps, calls ``validate_model`` at the end of
    each epoch and finally writes the model's state dict to
    ``model_save_path``. Note that a later cell in this notebook defines
    ``train_model`` again, which replaces this version once that cell runs.
    """
    for epoch in range(num_epochs):
        model.train()
        for i, (images, captions, lengths) in enumerate(train_loader):
            images = images.to(device)
            captions = captions.to(device)

            # Targets skip the first caption column, so each sequence is one token shorter.
            target_lengths = [length - 1 for length in lengths]
            packed_targets = pack_padded_sequence(
                captions[:, 1:], target_lengths, batch_first=True, enforce_sorted=False
            )
            scores = model(images, captions, target_lengths)
            packed_scores = pack_padded_sequence(
                scores, target_lengths, batch_first=True, enforce_sorted=False
            )
            loss = criterion(packed_scores.data, packed_targets.data)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (i + 1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

        validate_model(model, criterion, val_loader)

    torch.save(model.state_dict(), model_save_path)
    print(f'Model saved to {model_save_path}')

# Validation function
def validate_model(model, criterion, val_loader):
    """Print the mean per-batch loss of ``model`` over ``val_loader``.

    The model is switched to eval mode and gradients are disabled. An empty
    loader is reported instead of raising ZeroDivisionError. Returns None.
    Note that a later cell in this notebook defines ``validate_model`` again,
    which replaces this version once that cell runs.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for images, captions, lengths in val_loader:
            images, captions = images.to(device), captions.to(device)
            # Targets skip the first caption column, so each sequence is one token shorter.
            lengths = [length - 1 for length in lengths]
            targets = pack_padded_sequence(captions[:, 1:], lengths, batch_first=True, enforce_sorted=False).data
            outputs = model(images, captions, lengths)
            outputs = pack_padded_sequence(outputs, lengths, batch_first=True, enforce_sorted=False).data
            loss = criterion(outputs, targets)
            total_loss += loss.item()
            num_batches += 1

    if num_batches == 0:
        # e.g. a validation split of size 0 on a very small dataset
        print('Validation Loss: n/a (validation loader is empty)')
        return

    avg_loss = total_loss / num_batches
    print(f'Validation Loss: {avg_loss:.4f}')


In [None]:
import matplotlib.pyplot as plt
import torch
from torch.nn.utils.rnn import pack_padded_sequence

# Same device selection as in the earlier setup cell, repeated here so the
# functions defined below find `device` when this cell is run on its own.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Training function
def _plot_training_history(train_losses, val_losses, train_accuracies, val_accuracies):
    """Plot per-epoch loss (left) and accuracy (right) for training and validation."""
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Training Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Training and Validation Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()


def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs, model_save_path):
    """Train ``model``, track loss/accuracy per epoch, save it and plot the curves.

    Args:
        model: module called as ``model(images, captions, lengths)``.
        criterion: loss applied to the packed scores and packed target ids.
        optimizer: optimizer stepping the model's parameters.
        train_loader, val_loader: iterables of ``(images, captions, lengths)`` batches.
        num_epochs: number of passes over ``train_loader``.
        model_save_path: file that receives the final ``state_dict``.

    Loss is averaged over batches and accuracy is the percentage of target
    tokens predicted exactly. If a loader yields no batches the corresponding
    metric is recorded as NaN rather than raising ZeroDivisionError.
    Returns None.
    """
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        num_batches = 0
        for i, (images, captions, lengths) in enumerate(train_loader):
            images, captions = images.to(device), captions.to(device)
            # Targets skip the first caption column, so each sequence is one token shorter.
            lengths = [length - 1 for length in lengths]
            targets = pack_padded_sequence(captions[:, 1:], lengths, batch_first=True, enforce_sorted=False).data
            outputs = model(images, captions, lengths)
            # Packing drops padded steps, so loss and accuracy only see real tokens.
            outputs = pack_padded_sequence(outputs, lengths, batch_first=True, enforce_sorted=False).data
            loss = criterion(outputs, targets)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

            predicted = outputs.argmax(dim=1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

            if (i + 1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

        train_loss = running_loss / num_batches if num_batches else float('nan')
        train_accuracy = 100 * correct / total if total else float('nan')
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        val_loss, val_accuracy = validate_model(model, criterion, val_loader)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)
        print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {train_loss:.4f}, Training Accuracy: {train_accuracy:.2f}%, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

    torch.save(model.state_dict(), model_save_path)
    print(f'Model saved to {model_save_path}')

    # Plotting the loss and accuracy graphs
    _plot_training_history(train_losses, val_losses, train_accuracies, val_accuracies)

# Validation function
def validate_model(model, criterion, val_loader):
    """Evaluate ``model`` on ``val_loader`` without gradient tracking.

    Returns:
        ``(avg_loss, accuracy)``: the mean per-batch loss and the percentage of
        target tokens predicted exactly. Both are NaN when the loader yields
        no batches (instead of raising ZeroDivisionError).
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    with torch.no_grad():
        for images, captions, lengths in val_loader:
            images, captions = images.to(device), captions.to(device)
            # Targets skip the first caption column, so each sequence is one token shorter.
            lengths = [length - 1 for length in lengths]
            targets = pack_padded_sequence(captions[:, 1:], lengths, batch_first=True, enforce_sorted=False).data
            outputs = model(images, captions, lengths)
            # Packing drops padded steps, so loss and accuracy only see real tokens.
            outputs = pack_padded_sequence(outputs, lengths, batch_first=True, enforce_sorted=False).data
            loss = criterion(outputs, targets)
            total_loss += loss.item()
            num_batches += 1

            predicted = outputs.argmax(dim=1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    avg_loss = total_loss / num_batches if num_batches else float('nan')
    accuracy = 100 * correct / total if total else float('nan')
    print(f'Validation Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return avg_loss, accuracy

# Make sure the necessary packages are imported and that your dataset, model, and collate_fn are defined
# import torch
# import torch.nn as nn
# import torch.optim as optim
# from torch.utils.data import DataLoader
# from your_dataset_and_model import ImageCaptionDataset, CaptioningModel

# Dummy setup for train and validation data loaders
# train_dataset = ImageCaptionDataset(train=True)
# val_dataset = ImageCaptionDataset(train=False)
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
# val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Initialize the model, criterion, and optimizer
# model = CaptioningModel().to(device)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)

# Define number of epochs and model save path
# num_epochs = 10
# model_save_path = 'image_caption_model.pth'

# Train the model
# train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs, model_save_path)


In [1]:
# Training the model
# Needs the earlier cells (model, criterion, optimizer, loaders, train_model) to
# have been run in this kernel; the recorded NameError below comes from running
# this cell before they were.
model_save_path = 'image_captioning_model.pth'
train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    train_loader=train_loader,
    val_loader=val_loader,
    num_epochs=num_epochs,
    model_save_path=model_save_path,
)

NameError: name 'train_model' is not defined

# **Test the model**

In [None]:
# Test function
def test_model(model, test_loader, vocab):
    model.eval()
    test_image, test_caption, _ = next(iter(test_loader))
    test_image = test_image.to(device)
    generated_caption = model.caption_image(test_image[0].unsqueeze(0), vocab)
    print('Generated Caption:', ' '.join(generated_caption))
    print('Actual Caption:', ' '.join([vocab.itos[idx] for idx in test_caption[0].cpu().numpy()]))


In [None]:
import matplotlib.pyplot as plt
import torch

# Test function
def test_model(model, test_loader, vocab):
    model.eval()
    test_image, test_caption, _ = next(iter(test_loader))
    test_image = test_image.to(device)

    # Generate caption
    generated_caption = model.caption_image(test_image[0].unsqueeze(0), vocab)

    # Convert the actual caption to words
    actual_caption = ' '.join([vocab.itos[idx] for idx in test_caption[0].cpu().numpy()])

    # Convert the generated caption to words
    generated_caption_str = ' '.join(generated_caption)

    print('Generated Caption:', generated_caption_str)
    print('Actual Caption:', actual_caption)

    # Move the image to CPU and convert it to numpy for plotting
    image = test_image[0].cpu().permute(1, 2, 0).numpy()

    # Plot the image along with the captions
    plt.figure(figsize=(8, 8))
    plt.imshow(image)
    plt.title(f'Generated: {generated_caption_str}\nActual: {actual_caption}')
    plt.axis('off')
    plt.show()

# Make sure the necessary packages are imported and that your dataset, model, and other components are defined
# import torch
# from torch.utils.data import DataLoader
# from your_dataset_and_model import ImageCaptionDataset, CaptioningModel, Vocabulary

# Dummy setup for test data loader
# test_dataset = ImageCaptionDataset(train=False)
# test_loader = DataLoader(test_dataset, batch_size=1, shuffle=True, collate_fn=collate_fn)

# Initialize the model and load the trained weights
# model = CaptioningModel().to(device)
# model.load_state_dict(torch.load('image_caption_model.pth'))

# Assuming vocab is defined elsewhere and passed to the test function
# test_model(model, test_loader, vocab)


In [None]:
# Testing the model
# Needs `model`, `test_loader` and `dataset` from the earlier cells in this kernel;
# the recorded NameError below comes from running this cell without them.
test_model(model=model, test_loader=test_loader, vocab=dataset.vocab)

NameError: name 'model' is not defined