In [None]:
import sys
sys.path.append('../..')  # Add parent directory to path
import os
import pandas as pd
import torch as t
import torch.nn as nn
t.cuda.empty_cache()
from transformers import Qwen2ForCausalLM, PreTrainedTokenizer, PreTrainedModel
from hf import HF
from evaluate import load

class SafetyAdapter(nn.Module):
    """Residual bottleneck adapter applied to a hidden-state tensor.

    The adapter down-projects ``hidden_size -> hidden_size // 4``, applies a
    ReLU, and up-projects back to ``hidden_size``; the result is added to the
    input (residual connection). The up-projection is zero-initialised, so a
    freshly constructed adapter returns its input unchanged.

    Args:
        hidden_size: Size of the last dimension of tensors passed to ``forward``.
        dtype: Optional dtype for the adapter parameters (``None`` lets
            ``nn.Linear`` pick its default).
    """

    def __init__(self, hidden_size, dtype=None):
        super().__init__()
        bottleneck = hidden_size // 4
        # A small adapter: down-project, non-linearity, then up-project.
        self.adapter = nn.Sequential(
            nn.Linear(hidden_size, bottleneck, dtype=dtype),
            nn.ReLU(),
            nn.Linear(bottleneck, hidden_size, dtype=dtype),
        )
        # Zero the up-projection so the adapter branch contributes nothing
        # until it has been trained.
        up_proj = self.adapter[2]
        nn.init.zeros_(up_proj.weight)
        nn.init.zeros_(up_proj.bias)

    def forward(self, hidden_states):
        """Return ``hidden_states`` plus the adapter branch, in the input's dtype.

        Only the adapter branch is computed in the adapter's parameter dtype;
        the residual path keeps the caller's tensor untouched, so the output
        dtype always matches the input dtype.
        """
        adapter_dtype = self.adapter[0].weight.dtype
        # ``Tensor.to`` is a no-op when the dtype already matches.
        adapter_out = self.adapter(hidden_states.to(adapter_dtype))
        return hidden_states + adapter_out.to(hidden_states.dtype)

class AdapterWrapper(nn.Module):
    """A wrapper that applies an adapter after the output of an existing module.

    The wrapper is meant to be dropped in where ``original_module`` used to
    be. Public attributes that the wrapper itself does not define are looked
    up on ``original_module``, so code that reads attributes of the wrapped
    module keeps working after the swap.

    Args:
        original_module: The module whose output should be adapted.
        adapter: Module applied to the output (or to the first element of a
            tuple output) of ``original_module``.
    """

    def __init__(self, original_module, adapter):
        super().__init__()
        self.original_module = original_module
        self.adapter = adapter

    def __getattr__(self, name):
        """Fall back to the wrapped module for unknown public attributes."""
        try:
            # nn.Module resolves registered parameters, buffers and submodules here.
            return super().__getattr__(name)
        except AttributeError:
            # Private/dunder names are never forwarded: this keeps copy/pickle
            # probing and hook bookkeeping tied to the wrapper itself, and
            # avoids recursion while ``original_module`` is not yet registered.
            if name.startswith("_") or name == "original_module":
                raise
            return getattr(super().__getattr__("original_module"), name)

    def forward(self, *args, **kwargs):
        """Apply the original module and then the adapter.

        If the original module returns a tuple, only its first element is
        passed through the adapter and the remaining elements are returned
        unchanged; any other output is passed to the adapter as a whole.
        """
        outputs = self.original_module(*args, **kwargs)

        if isinstance(outputs, tuple):
            return (self.adapter(outputs[0]),) + outputs[1:]
        return self.adapter(outputs)

def inject_adapter(model: PreTrainedModel, layer_idx: int) -> PreTrainedModel:
    """Wrap one layer of ``model`` with a freshly created ``SafetyAdapter``.

    The layer at ``model.model.layers[layer_idx]`` is replaced in place by an
    ``AdapterWrapper`` around the original layer. The adapter is also recorded
    in ``model.adapters`` under the key ``"safety_adapter_{layer_idx}"``.

    Args:
        model: Model exposing ``config.hidden_size`` and an indexable,
            assignable ``model.layers`` container.
        layer_idx: Zero-based index of the layer to wrap.

    Returns:
        The same ``model`` object, modified in place.

    Raises:
        ValueError: If ``layer_idx`` is negative or not smaller than the
            number of layers.
    """
    layers = model.model.layers

    # Validate first so a bad index fails before any adapter is allocated.
    if layer_idx < 0 or layer_idx >= len(layers):
        raise ValueError(f"Layer index {layer_idx} is out of bounds. Model has {len(layers)} layers.")

    original_layer = layers[layer_idx]

    # The adapter parameters use the dtype of the model's first parameter.
    model_param = next(model.parameters())
    dtype = model_param.dtype

    # Place the adapter next to the layer it follows: when a model is spread
    # over several devices, the target layer may not live where the model's
    # first parameter does. Fall back to the first parameter's device when
    # the layer has no parameters or they are meta placeholders.
    layer_param = next(original_layer.parameters(), None)
    if layer_param is not None and layer_param.device.type != "meta":
        device = layer_param.device
    else:
        device = model_param.device

    print(f"Model is using dtype: {dtype} on device: {device}")

    adapter = SafetyAdapter(model.config.hidden_size, dtype=dtype).to(device)

    # Wrap the target layer with the adapter
    layers[layer_idx] = AdapterWrapper(original_layer, adapter)

    # Keep a direct handle to the adapter so its parameters can be reached
    # without walking the layer list.
    if not hasattr(model, "adapters"):
        model.adapters = {}
    model.adapters[f"safety_adapter_{layer_idx}"] = adapter

    return model


# Load the base model and its tokenizer through the project-local `HF` helper.
# NOTE(review): `HF.load_model` is defined outside this file — presumably it
# returns a (model, tokenizer) pair, since its result is unpacked into two names.
model_name = "Qwen/Qwen2.5-14B-Instruct"
base_model, base_tokenizer = HF.load_model(model_name)
# Bare annotation for type checkers / IDE completion; it assigns nothing and
# does not convert or validate the loaded object.
base_model: Qwen2ForCausalLM

# Inject adapter at layer 5
# `inject_adapter` swaps the layer inside `base_model` and returns that same
# object, so `model_with_adapter` and `base_model` refer to one model.
model_with_adapter = inject_adapter(base_model, layer_idx=5)

NameError: name 'PreTrainedModel' is not defined

In [None]:
# Send a single prompt through the adapter-injected model.
# NOTE(review): `HF.query` is defined outside this file; presumably it runs
# generation with the given tokenizer and returns the reply — confirm.
HF.query(model_with_adapter, base_tokenizer, "Hello")