In [None]:
import os

In [None]:
import cv2
import random
import numpy as np
import random

In [None]:
# Single seed shared by every random number generator the notebook relies on,
# so that a fresh "Restart & Run All" reproduces the same run.
import torch  # imported here because the notebook's own torch import lives in a later cell

SEED=1234
random.seed(SEED)        # Python's built-in generator
np.random.seed(SEED)     # NumPy's global generator
torch.manual_seed(SEED)  # PyTorch generator (weight initialisation, random transforms)

In [None]:
import torch
from torch.utils import data
import torch.nn as nn

In [None]:
from dataloader import Dataset
from vocabulary import Vocabulary

In [None]:
# Root folder of the caption dataset; "captions.json" and the "images/" folder are
# resolved relative to it (the data-loading cell assigns this same value again).
dataset_folder="/floyd/input/bangla_image_caption/"

In [None]:
!pip install bnltk

In [None]:
import json

In [None]:
from bnltk.tokenize import Tokenizers
from sklearn.model_selection import train_test_split


def _expand_captions(entries):
    """Flatten caption records into ``(filename, caption)`` pairs.

    Args:
        entries: iterable of dicts, each with a "filename" key and a "caption"
            key holding an iterable of captions for that file.

    Returns:
        A list with one ``(filename, caption)`` tuple per caption, in input order.
    """
    return [
        (entry["filename"], caption)
        for entry in entries
        for caption in entry["caption"]
    ]


dataset_folder="/floyd/input/bangla_image_caption/"
caption_json_path = dataset_folder + "captions.json"
image_folder = dataset_folder + "images/"

# Read the caption index; the context manager closes the file handle, and the
# explicit encoding keeps the read independent of the system locale.
with open(caption_json_path, encoding="utf-8") as caption_file:
    filenames_with_captions = json.load(caption_file)

t = Tokenizers()
tokenizer = t.bn_word_tokenizer

# Hold out 10% of the records for testing, then carve the validation set out of the
# remaining training portion so that the three splits are disjoint. Splitting the
# full list a second time would put held-out test records back into train/valid.
train, test = train_test_split(filenames_with_captions, test_size=0.1, random_state=SEED)
train, valid = train_test_split(train, test_size=0.1, random_state=SEED)

image_names = []
train_with_captions = _expand_captions(train)
val_with_captions = _expand_captions(valid)
test_with_captions = _expand_captions(test)

# Captions of every split, in train / valid / test order.
# NOTE(review): the vocabulary is built from all three splits, so words that occur
# only in validation/test captions also count towards the threshold -- confirm this
# is intended.
all_captions = [
    caption
    for _, caption in train_with_captions + val_with_captions + test_with_captions
]

vocab = Vocabulary(vocab_threshold=6, captions=all_captions, tokenizer=tokenizer)

In [None]:
from torchvision import transforms

# Pre-processing applied to training images.
# Resize is given an explicit (height, width): a single int would only scale the
# shorter edge and keep the aspect ratio, while the cells below concatenate images
# into batches and reshape them to (1, 3, 224, 224), which needs exactly 224x224.
transform_train = transforms.Compose([
    transforms.Resize((224, 224)),                   # scale every image to exactly 224x224
    transforms.RandomHorizontalFlip(),               # horizontally flip image with probability=0.5
    transforms.ToTensor(),                           # convert the PIL Image to a tensor
])

# Pre-processing applied to validation and test images (no random augmentation).
transform_test = transforms.Compose([
    transforms.Resize((224, 224)),                   # scale every image to exactly 224x224
    transforms.ToTensor(),                           # convert the PIL Image to a tensor
])

In [None]:
# Build the three dataset wrappers in train / validation / test order. Only the
# training split gets the augmenting transform; the other two share the plain one.
dataset, valid_dataset, test_dataset = [
    Dataset(image_folder, pairs, transform, vocab, tokenizer=t.bn_word_tokenizer)
    for pairs, transform in (
        (train_with_captions, transform_train),
        (val_with_captions, transform_test),
        (test_with_captions, transform_test),
    )
]

In [None]:
from model import EncoderCNN,EncoderVGG,EncoderVGGAtt
from model import DecoderRNN,DecoderRNNAttention

In [None]:
# Hyper-parameters and log-file names shared by the model-construction and training cells.
batch_size = 128          # batch size passed to load_data() and to the DecoderRNN constructor
vocab_threshold = 6        # minimum word count threshold (the Vocabulary above was built with a literal 6; this name is not read in the visible cells)
vocab_from_file = True    # if True, load existing vocab file (not read in the visible cells)
embed_size = 4096         # dimensionality of image and word embeddings (the inference cells reshape encoder features to 4096)
hidden_size = 512          # number of features in hidden state of the RNN decoder
num_epochs = 20             # number of training epochs (1 for testing)
save_every = 1             # determines frequency of saving model weights (not read in the visible cells)
print_every = 200          # determines window for printing average loss (the training cell reassigns this to 100)
log_file = 'training_log_attention.txt'       # name of file with saved training loss and perplexity
val_log_file = 'validation_log_attention.txt' # name of file with the per-epoch validation loss and perplexity
# Number of entries in the vocabulary; used as the decoder's output size and to flatten its outputs for the loss.
vocab_size=len(vocab)

In [None]:
# Pick the GPU when CUDA is available, otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate both halves of the captioning model and place them on that device.
encoder = EncoderVGG()
decoder = DecoderRNN(embed_size, hidden_size, vocab_size,batch_size)
encoder.to(device)
decoder.to(device)

# Cross-entropy over the vocabulary; moved to the GPU only when one is present.
criterion = nn.CrossEntropyLoss()
if torch.cuda.is_available():
    criterion = criterion.cuda()

In [None]:
# Only the decoder's parameters are handed to Adam; the encoder's are not optimised here.
params = [weight for weight in decoder.parameters()]
optimizer = torch.optim.Adam(params, weight_decay=0.001, lr=0.001)

In [None]:
# Best (lowest) validation loss seen so far; starts at +infinity so the first epoch always counts as an improvement.
validation_loss_min = float("inf")

In [None]:
# Per-epoch loss histories, filled by the training cell and plotted afterwards.
train_losses, validation_losses = [], []

In [None]:
def _batch_loss(images, captions):
    """Forward one batch through the encoder and decoder and return its loss.

    Args:
        images: sequence of image tensors as yielded by ``load_data``; they are
            joined with ``torch.cat`` into a single batch tensor.
        captions: sequence of caption tensors holding token ids (presumably 1-D
            and of varying length -- confirm against ``Dataset.load_data``).

    Returns:
        The ``criterion`` loss tensor computed between the decoder outputs,
        flattened to ``(-1, vocab_size)``, and the zero-padded captions.
    """
    images = torch.cat(images).to(device)
    captions = [caption.to(device) for caption in captions]
    # pad_sequence right-pads shorter captions with zeros so they stack into one tensor.
    captions_padded = nn.utils.rnn.pad_sequence(captions, batch_first=True)

    features = encoder(images)
    outputs = decoder(features, captions)
    return criterion(outputs.contiguous().view(-1, vocab_size), captions_padded.view(-1))


i_step=0
print_every=100

# torch.save does not create missing directories, so make sure the target exists.
model_dir = 'bengali_models'
os.makedirs(model_dir, exist_ok=True)

# The context manager closes both log files even if training is interrupted.
with open(log_file, 'w') as f, open(val_log_file, 'w') as validation_f:
    for epoch in range(1, num_epochs+1):
        # ---------------------------- training ----------------------------
        epoch_losses = []
        try:
            for images, captions in dataset.load_data(batch_size):
                # Zero the gradients.
                decoder.zero_grad()
                encoder.zero_grad()

                # Forward pass, backward pass and parameter update.
                loss = _batch_loss(images, captions)
                loss.backward()
                optimizer.step()
                epoch_losses.append(loss.item())

                # Get training statistics.
                stats = 'Epoch [%d/%d], Step [%d], Loss: %.4f, Perplexity: %5.4f' % (epoch, num_epochs, i_step, loss.item(), np.exp(loss.item()))

                # Print training statistics (on same line).
                print('\r' + stats, end="")

                # Print training statistics to file.
                f.write(stats + '\n')
                f.flush()

                # Print training statistics (on different line).
                if i_step % print_every == 0:
                    print('\r' + stats)
                i_step=i_step+1
        except RuntimeError as err:
            # Best effort: a failing batch ends this epoch's training early, but the
            # reason is reported and the epoch is still validated below.
            print("\nRuntimeError at step %d, skipping the rest of epoch %d: %s" % (i_step, epoch, err))

        # Record the mean loss over the batches that completed, so the curve is
        # comparable with the (averaged) validation loss and both histories always
        # have one entry per epoch.
        train_losses.append(float(np.mean(epoch_losses)) if epoch_losses else float('nan'))

        # --------------------------- validation ---------------------------
        validation_loss=0.0
        validation_iter=0
        # Evaluate in eval mode and without autograd, then restore whatever mode
        # each module was in so the next training epoch is unaffected.
        encoder_was_training = encoder.training
        decoder_was_training = decoder.training
        encoder.eval()
        decoder.eval()
        try:
            with torch.no_grad():
                for images, captions in valid_dataset.load_data(batch_size):
                    validation_loss=validation_loss+_batch_loss(images, captions).item()
                    validation_iter=validation_iter+1
        finally:
            encoder.train(encoder_was_training)
            decoder.train(decoder_was_training)

        # Average over the number of batches actually seen; NaN when there were
        # none, which never compares as an improvement.
        validation_loss = validation_loss/validation_iter if validation_iter else float('nan')
        validation_losses.append(validation_loss)
        stats = 'Epoch [%d/%d], Step [%d], Validation Loss: %.4f, Perplexity: %5.4f\n' % (epoch, num_epochs, validation_iter, validation_loss, np.exp(validation_loss))

        # Keep only the weights of the best epoch so far.
        if validation_loss<validation_loss_min:
            print("\nmodel improved!")
            torch.save(decoder.state_dict(), os.path.join(model_dir, 'decoder.pkl'))
            torch.save(encoder.state_dict(), os.path.join(model_dir, 'encoder.pkl'))
            validation_loss_min=validation_loss
        else:
            print("\nnot improved yet!")

        # Print validation statistics to screen and to file.
        print('\n' + stats, end="")
        validation_f.write(stats + '\n')
        validation_f.flush()

In [None]:
# Reload the checkpoint files that the training cell writes under bengali_models/
# whenever the validation loss improves, so the cells below use the saved weights
# rather than whatever the last epoch left in memory.
# map_location="cpu" makes torch.load place the saved tensors on the CPU.
encoder.load_state_dict(torch.load("bengali_models/encoder.pkl",map_location="cpu"))
decoder.load_state_dict(torch.load("bengali_models/decoder.pkl",map_location="cpu"))

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Draw both loss histories on one set of axes using the explicit Axes interface.
fig, ax = plt.subplots()
ax.plot(train_losses, label="Train Loss")
ax.plot(validation_losses, label="Validation Loss")
ax.set_ylabel("Loss")
ax.set_xlabel("Epochs")
ax.set_title("Loss curve")
ax.legend()

In [None]:
import cv2

# Load a single demo image from disk and bring it into the layout the encoder is
# fed with elsewhere in this notebook: RGB, 224x224, values scaled to [0, 1],
# channels first.
work_image_path = "work.jpg"
work_image = cv2.imread(work_image_path)
if work_image is None:
    # cv2.imread reports a missing or unreadable file by returning None rather
    # than raising, which would otherwise surface as an obscure error in resize.
    raise FileNotFoundError("could not read image: {}".format(work_image_path))

work_image = cv2.resize(work_image, (224, 224))
work_image = cv2.cvtColor(work_image, cv2.COLOR_BGR2RGB)  # OpenCV loads images as BGR
work_image = work_image / 255                             # uint8 pixel values -> floats in [0, 1]

# (height, width, channel) -> (channel, height, width)
work_image_tensor = torch.from_numpy(work_image).permute((2, 0, 1))

In [None]:
def sample(inputs, max_len=50):
    """Greedily decode a caption from an encoded image.

    Uses the module-level ``decoder`` (its ``init_hidden``, ``lstm``, ``linear``
    and ``word_embeddings`` members). At every step the highest-scoring word is
    kept and its embedding becomes the next LSTM input.

    Args:
        inputs: pre-processed image features used as the first LSTM input;
            shape (1, 1, embed_size) at inference.
        max_len: upper bound on the number of generated tokens. It guarantees
            termination when the model never predicts the end token.

    Returns:
        List of predicted word ids (Python ints). The list ends with 0, the id
        treated as the end token, unless ``max_len`` was reached first.
    """
    output = []
    batch_size = inputs.shape[0]  # batch_size is 1 at inference
    hidden = decoder.init_hidden(batch_size)  # initial hidden state of the LSTM

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        for _ in range(max_len):
            lstm_out, hidden = decoder.lstm(inputs, hidden)  # (1, 1, hidden_size)
            scores = decoder.linear(lstm_out).squeeze(1)      # (1, vocab_size)
            # The arg-max of the raw scores equals the arg-max of their softmax,
            # so no normalisation is needed to pick the most likely word.
            _, max_indice = torch.max(scores, dim=1)           # shape (1,)

            token = max_indice[0].item()
            output.append(token)

            if token == 0:
                # We predicted the <end> word, so there is nothing further to predict.
                break

            # Embed the predicted word; it is the next input of the LSTM.
            inputs = decoder.word_embeddings(max_indice).unsqueeze(1)  # (1, 1, embed_size)

    return output

In [None]:
import matplotlib.font_manager as fm
# Font properties loaded from the local Kalpurush.ttf file; passed as `fontproperties`
# to the plot titles below so the predicted caption text is drawn with this font
# (presumably required to render Bangla glyphs -- confirm).
prop = fm.FontProperties(fname='Kalpurush.ttf')

In [None]:
# Folder into which the test-set loop saves one captioned figure per image.
target_folder="flickr8k_images_outputs/test/"

In [None]:
# Caption every test image and save it, titled with the prediction, as
# <target_folder>/<running index>.jpg.
os.makedirs(target_folder, exist_ok=True)  # savefig does not create missing folders

# Inference mode is set once, not on every iteration.
encoder.eval()
decoder.eval()

count=0
with torch.no_grad():  # no gradients are needed while generating captions
    for validation_image, validation_caption in test_dataset:
        validation_image = validation_image.to(device).view(1, 3, 224, 224)
        # Reference caption ids of the current image; the last one is printed further below.
        actual_outputs = validation_caption.cpu().numpy().tolist()

        features = encoder(validation_image)
        outputs = sample(features.view(1, 1, -1))
        output_words = [vocab.idx2word[output] for output in outputs]

        # Id 0 is the end token that stops decoding in sample(); leave it out of the title.
        title_words = output_words[:-1] if outputs and outputs[-1] == 0 else output_words

        fig = plt.figure()
        plt.imshow(validation_image[0].cpu().numpy().transpose((1, 2, 0)))
        plt.title(" ".join(title_words), fontproperties=prop)
        plt.savefig("{}/{}.jpg".format(target_folder, count))
        # Release the figure: one is created per test image and they would
        # otherwise all stay in memory until the kernel stops.
        plt.close(fig)
        count = count + 1

In [None]:
# Caption the single demo image prepared above and save the titled figure.
encoder.eval()
decoder.eval()

validation_image = work_image_tensor.to(device).view(1, 3, 224, 224)
with torch.no_grad():  # inference only, no gradients needed
    # The demo tensor was built from a NumPy array, so cast it to float32 for the encoder.
    features = encoder(validation_image.float())
    outputs = sample(features.view(1, 1, -1))
output_words = [vocab.idx2word[output] for output in outputs]

# Id 0 is the end token that stops decoding in sample(); leave it out of the title.
title_words = output_words[:-1] if outputs and outputs[-1] == 0 else output_words

plt.figure()
plt.imshow(validation_image[0].cpu().numpy().transpose((1,2,0)))
plt.title(" ".join(title_words), fontproperties=prop)
plt.savefig("work_result.jpg")

In [None]:
# Print the reference caption of the last test image processed above, one word per line.
for token_id in actual_outputs:
    word = vocab.idx2word[token_id]
    print(word)