## Dataset

*We need to conver text to numerical value*
* We need a vocabulary mapping for each word(or character) to int
* We need to setup a pytorch dataset
* Make sure that each sentence (input) is same size (padding) and dataloader

In [1]:
import os
from collections import defaultdict
import pandas as pd
import spacy
import torch
import numpy as np
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from torchvision.transforms import transforms

In [2]:
!python -m spacy download en_core_web_sm

Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m19.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')


In [3]:

class Vocabulary():
    spacy_eng = spacy.load("en_core_web_sm")

    
    def __init__(self, freq_threshold):
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {v:k for k,v in self.itos.items()}
        self.freq_threshold = freq_threshold

    
    def __len__(self):
        return len(self.itos)


    @staticmethod
    def tokenizer_eng(text):
        return [tok.text.lower() for tok in Vocabulary.spacy_eng.tokenizer(text)]
    
    def build_vocabulary(self, sentence_list):
        frequency = {}
        idx = 4
        N = 50

        for sentence in sentence_list:
            for token in self.tokenizer_eng(sentence):

                frequency[token] = 1 + frequency.get(token, 0)

                if frequency[token] == self.freq_threshold:
                    self.stoi[token] = idx
                    self.itos[idx] = token
                    idx += 1
    
    def tokenize(self, text):
        token_sent = self.tokenizer_eng(text)

        return [self.stoi[token] if token in self.stoi else self.stoi['<UNK>']
                for token in token_sent
            ]

In [4]:
v = Vocabulary(freq_threshold=1)

v.build_vocabulary(["This is a good place to find a city"])
print(v.stoi)
print(v.tokenize("This is a good place to find a city here!! test"))

{'<PAD>': 0, '<SOS>': 1, '<EOS>': 2, '<UNK>': 3, 'this': 4, 'is': 5, 'a': 6, 'good': 7, 'place': 8, 'to': 9, 'find': 10, 'city': 11}
[4, 5, 6, 7, 8, 9, 10, 6, 11, 3, 3, 3, 3]


In [5]:
class FlickrDataset(Dataset):

    def __init__(self, root_dir, caption_file, transform=None, freq_threshold=5):
        self.root_dir = root_dir
        self.df = pd.read_csv(caption_file)
        self.transform = transform

        # get the image and caption
        self.images = self.df['image']
        self.caption = self.df['caption']

        # Create our own vocabulary
        self.vocabulary = Vocabulary(freq_threshold)
        self.vocabulary.build_vocabulary(self.caption.tolist())
    
    def __len__(self):
        return len(self.df)


    def __getitem__(self, index):
        # get image
        image_path = os.path.join(self.root_dir, self.images[index])
        img = Image.open(image_path).convert('RGB')

        if self.transform is not None:
            img = self.transform(img)
        
        # get caption
        caption = self.caption[index]
        num_caption = [self.vocabulary.stoi['<SOS>']]
        num_caption += self.vocabulary.numericalize(caption)
        num_caption.append(self.vocabulary.stoi['<EOS>'])
        

        return img, torch.tensor(num_caption)
    


In [6]:
class MyCollate:
    def __init__(self, pad_idx):
        self.pad_idx = pad_idx
    
    def __call__(self, batch):
        img = [item[0].unsqueeze(0) for item in batch]
        img = torch.cat(img, 0)
        target = [item[1] for item in batch]
        target = pad_sequence(target, batch_first=False, padding_value=self.pad_idx)
        return img, target

In [7]:
def get_loader(
    root_folder,
    annotation_file,
    transform,
    batch_size=32,
    num_workers=8,
    shuffle=True,
    pin_memory=True,
):
    dataset = FlickrDataset(root_folder, annotation_file, transform=transform)

    pad_idx = dataset.vocab.stoi["<PAD>"]

    loader = DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        shuffle=shuffle,
        pin_memory=pin_memory,
        collate_fn=MyCollate(pad_idx=pad_idx),
    )

    return loader, dataset

In [8]:

def train_val_split(caption_data, train_size=0.8, shuffle=True):
    """Split the captioning dataset into train and validation sets.

    Args:
        caption_data (dict): Dictionary containing the mapped caption data
        train_size (float): Fraction of all the full dataset to use as training data
        shuffle (bool): Whether to shuffle the dataset before splitting

    Returns:
        Traning and validation datasets as two separated dicts
    """

    # 1. Get the list of all image names
    all_images = list(caption_data.keys())

    # 2. Shuffle if necessary
    if shuffle:
        np.random.shuffle(all_images)

    # 3. Split into training and validation sets
    train_size = int(len(caption_data) * train_size)

    training_data = {
        img_name: caption_data[img_name] for img_name in all_images[:train_size]
    }
    validation_data = {
        img_name: caption_data[img_name] for img_name in all_images[train_size:]
    }

    # 4. Return the splits
    return training_data, validation_data

df = pd.read_csv("data/captions.txt")
data_dict = {}
caption_dict = defaultdict(list)
for _, row in df.iterrows():
    caption_dict[row.image].append(row.caption)
train_data, val_data = train_val_split(caption_dict)

In [14]:
def save_data(data_dict, save_path="data/caption_train.csv"):
    data = [(image_name, caption) for image_name, captions in data_dict.items() for caption in captions]
    df = pd.DataFrame(data, columns=["image_name", "caption"])
    df.to_csv(save_path)

In [None]:
save_data()

Unnamed: 0,image_name,caption
0,3540241710_a4f49cde52.jpg,A man in a white shirt is airborne .
1,3540241710_a4f49cde52.jpg,"A man , low to the ground , rolls down a ramp ..."
2,3540241710_a4f49cde52.jpg,A man skateboards in a warehouse .
3,3540241710_a4f49cde52.jpg,A skateboarder high in the air above an indoor...
4,3540241710_a4f49cde52.jpg,A skateboarder in midair above an indoor ramp .


In [None]:
transform = transforms.Compose([
    transforms.Resize((299, 299)),
    transforms.ToTensor()
])

loader, _ = get_loader(
    "data/images", "data/captions.txt", transform=transform
)


In [None]:

if __name__ == "__main__":
    for idx, (imgs, captions) in enumerate(loader):
        print(imgs.shape)
        print(captions.shape)
        if idx==5:
            break


torch.Size([32, 3, 299, 299])
torch.Size([24, 32])
torch.Size([32, 3, 299, 299])
torch.Size([24, 32])
torch.Size([32, 3, 299, 299])
torch.Size([21, 32])
torch.Size([32, 3, 299, 299])
torch.Size([28, 32])
torch.Size([32, 3, 299, 299])
torch.Size([23, 32])
torch.Size([32, 3, 299, 299])
torch.Size([30, 32])


## Pad Sequence

In [18]:
import torch
from torch.nn.utils.rnn import pad_sequence
# Example sequences of different lengths
sequences = [torch.tensor([1, 2, 3]),
             torch.tensor([4, 5]),
             torch.tensor([6, 7, 8, 9])]
padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)
padded_sequences

tensor([[1, 2, 3, 0],
        [4, 5, 0, 0],
        [6, 7, 8, 9]])

In [17]:
padded_sequences = pad_sequence(sequences, batch_first=False, padding_value=0)
print(padded_sequences)


tensor([[1, 4, 6],
        [2, 5, 7],
        [3, 0, 8],
        [0, 0, 9]])


## Model

In [28]:
import torch
import torch.nn as nn
from torchvision.models import resnet50

## Loading inceptionnet

In [29]:
model = resnet50(weights="ResNet50_Weights.IMAGENET1K_V2")
model

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /Users/ngkuissi/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:20<00:00, 5.04MB/s]


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [34]:
from PIL import Image
from torchvision import transforms
image_path = "data/images/667626_18933d713e.jpg"  # Replace with your image path
image = Image.open(image_path)
transformation = transforms.transforms.Compose([transforms.transforms.Resize((256, 256)),
                                                transforms.transforms.ToTensor()])

image_tensor = transformation(image)
#image_tensor = image_tensor.permute(1, 2, 0)
image_tensor = image_tensor.unsqueeze(0)
print(image_tensor.shape)


torch.Size([1, 3, 256, 256])


In [35]:
with torch.no_grad():
    output = model(image_tensor)
output.shape


torch.Size([1, 1000])

## Encoder

In [36]:
class EncoderCNN(nn.Module):

    def __init__(self, embed_size, train_CNN=False):
        super(EncoderCNN, self).__init__()
        self.train_CNN = train_CNN

        ## loading inception model
        resnet = resnet50(weights="ResNet50_Weights.IMAGENET1K_V2")
        modules = list(resnet.children())[:-1]      # delete the last fc layer.
        self.resnet = nn.Sequential(*modules)

        self.linear = nn.Linear(resnet.fc.in_features, embed_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
    

    def forward(self, images):
        features = self.resnet(images)
        features = features.view(features.shape[0], -1)
        features = self.linear(features)
        return self.dropout(self.relu(features))


## test for Decoder

In [37]:
enocder = EncoderCNN(256)
embed = nn.Embedding(23556, 256)
lstm = nn.LSTM(256, 256)
lstm_2 = nn.LSTM(256, 512)
for img, caption in loader:
    output = enocder(img) # feature

    output = output.unsqueeze(0)
    caption = embed(caption) #embedding
    print(output.shape)
    print(caption.shape)
    print(torch.cat((output, caption), dim=0).shape)
    output, _ = lstm(output)
    out = torch.cat((enocder(img).unsqueeze(0), caption), dim=0)
    out, _ = lstm_2(out)
    print(output.shape)
    print(out.shape)
    break


torch.Size([1, 32, 256])
torch.Size([18, 32, 256])
torch.Size([19, 32, 256])
torch.Size([1, 32, 256])
torch.Size([19, 32, 512])


*Questoin:* Do we need start of sequence in this case? since start of sequence is the feature created from the pretrained CNN block (encoder)

## Decoder

In [10]:
print(torch.cat((torch.Tensor([[1,2,3],[7,8,9]]).unsqueeze(0), torch.Tensor([[4,5,6],[9,10,11]]).unsqueeze(0)), dim=-2))

tensor([[[ 1.,  2.,  3.],
         [ 7.,  8.,  9.],
         [ 4.,  5.,  6.],
         [ 9., 10., 11.]]])


In [38]:
class DecoderRNN(nn.Module):

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super(DecoderRNN, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(0.5)
    
    def forward(self, features, captions):
        embeddings = self.dropout(self.embed(captions))
        # teacher forcing
        embeddings = torch.cat((features.unsqueeze(0), embeddings), dim=0)
        hidden, _ = self.lstm(embeddings)

        return self.linear(hidden)




In [39]:
vocabulary = Vocabulary(5)
vocabulary.build_vocabulary(pd.read_csv('data/captions.txt')['caption'].tolist())

In [40]:
enocder = EncoderCNN(256)
decoder = DecoderRNN(256, 256, len(vocabulary), num_layers=1)
for img, caption in loader:
    features = enocder(img)
    print(features.shape)
    print(caption.shape)
    out = decoder(features, caption)
    print(out.shape)
    break
    

torch.Size([32, 256])
torch.Size([20, 32])
torch.Size([21, 32, 2994])


## CNNtoRNN

In [41]:
class CNNtoRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super(CNNtoRNN, self).__init__()
        self.encoder = EncoderCNN(embed_size=embed_size)
        self.decoder = DecoderRNN(embed_size, hidden_size, vocab_size, num_layers)
    
    def forward(self, img, captions):
        features = self.encoder(img)
        outputs = self.decoder(features, captions)
        return outputs

    def caption_img(self, img, vocab, max_length=100):
        result_caption = []

        with torch.no_grad():
            feature = self.encoder(img).unsqueeze(0) # adding dim for batch size 
            states = None

            for _ in range(max_length):
                hidden, states = self.decoder.lstm(feature, states)
                output = self.decoder.linear(hidden.squeeze(0)) # removing the extra dimension needed in lstm
                predicted = output.argmax(1) # highest probablities word
                result_caption.append(predicted.item())
                feature = self.decoder.embed(predicted).unsqueeze(0)
                if vocab.itos[predicted.item()] == "<EOS>":
                    break
            
            return [vocab.itos[idx] for idx in result_caption] #return the final sentence


In [42]:
## Santity Check
model = CNNtoRNN(256, 132, 2993, 1)
vocabulary = Vocabulary(5)
vocabulary.build_vocabulary(pd.read_csv('data/captions.txt')['caption'].tolist())
for img, caption in loader:
    print(model.caption_img(img[6].unsqueeze(0), vocabulary))
    break


['cyclist', 'moment', 'picnic', 'valley', 'tutu', 'window', 'stretches', 'time', 'plate', 'still', 'following', 'boats', 'itself', 'skeleton', 'dim', 'puffy', 'gestures', 'bmx', 'skeleton', 'raincoat', 'swampy', 'which', 'lease', 'blonde', 'floppy', 'tugging', 'low', 'along', 'bridge', 'sidelines', 'moment', 'boy', 'vault', 'competition', 'pattern', 'surrounding', 'wide', 'games', 'protection', 'mud', 'sumo', 'turkeys', 'fingers', 'size', 'vest', 'chew', 'neighborhood', 'puffy', 'smoke', 'partly', 'open', 'brunette', 'placed', 'sash', 'shining', 'dunes', 'leaving', 'of', 'jeep', 'sort', 'turning', 'helicopter', 'chubby', 'midst', 'obama', 'tank', 'dribbles', 'bed', 'chew', 'whose', 'asking', 'open', 'volleyball', 'breeds', 'laugh', 'course', 'mitt', 'product', 'chess', 'watched', 'waterfalls', 'neighborhood', 'sporting', 'ballet', 'toward', 'waterway', 'arcade', 'dances', 'ornate', 'camcorder', 'contest', 'dunking', 'shephard', 'across', 'tugging', 'camcorder', 'pads', 'winds', 'pain',

## Training

In [43]:
import torch.nn as nn
import torch
from torchvision.transforms import transforms
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter


In [44]:
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [45]:
loader, dataset = get_loader(
    "data/images/", "data/captions.txt", transform
)

In [46]:
device = "cpu"
if torch.cuda.is_available():
    devcie = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"

print(device)


mps


In [47]:
train_CNN = False
embed_size = 256
hidden_size = 256
vocab_size = len(dataset.vocabulary)
num_layers = 1
lr= 3e-4
num_epochs = 10

In [48]:
writer = SummaryWriter(log_dir="runs/data")
model = CNNtoRNN(embed_size, hidden_size, vocab_size, num_layers).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=dataset.vocabulary.stoi["<PAD>"])
optimizer = torch.optim.Adam(params=model.parameters(),
                             lr=lr)

In [49]:
step = 0

In [50]:
model.train()
for epoch in range(num_epochs):
    epoch_loss = 0
    for idx, (imgs, captions) in tqdm(enumerate(loader), total=len(loader), leave=False):
        imgs = imgs.to(device)
        captions = captions.to(device)

        optimizer.zero_grad()
        outputs = model(imgs, captions[:-1])
        loss = criterion(outputs.view(-1, outputs.shape[2]), captions.view(-1))
        epoch_loss += loss.item()
        writer.add_scalar("Training loss", loss.item(), global_step=step)
        step += 1
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss/len(loader):.4f}')



                                                   

Epoch [1/10], Loss: 3.7636


                                                   

Epoch [2/10], Loss: 3.1378


                                                   

Epoch [3/10], Loss: 2.9393


                                                      

Epoch [4/10], Loss: 2.8146


                                                   

Epoch [5/10], Loss: 2.7204


                                                   

Epoch [6/10], Loss: 2.6448


                                                   

Epoch [7/10], Loss: 2.5789


                                                   

Epoch [8/10], Loss: 2.5221


                                                  

KeyboardInterrupt: 

In [29]:
device = "cpu"
if torch.cuda.is_available():
    devcie = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
    
loader, dataset = get_loader(
    "data/images/", "data/captions.txt", transform
)

embed_size = 256
hidden_size = 256
vocab_size = len(dataset.vocabulary)
num_layers = 1

model = CNNtoRNN(embed_size, hidden_size, vocab_size, num_layers).to(device)

model.load_state_dict(torch.load('model.pth'))

<All keys matched successfully>

In [26]:
transform = transforms.Compose([
    transforms.Resize((299, 299)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [63]:
image_path = 'data/images/263522013_d118d46b2d.jpg'  # Replace with your image path
image = Image.open(image_path)

In [64]:
image = transform(image)

In [65]:
model = model.to(device)
image = image.to(device)

In [66]:
model.caption_img(image.unsqueeze(0), dataset.vocabulary)

['<SOS>',
 'a',
 'young',
 'boy',
 'in',
 'a',
 'red',
 'shirt',
 'is',
 'jumping',
 'into',
 'a',
 'pool',
 '.',
 '<EOS>']

In [67]:
torch.save(model.state_dict(), 'model.pth')