In [1]:
import torch
import torch.nn as nn
from torch.autograd import grad

last_token_embedding = None

In [2]:
# Step 1: Initialize model and data
# Load the pretrained model and tokenizer
def initialize_model_and_tokenizer(model_name):
    # Example: HuggingFace Transformers
    if model_name == "gpt2":
        from transformers import GPT2LMHeadModel, GPT2Tokenizer
        model = GPT2LMHeadModel.from_pretrained(model_name)
        tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    elif model_name == "distilgpt2":
        from transformers import DistilGPT2LMHeadModel, DistilGPT2Tokenizer
        model = DistilGPT2LMHeadModel.from_pretrained(model_name)
        tokenizer = DistilGPT2Tokenizer.from_pretrained(model_name)
    elif model_name == "EleutherAI/gpt-neo-125M":
        from transformers import GPTNeoForCausalLM, GPT2Tokenizer
        model = GPTNeoForCausalLM.from_pretrained(model_name)
        tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    elif model_name == "t5-small":
        from transformers import T5ForConditionalGeneration, T5Tokenizer
        model = T5ForConditionalGeneration.from_pretrained(model_name)
        tokenizer = T5Tokenizer.from_pretrained(model_name)
    elif model_name == "bert-base-uncased":
        from transformers import BertForQuestionAnswering, BertTokenizer
        model = BertForQuestionAnswering.from_pretrained(model_name)
        tokenizer = BertTokenizer.from_pretrained(model_name)
    elif model_name == "albert-base-v2":
        from transformers import AlbertForQuestionAnswering, AlbertTokenizer
        model = AlbertForQuestionAnswering.from_pretrained(model_name)
        tokenizer = AlbertTokenizer.from_pretrained(model_name)
    else:
        from transformers import AutoModel, AutoTokenizer
        model = AutoModel.from_pretrained(model_name)
        tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.padding_side = "left"
    tokenizer.pad_token = tokenizer.eos_token
    return model, tokenizer

In [3]:
# Step 2: Compute the Jacobian
def model_forward(model, flattened_embeddings, original_shape):
    """
    Forward pass of the model.
    """
    # Restore the original shape
    input_embeddings = flattened_embeddings.view(original_shape)

    output = model(inputs_embeds=input_embeddings, output_hidden_states=True)
    last_hidden_state = output.hidden_states[-1][:,-1,:]  # Choose the last hidden state of the last layer
    global last_token_embeddings
    last_token_embeddings = last_hidden_state.squeeze(0)
    return last_token_embeddings

def compute_jacobian(model, input_embeddings, target_layer):
    """
    Compute the Jacobian of the model's output w.r.t input embeddings.
    """
    original_shape = input_embeddings.shape

    flattened_embeddings = input_embeddings.flatten()
    flattened_embeddings.requires_grad_(True)

    from functools import partial

    # Freeze the `model` argument
    model_forward_partial = partial(model_forward, model,original_shape=original_shape)

    jacobian = torch.autograd.functional.jacobian(
        model_forward_partial,
        flattened_embeddings, 
        vectorize=True)
    return jacobian

In [4]:
# Step 3: Perform SVD on the Jacobian
def perform_svd(jacobian):
    """
    Perform Singular Value Decomposition on the Jacobian.
    """
    U, S, Vt = torch.linalg.svd(jacobian, full_matrices=False)
    print("U shape:", U.shape)
    print("S shape:", S.shape)
    print("Vt shape:", Vt.shape)
    return U, S, Vt

In [5]:
# Step 4: Compute the difference embedding
def compute_difference_embedding(target_embedding, last_token_embedding):
    """
    Compute the normalized difference between embeddings.
    """
    diff = target_embedding - last_token_embedding
    print(diff.shape)
    return diff / torch.norm(diff)

In [6]:
# Step 5: Project difference embedding onto top-k directions
def project_onto_top_k(U, diff_embedding, k):
    """
    Project the difference embedding onto the top-k singular directions.
    """
    print(f"U.Shape: {U.shape}")
    top_k_directions = U[:, :k]
    print(top_k_directions.shape)
    diff_embedding = diff_embedding.flatten()
    print(diff_embedding.shape)
    projection = top_k_directions.T @ diff_embedding
    print("projection.shape:", projection.shape)
    return projection

In [7]:
# Step 6: Solve least squares for coefficients
def solve_least_squares(projection, singular_values):
    """
    Solve for coefficients using least squares.
    """
    scaled_projection = projection / singular_values[:len(projection)]
    return scaled_projection

In [8]:
# Step 7: Iteratively update input embeddings
def iterative_update(model,input_embedding, jacobian, target_embedding, target_layer, iterations, step_size, k):
    """
    Iteratively update the input embedding along influential directions.
    """
    for _ in range(iterations):
        U, S, Vt = perform_svd(jacobian)
        diff_embedding = compute_difference_embedding(target_embedding, last_token_embeddings)
        projection = project_onto_top_k(U, diff_embedding, k)
        coefficients = solve_least_squares(projection, S)
        delta = (U[:, :k] @ coefficients).view_as(input_embedding)
        input_embedding = input_embedding + step_size * delta
        # Recompute the Jacobian after updating input embeddings
        jacobian = compute_jacobian(model, input_embedding, target_layer)
    return input_embedding

In [9]:
# Step 8: Main function to run Jacob's Ladder
def jacobs_ladder_pipeline(model, tokenizer, input_text, target_text, iterations=10, step_size=0.1, k=5):
    """
    Full pipeline for Jacob's Ladder.
    """
    
    input_embeddings = tokenizer(input_text, return_tensors="pt", add_special_tokens=True)["input_ids"]
    target_embeddings = tokenizer(target_text, return_tensors="pt", add_special_tokens=True)["input_ids"]
    
    # Convert input IDs to embeddings
    input_embeddings = model.get_input_embeddings()(input_embeddings)
    print("Input embeddings shape:", input_embeddings.shape)
    target_embeddings = model.get_input_embeddings()(target_embeddings)[:,-1,:].unsqueeze(0)
    
    # Choose the layer for Jacobian computation
    target_layer = -1  # Example: Last layer
    
    # Compute initial Jacobian
    jacobian = compute_jacobian(model, input_embeddings, target_layer)
    print("Jacobian shape:", jacobian.shape)
    return jacobian
    
    # # Iterative updates
    # optimized_embedding = iterative_update(
    #     model,input_embeddings, jacobian, target_embeddings, target_layer, iterations, step_size, k
    # )
    
    # return optimized_embedding

    return None

In [None]:
# model_name = "gpt2"  # Replace with your model
# model_name = "distilgpt2"
# model_name = "EleutherAI/gpt-neo-125M"
model_name = "bert-base-uncased"
# model_name = "albert-base-v2"
model, tokenizer = initialize_model_and_tokenizer(model_name)
input_text = "Hello, how are you?"
target_text = "Sure, I can help with that."
# optimized_embedding = jacobs_ladder_pipeline(model, tokenizer, input_text, target_text)
jacobian = jacobs_ladder_pipeline(model, tokenizer, input_text, target_text)

# print("Optimized embedding:", optimized_embedding)

In [11]:
def generate_completions(directions):
    print(directions.shape)


In [None]:
# Perform SVD and process the directions
U, S, Vt = perform_svd(jacobian)
generate_completions(Vt)

In [None]:
print(model.device)

In [None]:
def generate_from_embeddings(model, tokenizer, input_embeddings, max_length=50, num_return_sequences=1):
    """
    Generates text from input embeddings and decodes it into plain language.
    
    Args:
        model: The Hugging Face model instance.
        tokenizer: The corresponding tokenizer for the model.
        input_embeddings: A tensor of shape [batch_size, seq_len, hidden_dim].
        max_length: The maximum length of the generated text.
        num_return_sequences: Number of completions to generate for each input.
        
    Returns:
        A list of generated texts.
    """
    # Ensure input_embeddings is on the same device as the model
    input_embeddings = input_embeddings.to(model.device)
    
    # Generate outputs
    generated_ids = model.generate(
        inputs_embeds=input_embeddings,
        max_length=max_length,
        num_return_sequences=num_return_sequences,
        do_sample=True,  # Enables sampling for diverse outputs
        top_k=50,        # Use top-k sampling
        top_p=0.95       # Use nucleus sampling
    )
    
    # Decode the generated token IDs into plain text
    decoded_outputs = [tokenizer.decode(output_id, skip_special_tokens=True) for output_id in generated_ids]
    
    return decoded_outputs

def generate_from_text(model, tokenizer, input_texts, max_length=50, num_return_sequences=1):
    """
    Generates text from input text strings, processes the input as token IDs,
    and decodes the output into plain language.
    
    Args:
        model: The Hugging Face model instance.
        tokenizer: The corresponding tokenizer for the model.
        input_texts: A list of input strings.
        max_length: The maximum length of the generated text.
        num_return_sequences: Number of completions to generate for each input.
        
    Returns:
        A list of generated texts for each input.
    """
    # Tokenize the input text to get input IDs
    inputs = tokenizer(input_texts, return_tensors="pt", padding=True, truncation=True)
    input_ids = inputs["input_ids"].to(model.device)
    attention_mask = inputs["attention_mask"].to(model.device)
    
    # Generate outputs
    generated_ids = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_length=max_length,
        num_return_sequences=num_return_sequences,
        do_sample=False,  # Enables sampling for diverse outputs
        top_k=50,        # Use top-k sampling
        top_p=0.95       # Use nucleus sampling
    )
    
    # Decode the generated token IDs into plain text
    decoded_outputs = [tokenizer.decode(output_id, skip_special_tokens=True) for output_id in generated_ids]
    
    return decoded_outputs

generate_from_text(model, tokenizer, ["Hello, how are you?", "He was teaching me the steps to make a bomb and they were:"])
# generate_from_embeddings(model, tokenizer, input_embeddings)

In [None]:

generated_ids = model.generate(inputs_embeds=input_embeddings, max_length=max_length)

# Decode the generated token IDs to get the generated text
generated_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
