In [4]:
from transformers import BertTokenizer, BertModel, pipeline
import torch
import numpy as np
import pandas as pd
import networkx as nx
import stanza
import matplotlib.pyplot as plt
import pydot
from networkx.drawing.nx_pydot import graphviz_layout

In [6]:
def get_attention(sent, model, tokenizer):
    """Run ``model`` on a single sentence and return element 2 of its output.

    Args:
        sent: The raw sentence passed to ``tokenizer.encode_plus``.
        model: Callable taking ``input_ids`` positionally and
            ``token_type_ids`` as a keyword argument.
        tokenizer: Object exposing ``encode_plus(sent, return_tensors='pt')``
            whose result contains ``'input_ids'`` and ``'token_type_ids'``.

    Returns:
        ``model(...)[2]``. NOTE(review): presumably the tuple of per-layer
        attention tensors when the model was loaded with
        ``output_attentions=True`` -- this relies on the position of the
        attentions in the model output; confirm for the model in use.
    """
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        encoded = tokenizer.encode_plus(sent, return_tensors='pt')
        outputs = model(
            encoded['input_ids'],
            token_type_ids=encoded['token_type_ids'],
        )
    return outputs[2]

# Convert the attention scores into a ranking.

def convert_to_ranking(att):
    """Return the indices that sort ``att`` in descending order.

    Sorting is done along the last dimension (the ``torch.sort`` default), so
    for a 2-D input every row is ranked independently: position 0 of each
    output row holds the index of that row's largest value.
    """
    return att.sort(descending=True).indices

# Let's compare two sentences.

def compare_att_dist(a1, a2):
    """Return True only when the two rankings are exactly identical.

    This is a strict comparison (``torch.equal``): the tensors must have the
    same shape and the same value at every position. How strict the match
    should be is a choice that can be experimented with.
    """
    return torch.equal(a1, a2)

def compare_three_att_dist(a1, a2, a3):
    """Return True when the top half of ``a1`` matches both ``a2`` and ``a3``.

    Only the first ``len(a1) // 2`` entries of each ranking are compared, so
    the three rankings count as the same when their leading halves agree
    exactly. The cut-off is a choice that can be experimented with.
    """
    half = len(a1) // 2
    top = a1[:half]
    # all() short-circuits, so a3 is only inspected when a2 already matched.
    return all(torch.equal(top, other[:half]) for other in (a2, a3))

def compare_three_atts(att1, att2, att3, s1, s2, s3):
    """Compare the per-head attention rankings of three sentences.

    For every (layer, head) pair, each attention row of the three inputs is
    converted to a ranking with ``convert_to_ranking`` and the rankings are
    compared row by row.

    Args:
        att1, att2, att3: Attention outputs indexed as
            ``att[layer][0][head]``. The number of layers and heads is taken
            from ``att1``; ``att2`` and ``att3`` must provide at least as many.
            NOTE(review): index 0 is presumably the batch dimension, and the
            three sentences are assumed to tokenize to the same length --
            confirm against the caller.
        s1, s2, s3: The sentences themselves; returned untouched so that each
            result stays labelled with its inputs.

    Returns:
        A tuple ``(s1, s2, s3, t, similar_heads, sim_dict)`` where

        * ``t`` is the number of rows, summed over all heads, for which
          ``compare_three_att_dist`` returned True;
        * ``similar_heads`` lists every ``(layer, head)`` that had at least
          one such row, in iteration order;
        * ``sim_dict`` maps ``(layer, head)`` to a list with one bool per row:
          the result of ``compare_att_dist`` on the rankings of ``att1`` and
          ``att2`` for that row.
    """
    sim_dict = {}
    t = 0
    similar_heads = []
    for layer in range(len(att1)):
        for head in range(len(att1[layer][0])):
            head_rank1 = convert_to_ranking(att1[layer][0][head])
            head_rank2 = convert_to_ranking(att2[layer][0][head])
            head_rank3 = convert_to_ranking(att3[layer][0][head])

            row_matches = []
            head_matched = False
            for i in range(len(head_rank1)):
                row_matches.append(compare_att_dist(head_rank1[i], head_rank2[i]))
                # Evaluate the three-way comparison once and reuse the result.
                three_way = compare_three_att_dist(head_rank1[i], head_rank2[i], head_rank3[i])
                t += three_way
                head_matched = head_matched or three_way

            sim_dict[(layer, head)] = row_matches
            if head_matched:
                similar_heads.append((layer, head))
    return s1, s2, s3, t, similar_heads, sim_dict


def compare_atts(att1, att2, s1, s2, att3=None, s3=None):
    """Backward-compatible entry point for ``compare_three_atts``.

    The third sentence is passed explicitly instead of being read from
    whatever ``att3`` / ``s3`` happen to exist in the notebook namespace.

    Args:
        att1, att2: Attention outputs of the first two sentences.
        s1, s2: The first two sentences.
        att3: Optional attention output of a third sentence. When omitted,
            ``att2`` is used in its place, so the three-way check reduces to
            a comparison between the first two sentences.
        s3: Optional third sentence; returned as given (``None`` by default).

    Returns:
        The same 6-tuple as ``compare_three_atts``.
    """
    if att3 is None:
        att3 = att2
    return compare_three_atts(att1, att2, att3, s1, s2, s3)

def get_best_heads(sim, num_tokens=None):
    """Return the heads whose attention ranking matched for every row.

    Args:
        sim: Mapping from a head key (e.g. ``(layer, head)``) to a list of
            booleans, one per row, telling whether that row matched.
        num_tokens: Optional exact number of matching rows required. When
            ``None`` (default) a head qualifies if it has at least one row and
            all of its rows matched, whatever the sentence length. Pass an
            integer to require exactly that many matching rows instead.

    Returns:
        List of the qualifying keys, in the iteration order of ``sim``.
    """
    best_heads = []
    for key, matches in sim.items():
        if num_tokens is None:
            # Empty lists are rejected: all([]) would otherwise be True.
            qualifies = len(matches) > 0 and all(matches)
        else:
            qualifies = sum(matches) == num_tokens
        if qualifies:
            best_heads.append(key)
    return best_heads

def triplet_compare(sent_list, model, tokenizer):
    """Compare the attention of one original/good/bad sentence triplet.

    Args:
        sent_list: Mapping with the keys ``'original'``, ``'good'`` and
            ``'bad'``, each holding one sentence.
        model, tokenizer: Forwarded to ``get_attention``.

    Returns:
        A one-element list holding the result of ``compare_three_atts`` for
        the three sentences.
    """
    sentences = [sent_list[role] for role in ('original', 'good', 'bad')]
    attentions = [get_attention(sentence, model, tokenizer) for sentence in sentences]
    return [compare_three_atts(*attentions, *sentences)]

def compare_data_set(sent_list, model, tokenizer):
    """Run ``triplet_compare`` over a data set and tabulate the results.

    Args:
        sent_list: Iterable of triplets, each in the form expected by
            ``triplet_compare``.
        model, tokenizer: Forwarded to ``triplet_compare``.

    Returns:
        A ``pandas.DataFrame`` with the columns ``triplet`` (position of the
        triplet in ``sent_list``), ``sent_1``, ``sent_2``, ``sent_3``,
        ``score``, ``similar_heads`` and ``sim_dict``; one row per result
        returned by ``triplet_compare``. The index is a plain 0..n-1 range.
        An empty ``sent_list`` yields an empty frame with the same columns.
    """
    columns = ["triplet", "sent_1", "sent_2", "sent_3", "score", "similar_heads", "sim_dict"]
    # Collect plain rows and build the frame once: concatenating inside the
    # loop is quadratic and starts from an empty frame, which pandas warns about.
    rows = []
    for i, d in enumerate(sent_list):
        for r in triplet_compare(d, model, tokenizer):
            rows.append([i] + list(r))
    return pd.DataFrame(rows, columns=columns)

In [None]:
if __name__ == '__main__':
    model_version = 'bert-base-uncased'
    # output_attentions=True is required: get_attention reads the attention
    # maps out of the model output.
    model = BertModel.from_pretrained(model_version, output_attentions=True)
    tokenizer = BertTokenizer.from_pretrained(model_version)

    # Read the sentence file once; the context manager closes the handle.
    with open("/content/drive/My Drive/COMP599/perturbed_sentences.txt") as perturb_file:
        file_contents = perturb_file.read()
    perturb_sents_list = file_contents.splitlines()

    # The file is consumed as consecutive (original, good, bad) line triplets,
    # so a line count that is not a multiple of 3 means the data is misaligned.
    if len(perturb_sents_list) % 3 != 0:
        raise ValueError(
            "expected the number of lines to be a multiple of 3 "
            "(original/good/bad), got %d" % len(perturb_sents_list)
        )
    perturb_sents = [
        {
            'original': perturb_sents_list[i],
            'good': perturb_sents_list[i + 1],
            'bad': perturb_sents_list[i + 2],
        }
        for i in range(0, len(perturb_sents_list), 3)
    ]

    r_three = compare_data_set(perturb_sents, model, tokenizer)
    r_three = r_three.reset_index(drop=True)
    sim_heads_list = list(r_three['similar_heads'])

    stanza.download('en')
    nlp = stanza.Pipeline('en', processors='tokenize,mwt,pos,lemma,depparse', use_gpu=False, pos_batch_size=3000)

    # Sentences are separated by a blank line before being handed to stanza.
    all_sents = "\n\n".join(perturb_sents_list)
    doc_all = nlp(all_sents) # Run the pipeline on the input text
    
    