## **Data Preprocessing**

### **Mount Drive**

In [None]:
# Mount Google Drive at /content/drive (requires the Google Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# !cp -r "/content/drive/My Drive/Skripsi 2.0/coco_dataset" /content/

In [None]:
# !unzip /content/coco_dataset/images/train2017.zip -d /content/coco_dataset/images

## **Library**

In [None]:
import numpy as np
import json
import os
import os.path
import pickle
from collections import Counter, defaultdict

from PIL import Image
from tqdm import tqdm
from pycocotools.coco import COCO

import torch
import torch.nn as nn
import torchvision.models as models
import torch.utils.data as data

from torchvision import transforms
from nltk.translate.bleu_score import corpus_bleu

import nltk
import math
import matplotlib.pyplot as plt

In [None]:
# Download the NLTK "punkt" / "punkt_tab" data packages; presumably these back
# nltk.tokenize.word_tokenize, which the cells below use to tokenize captions.
nltk.download("punkt")
nltk.download("punkt_tab")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

## **Class & Function**

In [None]:
# Vocabulary

class Vocabulary(object):
    """Bidirectional word <-> index mapping built from COCO captions.

    The vocabulary is either loaded from ``vocab_file`` (when the file exists
    and ``vocab_from_file`` is true) or rebuilt from ``annotations_file`` and
    then pickled to ``vocab_file``.

    Attributes:
        word2idx: dict mapping a token to its integer index.
        idx2word: dict mapping an integer index back to its token.
        idx: next index that ``add_word`` will assign.
    """

    def __init__(
        self,
        vocab_threshold,
        vocab_file="./vocab.pkl",
        start_word="<start>",
        end_word="<end>",
        unk_word="<unk>",
        annotations_file="/content/coco_dataset/annotations/captions_train2017.json",
        vocab_from_file=False,
    ):
        """Store the configuration and immediately load or build the vocabulary.

        Args:
            vocab_threshold: minimum number of occurrences a token needs in the
                captions to be added to the vocabulary.
            vocab_file: path of the pickle file to load from / save to.
            start_word: token marking the start of a caption.
            end_word: token marking the end of a caption.
            unk_word: token returned for words outside the vocabulary.
            annotations_file: COCO captions JSON used when building from scratch.
            vocab_from_file: if true and ``vocab_file`` exists, load it instead
                of rebuilding.
        """
        self.vocab_threshold = vocab_threshold
        self.vocab_file = vocab_file
        self.start_word = start_word
        self.end_word = end_word
        self.unk_word = unk_word
        self.annotations_file = annotations_file
        self.vocab_from_file = vocab_from_file
        self.get_vocab()

    def get_vocab(self):
        """Load the vocabulary from ``vocab_file`` or build and save a new one."""
        if os.path.exists(self.vocab_file) and self.vocab_from_file:
            # NOTE(security): pickle.load can execute arbitrary code; only load
            # vocabulary files that this notebook produced itself.
            with open(self.vocab_file, "rb") as f:
                vocab = pickle.load(f)
            self.word2idx = vocab.word2idx
            self.idx2word = vocab.idx2word
            # Restore the running counter so add_word() keeps working on a
            # loaded vocabulary (it was previously left undefined here).
            self.idx = getattr(vocab, "idx", len(self.word2idx))

        # create a new vocab file
        else:
            self.build_vocab()
            with open(self.vocab_file, "wb") as f:
                pickle.dump(self, f)

    def build_vocab(self):
        """Create the mapping: special tokens first, then caption tokens."""
        self.init_vocab()
        # Insertion order fixes the indices: start=0, end=1, unk=2.
        self.add_word(self.start_word)
        self.add_word(self.end_word)
        self.add_word(self.unk_word)
        self.add_captions()

    def init_vocab(self):
        """Reset the mapping to an empty state."""
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0

    def add_word(self, word):
        """Assign the next free index to ``word`` unless it is already known."""
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx += 1

    def add_captions(self):
        """Add every caption token seen at least ``vocab_threshold`` times."""
        coco = COCO(self.annotations_file)
        counter = Counter()
        for annotation in coco.anns.values():
            caption = str(annotation["caption"])
            counter.update(nltk.tokenize.word_tokenize(caption.lower()))

        for word, cnt in counter.items():
            if cnt >= self.vocab_threshold:
                self.add_word(word)

    def __call__(self, word):
        """Return the index of ``word``, or the index of ``unk_word`` if unknown."""
        if word not in self.word2idx:
            return self.word2idx[self.unk_word]
        return self.word2idx[word]

    def __len__(self):
        """Return the number of tokens in the vocabulary."""
        return len(self.word2idx)

In [None]:
# Coco Dataset

class CoCoDataset(data.Dataset):
    """COCO captioning dataset with a "train" mode and an image-only mode.

    * ``mode == "train"``: items are ``(image, caption)`` where ``image`` is the
      transformed picture and ``caption`` is a ``torch.long`` tensor of token
      indices wrapped in the start/end tokens.
    * any other mode: items are ``(orig_image, image)`` - the untransformed
      picture as a NumPy array plus the transformed picture.

    NOTE: a later cell defines ``CoCoDataset`` again (adding a "valid" mode);
    once that cell has run, the name refers to the later class.
    """

    def __init__(
        self,
        transform,
        mode,
        batch_size,
        vocab_threshold,
        vocab_file,
        start_word,
        end_word,
        unk_word,
        annotations_file,
        vocab_from_file,
        img_folder,
    ):
        """Build the vocabulary and index the annotations for ``mode``.

        Args:
            transform: callable applied to each PIL image.
            mode: "train" for (image, caption) pairs; anything else for images only.
            batch_size: number of indices returned by ``get_train_indices``.
            vocab_threshold, vocab_file, start_word, end_word, unk_word,
            vocab_from_file: forwarded to ``Vocabulary``.
            annotations_file: COCO captions JSON for this split.
            img_folder: directory that contains the image files.
        """
        self.transform = transform
        self.mode = mode
        self.batch_size = batch_size
        self.img_folder = img_folder

        self.vocab = Vocabulary(
            vocab_threshold,
            vocab_file,
            start_word,
            end_word,
            unk_word,
            annotations_file,
            vocab_from_file,
        )
        if self.mode == "train":
            self.coco = COCO(annotations_file)
            self.ids = list(self.coco.anns.keys())

            # Only the token count per caption is needed (for length-bucketed
            # sampling), so the tokenized captions themselves are not retained.
            self.caption_lengths = [
                len(
                    nltk.tokenize.word_tokenize(
                        str(self.coco.anns[ann_id]["caption"]).lower()
                    )
                )
                for ann_id in tqdm(self.ids)
            ]
        else:
            # Use a context manager so the annotations file handle is closed.
            with open(annotations_file) as f:
                test_info = json.load(f)
            self.paths = [item["file_name"] for item in test_info["images"]]

    def __getitem__(self, index):
        """Return the item at ``index`` (layout depends on ``mode``, see class doc)."""
        if self.mode == "train":
            ann_id = self.ids[index]
            caption = self.coco.anns[ann_id]["caption"]
            img_id = self.coco.anns[ann_id]["image_id"]
            path = self.coco.loadImgs(img_id)[0]["file_name"]

            image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            image = self.transform(image)

            tokens = nltk.tokenize.word_tokenize(str(caption).lower())
            caption = [self.vocab(self.vocab.start_word)]
            caption.extend([self.vocab(token) for token in tokens])
            caption.append(self.vocab(self.vocab.end_word))
            # Build the integer tensor directly instead of going through float32.
            caption = torch.tensor(caption, dtype=torch.long)

            return image, caption

        else:
            path = self.paths[index]

            pil_image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            orig_image = np.array(pil_image)
            image = self.transform(pil_image)

            return orig_image, image

    def get_train_indices(self):
        """Sample ``batch_size`` indices whose captions all have the same length.

        A caption length is drawn at random (weighted by how often it occurs),
        then indices are drawn with replacement among captions of that length.
        """
        lengths = np.asarray(self.caption_lengths)
        sel_length = np.random.choice(lengths)

        # Vectorized comparison instead of a Python-level loop over all captions.
        all_indices = np.where(lengths == sel_length)[0]

        indices = list(np.random.choice(all_indices, size=self.batch_size))
        return indices

    def __len__(self):
        """Number of annotations ("train") or number of images (other modes)."""
        if self.mode == "train":
            return len(self.ids)
        else:
            return len(self.paths)

In [None]:
# Get Loader

def get_loader(
    transform,
    mode="train",
    batch_size=1,
    vocab_threshold=None,
    vocab_file="./vocab.pkl",
    start_word="<start>",
    end_word="<end>",
    unk_word="<unk>",
    vocab_from_file=True,
    num_workers=4,
    cocoapi_loc="/opt",
):
    """Build a ``DataLoader`` over the COCO captioning data.

    Args:
        transform: image transform applied by the dataset.
        mode: "train" (train2017 images + captions) or "test" (val2017 images).
        batch_size: batch size stored on the dataset and used by the loader.
        vocab_threshold: minimum word count used when building the vocabulary.
        vocab_file: pickle file holding the vocabulary.
        start_word, end_word, unk_word: special vocabulary tokens.
        vocab_from_file: load the vocabulary from ``vocab_file`` if it exists.
        num_workers: number of DataLoader worker processes.
        cocoapi_loc: root directory containing ``images/`` and ``annotations/``.

    Returns:
        A ``torch.utils.data.DataLoader``. In "train" mode it uses a batch
        sampler over one set of same-length caption indices; in "test" mode it
        iterates over the images in shuffled order.

    Raises:
        ValueError: if ``mode`` is neither "train" nor "test".
    """
    if mode == "train":
        img_folder = os.path.join(cocoapi_loc, "images/train2017/")
        annotations_file = os.path.join(
            cocoapi_loc, "annotations/captions_train2017.json"
        )

    elif mode == "test":
        img_folder = os.path.join(cocoapi_loc, "images/val2017/")
        # The dataset reads the image file names from this JSON, so it has to
        # be the val2017 annotations: the train2017 file lists images that are
        # not present in images/val2017/.
        annotations_file = os.path.join(
            cocoapi_loc, "annotations/captions_val2017.json"
        )
    else:
        raise ValueError(f"Invalid mode: {mode}")

    # COCO caption dataset.
    dataset = CoCoDataset(
        transform=transform,
        mode=mode,
        batch_size=batch_size,
        vocab_threshold=vocab_threshold,
        vocab_file=vocab_file,
        start_word=start_word,
        end_word=end_word,
        unk_word=unk_word,
        annotations_file=annotations_file,
        vocab_from_file=vocab_from_file,
        img_folder=img_folder,
    )

    if mode == "train":
        # Start with one batch of same-length captions; the training loop
        # swaps in a freshly sampled index set before each step.
        indices = dataset.get_train_indices()
        initial_sampler = data.sampler.SubsetRandomSampler(indices=indices)

        data_loader = data.DataLoader(
            dataset=dataset,
            num_workers=num_workers,
            batch_sampler=data.sampler.BatchSampler(
                sampler=initial_sampler, batch_size=dataset.batch_size, drop_last=False
            ),
        )
    else:
        data_loader = data.DataLoader(
            dataset=dataset,
            batch_size=dataset.batch_size,
            shuffle=True,
            num_workers=num_workers,
        )

    return data_loader

In [None]:
# Val Loader

def val_get_loader(
    transform,
    mode="valid",
    batch_size=1,
    vocab_threshold=None,
    vocab_file="./vocab.pkl",
    start_word="<start>",
    end_word="<end>",
    unk_word="<unk>",
    vocab_from_file=True,
    num_workers=0,
    cocoapi_loc="/opt",
):
    """Create a ``DataLoader`` for the "train", "test" or "valid" split.

    "train" reads train2017 images and captions; "test" and "valid" both read
    val2017 images and captions. Every other argument is handed to
    ``CoCoDataset`` / ``DataLoader`` unchanged.

    Returns:
        A ``torch.utils.data.DataLoader`` over the requested split.

    Raises:
        ValueError: if ``mode`` is not one of the three supported values.
    """
    train_paths = ("images/train2017/", "annotations/captions_train2017.json")
    val_paths = ("images/val2017/", "annotations/captions_val2017.json")

    if mode == "train":
        image_rel, annotation_rel = train_paths
    elif mode in ("test", "valid"):
        image_rel, annotation_rel = val_paths
    else:
        raise ValueError(f"Invalid mode: {mode}")

    img_folder = os.path.join(cocoapi_loc, image_rel)
    annotations_file = os.path.join(cocoapi_loc, annotation_rel)

    dataset = CoCoDataset(
        transform=transform,
        mode=mode,
        batch_size=batch_size,
        vocab_threshold=vocab_threshold,
        vocab_file=vocab_file,
        start_word=start_word,
        end_word=end_word,
        unk_word=unk_word,
        annotations_file=annotations_file,
        vocab_from_file=vocab_from_file,
        img_folder=img_folder,
    )

    # Image-only splits: plain shuffled loader.
    if mode != "train":
        return data.DataLoader(
            dataset=dataset,
            batch_size=dataset.batch_size,
            shuffle=True,
            num_workers=num_workers,
        )

    # Training split: batch sampler over one set of same-length caption indices.
    train_sampler = data.sampler.SubsetRandomSampler(
        indices=dataset.get_train_indices()
    )
    batch_sampler = data.sampler.BatchSampler(
        sampler=train_sampler, batch_size=dataset.batch_size, drop_last=False
    )
    return data.DataLoader(
        dataset=dataset,
        num_workers=num_workers,
        batch_sampler=batch_sampler,
    )


class CoCoDataset(data.Dataset):
    """COCO captioning dataset with "train", "valid" and image-only modes.

    * ``mode == "train"``: items are ``(image, caption)`` where ``caption`` is
      a ``torch.long`` tensor of token indices wrapped in start/end tokens.
    * ``mode == "valid"``: items are ``(image_id, image)``; the id is parsed
      from the image file name.
    * any other mode: items are ``(orig_image, image)`` - the untransformed
      picture as a NumPy array plus the transformed picture.
    """

    def __init__(
        self,
        transform,
        mode,
        batch_size,
        vocab_threshold,
        vocab_file,
        start_word,
        end_word,
        unk_word,
        annotations_file,
        vocab_from_file,
        img_folder,
    ):
        """Build the vocabulary and index the annotations for ``mode``.

        Args:
            transform: callable applied to each PIL image.
            mode: "train", "valid", or anything else for raw-image output.
            batch_size: number of indices returned by ``get_train_indices``.
            vocab_threshold, vocab_file, start_word, end_word, unk_word,
            vocab_from_file: forwarded to ``Vocabulary``.
            annotations_file: COCO captions JSON for this split.
            img_folder: directory that contains the image files.
        """
        self.transform = transform
        self.mode = mode
        self.batch_size = batch_size
        self.vocab = Vocabulary(
            vocab_threshold,
            vocab_file,
            start_word,
            end_word,
            unk_word,
            annotations_file,
            vocab_from_file,
        )
        self.img_folder = img_folder
        if self.mode == "train":
            self.coco = COCO(annotations_file)
            self.ids = list(self.coco.anns.keys())

            # Only the token count per caption is needed (for length-bucketed
            # sampling), so the tokenized captions themselves are not retained.
            self.caption_lengths = [
                len(
                    nltk.tokenize.word_tokenize(
                        str(self.coco.anns[ann_id]["caption"]).lower()
                    )
                )
                for ann_id in tqdm(self.ids)
            ]
        else:
            # Use a context manager so the annotations file handle is closed.
            with open(annotations_file) as f:
                test_info = json.load(f)
            self.paths = [item["file_name"] for item in test_info["images"]]

    @staticmethod
    def _image_id_from_path(path):
        """Parse the numeric image id from a COCO image file name.

        Works on the file name (last path component), so both
        ``"000000397133.jpg"`` and ``"dir/COCO_val2014_000000000042.jpg"``
        are handled.
        """
        file_name = os.path.basename(path)
        return int(file_name.split(".")[0].split("_")[-1])

    def __getitem__(self, index):
        """Return the item at ``index`` (layout depends on ``mode``, see class doc)."""
        if self.mode == "train":
            ann_id = self.ids[index]
            caption = self.coco.anns[ann_id]["caption"]
            img_id = self.coco.anns[ann_id]["image_id"]
            path = self.coco.loadImgs(img_id)[0]["file_name"]

            image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            image = self.transform(image)

            tokens = nltk.tokenize.word_tokenize(str(caption).lower())
            caption = [self.vocab(self.vocab.start_word)]
            caption.extend([self.vocab(token) for token in tokens])
            caption.append(self.vocab(self.vocab.end_word))
            # Build the integer tensor directly instead of going through float32.
            caption = torch.tensor(caption, dtype=torch.long)

            return image, caption

        elif self.mode == "valid":
            path = self.paths[index]
            image_id = self._image_id_from_path(path)
            pil_image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            image = self.transform(pil_image)

            return image_id, image

        else:
            path = self.paths[index]

            pil_image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            orig_image = np.array(pil_image)
            image = self.transform(pil_image)

            return orig_image, image

    def get_train_indices(self):
        """Sample ``batch_size`` indices whose captions all have the same length.

        A caption length is drawn at random (weighted by how often it occurs),
        then indices are drawn with replacement among captions of that length.
        """
        lengths = np.asarray(self.caption_lengths)
        sel_length = np.random.choice(lengths)
        # Vectorized comparison instead of a Python-level loop over all captions.
        all_indices = np.where(lengths == sel_length)[0]
        indices = list(np.random.choice(all_indices, size=self.batch_size))
        return indices

    def __len__(self):
        """Number of annotations ("train") or number of images (other modes)."""
        if self.mode == "train":
            return len(self.ids)
        else:
            return len(self.paths)

In [None]:
# Clean sentence

def clean_sentence(output, idx2word, start_idx=0, end_idx=1):
    """Turn a sequence of token indices into a caption string.

    Args:
        output: iterable of integer token indices (e.g. from ``DecoderRNN.sample``).
        idx2word: mapping from token index to token string.
        start_idx: index of the start token; skipped. Defaults to 0, the index
            ``Vocabulary.build_vocab`` assigns to the start word.
        end_idx: index of the end token; decoding stops there. Defaults to 1.

    Returns:
        The words joined by single spaces. A "." token is attached to the
        preceding word without a space. As before, the string starts with a
        space whenever the first emitted token is a regular word.
    """
    sentence = ""
    for i in output:
        if i == start_idx:
            continue
        if i == end_idx:
            break
        word = idx2word[i]
        # Detect the period by its text rather than by a hard-coded index:
        # the index of "." depends on the vocabulary that was built.
        if word == ".":
            sentence = sentence + word
        else:
            sentence = sentence + " " + word
    return sentence

In [None]:
# BLEU Score

def bleu_score(true_sentences, predicted_sentences):
    """Compute corpus-level BLEU over the image ids present in both mappings.

    Args:
        true_sentences: mapping image id -> list of reference caption strings.
        predicted_sentences: mapping image id -> list of predicted caption
            strings; only the first prediction of each image is scored.

    Returns:
        The value returned by ``corpus_bleu`` for the whitespace-tokenized
        references and hypotheses.
    """
    shared_ids = list(
        set(true_sentences.keys()).intersection(set(predicted_sentences.keys()))
    )

    # One list of tokenized references per image, in the same order as the
    # hypotheses below.
    references = [
        [reference.split() for reference in true_sentences[image_id]]
        for image_id in shared_ids
    ]
    hypotheses = [
        predicted_sentences[image_id][0].strip().split() for image_id in shared_ids
    ]

    return corpus_bleu(references, hypotheses)

In [None]:
# Encoder & Decoder

class EncoderCNN(nn.Module):
    """Image encoder: frozen pretrained ResNet-50 trunk plus a trainable linear layer."""

    def __init__(self, embed_size):
        """Create the encoder.

        Args:
            embed_size: size of the feature vector produced for each image.
        """
        super().__init__()
        backbone = models.resnet50(pretrained=True)

        # Freeze every ResNet weight; only ``self.embed`` keeps gradients.
        for weight in backbone.parameters():
            weight.requires_grad_(False)

        # Keep all child modules except the last one (the ``fc`` layer).
        trunk_layers = list(backbone.children())[:-1]
        self.resnet = nn.Sequential(*trunk_layers)
        self.embed = nn.Linear(backbone.fc.in_features, embed_size)

    def forward(self, images):
        """Map a batch of images to one ``embed_size`` vector per image."""
        pooled = self.resnet(images)
        # Flatten everything after the batch dimension before the linear layer.
        flattened = pooled.view(pooled.size(0), -1)
        return self.embed(flattened)

class DecoderRNN(nn.Module):
    """Caption decoder: word embedding -> LSTM -> linear layer over the vocabulary."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        """Create the decoder layers.

        Args:
            embed_size: size of word embeddings (and of the image feature vector).
            hidden_size: size of the LSTM hidden state.
            vocab_size: number of tokens in the vocabulary.
            num_layers: number of stacked LSTM layers.
        """
        super().__init__()

        self.hidden_dim = hidden_size
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

        # Placeholder (h, c) pair; replaced by the LSTM state on every forward().
        self.hidden = (torch.zeros(1, 1, hidden_size), torch.zeros(1, 1, hidden_size))

    def forward(self, features, captions):
        """Score every vocabulary word at every caption position.

        The image feature is fed as the first LSTM input, followed by the
        embeddings of all caption tokens except the last one.
        """
        shifted_tokens = captions[:, :-1]
        token_embeddings = self.embed(shifted_tokens)
        image_step = features.unsqueeze(dim=1)

        lstm_input = torch.cat((image_step, token_embeddings), dim=1)
        lstm_out, self.hidden = self.lstm(lstm_input)

        return self.linear(lstm_out)

    def sample(self, inputs, states=None, max_len=20):
        """Greedily decode up to ``max_len`` token indices for a single input.

        Stops early once index 1 is produced (that index is still included in
        the result). Relies on ``.item()``, so it handles one sequence at a time.
        """
        token_ids = []

        for _ in range(max_len):
            lstm_out, states = self.lstm(inputs, states)
            scores = self.linear(lstm_out.squeeze(dim=1))
            predicted_idx = scores.max(dim=1)[1]
            token = predicted_idx.item()
            token_ids.append(token)

            if token == 1:
                break

            # Feed the predicted word back in as the next LSTM input.
            inputs = self.embed(predicted_idx).unsqueeze(1)

        return token_ids

In [None]:
# Pre-process image

# Training-time image pipeline: resize, take a random 224x224 crop, randomly
# flip horizontally, convert to a tensor and normalize each channel.
transform_train = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        # Per-channel mean / std; presumably the ImageNet statistics expected
        # by the pretrained ResNet-50 in EncoderCNN - TODO confirm.
        transforms.Normalize(
            (0.485, 0.456, 0.406),
            (0.229, 0.224, 0.225),
        ),
    ]
)

# Root of the local COCO copy; passed as `cocoapi_loc` to the loader functions,
# which append "images/..." and "annotations/..." to it.
cocoapi_dir = '/content/coco_dataset/'

## **Model**

In [None]:
# Compute unit: GPU

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
# Setting hyperparameters

batch_size = 128  # number of caption indices sampled per training batch
vocab_threshold = 5  # minimum occurrences for a word to enter the vocabulary
vocab_from_file = True  # load vocab.pkl if it exists instead of rebuilding it
embed_size = 256  # size of image features and word embeddings
hidden_size = 512  # size of the decoder LSTM hidden state
num_epochs = 5  # number of passes of the training loop
save_every = 1  # save a checkpoint every N epochs
print_every = 20  # print training stats every N steps

In [None]:
# Create vocab

# data_loader = get_loader(
#     transform=transform_train,
#     mode="train",
#     batch_size=batch_size,
#     vocab_threshold=vocab_threshold,
#     vocab_from_file=False,
#     cocoapi_loc=cocoapi_dir,
# )

In [None]:
# Set train data loader

# Training loader over train2017; with vocab_from_file=True the vocabulary is
# read from vocab.pkl when that file exists, otherwise it is rebuilt.
data_loader = get_loader(
    transform=transform_train,
    mode="train",
    batch_size=batch_size,
    vocab_threshold=vocab_threshold,
    vocab_from_file=vocab_from_file,
    cocoapi_loc=cocoapi_dir,
)

Vocabulary successfully loaded from vocab.pkl file!
loading annotations into memory...
Done (t=1.02s)
creating index...
index created!
Obtaining caption lengths...


100%|██████████| 591753/591753 [00:53<00:00, 11096.37it/s]


In [None]:
# Initialize architecture

# The decoder's output layer has one unit per vocabulary entry.
vocab_size = len(data_loader.dataset.vocab)

encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size).to(device)

# Loss function, moved to the GPU when one is available.
criterion = nn.CrossEntropyLoss()
if torch.cuda.is_available():
    criterion = criterion.cuda()

# Train the whole decoder but only the encoder's final embedding layer.
params = [*decoder.parameters(), *encoder.embed.parameters()]

optimizer = torch.optim.Adam(params, lr=0.001)

# Steps per epoch: number of captions divided by the batch size, rounded up.
total_step = math.ceil(
    len(data_loader.dataset) / data_loader.batch_sampler.batch_size
)



In [None]:
# Define checkpoint path

# Directory on the mounted Drive where checkpoints are written; created if missing.
model_save_path = '/content/drive/MyDrive/Skripsi 2.0/save_path'
os.makedirs(model_save_path, exist_ok=True)

In [None]:
# Training Loop

for epoch in range(1, num_epochs + 1):
    for i_step in range(1, total_step + 1):
        # Draw a fresh set of indices whose captions share one length, so the
        # batch can be stacked without padding.
        indices = data_loader.dataset.get_train_indices()

        new_sampler = data.sampler.SubsetRandomSampler(indices=indices)
        data_loader.batch_sampler.sampler = new_sampler

        # NOTE(review): a new DataLoader iterator is created on every step;
        # with num_workers > 0 this presumably restarts the worker processes
        # each time - confirm whether that overhead matters here.
        images, captions = next(iter(data_loader))

        images = images.to(device)
        captions = captions.to(device)

        # Clear gradients left over from the previous step.
        decoder.zero_grad()
        encoder.zero_grad()

        features = encoder(images)
        outputs = decoder(features, captions)

        # Flatten scores and targets so the loss is computed per token.
        loss = criterion(outputs.view(-1, vocab_size), captions.view(-1))

        loss.backward()

        optimizer.step()

        stats = (
            f"Epoch [{epoch}/{num_epochs}], Step [{i_step}/{total_step}], "
            f"Loss: {loss.item():.4f}, Perplexity: {np.exp(loss.item()):.4f}"
        )

        if i_step % print_every == 0:
            print("\r" + stats)

    # Save checkpoint
    if epoch % save_every == 0:
        torch.save(
            decoder.state_dict(), os.path.join(model_save_path, f"decoder-{epoch}.pkl")
        )
        torch.save(
            encoder.state_dict(), os.path.join(model_save_path, f"encoder-{epoch}.pkl")
        )

Epoch [1/5], Step [100/2312], Loss: 3.8825, Perplexity: 48.5457
Epoch [1/5], Step [200/2312], Loss: 3.1019, Perplexity: 22.2408
Epoch [1/5], Step [300/2312], Loss: 2.8584, Perplexity: 17.4331
Epoch [1/5], Step [400/2312], Loss: 3.0613, Perplexity: 21.3549
Epoch [1/5], Step [500/2312], Loss: 2.5684, Perplexity: 13.0455
Epoch [1/5], Step [600/2312], Loss: 2.8741, Perplexity: 17.7097
Epoch [1/5], Step [700/2312], Loss: 2.7965, Perplexity: 16.3874
Epoch [1/5], Step [800/2312], Loss: 2.4042, Perplexity: 11.0701
Epoch [1/5], Step [900/2312], Loss: 2.3060, Perplexity: 10.0342
Epoch [1/5], Step [1000/2312], Loss: 2.5846, Perplexity: 13.2584
Epoch [1/5], Step [1100/2312], Loss: 2.2560, Perplexity: 9.5449
Epoch [1/5], Step [1200/2312], Loss: 2.2348, Perplexity: 9.3446
Epoch [1/5], Step [1300/2312], Loss: 2.3044, Perplexity: 10.0182
Epoch [1/5], Step [1400/2312], Loss: 2.3209, Perplexity: 10.1847
Epoch [1/5], Step [1500/2312], Loss: 2.3148, Perplexity: 10.1230
Epoch [1/5], Step [1600/2312], Loss:

In [None]:
# Save final epoch

# Persist the weights as they stand after the training loop has finished.
torch.save(decoder.state_dict(), os.path.join(model_save_path, 'decoder-final.pkl'))
torch.save(encoder.state_dict(), os.path.join(model_save_path, 'encoder-final.pkl'))

In [None]:
# Validation step

# Evaluation-time image pipeline: deterministic resize, tensor conversion and
# the same per-channel normalization as in training (no random augmentation).
transform_test = transforms.Compose(
    [
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize(
            (0.485, 0.456, 0.406),
            (0.229, 0.224, 0.225),
        ),
    ]
)


# Create the data loader
val_data_loader = val_get_loader(
    transform=transform_test, mode="valid", cocoapi_loc=cocoapi_dir
)

# Checkpoint names must match what the training loop writes
# ("encoder-{epoch}.pkl" / "decoder-{epoch}.pkl").
encoder_file = "encoder-3.pkl"
decoder_file = "decoder-3.pkl"

encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size).to(device)

# Load the trained weights.
# map_location lets a checkpoint saved on GPU load on a CPU-only runtime;
# weights_only=True restricts unpickling to tensors and plain containers.
encoder.load_state_dict(
    torch.load(
        os.path.join(model_save_path, encoder_file),
        map_location=device,
        weights_only=True,
    )
)
decoder.load_state_dict(
    torch.load(
        os.path.join(model_save_path, decoder_file),
        map_location=device,
        weights_only=True,
    )
)

# Set to eval
encoder.eval()
decoder.eval()

Vocabulary successfully loaded from vocab.pkl file!


  encoder.load_state_dict(torch.load(os.path.join(model_save_path, encoder_file)))
  decoder.load_state_dict(torch.load(os.path.join(model_save_path, decoder_file)))


DecoderRNN(
  (embed): Embedding(2334, 256)
  (lstm): LSTM(256, 512, batch_first=True)
  (linear): Linear(in_features=512, out_features=2334, bias=True)
)

In [None]:
# Infer captions for all images

# Map each validation image id to the list of captions generated for it.
pred_result = defaultdict(list)
for img_id, img in tqdm(val_data_loader):
    with torch.no_grad():
        img = img.to(device)
        # Add a length-1 sequence axis so the decoder receives
        # (batch, 1, embed_size).
        features = encoder(img).unsqueeze(1)
        output = decoder.sample(features)
    sentence = clean_sentence(output, val_data_loader.dataset.vocab.idx2word)
    pred_result[img_id.item()].append(sentence)

100%|██████████| 5000/5000 [01:41<00:00, 49.17it/s]


In [None]:
# Token indices generated for the last image processed by the loop above.
print(output)

[0, 3, 74, 60, 52, 53, 3, 240, 11, 1]


In [None]:
# Read the val2017 caption annotations from disk.
with open(
    os.path.join(cocoapi_dir, "annotations/captions_val2017.json"), "r"
) as f:
    caption = json.load(f)

# Group the lower-cased reference captions by image id.
valid_annot = caption["annotations"]
valid_result = defaultdict(list)
for i in valid_annot:
    valid_result[i["image_id"]].append(i["caption"].lower())

In [None]:
# Peek at the reference captions of the first three images.
list(valid_result.values())[:3]

[['a black honda motorcycle parked in front of a garage.',
  'a honda motorcycle parked in a grass driveway',
  'a black honda motorcycle with a dark burgundy seat.',
  'ma motorcycle parked on the gravel in front of a garage',
  'a motorcycle with its brake extended standing outside'],
 ['an office cubicle with four different types of computers.',
  'the home office space seems to be very cluttered.',
  'an office with desk computer and chair and laptop.',
  'office setting with a lot of computer screens.',
  'a desk and chair in an office cubicle.'],
 ['a small closed toilet in a cramped space.',
  'a tan toilet and sink combination in a small room.',
  'this is an advanced toilet with a sink and control panel.',
  'a close-up picture of a toilet with a fountain.',
  'off white toilet with a faucet and controls. ']]

In [None]:
# Peek at the generated captions of the first three images.
list(pred_result.values())[:3]

[[' a man is sitting on a table .'],
 [' a man is sitting on a table .'],
 [' a man is sitting on a table .']]

In [None]:
# Corpus-level BLEU of the generated captions against the val2017 references.
bleu_score(true_sentences=valid_result, predicted_sentences=pred_result)

0.056271270164749254

## **Model Test**

In [None]:
# Creating the test data loader

# Rebinds `data_loader` (until now the training loader) to a "test"-mode loader
# with the default batch size of 1.
data_loader = get_loader(transform=transform_test, mode="test", cocoapi_loc=cocoapi_dir)

In [None]:
# Predict caption

def get_prediction(idx2word, i=0, save=False):
    """Caption one image drawn from the global ``data_loader`` and plot it.

    Uses the module-level ``data_loader``, ``encoder``, ``decoder`` and
    ``device``.

    Args:
        idx2word: mapping from token index to token string.
        i: sample number, used only in the saved file name.
        save: if true, write the figure to ``samples/sample_{i:03}.png``.
    """
    orig_image, image = next(iter(data_loader))
    image = image.to(device)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        features = encoder(image).unsqueeze(1)
        output = decoder.sample(features)
    sentence = clean_sentence(output, idx2word)

    ax = plt.axes()

    # Hide the frame and the tick marks so only the picture and caption show.
    ax.spines["right"].set_visible(False)
    ax.spines["top"].set_visible(False)
    ax.spines["bottom"].set_visible(False)
    ax.spines["left"].set_visible(False)

    ax.xaxis.set_major_locator(plt.NullLocator())
    ax.yaxis.set_major_locator(plt.NullLocator())

    plt.imshow(np.squeeze(orig_image))
    plt.xlabel(sentence, fontsize=12)
    if save:
        # savefig does not create missing directories.
        os.makedirs("samples", exist_ok=True)
        plt.savefig(f"samples/sample_{i:03}.png", bbox_inches="tight")
    plt.show()

In [None]:
# Show ten captioned samples; each call draws one image from the shuffled loader.
for i in range(10):
    get_prediction(data_loader.dataset.vocab.idx2word, i=i)