# Installing necessary libraries

In [None]:
!pip install datasets

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
# importing necessary libraries
import os
import time
import shutil
import numpy as np
import pandas as pd
from tqdm import tqdm
from matplotlib import pyplot as plt

from datasets import load_dataset, DatasetDict
from peft import LoraConfig, get_peft_model, TaskType

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers import GenerationConfig, TrainingArguments, Trainer

In [None]:
# Health Q&A dataset, referenced by its Hugging Face dataset name; only the "train" split is loaded
huggingface_dataset_name = "RafaelMPereira/HealthCareMagic-100k-Chat-Format-en"

ds = load_dataset(path=huggingface_dataset_name, split="train")


In [None]:
import re
# Function to split text into "human" and "bot" columns
def split_text(example):
    """Split one raw chat record into its question and answer parts.

    The record's ``"text"`` field is expected to look like
    ``"<human>: <question> <bot>: <answer>"``; the answer may span several lines.

    Args:
        example: Mapping with a ``"text"`` entry (a string, or ``None``).

    Returns:
        dict: ``{"human": ..., "bot": ...}`` with both values stripped of
        surrounding whitespace. Both values are empty strings when the text
        does not follow the expected layout.
    """
    text = example["text"] or ""  # a missing text is treated like a malformed record
    # Leading \s*: re.match is anchored at the start, so without it a record that
    # begins with a blank or newline would be reported as malformed.
    match = re.match(r"\s*<human>:\s*(.*?)\s*<bot>:\s*(.*)", text, re.DOTALL)
    if match is None:
        return {"human": "", "bot": ""}  # Handle cases where the format is incorrect
    human_part, bot_part = (piece.strip() for piece in match.groups())
    return {"human": human_part, "bot": bot_part}

# Apply the function to the dataset: the raw "text" column is replaced by the
# parsed "human" and "bot" columns.
# NOTE(review): this rebinds `ds`; re-running the cell without reloading the
# dataset is expected to fail because "text" has already been removed.
ds = ds.map(split_text, remove_columns=["text"])
ds

In [None]:
# Selecting only 12k samples for faster fine-tuning.
# These are the first 12,000 rows (indices 0..11999), not a random sample.
ds = ds.select(range(12000))
ds

In [None]:
# Split the dataset into train and test: 5% of the rows are held out as the
# test set (fixed seed so the split is repeatable)
train_test_split = ds.train_test_split(test_size=0.05, seed=42)
train_dataset = train_test_split['train']
test_dataset = train_test_split['test']

# A further 5% of the remaining training rows become the validation set
train_validation_split = train_dataset.train_test_split(test_size=0.05, seed=42)
train_dataset = train_validation_split['train']
validation_dataset = train_validation_split['test']

# NOTE(review): this prints the pre-split `ds`, not the three splits created above
print(ds)

In [None]:
def print_number_of_trainable_model_parameters(model):
    """Summarise how many of a model's parameters are trainable.

    Args:
        model: Object exposing ``named_parameters()``; each parameter must
            provide ``numel()`` and a ``requires_grad`` flag.

    Returns:
        str: Three lines giving the trainable count, the total count and the
        trainable share as a percentage with two decimals.
    """
    # One pass over the parameters: (element count, trainable?) per tensor
    sizes = [(param.numel(), param.requires_grad) for _, param in model.named_parameters()]
    total_count = sum(count for count, _ in sizes)
    trainable_count = sum(count for count, is_trainable in sizes if is_trainable)
    trainable_share = 100 * trainable_count / total_count
    return (
        f"trainable model parameters: {trainable_count}\n"
        f"all model parameters: {total_count}\n"
        f"percentage of trainable model parameters: {trainable_share:.2f}%"
    )

In [None]:
# Run on the GPU when PyTorch reports one, otherwise fall back to the CPU
my_device = "cpu"
if torch.cuda.is_available():
    my_device = "cuda"
print(f"My Device: {my_device}")

In [None]:
# initialize the model
# The trailing comment lists alternative checkpoints; the 360M base model is the one selected here.
model_name = "HuggingFaceTB/SmolLM2-360M" # HuggingFaceTB/SmolLM2-135M, HuggingFaceTB/SmolLM2-360M, HuggingFaceTB/SmolLM2-1.7B, HuggingFaceTB/SmolLM2-1.7B-Instruct
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Weights are loaded as bfloat16 and moved to `my_device`
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16).to(my_device)

In [None]:
# Reuse the end-of-sequence token as the padding token, on the tokenizer and in the model config
tokenizer.pad_token = tokenizer.eos_token  # decoder-only models typically don't have a padding token pre-defined
model.config.pad_token_id = tokenizer.pad_token_id # padding is needed for batching

In [None]:
# Chat template
def generate_chat_template(query_text, my_tokenizer, my_device, instruct_model=False):
    """Encode a query into input ids that can be passed to ``generate``.

    Args:
        query_text: The raw user query.
        my_tokenizer: Tokenizer providing ``encode`` and, for instruct models,
            ``apply_chat_template``.
        my_device: Device the encoded tensor is moved to.
        instruct_model: When True, the query is wrapped as a single user turn
            with the tokenizer's chat template; otherwise the text is encoded
            as is.

    Returns:
        The encoded input ids (``return_tensors="pt"``) moved to ``my_device``.
    """
    if instruct_model:
        # A single user message (no system message is added here)
        messages = [{"role": "user", "content": query_text}]
        # add_generation_prompt=True makes the rendered prompt end with the
        # assistant-turn header, so generation starts the assistant's reply
        # rather than continuing the conversation from the end of the user turn.
        input_text = my_tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    else:
        input_text = query_text

    return my_tokenizer.encode(input_text, return_tensors="pt").to(my_device)

def generate_output(my_inputs, my_tokenizer, my_model, max_tokens = 50, temp = 0.3, top_p = 0.9, top_k=50, penalty_score=1.2, do_sample = True, instruct_model=False):
    """Generate text for already-encoded inputs and return it decoded.

    Args:
        my_inputs: Encoded input ids passed to ``my_model.generate``.
        my_tokenizer: Tokenizer used to decode the generated ids.
        my_model: Model exposing ``generate``.
        max_tokens: Forwarded as ``max_new_tokens``.
        temp: Forwarded as ``temperature``.
        top_p: Forwarded as ``top_p``.
        top_k: Forwarded as ``top_k``.
        penalty_score: Forwarded as ``repetition_penalty``.
        do_sample: Forwarded as ``do_sample``.
        instruct_model: When True, only the assistant turn of the chat-formatted
            output is returned; otherwise the whole decoded sequence is returned.

    Returns:
        str: The decoded text. In instruct mode this is the stripped text between
        ``<|im_start|>assistant`` and the next ``<|im_end|>``; if no assistant
        header is present, the stripped plain decoding is returned instead.
    """
    generation_kwargs = dict(
        max_new_tokens=max_tokens,
        temperature=temp,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=penalty_score,
        do_sample=do_sample,
    )

    if not instruct_model:
        outputs = my_model.generate(my_inputs, eos_token_id=my_tokenizer.eos_token_id, **generation_kwargs)
        return my_tokenizer.decode(outputs[0], skip_special_tokens=True)

    outputs = my_model.generate(my_inputs, **generation_kwargs)
    # Decode WITH special tokens: if the chat markers are registered as special
    # tokens, skip_special_tokens=True would strip them and the marker lookup
    # below could never succeed (the previous split(...)[1] raised IndexError).
    raw_text = my_tokenizer.decode(outputs[0], skip_special_tokens=False)
    _, assistant_header, reply = raw_text.partition("<|im_start|>assistant")
    if not assistant_header:
        # No assistant turn found: return the plain decoded text rather than failing
        return my_tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    return reply.split("<|im_end|>")[0].strip()

In [None]:
def tune_generate_chat_template(query_text, my_tokenizer, my_device, instruct_model=False):
    """
    Formats the query based on the instruction tuning prompt template and encodes it.

    Args:
        query_text: The patient request inserted into the template.
        my_tokenizer: Tokenizer providing ``encode`` and, for instruct models,
            ``apply_chat_template``.
        my_device: Device the encoded tensor is moved to.
        instruct_model: When True, the formatted prompt is additionally wrapped
            as a single user turn with the tokenizer's chat template.

    Returns:
        The encoded input ids (``return_tensors="pt"``) moved to ``my_device``.
    """

    # Assembled line by line so this function's own indentation does not leak
    # into the prompt: every prompt line starts at column 0.
    formatted_prompt = (
        "\n"
        "### Instruction:\n"
        "You are an AI medical assistant. Respond to the patient request, help them in curing the problem which they are facing by providing them proper diagnosis.\n"
        "\n"
        "### Patient Request:\n"
        f"{query_text}\n"
        "\n"
        "### Response:\n"
    )

    if instruct_model:
        messages = [{"role": "user", "content": formatted_prompt}]
        # add_generation_prompt=True ends the rendered prompt with the
        # assistant-turn header so the model starts its reply right away.
        input_text = my_tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    else:
        input_text = formatted_prompt

    return my_tokenizer.encode(input_text, return_tensors="pt").to(my_device)

def tune_generate_output(my_inputs, my_tokenizer, my_model, max_tokens=50, temp=0.3, top_p=0.9, top_k=50, penalty_score=1.2, do_sample=True, instruct_model=False):
    """
    Generates a response from the model based on the input.

    Args:
        my_inputs: Encoded input ids passed to ``my_model.generate``.
        my_tokenizer: Tokenizer supplying the EOS id and decoding the output.
        my_model: Model exposing ``generate``.
        max_tokens: Forwarded as ``max_new_tokens``.
        temp: Forwarded as ``temperature``.
        top_p: Forwarded as ``top_p``.
        top_k: Forwarded as ``top_k``.
        penalty_score: Forwarded as ``repetition_penalty``.
        do_sample: Forwarded as ``do_sample``.
        instruct_model: Accepted for signature parity with ``generate_output``;
            not used by this function.

    Returns:
        str: The decoded first generated sequence, with special tokens skipped
        and surrounding whitespace stripped.
    """

    # Both the padding id and the stop id come from the tokenizer passed in,
    # not from a notebook-level global, so the function works with any tokenizer.
    eos_id = my_tokenizer.eos_token_id
    outputs = my_model.generate(
        my_inputs,
        max_new_tokens=max_tokens,
        temperature=temp,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=penalty_score,
        do_sample=do_sample,
        pad_token_id=eos_id,
        eos_token_id=eos_id
    )

    # Decode output and clean it up
    output_text = my_tokenizer.decode(outputs[0], skip_special_tokens=True)

    # No marker parsing here: only surrounding whitespace is removed
    return output_text.strip()

In [None]:
# Smoke test: send a simple factual question through the plain (non-instruct)
# encode/generate helpers and print the question followed by the model's text.
sample_query = "Question: What is the capital of Germany?"
sample_inputs = generate_chat_template(sample_query, tokenizer, my_device)
sample_output = generate_output(sample_inputs, tokenizer, model, max_tokens=20, temp = 0.6, top_p = 0.6, top_k=50, penalty_score=1.2, do_sample = True, instruct_model=False)

print(sample_query)
print("="*10)
print(sample_output)

In [None]:
# Display the first training row to check its "human"/"bot" fields
train_dataset[0]

##  Inference on the pretrained model


In [None]:
# Run one training question through the untuned model and show it next to the reference answer
sample_idx = 10

sample_query = train_dataset[sample_idx]['human']
sample_response = train_dataset[sample_idx]['bot']
sample_inputs = generate_chat_template(sample_query, tokenizer, my_device)
sample_output = generate_output(
    sample_inputs,
    tokenizer,
    model,
    max_tokens=50,
    temp=0.7,
    top_p=0.6,
    top_k=50,
    penalty_score=1.2,
    do_sample=True,
    instruct_model=False,
)

# Question, reference answer and model output, each preceded by a separator line
print("="*10, sample_query, "="*10, sample_response, "="*10, sample_output, sep="\n")

In [None]:
# Parameter summary for `model` as loaded, before the LoRA cell below wraps it
print(print_number_of_trainable_model_parameters(model))

In [None]:
model  # this will show us the model architecture

In [None]:
## LORA ##
# Adapters target the q/k/v/o projection modules only; the gate/up/down
# projection names in the trailing comment are not targeted.
lora_config = LoraConfig(
    r=8, # Rank
    lora_alpha=8,
    target_modules=["q_proj", "v_proj", "k_proj", "o_proj"], #"gate_proj", "up_proj", "down_proj"
    lora_dropout=0.1,
    bias="none",
    task_type=TaskType.CAUSAL_LM
)

# NOTE(review): get_peft_model is understood to attach the adapters to `model`
# itself rather than to a copy — confirm before relying on `model` as an
# untouched baseline in later cells.
peft_model = get_peft_model(model, lora_config).to(my_device)

# Same summary as for the base model, now showing the trainable adapter share
print(print_number_of_trainable_model_parameters(peft_model))

**Only about 0.45% of the model's parameters are trainable.**

In [None]:
## TRY DIFFERENT PROMPTS ##
# Wrap one training question in the instruction layout and let `model` complete it
sample_idx = 0

sample_query = train_dataset[sample_idx]['human']
sample_response = train_dataset[sample_idx]['bot']

prompt = f"""
### Instruction:
You are an AI medical assistant. Respond to the patient request, help them in curing the problem which they are facing by providing them proper diagnosis.

### Patient Request:
{sample_query}

### Response:
"""

inputs = tokenizer.encode(prompt, return_tensors="pt").to(my_device)
outputs = model.generate(
    inputs,
    max_new_tokens=50,
    temperature=0.6,
    top_p=0.7,
    top_k=50,
    repetition_penalty=1.2,
    do_sample=True,
    eos_token_id=tokenizer.eos_token_id,
)
sample_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

# Question, reference answer and model output, each preceded by a separator line
print("="*10, sample_query, "="*10, sample_response, "="*10, sample_output, sep="\n")

In [None]:
# Same comparison as above, this time through the instruction-template helpers
sample_idx = 0

sample_query = train_dataset[sample_idx]['human']
sample_response = train_dataset[sample_idx]['bot']

sample_inputs = tune_generate_chat_template(sample_query, tokenizer, my_device)
sample_output = tune_generate_output(
    sample_inputs,
    tokenizer,
    model,
    max_tokens=100,
    temp=0.7,
    top_p=0.6,
    top_k=50,
    penalty_score=1.2,
    do_sample=True,
    instruct_model=False,
)

# Question, reference answer and model output, each preceded by a separator line
print("="*10, sample_query, "="*10, sample_response, "="*10, sample_output, sep="\n")

In [None]:
# Label value that the causal-LM loss skips; used to blank out padding positions
_IGNORE_INDEX = -100


def _build_training_text(human, bot, eos_text):
    """Return one training example in the instruction layout, ending with ``eos_text``."""
    # Assembled line by line so source-code indentation never ends up in the text
    return (
        "\n"
        "### Instruction:\n"
        "You are an AI medical assistant. Respond to the patient request, help them in curing the problem which they are facing by providing them proper diagnosis.\n"
        "\n"
        "### Patient Request:\n"
        f"{human}\n"
        "\n"
        "### Response:\n"
        f"{bot}{eos_text}"
    )


def tokenize_function(examples, my_tokenizer=None, max_length=512):
    """
    Tokenizes a batch using the fine-tuning instruction prompt format.

    Args:
        examples: Batch mapping with parallel lists under ``"human"`` and ``"bot"``.
        my_tokenizer: Tokenizer to use. Defaults to the notebook-level
            ``tokenizer`` when omitted, so ``dataset.map(tokenize_function,
            batched=True)`` keeps working unchanged.
        max_length: Length every example is padded or truncated to.

    Returns:
        dict with ``input_ids`` and ``attention_mask`` from the tokenizer, and
        ``labels``: a copy of ``input_ids`` in which every position whose
        attention mask is 0 (padding) is replaced by -100.
    """
    active_tokenizer = tokenizer if my_tokenizer is None else my_tokenizer
    # Each example ends with the EOS token so the model is also trained on
    # where an answer stops (lost only if the example is truncated).
    eos_text = active_tokenizer.eos_token or ""

    # Construct the full prompt for each example in the batch
    formatted_prompts = [
        _build_training_text(human, bot, eos_text)
        for human, bot in zip(examples["human"], examples["bot"])
    ]

    # Tokenize each formatted prompt
    tokenized_outputs = active_tokenizer(
        formatted_prompts,
        padding="max_length",
        truncation=True,
        max_length=max_length,
    )

    # Causal LM: labels mirror input_ids, except that padding is excluded from
    # the loss. The attention mask (not the token id) decides what is padding,
    # so a real EOS token stays in the labels even when pad and EOS share an id.
    labels = [
        [token_id if is_real else _IGNORE_INDEX for token_id, is_real in zip(ids, mask)]
        for ids, mask in zip(tokenized_outputs["input_ids"], tokenized_outputs["attention_mask"])
    ]

    return {
        "input_ids": tokenized_outputs["input_ids"],
        "attention_mask": tokenized_outputs["attention_mask"],
        "labels": labels,
    }

# Tokenizing and processing all dataset splits; the raw text columns are
# dropped in the same map call, leaving only the tokenizer outputs
tokenized_train_dataset = train_dataset.map(tokenize_function, batched=True, remove_columns=['human', 'bot'])

tokenized_test_dataset = test_dataset.map(tokenize_function, batched=True, remove_columns=['human', 'bot'])

tokenized_validation_dataset = validation_dataset.map(tokenize_function, batched=True, remove_columns=['human', 'bot'])

In [None]:
## Collect the three tokenized splits under their conventional split names
tokenized_datasets = DatasetDict({
    'train': tokenized_train_dataset,
    'validation': tokenized_validation_dataset,
    'test': tokenized_test_dataset
})

# One line per split: display label followed by that split's shape
print("Shapes of the datasets:")
for split_label, split_key in (("Training", "train"), ("Validation", "validation"), ("Test", "test")):
    print(f"{split_label}: {tokenized_datasets[split_key].shape}")

print(tokenized_datasets)

# Training the model

In [None]:
#### PEFT Training ####
# The output directory name carries a minute-resolution timestamp, so each run
# writes its checkpoints to its own folder.
output_dir = f'/content/SmolLM2-fine-tuned/health_qa_base_tune_peft-350M-{str(time.strftime("%Y_%m_%d_%H_%M"))}'   # define the directory to store the results

## Cleanup switch, disabled on purpose: keep it False while the directory holds models worth keeping.
## NOTE(review): the directory named here is not `output_dir`.
if False:
  shutil.rmtree('tuned_models_SmolLM2')

if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Hyperparameters for the run
step_size = 100 # we have 11k in total
lr = 1e-3
n_epochs = 2
batch_size = 8
gradient_acc = 4
warmup_steps = 50 # previously:0
peft_training_args = TrainingArguments(
    output_dir=output_dir,
    auto_find_batch_size=False,
    learning_rate=lr,
    num_train_epochs=n_epochs,
    logging_steps=step_size,  # Logs training loss every x.th steps
    save_steps=step_size,
    warmup_steps=warmup_steps,
    logging_strategy="steps",  # Ensures logs are printed every x.th steps
    evaluation_strategy="steps",  # Runs validation on a step schedule, not once per epoch
    save_strategy="steps",  # Saves checkpoints on a step schedule (every save_steps steps), not once per epoch
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=gradient_acc,
    no_cuda=not torch.cuda.is_available(),
    report_to="none"  # Disable WandB
)

# Train the LoRA-wrapped model on the train split, evaluating on the validation split
peft_trainer = Trainer(
    model=peft_model,
    args=peft_training_args,
    train_dataset=tokenized_datasets["train"],
   eval_dataset=tokenized_datasets['validation']
)

peft_trainer.train()

In [None]:
# Reload the fine-tuned adapter from a saved checkpoint on top of a freshly loaded base model.
# NOTE: `peft_model` and `tokenizer` are rebound here, replacing the objects created earlier.
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
from peft import PeftModel

# Define the model path (same as output_dir)
# The timestamp and checkpoint number are hardcoded and must match a directory written by the training run.
model_path = "/content/SmolLM2-fine-tuned/health_qa_base_tune_peft-350M-2025_04_07_12_50/checkpoint-676/"  # Replace with the actual timestamp

# Load base model (Make sure it's the same base model used for fine-tuning)
# No torch_dtype is passed here, unlike the bfloat16 load used before training.
base_model_name = "HuggingFaceTB/SmolLM2-360M"  # Replace with the actual base model name
base_model = AutoModelForCausalLM.from_pretrained(base_model_name)

# Load fine-tuned LoRA model
peft_model = PeftModel.from_pretrained(base_model, model_path)

# Load tokenizer
# This is a new tokenizer object; the pad-token assignment made on the earlier one is not repeated here.
tokenizer = AutoTokenizer.from_pretrained(base_model_name)

# Move model to GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"
peft_model.to(device)

# Set model to evaluation mode
peft_model.eval()


In [None]:
# Inference on fine-tuned model
def generate_response(prompt, max_length=256):
    """Generate text for ``prompt`` with the notebook-level ``peft_model``.

    Uses the notebook-level ``tokenizer`` and ``device``.

    Args:
        prompt: Text to tokenize and feed to the model.
        max_length: Forwarded to ``generate`` as ``max_length``.

    Returns:
        str: The first generated sequence, decoded with special tokens skipped.
    """
    encoded_prompt = tokenizer(prompt, return_tensors="pt").to(device)
    # Gradients are not needed for inference
    with torch.no_grad():
        generated = peft_model.generate(**encoded_prompt, max_length=max_length)
    first_sequence = generated[0]
    return tokenizer.decode(first_sequence, skip_special_tokens=True)

# Example usage:
# NOTE(review): the question is sent as bare text, whereas tokenize_function wraps
# training examples in the "### Instruction / ### Patient Request / ### Response" layout.
prompt = "What are the symptoms of depression?"
response = generate_response(prompt)
print(response)

In [None]:
# NOTE: neither of the two names imported below is referenced in this cell.
from peft import PeftModel
from transformers import AutoModelForCausalLM

# Merge LoRA weights into base model
merged_model = peft_model.merge_and_unload()

# Save & push merged full model
# Model and tokenizer are uploaded to the same Hub repository id.
# NOTE(review): presumably requires an authenticated Hugging Face session with write access — confirm before running.
merged_model.push_to_hub("Kaith-jeet123/SmolLM2-fine-tuned-healthQA")
tokenizer.push_to_hub("Kaith-jeet123/SmolLM2-fine-tuned-healthQA")
