In [1]:

# =======================================================
# Task A - Persian Generative QA with QLoRA (Llama-3.2-1B-bnb-4bit)
# =======================================================

import os, re, json, random, torch
import numpy as np
from pathlib import Path
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, BitsAndBytesConfig, DataCollatorForLanguageModeling
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from huggingface_hub import login
from tqdm import tqdm
# -------------------------
# Config
# -------------------------
# Reproducibility: seed every random number generator used by this script.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Device string: "cuda" when a CUDA device is visible, otherwise "cpu".
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Working directory for data and outputs; created up front if missing.
OUTPUT_DIR = "./qadata"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Fraction of each split to keep (quick testing; use 1.0 for the full set)
# and optional hard caps on the number of examples.
TRAIN_FRACTION = 0.25
VAL_FRACTION = 0.50
TRAIN_MAX_SAMPLES = None
VAL_MAX_SAMPLES = None

# Base checkpoint to fine-tune.
MODEL_ID = "unsloth/Llama-3.2-1B-bnb-4bit"

# Optimisation hyper-parameters.
BATCH_SIZE = 8
GR_ACCUM = 2
EPOCHS = 1  # increase to 2-3 for better results
LR = 2e-4
WARMUP_RATIO = 0.03

# Tokenisation limits.
MAX_LENGTH = 1024
DOC_STRIDE = 128

# For gated models, authenticate first with huggingface_hub, e.g.:
#   login('your_hf_token')
# -------------------------
# Data loader
# -------------------------
def read_qa(path):
    """Flatten a SQuAD-style JSON file into a list of per-question records.

    Args:
        path: Location of a JSON file whose top-level ``"data"`` list holds
            articles, each with ``"paragraphs"`` containing a ``"context"``
            and a list of ``"qas"``.

    Returns:
        A list of dicts with keys ``title``, ``context``, ``question``,
        ``id`` and ``answers``; ``answers`` holds the parallel lists
        ``answer_start`` and ``text``. Title, context, question and answer
        texts are whitespace-stripped.
    """
    with Path(path).open(encoding="utf-8") as handle:
        payload = json.load(handle)

    records = []
    for article in payload["data"]:
        # A missing title becomes the empty string.
        article_title = article.get("title", "").strip()
        for para in article["paragraphs"]:
            passage = para["context"].strip()
            for item in para["qas"]:
                answer_texts = [entry["text"].strip() for entry in item["answers"]]
                answer_offsets = [entry["answer_start"] for entry in item["answers"]]
                record = {
                    "title": article_title,
                    "context": passage,
                    "question": item["question"].strip(),
                    "id": item["id"],
                    "answers": {
                        "answer_start": answer_offsets,
                        "text": answer_texts,
                    },
                }
                records.append(record)
    return records
# Training split: parse the JSON file, then wrap the records in a Dataset.
train_ds = read_qa("./qadata/pqa_train.json")
train_dataset = Dataset.from_list(train_ds)

# Validation split: the test file is used for validation.
val_ds = read_qa("./qadata/pqa_test.json")
val_dataset = Dataset.from_list(val_ds)

# Bundle both splits under the conventional split names.
raw_ds = DatasetDict({
    "train": train_dataset,
    "validation": val_dataset,
})
# -------------------------
# Persian normalization
# -------------------------
def normalize_persian(text: str) -> str:
    """Return *text* with Persian-specific character and whitespace cleanup.

    The zero-width non-joiner (U+200C) becomes a plain space, Arabic yeh and
    kaf are mapped to their Persian forms, runs of whitespace collapse to a
    single space, and leading/trailing whitespace is removed. Falsy input
    (empty string, ``None``) yields the empty string.
    """
    if not text:
        return ""
    # One pass over the string for all three single-character substitutions.
    char_map = str.maketrans({"\u200c": " ", "ي": "ی", "ك": "ک"})
    unified = text.translate(char_map)
    collapsed = re.sub(r"\s+", " ", unified)
    return collapsed.strip()
def map_to_squad(example):
    """Normalize one raw record into the id/context/question/answers layout.

    Context, question and every answer text go through
    ``normalize_persian``. The ``answer_start`` values are copied through
    unchanged, so they still refer to positions in the un-normalized context.
    A missing ``id`` becomes the empty string; any other id is stringified.
    """
    raw_answers = example["answers"]
    cleaned_texts = [normalize_persian(t) for t in raw_answers["text"]]
    return {
        "id": str(example.get("id", "")),
        "context": normalize_persian(example["context"]),
        "question": normalize_persian(example["question"]),
        "answers": {
            "text": cleaned_texts,
            "answer_start": raw_answers["answer_start"],
        },
    }


# Apply the normalization to every example of every split.
mapped = raw_ds.map(map_to_squad)
# -------------------------
# Subset for speed
# -------------------------
def take_subset(ds_split, frac=None, max_samples=None, seed=SEED):
    """Select a seeded, shuffled subset of a dataset split.

    Args:
        ds_split: Object supporting ``len()`` and ``.select(indices)``.
        frac: If truthy, keep this fraction of the split (at least one index).
        max_samples: If truthy, additionally cap the number of kept indices.
        seed: Seed for the private shuffling RNG.

    Returns:
        Whatever ``ds_split.select`` returns for the chosen indices, in
        shuffled order.
    """
    order = list(range(len(ds_split)))
    # A dedicated Random instance keeps the global RNG state untouched.
    shuffler = random.Random(seed)
    shuffler.shuffle(order)

    if frac:
        cutoff = max(1, int(len(ds_split) * frac))
        order = order[:cutoff]
    if max_samples:
        order = order[:max_samples]

    return ds_split.select(order)
train_small = take_subset(
    mapped["train"],
    frac=TRAIN_FRACTION,
    max_samples=TRAIN_MAX_SAMPLES,
)
val_small = take_subset(
    mapped["validation"],
    frac=VAL_FRACTION,
    max_samples=VAL_MAX_SAMPLES,
)
# -------------------------
# Tokenizer
# -------------------------
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
# Padding to a fixed length needs a pad token; fall back to EOS if none is set.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
# -------------------------
# Prepare features for generative QA
# -------------------------
def prepare_qa_features(examples):
    """Tokenize a batch of QA examples for causal-LM fine-tuning.

    Each example is rendered as "context / question / answer" text, followed
    by the tokenizer's EOS token so the model is trained to stop after the
    answer. Only the first answer of each example is used; examples without
    answers get an empty answer.

    Labels are a copy of ``input_ids`` with ``-100`` (ignored by the loss) on
    every padding position and on every prompt position, so only the answer
    (and the closing EOS) contributes to the loss.

    Args:
        examples: Batched mapping with ``"question"``, ``"context"`` and
            ``"answers"`` columns; each answers entry has a ``"text"`` list.

    Returns:
        The tokenizer output (padded/truncated to ``MAX_LENGTH``) with an
        added ``"labels"`` tensor.
    """
    prefixes = [
        f"زمینه: {c}\nسوال: {q}\nپاسخ:"
        for q, c in zip(examples["question"], examples["context"])
    ]
    first_answers = [ans["text"][0] if ans["text"] else "" for ans in examples["answers"]]
    eos = tokenizer.eos_token or ""
    prompts = [
        f"{prefix} {answer}{eos}"
        for prefix, answer in zip(prefixes, first_answers)
    ]

    tokenized = tokenizer(
        prompts,
        truncation=True,
        max_length=MAX_LENGTH,
        padding="max_length",
        return_tensors="pt"
    )

    attention_mask = tokenized["attention_mask"]
    labels = tokenized["input_ids"].clone()
    # Mask padding via the attention mask rather than by token id: when the
    # pad token is the EOS token, an id comparison would also hide the real
    # EOS that closes the answer.
    labels[attention_mask == 0] = -100

    # Mask the prompt so the loss covers only what follows "پاسخ:".
    prefix_lengths = [len(ids) for ids in tokenizer(prefixes)["input_ids"]]
    for i, prefix_len in enumerate(prefix_lengths):
        real_positions = attention_mask[i].nonzero(as_tuple=True)[0]
        if real_positions.numel() == 0:
            continue
        # The first non-pad position is 0 with right padding and the pad
        # count with left padding; offsetting by it handles both sides.
        start = int(real_positions[0])
        labels[i, start:start + prefix_len] = -100

    tokenized["labels"] = labels
    return tokenized
train_features = train_small.map(prepare_qa_features, batched=True, remove_columns=train_small.column_names)
val_features   = val_small.map(prepare_qa_features, batched=True, remove_columns=val_small.column_names)
# -------------------------
# QLoRA config & model
# -------------------------
# Compute dtype for the 4-bit layers: bfloat16 when a CUDA device with
# compute-capability major version >= 8 is present, float16 otherwise.
_bf16_capable = torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8
_compute_dtype = torch.bfloat16 if _bf16_capable else torch.float16

# 4-bit NF4 quantization with double quantization.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=_compute_dtype,
)

# LoRA adapters on the attention and MLP projection layers.
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Load the quantized base model on CPU, prepare it for k-bit training and
# wrap it with the LoRA adapters.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=bnb_config,
    device_map="cpu",
    trust_remote_code=True,
)
model = get_peft_model(prepare_model_for_kbit_training(model), lora_config)
print("Using model:", MODEL_ID)







  from .autonotebook import tqdm as notebook_tqdm
Map: 100%|██████████| 9008/9008 [00:03<00:00, 2401.16 examples/s]
Map: 100%|██████████| 930/930 [00:00<00:00, 2798.38 examples/s]
Map: 100%|██████████| 2252/2252 [00:05<00:00, 390.77 examples/s]
Map: 100%|██████████| 465/465 [00:00<00:00, 534.98 examples/s]


Using model: unsloth/Llama-3.2-1B-bnb-4bit


In [None]:
from transformers import Trainer

# -------------------------
# Trainer & Training
# -------------------------
from transformers import default_data_collator

# True when a CUDA device with compute-capability major version >= 8 exists.
# Not consumed by the arguments below (mixed precision is disabled there).
use_bf16 = torch.cuda.is_available() and torch.cuda.get_device_capability()[0]>=8
args=TrainingArguments(
    output_dir=OUTPUT_DIR,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    gradient_accumulation_steps=GR_ACCUM,
    num_train_epochs=EPOCHS,
    learning_rate=LR,
    lr_scheduler_type="cosine",
    warmup_ratio=WARMUP_RATIO,
    logging_steps=50,
    eval_strategy="epoch",
    save_strategy="epoch",
    optim="paged_adamw_32bit",
    gradient_checkpointing=True,
    remove_unused_columns=False,
    bf16=False,
    fp16=False,
    report_to=[],
    no_cuda=True,  # train on CPU even when a GPU is visible
)
# The features are already padded to a fixed length and carry their own
# "labels" (prompt and padding set to -100), so the batches only need to be
# stacked. DataCollatorForLanguageModeling(mlm=False) must not be used here:
# it rebuilds "labels" from "input_ids" and would discard the answer-only
# masking.
data_collator = default_data_collator
trainer = Trainer(
    model=model, args=args, train_dataset=train_features, eval_dataset=val_features,
    tokenizer=tokenizer, data_collator=data_collator
)
trainer.train()
# Save adapters
trainer.save_model(os.path.join(OUTPUT_DIR, "lora_adapters"))
# -------------------------
# Evaluation: F1 & EM
# -------------------------
def normalize_for_eval(s: str) -> str:
    """Normalize a prediction or reference string before scoring.

    Applies ``normalize_persian``, replaces every Unicode punctuation
    character (general category ``P*``, which includes the Persian comma,
    semicolon and question mark) with a space, then collapses whitespace.

    Args:
        s: Raw answer text.

    Returns:
        The punctuation-free, whitespace-normalized text.
    """
    # Python's ``re`` has no ``\p{P}`` property class (the pattern raises
    # ``re.error``), so punctuation is detected through unicodedata instead.
    import unicodedata

    s = normalize_persian(s)
    s = "".join(
        " " if unicodedata.category(ch).startswith("P") else ch
        for ch in s
    )
    s = re.sub(r"\s+", " ", s).strip()
    return s
def f1_score(prediction: str, ground_truth: str) -> float:
    """Return the token-level F1 between a prediction and a reference.

    Both strings are normalized with ``normalize_for_eval`` and split on
    whitespace. Overlap is counted with multiplicity.

    Args:
        prediction: Generated answer text.
        ground_truth: Reference answer text.

    Returns:
        F1 in ``[0, 1]``. If either side has no tokens, the score is 1.0
        when both are empty and 0.0 otherwise.
    """
    from collections import Counter

    pred_tokens = normalize_for_eval(prediction).split()
    gt_tokens = normalize_for_eval(ground_truth).split()
    if len(pred_tokens) == 0 or len(gt_tokens) == 0:
        return float(pred_tokens == gt_tokens)
    # Multiset intersection keeps, per token, the smaller of the two counts.
    overlap = Counter(pred_tokens) & Counter(gt_tokens)
    num_same = sum(overlap.values())
    if num_same == 0:
        return 0.0
    precision = num_same / len(pred_tokens)
    recall = num_same / len(gt_tokens)
    return 2 * precision * recall / (precision + recall)
def exact_match_score(prediction: str, ground_truth: str) -> float:
    """Return 1.0 if both strings are equal after ``normalize_for_eval``, else 0.0."""
    predicted = normalize_for_eval(prediction)
    expected = normalize_for_eval(ground_truth)
    return 1.0 if predicted == expected else 0.0
model.eval()
# Send inputs to wherever the model's weights actually live; the global
# DEVICE string can name a different device than the one the model is on.
model_device = next(model.parameters()).device
preds = []
refs = []
for example in tqdm(val_small):
    q = example["question"]
    c = example["context"]
    # Score against the first reference answer (empty if there is none).
    gold = example["answers"]["text"][0] if example["answers"]["text"] else ""
    refs.append(gold)

    prompt = f"زمینه: {c}\nسوال: {q}\nپاسخ:"
    inputs = tokenizer(prompt, return_tensors="pt")
    inputs = {k: v.to(model_device) for k, v in inputs.items()}
    prompt_len = inputs["input_ids"].shape[1]
    with torch.no_grad():
        output = model.generate(
            **inputs,
            max_new_tokens=50,
            num_beams=1,
            do_sample=False,
            pad_token_id=tokenizer.pad_token_id,
        )
    # Decode only the newly generated tokens. Splitting the full decoded text
    # on the answer marker picks the wrong segment whenever the continuation
    # repeats the marker.
    generated = tokenizer.decode(output[0][prompt_len:], skip_special_tokens=True).strip()
    preds.append(generated)
ems = [exact_match_score(p, r) for p, r in zip(preds, refs)]
f1s = [f1_score(p, r) for p, r in zip(preds, refs)]
EM = sum(ems) / len(ems) if ems else 0.0
F1 = sum(f1s) / len(f1s) if f1s else 0.0
print({"Exact Match": EM, "F1": F1})
# Save metrics
with open(os.path.join(OUTPUT_DIR, "metrics.json"), "w", encoding="utf-8") as f:
    json.dump({"exact_match": EM, "f1": F1}, f, ensure_ascii=False, indent=2)
print("Done. Metrics saved at:", os.path.join(OUTPUT_DIR, "metrics.json"))

  trainer = Trainer(
`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`.
