In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import json
from pathlib import Path

def read_squad(path):
    path = Path(path)
    with open(path, 'rb') as f:
        squad_dict = json.load(f)

    contexts = []
    questions = []
    answers = []
    for group in squad_dict['data']:
        for passage in group['paragraphs']:
            context = passage['context']
            for qa in passage['qas']:
                question = qa['question']
                for answer in qa['answers']:
                    contexts.append(context)
                    questions.append(question)
                    answers.append(answer)

    return contexts, questions, answers


In [None]:
train_path = "/content/drive/MyDrive/train.json"  ## add custom path for adding your data
test_path = "/content/drive/MyDrive/test.json"  ## add custom path for adding your data

In [None]:
# Reading training and testing dataset
train_contexts, train_questions, train_answers = read_squad(train_path)
val_contexts, val_questions, val_answers = read_squad(test_path)


In [None]:
# sometimes squad answers are off by a character or two – fix this
def add_end_idx(answers, contexts):
    for answer, context in zip(answers, contexts):
        gold_text = answer['text']
        start_idx = answer['answer_start']
        end_idx = start_idx + len(gold_text)
        gold_text = gold_text.lstrip()


        if context[start_idx:end_idx] == gold_text:
            answer['answer_end'] = end_idx
        elif context[start_idx-1:end_idx-1] == gold_text:
            answer['answer_start'] = start_idx - 1
            answer['answer_end'] = end_idx - 1     # When the gold label is off by one character
        elif context[start_idx-2:end_idx-2] == gold_text:
            answer['answer_start'] = start_idx - 2
            answer['answer_end'] = end_idx - 2     # When the gold label is off by two characters

add_end_idx(train_answers, train_contexts)
add_end_idx(val_answers, val_contexts)


In [None]:
# from transformers import DistilBertTokenizerFast
# tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')
# from transformers import RobertaTokenizerFast

# tokenizer = RobertaTokenizerFast.from_pretrained('roberta-base')
# trying different models but the best f1 score is from albert base v2 model
from transformers import AlbertTokenizerFast
tokenizer = AlbertTokenizerFast.from_pretrained('albert-base-v2')
train_encodings = tokenizer(train_contexts, train_questions, truncation=True, padding=True)
val_encodings = tokenizer(val_contexts, val_questions, truncation=True, padding=True)

spiece.model:   0%|          | 0.00/760k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.31M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/684 [00:00<?, ?B/s]

In [None]:
# tokenizing tokens
def add_token_positions(encodings, answers):
    start_positions = []
    end_positions = []
    for i in range(len(answers)):
        start_positions.append(encodings.char_to_token(i, answers[i]['answer_start']))
        end_positions.append(encodings.char_to_token(i, answers[i]['answer_end'] - 1))
        # if None, the answer passage has been truncated
        if start_positions[-1] is None:
            start_positions[-1] = tokenizer.model_max_length
        if end_positions[-1] is None:
            end_positions[-1] = tokenizer.model_max_length
    encodings.update({'start_positions': start_positions, 'end_positions': end_positions})

add_token_positions(train_encodings, train_answers)
add_token_positions(val_encodings, val_answers)


In [None]:
import torch

class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __getitem__(self, idx):
        return {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}

    def __len__(self):
        return len(self.encodings.input_ids)

train_dataset = CustomDataset(train_encodings)
val_dataset = CustomDataset(val_encodings)


In [None]:
# from transformers import DistilBertForQuestionAnswering
# model = DistilBertForQuestionAnswering.from_pretrained("distilbert-base-uncased")
# from transformers import RobertaForQuestionAnswering

# # Example with RoBERTa
# model = RobertaForQuestionAnswering.from_pretrained("roberta-base")
from transformers import AlbertForQuestionAnswering

# Example with ALBERT
model = AlbertForQuestionAnswering.from_pretrained("albert-base-v2")



model.safetensors:   0%|          | 0.00/47.4M [00:00<?, ?B/s]

Some weights of AlbertForQuestionAnswering were not initialized from the model checkpoint at albert-base-v2 and are newly initialized: ['qa_outputs.bias', 'qa_outputs.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# cell for fine tuning albert model

from torch.utils.data import DataLoader
from transformers import AdamW

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model.to(device)
model.train()

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)

optim = AdamW(model.parameters(), lr=5e-5)

for epoch in range(5):
    for batch in train_loader:
        optim.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        start_positions = batch['start_positions'].to(device)
        end_positions = batch['end_positions'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask, start_positions=start_positions, end_positions=end_positions)
        loss = outputs[0]
        loss.backward()
        optim.step()




In [None]:
# evaluation code
from torch.utils.data import DataLoader
from transformers import AdamW
from sklearn.metrics import f1_score

# Assuming `eval_dataset` is your evaluation dataset
eval_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)  # No need to shuffle during evaluation

model.eval()  # Set the model to evaluation mode

all_preds = []
all_labels = []

with torch.no_grad():
    for batch in eval_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        start_positions = batch['start_positions'].to(device)
        end_positions = batch['end_positions'].to(device)

        outputs = model(input_ids, attention_mask=attention_mask, start_positions=start_positions, end_positions=end_positions)
        start_preds = torch.argmax(outputs.start_logits, dim=1)
        end_preds = torch.argmax(outputs.end_logits, dim=1)

        # Convert predictions to answer texts
        batch_size = input_ids.size(0)
        for i in range(batch_size):
            start_pred = start_preds[i].item()
            end_pred = end_preds[i].item()
            pred_answer = tokenizer.decode(input_ids[i, start_pred:end_pred+1], skip_special_tokens=True)
            all_preds.append(pred_answer)

            true_answer = tokenizer.decode(input_ids[i, start_positions[i]:end_positions[i]+1], skip_special_tokens=True)
            all_labels.append(true_answer)

# Calculate F1 score
f1 = f1_score(all_labels, all_preds, average='micro')  # You can use other averaging strategies

print("Micro F1 Score:", f1)

# Albert model gives best f1 score from all the used models

Micro F1 Score: 0.3333333333333333
