In [8]:
# QUESTION 06
# Loading Models

import torch
import torchvision
import torch.nn as nn
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class ImageEncoder(nn.Module):
    def __init__(self, embed_size):
        super(ImageEncoder, self).__init__()
        self.resnet = nn.Sequential(*list(torchvision.models.resnet50(pretrained=True).children())[:-1])
        self.linear = nn.Linear(2048, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        features = self.resnet(images)
        features = features.view(features.size(0), -1)
        out_linear = self.linear(features)
        embeddings = self.bn(out_linear)
        return embeddings


class SemanticEncoder(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super(SemanticEncoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers)

    def forward(self, texts):
        embeddings = self.embedding(texts)
        hiddens, _ = self.lstm(embeddings)

        return hiddens[:, -1]


class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        # lstm cell
        self.lstm_cell = nn.LSTMCell(input_size=embed_size + hidden_size, hidden_size=hidden_size)
        # output fully connected layer
        self.fc_out = nn.Linear(in_features=hidden_size, out_features=vocab_size)
        # embedding layer
        self.embed = nn.Embedding(num_embeddings=vocab_size + hidden_size, embedding_dim=embed_size + hidden_size)
        # activations
        self.softmax = nn.Softmax(dim=1)

    def forward(self, features, captions):
        # batch size
        batch_size = features.size(0)
        # init the hidden and cell states to zeros
        hidden_state = torch.zeros((batch_size, self.hidden_size)).to(device)
        cell_state = torch.zeros((batch_size, self.hidden_size)).to(device)
        # define the output tensor placeholder
        outputs = torch.empty((batch_size, captions.size(1), self.vocab_size)).to(device)
        # embed the captions
        captions_embed = self.embed(captions)
        # pass the caption word by word
        for t in range(captions.size(1)):
            # for the first time step the input is the feature vector
            if t == 0:
                hidden_state, cell_state = self.lstm_cell(features, (hidden_state, cell_state))
            # for the 2nd+ time step, using teacher forcer
            else:
                hidden_state, cell_state = self.lstm_cell(captions_embed[:, t, :], (hidden_state, cell_state))
            # output of the attention mechanism
            out = self.fc_out(hidden_state)
            # build the output tensor
            outputs[:, t, :] = out
        return outputs

    def sample(self, features, max_seg_length):
        """Generate captions for given image features using greedy search."""
        sampled_ids = []
        # inputs = features.unsqueeze(1)
        batch_size = features.size(0)
        hidden_state = torch.zeros((batch_size, self.hidden_size)).to(device)
        cell_state = torch.zeros((batch_size, self.hidden_size)).to(device)
        for i in range(max_seg_length):
            # hiddens, states = self.lstm(inputs, states)  # hiddens: (batch_size, 1, hidden_size)
            hidden_state, cell_state = self.lstm_cell(features, (hidden_state, cell_state))
            # outputs = self.linear(hiddens.squeeze(1))  # outputs:  (batch_size, vocab_size)
            outputs = self.fc_out(hidden_state)
            _, predicted = outputs.max(1)  # predicted: (batch_size)
            sampled_ids.append(predicted)
            features = self.embed(predicted)  # inputs: (batch_size, embed_size)
        sampled_ids = torch.stack(sampled_ids, 1)  # sampled_ids: (batch_size, max_seq_length)
        return sampled_ids

In [10]:
# iNFERENCE PIPELINE FOR GENERATE CAPTIONS

import torch
from torchvision import transforms
from PIL import Image
import json
from nltk.tokenize import word_tokenize
from torch.utils.data import DataLoader
import os
import pickle
from ipynb.fs.full.vocabulary_builder import Vocabulary
from ipynb.fs.full.data_loader import ROCODataset


class CaptionGenerator:
    def __init__(self, vocab_file, embed_size, hidden_size, num_layers, image_encoder_path, semantic_encoder_path,
                 decoder_path):
        
        with open(vocab_file, 'rb') as f:
            self.vocab = pickle.load(f)
        # convert vocab into index2word format
        vocab_size = 2200

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        if torch.cuda.is_available():
            map_location = torch.device('cuda')
        else:
            map_location = torch.device('cpu')

        self.image_encoder = ImageEncoder(embed_size).to(self.device)
        self.semantic_encoder = SemanticEncoder(vocab_size, embed_size, hidden_size, num_layers).to(self.device)
        self.decoder = Decoder(vocab_size, embed_size, hidden_size).to(self.device)

        # Load trained weights (assuming you saved them earlier as 'encoder.pth' and 'decoder.pth')
        self.image_encoder.load_state_dict(torch.load(image_encoder_path, map_location=map_location))
        self.semantic_encoder.load_state_dict(torch.load(semantic_encoder_path, map_location=map_location))
        self.decoder.load_state_dict(torch.load(decoder_path, map_location=map_location))

        # Set to evaluation mode
        self.image_encoder.eval()
        self.semantic_encoder.eval()
        self.decoder.eval()

        # Image preprocessing
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def generate_caption(self, image_path, max_length=20):
        # Load and preprocess the image
        image = Image.open(image_path).convert('RGB')
        image = self.transform(image).unsqueeze(0).to(self.device)

        # Get the feature vector from the encoder
        features_image = self.image_encoder(image)
        # tokens = word_tokenize('CTLA venous phase of donor'.lower())
        caption = [self.vocab('<START>')]
        # caption.extend([self.vocab.get(token, self.vocab['<UNK>']) for token in tokens])
        # caption.append(self.vocab['<END>'])
        features_text = self.semantic_encoder(torch.Tensor([caption]).long()[:, :torch.Tensor([caption]).shape[1]])

        combined_features = torch.cat((features_image, features_text), dim=1)
        gen_caption = []
        predicted_token_ids = self.decoder.sample(combined_features, max_seg_length=20)
        for id in predicted_token_ids[0].tolist():
            word = self.vocab_i2w[id]
            if word == '<END>':
                break
            gen_caption.append(word)
        return ' '.join(gen_caption)

    def process_batch(self, batch_size, max_seg_length, json_data_path='selected_dataset/selected_dataset_info.json'):
        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        
        test_dataset = ROCODataset(data_json='selected_dataset/selected_dataset_info.json',
                                transform=transform,
                                vocab=self.vocab,
                                dataset_type='test')

        test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True,
                                collate_fn=ROCODataset.collate_fn,
                                num_workers=2)

        processed_captions = []
        GT_captions = []
        for i, (images_val, GT, _) in enumerate(test_loader):
            caps = []
            for i in range(images_val.size()[0]):
                caps.append([self.vocab('<START>')])
            captions_val = torch.Tensor(caps).long()
            
            images = images_val.to(self.device)
            image_features = self.image_encoder(images)
            text_features = self.semantic_encoder(captions_val[:, :captions_val.shape[1]].to(self.device))
            combined_features = torch.cat((image_features, text_features), dim=1)
            
            # captions_target = captions_val.to(self.device)
            # predictions = torch.argmax(self.decoder(combined_features, captions_target), dim=2)
            
            predictions = self.decoder.sample(combined_features, max_seg_length=max_seg_length)
            
            for prediction in predictions:
                # Convert word_ids to words
                sampled_caption = []
                for word_id in prediction.detach().numpy():
                    word = self.vocab.idx2word[word_id]
                    if word != '<start>':
                        if word != '<unk>':
                            if word != '<end>':
                                sampled_caption.append(word)
                    if word == '<end>':
                        break
                image_caption = ' '.join(sampled_caption)
                processed_captions.append(image_caption)
            
            # getting GTs
            for gt in GT:
                ground_truth_caption = []
                for word_id in gt.detach().numpy():
                    word = self.vocab.idx2word[word_id]
                    if word != '<start>':
                        if word != '<unk>':
                            if word != '<end>':
                                if word != '<pad>':
                                    ground_truth_caption.append(word)
                image_gt_caption = ' '.join(ground_truth_caption)
                GT_captions.append(image_gt_caption)
        return processed_captions, GT_captions

In [1]:
import torch
from transformers import BertTokenizer, BertModel
from gensim.models import Word2Vec, FastText, KeyedVectors
from sklearn.metrics.pairwise import cosine_similarity
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from itertools import combinations
from nltk.tokenize import word_tokenize
import numpy as np
from scipy.spatial import distance
import json
from sklearn.feature_extraction.text import CountVectorizer

import warnings
warnings.filterwarnings("ignore", category=UserWarning, module='nltk')

# Load pre-trained Word2Vec and FastText models
word2vec_model = KeyedVectors.load_word2vec_format('GoogleNews-vectors-negative300.bin', binary=True)
fasttext_model = KeyedVectors.load_word2vec_format('crawl-300d-2M.vec')


def get_embedding(sentence, embed_type):
    if embed_type == "biobert":
        # Load pre-trained model and tokenizer for BioBERT
        model_name = 'dmis-lab/biobert-base-cased-v1.1'
        biobert = BertModel.from_pretrained(model_name)
        tokenizer = BertTokenizer.from_pretrained(model_name)
        with torch.no_grad():
            inputs = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True, max_length=512)
            outputs = biobert(**inputs)
            return outputs.last_hidden_state[:, 0, :].numpy()
    else:
        words = word_tokenize(sentence)
        if embed_type == "word2vec":
            vectors = [word2vec_model[word] for word in words if word in word2vec_model.key_to_index]
        elif embed_type == "fasttext":
            vectors = [fasttext_model[word] for word in words if word in fasttext_model.key_to_index]
        else:
            raise ValueError(f"unknown {embed_type}")
        return np.mean(vectors, axis=0).reshape(1, -1)


def calculate_similarities(original, prediction, embedding_type):
    orig_embedding = get_embedding(original, embedding_type)
    pred_embedding = get_embedding(prediction, embedding_type)

    # Compute similarities
    cosine_sim = cosine_similarity(orig_embedding, pred_embedding)[0][0]
    euclidean_dist = distance.euclidean(orig_embedding.flatten(), pred_embedding.flatten())
    manhattan_dist = distance.cityblock(orig_embedding.flatten(), pred_embedding.flatten())
    bleu_score = sentence_bleu([word_tokenize(original)], word_tokenize(prediction), weights=(0, 1, 0, 0))  # bi-gram
    jaccard_sim = jaccard_similarity(word_tokenize(original), word_tokenize(prediction))

    return {
        "Cosine Similarity": cosine_sim,
        "Euclidean Distance": euclidean_dist,
        "Manhattan Distance": manhattan_dist,
        "BLEU Score": bleu_score,
        "Jaccard Similarity": jaccard_sim
    }


def calculate_countvector_similarity(original, prediction):
    # Compute similarities based on count vectors
    vectorizer = CountVectorizer().fit([original, prediction])

    # Convert sentences to count vectors
    orig_vector = vectorizer.transform([original]).toarray()[0]  # Convert sparse matrix to numpy array
    pred_vector = vectorizer.transform([prediction]).toarray()[0]  # Convert sparse matrix to numpy array

    # Compute similarities based on count vectors
    cosine_sim = cosine_similarity([orig_vector], [pred_vector])[0][0]
    euclidean_dist = distance.euclidean(orig_vector, pred_vector)
    manhattan_dist = distance.cityblock(orig_vector, pred_vector)
    bleu_score = sentence_bleu([word_tokenize(original)], word_tokenize(prediction), weights=(0, 1, 0, 0))  # bi-gram
    jaccard_sim = jaccard_similarity(word_tokenize(original), word_tokenize(prediction))

    return {
        "Cosine Similarity": cosine_sim,
        "Euclidean Distance": euclidean_dist,
        "Manhattan Distance": manhattan_dist,
        "BLEU Score": bleu_score,
        "Jaccard Similarity": jaccard_sim
    }


def sent2sent_similarity(captionA, captionB):
    """
    Question6: get the similarity between GT and the generated caption
    :param captionA: input caption/ Generated caption (Y)
    :param captionB: GT (X)
    :return: json object with different embeddings types and their similarity scores
    EX: {
            "embed_type": {
                "Cosine Similarity": float,
                "Euclidean Distance": float,
                "Manhattan Distance": float,
                "BLEU Score": float,
                "Jaccard Similarity": float
            }
        }
    """
    results = {}
    for method in ["biobert", "word2vec", "fasttext"]:
        results[method] = calculate_similarities(captionA, captionB, method)
    results['countvectorizer'] = calculate_countvector_similarity(captionA, captionB)
    return results

def jaccard_similarity(list1, list2):
    s1 = set(list1)
    s2 = set(list2)
    return len(s1.intersection(s2)) / len(s1.union(s2))


  from .autonotebook import tqdm as notebook_tqdm


In [12]:
path_image_encoder = 'train/image_encoder.pth'
path_semantic_encoder_path = 'train/semantic_encoder.pth'
path_decoder_path = 'train/decoder.pth'
# Initialize the caption generator and generate caption
caption_generator = CaptionGenerator(vocab_file='vocab.pkl', embed_size=256, hidden_size=256, num_layers=1,
                                        image_encoder_path=path_image_encoder,
                                        semantic_encoder_path=path_semantic_encoder_path,
                                        decoder_path=path_decoder_path)
# image_path = "selected_dataset/test/radiology/images/PMC4803869_GJHS-7-124-g006.jpg"
# caption = caption_generator.generate_caption(image_path)
generated_captions, GTs = caption_generator.process_batch(batch_size=4, max_seg_length=10)

# use first image 
Generated_caption = generated_captions[0]
GT_caption = GTs[0]
print('Generated_caption', Generated_caption)
print('GT_caption', GT_caption)

results = sent2sent_similarity(Generated_caption, GT_caption)
print(results)
# print(json.dumps(results, default=lambda o: float(o) if isinstance(o, np.float32) else o, indent=4))



Generated_caption within within within within within within within within within
GT_caption coronal ct scan taken two years shows enlarged kidneys filled with numerous cysts and occupying the abdominal and pelvic . note that there are only a few cysts in the liver .
{'biobert': {'Cosine Similarity': 0.737587, 'Euclidean Distance': 9.570383071899414, 'Manhattan Distance': 209.31915, 'BLEU Score': 0, 'Jaccard Similarity': 0.0}, 'word2vec': {'Cosine Similarity': 0.2919802, 'Euclidean Distance': 2.1420795917510986, 'Manhattan Distance': 30.315865, 'BLEU Score': 0, 'Jaccard Similarity': 0.0}, 'fasttext': {'Cosine Similarity': 0.40645656, 'Euclidean Distance': 3.2084531784057617, 'Manhattan Distance': 41.06874, 'BLEU Score': 0, 'Jaccard Similarity': 0.0}, 'countvectorizer': {'Cosine Similarity': 0.0, 'Euclidean Distance': 10.770329614269007, 'Manhattan Distance': 38, 'BLEU Score': 0, 'Jaccard Similarity': 0.0}}


In [17]:
# Question 07: get the most similar caption for generated caption from training set
def get_best_match_from_target_list(caption, json_data):
    """
    :param caption: input caption/ generated caption (Y)
    :param json_data: data file to get the training list
    :return: best matching caption for Y <-- (Z)
    """
    data = json.load(open(json_data))
    target_captions = []
    for k, v in data.items():
        if k == 'train':
            for item in v:
                target_captions.append(list(item.values())[0]['caption'])

    best_sim = 0
    best_caption = None
    for train_cap in target_captions:
        cosine_sim = calculate_countvector_similarity(train_cap, caption)
        if best_sim < cosine_sim['Cosine Similarity']:
            best_sim = cosine_sim['Cosine Similarity']
            best_caption = train_cap
    return best_sim, best_caption

#Q7
# get the most similar caption for generated caption from training set, for all the test images 
json_data = 'selected_dataset/selected_dataset_info.json'
best_similarities_Z = []
best_matching_captions_Z = []
all_data = []
for i, Generated_caption in enumerate(generated_captions):
    best_similarity, best_matching_caption = get_best_match_from_target_list(Generated_caption, json_data)
    best_similarities_Z.append(best_similarity)
    best_matching_captions_Z.append(best_matching_caption)
    all_data.append({'similarity_score': best_similarity, 'best_matching_caption': best_matching_caption})
print(all_data)


[{'similarity_score': 0.3779644730092272, 'best_matching_caption': ' Gallstone remains impacted within the sigmoid colon.'}, {'similarity_score': 0.19611613513818404, 'best_matching_caption': ' TDM abdomino pelvienne en coupe axiale montrant une masse tissulaire retro péritonéale gauche faisant 8x5 cm à contours bosselé en rapport avec un amas ganglionnaire'}, {'similarity_score': 0.19611613513818404, 'best_matching_caption': ' TDM abdomino pelvienne en coupe axiale montrant une masse tissulaire retro péritonéale gauche faisant 8x5 cm à contours bosselé en rapport avec un amas ganglionnaire'}, {'similarity_score': 0.19611613513818404, 'best_matching_caption': ' TDM abdomino pelvienne en coupe axiale montrant une masse tissulaire retro péritonéale gauche faisant 8x5 cm à contours bosselé en rapport avec un amas ganglionnaire'}, {'similarity_score': 0.3779644730092272, 'best_matching_caption': ' Gallstone remains impacted within the sigmoid colon.'}, {'similarity_score': 0.19611613513818

In [18]:
# Question 8: compare GT (X) with (Y) and (Z) and get the best caption as (Y) or (Z) and assigned it to test image
def get_best_caption(X_Y_sim_score, X_Z_sim_score, Y, Z):
    """
    :param Z: retrival caption from training set for generated caption
    :param Y: generated caption
    :param X_Y_sim_score: similarity score between GT (X) and generated caption (Y)
    :param X_Z_sim_score: similarity score between GT (X) and retrival caption from training set (Z)
    :return: (Y) or (Z)
    """
    if X_Y_sim_score > X_Z_sim_score:
        ret = Y
    else:
        ret = Z
    return ret


# Q8
final_captions = []
for i, (GT_caption, Generated_caption) in enumerate(zip(GTs, generated_captions)):
    cosine_sim = calculate_countvector_similarity(GT_caption, Generated_caption)
    final_caption = get_best_caption(cosine_sim['Cosine Similarity'], best_similarities_Z[i], Generated_caption, best_matching_captions_Z[i])
    final_captions.append(final_caption)
print(final_captions)

[' Gallstone remains impacted within the sigmoid colon.', ' TDM abdomino pelvienne en coupe axiale montrant une masse tissulaire retro péritonéale gauche faisant 8x5 cm à contours bosselé en rapport avec un amas ganglionnaire', ' TDM abdomino pelvienne en coupe axiale montrant une masse tissulaire retro péritonéale gauche faisant 8x5 cm à contours bosselé en rapport avec un amas ganglionnaire', ' TDM abdomino pelvienne en coupe axiale montrant une masse tissulaire retro péritonéale gauche faisant 8x5 cm à contours bosselé en rapport avec un amas ganglionnaire', ' Gallstone remains impacted within the sigmoid colon.', ' TDM abdomino pelvienne en coupe axiale montrant une masse tissulaire retro péritonéale gauche faisant 8x5 cm à contours bosselé en rapport avec un amas ganglionnaire', ' TDM abdomino pelvienne en coupe axiale montrant une masse tissulaire retro péritonéale gauche faisant 8x5 cm à contours bosselé en rapport avec un amas ganglionnaire', ' TDM abdomino pelvienne en coupe a

In [20]:
# Question9: average similarity value for the captions from all test images
def compute_bleu(reference, candidate):
    """
    Compute the BLEU score between a reference and a candidate sentence.
    """
    # BLEU expects tokenized sentences
    reference = reference.split()
    candidate = candidate.split()

    # Use a smoothing function to avoid issues with zero BLEU scores
    smoothing = SmoothingFunction().method1

    return sentence_bleu([reference], candidate, smoothing_function=smoothing)


def get_average_similarity(captions):
    """
    Q9:
    :param captions: list of final captions for the test images
    :return: average similarity value for the captions from all test images
    Using BLEU score similarity
    """
    scores = []

    # Compute pairwise BLEU scores for a list of sentences.
    for s1, s2 in combinations(captions, 2):
        bleu_score = compute_bleu(s1, s2)
        scores.append(bleu_score)
        # print(f"BLEU({s1}, {s2}) = {bleu_score:.4f}")

    # get average
    avg_score = sum(scores) / len(scores)
    return avg_score

# Q9
avg_sim = get_average_similarity(generated_captions)
print(f"Average BLEU score: {avg_sim:.4f}")

Average BLEU score: 0.6073
