In [1]:
import pandas as pd
import numpy as np
import pickle
import torch

from collections import defaultdict
from pathlib import Path
from tqdm.auto import tqdm

In [2]:
model_name = '32' # '32' for ViT-B/32, '14' for ViT-L/14
triples_threshold = 375

cleaned_svo_path = Path('cleaned_svo')
existing_verbs = [] # list of verbs that we have enough SVO triples for
for f in cleaned_svo_path.glob('*.csv'):
    df = pd.read_csv(f)
    if len(df) >= triples_threshold:
        existing_verbs.append(f.stem)
        
data = pd.read_csv('vl_checklist.csv', index_col=0)

print(f'Number of verbs found in `{cleaned_svo_path}`:\n{len(existing_verbs)}')
print(f'Number of rows in the data: {len(data)}')

# filter out the entries whose positive verb or negative verb is not in the existing verbs
data = data[data['pos_verb'].isin(existing_verbs) & data['neg_verb'].isin(existing_verbs)]
print(f'Number of rows in the dataset after filtering out verbs with less than `{triples_threshold}` SVO triples:\n{len(data)}')

# make sure that all verbs in the dataset can be handled by VerbCLIP
assert data['pos_verb'].isin(existing_verbs).all()
assert data['neg_verb'].isin(existing_verbs).all()

Number of verbs found in `cleaned_svo`:
274
Number of rows in the data: 12733
Number of rows in the dataset after filtering out verbs with less than `375` SVO triples:
9407


In [3]:
data['pos_verb_lemma'] = data['pos_verb']
data['neg_verb_lemma'] = data['neg_verb']

In [4]:
# load use pre-computed embeddings
txt_emb_path = Path(f'embeddings/txt_emb_{model_name}.pkl')
img_emb_path = Path(f'embeddings/img_emb_{model_name}.pkl')
txt_emb_dict = pickle.load(open(txt_emb_path, 'rb'))
img_emb_dict = pickle.load(open(img_emb_path, 'rb'))

def encode_text(text):
    return np.array(txt_emb_dict[text])

def encode_image(image_name):
    return np.array(img_emb_dict[image_name])

In [5]:
# Compute embeddings in real time
# Note: the actual images are needed to compute their embeddings
# However, the images not available in this repository due to copyright issues

# import clip
# from PIL import Image
# device = 'cpu'
# model, preprocess = clip.load("ViT-B/32", device=device)

# def encode_text(text):
#     inputs = clip.tokenize([text]).to(device)
#     with torch.no_grad():
#         return model.encode_text(inputs).squeeze().cpu()

# img_folder = Path('images')
# def encode_image(image_name):
#     image = Image.open(img_folder / image_name).convert('RGB')
#     inputs = preprocess(image_name).unsqueeze(0).to(device)
#     with torch.no_grad():
#         return model.encode_image(image).squeeze().cpu()

In [6]:
# load the verb matrices computed using the `build_matrices.ipynb` notebook
# in the root directory of this repository
rel_mat_path = Path(f'embeddings/rel_matrices_{model_name}.pkl')
rel_mat_norm_path = Path(f'embeddings/rel_matrices_norm_{model_name}.pkl')
reg_mat_subj_path = Path(f'embeddings/reg_subj_matrices_{model_name}.pkl')
reg_mat_obj_path = Path(f'embeddings/reg_obj_matrices_{model_name}.pkl')

rel_mat_dict = pickle.load(open(rel_mat_path, 'rb'))
rel_mat_norm_dict = pickle.load(open(rel_mat_norm_path, 'rb'))
reg_mat_subj_dict = pickle.load(open(reg_mat_subj_path, 'rb'))
reg_mat_obj_dict = pickle.load(open(reg_mat_obj_path, 'rb'))

def get_rel_mat(verb, norm=False):
    if norm:
        return rel_mat_norm_dict[verb]
    else:
        return rel_mat_dict[verb]
    
def get_reg_mat(verb, part, norm=False):
    if part == 'subj':
        return reg_mat_subj_dict[verb]
    elif part == 'obj':
        return reg_mat_obj_dict[verb]
    else:
        raise ValueError(f'part should be either subj or obj, got {part}')

In [7]:
def report_scores(pos_scores, neg_scores):
    pos_scores = np.array(pos_scores)
    neg_scores = np.array(neg_scores)
    return (pos_scores > neg_scores).mean()

def normalise(vec):
    return vec / np.linalg.norm(vec)

In [8]:
def matrix_methods(data, matrix_method, **options):
    """
    The available options are:

    - normalize_caption: whether to normalise the caption embeddings
    - normalize_parts: whether to normalise the embeddings of the parts of the sentence
    - normalize_compo: whether to normalise the entire compositional part.

    - normalize_rel: whether to normalise the relational verb matrix by the number of SVO
                     triples used to build it. Only used for the Relational `rel` method.
    - normalize_kron: whether to use the normalise verb vector in the Kronecker product,
                      only used for the Kronecker `kron` method.
    - normalize_reg: whether to use normalised embeddings for the calculation of the
                     Regression `reg` method.

    - cos_sim: whether to use cosine similarity or dot product for the calculation of the scores.
    """
    data = data.copy()

    scores = defaultdict(list)

    for _, row in tqdm(data.iterrows(), total=len(data), leave=True, position=1):
        text_data = {'pos_verb': row['pos_verb'], 'neg_verb': row['neg_verb'], 
                     'subject': row['subject'], 'object': row['object'], 
                     'pos_caption': row['pos_caption'], 'neg_caption': row['neg_caption']}
        text_embs = {k: encode_text(v) for k, v in text_data.items()}
        image_embs = encode_image(row['image_id'])
       
        # prepare verb matrices 
        if matrix_method == 'kron':
            pos_verb_mat = np.outer(text_embs['pos_verb'], text_embs['pos_verb'])
            neg_verb_mat = np.outer(text_embs['neg_verb'], text_embs['neg_verb'])
            if options.get('normalize_kron', False):
                pos_verb_mat = pos_verb_mat / np.linalg.norm(text_embs['pos_verb'])**2
                neg_verb_mat = neg_verb_mat / np.linalg.norm(text_embs['neg_verb'])**2
        elif matrix_method == 'rel':
            pos_verb_mat = get_rel_mat(row['pos_verb'], norm=options.get('normalize_rel', False))
            neg_verb_mat = get_rel_mat(row['neg_verb'], norm=options.get('normalize_rel', False))
        elif matrix_method == 'reg':
            pos_verb_mat_obj = get_reg_mat(row['pos_verb'], norm=options.get('normalize_reg', False), part='obj')
            neg_verb_mat_obj = get_reg_mat(row['neg_verb'], norm=options.get('normalize_reg', False), part='obj')
            pos_verb_mat_subj = get_reg_mat(row['pos_verb'], norm=options.get('normalize_reg', False), part='subj')
            neg_verb_mat_subj = get_reg_mat(row['neg_verb'], norm=options.get('normalize_reg', False), part='subj')
            
        if options.get('normalize_caption', False):
            text_embs['pos_caption'] = normalise(text_embs['pos_caption'])
            text_embs['neg_caption'] = normalise(text_embs['neg_caption'])
            
        if options.get('normalize_parts', False):
            text_embs['pos_verb'] = normalise(text_embs['pos_verb'])
            text_embs['neg_verb'] = normalise(text_embs['neg_verb'])
            text_embs['subject'] = normalise(text_embs['subject'])
            text_embs['object'] = normalise(text_embs['object'])
            
        if matrix_method == 'reg':
            pos_s_Vo_emb = text_embs['subject'] * (pos_verb_mat_obj @ text_embs['object'])
            pos_sV_o_emb = (text_embs['subject'] @ pos_verb_mat_subj) * text_embs['object']
            neg_s_Vo_emb = text_embs['subject'] * (neg_verb_mat_obj @ text_embs['object'])
            neg_sV_o_emb = (text_embs['subject'] @ neg_verb_mat_subj) * text_embs['object']
        else:
            pos_s_Vo_emb = text_embs['subject'] * (pos_verb_mat @ text_embs['object'])
            pos_sV_o_emb = (text_embs['subject'] @ pos_verb_mat) * text_embs['object']
            neg_s_Vo_emb = text_embs['subject'] * (neg_verb_mat @ text_embs['object'])
            neg_sV_o_emb = (text_embs['subject'] @ neg_verb_mat) * text_embs['object']
        
        pos_add = pos_s_Vo_emb + pos_sV_o_emb
        neg_add = neg_s_Vo_emb + neg_sV_o_emb
        
        if options.get('normalize_compo', False):
            pos_s_Vo_emb = normalise(pos_s_Vo_emb)
            pos_sV_o_emb = normalise(pos_sV_o_emb)
            neg_s_Vo_emb = normalise(neg_s_Vo_emb)
            neg_sV_o_emb = normalise(neg_sV_o_emb)
            pos_add = normalise(pos_add)
            neg_add = normalise(neg_add)
        
        pos_vec_copy_subj = text_embs['pos_caption'] + pos_s_Vo_emb
        pos_vec_copy_obj = text_embs['pos_caption'] + pos_sV_o_emb
        pos_vec_copy_add = text_embs['pos_caption'] + pos_add
        neg_vec_copy_subj = text_embs['neg_caption'] + neg_s_Vo_emb
        neg_vec_copy_obj = text_embs['neg_caption'] + neg_sV_o_emb
        neg_vec_copy_add = text_embs['neg_caption'] + neg_add
        
        if options.get('cos_sim', False):
            f = lambda a, b: np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
        else:
            f = lambda a, b: np.dot(a, b)

        scores['pos_clip'].append(f(text_embs['pos_caption'], image_embs))
        scores['pos_copy_subj'].append(f(pos_vec_copy_subj, image_embs))
        scores['pos_copy_obj'].append(f(pos_vec_copy_obj, image_embs))
        scores['pos_copy_add'].append(f(pos_vec_copy_add, image_embs))
        scores['pos_copy_subj_alone'].append(f(pos_s_Vo_emb, image_embs))
        scores['pos_copy_obj_alone'].append(f(pos_sV_o_emb, image_embs))

        scores['neg_clip'].append(f(text_embs['neg_caption'], image_embs))
        scores['neg_copy_subj'].append(f(neg_vec_copy_subj, image_embs))
        scores['neg_copy_obj'].append(f(neg_vec_copy_obj, image_embs))
        scores['neg_copy_add'].append(f(neg_vec_copy_add, image_embs))
        scores['neg_copy_subj_alone'].append(f(neg_s_Vo_emb, image_embs))
        scores['neg_copy_obj_alone'].append(f(neg_sV_o_emb, image_embs))

    # add the scores to the dataframe
    for k, v in scores.items():
        data[k] = v
        
        # add the difference scores to the dataframe
        # a positive value indicates that a higher similarity score was given to the positive caption
        if k.startswith('pos'):
            data[k.replace('pos', 'diff')] = np.array(scores[k]) - np.array(scores[k.replace('pos', 'neg')])

    return data

In [9]:
options = {
    'normalize_caption': True,
    'normalize_parts': True,
    'normalize_compo': False,
    'normalize_rel': True,
    'normalize_kron': True,
    'normalize_reg': True,
    'cos_sim': False
}

print(f"Options:")
for k, v in options.items():
    print(f"    {k}: {v}")
print("Accuracy:")
result = matrix_methods(data, 'kron', **options)
print(f" method: kron")
print(f"    Copy-Subj: {report_scores(result['pos_copy_subj'], result['neg_copy_subj'])*100:.2f}")
print(f"    Copy-Obj: {report_scores(result['pos_copy_obj'], result['neg_copy_obj'])*100:.2f}")
print(f"    Copy-Add: {report_scores(result['pos_copy_add'], result['neg_copy_add'])*100:.2f}")

result = matrix_methods(data, 'rel', **options)
print(f" method: rel")
print(f"    Copy-Subj: {report_scores(result['pos_copy_subj'], result['neg_copy_subj'])*100:.2f}")
print(f"    Copy-Obj: {report_scores(result['pos_copy_obj'], result['neg_copy_obj'])*100:.2f}")
print(f"    Copy-Add: {report_scores(result['pos_copy_add'], result['neg_copy_add'])*100:.2f}")

result = matrix_methods(data, 'reg', **options)
print(f" method: reg")
print(f"    Copy-Subj: {report_scores(result['pos_copy_subj'], result['neg_copy_subj'])*100:.2f}")
print(f"    Copy-Obj: {report_scores(result['pos_copy_obj'], result['neg_copy_obj'])*100:.2f}")
print(f"    Copy-Add: {report_scores(result['pos_copy_add'], result['neg_copy_add'])*100:.2f}")

Options:
    normalize_caption: True
    normalize_parts: True
    normalize_compo: False
    normalize_rel: True
    normalize_kron: True
    normalize_reg: True
    cos_sim: False
Accuracy:


  0%|          | 0/9407 [00:00<?, ?it/s]

 method: kron
    Copy-Subj: 59.53
    Copy-Obj: 58.53
    Copy-Add: 60.41


  0%|          | 0/9407 [00:00<?, ?it/s]

 method: rel
    Copy-Subj: 58.80
    Copy-Obj: 56.62
    Copy-Add: 57.85


  0%|          | 0/9407 [00:00<?, ?it/s]

 method: reg
    Copy-Subj: 58.49
    Copy-Obj: 52.56
    Copy-Add: 59.53


In [10]:
def vector_methods(data, **options):
    """
    The available options are:
    
    - normalize_caption: whether to normalise the caption embeddings
    - normalize_parts: whether to normalise the embeddings of the parts of the sentence
    - normalize_compo: whether to normalise the entire compositional part.
    
    - cos_sim: whether to use cosine similarity or dot product for the calculation of the scores.
    """
    scores = defaultdict(list)
    for i, row in tqdm(data.iterrows(), total=len(data)):
        text_data = {'pos_verb': row['pos_verb'], 'neg_verb': row['neg_verb'], 
                     'subject': row['subject'], 'object': row['object'], 
                     'pos_caption': row['pos_caption'], 'neg_caption': row['neg_caption']}
        text_embs = {k: encode_text(v) for k, v in text_data.items()}
        image_embs = encode_image(row['image_id'])
        
        if options.get('normalize_caption', False):
            text_embs['pos_caption'] = normalise(text_embs['pos_caption'])
            text_embs['neg_caption'] = normalise(text_embs['neg_caption'])
            
        if options.get('normalize_parts', False):
            text_embs['pos_verb'] = normalise(text_embs['pos_verb'])
            text_embs['neg_verb'] = normalise(text_embs['neg_verb'])
            text_embs['subject'] = normalise(text_embs['subject'])
            text_embs['object'] = normalise(text_embs['object'])

        pos_sum = text_embs['subject'] + text_embs['pos_verb'] + text_embs['object']
        neg_sum = text_embs['subject'] + text_embs['neg_verb'] + text_embs['object']
        pos_mult = text_embs['subject'] * text_embs['pos_verb'] * text_embs['object']
        neg_mult = text_embs['subject'] * text_embs['neg_verb'] * text_embs['object']

        if options.get('normalize_compo', False):
            pos_sum = normalise(pos_sum)
            neg_sum = normalise(neg_sum)
            pos_mult = normalise(pos_mult)
            neg_mult = normalise(neg_mult) 

        pos_vec_add = text_embs['pos_caption'] + pos_sum
        pos_vec_mult = text_embs['pos_caption'] + pos_mult
        neg_vec_add = text_embs['neg_caption'] + neg_sum
        neg_vec_mult = text_embs['neg_caption'] + neg_mult
        
        if options.get('cos_sim', False):
            f = lambda a, b: np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
        else:
            f = lambda a, b: np.dot(a, b)
        
        scores['pos_add'].append(f(pos_vec_add, image_embs))
        scores['pos_mult'].append(f(pos_vec_mult, image_embs))
        scores['neg_add'].append(f(neg_vec_add, image_embs))
        scores['neg_mult'].append(f(neg_vec_mult, image_embs))
        scores['pos_clip'].append(f(text_embs['pos_caption'], image_embs))
        scores['neg_clip'].append(f(text_embs['neg_caption'], image_embs))
        
    # add the scores to the dataframe
    for k, v in scores.items():
        data[k] = v
        
        # add the difference scores to the dataframe
        # a positive value indicates that a higher similarity score was given to the positive caption
        if k.startswith('pos'):
            data[k.replace('pos', 'diff')] = np.array(scores[k]) - np.array(scores[k.replace('pos', 'neg')])

    return data

In [11]:
options = {
    'normalize_caption': True,
    'normalize_parts': True,
    'normalize_compo': False,
    'cos_sim': False
}

print(f"Options:")
for k, v in options.items():
    print(f"    {k}: {v}")
print("Accuracy:")
result = vector_methods(data, **options)
print(f"    Add: {report_scores(result['pos_add'], result['neg_add'])*100:.2f}")
print(f"    Mult: {report_scores(result['pos_mult'], result['neg_mult'])*100:.2f}")
print(f"    Clip: {report_scores(result['pos_clip'], result['neg_clip'])*100:.2f}")

Options:
    normalize_caption: True
    normalize_parts: True
    normalize_compo: False
    cos_sim: False
Accuracy:


  0%|          | 0/9407 [00:00<?, ?it/s]

    Add: 60.00
    Mult: 57.83
    Clip: 57.27


In [12]:
def report_best_alpha_beta(result):
    accs = dict()
    for alpha in np.arange(1, 10.1, 0.1):
        for beta in np.arange(1, 10.1, 0.1):
            pos = result['pos_clip'] + alpha * result['pos_copy_subj_alone'] + beta * result['pos_copy_obj_alone']
            neg = result['neg_clip'] + alpha * result['neg_copy_subj_alone'] + beta * result['neg_copy_obj_alone']
            acc = report_scores(pos, neg)
            accs[(alpha, beta)] = acc
    acc_unweighted = report_scores(result['pos_copy_add'], result['neg_copy_add'])

    # fine the best acc
    best_acc = max(accs.values())
    print(f"Unweighted accuracy: {acc_unweighted*100:.2f}")
    print(f"Best accuracy: {best_acc*100:.2f}")
    print(f"Best alpha, beta:")
    for k, v in accs.items():
        if v == best_acc:
            print(f"    {k}")
            
options = {
    'normalize_caption': True,
    'normalize_parts': True,
    'normalize_compo': False,
    'normalize_rel': True,
    'normalize_kron': True,
    'normalize_reg': True,
    'cos_sim': False
}
print(f"Options:")
for k, v in options.items():
    print(f"    {k}: {v}")

print(f" method: kron")
result = matrix_methods(data, 'kron', **options)
report_best_alpha_beta(result)

print(f" method: rel")
result = matrix_methods(data, 'rel', **options)
report_best_alpha_beta(result)

print(f" method: reg")
result = matrix_methods(data, 'reg', **options)
report_best_alpha_beta(result)

Options:
    normalize_caption: True
    normalize_parts: True
    normalize_compo: False
    normalize_rel: True
    normalize_kron: True
    normalize_reg: True
    cos_sim: False
 method: kron


  0%|          | 0/9407 [00:00<?, ?it/s]

Unweighted accuracy: 60.41
Best accuracy: 66.47
Best alpha, beta:
    (9.800000000000008, 1.0)
 method: rel


  0%|          | 0/9407 [00:00<?, ?it/s]

Unweighted accuracy: 57.85
Best accuracy: 65.47
Best alpha, beta:
    (10.000000000000007, 1.0)
 method: reg


  0%|          | 0/9407 [00:00<?, ?it/s]

Unweighted accuracy: 59.53
Best accuracy: 62.90
Best alpha, beta:
    (4.700000000000003, 1.8000000000000007)
