In [None]:
# Install camel_tools from a GitHub fork, then run its data installer with `-i all`.
!pip install git+https://github.com/omarxadel/camel_tools.git
!camel_data -i all
# Fine-tuning stack used by the cells below.
# NOTE(review): none of these installs are version-pinned, so a fresh run may resolve different versions.
!pip install accelerate peft bitsandbytes transformers trl unsloth unsloth_zoo

In [None]:
!pip install --no-deps vllm


import requests
import re

# vLLM requirements - vLLM breaks Colab due to reinstalling numpy
f = requests.get("https://raw.githubusercontent.com/vllm-project/vllm/refs/heads/main/requirements/common.txt").content
with open("vllm_requirements.txt", "wb") as file:
    file.write(re.sub(rb"(transformers|numpy|xformers)[^\n]{1,}\n", b"", f))

    
!pip install -r vllm_requirements.txt

In [None]:
from unsloth import FastLanguageModel
from datasets import load_dataset
from datasets import DatasetDict
from tqdm import tqdm
import torch
import time
from transformers import GenerationConfig
import json
from peft import LoraConfig

In [None]:
# Checkpoint and sizing constants used when loading the model.
model_name = "Omartificial-Intelligence-Space/Arabic-DeepSeek-R1-Distill-8B"
max_seq_length = 2048
lora_rank = 32

# Load the checkpoint together with its tokenizer through Unsloth's loader.
# `model` and `tokenizer` are reused by the cells further down.
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name=model_name,
    max_seq_length=max_seq_length,
    max_lora_rank=lora_rank,
    load_in_4bit=True,
    dtype=None,
    fast_inference=True,
    gpu_memory_utilization=0.6,
)

In [None]:
# Fixed seed so the train/valid/test partition is identical on every run
# (an unseeded split reshuffles the data each time the cell executes).
SPLIT_SEED = 42

# Only the 'test' split of the 'All' configuration is loaded; it is then carved
# into 90% train, 5% valid and 5% test.
dataset = load_dataset('MBZUAI/ArabicMMLU', 'All')['test']
train_testvalid = dataset.train_test_split(test_size=0.1, seed=SPLIT_SEED)
test_valid = train_testvalid['test'].train_test_split(test_size=0.5, seed=SPLIT_SEED)
dataset = DatasetDict({
    'train': train_testvalid['train'],
    'test': test_valid['test'],
    'valid': test_valid['train']})

In [None]:
def format_example(example):
    """Render one multiple-choice record as an instruction prompt.

    Args:
        example: Mapping with a 'Question' string, an optional 'Context'
            string, and the answer options under 'Option 1' .. 'Option 4'.

    Returns:
        dict: ``{'input_text': prompt}`` where ``prompt`` is the instruction
        template with the question and the lettered options filled in.

    Options that are missing, ``None`` or blank are left out of the prompt
    instead of being rendered as e.g. ``"c. None"``. Letters stay tied to the
    option's position, so a skipped option never shifts the remaining letters.
    """
    chat_template = """Below are some Multiple Choice Questions. Write responses in Arabic language only that appropriately complete each request in a valid, parsable JSON format with two attributes, one will be "reasoning" which is your thought process, 
        the other is the "solution" that has only a letter (a, b, c or d) in English, which represents the option you chose for the solution based on the options provided in the question.
        ### Question:
        {INPUT}
        
        ### Options:
        {OPTIONS}
        
        ### Solution JSON:
    """
    # The optional context is appended to the question, separated by a space.
    question = example['Question'] + (' ' + example['Context'] if example.get('Context') else '')
    # NOTE(review): only the first four option columns are rendered, matching the
    # "(a, b, c or d)" wording of the template — confirm the dataset has no fifth option.
    option_keys = ("Option 1", "Option 2", "Option 3", "Option 4")

    instruction = f"{question}\n"
    option_lines = []
    for i, key in enumerate(option_keys):
        opt = example.get(key)
        if opt is None or not str(opt).strip():
            continue  # absent option: skip it but keep the letter positions aligned
        option_lines.append(f"{chr(97 + i)}. {opt}\n")  # a, b, c, d
    options_str = "".join(option_lines)

    prompt = chat_template.replace("{INPUT}", instruction)
    prompt = prompt.replace("{OPTIONS}", options_str)
    return {'input_text': prompt}

# Run format_example over the DatasetDict built above; each record gains the
# 'input_text' prompt that format_example returns.
formatted_dataset = dataset.map(format_example)

In [None]:
from camel_tools.morphology.database import MorphologyDB
from camel_tools.morphology.analyzer import Analyzer
from camel_tools.tokenizers.morphological import MorphologicalTokenizer
from camel_tools.disambig.mle import MLEDisambiguator

# Pretrained MLE disambiguator ('calima-msa-r13'), handed to a morphological
# tokenizer configured with the 'atbtok' scheme.
mle_msa = MLEDisambiguator.pretrained('calima-msa-r13')
morph_tokenizer = MorphologicalTokenizer(disambiguator=mle_msa, scheme='atbtok')

class CustomArabicTokenizer:
    """Tokenizer wrapper that morphologically segments text before encoding.

    Input text is split on whitespace, passed through ``morph_tokenizer`` and
    re-joined with single spaces; the resulting string is what the wrapped
    ``base_tokenizer`` actually sees. Decoding is delegated unchanged.
    """

    def __init__(self, base_tokenizer, morph_tokenizer):
        self.morph_tokenizer = morph_tokenizer
        self.base_tokenizer = base_tokenizer

    def _segmented_text(self, text):
        """Return ``text`` as one space-joined string of morphological tokens."""
        return ' '.join(self.camel_morph_tokenize(text))

    def __call__(self, text, **kwargs):
        """Encode ``text`` with the base tokenizer after segmentation."""
        return self.base_tokenizer(self._segmented_text(text), **kwargs)

    def camel_morph_tokenize(self, text):
        """Segment ``text`` into morphological tokens.

        Args:
            text: A string, or a list of strings.

        Returns:
            Whatever ``morph_tokenizer.tokenize`` returns for the
            whitespace-split words.

        Raises:
            TypeError: If ``text`` is neither a string nor a list.
        """
        # NOTE(review): a list is merged into ONE text here, so a batch of
        # sentences comes back as a single flat token sequence — confirm intended.
        if isinstance(text, list):
            text = ' '.join(text)
        if not isinstance(text, str):
            raise TypeError("Input text must be a string or a list of strings.")
        return self.morph_tokenizer.tokenize(text.split())

    def tokenize(self, text, **kwargs):
        """Return the base tokenizer's tokens for the segmented text."""
        return self.base_tokenizer.tokenize(self._segmented_text(text), **kwargs)

    def decode(self, token_ids, **kwargs):
        """Decode token ids with the base tokenizer (no morphological step)."""
        return self.base_tokenizer.decode(token_ids, **kwargs)


# Wrap the model's tokenizer with the morphological tokenizer created above.
custom_tokenizer = CustomArabicTokenizer(tokenizer, morph_tokenizer)

In [None]:
def tokenize_function(examples):
    """Tokenize a batch of prompts, splitting long ones into 512-token chunks.

    Args:
        examples: Batch mapping of column name -> list of values; must contain
            an 'input_text' column.

    Returns:
        The tokenizer output, extended with every input column repeated once
        per produced chunk so all columns have the same length.
    """
    # Tokenize each prompt separately. Calling str() on the whole column would
    # tokenize the Python repr of the list as one single text, and every chunk
    # would then map back to sample 0.
    texts = [str(text) for text in examples["input_text"]]
    result = tokenizer(texts, truncation=True,
                       max_length=512, return_overflowing_tokens=True)

    # One prompt can yield several chunks; sample_map[j] is the index of the
    # prompt that chunk j came from.
    sample_map = result.pop("overflow_to_sample_mapping")
    for key, values in examples.items():
        result[key] = [values[i] for i in sample_map]
    return result

# Tokenize in batches; tokenize_function may return more rows than it receives
# because it re-expands every column per produced chunk.
tokenized_datasets = formatted_dataset.map(tokenize_function, batched=True)

In [None]:
import torch
import torch.nn as nn
# Answer letter -> class index. '-1' is the label extract_final_answer returns
# when no answer can be parsed from the model output.
# NOTE(review): there is no entry for letters beyond 'D' — confirm the dataset's
# "Answer Key" column never contains one.
answer_to_index = {'A': 0, 'B': 1, 'C': 2, 'D': 3, '-1': 4}

class FinalAnswerLoss(nn.Module):
    """Score a generated answer against the expected answer letter.

    The generated text is searched for a JSON object with a "solution" field.
    The loss is BCE between a 0/1 "is correct" indicator and a target of 1, so
    it is 0 for a matching answer and BCELoss's clamped maximum otherwise.

    NOTE(review): the indicator is a freshly created constant tensor, so the
    returned loss is not connected to the model's parameters. It can score
    outputs, but backpropagating it will not update any weights.
    """

    # Label used when no answer could be recovered from the generated text.
    UNPARSED = '-1'

    def __init__(self):
        super().__init__()
        self.loss_fn = nn.BCELoss()

    def forward(self, model_output, correct_answer):
        """Return the BCE loss for one generated sequence.

        Args:
            model_output: Token ids to decode with ``custom_tokenizer``, or an
                already decoded string.
            correct_answer: Expected answer letter; compared case-insensitively.

        Returns:
            torch.Tensor: Scalar loss.
        """
        if isinstance(model_output, str):
            model_output_string = model_output
        else:
            model_output_string = custom_tokenizer.decode(model_output, skip_special_tokens=True)
        predicted_answer = self.extract_final_answer(model_output_string)
        expected_answer = str(correct_answer).strip().upper()

        # Compare the labels directly: any label the model produces (including
        # unexpected letters) is handled without a lookup that could raise, and
        # an unparsable output never counts as correct.
        matched = predicted_answer != self.UNPARSED and predicted_answer == expected_answer
        is_correct = 1.0 if matched else 0.0

        prediction = torch.tensor([is_correct], dtype=torch.float32)
        target = torch.tensor([1.0], dtype=torch.float32)

        # Compute BCE loss
        return self.loss_fn(prediction, target)

    def extract_final_answer(self, model_output):
        """Extract the upper-cased "solution" value from the generated text.

        Args:
            model_output: Decoded model output.

        Returns:
            str: The stripped, upper-cased solution, or ``'-1'`` when no JSON
            object with a non-empty string "solution" can be parsed.
        """
        # Preferred: everything from the first '{' after "### Solution JSON:".
        match = re.search(r'###\s*Solution\s*JSON:\s*(\{.*)', model_output, re.DOTALL)

        if not match:
            # Try fallback: look for any JSON-like object
            match = re.search(r'(\{[^{}]*"solution"\s*:\s*".*?"[^{}]*\})', model_output, re.DOTALL)

        if not match:
            return self.UNPARSED

        # Raw line breaks are not valid inside JSON strings; flatten them.
        json_str = match.group(1).replace('\n', ' ').replace('\r', '').strip()

        # Parse the text as-is first (raw_decode tolerates trailing text after a
        # complete object). Only if that fails, assume the generation was cut
        # off and retry with the missing closing brace appended.
        decoder = json.JSONDecoder()
        parsed = None
        for candidate in (json_str, json_str + '}'):
            try:
                parsed, _ = decoder.raw_decode(candidate)
                break
            except ValueError:
                continue

        if not isinstance(parsed, dict):
            return self.UNPARSED
        solution = parsed.get("solution")
        if not isinstance(solution, str) or not solution.strip():
            return self.UNPARSED
        return solution.strip().upper()

In [None]:
def test_final_answer_loss():
    """Check FinalAnswerLoss on a fixed sample whose JSON solution is "d".

    The sample is encoded with ``custom_tokenizer`` and scored once against the
    matching key and once against a non-matching key; the matching key must
    yield the strictly smaller loss.
    """
    model_output_sample = """
        Below are some Multiple Choice Questions. Write responses in Arabic language only that appropriately complete each request in a valid, parsable JSON format with two attributes, one will be \"reasoning\" which is your thought process, \n    the other is the \"solution\" that has only a letter (a, b, c or d) in English, which represents the option you chose for the solution based on the options provided in the question.\n\n### Question:\nالمقاومة الكهربائية لمصباح مكتوب عليه 220 فولت ، 100 واط  هي\n\n\n### Options:\na. 220 أوم\nb. 202 أوم\nc. 100 أوم\nd. 484 أوم\n\n\n### Solution JSON:\n{\n    \"reasoning\": \"المقاومة الكهربائية لمصباح مكتوب عليه 220 فولت هي 100 واط. لجدول المقاومة الكهربائية، نستخدم صيغة R = V^2 / P، حيث R هي المقاومة، V هو التوتر الكهربائي، وP هو القوة الكهربائية. نستبدل القيم المعطاة: V = 220 فولت، P = 100 واط. نستبدلها في الصيغة: R = (220)^2 / 100 = 484 أوم. لذلك، المقاومة الكهربائية للمصباح هي 484 أوم.\",\n    \"solution\": \"d\"
    """
    model_output = custom_tokenizer(model_output_sample)['input_ids']

    # The sample's "solution" is "d": 'D' is the matching key, 'A' a mismatch.
    correct_answer = 'D'
    wrong_answer = 'A'
    loss_fn = FinalAnswerLoss()
    loss_when_correct = loss_fn(model_output, correct_answer)
    loss_when_wrong = loss_fn(model_output, wrong_answer)
    print(loss_when_correct, loss_when_wrong)
    assert loss_when_correct.item() < loss_when_wrong.item(), (
        "the matching answer key must produce a lower loss than a wrong one"
    )

# Run the test
test_final_answer_loss()


In [None]:
from transformers import Trainer

class CustomTrainer(Trainer):
    """Trainer whose loss is FinalAnswerLoss on the model's greedy token predictions."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the mean FinalAnswerLoss over the batch.

        Args:
            model: The model being trained.
            inputs: Batch mapping; must carry the expected answers under
                'Answer Key' in addition to the model's own inputs.
            return_outputs: When true, also return the raw model outputs.
            num_items_in_batch: Accepted for Trainer compatibility; unused.

        Returns:
            The scalar loss, or ``(loss, outputs)`` when ``return_outputs`` is set.

        Raises:
            ValueError: If the batch has no 'Answer Key' entry.
        """
        # Extract the correct answer from inputs and keep that column out of the
        # forward pass (presumably it is not a forward() argument — TODO confirm).
        correct_answer = inputs.get('Answer Key')
        if correct_answer is None:
            raise ValueError("inputs must contain an 'Answer Key' entry to compute the loss")
        model_inputs = {key: value for key, value in inputs.items() if key != 'Answer Key'}

        # Forward pass
        outputs = model(**model_inputs)
        logits = outputs.get('logits')

        # FinalAnswerLoss decodes token ids, so reduce the vocabulary scores to
        # the most likely token at each position first.
        # assumes logits is (batch, seq, vocab) — TODO confirm
        token_ids = logits.argmax(dim=-1)
        if isinstance(correct_answer, str):
            correct_answer = [correct_answer] * token_ids.shape[0]

        # Compute custom loss one sequence at a time, then average over the batch.
        # NOTE(review): FinalAnswerLoss builds its result from constant tensors,
        # so this loss presumably carries no gradient to the model — verify
        # before relying on it for training.
        loss_fct = FinalAnswerLoss()
        per_example = [loss_fct(ids, answer) for ids, answer in zip(token_ids, correct_answer)]
        loss = torch.stack(per_example).mean().to(logits.device)

        return (loss, outputs) if return_outputs else loss


In [None]:
import numpy as np
from sklearn.metrics import accuracy_score

def compute_metrics(eval_pred):
    """Return the accuracy of the arg-max predictions.

    Args:
        eval_pred: A pair ``(logits, labels)``.

    Returns:
        dict: ``{'accuracy': value}`` as computed by ``accuracy_score``.
    """
    scores, references = eval_pred
    # Pick the highest-scoring entry along the last axis as the prediction.
    predicted = np.argmax(scores, axis=-1)
    accuracy = accuracy_score(references, predicted)
    return {'accuracy': accuracy}

In [None]:
from transformers import TrainingArguments

# Hyper-parameters for the run: evaluate once per epoch, 3 epochs,
# batches of 8 per device for both training and evaluation.
training_args = TrainingArguments(
    output_dir="./results",
    # `eval_strategy` is the current name of this option; the older
    # `evaluation_strategy` spelling is deprecated in recent transformers releases.
    eval_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir="./logs",
)

# Build the trainer on the tokenized 'train' split and evaluate on 'valid';
# the 'test' split is not used here.
# NOTE(review): recent transformers releases appear to prefer `processing_class=`
# over `tokenizer=` — confirm against the installed version.
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets['train'],
    eval_dataset=tokenized_datasets['valid'],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)


In [None]:
# Start training with the trainer configured above.
trainer.train()