In [None]:
!mkdir squad
!wget https://raw.githubusercontent.com/chiahsuan156/Spoken-SQuAD/master/spoken_train-v1.1.json  -O squad/train-v2.0.json
!wget https://raw.githubusercontent.com/chiahsuan156/Spoken-SQuAD/master/spoken_test-v1.1_WER54.json -O squad/test-v2.0.json

In [None]:
!pip install transformers

In [None]:
import os
import requests
import json
from pathlib import Path

In [None]:
import json

def read_squad(path):
    """Flatten a SQuAD-style JSON file into three parallel lists.

    The file is expected to hold ``data -> paragraphs -> qas`` nesting. One
    entry is produced per answer, so a question with several answers appears
    several times (with its context repeated each time).

    Args:
        path: location of the JSON file to read.

    Returns:
        A ``(contexts, questions, answers)`` tuple of equally long lists, where
        each element of ``answers`` is the answer dict taken from the file.
    """
    # load the JSON file into a dictionary
    with open(path, 'rb') as handle:
        payload = json.load(handle)

    contexts, questions, answers = [], [], []
    for article in payload['data']:
        for paragraph in article['paragraphs']:
            passage_text = paragraph['context']
            for qa in paragraph['qas']:
                question_text = qa['question']
                # 'plausible_answers' takes priority whenever the key is present
                answer_key = 'plausible_answers' if 'plausible_answers' in qa else 'answers'
                for candidate in qa[answer_key]:
                    contexts.append(passage_text)
                    questions.append(question_text)
                    answers.append(candidate)
    return contexts, questions, answers

In [None]:
# Read from the same relative `squad/` directory the download cell writes to,
# instead of a hard-coded absolute Colab path.
train_contexts, train_questions, train_answers = read_squad('squad/train-v2.0.json')
test_contexts, test_questions, test_answers = read_squad('squad/test-v2.0.json')

In [None]:
def add_end_idx(answers, contexts):
    """Add an ``'answer_end'`` character index to every answer dict, in place.

    The end index is ``answer_start + len(text)``. When the context slice at
    that position does not equal the answer text, the span is retried shifted
    one and then two characters to the left; the nearest matching shift wins
    and ``'answer_start'`` is corrected accordingly.

    If no alignment matches, the nominal (unshifted) span is kept, so
    ``'answer_end'`` is always present after this call.

    Args:
        answers: list of dicts with ``'text'`` and ``'answer_start'`` keys;
            mutated in place.
        contexts: list of context strings, parallel to ``answers``.
    """
    for answer, context in zip(answers, contexts):
        # gold_text is the answer we expect to find in the context
        gold_text = answer['text']
        start_idx = answer['answer_start']
        end_idx = start_idx + len(gold_text)

        # default to the nominal span so the key exists even without a match
        answer['answer_end'] = end_idx
        if context[start_idx:end_idx] == gold_text:
            continue

        # the annotation may be off by one or two characters
        for n in (1, 2):
            shifted_start = start_idx - n
            if shifted_start < 0:
                # a negative slice start would wrap around to the end of the
                # context and could produce a false match
                break
            if context[shifted_start:end_idx - n] == gold_text:
                answer['answer_start'] = shifted_start
                answer['answer_end'] = end_idx - n
                break
            
# Apply the function to both answer lists. add_end_idx mutates the answer
# dicts in place and returns nothing, so there is nothing to reassign here.
add_end_idx(train_answers, train_contexts)
add_end_idx(test_answers, test_contexts)

In [None]:
from transformers import DistilBertTokenizerFast
# Initialize the tokenizer from the 'distilbert-base-uncased' checkpoint.
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')
# Tokenize context/question pairs for both splits. Contexts are passed as the
# first sequence and questions as the second; add_token_positions calls
# char_to_token without a sequence index, so it presumably relies on the
# context being the first sequence — TODO confirm against the tokenizer docs.
train_encodings = tokenizer(train_contexts, train_questions, truncation=True, padding=True)
test_encodings = tokenizer(test_contexts, test_questions, truncation=True, padding=True)

In [None]:
def _first_mapped_token(encodings, batch_index, char_indices):
    """Return the token index of the first character in ``char_indices`` that
    ``char_to_token`` can map, or ``None`` when none of them maps."""
    for char_index in char_indices:
        token_index = encodings.char_to_token(batch_index, char_index)
        if token_index is not None:
            return token_index
    return None


def add_token_positions(encodings, answers, max_length=None):
    """Convert character-level answer spans to token-level start/end labels.

    For each answer the character span ``[answer_start, answer_start +
    len(text) - 1]`` is mapped through ``encodings.char_to_token``. Characters
    that do not map to a token (``char_to_token`` returns ``None``) are skipped
    by scanning inwards from each end of the span. The scan never leaves the
    answer span, so it always terminates.

    When no character of the span maps to a token (e.g. the answer was
    truncated away), both labels are set to ``max_length`` so start and end
    stay consistent with each other.

    Args:
        encodings: tokenizer output exposing ``char_to_token(batch_index,
            char_index)`` and ``update(dict)``; updated in place with
            ``'start_positions'`` and ``'end_positions'`` lists.
        answers: list of dicts with ``'text'`` and ``'answer_start'`` keys,
            parallel to the encoded examples.
        max_length: label used for answers that cannot be located. Defaults to
            ``tokenizer.model_max_length`` of the notebook-level tokenizer.
    """
    if max_length is None:
        # fall back to the notebook-level tokenizer, as the original code did
        max_length = tokenizer.model_max_length

    start_positions = []
    end_positions = []

    for i, answer in enumerate(answers):
        first_char = answer['answer_start']
        # an empty answer text collapses to the single start character
        last_char = max(first_char, first_char + len(answer['text']) - 1)

        # scan forward for the first token, backward for the last token
        start_position = _first_mapped_token(
            encodings, i, range(first_char, last_char + 1))
        end_position = _first_mapped_token(
            encodings, i, range(last_char, first_char - 1, -1))

        if start_position is None or end_position is None:
            # nothing inside the span maps to a token: mark both as not found
            start_position = end_position = max_length

        start_positions.append(start_position)
        end_positions.append(end_position)

    # update our encodings object with the new token-based start/end positions
    encodings.update({'start_positions': start_positions, 'end_positions': end_positions})

# Apply the function to both splits; each encodings object is updated in place
# with 'start_positions' and 'end_positions' entries.
add_token_positions(train_encodings, train_answers)
add_token_positions(test_encodings, test_answers)



In [None]:
import torch

class SquadDataset(torch.utils.data.Dataset):
    """Dataset view over a tokenizer encodings mapping.

    Each item is a dict with one tensor per key of the encodings, built from
    that key's entry at the requested index. The length is the number of
    entries in ``encodings.input_ids``.
    """

    def __init__(self, encodings):
        self.encodings = encodings

    def __len__(self):
        return len(self.encodings.input_ids)

    def __getitem__(self, idx):
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        return sample

# Build datasets for the training and test encodings.
train_dataset = SquadDataset(train_encodings)
test_dataset = SquadDataset(test_encodings)

In [None]:
from transformers import DistilBertForQuestionAnswering
# Load the question-answering model from the same 'distilbert-base-uncased'
# checkpoint name that the tokenizer was created from.
model = DistilBertForQuestionAnswering.from_pretrained("distilbert-base-uncased")

In [None]:
from torch.utils.data import DataLoader
# transformers' own AdamW was deprecated in favour of the PyTorch implementation
# and is not importable from recent transformers releases, so import it from torch.
# Note: torch's AdamW uses a default weight_decay of 0.01.
from torch.optim import AdamW
from tqdm import tqdm

# setup GPU/CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# move model over to detected device
model.to(device)
# initialize AdamW optimizer (Adam with decoupled weight decay)
# NOTE(review): lr=2e-6 is kept from the original; confirm it is intentional.
optim = AdamW(model.parameters(), lr=2e-6)

# initialize data loader for training data
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

for epoch in range(4):
    # set model to train mode at the start of every epoch
    model.train()
    # setup loop (we use tqdm for the progress bar); label it once per epoch
    loop = tqdm(train_loader, leave=True)
    loop.set_description(f'Epoch {epoch}')
    for batch in loop:
        # reset gradients accumulated by the previous step
        optim.zero_grad()
        # pull all the tensor batches required for training
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        start_positions = batch['start_positions'].to(device)
        end_positions = batch['end_positions'].to(device)
        # forward pass; passing the label positions makes the model return a loss
        outputs = model(input_ids, attention_mask=attention_mask,
                        start_positions=start_positions,
                        end_positions=end_positions)
        # the loss is the first element of the outputs
        loss = outputs[0]
        # backpropagate to compute gradients for every trainable parameter
        loss.backward()
        # update parameters
        optim.step()
        # show the current batch loss on the progress bar
        loop.set_postfix(loss=loss.item())

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive (needs the google.colab runtime).
drive.mount('/content/drive')

In [None]:
# Drive is mounted at /content/drive, so the Drive folder lives under
# /content/drive/MyDrive. The previous '/content/MyDrive/...' path pointed at
# the notebook's local disk, not at Drive.
model_path = '/content/drive/MyDrive/gmodels/bert_base_uncased-custom'
# save the fine-tuned weights and the tokenizer files side by side
model.save_pretrained(model_path)
tokenizer.save_pretrained(model_path)

In [None]:
# switch model out of training mode
model.eval()

val_loader = DataLoader(test_dataset, batch_size=16)

# Count matches over the whole test set. Averaging per-batch accuracies would
# over-weight the final batch whenever it is smaller than the others.
correct = 0
total = 0

# initialize loop for progress bar
loop = tqdm(val_loader)
# loop through batches
for batch in loop:
    # we don't need to calculate gradients as we're not training
    with torch.no_grad():
        # pull batched items from loader
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        start_true = batch['start_positions'].to(device)
        end_true = batch['end_positions'].to(device)
        # make predictions
        outputs = model(input_ids, attention_mask=attention_mask)
        # pull preds out
        start_pred = torch.argmax(outputs['start_logits'], dim=1)
        end_pred = torch.argmax(outputs['end_logits'], dim=1)
        # start and end boundaries are scored independently
        correct += (start_pred == start_true).sum().item()
        correct += (end_pred == end_true).sum().item()
        total += start_true.numel() + end_true.numel()
# boundary-level accuracy over all examples (0.0 when the loader is empty)
acc = correct / max(total, 1)

In [None]:
# report the accuracy computed by the evaluation cell
print(acc)

In [None]:
# Show gold vs. predicted token positions for the most recently evaluated batch.
print("T/F\tstart\tend\n")
for gold_start, gold_end, guess_start, guess_end in zip(start_true, end_true, start_pred, end_pred):
    print(f"true\t{gold_start}\t{gold_end}\n"
          f"pred\t{guess_start}\t{guess_end}\n")

In [None]:
import numpy as np

# switch model out of training mode
model.eval()

val_loader = DataLoader(test_dataset, batch_size=16)

true_starts = []
true_ends = []
pred_starts = []
pred_ends = []

# initialize loop for progress bar
loop = tqdm(val_loader)
# loop through batches
for batch in loop:
    # we don't need to calculate gradients as we're not training
    with torch.no_grad():
        # pull batched items from loader
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        start_true = batch['start_positions'].to(device)
        end_true = batch['end_positions'].to(device)
        # make predictions
        outputs = model(input_ids, attention_mask=attention_mask)
        # pull preds out
        start_pred = torch.argmax(outputs['start_logits'], dim=1)
        end_pred = torch.argmax(outputs['end_logits'], dim=1)
        # append predictions and true values to lists
        true_starts.extend(start_true.cpu().numpy())
        true_ends.extend(end_true.cpu().numpy())
        pred_starts.extend(start_pred.cpu().numpy())
        pred_ends.extend(end_pred.cpu().numpy())

true_starts = np.array(true_starts, dtype=np.int64)
true_ends = np.array(true_ends, dtype=np.int64)
pred_starts = np.array(pred_starts, dtype=np.int64)
pred_ends = np.array(pred_ends, dtype=np.int64)

# Token-span-overlap F1, computed per example and then averaged.
# The earlier boundary-level formula compared positions against -1, a value
# neither the labels nor argmax ever produce here; false positives and false
# negatives were therefore the same count and the "F1" was just the accuracy.
overlap = np.minimum(true_ends, pred_ends) - np.maximum(true_starts, pred_starts) + 1
overlap = np.clip(overlap, 0, None)                        # disjoint spans share nothing
pred_len = np.clip(pred_ends - pred_starts + 1, 0, None)   # end < start -> empty span
true_len = np.clip(true_ends - true_starts + 1, 0, None)

# precision = shared tokens / predicted tokens, recall = shared tokens / gold tokens
precision = np.divide(overlap, pred_len, out=np.zeros(overlap.shape), where=pred_len > 0)
recall = np.divide(overlap, true_len, out=np.zeros(overlap.shape), where=true_len > 0)

# harmonic mean per example; examples without any overlap score 0
pr_sum = precision + recall
f1_per_example = np.divide(2 * precision * recall, pr_sum,
                           out=np.zeros(overlap.shape), where=pr_sum > 0)
f1 = float(f1_per_example.mean()) if f1_per_example.size else 0.0

print("F1 score: {:.4f}".format(f1))