In [None]:
import os

# Identifier embedded in the file names of the preprocessed dataset files.
data_name = 'coco_5_cap_per_img_5_min_word_freq'

# All data paths are resolved relative to the current working directory.
BASE_DIR = os.getcwd()
data_folder = os.path.join(BASE_DIR, 'data')

In [None]:
import h5py
import json

# Temporary dataset: read the VAL image / caption / caption-length files from `data_folder`.
images_path = os.path.join(data_folder, 'VAL_IMAGES_{}.hdf5'.format(data_name))
captions_path = os.path.join(data_folder, 'VAL_CAPTIONS_{}.json'.format(data_name))
caplens_path = os.path.join(data_folder, 'VAL_CAPLENS_{}.json'.format(data_name))

captions, caplens = [], []
captions_per_image = 5

# The HDF5 file is opened read-only and is not closed in this cell; `images` is
# whatever object the file exposes under the 'images' key.
# NOTE(review): presumably a lazily-read h5py dataset that later cells slice - confirm.
h5_file = h5py.File(images_path, 'r')
images = h5_file['images']

# Captions, parsed from JSON into memory.
with open(captions_path, 'r') as json_file:
    captions = json.load(json_file)

# Caption lengths, parsed from JSON into memory.
with open(caplens_path, 'r') as json_file:
    caplens = json.load(json_file)

# One count per line: images, captions, caption lengths.
print(len(images), len(captions), len(caplens), sep='\n')

In [None]:
# Number of captions kept for this experiment.
data_size = 4000

# The image count is derived from `captions_per_image` (defined when the dataset was
# loaded) instead of a hard-coded 5, so the two values cannot drift apart.
images = images[:data_size // captions_per_image]
captions = captions[:data_size]
caplens = caplens[:data_size]

In [None]:
# Collect the distinct lengths of the caption entries and display them.
captions_lens = {len(caption) for caption in captions}
captions_lens

In [None]:
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# Sanity-check the subset sizes against the configured constants rather than
# literal numbers, so changing `data_size` or `captions_per_image` keeps the checks valid.
assert len(images) == data_size // captions_per_image
assert len(captions) == data_size
assert len(caplens) == data_size

# One [image, caption, caption_length] record per caption; caption i is paired with
# image i // captions_per_image.
data = [
    [images[i // captions_per_image], captions[i], caplens[i]]
    for i in tqdm(range(data_size))
]

# shuffle=False keeps the records in their original order on both sides of the split.
train_data, val_data = train_test_split(data, test_size=0.2, shuffle=False)

# CaptionDataset (non-TRAIN mode) locates the captions of an image with
# `i // captions_per_image` relative to the start of its own data, so the split
# must not cut through the captions of a single image.
assert len(train_data) % captions_per_image == 0, 'split boundary falls inside an image'

print()
print(len(train_data))
print(len(val_data))

In [None]:
import joblib

# Write both splits into `data_folder`, the same directory the loading cell reads
# them from, instead of a path relative to whatever the working directory is now.
# NOTE: joblib files are pickle-based; only load files you produced yourself.
joblib.dump(train_data, os.path.join(data_folder, 'train_data.pickle'))
joblib.dump(val_data, os.path.join(data_folder, 'val_data.pickle'))

In [None]:
import joblib
import torch
from torch.utils.data import Dataset

class CaptionDataset(Dataset):
    """
    A PyTorch Dataset class to be used in a PyTorch DataLoader to create batches.

    Every element of `raw_data` is an `[image, caption, caption_length]` record.
    The three columns are converted once, in the constructor, to a float tensor
    (images) and two long tensors (captions, caption lengths).

    :param raw_data: sequence of `[image, caption, caption_length]` records
    :param captions_per_image: number of consecutive records that belong to one image
    :param mode: 'TRAIN' returns 3-tuples; any other value returns 4-tuples (see __getitem__)
    """

    def __init__(self, raw_data, captions_per_image=5, mode='TRAIN'):
        super(CaptionDataset, self).__init__()
        self.mode = mode
        self.data_size = len(raw_data)
        self.captions_per_image = captions_per_image
        # Transpose the list of records into three parallel lists.
        self.raw_images, self.raw_captions, self.raw_caplens = list(map(list, zip(*raw_data)))

        self.images = torch.FloatTensor(self.raw_images)
        self.captions = torch.LongTensor(self.raw_captions)
        self.caplens = torch.LongTensor(self.raw_caplens)

    def __getitem__(self, i):
        """
        Return record `i`.

        :return: `(img, caption, caplen)` in 'TRAIN' mode; otherwise
                 `(img, caption, caplen, all_captions)` where `all_captions` holds the
                 `captions_per_image` captions of the image that record `i` belongs to
                 (intended for BLEU-4 scoring).
        """
        img = self.images[i]
        caption = self.captions[i]
        caplen = self.caplens[i]

        if self.mode == 'TRAIN':
            return img, caption, caplen

        # For validation or testing, also return every caption of the same image.
        # The group bounds use `self.captions_per_image`, not a literal 5, so a
        # dataset built with a different value slices the right rows.
        group_start = (i // self.captions_per_image) * self.captions_per_image
        all_captions = self.captions[group_start:group_start + self.captions_per_image]
        return img, caption, caplen, all_captions

    def __len__(self):
        """Number of records (captions) in the dataset."""
        return self.data_size

In [None]:
import joblib

# Reload the two splits that were written to `data_folder`.
# NOTE: joblib files are pickle-based; only load files from a trusted source.
raw_train_data, raw_val_data = (
    joblib.load(os.path.join(data_folder, file_name))
    for file_name in ('train_data.pickle', 'val_data.pickle')
)

In [None]:
from torch.utils.data import DataLoader
from torchvision import transforms

# normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Wrap each split in a CaptionDataset; the validation one uses the non-TRAIN mode.
train_dataset = CaptionDataset(raw_train_data, mode='TRAIN')
val_dataset = CaptionDataset(raw_val_data, mode='VAL')

# Only the training batches are shuffled.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=64)
val_loader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=64)

In [None]:
# Pull a single training batch and print the shape of each of its three tensors.
sample = next(iter(train_loader))
sample_img, sample_caption, sample_caplen = sample

print(sample_img.shape, sample_caption.shape, sample_caplen.shape, sep='\n')

In [None]:
from gluonnlp.data import SentencepieceTokenizer
from kobert.utils import get_tokenizer
from kobert.pytorch_kobert import get_pytorch_kobert_model


class Tokenizer:
    """Bundles the KoBERT model, its vocabulary and a SentencePiece tokenizer."""

    def __init__(self):
        kobert_model, kobert_vocab = get_pytorch_kobert_model()
        self.model = kobert_model
        self.vocab = kobert_vocab
        self.sp = SentencepieceTokenizer(get_tokenizer())

    def tokenize(self, sentence: str):
        """Run the SentencePiece tokenizer on `sentence` and return its result."""
        pieces = self.sp(sentence)
        return pieces

    def convert_ids_to_tokens(self, ids: list):
        """Map each id in `ids` to its token through `vocab.idx_to_token`."""
        id_lookup = self.vocab.idx_to_token
        return [id_lookup[token_id] for token_id in ids]

    def convert_tokens_to_ids(self, tokens: list):
        """Map each token in `tokens` to its id through `vocab.token_to_idx`."""
        token_lookup = self.vocab.token_to_idx
        return [token_lookup[piece] for piece in tokens]

    def get_word_map(self):
        """Return the vocabulary's token -> id mapping."""
        return self.vocab.token_to_idx

    def get_vocab_size(self):
        """Return the number of entries in the token -> id mapping (7002 per the original note)."""
        return len(self.get_word_map())

    def get_embedding_dim(self):
        """Return `embedding_dim` of the first child of the model's embeddings (768 per the original note)."""
        embedding_children = list(self.model.embeddings.children())
        return embedding_children[0].embedding_dim

    def get_pretrained_embedding(self):
        """Return the model's `word_embeddings` layer itself (not a copy)."""
        return self.model.embeddings.word_embeddings

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet101


class Encoder(nn.Module):
    """Image encoder: a pretrained ResNet-101 without its last two children, plus adaptive average pooling."""

    def __init__(self, encoded_image_size=14):
        super(Encoder, self).__init__()
        self.enc_image_size = encoded_image_size

        # Keep every child module of the pretrained network except the last two.
        backbone = resnet101(pretrained=True)
        self.resnet = nn.Sequential(*list(backbone.children())[:-2])

        # Pool the feature map to a fixed encoded_image_size x encoded_image_size grid.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((encoded_image_size, encoded_image_size))
        self.fine_tune()

    def forward(self, images):
        """
        Encode a batch of images.

        :param images: images, a tensor of dimensions (batch_size, 3, image_size, image_size)
        :return: encoded images with the channel dimension moved last, i.e.
                 (batch_size, encoded_image_size, encoded_image_size, channels);
                 channels is 2048 per the original notes
        """
        feature_map = self.resnet(images)
        pooled = self.adaptive_pool(feature_map)
        # (batch, channels, H, W) -> (batch, H, W, channels)
        return pooled.permute(0, 2, 3, 1)

    def fine_tune(self, fine_tune=True):
        """
        Allow or prevent the computation of gradients for convolutional blocks 2 through 4 of the encoder.

        :param fine_tune: Allow?
        """
        # Freeze every parameter first so that nothing is updated by default.
        for frozen in self.resnet.parameters():
            frozen.requires_grad = False

        # Then set the flag on the children from index 5 onward (the blocks that
        # may be fine-tuned) to the requested value.
        tunable_blocks = list(self.resnet.children())[5:]
        for block in tunable_blocks:
            for weight in block.parameters():
                weight.requires_grad = fine_tune

In [None]:
class Attention(nn.Module):
    """Additive attention over encoder pixels, followed by a sigmoid gate on the attended features."""

    def __init__(self, encoder_feature_size, decoder_hidden_size, attention_size):
        super(Attention, self).__init__()
        self.encoder_feature_size = encoder_feature_size
        self.decoder_hidden_size = decoder_hidden_size
        self.attention_size = attention_size

        # Projections of the encoder features and of the decoder state into the attention space.
        self.encoder_att = nn.Linear(encoder_feature_size, attention_size)
        self.decoder_att = nn.Linear(decoder_hidden_size, attention_size)
        # Produces the gate applied to the attended encoder features.
        self.f_beta = nn.Linear(decoder_hidden_size, encoder_feature_size)
        # Collapses the attention space to one score per pixel.
        self.full_att = nn.Linear(attention_size, 1)

    def forward(self, encoder_output, decoder_hidden):
        """
        :param encoder_output: (batch_size, num_pixels, encoder_feature_size)
        :param decoder_hidden: (batch_size, decoder_hidden_size)
        :return: gated, attention-weighted encoding, (batch_size, encoder_feature_size)
        """
        pixel_proj = self.encoder_att(encoder_output)              # (batch_size, num_pixels, attention_size)
        hidden_proj = self.decoder_att(decoder_hidden).unsqueeze(1)  # (batch_size, 1, attention_size)

        # The sum broadcasts the hidden projection over all pixels; full_att yields
        # (batch_size, num_pixels, 1) and squeeze(2) drops that trailing singleton
        # dimension, leaving one score per pixel: (batch_size, num_pixels).
        scores = self.full_att(F.relu(pixel_proj + hidden_proj)).squeeze(2)
        alpha = F.softmax(scores, dim=1)  # normalised over the pixel dimension

        # Weighted sum of the pixel features: (batch_size, encoder_feature_size)
        context = (encoder_output * alpha.unsqueeze(2)).sum(dim=1)

        # Element-wise (Hadamard) gating of the context by a sigmoid of the decoder state.
        gate = F.sigmoid(self.f_beta(decoder_hidden))  # (batch_size, encoder_feature_size)
        return gate * context

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that consumes one token plus the attended image context."""

    def __init__(self, attention, tokenizer, hidden_size, dropout=0.5):
        super(Decoder, self).__init__()
        self.embedding_size = tokenizer.get_embedding_dim()
        self.output_size = tokenizer.get_vocab_size()

        # The embedding layer is the object handed out by the tokenizer, not a copy.
        self.embedding = tokenizer.get_pretrained_embedding()
        self.attention = attention
        # The LSTM input is the token embedding concatenated with the attended features.
        self.lstm = nn.LSTM(
            self.embedding_size + self.attention.encoder_feature_size,
            hidden_size,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, self.output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, encoder_outputs, captions, hidden, cell):
        """
        Decode one time step.

        :param encoder_outputs: encoder features passed to the attention module
        :param captions: token ids for this step, (batch_size,)
        :param hidden: previous hidden state, (batch_size, hidden_size)
        :param cell: previous cell state, (batch_size, hidden_size)
        :return: `(scores, hidden, cell)`; scores is the output of the final linear
                 layer with the length-1 sequence dimension removed, and hidden/cell
                 are returned exactly as the LSTM produces them, i.e. with a leading
                 dimension of size 1
        """
        # Attended (and gated) image features for the current state: (batch_size, encoder_feature_size)
        context = self.attention(encoder_outputs, hidden)

        # (batch_size,) -> (batch_size, 1) -> (batch_size, 1, embedding_size)
        token_embedding = self.dropout(self.embedding(captions.unsqueeze(1)))

        # Concatenate along the feature dimension to form a length-1 input sequence.
        step_input = torch.cat([token_embedding, context.unsqueeze(1)], dim=2)

        # nn.LSTM expects states shaped (num_layers, batch_size, hidden_size).
        previous_state = (hidden.unsqueeze(0), cell.unsqueeze(0))
        lstm_output, (hidden, cell) = self.lstm(step_input, previous_state)

        scores = self.fc(lstm_output)
        return scores.squeeze(1), hidden, cell

In [None]:
class ImageCaptioner(nn.Module):
    """
    Encoder + attention + single-step decoder, unrolled over the caption with
    teacher forcing.

    :param tokenizer: object handed to the Decoder (provides vocabulary size and embedding)
    :param encoder_feature_size: size of the last dimension of the encoder output
    :param decoder_hidden_size: hidden size of the decoder LSTM
    :param attention_size: size of the attention space
    :param dropout: dropout probability forwarded to the Decoder
    """

    def __init__(self, tokenizer, encoder_feature_size, decoder_hidden_size, attention_size, dropout=0.5):
        super(ImageCaptioner, self).__init__()
        # Kept for backward compatibility; forward() takes the device from its
        # tensors instead, so it stays correct after the module is moved.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        self.encoder_feature_size = encoder_feature_size
        self.decoder_hidden_size = decoder_hidden_size
        self.attention_size = attention_size

        self.encoder = Encoder()
        self.attention = Attention(encoder_feature_size, decoder_hidden_size, attention_size)
        # The dropout argument is forwarded; it used to be accepted and then ignored.
        self.decoder = Decoder(self.attention, tokenizer, decoder_hidden_size, dropout=dropout)

        # Linear maps from the mean encoder feature to the initial LSTM states.
        self.init_h = nn.Linear(self.encoder_feature_size, self.decoder_hidden_size)
        self.init_c = nn.Linear(self.encoder_feature_size, self.decoder_hidden_size)

    def forward(self, images, captions, caption_lengths):
        """
        Score every vocabulary entry at every caption position.

        :param images: batch of images passed to the encoder
        :param captions: token ids, (batch_size, caption_length)
        :param caption_lengths: accepted for interface compatibility; currently unused
                                because the per-step batch shrinking is disabled
        :return: scores, (batch_size, caption_length, vocab_size), in the same batch
                 order as the inputs
        """
        batch_size = captions.size(0)
        caption_length = captions.size(1)

        # Flatten the spatial grid: (batch_size, num_pixels, encoder_feature_size).
        # Uses the instance attribute rather than a notebook-level global.
        encoder_output = self.encoder(images).reshape(batch_size, -1, self.encoder_feature_size)

        # No sorting by caption length: it previously reordered the image features
        # but not the captions fed to the decoder (nor the targets used by the
        # caller), pairing images with the wrong captions.
        hidden, cell = self.init_hidden_state(encoder_output)  # (batch_size, decoder_hidden_size)

        vocab_size = self.decoder.output_size
        predictions = torch.zeros(batch_size, caption_length, vocab_size, device=encoder_output.device)

        # NOTE(review): step t consumes captions[:, t] and its scores are stored at
        # predictions[:, t]. A caller that compares predictions[:, t] with
        # captions[:, t] is scoring the token that was just fed in; shifting the
        # targets (and masking padding) is left to the caller - confirm intent.
        for t in range(caption_length):
            decoder_output, hidden, cell = self.decoder(encoder_output, captions[:, t], hidden, cell)

            # Carry the recurrent state to the next step. The decoder returns the
            # states with a leading layer dimension; bring them back to
            # (batch_size, decoder_hidden_size), the shape it expects as input.
            hidden = hidden.reshape(batch_size, -1)
            cell = cell.reshape(batch_size, -1)

            predictions[:, t] = decoder_output

        return predictions

    def init_hidden_state(self, encoder_output):
        """
        Build the initial LSTM states from the mean over the pixel dimension.

        :param encoder_output: (batch_size, num_pixels, encoder_feature_size)
        :return: `(hidden, cell)`, each (batch_size, decoder_hidden_size)
        """
        mean = encoder_output.mean(dim=1)
        hidden = self.init_h(mean)
        cell = self.init_c(mean)

        return hidden, cell

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Use the GPU when CUDA reports one, otherwise the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Tokenizer wrapper; its pretrained word-embedding layer is also kept under its own name.
tokenizer = Tokenizer()
embedding_weights = tokenizer.get_pretrained_embedding()

# Model dimensions.
encoder_feature_size, decoder_hidden_size, attention_size = 2048, 1024, 512
dropout = 0.5

# Optimisation settings.
num_epochs, batch_size, learning_rate = 1, 64, 0.001

model = ImageCaptioner(
    tokenizer,
    encoder_feature_size,
    decoder_hidden_size,
    attention_size,
    dropout=dropout,
)
model = model.to(device)

# Cross-entropy over the vocabulary scores, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
def train(model, train_loader, criterion, optimizer, device='cpu'):
    """
    Run one optimisation pass over `train_loader`.

    :param model: callable as `model(images, captions, caplens)`, returning scores
                  of shape (batch_size, cap_len, vocab_size)
    :param train_loader: iterable of `(images, captions, caplens)` batches; must support `len()`
    :param criterion: loss taking `(flat_scores, flat_targets)`
    :param optimizer: optimizer stepping the model parameters
    :param device: device every batch tensor is moved to
    :return: mean of the per-batch losses
    """
    model.train()
    running_loss = 0

    for images, captions, caplens in train_loader:
        images, captions, caplens = images.to(device), captions.to(device), caplens.to(device)

        optimizer.zero_grad()

        # scores: (batch_size, cap_len, vocab_size); captions: (batch_size, cap_len)
        scores = model(images, captions, caplens)

        # Flatten batch and time so that each position is one classification example.
        flat_scores = scores.view(-1, scores.size(-1))  # (batch_size * cap_len, vocab_size)
        flat_targets = captions.view(-1)                # (batch_size * cap_len)

        loss = criterion(flat_scores, flat_targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(train_loader)

In [None]:
def validate(model, val_loader, criterion, device='cpu'):
    """
    Compute the mean loss over `val_loader` without updating the model.

    :param model: callable as `model(img, caption, caplen)`, returning scores
                  of shape (batch_size, cap_len, vocab_size)
    :param val_loader: iterable of batches whose first three elements are
                       `(img, caption, caplen)`; extra elements (such as the
                       per-image caption group returned by CaptionDataset outside
                       'TRAIN' mode) are ignored. Must support `len()`.
    :param criterion: loss taking `(flat_scores, flat_targets)`
    :param device: device every batch tensor is moved to
    :return: mean of the per-batch losses
    """
    epoch_loss = 0

    model.eval()

    with torch.no_grad():
        for batch in val_loader:
            # Validation batches may carry a 4th element; only the first three
            # are needed for the loss, so the rest is discarded here.
            img, caption, caplen, *_ = batch

            img = img.to(device)
            caption = caption.to(device)
            caplen = caplen.to(device)

            output = model(img, caption, caplen)
            output_dim = output.size(-1)  # vocab_size

            output = output.view(-1, output_dim)  # [batch_size * cap_len, vocab_size]
            # Targets come from this batch's `caption` tensor (the previous code
            # read an unbound name here and could not run).
            targets = caption.view(-1)  # [batch_size * cap_len]

            loss = criterion(output, targets)

            epoch_loss += loss.item()

    return epoch_loss / len(val_loader)

In [None]:
# Alternate one training pass and one validation pass per epoch.
for epoch in range(num_epochs):
    # Forward `device` so the batches are moved to where the model lives; the
    # functions default to 'cpu', which fails when the model is on CUDA.
    train_loss = train(model, train_loader, criterion, optimizer, device=device)

    val_loss = validate(model, val_loader, criterion, device=device)

    # '{:02d}' is a zero-padded width-2 integer; the previous '{02d}' was parsed
    # as a field name and raised before anything was printed.
    print('Epoch [{:02d}/{:02d}] | Train Loss: {} | Val. Loss: {}'.format(epoch + 1, num_epochs, train_loss, val_loss))