In [None]:
# Clone the TransformerLens repository from the adhakimi GitHub account and
# switch the notebook's working directory into the fresh checkout.
!git clone https://github.com/adhakimi/TransformerLens.git
%cd TransformerLens


Cloning into 'TransformerLens'...
remote: Enumerating objects: 4458, done.[K
remote: Counting objects: 100% (131/131), done.[K
remote: Compressing objects: 100% (99/99), done.[K
remote: Total 4458 (delta 85), reused 32 (delta 32), pack-reused 4327 (from 2)[K
Receiving objects: 100% (4458/4458), 21.71 MiB | 22.01 MiB/s, done.
Resolving deltas: 100% (2993/2993), done.
/content/TransformerLens


In [None]:
!pip install -e .

Obtaining file:///content/TransformerLens
  Installing build dependencies ... [?25l[?25hdone
  Checking if build backend supports build_editable ... [?25l[?25hdone
  Getting requirements to build editable ... [?25l[?25hdone
  Preparing editable metadata (pyproject.toml) ... [?25l[?25hdone
Collecting beartype<0.15.0,>=0.14.1 (from transformer-lens==0.0.0)
  Downloading beartype-0.14.1-py3-none-any.whl.metadata (28 kB)
Collecting better-abc<0.0.4,>=0.0.3 (from transformer-lens==0.0.0)
  Downloading better_abc-0.0.3-py3-none-any.whl.metadata (1.4 kB)
Collecting fancy-einsum>=0.0.3 (from transformer-lens==0.0.0)
  Downloading fancy_einsum-0.0.3-py3-none-any.whl.metadata (1.2 kB)
Collecting jaxtyping>=0.2.11 (from transformer-lens==0.0.0)
  Downloading jaxtyping-0.3.2-py3-none-any.whl.metadata (7.0 kB)
Collecting transformers-stream-generator<0.0.6,>=0.0.5 (from transformer-lens==0.0.0)
  Downloading transformers-stream-generator-0.0.5.tar.gz (13 kB)
  Preparing metadata (setup.py) 

In [None]:
import torch, json
from transformer_lens import HookedTransformer, HookedTransformerConfig
from transformer_lens.utils import test_prompt
import os, json
import gc
from typing import Union, Optional, Dict, Any, List, Tuple
from rich import print as rprint

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


### Model laden

In [None]:
# One-off export cell: download the OLMo checkpoint through TransformerLens and
# store its state dict plus its config on Google Drive.
%cd /content

from google.colab import drive, files
drive.mount("/content/drive")
drive_dir = "/content/drive/MyDrive/master_thesis/models/olmo"

from transformer_lens import HookedTransformer
import torch, os, json, dataclasses

# Load on CPU in fp16; the download cache stays on the local Colab disk.
model_name = "allenai/OLMo-7B-0424-hf"
model = HookedTransformer.from_pretrained(
    model_name,
    device="cpu",
    dtype=torch.float16,
    trust_remote_code=True,
    cache_dir="/content/olmo_cache"
)

os.makedirs(drive_dir, exist_ok=True)

# Save the state dict with every tensor moved to CPU.
weights_path = f"{drive_dir}/pytorch_model_fp16.bin"
state_fp16   = {k: v.cpu() for k, v in model.state_dict().items()}
torch.save(state_fp16, weights_path)

# Save the model config as JSON; default=str stringifies any value that json
# cannot encode natively.
cfg_dict = dataclasses.asdict(model.cfg)
with open(f"{drive_dir}/config.json", "w") as f:
    json.dump(cfg_dict, f, indent=2, default=str)

# List the target folder as a quick check of what was written.
!ls -lh "$drive_dir"
print("Block A abgeschlossen – Dateien liegen im Drive")

### Mount Model von Drive

In [None]:
# Rebuild the model from the config and fp16 weights stored on Google Drive,
# move it to the GPU when one is available, and freeze it for inference.
from google.colab import drive
drive.mount("/content/drive")
drive_dir = "/content/drive/MyDrive/master_thesis/models/olmo"


with open(f"{drive_dir}/config.json") as f:
    cfg = HookedTransformerConfig.from_dict(json.load(f))
# The config was dumped with default=str, so the dtype may come back as a
# string such as "torch.float16"; map it back to the torch dtype object.
if isinstance(cfg.dtype, str):
    cfg.dtype = getattr(torch, cfg.dtype.split(".")[-1])

model = HookedTransformer(cfg)

# NOTE(review): torch.load unpickles the file - only load checkpoints from a
# trusted source (here: the file written by the export cell).
state = torch.load(f"{drive_dir}/pytorch_model_fp16.bin", map_location="cpu")
model.load_state_dict(state)
# Drop the CPU copy of the weights before moving the model to the GPU.
del state
gc.collect()

if torch.cuda.is_available():
    torch.cuda.empty_cache()
    model = model.to(torch.float16)
    model = model.to("cuda")
    gc.collect()
    torch.cuda.empty_cache()

# Inference only: eval mode and no gradients on any parameter.
model.eval()
for param in model.parameters():
    param.requires_grad = False

# The original message carried an unbalanced "[/bold green]" tag and was sent
# through plain print(), so the tag showed up literally. Use rich with a
# balanced tag instead.
rprint(f"[bold green]OLMo bereit für Inferenz auf {next(model.parameters()).device}[/bold green]")

In [None]:
import os
import json
import torch
from typing import Dict, Any, Union, Optional, List


def new_test_prompt(
    prompt: str,
    answer: Union[str, List[str]],
    model,
    relation_name: str,
    json_output_folder: str,
    prepend_space_to_answer: bool = True,
    print_details: bool = True,
    prepend_bos: Optional[bool] = False,
    top_k: int = 10,
) -> Dict[str, Any]:
    """
    Greedily decode as many tokens as the gold answer has and score every step.

    At each step the model's top-1 token is compared with the gold token at the
    same answer position, and the gold token is looked up among the ``top_k``
    most probable tokens. The context is always extended with the *predicted*
    token (greedy decoding), not with the gold token. One log entry per step is
    appended to ``<json_output_folder>/<relation_name>.json`` (relation name
    lower-cased, spaces replaced by underscores).

    Args:
        prompt: Text the model continues.
        answer: Gold answer; when a list is given only its first element is used.
        model: Object providing ``to_tokens``, ``to_str_tokens``, ``to_string``
            and a call that maps a ``(1, seq)`` token tensor to logits.
        relation_name: Used to derive the name of the JSON log file.
        json_output_folder: Folder for the log file; created if missing.
        prepend_space_to_answer: Prefix answers with a space unless they
            already start with one.
        print_details: Print gold token, predicted token and top-1 probability
            for every step.
        prepend_bos: Forwarded to the tokenizer calls for the prompt.
        top_k: Number of top-ranked tokens logged per step and used for the
            "top_10" hit count.

    Returns:
        Dict with
        ``top_1_accuracy``  - percentage of steps whose top-1 token equals the
                              gold token; forced to 0.0 when the first step is
                              wrong,
        ``top_10_accuracy`` - percentage of steps whose gold token is within
                              the ``top_k`` predictions,
        ``first_token_prob`` / ``second_token_prob`` - probabilities of the two
                              highest-ranked tokens at the first step,
        ``is_correct``      - True only if every step's top-1 token matched.

    Raises:
        ValueError: If no answer is given or the gold answer tokenizes to zero
            tokens (previously an IndexError / ZeroDivisionError).
    """
    os.makedirs(json_output_folder, exist_ok=True)
    json_path = os.path.join(
        json_output_folder, relation_name.replace(" ", "_").lower() + ".json"
    )

    # Prepare the reference answer.
    answers: List[str] = [answer] if isinstance(answer, str) else list(answer)
    if not answers:
        raise ValueError("answer must contain at least one string")
    if prepend_space_to_answer:
        answers = [a if a.startswith(" ") else " " + a for a in answers]

    gold_answer = answers[0]
    gold_answer_ids = model.to_tokens(gold_answer, prepend_bos=False)[0]
    gold_answer_str_tokens = model.to_str_tokens(gold_answer, prepend_bos=False)

    total_tokens = len(gold_answer_ids)
    if total_tokens == 0:
        # Without gold tokens there is nothing to score and the accuracy
        # division below would fail.
        raise ValueError(f"gold answer {gold_answer!r} produced no tokens")

    # Tokenize the prompt; cloned because the context grows step by step.
    context_ids = model.to_tokens(prompt, prepend_bos=prepend_bos)[0].clone()

    first_token_prob = second_token_prob = None
    top_1_correct = top_10_correct = 0
    first_token_correct: Optional[bool] = None  # whether step 0 was predicted correctly
    entries = []

    # Pure inference: no autograd bookkeeping needed.
    with torch.no_grad():
        for pos in range(total_tokens):
            logits = model(context_ids.unsqueeze(0))

            # Only the last position predicts the next token, so softmax is
            # applied to that row alone instead of the whole sequence.
            next_token_logits = logits[0, -1]
            next_token_probs = next_token_logits.softmax(dim=-1)

            sorted_probs, sorted_indices = next_token_probs.sort(descending=True)

            # Remember the top-2 probabilities of the first step only.
            if first_token_prob is None:
                first_token_prob = sorted_probs[0].item()
                second_token_prob = sorted_probs[1].item()

            predicted_token_id = sorted_indices[0].item()
            predicted_token_str = model.to_string(predicted_token_id)

            gold_token_id = gold_answer_ids[pos].item()
            gold_token_str = gold_answer_str_tokens[pos]

            is_top_1 = predicted_token_id == gold_token_id
            if is_top_1:
                top_1_correct += 1
            if (sorted_indices[:top_k] == gold_token_id).any():
                top_10_correct += 1

            if pos == 0:
                first_token_correct = is_top_1

            # One log entry per decoding step.
            entries.append(
                {
                    "prompt": prompt,
                    "generated_prefix": model.to_string(context_ids.tolist()),
                    "gold_token": gold_token_str,
                    "predicted_top1_token": predicted_token_str,
                    "token_pos_in_answer": pos,
                    "gold_logit": next_token_logits[gold_token_id].item(),
                    "gold_prob_percent": round(next_token_probs[gold_token_id].item() * 100, 2),
                    "first_token_prob": first_token_prob,
                    "second_token_prob": second_token_prob,
                    "is_top_1": int(is_top_1),
                    "top_k_predictions": [
                        {
                            "rank": k,
                            "token": model.to_string(sorted_indices[k].item()),
                            "logit": next_token_logits[sorted_indices[k]].item(),
                            "probability_percent": round(sorted_probs[k].item() * 100, 2),
                        }
                        for k in range(min(top_k, sorted_indices.shape[0]))
                    ],
                }
            )

            # Extend the context with the greedy prediction.
            context_ids = torch.cat(
                [
                    context_ids,
                    torch.tensor(
                        [predicted_token_id],
                        dtype=context_ids.dtype,
                        device=context_ids.device,
                    ),
                ]
            )

            if print_details:
                print(
                    f"[Step {pos}] Gold: |{gold_token_str}| "
                    f"Pred: |{predicted_token_str}|  "
                    f"Top‑1 Prob: {sorted_probs[0].item():.2%}"
                )

    # A wrong first sub-token zeroes the top-1 accuracy for the whole answer.
    if first_token_correct is False:
        top_1_accuracy = 0.0
        is_correct_word = False
    else:
        top_1_accuracy = 100 * top_1_correct / total_tokens
        is_correct_word = top_1_correct == total_tokens

    # The top-k accuracy is always computed independently of the first token.
    top_10_accuracy = 100 * top_10_correct / total_tokens

    # Append this call's entries to the relation's log file.
    try:
        with open(json_path, "r", encoding="utf-8") as fh:
            history = json.load(fh)
            if not isinstance(history, list):
                history = []
    except FileNotFoundError:
        history = []

    history.extend(entries)
    with open(json_path, "w", encoding="utf-8") as fh:
        json.dump(history, fh, indent=2, ensure_ascii=False)

    return {
        "top_1_accuracy": top_1_accuracy,
        "top_10_accuracy": top_10_accuracy,
        "first_token_prob": first_token_prob,
        "second_token_prob": second_token_prob,
        "is_correct": is_correct_word,
    }


In [None]:
def load_json_files(directory: str, selected_category: str, selected_relation: str) -> Dict[str, List[Dict]]:
    """
    Collect the parsed content of JSON files below ``directory``.

    The directory tree is walked recursively. A file's *category* is the name
    of the folder that directly contains it; its *relation* is the file name
    without the ``.json`` extension.

    Args:
        directory (str): Root directory to search.
        selected_category (str): Only read files whose containing folder has
            this name. An empty value disables the filter.
        selected_relation (str): Only read files whose relation name equals
            this value. An empty value disables the filter.

    Returns:
        dict: Maps each relation name to the list of parsed JSON documents
        found for it (one list item per matching file).
    """
    data: Dict[str, List[Dict]] = {}
    for root, _, filenames in os.walk(directory):
        category = os.path.basename(root)
        if selected_category and category != selected_category:
            # Sub-folders are still visited by os.walk; only this folder's
            # files are skipped.
            continue

        for filename in filenames:
            # splitext strips only the trailing extension, so a ".json" inside
            # the file name stays part of the relation name.
            relation, extension = os.path.splitext(filename)
            if extension != ".json":
                continue
            if selected_relation and relation != selected_relation:
                continue

            # Open the file only after it passed both filters; the data holds
            # non-ASCII text, so the encoding is stated explicitly.
            with open(os.path.join(root, filename), "r", encoding="utf-8") as fh:
                data.setdefault(relation, []).append(json.load(fh))
    return data


def parse_samples(data: List[Dict]) -> Tuple[List[str], List[str], List[str]]:
    """
    Flatten relation entries into prompt templates, subjects and targets.

    Args:
        data: List of relation entries. Each entry must provide the keys
            ``'prompt_templates'`` and ``'samples'``; every sample must provide
            ``'subject'`` and ``'object'``.

    Returns:
        Tuple ``(prompt_templates, subjects, targets)``:
        ``prompt_templates`` is the value stored under ``'prompt_templates'``
        in the *last* entry (an empty list when ``data`` is empty),
        ``subjects`` holds the subject of every sample of every entry, and
        ``targets`` holds the matching objects, each prefixed with one space.
    """
    # Initialised up front so that empty input returns three empty lists
    # instead of failing on an unbound name.
    prompt_templates: List[str] = []
    subjects: List[str] = []
    targets: List[str] = []

    for entry in data:
        prompt_templates = entry['prompt_templates']
        for sample in entry['samples']:
            subjects.append(sample['subject'])
            # Leading space so the target tokenizes as a continuation of the prompt.
            targets.append(" " + sample['object'])

    return prompt_templates, subjects, targets

In [None]:
# Quick check of the data pipeline: load one factual relation and print the
# subjects parsed from it.
relation_name = "city_in_country"

file_path_factual = "/content/drive/MyDrive/master_thesis/data/factual_data/zero_shot_factual/input_data"
# "input_data" is the name of the folder that must directly contain the file.
data = load_json_files(file_path_factual, "input_data", relation_name)
for key, value in data.items():
    #sentences, facts, targets = parse_samples(value)
    prompt_templates, subjects, targets = parse_samples(value)
    print(subjects)

['New York City', 'Rio de Janeiro', 'Buenos Aires', 'Mexico City', 'São Paulo', 'Los Angeles', 'Saint Petersburg', 'San Francisco', 'Ho Chi Minh City', 'Kuala Lumpur', 'Abu Dhabi', 'Cape Town', 'New Delhi', 'Las Vegas', 'Hong Kong', 'Tel Aviv', 'Johannesburg', 'Santo Domingo', 'Port-au-Prince', 'Santiago de Chile', 'Panama City', 'Siem Reap', 'Casablanca', 'San Juan', 'Costa Rica', 'Addis Ababa', 'Punta Cana']


In [None]:
import os
import time

def calculate_average_accuracy(data, model, relation_name, permutation_name, base_output_dir):
    """
    Compute the mean top-1 and top-10 accuracy over all prompts of a relation.

    Every subject is combined with every prompt template, the resulting prompt
    is scored with ``new_test_prompt`` and the per-prompt accuracies are
    averaged. ``new_test_prompt`` writes its per-step logs into
    ``<base_output_dir>/<relation_base>``, where ``relation_base`` is
    ``relation_name`` without its last ``_``-separated part.

    Args:
        data (dict): Loaded example data; each value is passed to
            ``parse_samples``.
        model: The language model handed on to ``new_test_prompt``.
        relation_name (str): e.g. "adj_superlative_5shot".
        permutation_name (str): e.g. "permutation_2". Currently not used by
            this function; the caller must encode the permutation in
            ``base_output_dir`` if logs are to be kept apart.
        base_output_dir (str): Base directory for the log files.

    Returns:
        dict with ``average_top_1_accuracy`` and ``average_top_10_accuracy``.
        Both are 0.0 when ``data`` yields no prompt at all.
    """
    start_time = time.time()
    total_top_1_accuracy = 0
    total_top_10_accuracy = 0
    total_examples = 0

    # Target folder for the logs; makedirs also creates base_output_dir.
    relation_base = relation_name.rsplit("_", 1)[0]
    json_output_folder = os.path.join(base_output_dir, relation_base)
    os.makedirs(json_output_folder, exist_ok=True)

    for value in data.values():
        prompt_templates, subjects, targets = parse_samples(value)

        for subj, target in zip(subjects, targets):
            for template in prompt_templates:
                result = new_test_prompt(
                    prompt=template.format(subj),
                    answer=target,
                    model=model,
                    relation_name=relation_name,
                    json_output_folder=json_output_folder,
                    prepend_space_to_answer=True,
                    prepend_bos=False,
                    print_details=False
                )

                total_top_1_accuracy += result["top_1_accuracy"]
                total_top_10_accuracy += result["top_10_accuracy"]
                total_examples += 1

    if total_examples == 0:
        # Nothing was evaluated (empty data, no samples or no templates);
        # avoid dividing by zero and make the situation visible.
        print(f" Keine Beispiele für '{relation_name}' gefunden – Accuracy wird als 0.0 gemeldet.")
        return {
            "average_top_1_accuracy": 0.0,
            "average_top_10_accuracy": 0.0,
        }

    average_top_1_accuracy = total_top_1_accuracy / total_examples
    average_top_10_accuracy = total_top_10_accuracy / total_examples
    end_time = time.time()

    print(f" Dauer: {end_time - start_time:.2f}s | Beispiele: {total_examples}")
    return {
        "average_top_1_accuracy": average_top_1_accuracy,
        "average_top_10_accuracy": average_top_10_accuracy,
    }


In [None]:
def save_accuracy_to_json(average_accuracies: dict, relation_name: str, output_folder: str) -> None:
    """
    Store the averaged accuracies of one relation as a JSON file.

    The file is written to ``<output_folder>/accuracy_<relation>.json`` where
    ``<relation>`` is ``relation_name`` lower-cased with spaces replaced by
    underscores. Both accuracy values are rounded to two decimals; a missing
    value is stored as 0. The folder is created when it does not exist, and
    the final path is printed.
    """
    os.makedirs(output_folder, exist_ok=True)

    safe_relation = relation_name.replace(' ', '_').lower()
    path = os.path.join(output_folder, f"accuracy_{safe_relation}.json")

    # Build the payload key by key so the key order in the file stays fixed.
    accuracy_data = {"relation": relation_name}
    for metric in ("average_top_1_accuracy", "average_top_10_accuracy"):
        accuracy_data[metric] = round(average_accuracies.get(metric, 0), 2)

    with open(path, "w", encoding="utf-8") as out_file:
        json.dump(accuracy_data, out_file, indent=2, ensure_ascii=False)

    print(f"Accuracy gespeichert unter: {path}")

In [None]:


import os
import re

# Evaluate every "<relation>_<n>shot.json" file in every permutation folder of
# the linguistic few-shot data and store one accuracy file per relation.

# Base directories
root_path = "/content/drive/MyDrive/master_thesis/data/linguistic_data/few_shots_final"
result_root = os.path.join(root_path, "result")
logits_root = os.path.join(result_root, "logits_olmo")
eval_root = os.path.join(result_root, "eval_accuracy_olmo")

# Walk through all permutation folders
for permutation_folder in sorted(os.listdir(root_path)):
    perm_path = os.path.join(root_path, permutation_folder)
    # Skip everything that is not a permutation directory (e.g. "result").
    if not permutation_folder.startswith("permutation_") or not os.path.isdir(perm_path):
        continue

    json_files = [f for f in os.listdir(perm_path) if f.endswith(".json")]

    for json_file in json_files:
        match = re.match(r"(.+?)_(\d+)shot\.json", json_file)
        if not match:
            continue

        relation_base, shot = match.groups()
        relation_name = f"{relation_base}_{shot}shot"

        # Output paths
        # NOTE(review): the logits folder is shared by all permutations, so the
        # per-step logs of different permutations are appended to the same
        # file - confirm this is intended.
        logits_output_path = logits_root
        accuracy_output_path = os.path.join(eval_root, permutation_folder, f"accuracy_{relation_base}")
        os.makedirs(accuracy_output_path, exist_ok=True)

        # Load the data of this permutation/relation
        data = load_json_files(root_path, permutation_folder, relation_name)

        # Evaluate. The original cell called `calculate_average_accuracy_`
        # (trailing underscore), a name that is not defined in this notebook.
        average_accuracies = calculate_average_accuracy(
            data=data,
            model=model,
            relation_name=relation_name,
            permutation_name=permutation_folder,
            base_output_dir=logits_output_path
        )

        # Persist the result
        save_accuracy_to_json(average_accuracies, relation_name, accuracy_output_path)


In [None]:

import re


# Evaluate every "<relation>_<n>shot.json" file of ONE permutation folder of
# the factual few-shot data and store one accuracy file per relation.

# Only this permutation is processed; every path below is derived from it so
# that the log folder cannot drift away from the evaluated permutation.
target_permutation = "permutation_0"

root_path   = "/content/drive/MyDrive/master_thesis/data/factual_data/few_shots_final"
result_root = os.path.join(root_path, "result")
logits_root = os.path.join(result_root, "logits_ollmo", target_permutation)
eval_root   = os.path.join(result_root, "eval_accuracy_ollmo")

perm_path = os.path.join(root_path, target_permutation)
if not os.path.isdir(perm_path):
    raise FileNotFoundError(f"{perm_path} existiert nicht oder ist kein Ordner")

json_files = [f for f in os.listdir(perm_path) if f.endswith(".json")]


for json_file in json_files:
    match = re.match(r"(.+?)_(\d+)shot\.json", json_file)
    if not match:
        continue

    relation_base, shot = match.groups()

    relation_name = f"{relation_base}_{shot}shot"

    # Output paths
    logits_output_path   = logits_root
    accuracy_output_path = os.path.join(
        eval_root,
        target_permutation,
        f"accuracy_{relation_base}"
    )
    os.makedirs(accuracy_output_path, exist_ok=True)

    # Load the data of this permutation/relation
    data = load_json_files(root_path, target_permutation, relation_name)

    average_accuracies = calculate_average_accuracy(
        data=data,
        model=model,
        relation_name=relation_name,
        permutation_name=target_permutation,
        base_output_dir=logits_output_path
    )

    # Persist the result
    save_accuracy_to_json(average_accuracies, relation_name, accuracy_output_path)

print("Alle Dateien wurden verarbeiten")


 Dauer: 24.69s | Beispiele: 96
Accuracy gespeichert unter: /content/drive/MyDrive/master_thesis/data/factual_data/few_shots_final/result_10_accuracy/eval_accuracy_ollmo/permutation_5/accuracy_superhero_archnemesis/accuracy_superhero_archnemesis_3shot.json
 Dauer: 25.69s | Beispiele: 100
Accuracy gespeichert unter: /content/drive/MyDrive/master_thesis/data/factual_data/few_shots_final/result_10_accuracy/eval_accuracy_ollmo/permutation_5/accuracy_superhero_person/accuracy_superhero_person_3shot.json
 Dauer: 83.40s | Beispiele: 362
Accuracy gespeichert unter: /content/drive/MyDrive/master_thesis/data/factual_data/few_shots_final/result_10_accuracy/eval_accuracy_ollmo/permutation_5/accuracy_star_constellation/accuracy_star_constellation_3shot.json
 Dauer: 2.14s | Beispiele: 19
Accuracy gespeichert unter: /content/drive/MyDrive/master_thesis/data/factual_data/few_shots_final/result_10_accuracy/eval_accuracy_ollmo/permutation_5/accuracy_presidents_election_year/accuracy_presidents_election_y