In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel

  from .autonotebook import tqdm as notebook_tqdm


In [32]:
# 1) Define the gating model: LLaMA encoder + 5-way classifier
class LlamaExpertGating(nn.Module):
    """Gating network: a transformer encoder plus a linear head that scores experts.

    The encoder output is mean-pooled over the non-padding tokens, projected to
    ``num_experts`` logits and turned into a probability distribution.

    Args:
        encoder: Any module exposing ``config.hidden_size`` and returning an
            object with ``last_hidden_state`` when called with ``input_ids``,
            ``attention_mask`` and ``return_dict=True``.
        num_experts: Number of experts to score.
    """

    def __init__(self, encoder: nn.Module, num_experts: int = 5) -> None:
        super().__init__()
        self.encoder = encoder
        hidden_size = self.encoder.config.hidden_size
        # Single linear scoring layer; kept inside nn.Sequential so the
        # parameter names (classifier.0.weight / classifier.0.bias) stay the same.
        self.classifier = nn.Sequential(nn.Linear(hidden_size, num_experts))

    def forward(self, input_ids, attention_mask, sentence: str):
        """Score ``sentence`` and return the scores serialised as a string.

        Args:
            input_ids: Token ids, indexed as (batch, seq_len).
            attention_mask: 1 for real tokens, 0 for padding; same layout as
                ``input_ids``.
            sentence: Raw text appended to the serialised scores.

        Returns:
            ``"p0@p1@...@pN#<sentence>"``: the expert probabilities joined by
            ``'@'``, then ``'#'``, then ``sentence``. Only the FIRST row of the
            batch is serialised. ``'#'`` is the separator the consumers in this
            notebook split on, so a sentence that itself contains ``'#'`` is
            ambiguous for them.
        """
        outputs = self.encoder(input_ids=input_ids,
                               attention_mask=attention_mask,
                               return_dict=True)
        hidden = outputs.last_hidden_state  # (batch, seq_len, hidden)

        # Masked mean pooling: zero out padding positions, then average over
        # the real tokens only. The mask is moved next to the hidden states
        # because the encoder may live on a different device than the inputs.
        mask = attention_mask.unsqueeze(-1).to(device=hidden.device, dtype=hidden.dtype)
        # Clamp so a row made only of padding yields zeros instead of NaN.
        token_counts = attention_mask.sum(dim=1, keepdim=True).clamp(min=1).to(hidden.device)
        pooled = (hidden * mask).sum(dim=1) / token_counts

        # The head is created on the default device/dtype, which can differ
        # from the encoder's; align the pooled vector with it.
        head_param = next(self.classifier.parameters())
        pooled = pooled.to(device=head_param.device, dtype=head_param.dtype)

        logits = self.classifier(pooled)  # (batch, num_experts)
        probs = F.softmax(logits, dim=-1).cpu().tolist()[0]
        return "@".join(str(p) for p in probs) + "#" + sentence

In [3]:
# Load the 3B checkpoint as a bare encoder (AutoModel) together with its tokenizer.
model_name = "meta-llama/Llama-3.2-3B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
model = AutoModel.from_pretrained(
    model_name,
    device_map="auto",
    output_hidden_states=True,
    return_dict_in_generate=True,
    trust_remote_code=True,
)


Loading checkpoint shards: 100%|██████████| 2/2 [00:04<00:00,  2.35s/it]


In [4]:
# Reuse the EOS token as the padding token, then mirror its id into the model config.
tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id = tokenizer.pad_token_id

In [33]:
# Expert labels, in the order of the gating model's output scores.
# The list of expert callables (`experts_name`) uses this same order.
experts = ["persuasion", "keyterm", "intent", "sentiment", "tom"]

# Example sentences.
# NOTE(review): `sentences` does not appear to be used by any other cell shown here.
sentences = ["Great choice! The Tesla Model 3 is an excellent vehicle. Since you've opted for an E",
    "Absolutely. The battery is the heart of your Tesla. With Tata AI",
    "Extract the main keywords from this paragraph.",
    "What is the users intention behind this request?",
    "I am really unhappy with the service I received today.",
    "How might the character be feeling in this scene?",
    # TODO: add the remaining example sentences
]
# Wrap the loaded encoder with one score per expert label.
# The classifier head is created fresh here; no trained head weights are loaded in the cells shown.
gating_model = LlamaExpertGating(model, num_experts=len(experts))

In [36]:
# Score one example sentence: the gating model returns its per-expert
# probabilities serialised into a single string.
sentence = "I am very angry with the service I received today."

# Tokenize into a single-item batch of PyTorch tensors.
enc = tokenizer(sentence, return_tensors="pt", truncation=True, padding=True)

# Inference only, so gradient tracking is switched off.
with torch.no_grad():
    probs = gating_model(enc["input_ids"], enc["attention_mask"], sentence)

# Keep whatever precedes the first '#', then cut it on '@'.
# The pieces stay strings; they are only truncated for display below.
float_part = probs.split('#')[0]
probs = float_part.split('@')

# Show the first six characters of each piece next to its expert label.
print(f"\nSentence: {sentence!r}")
for expert, p in zip(experts, probs):
    print(f"  {expert:10s}: {p[:6]}")



Sentence: 'I am very angry with the service I received today.'
  persuasion: 0.1877
  keyterm   : 0.1242
  intent    : 0.4683
  sentiment : 0.1759
  tom       : 0.0437


### 5 Experts

In [15]:
# Define placeholder expert modules (replace with your actual experts)
def expert_sentiment(text:str) -> str:
    """Placeholder sentiment expert: wrap ``text`` in a tag naming the expert."""
    tagged = f"[Sentiment applied to: {text}]"
    return tagged
def expert_tom(text:str) -> str:
    """Placeholder ToM expert: wrap ``text`` in a tag naming the expert."""
    tagged = f"[ToM applied to: {text}]"
    return tagged
def expert_persuasion(text:str) -> str:
    """Placeholder persuasion expert: wrap ``text`` in a tag naming the expert."""
    tagged = f"[Persuasion applied to: {text}]"
    return tagged
def expert_intent(text:str) -> str:
    """Placeholder intent expert: wrap ``text`` in a tag naming the expert."""
    tagged = f"[Intent applied to: {text}]"
    return tagged
def expert_keyterm(text:str) -> str:
    """Placeholder key-term expert: wrap ``text`` in a tag naming the expert."""
    tagged = f"[KeyTerm applied to: {text}]"
    return tagged


In [40]:
# Expert callables, indexed by position. Despite the name, this list holds the
# functions themselves, not their names. The order mirrors the `experts` label
# list (persuasion, keyterm, intent, sentiment, tom); apply_topk indexes into it.
experts_name = [
    expert_persuasion,  # index 0
    expert_keyterm,     # index 1
    expert_intent,      # index 2
    expert_sentiment,   # index 3
    expert_tom          # index 4
]

def apply_topk(prob_tensor, text, k=3, experts=None):
    """Run the top-k scored experts on ``text`` and describe the weighted mix.

    Args:
        prob_tensor: Expert scores. Either an ``'@'``-separated string such as
            ``"0.1@0.7@0.2"`` (anything from the first ``'#'`` onwards is
            ignored, so a full ``"scores#sentence"`` payload is accepted), or
            a 1-D sequence/tensor of numbers.
        text: Input handed to each selected expert.
        k: Number of experts to select. Clamped to the number of scores, so
            passing fewer than ``k`` scores no longer fails.
        experts: Optional list of callables indexed like the scores. Defaults
            to the module-level ``experts_name`` list.

    Returns:
        Tuple ``(mixed_str, topk_indices, topk_weights)``: ``mixed_str`` is
        ``"(w)*<expert output>"`` terms joined by ``" + "`` in descending score
        order; ``topk_indices`` are the selected positions; ``topk_weights``
        is the softmax over the selected scores.

    Raises:
        ValueError: If a string score cannot be parsed as a float.
        IndexError: If a selected index has no matching expert.
    """
    if experts is None:
        experts = experts_name

    if isinstance(prob_tensor, str):
        # Drop an optional "#sentence" suffix before parsing the numbers.
        score_part = prob_tensor.split('#', 1)[0]
        try:
            values = [float(s) for s in score_part.split('@')]
        except ValueError as exc:
            raise ValueError(
                f"could not parse expert scores from {prob_tensor!r}"
            ) from exc
        probs = torch.tensor(values)
    else:
        probs = torch.as_tensor(prob_tensor, dtype=torch.float32).reshape(-1)

    # torch.topk raises when k exceeds the number of elements.
    k = min(k, probs.numel())
    topk_values, topk_indices = torch.topk(probs, k)
    # NOTE(review): the selected scores are passed through softmax again
    # rather than renormalised by their sum; for inputs that are already
    # probabilities this flattens the weights - confirm this is intended.
    topk_weights = F.softmax(topk_values, dim=0)

    mixed = []
    for idx, w in zip(topk_indices, topk_weights):
        expert_fn = experts[idx.item()]
        mixed.append(f"({w.item():.2f})*" + expert_fn(text))
    mixed_str = " + ".join(mixed)
    return mixed_str, topk_indices, topk_weights

# Demo: three raw scores (not normalised probabilities), so with the default
# k=3 every supplied score is selected.
# Note: this rebinds the notebook-global `probs` used by the inference cell.
probs ='1.23@4.56@7.89'
text = "The product really impressed me!, give me discount to the product"

combine_output, top_idx, top_weights = apply_topk(probs, text)
# Last expression of the cell: displays the mixed expert string.
combine_output

'(0.96)*[Intent applied to: The product really impressed me!, give me discount to the product] + (0.03)*[KeyTerm applied to: The product really impressed me!, give me discount to the product] + (0.00)*[Persuasion applied to: The product really impressed me!, give me discount to the product]'

### Fine-Tuned LLM

In [18]:
# Load the 1B checkpoint with a causal-LM head, plus its tokenizer.
# Note: `model_name` is rebound here and no longer names the 3B encoder checkpoint.
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "meta-llama/Llama-3.2-1B-Instruct"

finetune_tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# Reuse the EOS token for padding.
finetune_tokenizer.pad_token = finetune_tokenizer.eos_token

finetune_model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    output_hidden_states=True,
    return_dict_in_generate=True,
    trust_remote_code=True,
)


In [19]:
def finetune_llm(prompt: str, model, tokenizer, max_new_tokens: int = 100) -> str:
    """Generate a continuation of ``prompt`` with a fixed instruction appended.

    Args:
        prompt: Text to continue. A fixed instruction sentence is appended
            directly after it (no separator is inserted).
        model: Object exposing ``device`` and ``generate(**inputs, ...)``.
        tokenizer: Tokenizer matching ``model``; it encodes the prompt, supplies
            the padding id (its EOS id) and decodes the result.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded first sequence returned by ``generate``, with special
        tokens skipped. The cell output in this notebook shows that the
        returned text starts with the prompt itself.
    """
    prompt += "Give more weight to the following experts as assigned by probabilities "
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():
        output = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            # Use the tokenizer that was passed in, not a notebook global.
            pad_token_id=tokenizer.eos_token_id,
        )

    # generate() returns either a tensor of token ids or, when
    # return_dict_in_generate is enabled, an output object carrying them in
    # `.sequences`. Normalise both cases before picking the first sequence.
    sequences = getattr(output, "sequences", output)
    return tokenizer.decode(sequences[0], skip_special_tokens=True)
# Smoke test of the generation helper; the trailing space in the prompt matters
# because finetune_llm appends its instruction text without a separator.
prompt= "Who are you "
finetune_llm(prompt, finetune_model, finetune_tokenizer)

'Who are you Give more weight to the following experts as assigned by probabilities 0.1 to 0.5\n1. **A**uthoritative source on the topic\n2. **B**lack list of suspected terrorists\n3. **C**onference of experts\n4. **D**eputy head of the intelligence agency\n5. **E**xpert in the field\n6. **F**oreign expert\n7. **G**overnment official\n8. **H**istory of the event\n9. **I**ndependent'

### Reward Functions

In [None]:
def length_reward_func(completions, **kwargs):
    """
    Reward each completion by the length of the downstream LLM response it leads to.

    Each completion is expected to look like ``"p0@p1@...#text"``: expert
    scores joined by ``'@'``, a ``'#'`` separator, then the text. The scores
    select and weight experts via ``apply_topk``; the resulting mixed string is
    sent to ``finetune_llm`` and the character length of its response is the
    reward.

    Args:
        completions (list of str): A list of responses generated by the model.
        **kwargs: The trainer passes other arguments here, which we ignore.

    Returns:
        list of float: One reward per completion, in input order. A completion
        that has no ``'#'`` separator or whose scores cannot be used gets 0.0
        instead of aborting the whole batch.
    """
    rewards = []

    for completion in completions:
        # Split on the FIRST '#' only, so a '#' inside the text is preserved.
        probability, separator, text = completion.partition('#')
        if not separator:
            # Not in the expected "scores#text" format.
            rewards.append(0.0)
            continue

        try:
            combine_output, _, _ = apply_topk(probability, text)
        except (ValueError, IndexError, RuntimeError):
            # Unparseable scores, fewer scores than k, or an index with no
            # matching expert.
            rewards.append(0.0)
            continue

        response = finetune_llm(combine_output, finetune_model, finetune_tokenizer)

        # Placeholder reward logic: the response length in characters.
        rewards.append(float(len(response)))

    return rewards

In [47]:
# Smoke test: two hand-written completions in the "scores#text" format.
# Each one triggers a real generation with finetune_model.
lis=['1.23@4.56@7.89#sentence', '1.23@4.56@7.89#sentence']
length_reward_func(lis)



[390, 563]

## TRL GRPO

#### Dataset

In [48]:
from datasets import Dataset


# Training prompts: one dict with a single "prompt" key per dialogue turn.
# Every turn becomes an independent training prompt.
# NOTE(review): the turns look like alternating customer/agent utterances from
# more than one conversation (Honda Amaze, Jeep Wrangler, BMW) - confirm that
# treating agent turns as prompts is intended.
prompts_data = [
    {"prompt": "I have a 2021 Honda Amaze. What insurance would you recommend?"},
    {"prompt": "Of course. HDFC ERGO offers comprehensive policies with additional benefits such as Roadside Assistance and Zero Depreciation."},
    {"prompt": "What does the comprehensive policy include and what's the premium?"},
    {"prompt": "It includes own damage, third-party liability, theft, natural disasters, and more. The premium is approx $1176 per year, based on IDV."},
    {"prompt": "Is there roadside assistance in the policy?"},
    {"prompt": "Yes, HDFC ERGO includes Roadside Assistance with services like towing, jump-start, flat tire help, and fuel delivery."},
    {"prompt": "Can I get add-ons like Zero Depreciation?"},
    {"prompt": "Yes, Zero Depreciation is a valuable add-on. It’ll cost an extra $145 yearly but maximizes your claim amount."},
    {"prompt": "Is the claim process easy?"},
    {"prompt": "HDFC ERGO has a hassle-free claim process with online tracking and a wide garage network for cashless repairs."},
    {"prompt": "I haven’t claimed before. Any benefit?"},
    {"prompt": "For your Jeep Wrangler, it's around $1200 per year. It's an investment in your adventures, knowing you're covered against the unexpected challenges of off-roading."},
    {"prompt": "That's a bit steep. Are there any discounts?"},
    {"prompt": "Let me check if you qualify for any off-road enthusiast or safe driver discounts. I want to make sure you can continue exploring without financial worries. Your passion deserves protection."},
    {"prompt": "I understand. Protecting your investment is crucial. I recommend Tata AIG General Insurance — they understand the value of luxury vehicles."},
    {"prompt": "What makes them better than other insurers for a BMW?"},
    {"prompt": "They combine thorough coverage with rapid claims resolution. If something happens, they ensure your car is back to its original condition quickly, minimizing depreciation concerns."},
    {"prompt": "What if the car is totaled? I'm worried about losing a lot of money."},
    {"prompt": "They offer Insured Declared Value (IDV) coverage, so you get the original invoice value in case of total loss. You can replace your BMW without a significant financial hit."},
    {"prompt": "What about the high-tech features? I'm worried about finding mechanics who can fix them."},
    {"prompt": "They have a network of authorized service centers with technicians trained to handle BMW's sophisticated technology. You can trust your car is in capable hands, ensuring quality repairs."},
    {"prompt": "And if I'm in an accident and need a rental car?"},
    {"prompt": "They provide rental car coverage, so you're not inconvenienced while your BMW is being repaired. You maintain your lifestyle without disruption during a difficult time."},
    {"prompt": "Okay, this sounds pretty good. How much is the premium?"}
]



# Convert the list of dictionaries to a Hugging Face Dataset object
# (a single 'prompt' column, one row per entry above).
train_dataset = Dataset.from_list(prompts_data)

# Print the dataset summary (features and row count).
print(train_dataset)


Dataset({
    features: ['prompt'],
    num_rows: 24
})


In [49]:
from trl import GRPOTrainer, GRPOConfig
from peft import LoraConfig

# LoRA adapter settings handed to the trainer.
peft_config = LoraConfig(
    r=8,
    lora_alpha=16,
    lora_dropout=0.1,
    bias="none",
    task_type="CAUSAL_LM",
)

# GRPO training configuration
grpo_config = GRPOConfig(
    # NOTE(review): absolute, user-specific path - consider a configurable relative directory.
    output_dir="/DATA/rohan_kirti/niladri/grpo/grpo_llama3.2_finetuned",
    beta=0.1,  # The KL-divergence regularization coefficient
    max_prompt_length=256,
    max_completion_length=512,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=2,
    # num_train_epochs=3,
    # max_steps is set instead of num_train_epochs, so the run stops after 5 optimizer steps.
    max_steps=5,
    learning_rate=5e-5,
    logging_steps=1,
    report_to="wandb", # Experiment tracker; "tensorboard" is the alternative mentioned by the author
    # NOTE(review): TRL ties the batch size to num_generations (divisibility);
    # the exact rule depends on the installed TRL version - confirm that a
    # per-device batch size of 1 with 2 generations is accepted.
    num_generations=2,
)

# The trainer itself is created and run in the next cell.

# To save the trained adapter once training has finished:
# trainer.save_model("./grpo_llama3.2_finetuned")

In [None]:
# Initialize the trainer
# NOTE(review): `model` is the object loaded with AutoModel.from_pretrained
# (the 3B encoder that is also wrapped by `gating_model`), and `tokenizer` is
# its tokenizer. The causal-LM checkpoint in this notebook is `finetune_model`
# / `finetune_tokenizer`. GRPO generates completions from the policy, which
# presumably needs a causal-LM head - confirm which model is meant to be trained.
# NOTE(review): length_reward_func expects completions shaped like
# "p0@p1@...#text"; verify that the trained policy actually emits that format.
trainer = GRPOTrainer(
    model=model,
    args=grpo_config,
    processing_class=tokenizer,
    train_dataset=train_dataset,
    reward_funcs=[length_reward_func], # Pass our reward function in a list
    peft_config=peft_config,
)

# Start the fine-tuning process
print("Starting GRPO fine-tuning...")
trainer.train()
print("Fine-tuning complete!")

