In [2]:
import transformers
from transformers import pipeline
import torch
from transformers import BertTokenizer, BertModel, BertForMaskedLM
import random
import pandas as pd
import logging
logging.basicConfig(level=logging.INFO)

In [3]:
# Load BERT tokeniser
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [4]:
# Assign torch method to calculate cosine similarity through tensors
cos = torch.nn.CosineSimilarity(dim=0, eps=1e-6)

In [5]:
# Load validation data
test_answer_data = pd.read_csv('C:/Users/t_p_c/AI/Term 2/ANLP/Labs/Week2/lab2resources/sentence-completion/test_answer.csv', index_col='id')
# Load testing data
testing_data = pd.read_csv('C:/Users/t_p_c/AI/Term 2/ANLP/Labs/Week2/lab2resources/sentence-completion/testing_data.csv', index_col='id')

# Seperate data into questions and answers
questions = testing_data['question']
answers = testing_data.drop(['question'],axis=1)

In [6]:
def tokeniseSentence(sentence):
    """
        Function to tokenise questions, replace '_____' with '[MASK]' and append [CLS], [SEP]
            and get mask id
        :Param sentence: question sentence to be formatted
        :Return tokenised, mask_ind: return formatted questions, and mask id
    """
    # Replace '_____' with mask token
    sentence = sentence.replace('_____', '[MASK]')
    # Concatenate CLS and SEP tokens, and tokenise questions
    tokenised = ['[CLS]'] + tokenizer.tokenize(sentence) + ['[SEP]']
    # Assign mask token index
    mask_ind = tokenised.index('[MASK]')
    return tokenised, mask_ind

In [7]:
def make_segment_ids(list_of_tokens):
    """
        Function to assign segment ids
        :Param list_of_tokens: list of questions
        :Return segment_ids: int
    """
    #this function assumes that up to and including the first '[SEP]' is the first segment, anything afterwards is the second segment
    current_id=0
    segment_ids=[]
    for token in list_of_tokens:
        segment_ids.append(current_id)
        if token == '[SEP]':
            current_id +=1
    return segment_ids

In [8]:
def predict(sentence, mask_ind, model):
    """
        Function to make a predction of the masked word through BERT model using tokens converted into tensors
        :Param sentence: question sentence
        :Param mask_id: mask token location in question sentence
        :Param model: BERT model to be used
        :Return predicted_token: predicted word for mask token
    """
    # Convert tokens into ids
    indexed_tokens = tokenizer.convert_tokens_to_ids(sentence)
    # Assign segment id, redundant for this approach as data passed through in seperate segments
    segment_ids=make_segment_ids(sentence)
    # Convert token ids into tensor
    tokens_tensor = torch.tensor([indexed_tokens])
    # Convert segment id into tensor
    segments_tensors = torch.tensor([segment_ids])

    # Predict all tokens
    with torch.no_grad():
        outputs = model(tokens_tensor, token_type_ids=segments_tensors)
        predictions = outputs[0]

         # find the token id which maximises the prediction for the masked token and then convert this back to a word
    predicted_index = torch.argmax(predictions[0, mask_ind]).item()
    predicted_token = tokenizer.convert_ids_to_tokens([predicted_index])[0]

    return predicted_token

In [9]:
def compareAnswers(predicted, choices):
    """" 
        Function to check if predicted answer in a valid choice.
        :Param predicted: model's predicted word
        :Param choices: possible answers for question
        :Return ans, counter: char string of predicted answer if correct, int
        :Return letters, counter: random character if prediction wrong, int
    """
    # Letters to be used for answers
    letters = ['a', 'b', 'c', 'd', 'e']
    # int to track correct predictions
    counter = 0
    # Loop through choices
    for i, choice in enumerate(choices):
        # If prediction is correct
        if predicted == choice:
            # Index corresponding answer letter
            ans = letters[i]
            # Increment counter
            counter+=1
            return ans, counter
    # If prediction wrong and no matches, return 0, and counter
    return 0, counter

In [10]:
def chooseAnswer(tokenised, mask_ind, generated, choices):
    """
        Function to calclate and compare sentence similarity
        :Param tokenised: formatted question sentences
        :Param mask_ind: mask id location in question sentence
        :Param generated: model's predicted word
        :Param choices: answer word choices
        :Return letters[]: character for selected answer
    """
    # List to store encoded tokens
    encoded=[]
    letters = ['a', 'b', 'c', 'd', 'e']
    # Create new sentence for the generated word
    sentences = []
    choices = [generated] + list(choices)
    
    
    # Create list with choice answers
    for choice in choices:
        
        new_sent2 = tokenised.copy()
        new_sent2[mask_ind] = choice
        sentences.append(new_sent2)
    
    # Convert tokens and segments into tensors
    for sent in sentences:

        indexed_tokens = tokenizer.convert_tokens_to_ids(sent)
        segment_ids=make_segment_ids(sent)
        tokens_tensor = torch.tensor([indexed_tokens])
        segments_tensors = torch.tensor([segment_ids])

        # Predict all tokens
        with torch.no_grad():
            outputs = model(tokens_tensor, token_type_ids=segments_tensors)
            predictions = outputs[0]
            
        cls=predictions[0,0]
        encoded.append(cls)

    # Assign sentence with predicted word to posA for comparison
    posA = encoded[0]
    # Assign answer choices to encoded_choices
    encoded_choices = encoded[1:]
    
    sims=[]
    # Compute cosine similarity for predicted word and each answer choice
    for posB in encoded_choices:
        output=cos(posA,posB)
        sims.append(output.item())
    # Select answer with max sentence similarity to predicted answer
    choice_ind = sims.index(max(sims))
    # Return answer letter
    return letters[choice_ind]

In [None]:
def testAllQuestions(questions, answers, model):
    """
        Function to pass all questions and answers through tokenising, prediction and comparison 
            functions.
            :Return tuple: selected answers and count of correct predictions
    """
    # List to store answers
    scores=[]
    # Counter to count correct predictions
    total_counter = 0
    # Loop through questions
    for i, q in enumerate(questions, start=1):
        # Tokenise questions and get mask id
        tokenised, mask_ind = tokeniseSentence(q)
        # Generate a predicted word
        generated = predict(tokenised, mask_ind, model)
        # Check predicted word and get answer letter
        ans, counter = compareAnswers(generated, answers.loc[i])
        # Increment counter to track correct predictions
        total_counter+=counter
        # If checkAnswer  returns 0
        if ans == 0:
            # Conduct sentence similarity
            ans = chooseAnswer(tokenised, mask_ind, generated, answers.loc[i])
        # Append selected answer to answer list
        scores.append(ans)
    return(scores, total_counter)

In [12]:
# Load pretrained base BERT model
model = BertForMaskedLM.from_pretrained('bert-base-uncased')
model.eval()

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


BertForMaskedLM(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0): BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=Tr

In [13]:
final_answers, counter = testAllQuestions(questions, answers, model)

# print(final_answers)
# print(counter)

144


In [None]:
def evaluate(scores, goldStandard):
    """Evaluation function to compare answers model obtains with validation data.
        :Return float: percentage correct
    """
    return ((scores == goldStandard).sum()/len(scores)) * 100

In [15]:
final_answers_df = pd.DataFrame(final_answers, index=testing_data.index, columns=['answer'] )

In [16]:
score = evaluate(final_answers_df, test_answer_data)
print(score)

INFO:numexpr.utils:NumExpr defaulting to 8 threads.
answer    54.903846
dtype: float64
