In [2]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset
import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
alpha = 1.0  # scale factor for injection
layer_index = -1  # use the last transformer block (adjust if needed)

# --------------------------------------------------
# Step 1: Load Tokenizer
# --------------------------------------------------
tokenizer = AutoTokenizer.from_pretrained("gpt2")
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# --------------------------------------------------
# Step 2: Load Finetuned Model and Extract Sentiment Direction
# --------------------------------------------------
finetuned_model_path = "./results/checkpoint-3126"  # update as needed
finetuned_model = AutoModelForCausalLM.from_pretrained(finetuned_model_path)
finetuned_model.config.pad_token_id = finetuned_model.config.eos_token_id
finetuned_model.to(device)
finetuned_model.eval()

# For sentiment extraction, we use a small subset of IMDb reviews.
dataset = load_dataset("imdb")
val_data = dataset["test"]

def filter_by_label(dataset_split, label, max_samples=50):
    filtered = [d for d in dataset_split if d["label"] == label]
    return filtered[:max_samples]

pos_samples = filter_by_label(val_data, 1, max_samples=50)
neg_samples = filter_by_label(val_data, 0, max_samples=50)

# Tokenize the positive and negative samples
pos_toks = tokenizer(
    [s["text"] for s in pos_samples],
    padding="max_length",
    truncation=True,
    max_length=256,
    return_tensors="pt"
)
neg_toks = tokenizer(
    [s["text"] for s in neg_samples],
    padding="max_length",
    truncation=True,
    max_length=256,
    return_tensors="pt"
)
pos_input_ids = pos_toks["input_ids"].to(device)
neg_input_ids = neg_toks["input_ids"].to(device)

def get_mean_activation(model, input_ids, layer_index=layer_index):
    activations = []
    def hook_fn(module, input, output):
        # If output is a tuple, get the first element which contains the hidden states.
        if isinstance(output, tuple):
            out_tensor = output[0]
        else:
            out_tensor = output
        # Capture the activation of the final token in each sequence.
        activations.append(out_tensor[:, -1, :].detach())
    hook_handle = model.transformer.h[layer_index].register_forward_hook(hook_fn)
    with torch.no_grad():
        _ = model(input_ids)
    hook_handle.remove()
    acts = torch.cat(activations, dim=0)  # shape: (batch, hidden_size)
    mean_act = acts.mean(dim=0)
    return mean_act


# Compute mean activations for positive and negative examples.
pos_mean_act = get_mean_activation(finetuned_model, pos_input_ids, layer_index=layer_index)
neg_mean_act = get_mean_activation(finetuned_model, neg_input_ids, layer_index=layer_index)

# The sentiment direction is the normalized difference between positive and negative means.
sentiment_direction = pos_mean_act - neg_mean_act
sentiment_direction = sentiment_direction / sentiment_direction.norm()

# Optionally clear the finetuned model from GPU memory.
del finetuned_model
torch.cuda.empty_cache()

# --------------------------------------------------
# Step 3: Define Injection Hook for the Base Model
# --------------------------------------------------
# This hook adds the sentiment direction (scaled by alpha) to the last token's activation.
def injection_hook(module, input, output):
    # If the output is a tuple, extract the first element (hidden states)
    if isinstance(output, tuple):
        hidden_states = output[0].clone()  # clone to avoid in-place modification issues
        hidden_states[:, -1, :] += alpha * sentiment_direction.to(hidden_states.device)
        # Return a tuple with the modified hidden states and the rest of the original outputs
        return (hidden_states,) + output[1:]
    else:
        output = output.clone()
        output[:, -1, :] += alpha * sentiment_direction.to(output.device)
        return output


# --------------------------------------------------
# Step 4: Evaluate the Base Model (with and without Intervention)
# --------------------------------------------------
# Define a classification function that forms a prompt and compares logits for " positive" vs. " negative".
def classify_review(model, tokenizer, review_text):
    prompt = f"Review: {review_text} Sentiment:"
    input_ids = tokenizer.encode(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    with torch.no_grad():
        outputs = model(input_ids)
    logits = outputs.logits[0, -1, :]  # logits for the token immediately after the prompt
    pos_ids = tokenizer.encode(" positive", add_special_tokens=False)
    neg_ids = tokenizer.encode(" negative", add_special_tokens=False)
    vocab_size = logits.size(0)
    pos_log_prob = logits[pos_ids[0]].item() if pos_ids and pos_ids[0] < vocab_size else float("-inf")
    neg_log_prob = logits[neg_ids[0]].item() if neg_ids and neg_ids[0] < vocab_size else float("-inf")
    return 1 if pos_log_prob > neg_log_prob else 0

# Evaluation function to compute accuracy on a dataset subset.
def evaluate_model(model, tokenizer, dataset):
    correct = 0
    total = 0
    for example in tqdm.tqdm(dataset, desc="Evaluating"):
        review_text = example["text"]
        true_label = example["label"]  # 0 for negative, 1 for positive
        try:
            pred_label = classify_review(model, tokenizer, review_text)
        except Exception as e:
            print("Error processing review; defaulting to label 0:", e)
            pred_label = 0
        correct += int(pred_label == true_label)
        total += 1
    return correct / total if total > 0 else 0

# Load a subset of the IMDb test set for evaluation.
test_dataset = load_dataset("imdb", split="test")
sample_size = 200  # adjust as needed
test_subset = test_dataset.select(range(sample_size))

# --- Evaluate the plain (unmodified) base model ---
print("Evaluating Base Model without intervention...")
base_model = AutoModelForCausalLM.from_pretrained("gpt2")
base_model.config.pad_token_id = base_model.config.eos_token_id
base_model.to(device)
base_model.eval()

# base_accuracy = evaluate_model(base_model, tokenizer, test_subset)
# print(f"Base Model Accuracy: {base_accuracy * 100:.2f}%")

# --- Evaluate the base model with sentiment injection ---
# Register the hook on the same layer used for extracting sentiment direction.
hook_handle = base_model.transformer.h[layer_index].register_forward_hook(injection_hook)

print("\nEvaluating Base Model with Sentiment Intervention...")
intervened_accuracy = evaluate_model(base_model, tokenizer, test_subset)
print(f"Intervened Base Model Accuracy: {intervened_accuracy * 100:.2f}%")

# Clean up: remove the hook.
hook_handle.remove()


Evaluating Base Model without intervention...

Evaluating Base Model with Sentiment Intervention...


Evaluating: 100%|██████████| 200/200 [01:20<00:00,  2.49it/s]

Intervened Base Model Accuracy: 45.00%





In [3]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset
import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
alpha = 1.0  # Scale factor for the intervention

# -----------------------------
# Step 1: Load Tokenizer & Base Model
# -----------------------------
tokenizer = AutoTokenizer.from_pretrained("gpt2")
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

base_model = AutoModelForCausalLM.from_pretrained("gpt2")
base_model.config.pad_token_id = base_model.config.eos_token_id
base_model.to(device)
base_model.eval()

# -----------------------------
# Step 2: Extract Sentiment Direction from Finetuned Model
# (Assume sentiment_direction is already computed as before)
# For illustration, let's assume you have this vector:
# sentiment_direction = <your precomputed normalized direction vector>
# -----------------------------
# Example: (In practice, compute this from positive/negative activations)
# sentiment_direction = pos_mean_act - neg_mean_act; normalize it.
# -----------------------------
# For the purpose of this snippet, we assume sentiment_direction is available:
# sentiment_direction = torch.randn(base_model.config.n_embd).to(device)
# sentiment_direction = sentiment_direction / sentiment_direction.norm()

# -----------------------------
# Step 3: Define Injection Hook to Apply at Every Layer
# -----------------------------
def injection_hook(module, input, output):
    # The output might be a tuple, so extract the hidden states.
    if isinstance(output, tuple):
        hidden_states = output[0].clone()  # Clone to avoid in-place modification issues
        # Add the sentiment direction to the final token's activation in each sequence.
        hidden_states[:, -1, :] += alpha * sentiment_direction.to(hidden_states.device)
        # Return modified output while preserving any additional elements.
        return (hidden_states,) + output[1:]
    else:
        output = output.clone()
        output[:, -1, :] += alpha * sentiment_direction.to(output.device)
        return output

# -----------------------------
# Step 4: Register the Hook on All Transformer Layers
# -----------------------------
hook_handles = []
for layer in base_model.transformer.h:
    handle = layer.register_forward_hook(injection_hook)
    hook_handles.append(handle)

# -----------------------------
# Step 5: Evaluate the Intervened Base Model
# -----------------------------
def classify_review(model, tokenizer, review_text):
    prompt = f"Review: {review_text} Sentiment:"
    input_ids = tokenizer.encode(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    with torch.no_grad():
        outputs = model(input_ids)
    logits = outputs.logits[0, -1, :]
    pos_ids = tokenizer.encode(" positive", add_special_tokens=False)
    neg_ids = tokenizer.encode(" negative", add_special_tokens=False)
    vocab_size = logits.size(0)
    pos_log_prob = logits[pos_ids[0]].item() if pos_ids and pos_ids[0] < vocab_size else float("-inf")
    neg_log_prob = logits[neg_ids[0]].item() if neg_ids and neg_ids[0] < vocab_size else float("-inf")
    return 1 if pos_log_prob > neg_log_prob else 0

def evaluate_model(model, tokenizer, dataset):
    correct = 0
    total = 0
    for example in tqdm.tqdm(dataset, desc="Evaluating"):
        review_text = example["text"]
        true_label = example["label"]  # 0 for negative, 1 for positive
        try:
            pred_label = classify_review(model, tokenizer, review_text)
        except Exception as e:
            print("Error processing review; defaulting to label 0:", e)
            pred_label = 0
        correct += int(pred_label == true_label)
        total += 1
    return correct / total if total > 0 else 0

# Load a subset of the IMDb test set for evaluation.
test_dataset = load_dataset("imdb", split="test")
sample_size = 200  # adjust as needed
test_subset = test_dataset.select(range(sample_size))

print("Evaluating Base Model with Sentiment Intervention across all layers...")
intervened_accuracy = evaluate_model(base_model, tokenizer, test_subset)
print(f"Intervened Base Model Accuracy: {intervened_accuracy * 100:.2f}%")

# -----------------------------
# Step 6: Clean Up - Remove Hooks
# -----------------------------
for handle in hook_handles:
    handle.remove()


Evaluating Base Model with Sentiment Intervention across all layers...


Evaluating: 100%|██████████| 200/200 [01:17<00:00,  2.57it/s]

Intervened Base Model Accuracy: 35.50%





In [4]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset
import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
alpha = 1.0  # Scale factor for the intervention

# -----------------------------
# Step 1: Load Tokenizer and Models
# -----------------------------
tokenizer = AutoTokenizer.from_pretrained("gpt2")
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Finetuned model (used for extracting the sentiment direction)
finetuned_model_path = "./results/checkpoint-3126"  # Update as needed
finetuned_model = AutoModelForCausalLM.from_pretrained(finetuned_model_path)
finetuned_model.config.pad_token_id = finetuned_model.config.eos_token_id
finetuned_model.to(device)
finetuned_model.eval()

# Base model (which we will intervene upon)
base_model = AutoModelForCausalLM.from_pretrained("gpt2")
base_model.config.pad_token_id = base_model.config.eos_token_id
base_model.to(device)
base_model.eval()

# -----------------------------
# Step 2: Prepare IMDb Data for Extraction & Evaluation
# -----------------------------
# For extraction, we take up to 50 positive and 50 negative reviews.
def filter_by_label(dataset_split, label, max_samples=50):
    filtered = [d for d in dataset_split if d["label"] == label]
    return filtered[:max_samples]

# Load the IMDb test split
imdb_dataset = load_dataset("imdb", split="test")

pos_samples = filter_by_label(imdb_dataset, 1, max_samples=50)
neg_samples = filter_by_label(imdb_dataset, 0, max_samples=50)

# Tokenize positive and negative samples for extraction (using a moderate max length)
pos_toks = tokenizer(
    [s["text"] for s in pos_samples],
    padding="max_length",
    truncation=True,
    max_length=256,
    return_tensors="pt"
)
neg_toks = tokenizer(
    [s["text"] for s in neg_samples],
    padding="max_length",
    truncation=True,
    max_length=256,
    return_tensors="pt"
)
pos_input_ids = pos_toks["input_ids"].to(device)
neg_input_ids = neg_toks["input_ids"].to(device)

# For evaluation, we use a subset of 100 test examples.
eval_subset = imdb_dataset.select(range(100))

# -----------------------------
# Step 3: Define Helper Functions
# -----------------------------
# A function to get the mean activation (for the final token) at a specific layer from a model.
def get_mean_activation(model, input_ids, layer_index):
    activations = []
    def hook_fn(module, input, output):
        # The output might be a tuple (hidden_states, ...) – extract the first element.
        out_tensor = output[0] if isinstance(output, tuple) else output
        activations.append(out_tensor[:, -1, :].detach())
    hook_handle = model.transformer.h[layer_index].register_forward_hook(hook_fn)
    with torch.no_grad():
        _ = model(input_ids)
    hook_handle.remove()
    acts = torch.cat(activations, dim=0)
    return acts.mean(dim=0)

# A factory that returns an injection hook which adds the given sentiment direction.
def get_injection_hook(sentiment_direction, alpha):
    def injection_hook(module, input, output):
        if isinstance(output, tuple):
            hidden_states = output[0].clone()
            hidden_states[:, -1, :] += alpha * sentiment_direction.to(hidden_states.device)
            return (hidden_states,) + output[1:]
        else:
            output = output.clone()
            output[:, -1, :] += alpha * sentiment_direction.to(output.device)
            return output
    return injection_hook

# A function to classify a review using a prompt
def classify_review(model, tokenizer, review_text):
    prompt = f"Review: {review_text} Sentiment:"
    input_ids = tokenizer.encode(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    with torch.no_grad():
        outputs = model(input_ids)
    logits = outputs.logits[0, -1, :]
    pos_ids = tokenizer.encode(" positive", add_special_tokens=False)
    neg_ids = tokenizer.encode(" negative", add_special_tokens=False)
    vocab_size = logits.size(0)
    pos_log_prob = logits[pos_ids[0]].item() if pos_ids and pos_ids[0] < vocab_size else float("-inf")
    neg_log_prob = logits[neg_ids[0]].item() if neg_ids and neg_ids[0] < vocab_size else float("-inf")
    return 1 if pos_log_prob > neg_log_prob else 0

# A function to evaluate the model on a dataset subset
def evaluate_model(model, tokenizer, dataset):
    correct = 0
    total = 0
    for example in tqdm.tqdm(dataset, desc="Evaluating"):
        review_text = example["text"]
        true_label = example["label"]
        try:
            pred_label = classify_review(model, tokenizer, review_text)
        except Exception as e:
            print("Error processing review; defaulting to label 0:", e)
            pred_label = 0
        correct += int(pred_label == true_label)
        total += 1
    return correct / total if total > 0 else 0

# -----------------------------
# Step 4: Iterate Over Layers (from layer 7 to last layer)
# -----------------------------
total_layers = base_model.config.n_layer  # Total number of layers (e.g., 12 for GPT-2 small)
results = {}

for layer_idx in range(7, total_layers):
    print(f"\n--- Using sentiment direction extracted from finetuned model layer {layer_idx} ---")
    # Compute mean activations from positive and negative reviews for this layer in the finetuned model.
    pos_mean_act = get_mean_activation(finetuned_model, pos_input_ids, layer_index=layer_idx)
    neg_mean_act = get_mean_activation(finetuned_model, neg_input_ids, layer_index=layer_idx)
    sentiment_direction = pos_mean_act - neg_mean_act
    sentiment_direction = sentiment_direction / sentiment_direction.norm()
    
    # Register the injection hook on ALL layers of the base model with this sentiment direction.
    hook_handles = []
    injection_hook = get_injection_hook(sentiment_direction, alpha)
    for layer in base_model.transformer.h:
        handle = layer.register_forward_hook(injection_hook)
        hook_handles.append(handle)
    
    # Evaluate the intervened base model on 100 test examples.
    accuracy = evaluate_model(base_model, tokenizer, eval_subset)
    results[layer_idx] = accuracy
    print(f"Accuracy with sentiment direction from finetuned layer {layer_idx}: {accuracy*100:.2f}%")
    
    # Remove the hooks after evaluation.
    for handle in hook_handles:
        handle.remove()

print("\nSummary of results:")
for layer_idx, acc in results.items():
    print(f"Layer {layer_idx}: {acc*100:.2f}%")



--- Using sentiment direction extracted from finetuned model layer 7 ---


Evaluating: 100%|██████████| 100/100 [00:41<00:00,  2.43it/s]


Accuracy with sentiment direction from finetuned layer 7: 39.00%

--- Using sentiment direction extracted from finetuned model layer 8 ---


Evaluating: 100%|██████████| 100/100 [00:41<00:00,  2.41it/s]


Accuracy with sentiment direction from finetuned layer 8: 39.00%

--- Using sentiment direction extracted from finetuned model layer 9 ---


Evaluating: 100%|██████████| 100/100 [00:41<00:00,  2.44it/s]


Accuracy with sentiment direction from finetuned layer 9: 36.00%

--- Using sentiment direction extracted from finetuned model layer 10 ---


Evaluating: 100%|██████████| 100/100 [00:40<00:00,  2.48it/s]


Accuracy with sentiment direction from finetuned layer 10: 37.00%

--- Using sentiment direction extracted from finetuned model layer 11 ---


Evaluating: 100%|██████████| 100/100 [00:40<00:00,  2.47it/s]

Accuracy with sentiment direction from finetuned layer 11: 39.00%

Summary of results:
Layer 7: 39.00%
Layer 8: 39.00%
Layer 9: 36.00%
Layer 10: 37.00%
Layer 11: 39.00%



