<a href="https://colab.research.google.com/github/janbanot/msc-project/blob/main/test_notebooks/test_steering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ===================================================
# 1. IMPORTY i KONFIGURACJA
# ===================================================

import os
import re
import numpy as np
import pandas as pd
import torch
from datetime import datetime
from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

from google.colab import drive
drive.mount('/drive')

# Parametry eksperymentu
ALPHA_VALUES = [-10.0, -20.0, -50.0]

N_SAMPLES_PER_CLASS = 25  # Liczba przykładów na klasę
TARGET_LAYER_INDEX = 3  # Warstwa do interwencji 3 / 5
CLASSIFICATION_THRESHOLD = 0.5  # Próg klasyfikacji
MAX_SEQUENCE_LENGTH = 256

# Ścieżki (dostosuj do swojego środowiska)
DATA_PATH = "/drive/MyDrive/msc-project/jigsaw-toxic-comment/train.csv"
MODEL_CHECKPOINT = "/drive/MyDrive/msc-project/models/distilbert-jigsaw-full_20260125_133112"
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
RESULTS_DIR = f"/drive/MyDrive/msc-project/steering_test_{TIMESTAMP}"

# Urządzenie
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Uruchomiono na: {device}")

# Tworzenie katalogu wyników
os.makedirs(RESULTS_DIR, exist_ok=True)
print(f"Wyniki zostaną zapisane w: {RESULTS_DIR}")

In [None]:
# ===================================================
# 2. FUNKCJE POMOCNICZE (z test.ipynb)
# ===================================================

def clean_text(example):
    """Czyści tekst komentarza."""
    text = example["comment_text"]
    text = text.lower()
    text = re.sub(r"http\S+|www\S+", "", text)
    text = re.sub(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", "", text)
    text = re.sub(r"\(talk\)", "", text)
    text = re.sub(r"\d{2}:\d{2}, \w+ \d{1,2}, \d{4} \(utc\)", "", text)
    text = text.replace("\n", " ").replace("\xa0", " ")
    text = text.strip(' "')
    text = re.sub(r"\s+", " ", text).strip()
    example["comment_text"] = text
    return example


def prepare_environment():
    """Wczytuje dane, tokenizuje i ładuje model."""
    print(">>> Wczytywanie i przetwarzanie danych...")

    # Wczytanie danych
    df = pd.read_csv(DATA_PATH).head(10000)  # Więcej danych dla lepszego wektora
    dataset = Dataset.from_pandas(df)
    dataset = dataset.map(clean_text)

    # Tokenizer
    print(f">>> Ładowanie tokenizera z: {MODEL_CHECKPOINT}")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_CHECKPOINT)

    # Tokenizacja
    def tokenize_function(examples):
        return tokenizer(
            examples["comment_text"],
            padding="max_length",
            truncation=True,
            max_length=MAX_SEQUENCE_LENGTH,
        )

    tokenized_dataset = dataset.map(tokenize_function, batched=True)

    # Etykiety
    def create_labels(example):
        example["labels"] = [float(example["toxic"])]
        return example

    final_dataset = tokenized_dataset.map(create_labels)
    final_dataset.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

    # Podział
    splits = final_dataset.train_test_split(test_size=0.2, seed=42)
    eval_dataset = splits["test"]

    # Model
    print(f">>> Ładowanie modelu z: {MODEL_CHECKPOINT}")
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_CHECKPOINT, num_labels=1, problem_type="single_label_classification"
    )
    model.to(device)
    model.eval()

    print(">>> Środowisko gotowe.")
    return model, tokenizer, eval_dataset


# Inicjalizacja
model, tokenizer, eval_dataset = prepare_environment()

In [None]:
# ===================================================
# 3. OBLICZANIE WEKTORA STERUJĄCEGO
# ===================================================

print("\n=== 3. OBLICZANIE WEKTORA STERUJĄCEGO ===")

# Podział: 800 próbek do wyznaczenia wektora, reszta do testów (brak wycieku danych)
N_FOR_VECTOR = 800
vector_subset = eval_dataset.select(range(N_FOR_VECTOR))
test_subset = eval_dataset.select(range(N_FOR_VECTOR, len(eval_dataset)))

print(f"Próbek do wektora: {len(vector_subset)} | Próbek do testów: {len(test_subset)}")

# Ekstrakcja aktywacji z warstwy TARGET_LAYER_INDEX
layers_data = {TARGET_LAYER_INDEX: []}
all_labels = []

loader = torch.utils.data.DataLoader(vector_subset, batch_size=32)

print(f"Ekstrakcja aktywacji z warstwy {TARGET_LAYER_INDEX}...")
for batch in loader:
    input_ids = batch["input_ids"].to(device)
    mask = batch["attention_mask"].to(device)
    labels = batch["labels"][:, 0].cpu().numpy()

    with torch.no_grad():
        out = model(input_ids, attention_mask=mask, output_hidden_states=True)

    all_labels.extend(labels)
    # Token [CLS] (indeks 0)
    layers_data[TARGET_LAYER_INDEX].append(
        out.hidden_states[TARGET_LAYER_INDEX][:, 0, :].cpu().numpy()
    )

# Połączenie aktywacji
X = np.concatenate(layers_data[TARGET_LAYER_INDEX], axis=0)
y = np.array(all_labels)
y_bin = (y > CLASSIFICATION_THRESHOLD).astype(int)

print("\nStatystyki próbek:")
print(f"  - Toksyczne: {np.sum(y_bin == 1)}")
print(f"  - Bezpieczne: {np.sum(y_bin == 0)}")
print(f"  - Kształt aktywacji: {X.shape}")

# Obliczenie kierunku
mean_toxic = np.mean(X[y_bin == 1], axis=0)
mean_safe = np.mean(X[y_bin == 0], axis=0)
direction = mean_toxic - mean_safe

print("\n=== STATYSTYKI WEKTORA PRZED NORMALIZACJĄ ===")
print(f"Kształt wektora: {direction.shape}")
print(f"Mean: {np.mean(direction):.6f}")
print(f"Std: {np.std(direction):.6f}")
print(f"Min: {np.min(direction):.6f}")
print(f"Max: {np.max(direction):.6f}")
print(f"L2 Norm: {np.linalg.norm(direction):.6f}")

# --- NORMALIZACJA L2 ---
# Dzięki temu alpha będzie oznaczać "o ile jednostek odchylenia standardowego/normy przesuwamy aktywacje"
direction_normed = direction / np.linalg.norm(direction)

print("\n=== STATYSTYKI WEKTORA PO NORMALIZACJI L2 ===")
print(f"Nowa norma L2: {np.linalg.norm(direction_normed):.6f}")

# Top-10 największych elementów wektora
top_indices = np.argsort(np.abs(direction_normed))[-10:][::-1]
print("\nTop-10 największych elementów wektora (indeks | wartość):")
for idx in top_indices:
    print(f"  [{idx:4d}] {direction_normed[idx]:.6f}")

# Zapis wektora do pliku
steering_tensor = torch.tensor(direction_normed, dtype=torch.float32).to(device)
np.save(f"{RESULTS_DIR}/steering_vector.npy", direction_normed)
print(f"\nWektor znormalizowany. Nowa norma L2: {np.linalg.norm(direction_normed):.2f}")
print(f"Wektor zapisany do: {RESULTS_DIR}/steering_vector.npy")

In [None]:
# ===================================================
# 4. KLASA HOOK DO STEROWANIA
# ===================================================

class SteeringHook:
    """
    PyTorch hook modyfikujący hidden states poprzez dodanie wektora sterującego.
    """

    def __init__(self, vector, coeff):
        self.vector = vector
        self.coeff = coeff

    def __call__(self, module, inputs, output):
        is_tuple = isinstance(output, tuple)
        hidden_states = output[0] if is_tuple else output
        steering_vector = self.vector.to(hidden_states.device, dtype=hidden_states.dtype)
        modified_hidden = hidden_states + (self.coeff * steering_vector)

        if is_tuple:
            return (modified_hidden,) + output[1:]
        else:
            return modified_hidden

print(">>> SteeringHook zdefiniowany.")

In [None]:
# ===================================================
# 5. FUNKCJA TESTOWANIA - WERSJA Z RANKINGIEM PEWNOŚCI
# ===================================================

def test_steering_with_detailed_logs(
    model, tokenizer, dataset, steering_vector, alpha_value
):
    print(f"\n{'='*30}")
    print(f"TESTOWANIE ALPHA = {alpha_value}")
    print(f"{'='*30}")

    model.eval()

    # --- KROK 1: INTELIGENTNY WYBÓR PRÓBEK (RANKING) ---
    print(">>> Analizowanie bazy próbek dla testu...")
    all_scores = []

    # Sprawdzamy pierwsze 300 próbek z test_subset, żeby znaleźć najlepszych kandydatów
    search_range = min(300, len(dataset))
    for i in range(search_range):
        input_ids = dataset[i]["input_ids"].unsqueeze(0).to(device)
        mask = dataset[i]["attention_mask"].unsqueeze(0).to(device)
        label = dataset[i]["labels"][0].item()

        with torch.no_grad():
            logits = model(input_ids, attention_mask=mask).logits
            prob = torch.sigmoid(logits)[0, 0].item()

        all_scores.append({'idx': i, 'prob': prob, 'label': label})

    # Wybieramy 25 toksycznych, które model uważa za NAJBARDZIEJ toksyczne (Label 1, najwyższe Prob)
    toxic_candidates = [x for x in all_scores if x['label'] == 1]
    toxic_indices = [x['idx'] for x in sorted(toxic_candidates, key=lambda x: x['prob'], reverse=True)[:N_SAMPLES_PER_CLASS]]

    # Wybieramy 25 bezpiecznych, które model uważa za NAJMNIEJ toksyczne (Label 0, najniższe Prob)
    safe_candidates = [x for x in all_scores if x['label'] == 0]
    safe_indices = [x['idx'] for x in sorted(safe_candidates, key=lambda x: x['prob'])[:N_SAMPLES_PER_CLASS]]

    print(f"Wybrano do testu: {len(toxic_indices)} toksycznych i {len(safe_indices)} bezpiecznych.")
    if len(safe_indices) > 0:
        print(f"Średnie Prob dla Safe przed steeringiem: {np.mean([all_scores[i]['prob'] for i in safe_indices]):.3f}")

    layer_module = model.distilbert.transformer.layer[TARGET_LAYER_INDEX]
    results = []
    toxic_deltas, safe_deltas = [], []
    success_count, side_effect_count = 0, 0

    # --- KROK 2: WŁAŚCIWA INTERWENCJA (STEERING) ---
    def run_inference(indices, label_type):
        nonlocal success_count, side_effect_count
        if not indices: return

        for idx in indices:
            input_ids = dataset[idx]["input_ids"].unsqueeze(0).to(device)
            mask = dataset[idx]["attention_mask"].unsqueeze(0).to(device)

            # 1. POMIAR PRZED
            with torch.no_grad():
                prob_before = torch.sigmoid(model(input_ids, mask).logits)[0, 0].item()

            # 2. POMIAR PO (z hookiem)
            hook = SteeringHook(steering_vector, alpha_value)
            handle = layer_module.register_forward_hook(hook)
            with torch.no_grad():
                prob_after = torch.sigmoid(model(input_ids, mask).logits)[0, 0].item()
            handle.remove()

            delta = prob_after - prob_before

            if label_type == "TOXIC":
                success = prob_after < CLASSIFICATION_THRESHOLD
                status = "SUCCESS" if success else "FAILED"
                if success: success_count += 1
                toxic_deltas.append(delta)
            else:
                side_effect = prob_after > CLASSIFICATION_THRESHOLD
                status = "SIDE-EFFECT" if side_effect else "OK"
                if side_effect: side_effect_count += 1
                safe_deltas.append(delta)

            text = tokenizer.decode(dataset[idx]["input_ids"], skip_special_tokens=True)
            results.append({
                "label": label_type, "alpha": alpha_value,
                "prob_before": prob_before, "prob_after": prob_after,
                "delta": delta, "status": status, "text": text[:100]
            })

            emoji = "✓" if "OK" in status or "SUCCESS" in status else "✗"
            print(f"{label_type} #{idx:4d} | {emoji} {status:11s} | {prob_before:.3f} -> {prob_after:.3f} (Δ: {delta:+.4f})")

    run_inference(toxic_indices, "TOXIC")
    run_inference(safe_indices, "SAFE")

    # Obliczanie średnich (zabezpieczone przed dzieleniem przez zero)
    success_rate = (success_count / len(toxic_indices) * 100) if toxic_indices else 0
    side_rate = (side_effect_count / len(safe_indices) * 100) if safe_indices else 0
    avg_d_tox = np.mean(toxic_deltas) if toxic_deltas else 0
    avg_d_safe = np.mean(safe_deltas) if safe_deltas else 0

    return pd.DataFrame(results), success_rate, side_rate, avg_d_tox, avg_d_safe

print(">>> Funkcja test_steering_with_detailed_logs zdefiniowana.")

In [None]:
# ===================================================
# 6. GŁÓWNA PĘTLA - TESTOWANIE DLA 3 WARTOŚCI ALPHA
# ===================================================

print("\n" + "="*70)
print("ROZPOCZĘCIE TESTOWANIA STEERINGU DLA RÓŻNYCH WARTOŚCI ALPHA")
print("="*70)

all_results = []
summary_data = []

for alpha in ALPHA_VALUES:
    df_result, success_rate, side_effect_rate, avg_delta_toxic, avg_delta_safe = test_steering_with_detailed_logs(
        model, tokenizer, test_subset, steering_tensor, alpha
    )
    all_results.append(df_result)
    summary_data.append({
        "alpha": alpha,
        "success_rate": success_rate,
        "side_effect_rate": side_effect_rate,
        "avg_delta_toxic": avg_delta_toxic,
        "avg_delta_safe": avg_delta_safe
    })

    # Zapis wyników dla tej wartości alpha
    df_result.to_csv(f"{RESULTS_DIR}/results_alpha_{alpha}.csv", index=False)
    print(f"\nWyniki zapisane do: {RESULTS_DIR}/results_alpha_{alpha}.csv")

print("\n" + "="*70)
print("Wszystkie testy zakończone.")
print("="*70)

In [None]:
# ===================================================
# 7. PODSUMOWANIE PORÓWNAWCZE
# ===================================================

print("\n" + "="*70)
print("PODSUMOWANIE PORÓWNAWCZE DLA WSZYSTKICH WARTOŚCI ALPHA")
print("="*70 + "\n")

df_summary = pd.DataFrame(summary_data)
print(df_summary.to_string(index=False))

# Zapis podsumowania
df_summary.to_csv(f"{RESULTS_DIR}/summary_comparison.csv", index=False)
print(f"\nPodsumowanie zapisane do: {RESULTS_DIR}/summary_comparison.csv")

# Analiza
print("\n" + "="*70)
print("ANALIZA")
print("="*70)

for idx, row in df_summary.iterrows():
    alpha = row['alpha']
    status = ""
    if row['success_rate'] > 80 and row['side_effect_rate'] < 5:
        status = "OPTIMALNE"
    elif row['success_rate'] > 80:
        status = "DOBRA SKUTECZNOŚĆ, WYSOKIE SIDE EFFECTS"
    elif row['side_effect_rate'] < 5:
        status = "NISKIE SIDE EFFECTS, SŁABA SKUTECZNOŚĆ"
    else:
        status = "WYMAGA DOSTROJENIA"

    print(f"\nAlpha = {alpha}: {status}")
    print(f"  - Success Rate: {row['success_rate']:.2f}% (cel: >80%)")
    print(f"  - Side Effects:  {row['side_effect_rate']:.2f}% (cel: <5%)")
    print(f"  - Średnia delta (toksyczne):  {row['avg_delta_toxic']:+.4f}")
    print(f"  - Średnia delta (bezpieczne): {row['avg_delta_safe']:+.4f}")

# Znajdź najlepszą wartość alpha
optimal_rows = df_summary[
    (df_summary['success_rate'] > 80) &
    (df_summary['side_effect_rate'] < 5)
]

if len(optimal_rows) > 0:
    best_idx = optimal_rows['success_rate'].idxmax()
    best_alpha = df_summary.loc[best_idx, 'alpha']
    print("\n" + "="*70)
    print(f"REKOMENDACJA: Najlepsza wartość alpha = {best_alpha}")
    print("="*70)
else:
    print("\n" + "="*70)
    print("ŻADNA WARTOŚĆ ALPHA NIE SPEŁNIA KRYTERIÓW OPTIMALNOŚCI")
    print("="*70)

In [None]:
# ===================================================
# 8. PRZYKŁADY SZCZEGÓŁOWE - PRZED I PO
# ===================================================

print("\n" + "="*70)
print("SZCZEGÓŁOWE PRZYKŁADY PRZED I PO STEERINGU")
print("="*70)

# Połącz wszystkie wyniki
df_all = pd.concat(all_results, ignore_index=True)

# Przykłady toksyczne przed i po
print("\n--- TOKSYCZNE PRZYKŁADY (detoksykacja) ---")
print("\nPrzykłady z największą zmianą (delta) - Alpha = -3.0:")
top_delta_toxic = df_all[(df_all['label'] == 'TOXIC') & (df_all['alpha'] == -3.0)].nlargest(5, 'delta')

for idx, row in top_delta_toxic.iterrows():
    print(f"\n{'─'*70}")
    print(f"Tekst: {row['text'][:150]}")
    print(f"Prawdopodobieństwo przed: {row['prob_before']:.4f}")
    print(f"Prawdopodobieństwo po:    {row['prob_after']:.4f}")
    print(f"Delta: {row['delta']:+.4f}")
    print(f"Status: {row['status']}")

# Przykłady bezpieczne przed i po
print(f"\n\n{'='*70}")
print("BEZPIECZNE PRZYKŁADY (efekty uboczne)")
print("="*70)

print("\nPrzykłady z efektami ubocznymi - Alpha = -3.0:")
side_effects = df_all[(df_all['label'] == 'SAFE') & (df_all['alpha'] == -3.0) & (df_all['status'] == 'SIDE-EFFECT')]

if len(side_effects) > 0:
    for idx, row in side_effects.iterrows():
        print(f"\n{'─'*70}")
        print(f"Tekst: {row['text'][:150]}")
        print(f"Prawdopodobieństwo przed: {row['prob_before']:.4f}")
        print(f"Prawdopodobieństwo po:    {row['prob_after']:.4f}")
        print(f"Delta: {row['delta']:+.4f}")
        print(f"Status: {row['status']} (efekt uboczny)")
else:
    print("\nBrak efektów ubocznych dla alpha = -3.0")

# Porównanie tego samego tekstu dla różnych wartości alpha
print(f"\n\n{'='*70}")
print("PORÓWNANIE TEGO SAMEGO TEKSTU DLA RÓŻNYCH ALPHA")
print("="*70)

# Wybierz losowy tekst toksyczny
if len(df_all[df_all['label'] == 'TOXIC']) > 0:
    sample_text = df_all[df_all['label'] == 'TOXIC']['text'].iloc[0]
    text_comparison = df_all[df_all['text'] == sample_text][['alpha', 'prob_before', 'prob_after', 'delta', 'status']]

    print(f"\nTekst: {sample_text[:150]}")
    print("\nPorównanie prawdopodobieństw przed steering (alpha = 0):")
    print(f"  Prob: {text_comparison['prob_before'].iloc[0]:.4f}")
    print("\nPorównanie prawdopodobieństw po steering dla różnych alpha:")
    for idx, row in text_comparison.iterrows():
        print(f"  Alpha = {row['alpha']:+.1f}: {row['prob_after']:.4f} (delta: {row['delta']:+.4f})")

# Zapis szczegółowych przykładów
df_all.to_csv(f"{RESULTS_DIR}/all_results_detailed.csv", index=False)
print(f"\n\nWszystkie wyniki zapisane do: {RESULTS_DIR}/all_results_detailed.csv")

In [None]:
# ===================================================
# 9. GENEROWANIE RAPORTU KOŃCOWEGO
# ===================================================

# Tworzenie raportu - czesc 1: naglowek
report = """
═══════════════════════════════════════════════════════════════════════════════
                    RAPORT Z TESTOWANIA STEERINGU
═══════════════════════════════════════════════════════════════════════════════

"""

# Dodanie daty i metadanych
report += f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
report += f"Model: {MODEL_CHECKPOINT}\n"
report += f"Warstwa: {TARGET_LAYER_INDEX}\n"
report += f"Próbki na klasę: {N_SAMPLES_PER_CLASS}\n\n"

# Dodanie podsumowania
report += """───────────────────────────────────────────────────────────────────────────────
                          PODSUMOWANIE
───────────────────────────────────────────────────────────────────────────────

"""
report += df_summary.to_string(index=False) + "\n\n"

# Dodanie wnioskow
report += """───────────────────────────────────────────────────────────────────────────────
                         WNIOSKI
───────────────────────────────────────────────────────────────────────────────

Wartość STEERING_ALPHA została dobrana eksperymentalnie. Cel to:
  - Success Rate > 80% (skuteczna detoksykacja)
  - Side Effects < 5% (minimalne fałszywe alarmy)

"""

# Analiza kazdej wartosci alpha
for idx, row in df_summary.iterrows():
    alpha = row['alpha']
    success = row['success_rate']
    side = row['side_effect_rate']

    if success > 80 and side < 5:
        status = "OPTIMALNE"
    elif success > 80:
        status = f"WYSOKIE SIDE EFFECTS ({side:.2f}%)"
    elif side < 5:
        status = f"SŁABA SKUTECZNOŚĆ ({success:.2f}%)"
    else:
        status = "WYMAGA DOSTROJENIA"

    report += f"\nAlpha = {alpha:+.1f}: {status}\n"
    report += f"  - Success Rate: {success:.2f}%\n"
    report += f"  - Side Effects:  {side:.2f}%\n"

# Dodanie listy plikow
report += """
───────────────────────────────────────────────────────────────────────────────
                        PLIKI WYNIKÓW
───────────────────────────────────────────────────────────────────────────────

"""
report += f"Katalog: {RESULTS_DIR}/\n\n"
report += """  - steering_vector.npy          : Wektor sterujący (768-dim)
  - summary_comparison.csv       : Porównanie wszystkich wartości alpha
  - all_results_detailed.csv    : Szczegółowe wyniki dla wszystkich przykładów
  - results_alpha_-1.0.csv      : Wyniki dla alpha = -1.0
  - results_alpha_-3.0.csv      : Wyniki dla alpha = -3.0
  - results_alpha_-5.0.csv      : Wyniki dla alpha = -5.0
  - steering_report.txt         : Ten raport

═══════════════════════════════════════════════════════════════════════════════
"""

# Wyświetlenie raportu
print(report)

# Zapis raportu
with open(f"{RESULTS_DIR}/steering_report.txt", "w", encoding="utf-8") as f:
    f.write(report)

print(f"\nRaport zapisany do: {RESULTS_DIR}/steering_report.txt")


In [None]:
# ===================================================
# 10. LISTA WYGENEROWANYCH PLIKÓW
# ===================================================

print("\n" + "="*70)
print("WYGENEROWANE PLIKI")
print("="*70)

files = os.listdir(RESULTS_DIR)
for f in sorted(files):
    file_path = os.path.join(RESULTS_DIR, f)
    if os.path.isfile(file_path):
        size = os.path.getsize(file_path) / 1024  # KB
        print(f"  {f:40s} ({size:8.2f} KB)")

print(f"\nKatalog wyników: {RESULTS_DIR}")