# **Dependencies**

In [53]:
import torch
from torch import nn
from transformers import BertModel, BertTokenizer
import numpy as np
import random
import time

# **Common Setup Functions**


In [54]:
def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (including all CUDA devices when CUDA is available), then forces
    deterministic cuDNN kernels and disables the cuDNN auto-tuner.

    Args:
        seed: Integer seed applied to all generators. Defaults to 42, the
            value that used to be hard-coded, so ``set_seed()`` behaves
            exactly as before.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    # These flags are plain module attributes and can be set without CUDA.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def setup_model_and_tokenizer(model_name='bert-base-uncased'):
    """Load a pretrained BERT encoder together with its tokenizer.

    Args:
        model_name: Identifier handed to both ``from_pretrained`` calls.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    # The model is loaded first, then the tokenizer, as in the original cell.
    bert = BertModel.from_pretrained(model_name)
    return bert, BertTokenizer.from_pretrained(model_name)

def prepare_inputs(text, tokenizer):
    """Tokenize ``text`` and return the encoding as PyTorch tensors.

    Args:
        text: Input passed straight to ``tokenizer``.
        tokenizer: Callable accepting the text and a ``return_tensors`` keyword.

    Returns:
        Whatever ``tokenizer`` returns when asked for ``'pt'`` tensors.
    """
    encoding = tokenizer(text, return_tensors='pt')
    return encoding

def create_additional_input_vector(hidden_size):
    """Draw a reproducible random vector of shape ``(1, 1, hidden_size)``.

    The global generators are re-seeded through ``set_seed()`` right before
    sampling, so repeated calls with the same ``hidden_size`` yield the same
    values. Note that this also resets the global RNG state for the caller.

    Args:
        hidden_size: Size of the last dimension of the returned tensor.

    Returns:
        A tensor sampled with ``torch.randn``.
    """
    set_seed()
    vector_shape = (1, 1, hidden_size)
    return torch.randn(vector_shape)

# **Common Integration Method Applier**

In [55]:
class IntegrationMethodApplier(nn.Module):
    """Combine a tensor with an additional vector using a named operation.

    Supported names are ``"addition"`` and ``"multiplication"``. The name is
    stored as given and only checked when the module is called.
    """

    def __init__(self, integration_method):
        super().__init__()
        self.integration_method = integration_method

    def forward(self, input_tensor, additional_input_vector):
        """Apply the configured operation to the two operands.

        Args:
            input_tensor: Left-hand operand.
            additional_input_vector: Right-hand operand.

        Returns:
            ``input_tensor + additional_input_vector`` or
            ``input_tensor * additional_input_vector``.

        Raises:
            ValueError: If the configured method name is not supported.
        """
        method = self.integration_method
        if method == "addition":
            return input_tensor + additional_input_vector
        if method == "multiplication":
            return input_tensor * additional_input_vector
        raise ValueError("Unsupported integration method")

# **Hook-Based Approach**

## **Define the HookBasedBERTModifier**

In [73]:
class HookBasedBERTModifier:
    """Inject an additional vector into a BERT encoder via a forward hook.

    A forward hook is attached to encoder layer ``layer_number`` (1-based, so
    the hooked module is ``model.encoder.layer[layer_number - 1]``). When that
    layer runs, the hook ignores the hidden states the layer just produced and
    substitutes the layer's *input* combined with the additional vector, so
    the hooked layer's own computation is effectively bypassed.

    Args:
        model: Object exposing ``encoder.layer`` as an indexable sequence of
            modules.
        layer_number: 1-based index of the encoder layer to hook.
        integration_method_applier: Callable taking ``(tensor, vector)`` and
            returning the combined tensor.

    Raises:
        ValueError: If ``layer_number`` is smaller than 1. Without this check
            ``layer_number - 1`` would become a negative index and silently
            select a layer counted from the end.
    """

    def __init__(self, model, layer_number, integration_method_applier):
        if layer_number < 1:
            raise ValueError(f"layer_number must be >= 1, got {layer_number}")
        self.model = model
        self.layer_number = layer_number
        self.integration_method_applier = integration_method_applier
        # Set by register_hook(); defined here so the attribute always exists.
        self.additional_input_vector = None
        self.hook = None

    def modify_output(self, module, input, output):
        """Forward-hook callback that replaces the layer's hidden states.

        Args:
            module: The hooked layer (unused).
            input: Tuple of positional arguments the layer was called with;
                the first element is taken as the incoming hidden states.
            output: What the layer returned.

        Returns:
            The replacement output. When ``output`` is a tuple, only its first
            element is swapped and the remaining elements are kept, so callers
            that read e.g. attention weights from position 1 keep working.
            When ``output`` is not a tuple, the modified tensor is returned
            as-is.
        """
        input_tensor = input[0]
        modified_output = self.integration_method_applier(input_tensor, self.additional_input_vector)
        if isinstance(output, tuple):
            return (modified_output,) + output[1:]
        return modified_output

    def register_hook(self, additional_input_vector):
        """Attach the hook, replacing any hook this object registered earlier.

        Args:
            additional_input_vector: Vector handed to the applier on each
                forward pass of the hooked layer.

        Raises:
            IndexError: If ``layer_number`` exceeds the number of layers.
        """
        # Drop a previous registration first; otherwise its handle would be
        # overwritten and the stale hook could never be removed.
        self.remove_hook()
        self.additional_input_vector = additional_input_vector
        layer = self.model.encoder.layer[self.layer_number - 1]
        self.hook = layer.register_forward_hook(self.modify_output)

    def remove_hook(self):
        """Detach the hook if one is registered; safe to call repeatedly."""
        if self.hook is not None:
            self.hook.remove()
            self.hook = None


## **Test Hook-Based Approach**

In [6]:
def test_hook_based_modifier(input_text, layer_number, integration_method):
    """Run BERT with and without the hook and print how far the outputs differ.

    Args:
        input_text: Text to encode.
        layer_number: 1-based encoder layer to hook.
        integration_method: Name understood by ``IntegrationMethodApplier``.

    Prints the sum of absolute differences between the two
    ``last_hidden_state`` tensors; nothing is returned.
    """
    bert, tok = setup_model_and_tokenizer()
    extra_vector = create_additional_input_vector(bert.config.hidden_size)
    hook_modifier = HookBasedBERTModifier(
        bert,
        layer_number,
        IntegrationMethodApplier(integration_method),
    )
    encoded = prepare_inputs(input_text, tok)

    # Baseline pass: no hook attached yet.
    with torch.no_grad():
        baseline = bert(**encoded)

    # Second pass with the hook active, detached again right afterwards.
    hook_modifier.register_hook(extra_vector)
    with torch.no_grad():
        hooked = bert(**encoded)
    hook_modifier.remove_hook()

    abs_diff = (hooked.last_hidden_state - baseline.last_hidden_state).abs()
    print("Output difference: ", abs_diff.sum().item())

## **Run The Test**

In [7]:
# Hook encoder layer 10 and combine by multiplication; the call prints the
# summed absolute difference between the hooked and unhooked outputs.
test_hook_based_modifier("Hello, how are you?", 10, "multiplication")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Output difference:  3651.546875


# **Custom Layer-Based Approach**


## **Define the CustomLayerBERTModifier**

In [74]:
class CustomLayerBERTModifier(nn.Module):
    """Inject an additional vector by re-running the tail of a BERT encoder.

    The wrapped model is first run in full with ``output_hidden_states=True``.
    The hidden states at index ``layer_number - 1`` are combined with the
    additional vector, and the result is pushed through the encoder layers at
    indices ``layer_number`` .. last. The layer at index ``layer_number - 1``
    is therefore not applied to the modified tensor.

    Args:
        model: BERT-style model exposing ``encoder.layer`` and returning an
            object with a ``hidden_states`` attribute.
        layer_number: 1-based position at which the vector is injected.
        integration_method_applier: Callable taking ``(tensor, vector)`` and
            returning the combined tensor.

    Raises:
        ValueError: If ``layer_number`` is smaller than 1. Without this check
            ``layer_number - 1`` would become a negative index and silently
            pick hidden states counted from the end.
    """

    def __init__(self, model, layer_number, integration_method_applier):
        if layer_number < 1:
            raise ValueError(f"layer_number must be >= 1, got {layer_number}")
        super().__init__()
        self.bert = model
        self.layer_number = layer_number
        self.integration_method_applier = integration_method_applier

    def forward(self, input_ids, attention_mask, additional_input_vector):
        """Return the final hidden states after injecting the vector.

        Args:
            input_ids: Token ids forwarded to the wrapped model.
            attention_mask: Mask forwarded to the wrapped model (may be
                ``None``); non-zero entries mark positions to attend to.
            additional_input_vector: Vector handed to the applier.

        Returns:
            The first element of the last re-run layer's output, or the
            modified hidden states themselves when no layer is left to run.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True)
        hidden_states = outputs.hidden_states
        modified_layer_input = self.integration_method_applier(hidden_states[self.layer_number - 1], additional_input_vector)

        # The re-run layers need the same padding mask the full model used,
        # otherwise padded positions are attended to and results diverge from
        # a normal forward pass. An all-ones mask masks nothing, so it is
        # skipped to keep the unpadded case on the exact same code path.
        extended_attention_mask = None
        if attention_mask is not None and not bool(attention_mask.bool().all()):
            extended_attention_mask = self.bert.get_extended_attention_mask(attention_mask, input_ids.shape)

        for i in range(self.layer_number, len(self.bert.encoder.layer)):
            modified_layer_input = self.bert.encoder.layer[i](
                modified_layer_input, attention_mask=extended_attention_mask
            )[0]

        return modified_layer_input


## **Test Custom Layer-Based Approach**

In [9]:
def test_custom_layer_modifier(input_text, layer_number, integration_method):
    """Run the custom-layer modifier once and print the output shape.

    Args:
        input_text: Text to encode.
        layer_number: 1-based position at which the vector is injected.
        integration_method: Name understood by ``IntegrationMethodApplier``.

    Nothing is returned; the shape of the resulting tensor is printed.
    """
    bert, tok = setup_model_and_tokenizer()
    extra_vector = create_additional_input_vector(bert.config.hidden_size)

    wrapped_model = CustomLayerBERTModifier(
        bert,
        layer_number,
        IntegrationMethodApplier(integration_method),
    )

    encoded = prepare_inputs(input_text, tok)
    call_kwargs = {
        "input_ids": encoded['input_ids'],
        "attention_mask": encoded['attention_mask'],
        "additional_input_vector": extra_vector,
    }

    # Inference only, so gradient tracking is switched off.
    with torch.no_grad():
        result = wrapped_model(**call_kwargs)

    print("Output shape: ", result.shape)

## **Run The Test**

In [10]:
# Inject the vector at layer position 10 using addition; the call prints the
# shape of the resulting hidden-state tensor.
test_custom_layer_modifier("Hello, how are you?", 10, "addition")

Output shape:  torch.Size([1, 8, 768])


# **Compare Methods**

In [79]:
def compare_methods(input_text, layer_number, integration_method, atol=1e-6):
    """Run both modification strategies on one input and compare the results.

    The same model, inputs, additional vector and applier are used for the
    hook-based and the custom-layer approach. Each approach is timed over a
    single forward pass (the hook-based one runs first), so the printed
    runtimes are indicative only.

    Args:
        input_text: Text to encode.
        layer_number: 1-based layer position passed to both modifiers.
        integration_method: Name understood by ``IntegrationMethodApplier``.
        atol: Absolute tolerance for ``torch.allclose``. Defaults to ``1e-6``,
            the value that used to be hard-coded.

    Returns:
        ``True`` when both final hidden-state tensors agree within ``atol``,
        ``False`` otherwise.
    """
    set_seed()  # Set the seed for reproducibility

    # Common setup
    model, tokenizer = setup_model_and_tokenizer()
    additional_input_vector = create_additional_input_vector(model.config.hidden_size)
    integration_method_applier = IntegrationMethodApplier(integration_method)
    inputs = prepare_inputs(input_text, tokenizer)

    model.eval()  # Ensure the model is in evaluation mode

    # Hook-based modifier
    hook_modifier = HookBasedBERTModifier(model, layer_number, integration_method_applier)
    hook_modifier.register_hook(additional_input_vector)
    try:
        # perf_counter is the clock meant for measuring short intervals;
        # time.time() can jump when the system clock is adjusted.
        start_time = time.perf_counter()
        with torch.no_grad():
            outputs_with_hook = model(**inputs)
        hook_runtime = time.perf_counter() - start_time
    finally:
        # Always detach the hook: the same model instance is reused below and
        # must run unmodified there. Done outside the timed region so only the
        # forward pass is measured, as for the custom approach.
        hook_modifier.remove_hook()

    last_hidden_state_with_hook = outputs_with_hook.last_hidden_state

    # Custom layer-based modifier
    custom_model = CustomLayerBERTModifier(model, layer_number, integration_method_applier)
    custom_model.eval()  # Ensure the custom model is in evaluation mode

    start_time = time.perf_counter()
    with torch.no_grad():
        outputs_custom = custom_model(
            input_ids=inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            additional_input_vector=additional_input_vector,
        )
    custom_runtime = time.perf_counter() - start_time

    last_hidden_state_custom = outputs_custom

    # Compare the outputs
    are_same = torch.allclose(last_hidden_state_with_hook, last_hidden_state_custom, atol=atol)

    print(f"Hook-based method runtime: {hook_runtime:.6f} seconds")
    print(f"Custom layer-based method runtime: {custom_runtime:.6f} seconds")
    return are_same


# Run the comparison at layer position 9 using addition. The call prints both
# runtimes and returns whether the two outputs match within tolerance.
compare_methods("Hello, how are you?", 9, "addition")


Hook-based method runtime: 0.130415 seconds
Custom layer-based method runtime: 0.138234 seconds


True