In [None]:
#INSTALL DEPENDENCIES
!pip install -q transformers datasets peft accelerate bitsandbytes evaluate rouge_score bert-score

In [None]:
# Check GPU: show the NVIDIA device(s) visible to this runtime before any model is loaded.
!nvidia-smi

In [None]:
# IMPORTING LIBRARIES
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pprint import pprint
import json
import torch
import evaluate
from datasets import load_dataset
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
    BitsAndBytesConfig,
    DataCollatorForLanguageModeling
)
from peft import LoraConfig, get_peft_model
from transformers import BitsAndBytesConfig, AutoModelForCausalLM

In [None]:
# Load the MedQuAD dataset.
# The dataset id gets its own name so that MEDQUAD only ever refers to the loaded dataset
# (previously the same name held a string first and the dataset afterwards).
MEDQUAD_DATASET_ID = "lavita/MedQuAD"
MEDQUAD = load_dataset(MEDQUAD_DATASET_ID)

In [None]:
# Split the dataset: hold out 30 % of the rows, then halve that hold-out.
# NOTE: `train` and `test` are two-way split containers (keys "train"/"test"),
# not the final train/test sets.
train = MEDQUAD["train"].train_test_split(
    seed=42,
    test_size=0.3,
)
hold_out = train["test"]
test = hold_out.train_test_split(seed=42, test_size=0.5)

In [None]:
# Give the three final splits descriptive names:
# the 70 % part for training, and the two halves of the hold-out for validation and testing.
train_data, validation_data, test_data = (
    train["train"],
    test["train"],
    test["test"],
)

In [None]:
# Report how many rows ended up in each split.
print(
    f"Train size: {len(train_data)}",
    f"Validation size: {len(validation_data)}",
    f"Test size: {len(test_data)}",
    sep="\n",
)

In [None]:
# Baseline: the un-tuned checkpoint in half precision, plus the tokenizer shared by all models.
BASE_MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

med_tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
if med_tokenizer.pad_token is None:
    # No dedicated padding token: reuse the end-of-sequence token.
    med_tokenizer.pad_token = med_tokenizer.eos_token

_baseline_load_options = dict(device_map="auto", torch_dtype=torch.float16)
med_base_model = AutoModelForCausalLM.from_pretrained(BASE_MODEL, **_baseline_load_options)
# Keep the model config in agreement with the tokenizer's padding id.
med_base_model.config.pad_token_id = med_tokenizer.pad_token_id

In [None]:
# Evaluate Model Performance
def evaluate_med_model(model, eval_dataset, max_samples=50):
    """Score a model's generated answers with ROUGE and BLEU.

    Only the first ``max_samples`` rows of ``eval_dataset`` are looked at; rows whose
    question or answer is missing/blank are skipped, so fewer samples may be scored.

    This function relies on the notebook-level names ``generate_med_answer``,
    ``rouge_metric`` and ``bleu_metric``, which must be defined before it is called.
    (It previously referred to ``generate_answer``, ``rouge`` and ``bleu``, which are not
    defined anywhere in the notebook.)

    Args:
        model: Model handed to ``generate_med_answer``.
        eval_dataset: Dataset supporting ``len()`` and ``.select(indices)`` whose rows
            expose ``"question"`` and ``"answer"`` via ``.get``.
        max_samples: Upper bound on the number of rows inspected.

    Returns:
        ``(rouge_scores, bleu_scores)`` as returned by the metric objects, or
        ``({}, {})`` when no usable row was found.
    """
    all_predictions = []
    all_references = []
    row_count = min(max_samples, len(eval_dataset))
    for item in eval_dataset.select(range(row_count)):
        question_text = str(item.get("question", "") or "").strip()
        reference_text = str(item.get("answer", "") or "").strip()
        if not question_text or not reference_text:
            continue
        predicted_answer = generate_med_answer(model, question_text)
        # Normalise a None / non-string prediction to a plain stripped string.
        all_predictions.append(str(predicted_answer or "").strip())
        all_references.append(reference_text)
    if not all_predictions:
        print("No valid samples found!")
        return {}, {}
    rouge_scores = rouge_metric.compute(predictions=all_predictions, references=all_references)
    # BLEU is given a list of references per prediction.
    bleu_scores = bleu_metric.compute(
        predictions=all_predictions,
        references=[[ref] for ref in all_references]
    )
    return rouge_scores, bleu_scores

In [None]:
# Load the same checkpoint again, this time quantized to 4 bits, as the base for LoRA training.
qlora_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.float16,
)

_qlora_load_options = dict(
    quantization_config=qlora_config,
    device_map="auto",
    torch_dtype=torch.float16,
)
qlora_model = AutoModelForCausalLM.from_pretrained(BASE_MODEL, **_qlora_load_options)

if med_tokenizer.pad_token is None:
    # Same fallback as for the baseline: pad with the end-of-sequence token.
    med_tokenizer.pad_token = med_tokenizer.eos_token
qlora_model.config.pad_token_id = med_tokenizer.pad_token_id

In [None]:
# LoRA adapter setup: rank-8 adapters on the four listed projection modules.
_lora_target_modules = ["q_proj", "k_proj", "v_proj", "o_proj"]
tiny_lora_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=8,
    lora_alpha=16,
    lora_dropout=0.05,
    bias="none",
    target_modules=_lora_target_modules,
)

# Wrap the quantized model with the adapters and report the trainable-parameter count.
med_lora_model = get_peft_model(qlora_model, tiny_lora_config)
med_lora_model.print_trainable_parameters()

In [None]:
# Tokenization & Preprocessing
MAX_SEQ_LENGTH = 256
def preprocess_med_sample(sample):
    """Turn one question/answer row into fixed-length causal-LM training features.

    The text is laid out as ``<|user|>\\n{question}\\n<|assistant|>\\n{answer}`` and is
    truncated/padded to ``MAX_SEQ_LENGTH`` tokens. Labels equal the input ids on answer
    tokens only; prompt tokens and padding positions are set to ``-100`` so that they do
    not contribute to the loss.

    Args:
        sample: Mapping with ``"question"`` and ``"answer"`` entries (either may be
            missing or ``None``).

    Returns:
        The tokenizer output for the full text with an added ``"labels"`` list. For a row
        without a usable question or answer, an all-masked placeholder of length
        ``MAX_SEQ_LENGTH`` is returned instead (zero ids, zero attention, ``-100`` labels).
    """
    question_text = str(sample.get("question", "") or "").strip()
    answer_text = str(sample.get("answer", "") or "").strip()
    if not question_text or not answer_text:
        return {
            "input_ids": [0] * MAX_SEQ_LENGTH,
            "attention_mask": [0] * MAX_SEQ_LENGTH,
            "labels": [-100] * MAX_SEQ_LENGTH
        }
    prompt_text = f"<|user|>\n{question_text}\n<|assistant|>\n"
    full_text = prompt_text + answer_text
    tokenized_output = med_tokenizer(
        full_text,
        truncation=True,
        padding="max_length",
        max_length=MAX_SEQ_LENGTH
    )
    # Tokenize the prompt on its own only to learn how many leading tokens it occupies.
    prompt_token_ids = med_tokenizer(
        prompt_text,
        truncation=True,
        max_length=MAX_SEQ_LENGTH
    )["input_ids"]

    input_ids = tokenized_output["input_ids"]
    attention_mask = tokenized_output["attention_mask"]
    # Clamp so the masked prefix can never be longer than the sequence itself.
    prompt_length = min(len(prompt_token_ids), len(input_ids))

    # Supervise answer tokens only. Padding is identified through the attention mask rather
    # than the pad id, because the pad token may be the same token as end-of-sequence.
    labels = [
        -100 if (position < prompt_length or mask == 0) else token_id
        for position, (token_id, mask) in enumerate(zip(input_ids, attention_mask))
    ]
    tokenized_output["labels"] = labels
    return tokenized_output

In [None]:
# Tokenize Train & Validation Datasets
def _has_question_and_answer(sample):
    """Return True when both the question and the answer are non-blank."""
    return all(
        str(sample.get(field, "") or "").strip()
        for field in ("question", "answer")
    )


def _tokenize_split(split):
    """Drop rows without a usable question/answer, then tokenize the remaining rows.

    Without the filter, such rows are turned by ``preprocess_med_sample`` into all-zero
    placeholder samples that still take part in training and validation.
    """
    usable_rows = split.filter(_has_question_and_answer)
    return usable_rows.map(
        preprocess_med_sample,
        remove_columns=usable_rows.column_names,
        batched=False
    )


train_tokenized_data = _tokenize_split(train_data)

validation_tokenized_data = _tokenize_split(validation_data)

In [None]:
# Sanity check: print a summary of the tokenized training split.
print(train_tokenized_data)

In [None]:
def med_data_collator(features):
    """Batch pre-tokenized samples while keeping the labels produced during preprocessing.

    ``DataCollatorForLanguageModeling(mlm=False)`` rebuilds ``labels`` from ``input_ids``,
    which throws away the prompt masking done in ``preprocess_med_sample`` and makes the
    model train on the question text as well. This collator stacks the features as they
    are and additionally masks every position whose attention mask is 0 (padding).

    Args:
        features: List of dicts with equal-length ``"input_ids"``, ``"attention_mask"``
            and ``"labels"`` lists.

    Returns:
        Dict of ``torch.long`` tensors of shape ``(len(features), sequence_length)``.
    """
    batch = {
        key: torch.tensor([feature[key] for feature in features], dtype=torch.long)
        for key in ("input_ids", "attention_mask", "labels")
    }
    # Padding must never be a loss target, whatever the incoming labels contain there.
    batch["labels"] = batch["labels"].masked_fill(batch["attention_mask"] == 0, -100)
    return batch
# Training setup: evaluate and checkpoint once per epoch, keep a single checkpoint on disk
# and reload the best one when training finishes.
med_training_args = TrainingArguments(
    output_dir="./qlora_medical_model",
    # optimisation
    num_train_epochs=2,
    learning_rate=2e-4,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    gradient_accumulation_steps=2,
    fp16=True,
    # evaluation / checkpointing
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
    # logging
    logging_steps=50,
    report_to="none",
)

med_trainer = Trainer(
    model=med_lora_model,
    args=med_training_args,
    data_collator=med_data_collator,
    train_dataset=train_tokenized_data,
    eval_dataset=validation_tokenized_data,
)

med_trainer.train()

In [None]:
# Save the fine-tuned PEFT model and its tokenizer side by side in one directory.
_adapter_dir = "./qlora_medical_model"
med_lora_model.save_pretrained(_adapter_dir)
med_tokenizer.save_pretrained(_adapter_dir)

In [None]:
# Evaluation metrics and the device used by the generation helper below.
rouge_metric, bleu_metric = evaluate.load("rouge"), evaluate.load("bleu")
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

def generate_med_answer(model, question_text, max_input_length=256, max_output_tokens=100):
    """Greedily generate an answer to a single question.

    The question is wrapped in the same ``<|user|>`` / ``<|assistant|>`` prompt used for
    training. Only the newly generated tokens are decoded: the prompt is removed by slicing
    off its token positions rather than by string replacement on the decoded text, which
    left the prompt in the answer whenever the prompt had been truncated or did not decode
    back to exactly the same string.

    Args:
        model: Model exposing ``eval()`` and ``generate(...)``.
        question_text: The question; a falsy value short-circuits to ``""``.
        max_input_length: Maximum number of prompt tokens (longer prompts are truncated).
        max_output_tokens: Maximum number of tokens to generate.

    Returns:
        The generated answer with surrounding whitespace stripped.
    """
    if not question_text:
        return ""
    model.eval()
    prompt_text = f"<|user|>\n{question_text}\n<|assistant|>\n"
    inputs = med_tokenizer(
        prompt_text,
        return_tensors="pt",
        truncation=True,
        max_length=max_input_length
    ).to(device)
    with torch.no_grad():
        generated_outputs = model.generate(
            **inputs,
            max_new_tokens=max_output_tokens,
            do_sample=False,
            pad_token_id=med_tokenizer.pad_token_id
        )
    # The output sequence starts with the prompt tokens; keep only what follows them.
    prompt_token_count = inputs["input_ids"].shape[1]
    new_token_ids = generated_outputs[0][prompt_token_count:]
    answer_text = med_tokenizer.decode(new_token_ids, skip_special_tokens=True).strip()
    return answer_text

# Evaluating Model Performance
def evaluate_med_model(model, eval_dataset, max_samples=50):
    """Compute ROUGE and BLEU for a model's answers on the head of a dataset.

    At most ``max_samples`` leading rows of ``eval_dataset`` are inspected; rows with a
    blank or missing question/answer are ignored. Returns ``(rouge_scores, bleu_scores)``,
    or ``({}, {})`` after printing a notice when nothing could be scored.
    """
    row_count = min(max_samples, len(eval_dataset))
    predictions, references = [], []

    for row in eval_dataset.select(range(row_count)):
        question = str(row.get("question", "") or "").strip()
        reference = str(row.get("answer", "") or "").strip()
        if question and reference:
            # A falsy generation result is recorded as an empty prediction.
            predictions.append(generate_med_answer(model, question) or "")
            references.append(reference)

    if len(predictions) == 0:
        print("No valid samples found!")
        return {}, {}

    rouge_scores = rouge_metric.compute(predictions=predictions, references=references)
    # BLEU takes a list of references for every prediction.
    wrapped_references = [[reference] for reference in references]
    bleu_scores = bleu_metric.compute(predictions=predictions, references=wrapped_references)
    return rouge_scores, bleu_scores

In [None]:
# QLoRA evaluation: score the fine-tuned model on the held-out test split.
qlora_rouge_scores, qlora_bleu_scores = evaluate_med_model(
    model=med_lora_model,
    eval_dataset=test_data,
)
metrics_output = dict(rouge=qlora_rouge_scores, bleu=qlora_bleu_scores)
def convert_numpy(obj):
    """Recursively replace NumPy scalars inside dicts and lists with plain Python values.

    Dicts and lists are rebuilt with converted contents; a NumPy scalar becomes its
    ``.item()`` value; everything else is returned unchanged.
    """
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, list):
        return [convert_numpy(element) for element in obj]
    if isinstance(obj, dict):
        return {key: convert_numpy(value) for key, value in obj.items()}
    return obj
# Make the metrics JSON-friendly, show them, and write them to disk.
clean_metrics = convert_numpy(metrics_output)

for heading, key in (
    ("\nQLoRA ROUGE Scores:", "rouge"),
    ("\nQLoRA BLEU Scores:", "bleu"),
):
    print(heading)
    pprint(clean_metrics[key], width=60)

with open("qlora_medical_metrics.json", "w") as metrics_file:
    metrics_file.write(json.dumps(clean_metrics, indent=4))

In [None]:
# Baseline evaluation: score the un-tuned model on the same held-out test split.
baseline_rouge_scores, baseline_bleu_scores = evaluate_med_model(
    model=med_base_model,
    eval_dataset=test_data,
)
def convert_numpy(obj):
    """Recursively convert NumPy scalars inside dicts and lists to native Python values.

    ``.item()`` is used instead of ``float(...)`` so that every scalar keeps its natural
    Python type (NumPy ints stay ints, bools stay bools, strings stay strings) and the
    result agrees with the earlier ``convert_numpy`` definition in this notebook.

    Args:
        obj: A dict, list, NumPy scalar, or any other value.

    Returns:
        A new dict/list with converted contents, the ``.item()`` value of a NumPy scalar,
        or ``obj`` itself for anything else.
    """
    if isinstance(obj, dict):
        return {k: convert_numpy(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_numpy(v) for v in obj]
    elif isinstance(obj, np.generic):
        return obj.item()
    else:
        return obj
# Convert the baseline metrics to plain Python values and display them.
clean_baseline = dict(
    rouge=convert_numpy(baseline_rouge_scores),
    bleu=convert_numpy(baseline_bleu_scores),
)

for heading, key in (
    ("\nBaseline ROUGE Scores:", "rouge"),
    ("\nBaseline BLEU Scores:", "bleu"),
):
    print(heading)
    pprint(clean_baseline[key], width=60)

In [None]:
# Compare Baseline vs QLoRA Model
# The baseline row is labelled with the checkpoint that was actually loaded (BASE_MODEL);
# it used to say "Phi-2", which is not the model evaluated in this notebook.
baseline_label = f"Baseline ({BASE_MODEL.split('/')[-1]})"
comparison_results = pd.DataFrame({
    "Model": [baseline_label, "QLoRA Fine-Tuned"],
    # A missing metric (e.g. an evaluation that scored no samples) is shown as 0.0.
    "ROUGE-L": [
        baseline_rouge_scores.get("rougeL", 0.0),
        qlora_rouge_scores.get("rougeL", 0.0)
    ],
    "BLEU": [
        baseline_bleu_scores.get("bleu", 0.0),
        qlora_bleu_scores.get("bleu", 0.0)
    ]
})
print(comparison_results)
comparison_results.to_csv("med_model_comparison.csv", index=False)

In [None]:
# Model Comparison: grouped bar chart with one group per metric and one bar per model.
sns.set(style="whitegrid")

# Long format: one row per (model, metric) pair.
plot_data = comparison_results.melt(
    id_vars="Model",
    var_name="Metric",
    value_name="Score",
)

fig, ax = plt.subplots(figsize=(8, 5))
sns.barplot(data=plot_data, x="Metric", y="Score", hue="Model", palette="Set2", ax=ax)
ax.set_title("Baseline vs QLoRA Fine-Tuned Model Performance")
ax.set_ylim(0, 1)
ax.set_ylabel("Score")
ax.set_xlabel("Metric")
ax.legend(title="Model")
fig.tight_layout()
plt.show()