In [None]:
!pip install -U sentence-transformers

## Synonym Filtering Model
**Phrase-BERT**: Improved Phrase Embeddings from BERT with an Application to Corpus Exploration (EMNLP 2021)

[[Paper]](https://arxiv.org/pdf/2109.06304.pdf) [[Hugging Face]](https://huggingface.co/whaleloops/phrase-bert)

In [None]:
from sentence_transformers import SentenceTransformer

# Phrase embedding model, loaded by the 'whaleloops/phrase-bert' identifier
# (the Hugging Face model linked above); check_cos_sim() calls its encode().
model = SentenceTransformer('whaleloops/phrase-bert')

In [None]:
import torch
from torch import nn
# place ground-truth sentence at phrase_list[0], distractor at phrase_list[1]
def check_cos_sim(model, phrase_list, threshold):
    """Check that the ground truth and the distractor are distinct from the other phrases.

    The phrases are embedded with ``model.encode`` and these pairs are compared
    by cosine similarity, in this order:

    * ground truth (index 0) against every other phrase, then
    * distractor (index 1) against every remaining option (index 2 onwards).

    Options are deliberately not compared with each other. Every pair whose
    similarity exceeds ``threshold`` is printed.

    Args:
        model: Object with an ``encode(list_of_str)`` method that yields one
            1-D embedding per phrase (e.g. a SentenceTransformer).
        phrase_list: ``[ground_truth, distractor, option, ...]``; any length of
            at least 2 is accepted (the notebook passes 5).
        threshold: Similarity above which two phrases count as too close.

    Returns:
        True if no compared pair exceeds ``threshold``, otherwise False.

    Raises:
        ValueError: If fewer than two phrases are given.
    """
    num_phrases = len(phrase_list)
    if num_phrases < 2:
        raise ValueError("phrase_list needs at least a ground truth and a distractor")

    # Convert every embedding to a tensor exactly once.
    phrase_embs = [torch.as_tensor(emb) for emb in model.encode(phrase_list)]
    cos_sim = nn.CosineSimilarity(dim=0)

    # (index, index) pairs to compare, in the order they are reported.
    compared_pairs = [(0, other) for other in range(1, num_phrases)]
    compared_pairs += [(1, other) for other in range(2, num_phrases)]

    all_distant = True
    for first, second in compared_pairs:
        similarity = cos_sim(phrase_embs[first], phrase_embs[second])
        if similarity > threshold:
            all_distant = False
            print(f"[{phrase_list[first]}] -  [{phrase_list[second]}]: {similarity}; Current threshold is {threshold}\n")
    return all_distant

## Read Raw Sentences

In [None]:
import json

# Parse the sentence lookup file; the sentence records are stored under "data".
with open("./sentence_lookups.json", 'r') as f:
    j_data = json.loads(f.read())
sent_data = j_data["data"]

In [None]:
# Distinct subjects, kept in the order in which they first appear in sent_data.
subjs = list(dict.fromkeys(sent_dict['subj'] for sent_dict in sent_data))
print(subjs)

['a person', 'a man', 'a woman', 'the dog', 'the car', 'the baby', 'the cat', 'the teacher', 'the sun', 'the president', 'the coyote', 'a child', 'the jury', 'a computer', 'the space shuttle', 'the telephone', 'the officer', 'the plant', 'a wallet', 'the farmer', 'a bird', 'the boat', 'a spider', 'the knife', 'a book', 'the waitress', 'parents', 'the glass', 'an owl', 'a ghost', 'a student', 'the dentist', 'the widow', 'a cake', 'a musician', 'the ground', 'the rabbit', 'a crazy man', 'the soldier', 'the boy', 'the police', 'the minister', 'the fireman', 'the hunter', 'a butcher', 'a doctor', 'the king', 'a teacher', 'the patient', 'a programmer']


## Base Sentence

In [None]:
import random
# Seed the global RNG so that Restart & Run All reproduces the same base
# sentences (and the same option shuffling in the cells further down).
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Bucket the sentences by subject in a single pass instead of rescanning
# sent_data once per subject; the original order is kept inside each bucket.
sentences_by_subject = {}
for sent_dict in sent_data:
    sentences_by_subject.setdefault(sent_dict['subj'], []).append(sent_dict)

# For every subject pick one of its three lowest-perplexity sentences.
base_sentences = []
for subject in subjs:
    compare_batch = sentences_by_subject.get(subject, [])
    assert len(compare_batch) == 2500, f"expected 2500 sentences for {subject!r}, got {len(compare_batch)}"
    sorted_compare_list = sorted(compare_batch, key=lambda x: x['ppl'])
    top_3_random = random.randint(0, 2)
    base_sentences.append(sorted_compare_list[top_3_random]) # introduce random because we want some discrepancies

for item in base_sentences:
    print(f"[{item['id']}] {item['text']}, ppl = {item['ppl']}")

[538] a person is riding a bicycle down the stairs, ppl = 17.19
[4038] a man is riding a bicycle along the beach, ppl = 14.56
[6959] a woman is driving a car in the countryside, ppl = 15.05
[9488] the dog is riding a bicycle in the countryside, ppl = 13.88
[11988] the car is riding a bicycle in the countryside, ppl = 20.3
[13288] the baby is riding a bicycle in the refrigerator, ppl = 19.66
[15788] the cat is riding a bicycle in the refrigerator, ppl = 18.61
[18270] the teacher is eating a sandwich in the refrigerator, ppl = 20.12
[21988] the sun is riding a bicycle in the countryside, ppl = 17.89
[23270] the president is eating a sandwich in the refrigerator, ppl = 21.99
[25779] the coyote is drinking from a bottle in the refrigerator, ppl = 17.11
[28279] a child is drinking from a bottle in the refrigerator, ppl = 18.08
[31988] the jury is riding a bicycle in the countryside, ppl = 19.73
[33279] a computer is drinking from a bottle in the refrigerator, ppl = 26.17
[35538] the space s

In [None]:
# Persist the chosen base sentences, one "[id] text, ppl = value" line each.
with open('base_sentences.txt', 'w') as f:
    f.writelines(
        f"[{item['id']}] {item['text']}, ppl = {item['ppl']}\n"
        for item in base_sentences
    )

## Question Construction

In [None]:
# Maps the position of an option (0-4) to its multiple-choice letter.
index_to_letter = dict(enumerate("ABCDE"))

### Pure verb phrases (*for tuning threshold*)

In [None]:
# Verb-phrase option groups WITHOUT similarity filtering; the dump is used to
# tune the cosine-similarity threshold.
verb_candidate_groups = {}
for group_no, sent_dict in enumerate(base_sentences, start=1):
    subject = sent_dict["subj"]
    verb_phrase = sent_dict["vp"]
    location = sent_dict["loc"]

    # Sentences sharing the base sentence's subject and location, lowest ppl first.
    sorted_batch = sorted(
        (sentence for sentence in sent_data
         if sentence['subj'] == subject and sentence['loc'] == location),
        key=lambda x: x['ppl'],
    )
    assert len(sorted_batch) == 50

    # Order matters: ground truth first, then the distractor (the base
    # sentence), then the three highest-perplexity sentences as options.
    candidate_text_group = [sorted_batch[-4]['vp'], verb_phrase]
    candidate_text_group.extend(item['vp'] for item in sorted_batch[-3:])

    verb_candidate_groups[group_no] = candidate_text_group

# ctr ends one past the number of stored groups.
ctr = len(verb_candidate_groups) + 1

print(f"=====================\nTotal valid questions: {ctr-1}")
with open('verb_options.json', 'w') as f:
    json.dump(verb_candidate_groups, f)

Total valid questions: 50


### Verb question generation

In [None]:
# Verb-phrase question groups, keeping only groups whose phrases are distinct
# enough according to check_cos_sim.
verb_candidate_groups = {}
ctr = 1
for sent_dict in base_sentences:
    subject, verb_phrase, location = sent_dict["subj"], sent_dict["vp"], sent_dict["loc"]

    # Sentences sharing the base sentence's subject and location, lowest ppl first.
    sorted_batch = sorted(
        (sentence for sentence in sent_data
         if sentence['subj'] == subject and sentence['loc'] == location),
        key=lambda x: x['ppl'],
    )
    assert len(sorted_batch) == 50

    # Ground truth must come first and the distractor second, because
    # check_cos_sim reads them from positions 0 and 1.
    tagged_sources = [(sorted_batch[-4], 'ground-truth'), (sent_dict, 'distractor')]
    tagged_sources.extend((item, 'option') for item in sorted_batch[-3:])

    # Copy each record so the tag/gid annotations never touch sent_data.
    local_tmp = [{**source, 'tag': tag, 'gid': ctr} for source, tag in tagged_sources]
    candidate_verb_phrases = [entry['vp'] for entry in local_tmp]

    print(f"Group {ctr}\n{candidate_verb_phrases}")
    # Rejected groups do not advance ctr, so accepted groups are numbered consecutively.
    if check_cos_sim(model, candidate_verb_phrases, 0.7): # tuned threshold
        verb_candidate_groups[ctr] = local_tmp
        ctr += 1

print(f"=====================\nTotal valid questions (Verb): {ctr-1}")

Group 1
['finding a potato', 'riding a bicycle', 'starting a fire', 'playing football', 'teaching mathematics']
Group 2
['scaring the cat', 'riding a bicycle', 'fixing a computer', 'writing a program', 'teaching mathematics']
Group 3
['opening the wallet', 'driving a car', 'scaring the cat', 'writing a program', 'fixing a computer']
Group 4
['throwing a coin', 'riding a bicycle', 'fixing a computer', 'writing a program', 'teaching mathematics']
Group 5
['fixing a computer', 'riding a bicycle', 'breaking a glass', 'watching tv', 'teaching mathematics']
Group 6
['sending a letter', 'riding a bicycle', 'throwing a coin', 'fixing a computer', 'teaching mathematics']
Group 7
['fixing a computer', 'riding a bicycle', 'sending a letter', 'throwing a coin', 'teaching mathematics']
Group 8
['writing a program', 'eating a sandwich', 'fixing a computer', 'playing football', 'running up the tree']
[writing a program] -  [fixing a computer]: 0.7432073950767517; Current threshold is 0.7

Group 8
['t

In [None]:
# Turn every accepted verb group into one record holding a multiple-choice
# question, three yes/no questions, an open question and an image prompt.
verb_questions = []
for group_id, sent_list in verb_candidate_groups.items():
    ppl_vp = {}        # verb phrase -> perplexity of the sentence that contains it
    verb_options = []  # the verb phrases offered as answer options
    question_id = gt_subj = gt_be = gt_verb = gt_loc = dt_verb = None
    for sent in sent_list:
        if sent['tag'] == 'ground-truth':
            question_id = sent['gid']
            gt_subj = sent['subj']
            gt_be = sent['be']
            gt_verb = sent['vp']
            gt_loc = sent['loc']
        elif sent['tag'] == 'distractor':
            dt_verb = sent['vp']
        ppl_vp[sent['vp']] = sent['ppl'] # it represents the sentence's ppl
        verb_options.append(sent['vp'])

    # Every group must contain both a ground-truth and a distractor entry.
    required = {
        'id': question_id, 'subj': gt_subj, 'be': gt_be,
        'gt_vp': gt_verb, 'loc': gt_loc, 'dt_vp': dt_verb,
    }
    missing = [field for field, value in required.items() if value is None]
    assert not missing, f"group {group_id} is missing {missing}"

    assert(len(verb_options) == 5)
    random.shuffle(verb_options)  # we shuffle the option's list
    ans_idx = verb_options.index(gt_verb)

    # Highest-perplexity phrase among the WRONG answers. Excluding the ground
    # truth keeps the "binary-cp" answer ("No.") correct even on a ppl tie.
    max_ppl_verb = max((vp for vp in ppl_vp if vp != gt_verb), key=ppl_vp.get)

    # open question
    open_question = f"What {gt_be} {gt_subj} doing {gt_loc}?"
    # multiple choice question: the open question followed by lettered options
    option_lines = "\n".join(
        f"{index_to_letter[idx]}. {option}" for idx, option in enumerate(verb_options)
    )
    choice = f"{open_question}\n{option_lines}"
    # binary question - true
    binary_gt = f"Does the image show that {gt_subj} {gt_be} {gt_verb} {gt_loc}?"
    # binary question - false
    binary_dt = f"Does the image show that {gt_subj} {gt_be} {dt_verb} {gt_loc}?"
    # binary question - comparison
    binary_co = f"Does the image show that {gt_subj} {gt_be} {max_ppl_verb} {gt_loc}?"

    # Key order is kept stable because it is the field order of the JSON dump.
    question = {
        'id': question_id,
        'subj': gt_subj,
        'be': gt_be,
        'loc': gt_loc,
        'gt_vp': gt_verb,
        'dt_vp': dt_verb,
        'ppl': ppl_vp,
        'choice': choice,
        'choice answer': index_to_letter[ans_idx],
        'binary-yes': binary_gt,
        'binary-yes answer': "Yes.",
        'binary-no': binary_dt,
        'binary-no answer': "No.",
        'binary-cp': binary_co,
        'binary-cp answer': "No.",
        'open': open_question,
        'open answer': gt_verb,
        'image prompt': f"Generate an image of {gt_subj} {gt_verb} {gt_loc}.",
    }

    verb_questions.append(question)

In [None]:
# Machine-readable dump of the verb questions.
with open('verb_questions.json', 'w') as f:
    json.dump(verb_questions, f)

# Human-readable report, rendered from the JSON file that was just written.
with open('verb_questions-h.txt', 'w') as outfile, open('verb_questions.json', 'r') as infile:
    j_list = json.load(infile)
    for question in j_list:
        report_parts = [
            f"Question{question['id']}:\n\n{question['image prompt']}\n{question['choice']}\n",
            f"Answer: {question['choice answer']}\n\n",
            f"{question['binary-yes']}\nAnswer: {question['binary-yes answer']}\n",
            f"{question['binary-no']}\nAnswer: {question['binary-no answer']}\n",
            f"{question['binary-cp']}\nAnswer: {question['binary-cp answer']}\n\n",
            f"{question['open']}\nAnswer: {question['open answer']}\n\n",
            f"PPL: {question['ppl']}\n\n",
            "=" * 10,
            '\n',
        ]
        outfile.write("".join(report_parts))

### Pure location phrases (*for tuning threshold*)

In [None]:
# Location option groups WITHOUT similarity filtering; the dump is used to
# tune the cosine-similarity threshold.
loc_candidate_groups = {}

for group_no, sent_dict in enumerate(base_sentences, start=1):
    subject = sent_dict["subj"]
    verb_phrase = sent_dict["vp"]
    location = sent_dict["loc"]

    # Sentences sharing the base sentence's subject and verb phrase, lowest ppl first.
    sorted_batch = sorted(
        (sentence for sentence in sent_data
         if sentence['subj'] == subject and sentence['vp'] == verb_phrase),
        key=lambda x: x['ppl'],
    )
    assert len(sorted_batch) == 50

    # Order matters: ground truth first, then the distractor (the base
    # sentence), then the three highest-perplexity sentences as options.
    candidate_locs = [sorted_batch[-4]['loc'], location]
    candidate_locs.extend(item['loc'] for item in sorted_batch[-3:])

    loc_candidate_groups[group_no] = candidate_locs

# ctr ends one past the number of stored groups.
ctr = len(loc_candidate_groups) + 1

print(f"=====================\nTotal valid questions: {ctr-1}")
with open('loc_options.json', 'w') as f:
    json.dump(loc_candidate_groups, f)

Total valid questions: 50


### Location question generation

In [None]:
# Location question groups, keeping only groups whose phrases are distinct
# enough according to check_cos_sim.
location_candidate_groups = {}

ctr = 1
for sent_dict in base_sentences:
    subject, verb_phrase, location = sent_dict["subj"], sent_dict["vp"], sent_dict["loc"]

    # Sentences sharing the base sentence's subject and verb phrase, lowest ppl first.
    sorted_batch = sorted(
        (sentence for sentence in sent_data
         if sentence['subj'] == subject and sentence['vp'] == verb_phrase),
        key=lambda x: x['ppl'],
    )
    assert len(sorted_batch) == 50

    # Ground truth must come first and the distractor second, because
    # check_cos_sim reads them from positions 0 and 1.
    tagged_sources = [(sorted_batch[-4], 'ground-truth'), (sent_dict, 'distractor')]
    tagged_sources.extend((item, 'option') for item in sorted_batch[-3:])

    # Copy each record so the tag/gid annotations never touch sent_data.
    local_candidates = [{**source, 'tag': tag, 'gid': ctr} for source, tag in tagged_sources]
    loc_phrases = [entry['loc'] for entry in local_candidates]

    print(f"Group {ctr}\n{loc_phrases}")
    # Rejected groups do not advance ctr, so accepted groups are numbered consecutively.
    if check_cos_sim(model, loc_phrases, 0.63): # tuned threshold
        location_candidate_groups[ctr] = local_candidates
        ctr += 1

print(f"=====================\nTotal valid questions (location): {ctr-1}")

Group 1
['in church', 'down the stairs', 'on television', 'at the brew pub', 'at the drive - thru']
Group 2
['in church', 'along the beach', 'at the brew pub', 'on television', 'at the drive - thru']
Group 3
['up the tree', 'in the countryside', 'in church', 'in school', 'on television']
Group 4
['in a cabinet', 'in the countryside', 'in school', 'in church', 'at a conference']
Group 5
['on television', 'in the countryside', 'in school', 'at a conference', 'in church']
Group 6
['on television', 'in the refrigerator', 'in school', 'in church', 'at a conference']
Group 7
['on television', 'in the refrigerator', 'at a conference', 'in church', 'in school']
Group 8
['along the beach', 'in the refrigerator', 'down the slushy road', 'in water', 'around a lake']
[along the beach] -  [down the slushy road]: 0.679032564163208; Current threshold is 0.63

[along the beach] -  [around a lake]: 0.7141851186752319; Current threshold is 0.63

Group 8
['at a conference', 'in the countryside', 'on tele

In [None]:
# Turn every accepted location group into one record holding a multiple-choice
# question, three yes/no questions, an open question and an image prompt.
loc_questions = []
for group_id, sent_list in location_candidate_groups.items():
    ppl_loc = {}       # location phrase -> perplexity of the sentence that contains it
    loc_options = []   # the location phrases offered as answer options
    question_id = gt_subj = gt_be = gt_verb = gt_loc = dt_loc = None
    for sent in sent_list:
        if sent['tag'] == 'ground-truth':
            question_id = sent['gid']
            gt_subj = sent['subj']
            gt_be = sent['be']
            gt_verb = sent['vp']
            gt_loc = sent['loc']
        elif sent['tag'] == 'distractor':
            dt_loc = sent['loc']
        ppl_loc[sent['loc']] = sent['ppl']
        loc_options.append(sent['loc'])

    # Every group must contain both a ground-truth and a distractor entry.
    required = {
        'id': question_id, 'subj': gt_subj, 'be': gt_be,
        'vp': gt_verb, 'gt_loc': gt_loc, 'dt_loc': dt_loc,
    }
    missing = [field for field, value in required.items() if value is None]
    assert not missing, f"group {group_id} is missing {missing}"

    assert(len(loc_options) == 5)
    random.shuffle(loc_options) # we shuffle the option's list
    ans_idx = loc_options.index(gt_loc)

    # Highest-perplexity phrase among the WRONG answers. Excluding the ground
    # truth keeps the "binary-cp" answer ("No.") correct even on a ppl tie.
    max_ppl_loc = max((loc for loc in ppl_loc if loc != gt_loc), key=ppl_loc.get)

    # open question
    open_question = f"Where {gt_be} {gt_subj} {gt_verb}?"
    # multiple choice question: the open question followed by lettered options
    option_lines = "\n".join(
        f"{index_to_letter[idx]}. {option}" for idx, option in enumerate(loc_options)
    )
    choice = f"{open_question}\n{option_lines}"
    # binary question - true
    binary_gt = f"Does the image show that {gt_subj} {gt_be} {gt_verb} {gt_loc}?"
    # binary question - false
    binary_dt = f"Does the image show that {gt_subj} {gt_be} {gt_verb} {dt_loc}?"
    # binary question - comparison
    binary_co = f"Does the image show that {gt_subj} {gt_be} {gt_verb} {max_ppl_loc}?"

    # Key order is kept stable because it is the field order of the JSON dump.
    question = {
        'id': question_id,
        'subj': gt_subj,
        'be': gt_be,
        'vp': gt_verb,
        'gt_loc': gt_loc,
        'dt_loc': dt_loc,
        'ppl': ppl_loc,
        'choice': choice,
        'choice answer': index_to_letter[ans_idx],
        'binary-yes': binary_gt,
        'binary-yes answer': "Yes.",
        'binary-no': binary_dt,
        'binary-no answer': "No.",
        'binary-cp': binary_co,
        'binary-cp answer': "No.",
        'open': open_question,
        'open answer': gt_loc,
        'image prompt': f"Generate an image of {gt_subj} {gt_verb} {gt_loc}.",
    }

    loc_questions.append(question)

In [None]:
# Machine-readable dump of the location questions.
with open('location_questions.json', 'w') as f:
    json.dump(loc_questions, f)

# Human-readable report, rendered from the JSON file that was just written.
with open('location_questions-h.txt', 'w') as outfile, open('location_questions.json', 'r') as infile:
    j_list = json.load(infile)
    for question in j_list:
        report_parts = [
            f"Question{question['id']}:\n\n{question['image prompt']}\n{question['choice']}\n",
            f"Answer: {question['choice answer']}\n\n",
            f"{question['binary-yes']}\nAnswer: {question['binary-yes answer']}\n",
            f"{question['binary-no']}\nAnswer: {question['binary-no answer']}\n",
            f"{question['binary-cp']}\nAnswer: {question['binary-cp answer']}\n\n",
            f"{question['open']}\nAnswer: {question['open answer']}\n\n",
            f"PPL: {question['ppl']}\n\n",
            "=" * 10,
            '\n',
        ]
        outfile.write("".join(report_parts))