In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under the Kaggle input directory
input_file_paths = (
    os.path.join(folder, file_name)
    for folder, _subdirs, file_names in os.walk('/kaggle/input')
    for file_name in file_names
)
for file_path in input_file_paths:
    print(file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
! pip install transformers accelerate peft datasets

In [None]:
# Fetch the OpenAssistant oasst1 dataset; the "train" split is selected in the next cell

from datasets import load_dataset

dataset = load_dataset(path="OpenAssistant/oasst1")

In [None]:
# Read the train split without rebinding `dataset`, so re-running this cell
# does not try to index an already-unwrapped split.
oasst_train = dataset["train"]

# Maximum number of (prompt, response) pairs to collect
MAX_SFT_PAIRS = 500

# Index every message once by its id. The previous version rescanned the whole
# split for each assistant reply, which is quadratic in the number of messages.
# `setdefault` keeps the first row seen for an id, matching the old
# first-match lookup.
message_by_id = {}
for message_id, role, text in zip(
    oasst_train["message_id"], oasst_train["role"], oasst_train["text"]
):
    message_by_id.setdefault(message_id, (role, text))

# Preprocessing the dataset to get the proper and valid responses:
# keep assistant replies whose direct parent is a prompter message.
sft_data = []

for role, parent_id, text in zip(
    oasst_train["role"], oasst_train["parent_id"], oasst_train["text"]
):
    if len(sft_data) >= MAX_SFT_PAIRS:
        break
    if role != "assistant" or parent_id is None:
        continue
    parent_role, parent_text = message_by_id.get(parent_id, (None, None))
    if parent_role == "prompter":
        sft_data.append({"prompt": parent_text, "response": text})


In [None]:
# Sanity check: number of (prompt, response) pairs collected (collection stops at 500)
len(sft_data)

In [None]:
from transformers import AutoTokenizer

# Model and Tokenizer initialisation

model_name = "distilgpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Reuse the end-of-sequence token as the padding token, so the
# padding="max_length" request in tokenize() has a token to pad with
tokenizer.pad_token = tokenizer.eos_token

# Tokenize definition to generate tokens for prompt + response text

def tokenize(example):
    """Tokenize one SFT example as a single "prompt, blank line, response" text.

    Args:
        example: mapping with "prompt" and "response" text fields.

    Returns:
        The tokenizer's encoding of the joined text, truncated or padded to
        256 tokens.
    """
    joined_text = "\n\n".join((example["prompt"], example["response"]))
    return tokenizer(
        joined_text,
        truncation=True,
        max_length=256,
        padding="max_length",
    )

# Apply tokenize() to every SFT pair, keeping the encodings in a plain list

tokenized_data = [tokenize(pair) for pair in sft_data]

In [None]:
# Inspect the first tokenized example
tokenized_data[0]

In [None]:
import torch
from transformers import AutoModelForCausalLM, Trainer, TrainingArguments, DataCollatorForLanguageModeling
from datasets import Dataset
from peft import get_peft_model, LoraConfig, TaskType

# Wrap the tokenized examples in a Hugging Face Dataset and load the base model
dataset = Dataset.from_list(tokenized_data)
model = AutoModelForCausalLM.from_pretrained(model_name)

# LoRA fine tuning to get the SFT Model

lora_config = LoraConfig(
    r=8,
    lora_alpha=16,
    lora_dropout=0.1,
    target_modules=["c_attn"],
    # GPT-2 style models implement c_attn as a Conv1D layer whose weight is
    # stored as (fan_in, fan_out). PEFT documents that fan_in_fan_out must be
    # True for such layers; stating it explicitly avoids relying on PEFT's
    # warn-and-override fallback.
    fan_in_fan_out=True,
    bias="none",
    task_type=TaskType.CAUSAL_LM,
)

# SFT Model: wrap the base model so LoRA adapters are attached to c_attn
model = get_peft_model(model, lora_config)

# Freeze every pre-trained weight; only parameters with "lora" in their name stay trainable
for param_name, weight in model.named_parameters():
    if "lora" in param_name:
        continue
    weight.requires_grad = False

# Collator that batches examples for causal-LM training (mlm=False: no masked-LM masking)
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

# Hyper-parameters and output settings consumed by the Trainer
training_args = TrainingArguments(
    output_dir="./lora-distilgpt2",
    per_device_train_batch_size=4,
    num_train_epochs=3,
    eval_strategy="no",
    fp16=torch.cuda.is_available(),  # mixed precision only when CUDA is available
    save_total_limit=2,
    remove_unused_columns=False,
    report_to="none",
)

# Build the Trainer from the LoRA model, the tokenized dataset and the collator
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    data_collator=data_collator,
)

In [None]:
# Train the model: runs the Trainer built in the previous cell
trainer.train()

In [None]:
!pip install -q transformers accelerate datasets peft trl

In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Load a reward model
# Dedicated reward_* names are used so this demo does not overwrite the
# `model_name` / `model` / `tokenizer` bound by the SFT stage.
import torch

reward_model_name = "OpenAssistant/reward-model-deberta-v3-large-v2"
reward_model = AutoModelForSequenceClassification.from_pretrained(reward_model_name)
reward_tokenizer = AutoTokenizer.from_pretrained(reward_model_name)

# Inference of that model
question = "Explain nuclear fusion like I am five."
answer = "Nuclear fusion is the process by which two or more protons and neutrons combine to form a single nucleus."

# The tokenizer encodes the question/answer pair as one two-segment input
inputs = reward_tokenizer(question, answer, return_tensors='pt')

# Inference only: skip autograd bookkeeping
with torch.no_grad():
    outputs = reward_model(**inputs)

# Plain Python float, the same form get_reward() returns later in the notebook
score = outputs.logits[0].item()

# Reward Score for the prompt
print(f"Score: {score}")


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSequenceClassification
from peft import PeftModel
from trl import PPOTrainer, PPOConfig
from datasets import Dataset
import torch

import os
from transformers.trainer_utils import get_last_checkpoint

base_model_name = "distilgpt2"             # Base Model
sft_output_dir = "./lora-distilgpt2"       # Same output_dir the SFT Trainer wrote to

# Resolve the newest SFT checkpoint instead of hard-coding a step number:
# the final step depends on dataset size, batch size and the number of devices.
sft_model_path = get_last_checkpoint(sft_output_dir) if os.path.isdir(sft_output_dir) else None
if sft_model_path is None:
    raise FileNotFoundError(
        f"No SFT checkpoint found under {sft_output_dir!r}; run the SFT training cell first."
    )

tokenizer = AutoTokenizer.from_pretrained(base_model_name)             # Tokenizer
tokenizer.pad_token = tokenizer.eos_token                              # PAD Token
model = AutoModelForCausalLM.from_pretrained(base_model_name)
# is_trainable=True keeps the LoRA weights trainable. PeftModel.from_pretrained
# defaults to is_trainable=False, which loads the adapter frozen for
# inference only and would leave the later RL stages nothing to update.
model = PeftModel.from_pretrained(model, sft_model_path, is_trainable=True)   # Supervised Fine Tuned (SFT) Model
reward_tokenizer = AutoTokenizer.from_pretrained("OpenAssistant/reward-model-deberta-v3-large-v2")       # Reward Tokenizer
reward_model = AutoModelForSequenceClassification.from_pretrained("OpenAssistant/reward-model-deberta-v3-large-v2")   # Reward Model
reward_model.eval()

In [None]:
# PPO Configuration parameters
#
# In the Trainer-based TRL API (the one that accepts `output_dir`), the batch
# size is set through `per_device_train_batch_size` and `num_mini_batches`;
# `batch_size` / `mini_batch_size` are derived from them by the trainer.
# Left at its default, the per-device batch size would exceed the five
# prompts used in this notebook.
ppo_config = PPOConfig(
    output_dir="./ppo_output",
    learning_rate=1e-5,
    per_device_train_batch_size=1,
    num_mini_batches=1,
    gradient_accumulation_steps=1,
)

# Testing Prompts

prompts = [
    "What is reinforcement learning?",
    "How does a black hole form?",
    "Explain quantum computing in simple terms.",
    "Why is the sky blue?",
    "Tell me a fun fact about animals."
]

# Keep all the prompts in a HuggingFace Format.
# The Trainer-based PPOTrainer pads batches with the tokenizer and reads the
# "input_ids" column as the queries, so the dataset stores token ids rather
# than raw prompt strings.
dataset = Dataset.from_dict(
    {"input_ids": [tokenizer(prompt)["input_ids"] for prompt in prompts]}
)

# Reward score generator definition: scores one (prompt, response) pair
def get_reward(prompt, response):
    """Return the reward model's first logit for a prompt/response pair.

    Args:
        prompt: prompt text, passed to the reward tokenizer as the first segment.
        response: response text, passed as the second segment.

    Returns:
        The logit as a Python float.
    """
    encoded = reward_tokenizer(
        prompt,
        response,
        return_tensors="pt",
        padding=True,
        truncation=True,
    )
    encoded = encoded.to(reward_model.device)
    # Inference only: no gradients are needed for scoring
    with torch.no_grad():
        logits = reward_model(**encoded).logits
    return logits[0].item()

In [None]:
import inspect
from trl import PPOTrainer

# TRL - Transformer Reinforcement Learning
# Print the constructor signature of the installed PPOTrainer, to see which
# keyword arguments this TRL version accepts.

print(inspect.signature(PPOTrainer.__init__))


In [None]:
# The value (critic) model must be its own network with a scalar head.
# Passing the policy itself (a causal LM) gives PPOTrainer no `score` head to
# read values from and ties the critic to the policy weights.
value_model = AutoModelForSequenceClassification.from_pretrained(base_model_name, num_labels=1)
value_model.config.pad_token_id = tokenizer.pad_token_id

# NOTE(review): the reward model below uses a different tokenizer than the
# policy. PPOTrainer appears to score the policy's token ids directly with
# the reward model — confirm the two are compatible before training.
ppo_trainer = PPOTrainer(
    config=ppo_config,             # PPOConfig instance
    processing_class=tokenizer,    # tokenizer or processor
    policy=model,                  # your LoRA fine-tuned model (policy network)
    ref_policy=AutoModelForCausalLM.from_pretrained(base_model_name),  # base model without LoRA
    reward_model=reward_model,     # your reward model
    train_dataset=dataset,         # Dataset of prompts
    value_model=value_model,       # separate value-function model with a scalar head
)

In [None]:
from transformers import TrainerCallback

class LoggingCallback(TrainerCallback):
    """Trainer callback that echoes every logging event to stdout."""

    def on_log(self, args, state, control, logs=None, **kwargs):
        """Print the current global step together with the logged values."""
        step = state.global_step
        print(f"Step {step}: {logs}")

# PPO Trainer class and their parameters

# The value (critic) model must be its own network with a scalar head.
# Passing the policy itself (a causal LM) gives PPOTrainer no `score` head to
# read values from and ties the critic to the policy weights.
value_model = AutoModelForSequenceClassification.from_pretrained(base_model_name, num_labels=1)
value_model.config.pad_token_id = tokenizer.pad_token_id

# NOTE(review): the reward model uses a different tokenizer than the policy.
# PPOTrainer appears to score the policy's token ids directly with the reward
# model — confirm the two are compatible before training.
ppo_trainer = PPOTrainer(
    config=ppo_config,
    processing_class=tokenizer,
    policy=model,
    ref_policy=AutoModelForCausalLM.from_pretrained(base_model_name),
    reward_model=reward_model,
    train_dataset=dataset,
    value_model=value_model,
    callbacks=[LoggingCallback()]
)

# Train the model
ppo_trainer.train()


In [None]:
# Save the model
# The PPOTrainer used here follows the transformers Trainer API, whose save
# method is save_model(); it has no save_pretrained() method.
ppo_trainer.save_model("./ppo_output")

from peft import PeftModel
from transformers import AutoModelForCausalLM

# Load the PPO Optimized Model: fresh base weights plus the adapter saved above

base_model = AutoModelForCausalLM.from_pretrained(base_model_name)
ppo_model = PeftModel.from_pretrained(base_model, "./ppo_output")
ppo_model.eval()


In [None]:
# Test Prompts

test_prompts = [
    "Explain reinforcement learning simply.",
    "What is a black hole?",
    "Tell me about quantum physics."
]

# Perform Inference

for prompt in test_prompts:
    inputs = tokenizer(prompt, return_tensors="pt").to(ppo_model.device)
    # Pass the whole encoding so generate() also receives the attention mask:
    # pad and EOS share one token id here, so the mask cannot be inferred
    # reliably from input_ids alone.
    generated_ids = ppo_model.generate(
        **inputs,
        max_length=100,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=True,
        top_k=50,
        top_p=0.95,
        temperature=0.8,
    )
    # Decode only the newly generated tokens (everything after the prompt)
    prompt_length = inputs["input_ids"].shape[1]
    response = tokenizer.decode(generated_ids[0][prompt_length:], skip_special_tokens=True)
    print(f"\nPrompt: {prompt}\nResponse: {response}")


In [None]:
# Print reward for each prompt
# The previous version passed the `...` placeholder to get_reward() as the
# response. A response is now sampled from the PPO model with the same
# generation settings as the inference cell, and that text is scored.

for prompt in test_prompts:
    inputs = tokenizer(prompt, return_tensors="pt").to(ppo_model.device)
    generated_ids = ppo_model.generate(
        **inputs,
        max_length=100,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=True,
        top_k=50,
        top_p=0.95,
        temperature=0.8,
    )
    # Keep only the tokens generated after the prompt
    prompt_length = inputs["input_ids"].shape[1]
    response = tokenizer.decode(generated_ids[0][prompt_length:], skip_special_tokens=True)
    reward = get_reward(prompt, response)
    print(f"\nPrompt: {prompt}\nResponse: {response}\nReward: {reward}")


In [None]:
from trl import DPOTrainer
from transformers import TrainingArguments

from trl import DPOConfig

# DPOTrainer reads its DPO-specific hyper-parameters (such as beta) from a
# DPOConfig, so the arguments are built with DPOConfig rather than plain
# TrainingArguments, and beta is set here instead of on the trainer.
training_args = DPOConfig(
    output_dir="./dpo-distilgpt2",
    per_device_train_batch_size=4,
    num_train_epochs=3,
    logging_steps=10,
    save_strategy="epoch",
    eval_strategy="no",
    learning_rate=5e-5,
    fp16=torch.cuda.is_available(),  # fp16 needs a GPU; same guard as the SFT arguments
    remove_unused_columns=False,
    beta=0.1,
)

# Frozen reference policy: the base model, the same choice the PPO stage
# makes for its ref_policy. `ref_model` was not defined anywhere before.
ref_model = AutoModelForCausalLM.from_pretrained(base_model_name)

# NOTE(review): `dpo_dataset` is not defined anywhere in the visible notebook.
# It must be created before this cell, with 'prompt', 'chosen' and
# 'rejected' columns.
trainer = DPOTrainer(
    model=model,
    ref_model=ref_model,
    args=training_args,
    train_dataset=dpo_dataset,  # must have 'prompt', 'chosen', 'rejected'
    processing_class=tokenizer,  # same keyword the PPOTrainer calls in this notebook use
)


In [None]:
# Run DPO training with the DPOTrainer built in the previous cell
trainer.train()

In [None]:
# Save the DPO-trained model to ./dpo-aligned-distilgpt2 (the next cell loads it from there)
trainer.save_model("./dpo-aligned-distilgpt2")

In [None]:
from transformers import pipeline

# Load the DPO-aligned model saved earlier, together with the base tokenizer
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained("./dpo-aligned-distilgpt2")
tokenizer = AutoTokenizer.from_pretrained(base_model_name)

# Build a text-generation pipeline around the loaded model and tokenizer
generator = pipeline(task="text-generation", model=model, tokenizer=tokenizer)

# Sample one completion for a demo prompt and print the generated text
prompt = "Explain reinforcement learning in simple terms."
sampling_options = dict(
    max_length=100,
    do_sample=True,
    top_k=50,
    top_p=0.95,
    temperature=0.8,
)
response = generator(prompt, **sampling_options)
print(response[0]["generated_text"])
