# Evaluate multimodal models on Memory Colors
Currently covers BERT baselines, CLIP-BERT, VisualBERT and LXMERT.

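Move to the repository root so that the relative paths and project imports below resolve. Run this cell only once per kernel session: `%cd ..` is relative, so re-running it moves up one more directory.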

In [1]:
%cd ..

/Users/lovhag/Projects/reproduce-visual-commonsense-eval


In [2]:
import copy
import json
import os

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import BertTokenizer, BertForMaskedLM, BertConfig, VisualBertForPreTraining, LxmertForPreTraining, CLIPModel, CLIPProcessor

from models.src.clip_bert.modeling_bert import BertImageForMaskedLM
from models.src.lxmert.alterations import LxmertLanguageOnlyXLayer
from memory_colors.src.evaluate_bert import predict_masked, names_to_token_ids

In [3]:
def get_model_preds_for_questions(model, tokenizer, questions):
    """Return the model scores at the mask position of every question.

    The questions are processed in batches of 64, in their given order, on the
    GPU when one is available and on the CPU otherwise.

    Args:
        model: Model that is called with the tokenizer output as keyword
            arguments and returns a mapping holding either "logits" or
            "prediction_logits".
        tokenizer: Callable tokenizer exposing `mask_token_id`; its output
            must offer `.to(device)` and `.input_ids`.
        questions: Sequence of question strings, each expected to contain
            exactly one mask token.

    Returns:
        A tensor with one row per question: the slice of the model logits at
        that question's mask position. It stays on the device the model ran on.

    Raises:
        AssertionError: if the number of mask tokens found in a batch differs
            from the number of questions in that batch.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    dataloader = DataLoader(questions, batch_size=64, shuffle=False)
    preds = []
    with torch.no_grad():
        for questions_batch in dataloader:
            inputs = tokenizer(questions_batch, return_tensors="pt", padding=True).to(device)
            # Column (sequence position) of each mask token, shaped (batch, 1).
            mask_idx = (inputs.input_ids == tokenizer.mask_token_id).nonzero(as_tuple=False)[:, 1][:, None]
            assert len(mask_idx) == inputs.input_ids.shape[0], "ERROR: Found multiple [MASK] tokens per example"

            # Run a single forward pass per batch and pick whichever logits key
            # the model provides.
            outputs = model(**inputs)
            logits = outputs["logits"] if "logits" in outputs else outputs["prediction_logits"]
            # Select, for every example, the logits row at its mask position.
            pred = logits.gather(1, mask_idx.repeat(1, logits.shape[-1]).unsqueeze(1)).squeeze(1)
            preds.append(pred)

    return torch.cat(preds)

def update_results_with_model_preds(results, get_preds, query_files, tokenizer):
    """Score one model on every query file and add one row per file to `results`.

    NOTE(review): besides its arguments this function reads the globals
    `model_name`, `QUERIES_FOLDER`, `MASK_LABELS`, `get_map_score_for_preds`
    and `get_map_score_for_masked_preds`. Apart from `model_name`, none of
    them is defined in the visible part of this notebook - confirm they exist
    before calling.

    Args:
        results: DataFrame with (at least) the columns "model" and
            "query_type".
        get_preds: Callable mapping a list of query strings to a tensor of
            predictions.
        query_files: File names inside `QUERIES_FOLDER`; each file holds one
            JSON object per line with the keys "query" and "labels".
        tokenizer: Tokenizer handed through to the scoring helpers.

    Returns:
        A new DataFrame: `results` plus one row per query file.

    Raises:
        AssertionError: if `results` already has a row for the current
            `model_name` and the query type of a file.
    """
    for query_file in tqdm(query_files):
        with open(os.path.join(QUERIES_FOLDER, query_file)) as f:
            examples = [json.loads(line) for line in f]

        questions = [ex["query"] for ex in examples]
        labels = [ex["labels"] for ex in examples]
        pred = get_preds(questions)
        score = get_map_score_for_preds(labels, pred.cpu().detach().numpy(), tokenizer)
        masked_score = get_map_score_for_masked_preds(labels, pred.cpu().detach().numpy(), tokenizer, MASK_LABELS)
        support = len(questions)
        mean_nbr_alternatives = np.mean([len(alternatives) for alternatives in labels])

        # The query type is the file name up to its first dot.
        query_type = query_file.split('.')[0]
        assert len(results[(results.model==model_name) & (results.query_type==query_type)]) == 0, "Should not append results to already existing key values"
        # DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
        new_row = pd.DataFrame([{"model": model_name, "query_type": query_type, "score": score, "masked_score": masked_score, "support": support, "mean_nbr_alternatives": mean_nbr_alternatives}])
        results = pd.concat([results, new_row], ignore_index=True)

    return results

In [4]:
# Prompt templates used to query the models. "[DESCRIPTOR]" and "[ITEM]" are
# placeholders filled in per example; every template holds exactly one "[MASK]".
QUESTION_TEMPLATES = [
    "Q: What is the color of [DESCRIPTOR] [ITEM]? A: It is [MASK].",
    "Q: What is the color of [DESCRIPTOR] [ITEM]? [SEP] A: It is [MASK].",
    "Q: What is the colour of [DESCRIPTOR] [ITEM]? A: It is [MASK].",
    "What is the color of [DESCRIPTOR] [ITEM]? [MASK].",
    "What is the color of [DESCRIPTOR] [ITEM]? [SEP] [MASK].",
    "What is the colour of [DESCRIPTOR] [ITEM]? [MASK].",
    "The color of [DESCRIPTOR] [ITEM] is [MASK].",
    "The usual color of [DESCRIPTOR] [ITEM] is [MASK].",
    "[DESCRIPTOR] [ITEM] usually has the color of [MASK].",
    "What is the usual color of [DESCRIPTOR] [ITEM]? [MASK].",
    "What is the usual color of [DESCRIPTOR] [ITEM]? [SEP] [MASK].",
    "What is the typical color of [DESCRIPTOR] [ITEM]? [MASK].",
    "What is the typical color of [DESCRIPTOR] [ITEM]? [SEP] [MASK].",
]

# JSON-lines file with the Memory Colors examples (path relative to the working directory).
OBJECT_COLORS_DATAFILE = "memory_colors/data/memory_colors.jsonl"

In [5]:
# Every non-blank line of the data file is one JSON-encoded example.
with open(OBJECT_COLORS_DATAFILE, "r") as f:
    examples = [json.loads(line) for line in f if line.strip()]

# Distinct labels, sorted so that the index of each color is the same in every
# kernel session (the iteration order of a set of strings is not).
CLASS_NAMES = sorted({ex["label"] for ex in examples})

In [6]:
# Empty frame that collects one row per evaluated (model, question template) pair.
results = pd.DataFrame(
    columns=[
        "model",
        "question_template",
        "accuracy",
    ]
)

In [10]:
# Baseline: the pretrained text-only BERT masked language model.
model_name = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
model = BertForMaskedLM.from_pretrained("bert-base-uncased").eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # move once, outside the template loop
class_names_ids = names_to_token_ids(CLASS_NAMES, tokenizer)
# Index of ground truth color - (batch, 1). Does not depend on the template, so it is built once.
gt = torch.tensor([CLASS_NAMES.index(ex["label"]) for ex in examples]).unsqueeze(1)

new_rows = []  # one result row per template, added to `results` in a single concat below
for question_template in QUESTION_TEMPLATES:
    # Fill in the template for every example. An empty descriptor removes the
    # placeholder together with the space that follows it.
    questions = []
    for ex in examples:
        if ex["descriptor"] == '':
            question = question_template.replace("[DESCRIPTOR] ", "").replace("[ITEM]", ex["item"])
        else:
            question = question_template.replace("[DESCRIPTOR]", ex["descriptor"]).replace("[ITEM]", ex["item"])
        questions.append(question)

    inputs = tokenizer(questions, return_tensors="pt", padding=True).to(device)
    pred = predict_masked(inputs, tokenizer, model, class_names_ids).cpu().detach()
    topk = pred.topk(1).indices # Rank color tokens and select top k (batch, k)

    # Fraction of examples whose top-ranked color equals the ground truth.
    precision_at_k = (topk == gt).sum().item() / gt.shape[0]
    assert len(results[(results.model==model_name) & (results.question_template==question_template)]) == 0, "Should not append results to already existing key values"
    new_rows.append({"model": model_name, "question_template": question_template, "accuracy": precision_at_k})

# DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
results = pd.concat([results, pd.DataFrame(new_rows)], ignore_index=True)

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [11]:
# BertImageForMaskedLM with the "bert-clip-bert-trained" checkpoint, queried with text only.
model_name = "bert-base-trained"
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
config = BertConfig.from_pretrained("bert-base-uncased")
model = BertImageForMaskedLM(config)
# strict=False: checkpoint keys that do not match the model are skipped without an error.
model.load_state_dict(torch.load("models/data/model-weights/bert-clip-bert-trained/mp_rank_00_model_states.pt", map_location="cpu")["module"], strict=False)
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # move once, outside the template loop
class_names_ids = names_to_token_ids(CLASS_NAMES, tokenizer)
# Index of ground truth color - (batch, 1). Does not depend on the template, so it is built once.
gt = torch.tensor([CLASS_NAMES.index(ex["label"]) for ex in examples]).unsqueeze(1)

new_rows = []  # one result row per template, added to `results` in a single concat below
for question_template in QUESTION_TEMPLATES:
    # Fill in the template for every example. An empty descriptor removes the
    # placeholder together with the space that follows it.
    questions = []
    for ex in examples:
        if ex["descriptor"] == '':
            question = question_template.replace("[DESCRIPTOR] ", "").replace("[ITEM]", ex["item"])
        else:
            question = question_template.replace("[DESCRIPTOR]", ex["descriptor"]).replace("[ITEM]", ex["item"])
        questions.append(question)

    inputs = tokenizer(questions, return_tensors="pt", padding=True).to(device)
    pred = predict_masked(inputs, tokenizer, model, class_names_ids).cpu().detach()
    topk = pred.topk(1).indices # Rank color tokens and select top k (batch, k)

    # Fraction of examples whose top-ranked color equals the ground truth.
    precision_at_k = (topk == gt).sum().item() / gt.shape[0]
    assert len(results[(results.model==model_name) & (results.question_template==question_template)]) == 0, "Should not append results to already existing key values"
    new_rows.append({"model": model_name, "question_template": question_template, "accuracy": precision_at_k})

# DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
results = pd.concat([results, pd.DataFrame(new_rows)], ignore_index=True)

In [12]:
# CLIP-BERT checkpoint queried with text only: no "img_feats" are passed to the model.
model_name = "clip-bert-implicit"
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
config = BertConfig.from_pretrained("bert-base-uncased")
model = BertImageForMaskedLM(config)
# strict=False: checkpoint keys that do not match the model are skipped without an error.
model.load_state_dict(torch.load("models/data/model-weights/clip-bert/mp_rank_00_model_states.pt", map_location="cpu")["module"], strict=False)
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # move once, outside the template loop
class_names_ids = names_to_token_ids(CLASS_NAMES, tokenizer)
# Index of ground truth color - (batch, 1). Does not depend on the template, so it is built once.
gt = torch.tensor([CLASS_NAMES.index(ex["label"]) for ex in examples]).unsqueeze(1)

new_rows = []  # one result row per template, added to `results` in a single concat below
for question_template in QUESTION_TEMPLATES:
    # Fill in the template for every example. An empty descriptor removes the
    # placeholder together with the space that follows it.
    questions = []
    for ex in examples:
        if ex["descriptor"] == '':
            question = question_template.replace("[DESCRIPTOR] ", "").replace("[ITEM]", ex["item"])
        else:
            question = question_template.replace("[DESCRIPTOR]", ex["descriptor"]).replace("[ITEM]", ex["item"])
        questions.append(question)

    inputs = tokenizer(questions, return_tensors="pt", padding=True).to(device)
    pred = predict_masked(inputs, tokenizer, model, class_names_ids).cpu().detach()
    topk = pred.topk(1).indices # Rank color tokens and select top k (batch, k)

    # Fraction of examples whose top-ranked color equals the ground truth.
    precision_at_k = (topk == gt).sum().item() / gt.shape[0]
    assert len(results[(results.model==model_name) & (results.question_template==question_template)]) == 0, "Should not append results to already existing key values"
    new_rows.append({"model": model_name, "question_template": question_template, "accuracy": precision_at_k})

# DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
results = pd.concat([results, pd.DataFrame(new_rows)], ignore_index=True)

In [7]:
# Same CLIP-BERT checkpoint as "clip-bert-implicit", but every question is
# additionally passed as "img_feats": the CLIP *text* embedding of that question.
model_name = "clip-bert-explicit"
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
config = BertConfig.from_pretrained("bert-base-uncased")
model = BertImageForMaskedLM(config)
# strict=False: checkpoint keys that do not match the model are skipped without an error.
model.load_state_dict(torch.load("models/data/model-weights/clip-bert/mp_rank_00_model_states.pt", map_location="cpu")["module"], strict=False)
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # move once, outside the template loop
class_names_ids = names_to_token_ids(CLASS_NAMES, tokenizer)
# Index of ground truth color - (batch, 1). Does not depend on the template, so it is built once.
gt = torch.tensor([CLASS_NAMES.index(ex["label"]) for ex in examples]).unsqueeze(1)

# The CLIP model is not moved to `device`; its features are moved together with the other inputs.
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").eval()
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

new_rows = []  # one result row per template, added to `results` in a single concat below
for question_template in QUESTION_TEMPLATES:
    # Fill in the template for every example. An empty descriptor removes the
    # placeholder together with the space that follows it.
    questions = []
    for ex in examples:
        if ex["descriptor"] == '':
            question = question_template.replace("[DESCRIPTOR] ", "").replace("[ITEM]", ex["item"])
        else:
            question = question_template.replace("[DESCRIPTOR]", ex["descriptor"]).replace("[ITEM]", ex["item"])
        questions.append(question)

    inputs = tokenizer(questions, return_tensors="pt", padding=True)
    # Inference only: no autograd graph is needed for the CLIP text encoder.
    with torch.no_grad():
        img_feats = clip_model.get_text_features(**clip_processor(text=questions, return_tensors="pt", padding=True)).unsqueeze(1)
    inputs["img_feats"] = img_feats

    pred = predict_masked(inputs.to(device), tokenizer, model, class_names_ids).cpu().detach()
    topk = pred.topk(1).indices # Rank color tokens and select top k (batch, k)

    # Fraction of examples whose top-ranked color equals the ground truth.
    precision_at_k = (topk == gt).sum().item() / gt.shape[0]
    assert len(results[(results.model==model_name) & (results.question_template==question_template)]) == 0, "Should not append results to already existing key values"
    new_rows.append({"model": model_name, "question_template": question_template, "accuracy": precision_at_k})

# DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
results = pd.concat([results, pd.DataFrame(new_rows)], ignore_index=True)

In [13]:
# BertImageForMaskedLM with the "bert-lxmert-trained" checkpoint, queried with text only.
model_name = "bert-base-trained-lxmert"
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
config = BertConfig.from_pretrained("bert-base-uncased")
model = BertImageForMaskedLM(config)
# strict=False: checkpoint keys that do not match the model are skipped without an error.
model.load_state_dict(torch.load("models/data/model-weights/bert-lxmert-trained/mp_rank_00_model_states.pt", map_location="cpu")["module"], strict=False)
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # move once, outside the template loop
class_names_ids = names_to_token_ids(CLASS_NAMES, tokenizer)
# Index of ground truth color - (batch, 1). Does not depend on the template, so it is built once.
gt = torch.tensor([CLASS_NAMES.index(ex["label"]) for ex in examples]).unsqueeze(1)

new_rows = []  # one result row per template, added to `results` in a single concat below
for question_template in QUESTION_TEMPLATES:
    # Fill in the template for every example. An empty descriptor removes the
    # placeholder together with the space that follows it.
    questions = []
    for ex in examples:
        if ex["descriptor"] == '':
            question = question_template.replace("[DESCRIPTOR] ", "").replace("[ITEM]", ex["item"])
        else:
            question = question_template.replace("[DESCRIPTOR]", ex["descriptor"]).replace("[ITEM]", ex["item"])
        questions.append(question)

    inputs = tokenizer(questions, return_tensors="pt", padding=True).to(device)
    pred = predict_masked(inputs, tokenizer, model, class_names_ids).cpu().detach()
    topk = pred.topk(1).indices # Rank color tokens and select top k (batch, k)

    # Fraction of examples whose top-ranked color equals the ground truth.
    precision_at_k = (topk == gt).sum().item() / gt.shape[0]
    assert len(results[(results.model==model_name) & (results.question_template==question_template)]) == 0, "Should not append results to already existing key values"
    new_rows.append({"model": model_name, "question_template": question_template, "accuracy": precision_at_k})

# DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
results = pd.concat([results, pd.DataFrame(new_rows)], ignore_index=True)

In [14]:
# BertImageForMaskedLM with the "bert-lxmert-trained-scratch" checkpoint, queried with text only.
model_name = "bert-base-trained-lxmert-scratch"
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
config = BertConfig.from_pretrained("bert-base-uncased")
model = BertImageForMaskedLM(config)
# strict=False: checkpoint keys that do not match the model are skipped without an error.
model.load_state_dict(torch.load("models/data/model-weights/bert-lxmert-trained-scratch/mp_rank_00_model_states.pt", map_location="cpu")["module"], strict=False)
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # move once, outside the template loop
class_names_ids = names_to_token_ids(CLASS_NAMES, tokenizer)
# Index of ground truth color - (batch, 1). Does not depend on the template, so it is built once.
gt = torch.tensor([CLASS_NAMES.index(ex["label"]) for ex in examples]).unsqueeze(1)

new_rows = []  # one result row per template, added to `results` in a single concat below
for question_template in QUESTION_TEMPLATES:
    # Fill in the template for every example. An empty descriptor removes the
    # placeholder together with the space that follows it.
    questions = []
    for ex in examples:
        if ex["descriptor"] == '':
            question = question_template.replace("[DESCRIPTOR] ", "").replace("[ITEM]", ex["item"])
        else:
            question = question_template.replace("[DESCRIPTOR]", ex["descriptor"]).replace("[ITEM]", ex["item"])
        questions.append(question)

    inputs = tokenizer(questions, return_tensors="pt", padding=True).to(device)
    pred = predict_masked(inputs, tokenizer, model, class_names_ids).cpu().detach()
    topk = pred.topk(1).indices # Rank color tokens and select top k (batch, k)

    # Fraction of examples whose top-ranked color equals the ground truth.
    precision_at_k = (topk == gt).sum().item() / gt.shape[0]
    assert len(results[(results.model==model_name) & (results.question_template==question_template)]) == 0, "Should not append results to already existing key values"
    new_rows.append({"model": model_name, "question_template": question_template, "accuracy": precision_at_k})

# DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
results = pd.concat([results, pd.DataFrame(new_rows)], ignore_index=True)

In [15]:
# VisualBERT pre-training checkpoint, queried with the tokenized text only (no visual inputs are passed).
model_name = "visualbert-vqa-coco"
model = VisualBertForPreTraining.from_pretrained("uclanlp/visualbert-vqa-coco-pre").eval()
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # move once, outside the template loop
class_names_ids = names_to_token_ids(CLASS_NAMES, tokenizer)
# Index of ground truth color - (batch, 1). Does not depend on the template, so it is built once.
gt = torch.tensor([CLASS_NAMES.index(ex["label"]) for ex in examples]).unsqueeze(1)

new_rows = []  # one result row per template, added to `results` in a single concat below
for question_template in QUESTION_TEMPLATES:
    # Fill in the template for every example. An empty descriptor removes the
    # placeholder together with the space that follows it.
    questions = []
    for ex in examples:
        if ex["descriptor"] == '':
            question = question_template.replace("[DESCRIPTOR] ", "").replace("[ITEM]", ex["item"])
        else:
            question = question_template.replace("[DESCRIPTOR]", ex["descriptor"]).replace("[ITEM]", ex["item"])
        questions.append(question)

    inputs = tokenizer(questions, return_tensors="pt", padding=True).to(device)
    pred = predict_masked(inputs, tokenizer, model, class_names_ids).cpu().detach()
    topk = pred.topk(1).indices # Rank color tokens and select top k (batch, k)

    # Fraction of examples whose top-ranked color equals the ground truth.
    precision_at_k = (topk == gt).sum().item() / gt.shape[0]
    assert len(results[(results.model==model_name) & (results.question_template==question_template)]) == 0, "Should not append results to already existing key values"
    new_rows.append({"model": model_name, "question_template": question_template, "accuracy": precision_at_k})

# DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
results = pd.concat([results, pd.DataFrame(new_rows)], ignore_index=True)

Downloading: 100%|██████████| 631/631 [00:00<00:00, 125kB/s]
Downloading: 100%|██████████| 428M/428M [00:35<00:00, 12.8MB/s] 


In [17]:
# LXMERT whose cross-modality layers are replaced by LxmertLanguageOnlyXLayer.
# The original encoder weights are copied into the rebuilt encoder.
model_name = "lxmert-base-uncased"
model = LxmertForPreTraining.from_pretrained("unc-nlp/lxmert-base-uncased")
prev_encoder = copy.deepcopy(model.lxmert.encoder)
model.lxmert.encoder.x_layers = torch.nn.ModuleList([LxmertLanguageOnlyXLayer(model.lxmert.encoder.config) for _ in range(model.lxmert.encoder.config.x_layers)])
model.lxmert.encoder.load_state_dict(prev_encoder.state_dict())
model.eval()

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Per-example shapes of the placeholder visual inputs.
FEATURES_SHAPE = (1, 2048)
NORMALIZED_BOXES_SHAPE = (1, 4)
# The placeholder visual inputs are random; a dedicated seeded generator makes
# every run of this cell draw the same values without touching the global RNG.
VISUAL_INPUTS_SEED = 0
visual_inputs_rng = torch.Generator().manual_seed(VISUAL_INPUTS_SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # move once, outside the template loop
class_names_ids = names_to_token_ids(CLASS_NAMES, tokenizer)
# Index of ground truth color - (batch, 1). Does not depend on the template, so it is built once.
gt = torch.tensor([CLASS_NAMES.index(ex["label"]) for ex in examples]).unsqueeze(1)

new_rows = []  # one result row per template, added to `results` in a single concat below
for question_template in QUESTION_TEMPLATES:
    # Fill in the template for every example. An empty descriptor removes the
    # placeholder together with the space that follows it.
    questions = []
    for ex in examples:
        if ex["descriptor"] == '':
            question = question_template.replace("[DESCRIPTOR] ", "").replace("[ITEM]", ex["item"])
        else:
            question = question_template.replace("[DESCRIPTOR]", ex["descriptor"]).replace("[ITEM]", ex["item"])
        questions.append(question)

    inputs = tokenizer(questions, return_tensors="pt", padding=True).to(device)
    nbr_samples = len(questions)
    # Random stand-ins for the visual inputs: boxes in [0, 1), features in [0, 10).
    normalized_boxes = torch.empty((nbr_samples,)+NORMALIZED_BOXES_SHAPE).uniform_(0, 1, generator=visual_inputs_rng).to(device)
    features = torch.empty((nbr_samples,)+FEATURES_SHAPE).uniform_(0, 10, generator=visual_inputs_rng).to(device)
    inputs.update({
        "visual_feats": features,
        "visual_pos": normalized_boxes
    })

    pred = predict_masked(inputs, tokenizer, model, class_names_ids).cpu().detach()
    topk = pred.topk(1).indices # Rank color tokens and select top k (batch, k)

    # Fraction of examples whose top-ranked color equals the ground truth.
    precision_at_k = (topk == gt).sum().item() / gt.shape[0]
    assert len(results[(results.model==model_name) & (results.question_template==question_template)]) == 0, "Should not append results to already existing key values"
    new_rows.append({"model": model_name, "question_template": question_template, "accuracy": precision_at_k})

# DataFrame.append was removed in pandas 2.0; pd.concat works across pandas versions.
results = pd.concat([results, pd.DataFrame(new_rows)], ignore_index=True)

## Save the results

In [3]:
# CSV file the collected evaluation results are written to (path relative to the working directory).
RESULTS_FILE = "memory_colors/data/results.csv"

In [21]:
# Set to False to run the notebook without writing the results file.
save_results = True
if save_results:
    # index=False leaves the row index out of the CSV.
    results.to_csv(path_or_buf=RESULTS_FILE, index=False)