In [None]:
import csv
import matplotlib.pyplot as plt

class F1Tracker:
    def __init__(self, experiment_name, save_csv=True, csv_path="f1_results.csv"):
        self.experiment_name = experiment_name
        self.percent_silenced = []
        self.f1_scores = []
        self.save_csv = save_csv
        self.csv_path = csv_path
        if self.save_csv:
            with open(self.csv_path, mode='w', newline='') as file:
                writer = csv.writer(file)
                writer.writerow(["experiment", "percent_silenced", "f1_score"])

    def add(self, percent, f1_score):
        self.percent_silenced.append(percent)
        self.f1_scores.append(f1_score)
        if self.save_csv:
            with open(self.csv_path, mode='a', newline='') as file:
                writer = csv.writer(file)
                writer.writerow([self.experiment_name, percent, f1_score])

    def plot(self, show=True, save_path=None):
        plt.plot(self.percent_silenced, self.f1_scores, marker='o')
        plt.title(f"F1-score - {self.experiment_name}")
        plt.xlabel("% Neuronas Silenciadas")
        plt.ylabel("F1-score")
        plt.ylim(0, 1)
        plt.grid(True)
        if save_path:
            plt.savefig(save_path, bbox_inches='tight')
        if show:
            plt.show()
        plt.clf()

# Configs

In [1]:
import os
import torch
import json
import logging
import time
import numpy as np
import pandas as pd
import pickle
from transformers import AutoTokenizer
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoModelForSequenceClassification
from torch.utils.data import DataLoader, Dataset
import neurox.data.extraction.transformers_extractor as transformers_extractor
from neurox.data.writer import ActivationsWriter
import neurox.data.loader as data_loader
from transformers import AutoConfig
from tqdm import tqdm
import neurox.interpretation.linear_probe as linear_probe
import neurox.interpretation.utils as utils
import neurox.analysis.visualization as TransformersVisualizer
from sklearn.model_selection import train_test_split
from IPython.display import display
import neurox.interpretation.probeless as probeless
from neurox.interpretation.probeless import (
    get_neuron_ordering,
    get_neuron_ordering_for_all_tags
)
import ast
from torch.cuda.amp import autocast
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.preprocessing import LabelEncoder
from matplotlib_venn import venn2
from neurox.interpretation.linear_probe import get_top_neurons
from sklearn.utils import shuffle

In [2]:
import logging

# ==========================
# 📜 Configure Logging 
# ==========================

logger = logging.getLogger("synapse_logger")
logger.setLevel(logging.INFO)

# Avoid duplicates
if not logger.hasHandlers():

    # 📁 Handler 
    file_handler = logging.FileHandler("logs/synapse_extraction_csv_pth.log", mode="w")
    file_handler.setLevel(logging.INFO)

    # 🖥️ Handler 
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # Format
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # Add handlers to main logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

logger.info("🚀 Logging configured")


2025-08-07 11:21:30,298 - INFO - 🚀 Logging configured


In [3]:
# ==========================
# SYNAPSE Model Configuration
# ==========================

# 🧠 Select the model (options: "BERT", "BigBird", "DistilBERT", "Longformer")
MODEL = "BigBird"

# 📁 Paths based on model name
BASE_PATH = f"data/{MODEL}"
input_csv = f"{BASE_PATH}/{MODEL}_tokens_PT.csv"
output_csv = f"{BASE_PATH}/reduced/{MODEL}_tokens_reduced.csv"
labels_output_path = f"{BASE_PATH}/labels_numeric.txt"
label_mapping_path = f"{BASE_PATH}/labels_mapping.json"
activations_file = f"{BASE_PATH}/activations.json"
weights_path = f"{BASE_PATH}/best_model_{MODEL}.pth"

# 🔢 Number of labels
NUM_LABELS = 5


# 🔧 HuggingFace model mapping
MODEL_HF = {
    "BERT": "bert-base-uncased",
    "BigBird": "google/bigbird-roberta-base",
    "DistilBERT": "distilbert-base-uncased",
    "Longformer": "allenai/longformer-base-4096"
}[MODEL]

# ⚙️ Device selection
device = torch.device("cpu")

In [4]:
# ==========================
# Load Model and Weights
# ==========================
from transformers import AutoConfig

model = AutoModelForSequenceClassification.from_pretrained(MODEL_HF, num_labels=NUM_LABELS)

# Load trained weights from disk
state_dict = torch.load(weights_path, map_location=device)
model.load_state_dict(state_dict)
model.to(device)
model.eval()

print(f"✅ Loaded {MODEL} with pretrained weights on {device}")

Some weights of BigBirdForSequenceClassification were not initialized from the model checkpoint at google/bigbird-roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


✅ Loaded BigBird with pretrained weights on cpu


### Load dataset

In [5]:
reduction_ratio = 0.001

# ==========================
# ✅ Skip dataset reduction if already available
# ==========================
if os.path.exists(output_csv) and os.path.exists(labels_output_path):
    logger.info(f"⚡ Reduced dataset found: {output_csv}. Skipping reduction.")
    df_reduced = pd.read_csv(output_csv)
    with open(labels_output_path, "r") as f:
        labels = [int(line.strip()) for line in f]  # Labels as integers
    with open(label_mapping_path, "r") as f:
        label_mapping = json.load(f)  # Load label mapping
else:
    logger.info(f"🔄 Loading dataset from {input_csv}")

    chunk_size = 5000 
    total_rows = sum(1 for _ in open(input_csv)) - 1  # Total rows excluding header
    df_chunks = []

    logger.info(f"🔄 Processing {total_rows} rows in chunks of {chunk_size}...")

    with tqdm(total=total_rows, desc="Processing rows", unit=" rows") as pbar:
        for chunk in pd.read_csv(input_csv, chunksize=chunk_size):
            # Convert `input_ids` from string to list of integers
            chunk['input_ids'] = chunk['input_ids'].apply(lambda x: list(map(int, x.strip("[]").split(","))))
            df_chunks.append(chunk)
            pbar.update(len(chunk))

    df = pd.concat(df_chunks, ignore_index=True)

    # ==========================
    # 🔢 Encode labels as integers
    # ==========================
    df['label'], unique_labels = pd.factorize(df["label"])
    label_mapping = {label: int(idx) for idx, label in enumerate(unique_labels)}

    # ==========================
    # 🧪 Reduce dataset maintaining class proportions
    # ==========================
    df_reduced, _ = train_test_split(df, train_size=reduction_ratio, stratify=df["label"], random_state=42)
    labels = df_reduced["label"].tolist()

    # ==========================
    # 💾 Save reduced dataset and labels
    # ==========================
    df_reduced.to_csv(output_csv, index=False)
    with open(labels_output_path, "w") as f:
        for label in labels:
            f.write(str(label) + "\n")

    with open(label_mapping_path, "w") as f:
        json.dump(label_mapping, f, indent=4)

    logger.info(f"✅ Reduced dataset saved to {output_csv}")
    logger.info(f"✅ Numeric labels saved to {labels_output_path}")
    logger.info(f"✅ Label mapping saved to {label_mapping_path}")


2025-08-07 11:21:32,045 - INFO - ⚡ Reduced dataset found: data/BigBird/reduced/BigBird_tokens_reduced.csv. Skipping reduction.


### Create Dataloader

In [6]:
# ==========================
# 📦 Create DataLoader
# ==========================
class SyscallDataset(Dataset):
    def __init__(self, dataframe):
        self.data = dataframe

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_ids = torch.tensor(self.data.iloc[idx]['input_ids'])
        label = torch.tensor(self.data.iloc[idx]['label'])
        return input_ids, label

# Initialize DataLoader with reduced dataset
dataset = SyscallDataset(df_reduced)
dataloader = DataLoader(dataset, batch_size=4, shuffle=False)

logger.info("✅ Dataloader created")

# Ensure `input_ids` are lists of integers
if isinstance(df_reduced["input_ids"].iloc[0], str):
    df_reduced["input_ids"] = df_reduced["input_ids"].apply(lambda x: list(map(int, x.strip("[]").split(","))))


2025-08-07 11:21:32,060 - INFO - ✅ Dataloader created


# NeuroX

## Activation Extraction

In [7]:
if os.path.exists(activations_file):
    logger.info(f"⚡ Activations file found: {activations_file}. Skipping extraction.")
else:
    transformers_extractor.extract_representations(
        model, 
        df_reduced["input_ids"].tolist(),  # Pass preprocessed tokens directly
        activations_file,
        device=device,
    )

    logger.info(f"✅ Activations saved to {activations_file}")


2025-08-07 11:21:32,077 - INFO - ⚡ Activations file found: data/BigBird/activations.json. Skipping extraction.


## Load Activations

In [8]:
activations, num_layers = data_loader.load_activations(activations_file)
logger.info(f"✅ Loaded activations from {activations_file} with {num_layers} layers")

# Load sentence-level classification data using activations
tokens = data_loader.load_sentence_data(
    output_csv, labels_output_path, activations
)

# Create sentence-level tensors for classification
X, y, mapping = utils.create_tensors(
    tokens,
    activations,
    task_specific_tag="NN",
    task_type="classification"
)

label2idx, idx2label, src2idx, idx2src = mapping
logger.info("✅ Created input/output tensors and label mappings for classification")

2025-08-07 11:21:32,116 - INFO - ✅ Loaded activations from data/BigBird/activations.json with 12 layers
2025-08-07 11:21:32,125 - INFO - ✅ Created input/output tensors and label mappings for classification


Loading json activations from data/BigBird/activations.json...
50 12.0
Number of tokens:  50
length of source dictionary:  17
length of target dictionary:  5
50
Total instances: 50
['s']
Number of samples:  50
Stats: Labels with their frequencies in the final set
4 9
3 15
2 9
1 10
0 7


## Train linear probe

In [9]:
probe = linear_probe.train_logistic_regression_probe(X, y, lambda_l1=0.001, lambda_l2=0.001)
scores = linear_probe.evaluate_probe(probe, X, y, idx_to_class=idx2label)
logger.info(f"🎯 Probe evaluation results: {scores}")

top_neurons_probe, per_class_top_neurons = linear_probe.get_top_neurons(probe, percentage=0.1, class_to_idx=label2idx)
logger.info(f"🔍 Top global neurons: {top_neurons_probe}")
logger.info(f"🔍 Top neurons per class: {per_class_top_neurons}")

Clases en y_train: [0 1 2 3 4]
Training classification probe
Creating model...
Number of training instances: 50
Number of classes: 5


epoch [1/10]: 0it [00:00, ?it/s]

Epoch: [1/10], Loss: 0.0752


epoch [2/10]: 0it [00:00, ?it/s]

Epoch: [2/10], Loss: 0.0397


epoch [3/10]: 0it [00:00, ?it/s]

Epoch: [3/10], Loss: 0.0254


epoch [4/10]: 0it [00:00, ?it/s]

Epoch: [4/10], Loss: 0.0223


epoch [5/10]: 0it [00:00, ?it/s]

Epoch: [5/10], Loss: 0.0199


epoch [6/10]: 0it [00:00, ?it/s]

Epoch: [6/10], Loss: 0.0174


epoch [7/10]: 0it [00:00, ?it/s]

Epoch: [7/10], Loss: 0.0160


epoch [8/10]: 0it [00:00, ?it/s]

Epoch: [8/10], Loss: 0.0152


epoch [9/10]: 0it [00:00, ?it/s]

Epoch: [9/10], Loss: 0.0145


epoch [10/10]: 0it [00:00, ?it/s]

Epoch: [10/10], Loss: 0.0140


Evaluating: 0it [00:00, ?it/s]

2025-08-07 11:21:32,277 - INFO - 🎯 Probe evaluation results: {'__OVERALL__': np.float64(0.96), '4': np.float64(1.0), '3': np.float64(1.0), '2': np.float64(0.7777777777777778), '1': np.float64(1.0), '0': np.float64(1.0)}
2025-08-07 11:21:32,281 - INFO - 🔍 Top global neurons: [8193 8194 8196 ... 8189 8190 8191]


Score (accuracy) of the probe: 0.96


2025-08-07 11:21:32,283 - INFO - 🔍 Top neurons per class: {'4': array([8457, 9014, 8769, 8833, 8987, 8723, 9181, 8488, 8980, 8398, 7879,
       8869, 8952, 8485, 7898, 9147, 8925, 9171, 8722, 8711, 8853, 8634,
       8712, 8544, 9063, 8572, 8956, 8293, 8662, 8507, 8738, 9207, 8583,
       9052, 9008, 8127, 7779, 8984, 7764, 8534, 9127, 8316, 8931, 8495,
       8873, 7688, 8482, 5548, 8797, 9027, 7849, 8721, 7715, 8319, 8923,
       8700, 8865, 8638, 8074, 9060, 7753, 8157, 7857, 7836, 8866, 8415,
       6026, 8884, 8989, 9043, 8056, 8872, 8034, 7797, 7933, 8020, 8681,
       8689, 8724, 6614, 7866, 7686, 8163, 8480, 7815, 9164, 5824, 8851,
       8971, 9007, 7865, 7950, 8941, 7791, 8655, 8234, 8576, 8196, 9062,
       8255, 8095, 8454, 8736, 5592, 5507, 8798, 7250, 8395, 9152, 8926,
       8285, 8617, 1866, 8958, 8637, 8967, 8666, 7901, 8369, 5551, 7734,
       8011, 4724, 9073, 8938, 9083, 5902, 4109, 7825, 7223, 8696, 8096,
       9005, 8750, 8459, 8543, 5601, 6534, 4675, 8694, 8164,

# Experiments

## Original performance

In [10]:
df = pd.read_csv(input_csv)

# 🎯 Select 50 random examples and reset index
sample_df = df.sample(n=50, random_state=42).reset_index(drop=True)

# 🧹 Convert "input_ids" and "attention_mask" from string to list format
def parse_list(x):
    return ast.literal_eval(x)

sample_df['input_ids'] = sample_df['input_ids'].apply(parse_list)
sample_df['attention_mask'] = sample_df['attention_mask'].apply(parse_list)

# 🔢 Encode labels to integers
label_encoder = LabelEncoder()
sample_df['label'] = label_encoder.fit_transform(sample_df['label'])
labels_list = sample_df['label'].tolist()

predictions_list = []
model.eval()
torch.cuda.empty_cache()

for i in range(len(sample_df)):
    input_ids_tensor = torch.tensor(sample_df.loc[i, 'input_ids']).unsqueeze(0).to(device)
    attention_mask_tensor = torch.tensor(sample_df.loc[i, 'attention_mask']).unsqueeze(0).to(device)

    with torch.no_grad():
        with autocast():
            device = torch.device("cpu")
            outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
            logits = outputs['logits']
            pred = torch.argmax(logits, dim=1).item()
            predictions_list.append(pred)

    del input_ids_tensor, attention_mask_tensor, outputs, logits
    torch.cuda.empty_cache()

# 📊 Compute evaluation metrics
accuracy = accuracy_score(labels_list, predictions_list)
f1 = f1_score(labels_list, predictions_list, average='weighted')
report_dict = classification_report(labels_list, predictions_list, output_dict=True)
report_df = pd.DataFrame(report_dict).transpose()

# Round for readability
report_df = report_df.round(4)

# Add accuracy as a separate row
accuracy_row = pd.DataFrame({'precision': accuracy, 'recall': accuracy, 'f1-score': accuracy, 'support': sum(report_df['support'])}, index=['accuracy'])
report_df = pd.concat([report_df, accuracy_row])

# 📝 Save or append to CSV

experiment_title = "🧪 Sample of 30 - Full Model Evaluation"
csv_report_path = f"{BASE_PATH}/classification_report_sample_eval.csv"

# Remove incorrect 'accuracy' row if it exists
report_df = report_df.drop("accuracy", errors="ignore")

# Append correct accuracy row
accuracy_row = pd.DataFrame({
    'precision': [""],
    'recall': [""],
    'f1-score': [accuracy],
    'support': [sum(report_df["support"])]
}, index=["overall_accuracy"])

# Combine
final_df = pd.concat([report_df, accuracy_row])

# Write to CSV with experiment title as a header
with open(csv_report_path, "a") as f:
    f.write(f"\n\n# {experiment_title}\n")
final_df.to_csv(csv_report_path, mode="a")

logger.info(f"📁 Appended classification report with title '{experiment_title}' to {csv_report_path}")


  with autocast():
2025-08-07 11:22:54,341 - INFO - 📁 Appended classification report with title '🧪 Sample of 30 - Full Model Evaluation' to data/BigBird/classification_report_sample_eval.csv


In [17]:
logger = logging.getLogger(__name__)

def quick_baseline_f1(model, sample_df, labels_list):
    """
    Corre inferencia rápida y devuelve solo el F1-weighted para el modelo actual.
    Úsalo tras cada experimento para comprobar que el baseline no se contamina.
    """
    model.eval()
    preds = []
    for row in sample_df.itertuples():
        input_ids = torch.tensor(row.input_ids).unsqueeze(0).to(model.device)
        att_mask  = torch.tensor(row.attention_mask).unsqueeze(0).to(model.device)
        with torch.no_grad():
            logits = model(input_ids=input_ids, attention_mask=att_mask).logits
        preds.append(int(logits.argmax(dim=-1)))
    f1w = f1_score(labels_list, preds, average="weighted", zero_division=0)
    logger.info(f"[Baseline Check] Weighted F1-score: {f1w:.4f}")
    return f1w

## Shortcut: reload model

In [46]:
def load_model(model_hf: str,
                     weights_path: str = None,
                     num_labels: int = None,
                     device: str = None):
    # dispositivo
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device)

    # carga base
    kwargs = {}
    if num_labels is not None:
        kwargs["num_labels"] = num_labels
    model = AutoModelForSequenceClassification.from_pretrained(model_hf, **kwargs)

    # aplica checkpoint propio si se pasa
    if weights_path:
        state_dict = torch.load(weights_path, map_location=device)
        # strict=False por si tu state_dict no coincide exactamente (labels, etc.)
        model.load_state_dict(state_dict, strict=False)

    model.to(device)
    model.eval()
    return model


model = load_model(MODEL_HF, weights_path=weights_path, num_labels=NUM_LABELS, device="cpu")

Some weights of BigBirdForSequenceClassification were not initialized from the model checkpoint at google/bigbird-roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## Silence and evaluate neurons. Function definition

### Full silencing

In [11]:
def get_top_k_neurons_exact(probe, percentage: float) -> list[int]:
    """
    Return exactly N = round(total_neurons * percentage) neuron indices, sorted by importance.
    Importance is measured as the sum of absolute values of weights across all output classes.
    """
    weight_matrix = probe.linear.weight.detach().abs()  # [num_classes, num_neurons]
    importance = weight_matrix.sum(dim=0).cpu().numpy()  # [num_neurons]
    total_neurons = len(importance)
    top_n = round(total_neurons * percentage)

    sorted_indices = importance.argsort()[-top_n:]  # Top-N by importance
    return sorted_indices.tolist()

In [12]:
def make_cls_silence_hook(indices):
    indices = [int(i) for i in indices]
    indices_tensor = torch.tensor(indices, dtype=torch.long) if indices else None

    def hook(module, input, output):
        if output.dim() == 3:
            new_output = output.clone()
            cls_token = new_output[:, 0, :]
            mask = torch.ones_like(cls_token)
            if indices_tensor is not None:
                local_indices = indices_tensor.to(new_output.device)
                mask[:, local_indices] = 0.0
            new_output[:, 0, :] = cls_token * mask
            return new_output
        return output
    return hook

In [13]:
# ==========================
# Get encoder layers dynamically
# ==========================
def get_encoder_layers(model):
    if hasattr(model, "bert"):
        return model.bert.encoder.layer
    elif hasattr(model, "longformer"):
        return model.longformer.encoder.layer
    elif hasattr(model, "distilbert"):
        return model.distilbert.transformer.layer
    else:
        raise NotImplementedError("❌ Unsupported model architecture.")

In [14]:
def silence_top_global_percentage_and_evaluate(
    model,
    sample_df,
    labels_list,
    probe,
    label2idx,
    percentage=0.10,
    report_path=None,
    experiment_title=None
):
    hidden_dim = model.config.hidden_size
    num_layers = model.config.num_hidden_layers
    total_neurons = num_layers * hidden_dim

    top_neurons_global = get_top_k_neurons_exact(probe, percentage=percentage)
    logger.info(f"🔧 Silencing exactly {len(top_neurons_global)} neurons ({percentage:.2%} of total {total_neurons})")

    # Save neuron indices
    neurons_dir = f"{BASE_PATH}/neurons"
    os.makedirs(neurons_dir, exist_ok=True)
    json_path = f"{neurons_dir}/top_{int(percentage * 100)}p_neurons_global.json"
    with open(json_path, "w") as f:
        json.dump(top_neurons_global, f, indent=4)
    logger.info(f"📁 Saved neuron indices to {json_path}")

    # Register hooks per layer (compatible with BERT, DistilBERT, etc.)
    encoder_layers = get_encoder_layers(model)
    hook_handles = []
    for i in range(num_layers):
        indices_layer = [idx - i * hidden_dim for idx in top_neurons_global if i * hidden_dim <= idx < (i + 1) * hidden_dim]
        if indices_layer:
            logger.info(f"📌 Layer {i}: silencing {len(indices_layer)} neurons")
            # Use 'output' submodule if exists (BERT, RoBERTa), else register on main layer (DistilBERT)
            if hasattr(encoder_layers[i], "output"):
                handle = encoder_layers[i].output.register_forward_hook(make_cls_silence_hook(indices_layer))
            else:
                handle = encoder_layers[i].register_forward_hook(make_cls_silence_hook(indices_layer))
            hook_handles.append(handle)

    # Inference
    model.eval()
    predictions = []
    for i in range(len(sample_df)):
        input_ids_tensor = torch.tensor(sample_df.loc[i, 'input_ids']).unsqueeze(0).to(model.device)
        attention_mask_tensor = torch.tensor(sample_df.loc[i, 'attention_mask']).unsqueeze(0).to(model.device)

        with torch.no_grad():
            outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
            logits = outputs['logits']
            pred = torch.argmax(logits, dim=1).item()
            predictions.append(pred)

        del input_ids_tensor, attention_mask_tensor, outputs, logits
        torch.cuda.empty_cache()

    # Metrics
    accuracy = accuracy_score(labels_list, predictions)
    f1 = f1_score(labels_list, predictions, average='weighted')
    report_dict = classification_report(labels_list, predictions, output_dict=True)
    report_df = pd.DataFrame(report_dict).transpose().round(4)
    report_df = report_df.drop("accuracy", errors="ignore")

    accuracy_row = pd.DataFrame({
        'precision': [""],
        'recall': [""],
        'f1-score': [accuracy],
        'support': [sum(report_df["support"])]
    }, index=["overall_accuracy"])
    final_df = pd.concat([report_df, accuracy_row])

    # Save classification report
    if report_path is None:
        report_path = f"{BASE_PATH}/results/full_silencing.csv"
    os.makedirs(os.path.dirname(report_path), exist_ok=True)

    if experiment_title is None:
        experiment_title = f"Silencing top {percentage:.2%} global neurons"

    if not os.path.exists(report_path):
        with open(report_path, "w") as f:
            f.write(f"# {experiment_title}\n")
            final_df.to_csv(f)
    else:
        with open(report_path, "a") as f:
            f.write(f"\n\n# {experiment_title}\n")
        final_df.to_csv(report_path, mode="a")

    logger.info(f"🎯 Accuracy after silencing: {accuracy:.4f}")
    logger.info(f"📏 Weighted F1 Score: {f1:.4f}")
    logger.info(f"📋 Classification report saved to {report_path}")

    # Remove all hooks
    for handle in hook_handles:
        handle.remove()
    logger.info("✅ All hooks removed after evaluation")

## Global impact

In [15]:
percentages = [0.025, 0.05, 0.075, 0.10, 0.125, 0.15, 0.175, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.65, 0.75, 0.8, 0.95]

for pct in percentages:
    silence_top_global_percentage_and_evaluate(
        model=model,
        sample_df=sample_df,
        labels_list=labels_list,
        probe=probe,
        label2idx=label2idx,
        percentage=pct,
        experiment_title=f"Silencing {pct*100:.1f}% Global Neurons"
    )


2025-08-07 11:22:54,370 - INFO - 🔧 Silencing exactly 230 neurons (2.50% of total 9216)
2025-08-07 11:22:54,370 - INFO - 📁 Saved neuron indices to data/BigBird/neurons/top_2p_neurons_global.json
2025-08-07 11:22:54,371 - INFO - 📌 Layer 5: silencing 1 neurons
2025-08-07 11:22:54,371 - INFO - 📌 Layer 6: silencing 2 neurons
2025-08-07 11:22:54,372 - INFO - 📌 Layer 7: silencing 8 neurons
2025-08-07 11:22:54,372 - INFO - 📌 Layer 8: silencing 6 neurons
2025-08-07 11:22:54,372 - INFO - 📌 Layer 9: silencing 11 neurons
2025-08-07 11:22:54,373 - INFO - 📌 Layer 10: silencing 45 neurons
2025-08-07 11:22:54,373 - INFO - 📌 Layer 11: silencing 157 neurons
2025-08-07 11:24:08,916 - INFO - 🎯 Accuracy after silencing: 0.8200
2025-08-07 11:24:08,916 - INFO - 📏 Weighted F1 Score: 0.8111
2025-08-07 11:24:08,917 - INFO - 📋 Classification report saved to data/BigBird/results/full_silencing.csv
2025-08-07 11:24:08,917 - INFO - ✅ All hooks removed after evaluation
2025-08-07 11:24:08,918 - INFO - 🔧 Silencing ex

KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt
from collections import defaultdict
import re

def plot_f1_scores_from_file(filepath, output_path= f"{BASE_PATH}/figs/f1_curve_global.png"):
    with open(filepath, 'r') as f:
        lines = f.readlines()

    # Estructuras para almacenar los datos
    results = defaultdict(list)  # clase -> lista de F1-scores
    global_f1_scores = {}
    percentages = []
    current_percentage = None

    # Parsear el contenido
    for line in lines:
        line = line.strip()

        # Detectar inicio de bloque
        if line.startswith("# Silencing"):
            match = re.search(r"# Silencing ([\d.]+)% Global Neurons", line)
            if match:
                current_percentage = float(match.group(1))
                percentages.append(current_percentage)
            continue

        if line.startswith(",precision") or not line:
            continue

        parts = line.split(",")
        label = parts[0]

        if label.isdigit():
            class_index = int(label)
            try:
                f1 = float(parts[3])
                results[class_index].append(f1)
            except:
                results[class_index].append(None)
        elif label == "weighted avg":
            try:
                global_f1_scores[current_percentage] = float(parts[3])
            except:
                global_f1_scores[current_percentage] = None

    # Ordenar porcentajes
    sorted_percentages = sorted(global_f1_scores.keys())
    global_f1 = [global_f1_scores[p] for p in sorted_percentages]

    # Crear la figura
    plt.figure(figsize=(10, 6))

    # F1 global
    plt.plot(sorted_percentages, global_f1, label="Global F1", linewidth=2, marker='o')

    # F1 por clase
    for class_index in sorted(results.keys()):
        f1s = results[class_index]
        plt.plot(sorted_percentages, f1s, label=f"Class {class_index}", linestyle='--', marker='x')

    # Formato
    plt.title("F1 Scores vs. % of Silenced Global Neurons")
    plt.xlabel("% of Silenced Neurons")
    plt.ylabel("F1 Score")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()

    # Guardar como PNG
    plt.savefig(output_path, dpi=300)
    print(f"Gráfica guardada como: {output_path}")
    plt.close()


In [None]:
plot_f1_scores_from_file(f"{BASE_PATH}/results/full_silencing.csv")

## Impact per class

In [None]:
def get_top_k_neurons_for_class_exact(probe, percentage: float, class_to_idx: dict, class_id: int) -> list[int]:
    """
    Return top-k neurons most important for a specific class, measured by absolute weight.
    """
    weight_matrix = probe.linear.weight.detach().abs()  # [num_classes, num_neurons]
    class_weights = weight_matrix[class_id]  # [num_neurons]
    total_neurons = class_weights.size(0)
    top_n = round(percentage * total_neurons)
    top_indices = class_weights.cpu().numpy().argsort()[-top_n:]
    return top_indices.tolist()

In [None]:
# ==========================
# Get encoder layers dynamically
# ==========================
def get_encoder_layers(model):
    if hasattr(model, "bert"):
        return model.bert.encoder.layer
    elif hasattr(model, "longformer"):
        return model.longformer.encoder.layer
    elif hasattr(model, "distilbert"):
        return model.distilbert.transformer.layer
    else:
        raise NotImplementedError("❌ Unsupported model architecture.")

# ==========================
# Silence and evaluate top per-class neurons
# ==========================

def silence_top_class_percentage_and_evaluate(
    model,
    sample_df,
    labels_list,
    probe,
    label2idx,
    class_id: int,
    percentage: float = 0.1,
    report_path: str = None,
    experiment_title: str = None
):
    class_name = f"class_{class_id}"
    hidden_dim = model.config.hidden_size
    num_layers = model.config.num_hidden_layers
    total_neurons = num_layers * hidden_dim

    # Get top neurons for specific class
    top_class_neurons = get_top_k_neurons_for_class_exact(
        probe, percentage=percentage, class_to_idx=label2idx, class_id=class_id
    )

    logger.info(f"🔧 Silencing {len(top_class_neurons)} neurons for class {class_id} ({percentage:.2%} of total)")

    # Save neurons to JSON
    neurons_dir = f"{BASE_PATH}/neurons"
    os.makedirs(neurons_dir, exist_ok=True)
    json_path = f"{neurons_dir}/top_{int(percentage * 100)}p_neurons_{class_name}.json"
    with open(json_path, "w") as f:
        json.dump(top_class_neurons, f, indent=4)
    logger.info(f"📁 Saved neuron indices to {json_path}")

    # Register hooks per layer
    encoder_layers = get_encoder_layers(model)
    hook_handles = []
    for i in range(num_layers):
        indices_layer = [idx - i * hidden_dim for idx in top_class_neurons if i * hidden_dim <= idx < (i + 1) * hidden_dim]
        if indices_layer:
            logger.info(f"📌 Layer {i}: silencing {len(indices_layer)} neurons for class {class_id}")
            handle = encoder_layers[i].output.register_forward_hook(make_cls_silence_hook(indices_layer))
            hook_handles.append(handle)

    # Inference
    model.eval()
    predictions = []
    for i in range(len(sample_df)):
        input_ids_tensor = torch.tensor(sample_df.loc[i, 'input_ids']).unsqueeze(0).to(model.device)
        attention_mask_tensor = torch.tensor(sample_df.loc[i, 'attention_mask']).unsqueeze(0).to(model.device)

        with torch.no_grad():
            outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
            logits = outputs['logits']
            pred = torch.argmax(logits, dim=1).item()
            predictions.append(pred)

        del input_ids_tensor, attention_mask_tensor, outputs, logits
        torch.cuda.empty_cache()

    # Metrics
    accuracy = accuracy_score(labels_list, predictions)
    f1 = f1_score(labels_list, predictions, average='weighted')
    report_dict = classification_report(labels_list, predictions, output_dict=True)
    report_df = pd.DataFrame(report_dict).transpose().round(4)
    report_df = report_df.drop("accuracy", errors="ignore")

    accuracy_row = pd.DataFrame({
        'precision': [""],
        'recall': [""],
        'f1-score': [accuracy],
        'support': [sum(report_df["support"])]
    }, index=["overall_accuracy"])
    final_df = pd.concat([report_df, accuracy_row])

    # Save classification report
    if report_path is None:
        report_path = f"{BASE_PATH}/results/class_silencing{class_id}.csv"
    os.makedirs(os.path.dirname(report_path), exist_ok=True)

    if experiment_title is None:
        experiment_title = f"Silencing top {percentage:.2%} neurons for class {class_id}"

    if not os.path.exists(report_path):
        with open(report_path, "w") as f:
            f.write(f"# {experiment_title}\n")
            final_df.to_csv(f)
    else:
        with open(report_path, "a") as f:
            f.write(f"\n\n# {experiment_title}\n")
        final_df.to_csv(report_path, mode="a")

    logger.info(f"🎯 Accuracy after class-specific silencing: {accuracy:.4f}")
    logger.info(f"📏 Weighted F1 Score: {f1:.4f}")
    logger.info(f"📋 Classification report saved to {report_path}")

    # Remove all hooks
    for handle in hook_handles:
        handle.remove()
    logger.info("✅ All hooks removed after evaluation")

In [None]:
# percentages = [0.025, 0.05, 0.075, 0.10, 0.125, 0.15, 0.175, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.65, 0.75, 0.8, 0.95]
percentages = [0.50]
target_class_id = 4

for pct in percentages:
    silence_top_class_percentage_and_evaluate(
        model=model,
        sample_df=sample_df,
        labels_list=labels_list,
        probe=probe,
        label2idx=label2idx,
        class_id=target_class_id,
        percentage=pct,
        experiment_title=f"Silencing {pct*100:.1f}% of Neurons for Class {target_class_id}"
    )


In [None]:
percentages = [0.025, 0.05, 0.075, 0.10, 0.125, 0.15, 0.175, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.65, 0.75, 0.8, 0.95]

target_class_id = 3

for pct in percentages:
    silence_top_class_percentage_and_evaluate(
        model=model,
        sample_df=sample_df,
        labels_list=labels_list,
        probe=probe,
        label2idx=label2idx,
        class_id=target_class_id,
        percentage=pct,
        experiment_title=f"Silencing {pct*100:.1f}% of Neurons for Class {target_class_id}"
    )


In [None]:
percentages = [0.025, 0.05, 0.075, 0.10, 0.125, 0.15, 0.175, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.65, 0.75, 0.8, 0.95]

target_class_id = 2
for pct in percentages:
    silence_top_class_percentage_and_evaluate(
        model=model,
        sample_df=sample_df,
        labels_list=labels_list,
        probe=probe,
        label2idx=label2idx,
        class_id=target_class_id,
        percentage=pct,
        experiment_title=f"Silencing {pct*100:.1f}% of Neurons for Class {target_class_id}"
    )


In [None]:
percentages = [0.025, 0.05, 0.075, 0.10, 0.125, 0.15, 0.175, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.65, 0.75, 0.8, 0.95]

target_class_id = 1

for pct in percentages:
    silence_top_class_percentage_and_evaluate(
        model=model,
        sample_df=sample_df,
        labels_list=labels_list,
        probe=probe,
        label2idx=label2idx,
        class_id=target_class_id,
        percentage=pct,
        experiment_title=f"Silencing {pct*100:.1f}% of Neurons for Class {target_class_id}"
    )


In [None]:
percentages = [0.025, 0.05, 0.075, 0.10, 0.125, 0.15, 0.175, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.65, 0.75, 0.8, 0.95]

target_class_id = 0

for pct in percentages:
    silence_top_class_percentage_and_evaluate(
        model=model,
        sample_df=sample_df,
        labels_list=labels_list,
        probe=probe,
        label2idx=label2idx,
        class_id=target_class_id,
        percentage=pct,
        experiment_title=f"Silencing {pct*100:.1f}% of Neurons for Class {target_class_id}"
    )


# Attacks

## FGSM

In [None]:
from torch.nn import CrossEntropyLoss

def run_fgsm_attack_and_evaluate(
    model,
    sample_df,
    labels_list,
    epsilon: float = 0.1,
    report_path: str = f"{BASE_PATH}/fgsm.csv",
    experiment_title: str = None
):
    logger.info(f"⚔️ Running FGSM attack with ε = {epsilon}")
    model.eval()
    predictions_fgsm = []
    loss_fn = CrossEntropyLoss()

    is_longformer = hasattr(model, "longformer")

    for i in range(len(sample_df)):
        # ===============================
        # 🔢 Prepare input tensors
        # ===============================
        input_ids_tensor = torch.tensor(sample_df.loc[i, 'input_ids'], dtype=torch.long).unsqueeze(0).to(model.device)
        attention_mask_tensor = torch.tensor(sample_df.loc[i, 'attention_mask'], dtype=torch.long).unsqueeze(0).to(model.device)
        true_label = torch.tensor([labels_list[i]], dtype=torch.long).to(model.device)

        # ===============================
        # 🔍 Extract embeddings as leaf tensor
        # ===============================
        with torch.no_grad():
            embedding_output = model.base_model.embeddings(input_ids_tensor)
        embeds = embedding_output.clone().detach().requires_grad_(True)

        # ===============================
        # 🔁 Forward pass
        # ===============================
        if is_longformer:
            global_attention_mask = torch.zeros_like(attention_mask_tensor)
            global_attention_mask[:, 0] = 1
            outputs = model(
                inputs_embeds=embeds,
                attention_mask=attention_mask_tensor,
                global_attention_mask=global_attention_mask
            )
        else:
            outputs = model(
                inputs_embeds=embeds,
                attention_mask=attention_mask_tensor
            )

        logits = outputs.logits
        loss = loss_fn(logits, true_label)

        # ===============================
        # 🔁 Backward pass
        # ===============================
        model.zero_grad()
        loss.backward()

        # ===============================
        # ⚔️ FGSM perturbation
        # ===============================
        perturbation = epsilon * embeds.grad.data.sign()
        adv_embeds = embeds + perturbation

        # ===============================
        # 🔮 Inference with adversarial input
        # ===============================
        with torch.no_grad():
            if is_longformer:
                adv_outputs = model(
                    inputs_embeds=adv_embeds,
                    attention_mask=attention_mask_tensor,
                    global_attention_mask=global_attention_mask
                )
            else:
                adv_outputs = model(
                    inputs_embeds=adv_embeds,
                    attention_mask=attention_mask_tensor
                )
            adv_logits = adv_outputs.logits
            pred = torch.argmax(adv_logits, dim=1).item()
            predictions_fgsm.append(pred)

        del input_ids_tensor, attention_mask_tensor, embeds, adv_embeds, outputs, adv_outputs, logits, adv_logits
        torch.cuda.empty_cache()

    # ===============================
    # 📊 Evaluation
    # ===============================
    accuracy = accuracy_score(labels_list, predictions_fgsm)
    f1 = f1_score(labels_list, predictions_fgsm, average='weighted')
    report_dict = classification_report(labels_list, predictions_fgsm, output_dict=True)
    report_df = pd.DataFrame(report_dict).transpose().round(4).drop("accuracy", errors="ignore")

    accuracy_row = pd.DataFrame({
        'precision': [""],
        'recall': [""],
        'f1-score': [accuracy],
        'support': [sum(report_df["support"])]
    }, index=["overall_accuracy"])
    final_df = pd.concat([report_df, accuracy_row])

    # ===============================
    # 💾 Save report
    # ===============================
    if experiment_title is None:
        experiment_title = f"FGSM Attack (ε = {epsilon})"
    if not os.path.exists(report_path):
        with open(report_path, "w") as f:
            f.write(f"# {experiment_title}\n")
            final_df.to_csv(f)
    else:
        with open(report_path, "a") as f:
            f.write(f"\n\n# {experiment_title}\n")
        final_df.to_csv(report_path, mode="a")

    logger.info(f"🎯 Accuracy under FGSM (ε={epsilon}): {accuracy:.4f}")
    logger.info(f"📏 Weighted F1 Score: {f1:.4f}")
    logger.info(f"📋 Classification report saved to {report_path}")

In [None]:
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
model.to("cpu")
device = torch.device("cpu")


In [None]:
run_fgsm_attack_and_evaluate(
    model=model,
    sample_df=sample_df,
    labels_list=labels_list,
    epsilon=0.1,
    experiment_title="FGSM Adversarial Attack with ε = 0.1"
)

## Random Noise

In [None]:
def run_random_noise_attack(
    model,
    sample_df,
    labels_list,
    epsilon=0.3,
    device="cpu"
):
    import torch
    from sklearn.metrics import accuracy_score, f1_score, classification_report

    logger.info(f"🎲 Running random noise attack (epsilon={epsilon})")
    model.to(device)
    model.eval()
    predictions_noise = []

    is_longformer = hasattr(model, "longformer")

    for i in range(len(sample_df)):
        input_ids_tensor = torch.tensor(sample_df.loc[i, 'input_ids'], dtype=torch.long).unsqueeze(0).to(device)
        attention_mask_tensor = torch.tensor(sample_df.loc[i, 'attention_mask'], dtype=torch.long).unsqueeze(0).to(device)

        with torch.no_grad():
            embeddings = model.base_model.embeddings(input_ids_tensor)
            noisy_embeddings = embeddings + epsilon * torch.randn_like(embeddings)

            if is_longformer:
                global_attention_mask = torch.zeros_like(attention_mask_tensor)
                global_attention_mask[:, 0] = 1
                outputs = model(
                    inputs_embeds=noisy_embeddings,
                    attention_mask=attention_mask_tensor,
                    global_attention_mask=global_attention_mask
                )
            else:
                outputs = model(
                    inputs_embeds=noisy_embeddings,
                    attention_mask=attention_mask_tensor
                )

            logits = outputs.logits
            pred = torch.argmax(logits, dim=1).item()
            predictions_noise.append(pred)

    accuracy = accuracy_score(labels_list, predictions_noise)
    f1 = f1_score(labels_list, predictions_noise, average='weighted')
    report = classification_report(labels_list, predictions_noise)

    logger.info(f"📉 Random noise attack results (epsilon={epsilon}):")
    logger.info(f"Accuracy: {accuracy:.4f} | F1 Score: {f1:.4f}")
    logger.info(f"\nClassification Report:\n{report}")

    return accuracy, f1, predictions_noise

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, classification_report

epsilons = np.linspace(0.1, 1.0, 10)  # 10 valores de 0.1 a 1.0
results = []

for epsilon in epsilons:
    logger.info(f"\n🎲 Running random noise attack with ε={epsilon:.2f}")

    acc_noise, f1_noise, preds_noise = run_random_noise_attack(
        model=model,
        sample_df=sample_df,
        labels_list=labels_list,
        epsilon=epsilon,
        device="cpu"
    )

    logger.info(f"📉 Results for ε={epsilon:.2f} → Accuracy: {acc_noise:.4f} | F1 Score: {f1_noise:.4f}")

    results.append({
        "epsilon": round(epsilon, 2),
        "accuracy": acc_noise,
        "f1_score": f1_noise
    })

# Convertimos resultados a dataframe
noise_sweep_df = pd.DataFrame(results)
noise_sweep_df.to_csv("results/random_noise_sweep_up_to_1.csv", index=False)

logger.info("📁 Random noise sweep results saved to 'results/random_noise_sweep_up_to_1.csv'")


## Logit Bias

In [38]:
import logging
import torch
from collections import Counter
from sklearn.metrics import accuracy_score, f1_score, classification_report

# ─────────────────────────────────────────
# Configure logging to show INFO and above
# ─────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    datefmt="%H:%M:%S"
)
logger = logging.getLogger(__name__)

# — Helper de inferencia común —
def run_inference(model, sample_df):
    model.eval()
    preds = []
    for row in sample_df.itertuples():
        input_ids = torch.tensor(row.input_ids).unsqueeze(0).to(model.device)
        att_mask  = torch.tensor(row.attention_mask).unsqueeze(0).to(model.device)
        with torch.no_grad():
            logits = model(input_ids=input_ids, attention_mask=att_mask).logits
        preds.append(int(logits.argmax(dim=-1)))
    return preds

# ==============================
# TEST Logit‐Bias Majority Attack con reporte completo
# ==============================
def test_logit_bias_majority(
    model,
    sample_df,
    labels_list,
    target_class: int = 3,
    bias: float = 5.0,
    min_frac: float = 0.8
):
    """
    Añade un sesgo al logit de la clase `target_class` y:
      • Ejecuta inferencia
      • Mide la fracción de predicciones forzadas hacia target_class
      • Calcula accuracy, F1, distribución de predicciones
      • Mapea original→atacado (completo) y SOLO las transiciones hacia la clase objetivo
      • Devuelve un dict con todos los resultados (no detiene ejecución)
    """
    # 1) Define e instala el hook
    def make_logit_bias_hook(target, b):
        def hook(module, inp, out):
            out.logits[:, target] += b
            return out
        return hook

    handle = model.register_forward_hook(make_logit_bias_hook(target_class, bias))

    # 2) Inferencia bajo ataque
    attacked = run_inference(model, sample_df)

    # 3) Quita el hook
    handle.remove()

    # 4) Fracción de muestras clasificadas como target_class
    count_to_target = sum(1 for p in attacked if p == target_class)
    frac_to_target = count_to_target / len(attacked) if attacked else 0.0
    logger.info(f"[LogitBiasMajority] Fraction→class_{target_class}: {frac_to_target:.2%} (bias={bias})")
    if frac_to_target < min_frac:
        logger.warning(
            f"[LogitBiasMajority] Fraction below threshold: got {frac_to_target:.2%}, "
            f"expected at least {min_frac:.2%}"
        )
    else:
        logger.info(
            f"[LogitBiasMajority] Fraction meets threshold: {frac_to_target:.2%} ≥ {min_frac:.2%}"
        )

    # 5) Métricas de rendimiento
    accuracy = accuracy_score(labels_list, attacked)
    f1w      = f1_score(labels_list, attacked, average='weighted', zero_division=0)
    logger.info(f"[LogitBiasMajority] Accuracy under attack: {accuracy:.4f}")
    logger.info(f"[LogitBiasMajority] Weighted F1 Score: {f1w:.4f}")

    # 6) Distribución de predicciones
    dist = dict(Counter(attacked))
    logger.info(f"[LogitBiasMajority] Prediction distribution: {dist}")

    # 7) Mapeo completo: clase original → clase atacada
    mapping_full = Counter(zip(labels_list, attacked))
    mapping_full_str = {f"{orig}→{pred}": cnt for (orig, pred), cnt in mapping_full.items()}
    logger.info(f"[LogitBiasMajority] Mapping original→attacked (FULL): {mapping_full_str}")

    # 7a) SOLO transiciones que acaban en la clase objetivo (target_class)
    to_target_only = {f"{orig}→{pred}": cnt
                      for (orig, pred), cnt in mapping_full.items()
                      if pred == target_class}
    flips_to_target = sum(cnt for (orig, pred), cnt in mapping_full.items()
                          if pred == target_class and orig != target_class)
    kept_as_target  = mapping_full.get((target_class, target_class), 0)

    total_non_target = sum(1 for y in labels_list if y != target_class)
    frac_flips_from_non_target = (flips_to_target / total_non_target) if total_non_target else 0.0
    frac_all_to_target = (sum(to_target_only.values()) / len(labels_list)) if labels_list else 0.0

    logger.info(f"[LogitBiasMajority] ONLY to target {target_class}: {to_target_only}")
    logger.info(f"[LogitBiasMajority] Flips to target (from other classes): {flips_to_target}")
    logger.info(f"[LogitBiasMajority] Kept as target (target→target): {kept_as_target}")
    logger.info(f"[LogitBiasMajority] Frac of non-target that flipped→target: {frac_flips_from_non_target:.2%}")
    logger.info(f"[LogitBiasMajority] Overall frac predicted as target: {frac_all_to_target:.2%}")

    # 8) Classification report completo
    report = classification_report(labels_list, attacked, zero_division=0)
    logger.info(f"[LogitBiasMajority] Classification Report:\n{report}")

    # 9) Devolver detalles para inspección adicional
    return {
        "target_class": target_class,
        "bias": bias,
        "min_frac": min_frac,
        "fraction_to_target": frac_to_target,
        "accuracy": accuracy,
        "f1_weighted": f1w,
        "prediction_distribution": dist,
        "mapping_full": mapping_full_str,
        "only_to_target": to_target_only,
        "flips_to_target": flips_to_target,
        "kept_as_target": kept_as_target,
        "frac_flips_from_non_target": frac_flips_from_non_target,
        "frac_all_to_target": frac_all_to_target,
        "classification_report": report,
        "attacked_preds": attacked
    }
# ==============================
# Ejecutar el test y almacenar resultados
# ==============================
results = test_logit_bias_majority(
    model=model,
    sample_df=sample_df,
    labels_list=labels_list,
    target_class=3,
    bias=8.0,
    min_frac=0.8
)
logger.info(f"Test completed, results: {results}")

11:22:46 INFO __main__: [LogitBiasMajority] Fraction→class_3: 40.00% (bias=8.0)
11:22:46 INFO __main__: [LogitBiasMajority] Accuracy under attack: 0.7000
11:22:46 INFO __main__: [LogitBiasMajority] Weighted F1 Score: 0.6867
11:22:46 INFO __main__: [LogitBiasMajority] Prediction distribution: {3: 20, 2: 13, 0: 9, 1: 5, 4: 3}
11:22:46 INFO __main__: [LogitBiasMajority] Mapping original→attacked (FULL): {'3→3': 8, '2→2': 11, '4→3': 7, '2→3': 3, '0→0': 9, '1→1': 5, '4→4': 2, '4→2': 2, '1→3': 2, '2→4': 1}
11:22:46 INFO __main__: [LogitBiasMajority] ONLY to target 3: {'3→3': 8, '4→3': 7, '2→3': 3, '1→3': 2}
11:22:46 INFO __main__: [LogitBiasMajority] Flips to target (from other classes): 12
11:22:46 INFO __main__: [LogitBiasMajority] Kept as target (target→target): 8
11:22:46 INFO __main__: [LogitBiasMajority] Frac of non-target that flipped→target: 28.57%
11:22:46 INFO __main__: [LogitBiasMajority] Overall frac predicted as target: 40.00%
11:22:46 INFO __main__: [LogitBiasMajority] Classifi

In [25]:


model = AutoModelForSequenceClassification.from_pretrained(MODEL_HF, num_labels=NUM_LABELS)

# Load trained weights from disk
state_dict = torch.load(weights_path, map_location=device)
model.load_state_dict(state_dict)
model.to(device)
model.eval()
device = "cpu"

quick_baseline_f1(model, sample_df, labels_list)

Some weights of BigBirdForSequenceClassification were not initialized from the model checkpoint at google/bigbird-roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
11:58:21 INFO __main__: [Baseline Check] Weighted F1-score: 0.8306


0.8306384351683807

Some weights of BigBirdForSequenceClassification were not initialized from the model checkpoint at google/bigbird-roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [31]:
quick_baseline_f1(model, sample_df, labels_list)

10:52:07 INFO __main__: [Baseline Check] Weighted F1-score: 0.8306


0.8306384351683807

## Gaussian Noise

In [47]:
import logging
import torch
from collections import Counter
from sklearn.metrics import accuracy_score, f1_score, classification_report

# ─────────────────────────────────────────
# (Re)configura logging si es necesario
# ─────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    datefmt="%H:%M:%S"
)
logger = logging.getLogger(__name__)

# — Helper de inferencia común —
def run_inference(model, sample_df):
    model.eval()
    preds = []
    for row in sample_df.itertuples():
        input_ids = torch.tensor(row.input_ids).unsqueeze(0).to(model.device)
        att_mask  = torch.tensor(row.attention_mask).unsqueeze(0).to(model.device)
        with torch.no_grad():
            logits = model(input_ids=input_ids, attention_mask=att_mask).logits
        preds.append(int(logits.argmax(dim=-1)))
    return preds

# ==============================
# TEST Gaussian‐Noise Attack con reporte completo
# ==============================
def test_gaussian_noise_detailed(
    model,
    sample_df,
    labels_list,
    probe,
    percentage: float = 0.10,
    sigma: float = 0.2
):
    """
    Inyecta ruido Gaussiano en las top‐k neuronas del CLS token y:
      • Ejecuta inferencia
      • Mide cuántas predicciones cambiaron (y su fracción)
      • Calcula accuracy, F1, distribución de predicciones,
        mapeo original→atacado y classification report completo
      • Informa todo vía logger.info / logger.warning
    """
    hidden = model.config.hidden_size
    num_layers = model.config.num_hidden_layers

    # 1) Selección de top‐k neuronas
    topk = get_top_k_neurons_exact(probe, percentage=percentage)
    logger.info(f"[GaussNoise] Injecting σ={sigma} into {len(topk)} neurons ({percentage:.0%})")

    # 2) Hook maker
    def make_noise_hook(indices, σ):
        idxs = torch.tensor(indices, dtype=torch.long)
        def hook(module, inp, out):
            if out.dim()==3 and idxs.numel()>0:
                o = out.clone()
                cls = o[:, 0, :]
                noise = torch.randn_like(cls[:, idxs]) * σ
                cls[:, idxs] += noise.to(o.device)
                o[:, 0, :] = cls
                return o
            return out
        return hook

    # 3) Registrar hooks por capa
    handles = []
    enc_layers = get_encoder_layers(model)
    for l in range(num_layers):
        local = [i - l*hidden for i in topk if l*hidden <= i < (l+1)*hidden]
        if not local:
            continue
        handles.append(
            enc_layers[l].output.register_forward_hook(make_noise_hook(local, sigma))
        )

    # 4) Inferencia baseline y bajo ruido
    baseline = run_inference(model, sample_df)
    attacked = run_inference(model, sample_df)

    # 5) Limpieza de hooks
    for h in handles:
        h.remove()

    # 6) Cuántas predicciones cambiaron
    diff = sum(1 for b, a in zip(baseline, attacked) if b != a)
    frac_changed = diff / len(baseline)
    if diff == 0:
        logger.warning(f"[GaussNoise] NO changes detected at σ={sigma}, {percentage:.0%}")
    else:
        logger.info(f"[GaussNoise] {diff} changes ({frac_changed:.2%} of samples)")

    # 7) Métricas de rendimiento
    accuracy = accuracy_score(labels_list, attacked)
    f1w      = f1_score(labels_list, attacked, average='weighted')
    logger.info(f"[GaussNoise] Accuracy under attack: {accuracy:.4f}")
    logger.info(f"[GaussNoise] Weighted F1 Score: {f1w:.4f}")

    # 8) Distribución de predicciones
    dist = dict(Counter(attacked))
    logger.info(f"[GaussNoise] Prediction distribution: {dist}")

    # 9) Mapeo clase original → clase atacada
    mapping = Counter(zip(labels_list, attacked))
    mapping_str = {f"{orig}→{pred}": cnt for (orig, pred), cnt in mapping.items()}
    logger.info(f"[GaussNoise] Mapping original→attacked: {mapping_str}")

    # 10) Classification report completo
    report = classification_report(labels_list, attacked, zero_division=0)
    logger.info(f"[GaussNoise] Classification Report:\n{report}")

    # 11) Devolver resultados
    return {
        "num_changes": diff,
        "frac_changed": frac_changed,
        "accuracy": accuracy,
        "f1_weighted": f1w,
        "prediction_distribution": dist,
        "original_to_attacked_mapping": mapping_str,
        "classification_report": report,
        "attacked_preds": attacked
    }

# ==============================
# Ejecutar el test
# ==============================
results_noise = test_gaussian_noise_detailed(
    model=model,
    sample_df=sample_df,
    labels_list=labels_list,
    probe=probe,
    percentage=0.40,
    sigma=0.5
)
logger.info(f"Gaussian Noise test results: {results_noise}")

11:41:57 INFO __main__: [GaussNoise] Injecting σ=0.5 into 3686 neurons (40%)
11:44:36 INFO __main__: [GaussNoise] 2 changes (4.00% of samples)
11:44:36 INFO __main__: [GaussNoise] Accuracy under attack: 0.8800
11:44:36 INFO __main__: [GaussNoise] Weighted F1 Score: 0.8750
11:44:36 INFO __main__: [GaussNoise] Prediction distribution: {3: 10, 2: 15, 4: 8, 1: 8, 0: 9}
11:44:36 INFO __main__: [GaussNoise] Mapping original→attacked: {'3→3': 8, '2→2': 13, '4→4': 7, '2→1': 1, '0→0': 9, '1→1': 7, '4→2': 2, '2→4': 1, '4→3': 2}
11:44:36 INFO __main__: [GaussNoise] Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         9
           1       0.88      1.00      0.93         7
           2       0.87      0.87      0.87        15
           3       0.80      1.00      0.89         8
           4       0.88      0.64      0.74        11

    accuracy                           0.88        50
   macro avg       0.88      0.90   

In [48]:
import torch
from collections import Counter
from sklearn.metrics import accuracy_score, f1_score, classification_report
import logging

logger = logging.getLogger(__name__)

# — Utilidad: localizar la capa lineal de clasificación en distintas arquitecturas —
def get_classifier_linear(model):
    """
    Devuelve el módulo lineal final (con .weight y .bias):
    - BERT/RoBERTa/DistilBERT: model.classifier (nn.Linear)
    - BigBird: model.classifier.out_proj
    - Otros: model.score (si existe)
    """
    if hasattr(model, "classifier"):
        clf = model.classifier
        if hasattr(clf, "out_proj"):  # BigBirdClassificationHead
            return clf.out_proj
        if hasattr(clf, "weight") and hasattr(clf, "bias"):
            return clf
    if hasattr(model, "score"):  # algunos heads usan "score"
        return model.score
    raise NotImplementedError("❌ No se encontró capa lineal de clasificación compatible.")

# — Inference helper (ya lo usas) —
def run_inference(model, sample_df):
    model.eval()
    preds = []
    for row in sample_df.itertuples():
        input_ids = torch.tensor(row.input_ids).unsqueeze(0).to(model.device)
        att_mask  = torch.tensor(row.attention_mask).unsqueeze(0).to(model.device)
        with torch.no_grad():
            logits = model(input_ids=input_ids, attention_mask=att_mask).logits
        preds.append(int(logits.argmax(dim=-1)))
    return preds

# ==============================
# ⚙️  WEIGHT ATTACK (Fault-Sneak dirigido, con restore)
# ==============================
def test_weight_attack_targeted(
    model,
    sample_df,
    labels_list,
    probe,
    target_class: int,
    percentage: float = 0.10,
    delta_scale: float = 0.05,
    balanced_push: bool = True  # empuja + a la clase objetivo y - (pequeño) al resto
):
    """
    Modifica temporalmente los PESOS de la capa de clasificación (W: [num_labels, hidden])
    en las columnas asociadas a las neuronas top-k (según el probe), para favorecer
    la 'target_class'. Restaura al final (no deja el modelo contaminado).
    """
    # --- Selección de neuronas top-k y mapeo a columnas del clasificador ---
    hidden = model.config.hidden_size
    num_layers = model.config.num_hidden_layers
    topk_global = get_top_k_neurons_exact(probe, percentage=percentage)

    # Mapeamos índices globales (capa*largo + neurona) a columnas locales de la última capa
    # Truco práctico: columna = idx % hidden  (columna del vector [CLS] que alimenta a la head)
    cols = sorted({ g % hidden for g in topk_global })
    if len(cols) == 0:
        logger.warning("[WeightAttack] No hay columnas seleccionadas; abortando.")
        return None

    clf = get_classifier_linear(model)
    W = clf.weight            # [C, H]
    b = clf.bias              # [C] (puede ser None)
    C, H = W.shape

    logger.info(
        f"[WeightAttack] Target class={target_class}, top-k={len(cols)} cols, "
        f"delta_scale={delta_scale}, balanced_push={balanced_push}"
    )

    # --- Backup de pesos (y bias) ---
    W_orig = W.data.clone()
    b_orig = b.data.clone() if b is not None else None

    # --- Construcción de ΔW (solo columnas seleccionadas) ---
    delta = torch.zeros_like(W.data)  # [C, H]
    # Empuja a favor de la clase objetivo en esas columnas
    delta[target_class, cols] += delta_scale

    if balanced_push and C > 1:
        # Para mantener efecto "sigiloso", empujón negativo suave al resto de clases
        neg = (-delta_scale) / (C - 1)
        mask_other = torch.ones(C, dtype=torch.bool, device=W.device)
        mask_other[target_class] = False
        delta[mask_other][:, cols] += neg

    # --- Aplicar Δ (ataque de PESOS) ---
    W.data.add_(delta.to(W.device))

    # --- Inferencia atacada ---
    attacked = run_inference(model, sample_df)

    # --- Restaurar pesos (y bias si procede) ---
    W.data.copy_(W_orig)
    if b is not None and b_orig is not None:
        b.data.copy_(b_orig)

    # ===============================
    # 📊 Evaluación y reportes
    # ===============================
    accuracy = accuracy_score(labels_list, attacked)
    f1w      = f1_score(labels_list, attacked, average='weighted', zero_division=0)
    logger.info(f"[WeightAttack] Accuracy under attack: {accuracy:.4f}")
    logger.info(f"[WeightAttack] Weighted F1 Score: {f1w:.4f}")

    # Distribución de predicciones
    dist = dict(Counter(attacked))
    logger.info(f"[WeightAttack] Prediction distribution: {dist}")

    # Mapeo original → atacado (completo)
    mapping = Counter(zip(labels_list, attacked))
    mapping_full_str = {f"{orig}→{pred}": cnt for (orig, pred), cnt in mapping.items()}
    logger.info(f"[WeightAttack] Mapping original→attacked (FULL): {mapping_full_str}")

    # Solo transiciones hacia la clase objetivo
    to_target_only = {f"{orig}→{pred}": cnt
                      for (orig, pred), cnt in mapping.items()
                      if pred == target_class}
    flips_to_target = sum(cnt for (orig, pred), cnt in mapping.items()
                          if pred == target_class and orig != target_class)
    kept_as_target  = mapping.get((target_class, target_class), 0)

    total_non_target = sum(1 for y in labels_list if y != target_class)
    frac_flips_from_non_target = (flips_to_target / total_non_target) if total_non_target else 0.0
    frac_all_to_target = (sum(to_target_only.values()) / len(labels_list)) if labels_list else 0.0

    logger.info(f"[WeightAttack] ONLY to target {target_class}: {to_target_only}")
    logger.info(f"[WeightAttack] Flips→target (from other classes): {flips_to_target}")
    logger.info(f"[WeightAttack] Kept as target (target→target): {kept_as_target}")
    logger.info(f"[WeightAttack] Frac non-target that flipped→target: {frac_flips_from_non_target:.2%}")
    logger.info(f"[WeightAttack] Overall frac predicted as target: {frac_all_to_target:.2%}")

    # Classification report
    report = classification_report(labels_list, attacked, zero_division=0)
    logger.info(f"[WeightAttack] Classification Report:\n{report}")

    return {
        "target_class": target_class,
        "percentage_neurons": percentage,
        "delta_scale": delta_scale,
        "balanced_push": balanced_push,
        "accuracy": accuracy,
        "f1_weighted": f1w,
        "prediction_distribution": dist,
        "mapping_full": mapping_full_str,
        "only_to_target": to_target_only,
        "flips_to_target": flips_to_target,
        "kept_as_target": kept_as_target,
        "frac_flips_from_non_target": frac_flips_from_non_target,
        "frac_all_to_target": frac_all_to_target,
        "classification_report": report,
        "attacked_preds": attacked,
        "used_columns": cols
    }

In [None]:
results_weight = test_weight_attack_targeted(
    model=model,
    sample_df=sample_df,
    labels_list=labels_list,
    probe=probe,
    target_class=3,      # objetivo 
    percentage=0.10,     # top-k neuronas (global → columnas únicas)
    delta_scale=0.05,    # intensidad del empujón en pesos
    balanced_push=True   # empuja + a target y - (suave) al resto
)
logger.info(f"Weight attack results: {results_weight}")

11:51:30 INFO __main__: [WeightAttack] Target class=3, top-k=593 cols, delta_scale=0.05, balanced_push=True
11:52:48 INFO __main__: [WeightAttack] Accuracy under attack: 0.7800
11:52:48 INFO __main__: [WeightAttack] Weighted F1 Score: 0.7578
11:52:48 INFO __main__: [WeightAttack] Prediction distribution: {3: 14, 2: 14, 4: 5, 1: 8, 0: 9}
11:52:48 INFO __main__: [WeightAttack] Mapping original→attacked (FULL): {'3→3': 8, '2→2': 12, '4→4': 3, '2→1': 1, '2→4': 2, '0→0': 9, '1→1': 7, '4→3': 6, '4→2': 2}
11:52:48 INFO __main__: [WeightAttack] ONLY to target 3: {'3→3': 8, '4→3': 6}
11:52:48 INFO __main__: [WeightAttack] Flips→target (from other classes): 6
11:52:48 INFO __main__: [WeightAttack] Kept as target (target→target): 8
11:52:48 INFO __main__: [WeightAttack] Frac non-target that flipped→target: 14.29%
11:52:48 INFO __main__: [WeightAttack] Overall frac predicted as target: 28.00%
11:52:48 INFO __main__: [WeightAttack] Classification Report:
              precision    recall  f1-score 

## Global Noise Injection

In [None]:
import os
import torch
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, classification_report

# Parámetros
PERCENTAGE  = 0.10
SIGMA       = 0.1
REPORT_PATH = os.path.join(BASE_PATH, "results", f"global_noise_{int(PERCENTAGE*100)}p.csv")

# Seleccionar top‐k
top_neurons = get_top_k_neurons_exact(probe, percentage=PERCENTAGE)
os.makedirs(f"{BASE_PATH}/neurons", exist_ok=True)
with open(f"{BASE_PATH}/neurons/top_{int(PERCENTAGE*100)}p_global_noise.json", "w") as f:
    json.dump(top_neurons, f, indent=2)

# Hook maker
def make_partial_noise_hook(indices, sigma):
    idxs = torch.tensor(indices, dtype=torch.long)
    def hook(module, inp, out):
        if out.dim()==3 and idxs.numel()>0:
            o = out.clone()
            vals = o[:,:,idxs]             # (b, seq_len, |idxs|)
            o[:,:,idxs] = vals + torch.randn_like(vals)*sigma
            return o
        return out
    return hook

# Registrar
handles = []
for layer_idx, layer in enumerate(get_encoder_layers(model)):
    local = [i - layer_idx*model.config.hidden_size for i in top_neurons
             if layer_idx*model.config.hidden_size <= i < (layer_idx+1)*model.config.hidden_size]
    if not local: continue
    handles.append(
        layer.output.register_forward_hook(make_partial_noise_hook(local, SIGMA))
    )

# Inferencia
model.eval()
predictions_gnoise = []
for row in sample_df.itertuples():
    input_ids = torch.tensor(row.input_ids).unsqueeze(0).to(model.device)
    att_mask  = torch.tensor(row.attention_mask).unsqueeze(0).to(model.device)
    with torch.no_grad():
        logits = model(input_ids=input_ids, attention_mask=att_mask).logits
    predictions_gnoise.append(int(logits.argmax(dim=-1)))

# ===============================
# 📊 Evaluation
# ===============================
accuracy = accuracy_score(labels_list, predictions_gnoise)
f1       = f1_score(labels_list, predictions_gnoise, average='weighted')
report_dict = classification_report(labels_list, predictions_gnoise, output_dict=True, zero_division=0)
report_df   = pd.DataFrame(report_dict).transpose().round(4).drop("accuracy", errors="ignore")

accuracy_row = pd.DataFrame({
    'precision': [""],
    'recall':    [""],
    'f1-score':  [accuracy],
    'support':   [report_df["support"].sum()]
}, index=["overall_accuracy"])
final_df = pd.concat([report_df, accuracy_row])

# ===============================
# 💾 Save report
# ===============================
experiment_title = f"Partial Global Noise ({PERCENTAGE:.0%} top neurons, σ={SIGMA})"
os.makedirs(os.path.dirname(REPORT_PATH), exist_ok=True)
mode = "w" if not os.path.exists(REPORT_PATH) else "a"
with open(REPORT_PATH, mode) as f:
    if mode == "w":
        f.write(f"# {experiment_title}\n")
    else:
        f.write(f"\n\n# {experiment_title}\n")
    final_df.to_csv(f)

logger.info(f"🎯 Accuracy under Partial Global Noise: {accuracy:.4f}")
logger.info(f"📏 Weighted F1 Score: {f1:.4f}")
logger.info(f"📋 Classification report saved to {REPORT_PATH}")

# Cleanup
for h in handles:
    h.remove()

## Fault Sneaking (sim)

In [None]:
import os, json, torch, pandas as pd
from sklearn.metrics import accuracy_score, f1_score, classification_report

# Parámetros
PERCENTAGE  = 0.10
DELTA_SCALE = 0.05
REPORT_PATH = os.path.join(BASE_PATH, "results", f"fault_sneak_{int(PERCENTAGE*100)}p.csv")

# 1) Selección top‐k
top_neurons = get_top_k_neurons_exact(probe, percentage=PERCENTAGE)
os.makedirs(f"{BASE_PATH}/neurons", exist_ok=True)
with open(f"{BASE_PATH}/neurons/top_{int(PERCENTAGE*100)}p_fault.json", "w") as f:
    json.dump(top_neurons, f, indent=2)

# 2) Construir delta sobre out_proj
hidden = model.config.hidden_size
# out_proj: Linear(hidden, num_labels)
out_proj = model.classifier.out_proj  
delta = torch.zeros_like(out_proj.weight.data)
for g in top_neurons:
    idx = g % hidden
    # para cada clase (fila), añadimos ruido pequeño en la columna idx
    delta[:, idx] = torch.randn(delta.shape[0]) * DELTA_SCALE

# 3) Hook pre‐forward sobre out_proj
def make_fault_hook(delta_tensor):
    def hook(module, inp):
        module.weight.data += delta_tensor.to(module.weight.device)
    return hook

handle = out_proj.register_forward_pre_hook(make_fault_hook(delta))

# 4) Inferencia
model.eval()
predictions_fault = []
for row in sample_df.itertuples():
    input_ids = torch.tensor(row.input_ids).unsqueeze(0).to(model.device)
    att_mask  = torch.tensor(row.attention_mask).unsqueeze(0).to(model.device)
    with torch.no_grad():
        logits = model(input_ids=input_ids, attention_mask=att_mask).logits
    predictions_fault.append(int(logits.argmax(dim=-1)))

# ===============================
# 📊 Evaluation
# ===============================
accuracy = accuracy_score(labels_list, predictions_fault)
f1       = f1_score(labels_list, predictions_fault, average='weighted')
report_dict = classification_report(labels_list, predictions_fault, output_dict=True, zero_division=0)
report_df   = pd.DataFrame(report_dict).transpose().round(4).drop("accuracy", errors="ignore")

accuracy_row = pd.DataFrame({
    'precision': [""],
    'recall':    [""],
    'f1-score':  [accuracy],
    'support':   [report_df["support"].sum()]
}, index=["overall_accuracy"])
final_df = pd.concat([report_df, accuracy_row])

# ===============================
# 💾 Save report
# ===============================
experiment_title = f"Fault Sneaking ({PERCENTAGE:.0%} top neurons, scale={DELTA_SCALE})"
os.makedirs(os.path.dirname(REPORT_PATH), exist_ok=True)
mode = "w" if not os.path.exists(REPORT_PATH) else "a"
with open(REPORT_PATH, mode) as f:
    if mode == "w":
        f.write(f"# {experiment_title}\n")
    else:
        f.write(f"\n\n# {experiment_title}\n")
    final_df.to_csv(f)

logger.info(f"🎯 Accuracy under Fault Sneaking: {accuracy:.4f}")
logger.info(f"📏 Weighted F1 Score: {f1:.4f}")
logger.info(f"📋 Classification report saved to {REPORT_PATH}")

# 5) Cleanup
handle.remove()

## Tests

In [None]:
import torch
from sklearn.metrics import accuracy_score
import logging

logger = logging.getLogger(__name__)

# — Helper de inferencia común —
def run_inference(model, sample_df):
    model.eval()
    preds = []
    for row in sample_df.itertuples():
        input_ids = torch.tensor(row.input_ids).unsqueeze(0).to(model.device)
        att_mask  = torch.tensor(row.attention_mask).unsqueeze(0).to(model.device)
        with torch.no_grad():
            logits = model(input_ids=input_ids, attention_mask=att_mask).logits
        preds.append(int(logits.argmax(dim=-1)))
    return preds

# ==============================
# 1) TEST Logit‐Bias Majority Attack (sin cortar ejecución)
# ==============================
def test_logit_bias_majority(
    model,
    sample_df,
    labels_list,
    target_class: int = 3,
    bias: float = 5.0,
    min_frac: float = 0.8
):
    # instalar hook
    def hook(m, inp, out):
        out.logits[:, target_class] += bias
        return out
    handle = model.register_forward_hook(hook)

    # inferencia
    attacked = run_inference(model, sample_df)
    handle.remove()

    # comprobación
    count_target = sum(1 for p in attacked if p == target_class)
    frac = count_target / len(attacked)
    expected = f">= {min_frac:.0%} to class {target_class}"
    actual = f"{frac:.0%} to class {target_class}"
    if frac < min_frac:
        logger.error(f"[LogitBiasMajority] FAILURE: expected {expected}, got {actual}")
    else:
        logger.info(f"[LogitBiasMajority] SUCCESS: {actual} (bias={bias})")

    # métricas
    acc = accuracy_score(labels_list, attacked)
    logger.info(f"[LogitBiasMajority] Accuracy under attack: {acc:.4f}")
    # opcional: devolver attacked para inspección
    return attacked

# ==============================
# 2) TEST Gaussian Noise en top‐k (sin cortar ejecución)
# ==============================
def test_gaussian_noise(
    model,
    sample_df,
    labels_list,
    probe,
    percentage: float = 0.1,
    sigma: float = 0.2
):
    # baseline y atacada
    baseline = run_inference(model, sample_df)
    topk = get_top_k_neurons_exact(probe, percentage=percentage)

    # instalar hooks
    def make_noise(idx_list, σ):
        idxs = torch.tensor(idx_list, dtype=torch.long)
        def hook(m, inp, out):
            if out.dim()==3 and idxs.numel()>0:
                o = out.clone()
                cls = o[:,0,:]
                cls[:, idxs] += torch.randn_like(cls[:, idxs]) * σ
                o[:,0,:] = cls
                return o
            return out
        return hook

    handles = []
    for i, layer in enumerate(get_encoder_layers(model)):
        local = [g - i*model.config.hidden_size for g in topk
                 if i*model.config.hidden_size <= g < (i+1)*model.config.hidden_size]
        if not local: continue
        handles.append(layer.output.register_forward_hook(make_noise(local, sigma)))

    attacked = run_inference(model, sample_df)
    for h in handles: h.remove()

    # ver cuántas cambiaron
    diff = sum(1 for b,a in zip(baseline, attacked) if b != a)
    expected = "> 0 changes"
    actual = f"{diff} changes"
    if diff == 0:
        logger.error(f"[GaussNoise] FAILURE: expected {expected}, got {actual}")
    else:
        logger.info(f"[GaussNoise] SUCCESS: {actual} (σ={sigma}, {percentage:.0%} neurons)")

    acc = accuracy_score(labels_list, attacked)
    logger.info(f"[GaussNoise] Accuracy under attack: {acc:.4f}")
    return attacked

# ==============================
# 3) TEST Partial Global Noise en top‐k (sin cortar ejecución)
# ==============================
def test_partial_noise(
    model,
    sample_df,
    labels_list,
    probe,
    percentage: float = 0.1,
    sigma: float = 0.1
):
    baseline = run_inference(model, sample_df)
    topk = get_top_k_neurons_exact(probe, percentage=percentage)

    def make_partial(idx_list, σ):
        idxs = torch.tensor(idx_list, dtype=torch.long)
        def hook(m, inp, out):
            if out.dim()==3 and idxs.numel()>0:
                o = out.clone()
                o[:,:,idxs] += torch.randn_like(o[:,:,idxs]) * σ
                return o
            return out
        return hook

    handles = []
    for i, layer in enumerate(get_encoder_layers(model)):
        local = [g - i*model.config.hidden_size for g in topk
                 if i*model.config.hidden_size <= g < (i+1)*model.config.hidden_size]
        if not local: continue
        handles.append(layer.output.register_forward_hook(make_partial(local, sigma)))

    attacked = run_inference(model, sample_df)
    for h in handles: h.remove()

    diff = sum(1 for b,a in zip(baseline, attacked) if b != a)
    expected = "> 0 changes"
    actual = f"{diff} changes"
    if diff == 0:
        logger.error(f"[PartialNoise] FAILURE: expected {expected}, got {actual}")
    else:
        logger.info(f"[PartialNoise] SUCCESS: {actual} (σ={sigma}, {percentage:.0%} neurons)")

    acc = accuracy_score(labels_list, attacked)
    logger.info(f"[PartialNoise] Accuracy under attack: {acc:.4f}")
    return attacked

# ==============================
# 4) TEST Fault‐Sneaking simulado en top‐k (sin cortar ejecución)
# ==============================
def test_fault_sneaking(
    model,
    sample_df,
    labels_list,
    probe,
    percentage: float = 0.1,
    delta_scale: float = 0.05
):
    baseline = run_inference(model, sample_df)
    topk = get_top_k_neurons_exact(probe, percentage=percentage)

    # delta sobre out_proj
    hidden = model.config.hidden_size
    out_proj = model.classifier.out_proj
    delta = torch.zeros_like(out_proj.weight.data)
    for g in topk:
        idx = g % hidden
        delta[:, idx] = torch.randn(delta.shape[0]) * delta_scale

    # hook
    def make_fault_hook(delta_tensor):
        def hook(m, inp):
            m.weight.data += delta_tensor.to(m.weight.device)
        return hook

    handle = out_proj.register_forward_pre_hook(make_fault_hook(delta))
    attacked = run_inference(model, sample_df)
    handle.remove()

    diff = sum(1 for b,a in zip(baseline, attacked) if b != a)
    expected = "> 0 changes"
    actual = f"{diff} changes"
    if diff == 0:
        logger.error(f"[FaultSneak] FAILURE: expected {expected}, got {actual}")
    else:
        logger.info(f"[FaultSneak] SUCCESS: {actual} (scale={delta_scale}, {percentage:.0%} neurons)")

    acc = accuracy_score(labels_list, attacked)
    logger.info(f"[FaultSneak] Accuracy under attack: {acc:.4f}")
    return attacked

# ==============================
# Ejecutar tests sin cortar ejecución
# ==============================
att_logit   = test_logit_bias_majority(model, sample_df, labels_list)
att_gauss   = test_gaussian_noise(model, sample_df, labels_list, probe)
att_partial = test_partial_noise(model, sample_df, labels_list, probe)
att_fault   = test_fault_sneaking(model, sample_df, labels_list, probe)

logger.info("✅ All attack tests completed.")

# Conjuntos disjuntos de neuronas

In [None]:
from collections import defaultdict
import os
import json

top_percentage = 0.1  # 50% 

# Detect and convert (layer, neuron) tuples to global indices if needed
hidden_dim = model.config.hidden_size

def tuple_to_global_index(neuron_tuples, hidden_dim):
    return [layer * hidden_dim + neuron for (layer, neuron) in neuron_tuples]

per_class_top_indices = {}
for class_id, neuron_list in per_class_top_neurons.items():
    if len(neuron_list) > 0 and isinstance(neuron_list[0], tuple):
        per_class_top_indices[class_id] = tuple_to_global_index(neuron_list, hidden_dim)
    else:
        per_class_top_indices[class_id] = neuron_list

# Exclusive neurons: present in top of class A but not in any other class
exclusive_class_neurons = {}
for cid, own_top in per_class_top_indices.items():
    other = set()
    for other_cid, other_top in per_class_top_indices.items():
        if other_cid != cid:
            other.update(other_top)
    exclusive = sorted(set(own_top) - other)
    exclusive_class_neurons[cid] = exclusive
    print(f"Class {cid}: {len(exclusive)} exclusive neurons out of {len(own_top)} top neurons")

# Save exclusive neurons to JSON
exclusive_dir = f"{BASE_PATH}/exclusive_neurons"
os.makedirs(exclusive_dir, exist_ok=True)
for class_id, neuron_list in exclusive_class_neurons.items():
    path = f"{exclusive_dir}/exclusive_top{int(top_percentage*100)}p_class_{class_id}.json"
    with open(path, "w") as f:
        json.dump([int(x) for x in neuron_list], f, indent=2)
    logger.info(f"Saved exclusive neurons for class {class_id} to {path}")

In [None]:
def silence_exclusive_class_and_evaluate(
    model,
    sample_df,
    labels_list,
    exclusive_neuron_indices,
    class_id,
    report_path=None,
    experiment_title=None
):
    hidden_dim = model.config.hidden_size
    num_layers = model.config.num_hidden_layers

    logger.info(f"🔧 Silencing {len(exclusive_neuron_indices)} EXCLUSIVE neurons for class {class_id}")

    encoder_layers = get_encoder_layers(model)
    hook_handles = []
    for i in range(num_layers):
        indices_layer = [idx - i * hidden_dim for idx in exclusive_neuron_indices if i * hidden_dim <= idx < (i + 1) * hidden_dim]
        if indices_layer:
            logger.info(f"📌 Layer {i}: silencing {len(indices_layer)} exclusive neurons for class {class_id}")
            if hasattr(encoder_layers[i], "output"):
                handle = encoder_layers[i].output.register_forward_hook(make_cls_silence_hook(indices_layer))
            else:
                handle = encoder_layers[i].register_forward_hook(make_cls_silence_hook(indices_layer))
            hook_handles.append(handle)

    # --- Evaluation ---
    model.eval()
    predictions = []
    for i in range(len(sample_df)):
        input_ids_tensor = torch.tensor(sample_df.loc[i, 'input_ids']).unsqueeze(0).to(model.device)
        attention_mask_tensor = torch.tensor(sample_df.loc[i, 'attention_mask']).unsqueeze(0).to(model.device)
        with torch.no_grad():
            outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
            logits = outputs['logits']
            pred = torch.argmax(logits, dim=1).item()
            predictions.append(pred)
        del input_ids_tensor, attention_mask_tensor, outputs, logits
        torch.cuda.empty_cache()

    # --- Metrics & reporting ---
    from sklearn.metrics import accuracy_score, f1_score, classification_report
    import pandas as pd
    accuracy = accuracy_score(labels_list, predictions)
    f1 = f1_score(labels_list, predictions, average='weighted')
    report_dict = classification_report(labels_list, predictions, output_dict=True)
    report_df = pd.DataFrame(report_dict).transpose().round(4)
    report_df = report_df.drop("accuracy", errors="ignore")

    accuracy_row = pd.DataFrame({
        'precision': [""],
        'recall': [""],
        'f1-score': [accuracy],
        'support': [sum(report_df["support"])]
    }, index=["overall_accuracy"])
    final_df = pd.concat([report_df, accuracy_row])

    # --- Save classification report ---
    if report_path is None:
        report_path = f"{BASE_PATH}/results/exclusive_class_silencing_{class_id}.csv"
    os.makedirs(os.path.dirname(report_path), exist_ok=True)
    if experiment_title is None:
        experiment_title = f"Silencing exclusive neurons for class {class_id}"
    if not os.path.exists(report_path):
        with open(report_path, "w") as f:
            f.write(f"# {experiment_title}\n")
            final_df.to_csv(f)
    else:
        with open(report_path, "a") as f:
            f.write(f"\n\n# {experiment_title}\n")
        final_df.to_csv(report_path, mode="a")

    logger.info(f"🎯 Accuracy after exclusive silencing: {accuracy:.4f}")
    logger.info(f"📏 Weighted F1 Score: {f1:.4f}")
    logger.info(f"📋 Classification report saved to {report_path}")

    # Remove hooks
    for handle in hook_handles:
        handle.remove()
    logger.info("✅ All hooks removed after evaluation")

In [None]:
# --- Run silencing experiments for each class with its exclusive neurons ---
for class_id, neuron_indices in exclusive_class_neurons.items():
    silence_exclusive_class_and_evaluate(
        model=model,
        sample_df=sample_df,
        labels_list=labels_list,
        exclusive_neuron_indices=neuron_indices,
        class_id=class_id,
        report_path=f"{BASE_PATH}/results/exclusive_class_silencing_{class_id}.csv",
        experiment_title=f"Silencing exclusive neurons for class {class_id}"
    )

# GoEmotions

## Dataset Configuration

In [None]:
# 📁 Dataset and mappings
GOEMOTIONS_PATH = "data/goemotions"
INPUT_FILE = f"{GOEMOTIONS_PATH}/test.tsv"
EMOTIONS_FILE = f"{GOEMOTIONS_PATH}/emotions.txt"

# 🎯 Target emotions (subset of original GoEmotions)
TARGET_EMOTIONS = ["anger", "disgust", "fear", "joy", "sadness", "surprise"]

# 🧠 Pretrained Model
GOEMOTIONS_MODEL_HF = "monologg/bert-base-cased-goemotions-original"

# 💾 Outputs
SAMPLE_OUTPUT = f"{GOEMOTIONS_PATH}/sample_60.json"
TOKENIZED_OUTPUT = f"{GOEMOTIONS_PATH}/tokenized.pt"
LABELS_OUTPUT = f"{GOEMOTIONS_PATH}/labels.pt"
LABEL_MAPPING_OUTPUT = f"{GOEMOTIONS_PATH}/label_mapping.json"
CSV_REPORT_PATH = f"{GOEMOTIONS_PATH}/classification_report_eval.csv"
CSV_REPORT_GOBAL_SILENCING = f"{GOEMOTIONS_PATH}/classification_report_global_silencing.csv"
ACTIVATIONS_GOEMOTIONS = f"{GOEMOTIONS_PATH}/activations.json"
SAMPLE_OUTPUT_JSON = "data/goemotions/sample_df.json"
# 📟 Device

device_goemo = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def silence_top_global_percentage_and_evaluate(
    model,
    sample_df,
    labels_list,
    probe,
    label2idx,
    percentage=None,
    experiment_title=None,
    report_path=None,
    custom_indices=None  # Nuevo: permite pasar neuronas custom (por ejemplo aleatorias)
):
    import os
    import torch
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score, classification_report

    hidden_dim = model.config.hidden_size
    num_layers = model.config.num_hidden_layers
    total_neurons = num_layers * hidden_dim

    # Decide qué neuronas silenciar
    if custom_indices is not None:
        top_neurons = custom_indices
        print(f"🔧 Silencing custom list of {len(top_neurons)} neurons")
    else:
        # Si no, selecciona top del probe (como siempre)
        top_neurons = top_neurons_probe(
            probe, percentage=percentage, class_to_idx=label2idx
        )
        print(f"🔧 Silencing {len(top_neurons)} neurons from probe (percentage={percentage})")

    # Hook setup
    encoder_layers = get_encoder_layers(model)
    hook_handles = []
    for i in range(num_layers):
        # Neuronas de esta capa
        indices_layer = [idx - i * hidden_dim for idx in top_neurons if i * hidden_dim <= idx < (i + 1) * hidden_dim]
        if indices_layer:
            print(f"📌 Layer {i}: silencing {len(indices_layer)} neurons")
            if hasattr(encoder_layers[i], "output"):
                handle = encoder_layers[i].output.register_forward_hook(make_cls_silence_hook(indices_layer))
            else:
                handle = encoder_layers[i].register_forward_hook(make_cls_silence_hook(indices_layer))
            hook_handles.append(handle)

    # Evaluación estándar (igual que ya tienes)
    model.eval()
    predictions = []
    for i in range(len(sample_df)):
        input_ids_tensor = torch.tensor(sample_df.loc[i, 'input_ids']).unsqueeze(0).to(model.device)
        attention_mask_tensor = torch.tensor(sample_df.loc[i, 'attention_mask']).unsqueeze(0).to(model.device)
        with torch.no_grad():
            outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
            logits = outputs['logits']
            pred = torch.argmax(logits, dim=1).item()
            predictions.append(pred)
        del input_ids_tensor, attention_mask_tensor, outputs, logits
        torch.cuda.empty_cache()

    # Métricas y reporte
    accuracy = accuracy_score(labels_list, predictions)
    f1 = f1_score(labels_list, predictions, average='weighted')
    report_dict = classification_report(labels_list, predictions, output_dict=True)
    report_df = pd.DataFrame(report_dict).transpose().round(4)
    report_df = report_df.drop("accuracy", errors="ignore")

    accuracy_row = pd.DataFrame({
        'precision': [""],
        'recall': [""],
        'f1-score': [accuracy],
        'support': [sum(report_df["support"])]
    }, index=["overall_accuracy"])
    final_df = pd.concat([report_df, accuracy_row])

    # Guardar reporte
    if report_path is None:
        report_path = "results/class_silencing_global.csv"
    os.makedirs(os.path.dirname(report_path), exist_ok=True)

    if experiment_title is None:
        experiment_title = "Silencing top global neurons"
    if not os.path.exists(report_path):
        with open(report_path, "w") as f:
            f.write(f"# {experiment_title}\n")
            final_df.to_csv(f)
    else:
        with open(report_path, "a") as f:
            f.write(f"\n\n# {experiment_title}\n")
        final_df.to_csv(report_path, mode="a")

    print(f"🎯 Accuracy after silencing: {accuracy:.4f}")
    print(f"📏 Weighted F1 Score: {f1:.4f}")
    print(f"📋 Classification report saved to {report_path}")

    # Quitar hooks
    for handle in hook_handles:
        handle.remove()
    print("✅ All hooks removed after evaluation")

In [None]:
if not os.path.exists(SAMPLE_OUTPUT):
    # 📥 Load emotion names
    with open(EMOTIONS_FILE, "r") as f:
        id2emotion = [line.strip() for line in f.readlines()]
    emotion2id = {e: i for i, e in enumerate(id2emotion)}

    # 🎯 Select target emotions and their GoEmotions IDs
    TARGET_EMOTIONS = ["anger", "disgust", "fear", "joy", "sadness", "surprise"]
    target_ids = [emotion2id[e] for e in TARGET_EMOTIONS]

    # 🌐 Mapping from GoEmotion ID to 0–5 label
    goemo2local = {eid: i for i, eid in enumerate(target_ids)}

    # 📊 Load dataset
    df = pd.read_csv(INPUT_FILE, sep="\t", header=None, names=["text", "labels", "split"])
    df = df.dropna(subset=["labels"])
    df["label_ids"] = df["labels"].apply(lambda x: list(map(int, str(x).split(","))))

    # 🧼 Filter: single-label only & target emotions
    df_filtered = df[df["label_ids"].apply(lambda ids: len(ids) == 1 and ids[0] in target_ids)].copy()
    df_filtered["label_id"] = df_filtered["label_ids"].apply(lambda ids: goemo2local[ids[0]])

    # 📉 Count examples per class
    counts = df_filtered["label_id"].value_counts()
    print("Available examples for selected emotions:")
    print(counts)

    # 🎯 Balanced subset (max 10 per class)
    max_per_class = 10
    samples = []

    for label in counts.index:
        subset = df_filtered[df_filtered["label_id"] == label]
        sampled = shuffle(subset, random_state=42).iloc[:max_per_class]
        samples.append(sampled[["text", "label_id"]])

    df_final = pd.concat(samples).reset_index(drop=True)

    # 💾 Save to JSON
    df_final.to_json(SAMPLE_OUTPUT, orient="records", lines=True, force_ascii=False)
    print(f"\n✅ Saved dataset: {len(df_final)} examples (max {max_per_class} per emotion)")
else:
    print(f"⚠️ Skipping dataset generation: {SAMPLE_OUTPUT} already exists.")

## Original Performance

### Load model, tokenizer and inputs

In [None]:
# 📥 Load dataset
with open(SAMPLE_OUTPUT, "r") as f:
    data = [json.loads(line) for line in f]

texts = [x["text"] for x in data]
labels = [x["label_id"] for x in data]

# 🔢 Label mappings
label2id = {label: i for i, label in enumerate(sorted(set(labels)))}
id2label = {i: label for label, i in label2id.items()}
label_ids = [label2id[label] for label in labels]

# 🔠 Tokenize and save only if not already saved
tokenizer = AutoTokenizer.from_pretrained(GOEMOTIONS_MODEL_HF)

if not os.path.exists(TOKENIZED_OUTPUT):
    encodings = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")
    torch.save(encodings, TOKENIZED_OUTPUT)
    logger.info("✅ Tokenized inputs saved.")
else:
    logger.warning(f"⚠️ Skipping: {TOKENIZED_OUTPUT} already exists.")

if not os.path.exists(LABELS_OUTPUT):
    torch.save(torch.tensor(label_ids), LABELS_OUTPUT)
    logger.info("✅ Label tensor saved.")
else:
    logger.warning(f"⚠️ Skipping: {LABELS_OUTPUT} already exists.")

if not os.path.exists(LABEL_MAPPING_OUTPUT):
    with open(LABEL_MAPPING_OUTPUT, "w") as f:
        json.dump(label2id, f, indent=2)
    logger.info("✅ Label mapping saved.")
else:
    logger.warning(f"⚠️ Skipping: {LABEL_MAPPING_OUTPUT} already exists.")

# ✅ NEW: Generate sample_df for later neuron silencing evaluation
if not os.path.exists(SAMPLE_OUTPUT_JSON):
    logger.info("📄 Creating and saving sample_df.json for evaluation hooks...")
    sample_rows = []
    for text in texts:
        encoded = tokenizer(text, truncation=True, padding="max_length", max_length=128)
        sample_rows.append({
            "input_ids": encoded["input_ids"],
            "attention_mask": encoded["attention_mask"]
        })
    sample_df = pd.DataFrame(sample_rows)
    sample_df.to_json(SAMPLE_OUTPUT_JSON, orient="records", lines=True)
    logger.info(f"✅ sample_df saved to {SAMPLE_OUTPUT_JSON}")
else:
    logger.warning(f"⚠️ Skipping: {SAMPLE_OUTPUT_JSON} already exists.")

# Summary
logger.info("🧠 Emotions (IDs): %s", sorted(label2id.keys()))
logger.info("🔢 Label mapping: %s", label2id)

## Inference

In [None]:
from tqdm import tqdm
import torch

# Define the target GoEmotions IDs
target_emotion_names = ["anger", "disgust", "fear", "joy", "sadness", "surprise"]

# Load emotion mapping
with open(EMOTIONS_FILE, "r") as f:
    id2emotion = [line.strip() for line in f.readlines()]
emotion2id = {e: i for i, e in enumerate(id2emotion)}

target_ids = [emotion2id[e] for e in target_emotion_names]
target_ids_tensor = torch.tensor(target_ids).to(device_goemo)

# Map GoEmotions IDs → local labels
label2id = {goid: i for i, goid in enumerate(target_ids)}
id2label = {i: goid for goid, i in label2id.items()}

print(f"🎯 Target GoEmotions IDs: {target_ids}")
print(f"🗺️ Mapping to local labels: {label2id}")

# Load model_goem
from transformers import AutoModelForSequenceClassification
model_goem = AutoModelForSequenceClassification.from_pretrained(GOEMOTIONS_MODEL_HF)
model_goem.to(device_goemo)
model_goem.eval()

# Load data
inputs = torch.load(TOKENIZED_OUTPUT, weights_only=False)
labels = torch.load(LABELS_OUTPUT).tolist()

predictions = []
true_labels = []

with torch.no_grad():
    for i in tqdm(range(len(labels))):
        input_ids = inputs["input_ids"][i].unsqueeze(0).to(device_goemo)
        attention_mask = inputs["attention_mask"][i].unsqueeze(0).to(device_goemo)

        logits = model_goem(input_ids=input_ids, attention_mask=attention_mask).logits.squeeze(0)

        selected_logits = logits[target_ids_tensor]
        pred_local = torch.argmax(selected_logits).item()

        predictions.append(pred_local)
        true_labels.append(labels[i])  # already 0–5



In [None]:
# ✅ Report
accuracy = accuracy_score(true_labels, predictions)
f1 = f1_score(true_labels, predictions, average="weighted")
ordered_labels = sorted(label2id.values())

report = classification_report(
    true_labels,
    predictions,
    labels=ordered_labels,
    target_names=[id2label[i] for i in ordered_labels],
    output_dict=True,
    zero_division=0
)

report_df = pd.DataFrame(report).transpose().round(4)
accuracy_row = pd.DataFrame({
    'precision': [""],
    'recall': [""],
    'f1-score': [accuracy],
    'support': [sum(report_df["support"])]
}, index=["overall_accuracy"])

final_df = pd.concat([report_df, accuracy_row])

if not os.path.exists(CSV_REPORT_PATH):
    final_df.to_csv(CSV_REPORT_PATH)
    print(f"✅ Report saved to {CSV_REPORT_PATH}")
else:
    print(f"⚠️ Skipping save: {CSV_REPORT_PATH} already exists.")

print(f"✅ Accuracy: {accuracy:.4f}")
print(f"✅ F1 Score: {f1:.4f}")

## Dataset Wrapper and DataLoader (Goemotions)

In [None]:
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class GoEmotionsDataset(Dataset):
    def __init__(self, input_ids, labels):
        self.input_ids = input_ids
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return torch.tensor(self.input_ids[idx]), torch.tensor(self.labels[idx])

# Load input IDs and labels from disk
input_data = torch.load(TOKENIZED_OUTPUT, weights_only=False)
labels = torch.load(LABELS_OUTPUT).tolist()

input_ids_list = input_data["input_ids"].tolist()

# Create dataset and dataloader
dataset = GoEmotionsDataset(input_ids_list, labels)
dataloader = DataLoader(dataset, batch_size=4, shuffle=False)

logger.info("✅ Dataloader created successfully.")

## Extract Activations

In [None]:
if os.path.exists(ACTIVATIONS_GOEMOTIONS):
    logger.info(f"⚡ Activations already exist at {ACTIVATIONS_GOEMOTIONS}. Skipping extraction.")
else:
    logger.info("🚀 Starting activation extraction from model (CLS token only).")
    
    transformers_extractor.extract_representations(
        model=model_goem,
        input_tokens_list=input_ids_list,   
        output_file=ACTIVATIONS_GOEMOTIONS,
        device=device_goemo,
        output_type="json",                
        decompose_layers=False,
        filter_layers=None
    )

    logger.info(f"✅ Activations successfully saved to {ACTIVATIONS_GOEMOTIONS}")

## Load Activations

In [None]:
import torch
import numpy as np
import logging
from collections import defaultdict
from sklearn.preprocessing import LabelEncoder

logger = logging.getLogger(__name__)

def create_tensors_goemo(tokens_data, activations, task_specific_tag="NN", task_type="classification", dtype=torch.float32):
    """
    Create input/output tensors from CLS activations and labels for classification tasks.

    Args:
        tokens_data (list): List of dicts with keys "tokens" and "target"
        activations (list): List of numpy arrays with CLS activations
        task_specific_tag (str): Not used for CLS, kept for compatibility
        task_type (str): "classification" or "regression"
        dtype (torch.dtype): Data type of the tensors

    Returns:
        X (torch.Tensor): Input features (num_samples, num_layers * hidden_size)
        y (torch.Tensor): Labels
        mapping (tuple): label2idx, idx2label, None, None
    """

    logger.info("🔄 Creating tensors from activations and labels")

    # Number of samples
    num_samples = len(tokens_data)
    assert num_samples == len(activations), "Mismatch between tokens and activations"

    logger.info(f"🧪 Number of samples: {num_samples}")

    # Flatten each activation: (num_layers, 1, hidden_dim) → (num_layers * hidden_dim)
    X = []
    for i, sample in enumerate(activations):
        if sample.ndim == 3 and sample.shape[1] == 1:
            flattened = sample.squeeze(1).flatten()
        elif sample.ndim == 2:
            flattened = sample.flatten()
        else:
            raise ValueError(f"Unexpected shape for activation {i}: {sample.shape}")
        X.append(flattened)
    X = np.array(X)

    
    # Encode labels
    labels = [sample["target"] for sample in tokens_data]
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(labels)

    # Logging label mapping
    label2idx = {label: int(idx) for idx, label in enumerate(label_encoder.classes_)}
    idx2label = {int(idx): label for label, idx in label2idx.items()}
    logger.info(f"🔢 Labels mapping: {label2idx}")

    return (
        torch.tensor(X, dtype=dtype),
        torch.tensor(y),
        (label2idx, idx2label, None, None)
    )

In [None]:
from neurox.data.loader import load_activations
from neurox.interpretation import utils

# ⚡ Load activations
activations, num_layers = load_activations(ACTIVATIONS_GOEMOTIONS)
logger.info(f"✅ Activations loaded from {ACTIVATIONS_GOEMOTIONS} with {num_layers} layers")

# 🧠 Prepare dataset with correct structure
sentence_data = [{"tokens": ["[CLS]"], "target": label} for label in labels]

# 📦 Convert to tensors
X, y, mapping = create_tensors_goemo(
    sentence_data,
    activations,
    task_specific_tag="NN",
    task_type="classification"
)

label2idx, idx2label, _, _ = mapping
logger.info("✅ Tensors and label mappings created successfully")

## Train Probe

In [None]:
# ✅ Convert tensors to numpy arrays (required by train_logistic_regression_probe)
X_np = X.numpy() if isinstance(X, torch.Tensor) else X
y_np = y.numpy() if isinstance(y, torch.Tensor) else y

# 🧪 Train logistic regression probe
logger.info("🔧 Training logistic regression probe")
probe = linear_probe.train_logistic_regression_probe(
    X_np, y_np,
    lambda_l1=1.1,
    lambda_l2=1.1
)

# 🧾 Evaluate the trained probe
logger.info("📈 Evaluating the probe")
scores = linear_probe.evaluate_probe(probe, X_np, y_np, idx_to_class=idx2label)
logger.info(f"🎯 Probe evaluation results:\n{scores}")

# 🔍 Get top neurons
top_neurons_probe, per_class_top_neurons = linear_probe.get_top_neurons(
    probe,
    percentage=0.1,
    class_to_idx=label2idx
)
logger.info(f"🧠 Top global neurons: {top_neurons_probe}")
logger.info(f"🧠 Top neurons per class: {per_class_top_neurons}")

## Global Silencing

## Silencing Functions

In [None]:
# ✅ Load tokenized input and labels
sample_df = pd.read_json(SAMPLE_OUTPUT_JSON, lines=True)
labels_list = torch.load(LABELS_OUTPUT).tolist()

In [None]:
import random

hidden_dim = model_goem.config.hidden_size
num_layers = model_goem.config.num_hidden_layers
total_neurons = num_layers * hidden_dim
num_to_silence = int(0.1 * total_neurons)  # mismo porcentaje que top global

random_indices = random.sample(range(total_neurons), num_to_silence)

silence_top_global_percentage_and_evaluate(
    model=model_goem,
    sample_df=sample_df,
    labels_list=labels_list,
    probe=None,  # No se usa
    label2idx=label2idx,
    percentage=None,
    experiment_title="Silencing random 10% global neurons",
    report_path="data/goemotions/classification_report_random_silencing.csv",
    custom_indices=random_indices
)

In [None]:
# percentages = [0.025, 0.05, 0.075, 0.10, 0.125, 0.15, 0.175, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.65, 0.75, 0.8, 0.95]
percentages = [0.1]

for pct in percentages:
    silence_top_global_percentage_and_evaluate(
        model=model_goem,                    # o tu variable del modelo cargado
        sample_df=sample_df,
        labels_list=labels_list,
        probe=probe,
        label2idx=label2idx,
        percentage=pct,
        experiment_title=f"Silencing {pct*100:.1f}% Global Neurons",
        report_path=CSV_REPORT_GOBAL_SILENCING
    )

In [None]:
import pandas as pd

# 🧐 Ver las primeras filas
print(sample_df.head())

# 🔍 Ver los tipos de cada columna
print(sample_df.dtypes)

# 🧪 Ver si hay valores nulos
print(sample_df.isnull().sum())

# Diagnostico

In [None]:
# Verifica dimensiones del probe
print("🔍 probe.linear.weight shape:", probe.linear.weight.shape)

# Debería dar (num_classes, total_neurons) → en tu caso: (6, 9216)

In [None]:
hidden_dim = model_goem.config.hidden_size
num_layers = model_goem.config.num_hidden_layers

layer_counts = {i: 0 for i in range(num_layers)}
for idx in top_neurons_probe:
    layer = idx // hidden_dim
    layer_counts[layer] += 1

print("Neuronas silenciadas por capa (top global):")
for l in range(num_layers):
    print(f"  Layer {l}: {layer_counts[l]} neurons")

In [None]:
print("🧷 label2idx:", label2idx)

# Crea tensor con las IDs originales (de GoEmotions)
target_goemotion_ids = torch.tensor(list(label2idx.keys()))
print("🎯 target_goemotion_ids:", target_goemotion_ids.tolist())

# Confirma si las posiciones corresponden 1:1 con etiquetas de `labels_list` que tú usas
print("🧪 Sample labels_list:", labels_list[:10])

In [None]:
# Usa solo 1 ejemplo
i = 0

# Sin silenciamiento todavía
input_ids_tensor = torch.tensor(sample_df.loc[i, 'input_ids']).unsqueeze(0).to(model_goem.device)
attention_mask_tensor = torch.tensor(sample_df.loc[i, 'attention_mask']).unsqueeze(0).to(model_goem.device)

with torch.no_grad():
    outputs = model_goem(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
    logits = outputs.logits
    selected_logits = logits[:, target_goemotion_ids]  # [1, 6]
    pred_local = torch.argmax(selected_logits, dim=1).item()

print("🔢 Full logits:", logits.tolist())
print("🎯 Selected logits (target emotions):", selected_logits.tolist())
print("✅ Predicted class index (0–5):", pred_local)
print("🏷️ True label:", labels_list[i])

In [None]:
# ⚠️ Hook solo para capa 11 (donde vimos que hay muchas neuronas silenciadas)


encoder_layers = get_encoder_layers(model_goem)
hook_handles = []

for i in range(num_layers):
    indices_layer = [idx - i * hidden_dim for idx in top_neurons_global if i * hidden_dim <= idx < (i + 1) * hidden_dim]
    if indices_layer:
        handle = encoder_layers[i].output.register_forward_hook(make_cls_silence_hook2(indices_layer))
        hook_handles.append(handle)

# Misma inferencia que antes
with torch.no_grad():
    outputs = model_goem(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
    logits = outputs.logits
    selected_logits = logits[:, target_goemotion_ids]
    pred_local = torch.argmax(selected_logits, dim=1).item()

print("🧪 Silenced logits:", selected_logits.tolist())
print("🎯 New prediction:", pred_local)

# Limpiar hooks
for h in hook_handles:
    h.remove()

In [None]:
# Ejecuta una sola inferencia para ver qué capa da output y qué se usa como input al classifier
with torch.no_grad():
    outputs = model_goem(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor, output_hidden_states=True)
    hidden_states = outputs.hidden_states  # Tuple: (layer_0, ..., layer_n)
    final_hidden = hidden_states[-1]  # shape: [1, seq_len, hidden_size]

    print("🧠 Final hidden state shape:", final_hidden.shape)
    print("🔍 CLS vector (posición 0):", final_hidden[:, 0, :].abs().sum().item())

    logits = model_goem.classifier(final_hidden[:, 0, :])  # ¿así lo hace el modelo?
    print("🎯 Recomputed logits from CLS:", logits)

In [None]:
import torch
from transformers import AutoModel
import logging

# Suponemos que ya tienes estas variables cargadas:
# model_goem, input_ids_tensor, attention_mask_tensor, top_neurons_global, make_cls_silence_hook2, get_encoder_layers

# Configura el logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Accede a capas internas
hidden_dim = model_goem.config.hidden_size
num_layers = model_goem.config.num_hidden_layers
encoder_layers = get_encoder_layers(model_goem)

# --- 1. Inference sin silenciar (sin hook) ---
with torch.no_grad():
    outputs_original = model_goem(
        input_ids=input_ids_tensor,
        attention_mask=attention_mask_tensor,
        output_hidden_states=True
    )
    hidden_states_original = outputs_original.hidden_states[-1][:, 0, :]  # [CLS] final layer

# --- 2. Aplicar hooks para silenciar ---
hook_handles = []
for i in range(num_layers):
    indices_layer = [idx - i * hidden_dim for idx in top_neurons_global if i * hidden_dim <= idx < (i + 1) * hidden_dim]
    if indices_layer:
        handle = encoder_layers[i].output.register_forward_hook(make_cls_silence_hook2(indices_layer))
        hook_handles.append(handle)

# --- 3. Inference con neuronas silenciadas ---
with torch.no_grad():
    outputs_silenced = model_goem(
        input_ids=input_ids_tensor,
        attention_mask=attention_mask_tensor,
        output_hidden_states=True
    )
    hidden_states_silenced = outputs_silenced.hidden_states[-1][:, 0, :]  # [CLS] silenciado

# --- 4. Comparar los vectores CLS ---
diff = torch.abs(hidden_states_original - hidden_states_silenced)
logger.info(f"🔍 CLS difference after silencing: mean={diff.mean()}, max={diff.max()}")

# Limpieza: eliminar hooks
for handle in hook_handles:
    handle.remove()

In [None]:
for i, layer in enumerate(get_encoder_layers(model_goem)):
    print(f"Layer {i} -> {layer.__class__.__name__}")

## Second try

In [None]:
import torch

def make_cls_silence_hook(indices: list[int]):
    # crea un tensor de índices (vacío si no hay nada que silenciar)
    indices_tensor = torch.tensor(indices, dtype=torch.long) if indices else torch.tensor([], dtype=torch.long)

    def hook(module, input, output):
        # sólo intervenimos si es un Tensor
        if not isinstance(output, torch.Tensor):
            return output

        out = output.clone()
        idx = indices_tensor.to(out.device)

        if out.dim() == 3:
            # batch × seq_len × hidden
            cls = out[:, 0, :]                      # (batch, hidden)
            mask = torch.ones_like(cls)
            if idx.numel() > 0:
                mask[:, idx] = 0.0
            out[:, 0, :] = cls * mask
        elif out.dim() == 2:
            # batch × hidden  (ej. pooler)
            mask = torch.ones_like(out)
            if idx.numel() > 0:
                mask[:, idx] = 0.0
            out = out * mask

        return out

    return hook

In [None]:
# ──────────────────────────────────────────────────────────────────────────────
# 🔍 2️⃣ Celda: Entrenar el probe y extraer los TOP_NEURONS
# ──────────────────────────────────────────────────────────────────────────────

# (a) Entrena tu logistic regression probe como antes
probe = linear_probe.train_logistic_regression_probe(
    X_np, y_np,
    lambda_l1=0.001,
    lambda_l2=0.001
)

# (b) Obtén los índices globales de las neuronas más relevantes
#     `top_neurons_probe` es una lista de enteros en [0, num_layers*hidden_size)
top_neurons_probe, per_class_top_neurons = linear_probe.get_top_neurons(
    probe,
    percentage=0.10,        # 10% de las neuronas
    class_to_idx=label2idx
)

# (c) Ahora sí definimos `global_indices` para usar en el hook
global_indices = top_neurons_probe

# 📋 Comprueba un par de valores:
print(f"Número total de neuronas a silenciar: {len(global_indices)}")
print(f"Primeros 10 índices globales: {global_indices[:10]}")  # deben estar entre 0 y hidden_size*num_layers-1

In [None]:
from collections import defaultdict

hidden_size = model_goem.config.hidden_size      # p.ej. 768
num_layers  = model_goem.config.num_hidden_layers  # p.ej. 12

layer_to_indices = defaultdict(list)
for gidx in global_indices:
    layer_idx  = gidx // hidden_size
    neuron_idx = gidx %  hidden_size
    layer_to_indices[layer_idx].append(int(neuron_idx))

# Verifica que todo esté correcto
for L in sorted(layer_to_indices):
    print(f"Capa {L}: {len(layer_to_indices[L])} neuronas")

In [None]:
handles = []  # para luego removerlos

for layer_idx, layer in enumerate(model_goem.bert.encoder.layer):
    idxs = layer_to_indices.get(layer_idx, [])
    if idxs:
        h = layer.output.LayerNorm.register_forward_hook(
            make_cls_silence_hook(idxs)
        )
        handles.append(h)

In [None]:
# Si quieres además silenciar en la salida del pooler:
final_layer_idxs = layer_to_indices.get(num_layers-1, [])
if final_layer_idxs:
    h = model_goem.bert.pooler.dense.register_forward_hook(
        make_cls_silence_hook(final_layer_idxs)
    )
    handles.append(h)

In [None]:
from sklearn.metrics import accuracy_score, f1_score, classification_report
import torch

def evaluate_silenced_model(
    model,
    sample_df,
    labels_list,
    target_ids_tensor,
    device,
    label2idx
):
    """
    Corre inferencia en el modelo (con hooks ya activos),
    selecciona sólo los logits de los target_ids_tensor,
    calcula accuracy, f1 y classification_report.
    """
    model.eval()
    preds, trues = [], []
    with torch.no_grad():
        for i, row in sample_df.iterrows():
            input_ids   = torch.tensor(row["input_ids"]).unsqueeze(0).to(device)
            attention_mask = torch.tensor(row["attention_mask"]).unsqueeze(0).to(device)

            logits = model(input_ids=input_ids, attention_mask=attention_mask).logits.squeeze(0)
            sel_logits = logits[target_ids_tensor]
            pred_local = torch.argmax(sel_logits).item()

            preds.append(pred_local)
            trues.append(labels_list[i])

    acc = accuracy_score(trues, preds)
    f1  = f1_score(trues, preds, average="weighted", zero_division=0)
    report_dict = classification_report(
        trues,
        preds,
        labels=list(label2idx.values()),
        target_names=[str(k) for k in sorted(label2idx.keys())],
        zero_division=0,
        output_dict=True
    )
    return {"accuracy": acc, "f1": f1, "report": report_dict}

In [None]:
import torch
from collections import defaultdict

def silence_top_global_percentage_and_evaluate(
    model,
    sample_df,
    labels_list,
    probe,
    label2idx,
    percentage=0.10,
    experiment_title="Silencing neurons",
    report_path=None,
    custom_indices=None      # <-- nuevo parámetro
):
    # ① Selección de índices
    if custom_indices is not None:
        top_neurons = custom_indices
        print(f"🔧 Silencing custom list of {len(top_neurons)} neurons")
    else:
        # si no se pasa custom_indices, volvemos a llamar al probe (no existe shadowing aquí)
        top_neurons, _ = linear_probe.get_top_neurons(
            probe, percentage=percentage, class_to_idx=label2idx
        )
        print(f"🔧 Silencing {len(top_neurons)} neurons from probe (percentage={percentage})")

    # ② Mapeo global→por capa
    hidden_size = model.config.hidden_size
    layer_to_indices = defaultdict(list)
    for gidx in top_neurons:
        layer_idx  = gidx // hidden_size
        neuron_idx = gidx %  hidden_size
        layer_to_indices[layer_idx].append(int(neuron_idx))

    # ③ Registro de hooks
    handles = []
    for layer_idx, indices in layer_to_indices.items():
        if not indices:
            continue
        # hook en la LayerNorm de cada encoder.layer
        h = model.bert.encoder.layer[layer_idx].output.LayerNorm.register_forward_hook(
            make_cls_silence_hook(indices)
        )
        handles.append(h)

    # (Opcional) pooler
    final_idxs = layer_to_indices.get(model.config.num_hidden_layers-1, [])
    if final_idxs:
        h = model.bert.pooler.dense.register_forward_hook(
            make_cls_silence_hook(final_idxs)
        )
        handles.append(h)

    # ─── ④ Evaluación ────────────────────────────────────────────────
    from pathlib import Path
    # asegúrate de tener target_ids_tensor disponible en el scope
    silence_scores = evaluate_silenced_model(
        model,
        sample_df,
        labels_list,
        target_ids_tensor,   # tu tensor con los IDs GoEmotions
        device_goemo,
        label2idx
    )

    # guardar el classification_report en CSV
    report_df = (
        pd.DataFrame(silence_scores["report"])
          .transpose()
          .round(4)
    )
    # añadir fila de accuracy/f1 global si quieres
    report_df.to_csv(report_path, index=True)
    print(f"✅ Accuracy after silencing: {silence_scores['accuracy']:.4f}")
    print(f"✅ F1 Score after silencing: {silence_scores['f1']:.4f}")

    # ─── ⑤ Remove hooks ──────────────────────────────────────────────
    for h in handles:
        h.remove()
    print("✅ All hooks removed")

In [None]:
# tras cargar sample_df, labels_list y probe como hacías
silence_top_global_percentage_and_evaluate(
    model=model_goem,
    sample_df=sample_df,
    labels_list=labels_list,
    probe=probe,
    label2idx=label2idx,
    percentage=0.8,
    experiment_title="Silencing 10% Global Neurons",
    report_path=CSV_REPORT_GOBAL_SILENCING
)

In [None]:
for h in handles:
    h.remove()