In [4]:
import json
from nltk.tokenize import word_tokenize  # or any specific tokenizer you prefer from nltk
import nltk
nltk.download('punkt')  # If using NLTK's default tokenizer

# Optional based on your needs
import numpy as np
import pandas as pd
import re


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [2]:
import json
from transformers import BertTokenizer
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModelForQuestionAnswering

# File path to your JSON dataset
file_path = '/content/arcd-test.json'

# Load the JSON file
with open(file_path, 'r', encoding='utf-8') as file:
    full_data = json.load(file)

# Function to tokenize text by splitting on spaces
def basic_tokenize(text):
    return text.split()

# Tokenizing the dataset (context and questions only, answers remain untokenized)
tokenized_full_data = []
for entry in full_data['data']:
    for paragraph in entry['paragraphs']:
        context = paragraph['context']
        tokenized_context = basic_tokenize(context)
        tokenized_qas = []
        for qa in paragraph['qas']:
            question = qa['question']
            tokenized_question = basic_tokenize(question)
            answers = qa['answers']  # Keeping answers untokenized
            tokenized_qas.append({'question': tokenized_question, 'id': qa['id'], 'answers': answers})
        tokenized_full_data.append({'context': tokenized_context, 'qas': tokenized_qas})

# Initialize the BERT tokenizer
tokenizer = AutoTokenizer.from_pretrained("salti/AraElectra-base-finetuned-ARCD")
model = AutoModelForQuestionAnswering.from_pretrained("salti/AraElectra-base-finetuned-ARCD")

# Function to encode the data for BERT
def encode_data(data, max_length=512):
    encoded_data = []
    for item in data:
        context = ' '.join(item['context'])
        for qa in item['qas']:
            question = ' '.join(qa['question'])
            answer_text = qa['answers'][0]['text']
            start_position = qa['answers'][0]['answer_start']

            # Encode and pad for Electra with truncation
            inputs = tokenizer.encode_plus(question, context, add_special_tokens=True, max_length=max_length, truncation=True, padding='max_length', return_tensors='pt')
            answer_start_position = context.find(answer_text)
            answer_end_position = answer_start_position + len(answer_text)

            if answer_start_position < max_length:  # Making sure the answer is not truncated
                encoded_data.append({'input_ids': inputs['input_ids'].squeeze(0),
                                    'attention_mask': inputs['attention_mask'].squeeze(0),
                                    'start_positions': torch.tensor(answer_start_position),
                                    'end_positions': torch.tensor(answer_end_position)})
    return encoded_data


import random

def custom_train_test_split(data, test_size=0.2):
    # Calculate the number of samples for the test set
    n_total = len(data)
    n_test = int(n_total * test_size)

    # Randomly shuffle the data indices
    indices = list(range(n_total))
    random.shuffle(indices)

    # Split indices into training and test sets
    test_indices = indices[:n_test]
    train_indices = indices[n_test:]

    # Create training and test datasets
    train_data = [data[i] for i in train_indices]
    test_data = [data[i] for i in test_indices]

    return train_data, test_data

# Use the custom function to split the data
#train_data, val_data = custom_train_test_split(encoded_data, test_size=0.2)

# Split the data into training and validation sets
#train_data, val_data = train_test_split(encoded_data, test_size=0.2)

# The `train_data` and `val_data` are now ready for training and validation


In [3]:
import json
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from torch.nn.utils.rnn import pad_sequence
import random

# Load the JSON file
file_path = '/content/arcd-test.json'
with open(file_path, 'r', encoding='utf-8') as file:
    full_data = json.load(file)

# Initialize the Electra tokenizer
tokenizer = AutoTokenizer.from_pretrained("salti/AraElectra-base-finetuned-ARCD")

# Function to encode the data for Electra
def encode_data(data, max_length=512):
    encoded_data = []
    for entry in data['data']:
        for paragraph in entry['paragraphs']:
            context = paragraph['context']
            for qa in paragraph['qas']:
                question = qa['question']
                answer_text = qa['answers'][0]['text']
                answer_start = qa['answers'][0]['answer_start']
                answer_end = answer_start + len(answer_text)

                # Encode and pad for Electra with truncation
                inputs = tokenizer.encode_plus(question, context, add_special_tokens=True, max_length=max_length, truncation=True, padding='max_length', return_tensors='pt')
                answer_start_position = context.find(answer_text)
                answer_end_position = answer_start_position + len(answer_text)

                # Ensure the answer isn't truncated
                if answer_start_position < max_length and answer_end_position < max_length:
                    encoded_data.append({'input_ids': inputs['input_ids'].squeeze(0),
                                         'attention_mask': inputs['attention_mask'].squeeze(0),
                                         'start_positions': torch.tensor(answer_start_position),
                                         'end_positions': torch.tensor(answer_end_position)})
    return encoded_data

# Encode the data
encoded_data = encode_data(full_data)

# Custom train/test split function
def custom_train_test_split(data, test_size=0.2):
    n_total = len(data)
    n_test = int(n_total * test_size)
    indices = list(range(n_total))
    random.shuffle(indices)
    test_indices = indices[:n_test]
    train_indices = indices[n_test:]
    train_data = [data[i] for i in train_indices]
    test_data = [data[i] for i in test_indices]
    return train_data, test_data

# Splitting the data
train_data, val_data = custom_train_test_split(encoded_data, test_size=0.2)

# Dataset class
class QADataset(Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __getitem__(self, idx):
        return self.encodings[idx]

    def __len__(self):
        return len(self.encodings)

# collate_fn function
def collate_fn(batch):
    input_ids = pad_sequence([item['input_ids'] for item in batch], batch_first=True, padding_value=tokenizer.pad_token_id)
    attention_mask = pad_sequence([item['attention_mask'] for item in batch], batch_first=True, padding_value=0)
    start_positions = torch.stack([item['start_positions'] for item in batch])
    end_positions = torch.stack([item['end_positions'] for item in batch])
    return {'input_ids': input_ids, 'attention_mask': attention_mask, 'start_positions': start_positions, 'end_positions': end_positions}

# Datasets and DataLoaders
train_dataset = QADataset(train_data)
val_dataset = QADataset(val_data)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)

train_loader, val_loader



(<torch.utils.data.dataloader.DataLoader at 0x79c73b0a4be0>,
 <torch.utils.data.dataloader.DataLoader at 0x79c73b0a7cd0>)

In [6]:
from transformers import AdamW

# Assuming model is already loaded
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Optimizer
optimizer = AdamW(model.parameters(), lr=5e-5)

for epoch in range(40):  # Adjust the number of epochs
    model.train()
    for batch in train_loader:
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Validation step
    model.eval()
    total_eval_accuracy = 0
    for batch in val_loader:
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)

        # You can add your evaluation metric here

# Save the model
model.save_pretrained('your_model_directory')




In [7]:
from collections import Counter

def compute_f1(a_gold, a_pred):
    gold_toks = set(a_gold.split())
    pred_toks = set(a_pred.split())
    common = gold_toks & pred_toks
    if len(common) == 0: return 0
    precision = len(common) / len(pred_toks)
    recall = len(common) / len(gold_toks)
    return 2 * (precision * recall) / (precision + recall)

def compute_em(a_gold, a_pred):
    return int(a_gold == a_pred)
def compute_sm(a_gold, a_pred, threshold=0.7):
    gold_toks = set(a_gold.split())
    pred_toks = set(a_pred.split())
    if not gold_toks or not pred_toks:
        return 0
    common = gold_toks & pred_toks
    similarity = len(common) / max(len(gold_toks), len(pred_toks))
    return int(similarity >= threshold)


# Trackers for metrics
f1_scores = []
em_scores = []
sm_scores = []

def get_true_answer(encoded_data, idx):
    # Assuming the true answer is stored in the encoded data
    answer_start = encoded_data[idx]['start_positions'].item()
    answer_end = encoded_data[idx]['end_positions'].item() + 1
    true_answer = tokenizer.decode(encoded_data[idx]['input_ids'][answer_start:answer_end])
    return true_answer

# In your validation loop
for batch_idx, batch in enumerate(val_loader):
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        outputs = model(**batch)

    start_logits = outputs.start_logits
    end_logits = outputs.end_logits
    for i in range(start_logits.shape[0]):
        start_pred = torch.argmax(start_logits[i])
        end_pred = torch.argmax(end_logits[i]) + 1
        predicted_answer = tokenizer.decode(batch['input_ids'][i][start_pred:end_pred])

        # Retrieve the true answer
        global_idx = batch_idx * val_loader.batch_size + i
        true_answer = get_true_answer(val_data, global_idx)

        f1_scores.append(compute_f1(true_answer, predicted_answer))
        em_scores.append(compute_em(true_answer, predicted_answer))
        sm_scores.append(compute_sm(true_answer, predicted_answer))

# Calculate average scores
avg_f1 = sum(f1_scores) / len(f1_scores)
avg_em = sum(em_scores) / len(em_scores)
avg_sm = sum(sm_scores) / len(sm_scores)
print(f"Average F1 Score: {avg_f1}")
print(f"Exact Match Score: {avg_em}")
print(f"Sentence Match Score: {avg_sm}")


Average F1 Score: 0.2557948678248253
Exact Match Score: 0.0
Sentence Match Score: 0.1951219512195122
