# Auto Caption

In [1]:
import io
import os
import pandas as pd
import matplotlib.pyplot as plt

import PIL.Image

#os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

import torch
from torch import nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
import torchvision
import torchvision.models as models
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

from pycocotools.coco import COCO
from PIL import Image
from tqdm import tqdm_notebook as tqdm

This project is going to use the COCO annotations dataset. There are multiple captions available for each image, but to keep the training data manageable I will only use 2 captions per image.

In [2]:
data_path=r"C:\Users\water\Documents\datasets\coco"
json_path=r"C:\Users\water\Documents\datasets\coco\annotations_trainval2017\annotations"

CAPS_PER_IMAGE = 2

class CocoDataset(Dataset):
    def __init__(self, root_dir, ann_file, caps_per_image=1, transform=None, target_transform=None):
        self.root_dir = root_dir
        self.ann_file = ann_file
        self.transform = transform
        self.target_transform = target_transform
        
        self.coco = COCO(self.ann_file)
        coco = self.coco
        self.ids = list(sorted(self.coco.imgs.keys()))
        self.imgs = []
        self.caps = []
        
        for i, _id in enumerate(self.ids):
            ann_ids = coco.getAnnIds(imgIds=_id)
            anns = coco.loadAnns(ann_ids)
            
            for i, ann in enumerate(anns):
                self.caps.append(ann['caption'])
                self.imgs.append(coco.loadImgs(_id)[0]['file_name'])
                if i + 1 == caps_per_image:
                    break
        
        self.df = pd.DataFrame({'img_path': self.imgs, 'caption': self.caps})
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx:int):
        path = self.df['img_path'].iloc[idx]
        img = Image.open(os.path.join(self.root_dir, path)).convert('RGB')
        target = self.df['caption'].iloc[idx]
        
        if self.transform is not None:
            img = self.transform(img)
            
        if self.target_transform is not None:
            target = self.target_transform(target)
        
        return img, target
    
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

train_ds = CocoDataset(
    root_dir=data_path + '\\train2017', 
    ann_file=json_path + '\\captions_train2017.json',
    caps_per_image=CAPS_PER_IMAGE,
    transform=transform
    )

val_ds = CocoDataset(
    root_dir=data_path + '\\val2017', 
    ann_file=json_path + '\\captions_val2017.json',
    caps_per_image=CAPS_PER_IMAGE,
    transform=transform
    )

print('Length of training set:', len(train_ds))
print('Length of validation set:', len(val_ds))

img, caption = train_ds[1]
print(img.size())
print(caption)

loading annotations into memory...
Done (t=0.77s)
creating index...
index created!
loading annotations into memory...
Done (t=0.06s)
creating index...
index created!
Length of training set: 236574
Length of validation set: 10000
torch.Size([3, 224, 224])
A meal is presented in brightly colored plastic trays.


In [3]:
CWD = os.getcwd()
def save_model(model):
    model_path = os.path.join(CWD, 'caption_model.pt')
    torch.save(model, model_path)

def save_vocab(vocab):
    vocab_path = os.path.join(CWD + 'vocab.pt')
    torch.save(vocab, vocab_path)

To convert the captions into tensors, a vocabulary object will be built. The tokenizer used is the 'basic english' tokenizer from torchtext. Padding will be used to make all captions the same length and special tokens will be used to indicate the beginning and ending of a caption, as well as an unknown token symbol (a word that's not in the vocabulary.) The minimum frequency is somewhat arbitrary, I will use 15.

In [4]:
tokenizer = get_tokenizer('basic_english')
def tokenize(data_slice):
    _, caption = data_slice
    return tokenizer(caption)

MIN_FREQ = 15
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

def build_vocab(vocab=None):
    if not vocab:
        vocab = build_vocab_from_iterator(
            iterator=map(tokenize, train_ds),
            min_freq=MIN_FREQ,
            specials=special_symbols,
        )
    vocab.set_default_index(vocab['<unk>'])
    return vocab

vocab = build_vocab()
save_vocab(vocab)
print('Number of words in vocabulary:', len(vocab))


Number of words in vocabulary: 3884


In [5]:
text_pipeline = lambda x: vocab(tokenizer(x))
itos = vocab.get_itos()

def collate_fn(batch):
    image_batch, target_batch = [], []
    for image, caption in batch:
        text_token_ids = text_pipeline(caption)
        text_tensor = torch.cat((torch.tensor([BOS_IDX]),
                                 torch.tensor(text_token_ids),
                                 torch.tensor([EOS_IDX])))
        target_batch.append(text_tensor)
        image_batch.append(image.unsqueeze(0))
        #print([itos[token] for token in text_tensor])
        
    image_batch = torch.cat(image_batch)
    target_batch = pad_sequence(target_batch, batch_first=True, padding_value=PAD_IDX)
    #for target in target_batch:
        #print([itos[token] for token in target])
    
    return image_batch, target_batch

In [6]:
batch_size = 32

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, pin_memory=True, collate_fn=collate_fn)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=True, pin_memory=True, collate_fn=collate_fn)

'''
for images, captions in train_loader:
    for caption in captions:
        for token in caption:
            print(itos[token])
    print(image.size())
    #print(caption)
    break
'''

'\nfor images, captions in train_loader:\n    for caption in captions:\n        for token in caption:\n            print(itos[token])\n    print(image.size())\n    #print(caption)\n    break\n'

Rather than building the model all in one class, I'm going to separate the encoder and decoder part of the model into different classes. The encoder will be a pretrained ResNet, the decoder will be an LSTM based network.

In [22]:
class Encoder(nn.Module):
    def __init__(self, embed_size:int=1024):
        super(Encoder, self).__init__()
    
        # Pretrained image classifier ResNet-152
        self.CNN = models.resnet152(pretrained=True, progress=False)
        for param in self.CNN.parameters():
            param.requires_grad_(False)
        self.CNN.classifier = nn.Linear(self.CNN.fc.in_features, self.CNN.fc.in_features)
        
        self.dropout = nn.Dropout(0.8)
        self.fc2 = nn.Linear(self.CNN.fc.out_features, embed_size)
        
    def forward(self, images):
        x = F.relu(self.CNN(images))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        return x

In [23]:
class Decoder(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(Decoder, self).__init__()
        
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(input_size=self.embed_size, hidden_size=self.hidden_size,
                           num_layers=self.num_layers, batch_first=True)
        self.embed = nn.Embedding(num_embeddings=self.vocab_size, embedding_dim=self.embed_size)
        self.linear = nn.Linear(hidden_size, self.vocab_size)
        
    def forward(self, features, captions):
            embeddings = self.embed(captions)
            #print('caption size:', captions.size())
            #embeddings = torch.permute(embeddings, (1, 0, 2))
            features = features.unsqueeze(1)
            #print('features size:', features.size())
            #print('embedding size:', embeddings.size())
            embeddings = torch.cat((features, embeddings), dim=1)
            hidden_states, cell_states = self.lstm(embeddings)
            outputs = self.linear(hidden_states)
            return outputs

In [24]:
class CaptionModel(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super(CaptionModel, self).__init__()
        self.encoder = Encoder(embed_size)
        self.decoder = Decoder(embed_size, hidden_size, vocab_size, num_layers)
        
    def forward(self, image, caption):
        x = self.encoder(image)
        x = self.decoder(x, caption)
        return x
    
    def generate_caption(self, image, vocabulary, max_length=50):
        generated_caption = []
        
        with torch.no_grad():
            x = self.encoder(image).unsqueeze(0)
            cell_states = None
            
            for i in range (max_length):
                hidden_states, cell_states = self.decoder.lstm(x, cell_states)
                output = self.decoder.linear(hidden_states.squeeze(0))
                predicted = output.argmax(1)
                print(predicted.shape)
                print(predicted)
                generated_caption.append(predicted[i])
                x = self.decoder.embed(output.long()).unsqueeze(0)
                
                if itos[predicted[i]] == '<eos>':
                    break
        return [itos[i] for i in generated_caption]

In [25]:
EMBED_SIZE = 512
HIDDEN_SIZE = 512


model = CaptionModel(
    embed_size=EMBED_SIZE,
    hidden_size=HIDDEN_SIZE,
    vocab_size=len(vocab),
    num_layers=5)

#model.load_state_dict(torch.load(os.path.join(CWD, 'caption_model.pt'))) 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device = torch.device('cpu')
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [27]:
def train_one_epoch():
    model.train()
    running_loss = 0
    last_loss = 0
    
    for i, data in tqdm(enumerate(train_loader), total=len(train_loader)):
        inputs, labels = data[0].to(device), data[1].to(device)
        #print('inputs shape:', inputs.size())
        #print('labels shape:', labels.size())
        optimizer.zero_grad()
        outputs = model(inputs, labels[:,:-1])
        #print('outputs size:', outputs.size())
        loss = criterion(outputs.reshape(-1, outputs.shape[2]), labels.reshape(-1))
        #loss = criterion(score.reshape(-1, score.shape[2]), labels.reshape(-1))
            
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        if i % 1000 == 999:
            last_loss = running_loss / 1000 # loss per batch
            print('  batch {} loss: {}'.format(i + 1, last_loss))
            running_loss = 0.
            
    return last_loss

def val_one_epoch(train_loss):
    model.eval()
    running_loss = 0
    
    with torch.no_grad():
        for i, data in enumerate(val_loader):
            inputs, labels = data[0].to(device), data[1].to(device)
            outputs = model(inputs, labels[:,:-1])
            loss = criterion(outputs.reshape(-1, outputs.shape[2]), labels.reshape(-1))
            
            running_loss += loss.item() 
            
    val_loss = running_loss / (i + 1)
    print('LOSS train {} valid {}'.format(train_loss, val_loss))
    return val_loss

In [28]:

EPOCHS = 1

min_loss = 1000
for epoch in range(EPOCHS):
    print('Epoch number:', epoch)
    train_loss = train_one_epoch()
    val_loss = val_one_epoch(train_loss)
    
    if val_loss < min_loss:
        save_model(model)
print('Training Complete')


Epoch number: 0


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for i, data in tqdm(enumerate(train_loader), total=len(train_loader)):


  0%|          | 0/7393 [00:00<?, ?it/s]

  batch 1000 loss: 2.917600263595581
  batch 2000 loss: 2.1631980882883073
  batch 3000 loss: 1.9276047449707985
  batch 4000 loss: 1.8256837568283082
  batch 5000 loss: 1.7724254564642907
  batch 6000 loss: 1.718541918039322
  batch 7000 loss: 1.6709784666895866
LOSS train 1.6709784666895866 valid 1.658398258038603
Training Complete


Training and validation loss confirm no overfitting.

TODO: Train for longer on cloud platform.

In [21]:
num_test = 1
model.eval()
with torch.no_grad():
    for i, data in enumerate(val_loader):
        inputs, labels = data[0].to(device), data[1].to(device)
        outputs = model(inputs, labels[:,:-1])
        #print(outputs)
        print(inputs[1, :, :, :].size())
        model.generate_caption(inputs, vocab)
        if i == num_test:
            break

torch.Size([3, 224, 224])
torch.Size([32])
tensor([ 2,  4, 24,  4,  4,  7,  7,  1,  4,  7,  1,  1,  1,  1,  1,  1,  1,  1,
         1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1],
       device='cuda:0')


ValueError: only one element tensors can be converted to Python scalars