In [None]:
'''Extractive question answering: fine-tuning BERT (bert-base-uncased) on SQuAD 2.0.'''

In [None]:
!pip install transformers -q

In [None]:
from tqdm import tqdm

from google.colab import drive
# Mount Google Drive at /content/drive; later cells create a folder under
# /content/drive/MyDrive and save the model and tokenizer there.
drive.mount('/content/drive')

In [None]:
import os

# Directory on the mounted Drive where the model and tokenizer are saved after training.
# makedirs(..., exist_ok=True) also creates a missing parent folder ('Projects') and is
# a no-op when the directory already exists, so no separate exists() check is needed.
os.makedirs('/content/drive/MyDrive/Projects/BERT QA', exist_ok=True)

In [None]:
import torch
import json
import requests
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast, BertForQuestionAnswering

In [None]:
# Parse the raw SQuAD v2.0 training file into `squad`.
# (read_data below opens and parses the file again on its own.)
with open('train-v2.0.json', 'rb') as f:
    squad = json.loads(f.read())

In [None]:
def read_data(path):
    """
    Flatten a SQuAD-format JSON file into three parallel lists.

    One row is produced per answer: a question with several answers yields
    several rows, and a question whose 'answers' list is empty yields none.
    Missing 'context' or 'question' keys default to an empty string.

    Parameters:
    - path: Path to the JSON file containing SQuAD data

    Returns:
    - contexts: List of contexts (passages), one per answer
    - questions: List of questions, one per answer
    - answers: List of answer dicts as found in the file
    """
    with open(path, 'r', encoding='utf-8') as f:
        squad = json.load(f)

    # Walk data -> paragraphs -> qas -> answers and collect one triple per answer
    rows = [
        (passage.get('context', ''), qa.get('question', ''), answer)
        for group in squad.get('data', [])
        for passage in group.get('paragraphs', [])
        for qa in passage.get('qas', [])
        for answer in qa.get('answers', [])
    ]

    # Split the triples back into three aligned lists
    contexts = [row[0] for row in rows]
    questions = [row[1] for row in rows]
    answers = [row[2] for row in rows]
    return contexts, questions, answers

# Read training data (train-v2.0.json is resolved relative to the working directory)
train_contexts, train_questions, train_answers = read_data('train-v2.0.json')
# Read validation data (dev-v2.0.json is resolved relative to the working directory)
valid_contexts, valid_questions, valid_answers = read_data('dev-v2.0.json')

In [None]:
def add_end_index(answers, contexts):
    """
    Add an 'answer_end' character index to every answer dict (in place).

    The annotated 'answer_start' is sometimes off by one or two characters, so the
    answer text is looked up at offsets 0, -1 and -2 from the annotated start. When a
    match is found, 'answer_start' is corrected and 'answer_end' is set to the
    exclusive end of the match. When no offset matches, the annotated start is kept
    and 'answer_end' is still set (start + len(text)) so that later code can rely on
    the key being present.

    Parameters:
    - answers: List of answer dicts with 'text' and 'answer_start' keys
    - contexts: List of context strings, aligned with answers

    Returns:
    None (modifies the answer dicts in place)
    """
    for answer, context in zip(answers, contexts):
        gold_text = answer['text']
        start_idx = answer['answer_start']
        end_idx = start_idx + len(gold_text)

        for offset in (0, -1, -2):
            shifted_start = start_idx + offset
            # A negative start would make the slice wrap around to the end of the context
            if shifted_start < 0:
                continue
            if context[shifted_start:end_idx + offset] == gold_text:
                # Record the corrected character span
                answer['answer_start'] = shifted_start
                answer['answer_end'] = end_idx + offset
                break
        else:
            # No offset matched: fall back to the annotated span so 'answer_end' always exists
            answer['answer_end'] = end_idx

# Annotate the training and validation answer dicts in place with 'answer_end'
add_end_index(train_answers, train_contexts)
add_end_index(valid_answers, valid_contexts)

In [None]:
# Initialize tokenizer
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')
# Encode (context, question) pairs with truncation and padding enabled. The context is
# passed as the FIRST sequence and the question as the second; anything that encodes
# text for this model later should use the same order.
train_encodings = tokenizer(train_contexts, train_questions, truncation=True, padding=True)
valid_encodings = tokenizer(valid_contexts, valid_questions, truncation=True, padding=True)

In [None]:
def add_token_positions(encodings, answers):
    """
    Attach token-level answer spans to the encodings.

    For each answer, the character offsets 'answer_start' and 'answer_end' - 1 are
    converted to token indices with encodings.char_to_token. When no token is found
    for an offset (e.g. the answer was truncated away), the module-level tokenizer's
    model_max_length is stored instead.

    Parameters:
    - encodings: Encodings object containing tokenized inputs
    - answers: List of dictionaries with 'answer_start' and 'answer_end' keys

    Returns:
    None (adds 'start_positions' and 'end_positions' to encodings in place)
    """
    def _token_index(example_idx, char_idx):
        # Map one character offset to a token index, with the max-length fallback
        token_idx = encodings.char_to_token(example_idx, char_idx)
        if token_idx is None:
            token_idx = tokenizer.model_max_length
        return token_idx

    start_positions, end_positions = [], []
    for i, answer in enumerate(answers):
        start_positions.append(_token_index(i, answer['answer_start']))
        # 'answer_end' is exclusive, so the last answer character is at answer_end - 1
        end_positions.append(_token_index(i, answer['answer_end'] - 1))

    encodings.update({'start_positions': start_positions, 'end_positions': end_positions})

# Add token-level start/end positions to the training encodings (updated in place)
add_token_positions(train_encodings, train_answers)
# Add token-level start/end positions to the validation encodings (updated in place)
add_token_positions(valid_encodings, valid_answers)

In [None]:
class SQuAD_Dataset(torch.utils.data.Dataset):
    """
    Thin torch Dataset wrapper around tokenized SQuAD encodings.

    Parameters:
    - encodings: Encodings object containing tokenized inputs; it must support
      .items() and expose an .input_ids attribute
    """
    def __init__(self, encodings):
        self.encodings = encodings

    def __getitem__(self, idx):
        """
        Build one example.

        Parameters:
        - idx: Index of the example to retrieve

        Returns:
        Dictionary mapping every key of the encodings to a tensor built from
        that key's idx-th entry
        """
        example = {}
        for name, column in self.encodings.items():
            example[name] = torch.tensor(column[idx])
        return example

    def __len__(self):
        """
        Number of examples in the dataset.

        Returns:
        Integer length of the encodings' input_ids
        """
        input_ids = self.encodings.input_ids
        return len(input_ids)

# Wrap the training encodings (including the added answer positions) in a Dataset
train_dataset = SQuAD_Dataset(train_encodings)
# Wrap the validation encodings in a Dataset
valid_dataset = SQuAD_Dataset(valid_encodings)

In [None]:
# Define the dataloaders: batches of 16 examples; only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16)

In [None]:
# Load the 'bert-base-uncased' checkpoint into a BertForQuestionAnswering model
model = BertForQuestionAnswering.from_pretrained("bert-base-uncased")

In [None]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Report which device was selected
print(f'Working on {device}')

In [None]:
# Number of epochs for training: 3-9
N_EPOCHS = 3

# Put the model on the selected device and switch it to training mode
model.to(device)
model.train()

# AdamW over all model parameters
optim = torch.optim.AdamW(model.parameters(), lr=5e-5)

for epoch in range(N_EPOCHS):
    # Progress bar over the training batches of this epoch
    loop = tqdm(train_loader, leave=True)
    for batch in loop:
        # Clear gradients left over from the previous step
        optim.zero_grad()

        # Move the tensors the model consumes onto the target device
        input_ids, attention_mask, start_positions, end_positions = (
            batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'start_positions', 'end_positions')
        )

        # Forward pass; the loss is returned because the answer positions are supplied
        outputs = model(
            input_ids,
            attention_mask=attention_mask,
            start_positions=start_positions,
            end_positions=end_positions,
        )
        loss = outputs[0]

        # Backward pass and parameter update
        loss.backward()
        optim.step()

        # Show the current epoch and the latest batch loss on the progress bar
        loop.set_description(f'Epoch {epoch+1}')
        loop.set_postfix(loss=loss.item())

# Directory on the mounted Drive that receives the trained artifacts
model_path = '/content/drive/MyDrive/Projects/BERT QA'

# Persist the fine-tuned model, then the tokenizer, to the same directory
model.save_pretrained(model_path)
tokenizer.save_pretrained(model_path)

In [None]:
# # Optional: uncomment this cell to reload the fine-tuned model and tokenizer
# # from Drive instead of re-running the training cell.

# # Path where the fine-tuned model and tokenizer were saved
# model_path = '/content/drive/MyDrive/Projects/BERT QA'

# # Load the fine-tuned BERT question-answering model from that path
# model = BertForQuestionAnswering.from_pretrained(model_path)

# # Load the tokenizer from the same path
# tokenizer = BertTokenizerFast.from_pretrained(model_path)

# # Check the available device and use GPU if available, otherwise use CPU
# device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# # Move the model to the appropriate device
# model = model.to(device)

# # Print the device being used
# print(f'Working on {device}')

In [None]:
# Set the model to evaluation mode
model.eval()

# Running counts of correctly predicted answer boundaries and of boundaries seen.
# Counting (rather than averaging per-batch accuracies) keeps a smaller final batch
# from being over-weighted.
n_correct = 0
n_total = 0

# Iterate over batches in the validation data
for batch in tqdm(valid_loader):
    with torch.no_grad():
        # Move input tensors to the appropriate device
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        start_true = batch['start_positions'].to(device)
        end_true = batch['end_positions'].to(device)

        # Forward pass through the model (no labels, so only logits are produced)
        outputs = model(input_ids, attention_mask=attention_mask)

        # Get predicted start and end token positions
        start_pred = torch.argmax(outputs['start_logits'], dim=1)
        end_pred = torch.argmax(outputs['end_logits'], dim=1)

        # Start and end boundaries are scored independently.
        # Targets that add_token_positions replaced with tokenizer.model_max_length
        # (no token found for the answer) simply count as misses here.
        n_correct += (start_pred == start_true).sum().item()
        n_correct += (end_pred == end_true).sum().item()
        n_total += 2 * len(start_pred)

# Boundary-level accuracy over the whole validation set (0.0 if the loader was empty)
acc = n_correct / n_total if n_total else 0.0
print(f"\nValidation accuracy (start/end token positions): {acc:.4f}")

# Print the header for true and predicted answer positions
print("\n\nT/P\tanswer_start\tanswer_end\n")

# Print true and predicted start and end positions for the last validation batch
if n_total:
    for i in range(len(start_true)):
        print(f"true\t{start_true[i].item()}\t{end_true[i].item()}\n"
              f"pred\t{start_pred[i].item()}\t{end_pred[i].item()}\n")

In [None]:
def get_prediction(context, question):
    """
    Get the predicted answer for a given context and question.

    The pair is encoded exactly as in this notebook's training and evaluation cells:
    context first, question second, and only input_ids and attention_mask are passed
    to the model. Encoding the pair in the opposite order, or passing extra inputs
    the model was not fine-tuned with, makes inference inconsistent with training.

    Parameters:
    - context: The context in which the question is asked
    - question: The question to be answered

    Returns:
    - answer: The predicted answer to the question (may be an empty string when
      the predicted end does not come after the predicted start)
    """
    # Tokenize in training order (context, question); truncate over-long inputs
    inputs = tokenizer(context, question, truncation=True, return_tensors='pt').to(device)

    # Inference only: no gradients are needed
    with torch.no_grad():
        outputs = model(inputs['input_ids'], attention_mask=inputs['attention_mask'])

    # Most likely start token, and one past the most likely end token
    answer_start = torch.argmax(outputs['start_logits'])
    answer_end = torch.argmax(outputs['end_logits']) + 1

    # Convert the predicted token span back to a string
    span_ids = inputs['input_ids'][0][answer_start:answer_end]
    answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(span_ids))

    return answer

def normalize_text(s):
    """
    Canonicalize an answer string for comparison.

    Steps, in order: lowercase, strip ASCII punctuation, replace the standalone
    articles 'a', 'an' and 'the' with a space, then collapse whitespace.

    Parameters:
    - s: Input text to be normalized

    Returns:
    - Normalized text
    """
    import string, re

    # Lowercase first so that article matching is effectively case-insensitive
    text = s.lower()
    # Delete every character listed in string.punctuation
    text = text.translate(str.maketrans("", "", string.punctuation))
    # Blank out standalone articles
    text = re.sub(r"\b(a|an|the)\b", " ", text, flags=re.UNICODE)
    # Collapse runs of whitespace and trim both ends
    return " ".join(text.split())

def exact_match(prediction, truth):
    """
    Check whether prediction and truth are identical after normalization.

    Parameters:
    - prediction: Predicted answer
    - truth: True answer

    Returns:
    - True when normalize_text gives the same string for both, else False
    """
    normalized_prediction = normalize_text(prediction)
    normalized_truth = normalize_text(truth)
    return bool(normalized_prediction == normalized_truth)

def compute_f1(prediction, truth):
    """
    Compute the token-level F1 score between predicted answer and true answer.

    Both strings are normalized with normalize_text and split on whitespace. The
    overlap is counted as a multiset (each token counts as many times as it occurs
    in both answers), so repeated tokens are scored correctly.

    Parameters:
    - prediction: Predicted answer
    - truth: True answer

    Returns:
    - F1 score rounded to 2 decimals (0 or 1 as an int when either side is empty,
      0 when there is no overlap)
    """
    from collections import Counter

    pred_tokens = normalize_text(prediction).split()
    truth_tokens = normalize_text(truth).split()

    # If either the prediction or the truth is no-answer then F1 score is 1 if they agree, 0 otherwise
    if len(pred_tokens) == 0 or len(truth_tokens) == 0:
        return int(pred_tokens == truth_tokens)

    # Multiset intersection: min(count in prediction, count in truth) per token
    overlap = Counter(pred_tokens) & Counter(truth_tokens)
    num_same = sum(overlap.values())

    # If there are no common tokens then F1 score is 0
    if num_same == 0:
        return 0

    prec = num_same / len(pred_tokens)
    rec = num_same / len(truth_tokens)

    return round(2 * (prec * rec) / (prec + rec), 2)

def question_answer(context, question, answer):
    """
    Run the model on one question and print how its answer compares to the truth.

    Parameters:
    - context: The context in which the question is asked
    - question: The question to be answered
    - answer: The true answer to the question

    Returns:
    None (prints the question, prediction, true answer, exact match and F1 score)
    """
    prediction = get_prediction(context, question)

    # Score the prediction against the reference answer
    em_score = exact_match(prediction, answer)
    f1_score = compute_f1(prediction, answer)

    # One line per field; the last line carries an extra newline as a separator
    report_lines = (
        f'Question: {question}',
        f'Prediction: {prediction}',
        f'True Answer: {answer}',
        f'Exact match: {em_score}',
        f'F1 score: {f1_score}\n',
    )
    for line in report_lines:
        print(line)

In [None]:
# Passage used for a quick manual check of the model
context = """Albert Einstein was a German-born theoretical physicist who developed the theory of relativity, one of the two pillars of modern physics (alongside quantum mechanics)."""


# Questions about the passage, aligned index-by-index with `answers` below
questions = ["What did Albert Einstein develop?",
             "Where was Albert Einstein born?"]

# Reference answers used for the exact-match and F1 comparison
answers = ["theory of relativity", "german"]

# Print the prediction and its scores for every (question, reference answer) pair
for question, answer in zip(questions, answers):
  question_answer(context, question, answer)