# Current working version

In [20]:
import sys, random, uuid, os, string
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict
from tqdm import tqdm
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import nltk
nltk.download('wordnet')
nltk.download('names')
nltk.download('gazetteers')
from nltk.corpus import wordnet, names, gazetteers

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package names to /root/nltk_data...
[nltk_data]   Package names is already up-to-date!
[nltk_data] Downloading package gazetteers to /root/nltk_data...
[nltk_data]   Package gazetteers is already up-to-date!


In [2]:
def load_model(model_id):
    model = AutoModelForCausalLM.from_pretrained(model_id, dtype=torch.bfloat16, device_map="auto")
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    return model, tokenizer

def get_single_tokens(tokenizer, arr):
  return [a for a in arr if len( tokenizer(a, add_special_tokens=False)["input_ids"] ) == 1 ]

def get_single_token_nouns(tokenizer):
    """get list of nouns from wordnet that are single tokens in the given model"""
    nouns = [lemma.name() for syn in wordnet.all_synsets("n") for lemma in syn.lemmas()]
    nouns = [n for n in nouns if n.isalpha()]
    return get_single_tokens(tokenizer, nouns)

def get_single_token_names(tokenizer):
    """get list of names from nltk names corpus that are single tokens"""
    all_names = names.words('male.txt') + names.words('female.txt')
    return list(set(get_single_tokens(tokenizer, all_names)))


fictional_sci_fi_cities = ["Arrakeen","Coruscant","Theed","Omelas",
                           "Trantor","Giedi Prime","Ankh-Morpork",
                           "Minas Tirith","Caprica City","Terminus"]
real_countries = ["Japan","Brazil","Germany","Canada","Kenya",
                  "Australia","India","Norway","Mexico","South Africa"]

names = ["Ava","Liam","Noah","Emma","Maya",
         "Ethan","Sofia","Lucas","Isabella","Arjun"]

def generate_prompt():
    target_city, distractor_city = np.random.choice(fictional_sci_fi_cities, size=2, replace=False)
    target_country, distractor_country = np.random.choice(real_countries, size=2, replace=False)

    target_name, distractor_name = np.random.choice(names, size=2, replace=False)
    name_city_pairs = [(target_name, target_city), (distractor_name, distractor_city)]
    city_country_pairs = [(target_city, target_country), (distractor_city, distractor_country)]

    np.random.shuffle(name_city_pairs)
    np.random.shuffle(city_country_pairs)

    sentence1 = f"{name_city_pairs[0][0]} is from {name_city_pairs[0][1]}."
    sentence2 = f"{name_city_pairs[1][0]} is from {name_city_pairs[1][1]}."
    sentence3 = f"{city_country_pairs[0][0]} is in {city_country_pairs[0][1]}."
    sentence4 = f"{city_country_pairs[1][0]} is in {city_country_pairs[1][1]}."

    prompt = f"{sentence1} {sentence2} {sentence3} {sentence4} Therefore, {target_name} is from"

    return prompt, target_country, distractor_country, target_city, distractor_city

In [21]:
def x_before_y(text: str, x: str, y: str) -> bool:
    """does string x appear before y?"""
    x_pos = text.find(x)
    y_pos = text.find(y)
    if x_pos == -1:
        return False
    if y_pos == -1:
        return True
    return x_pos < y_pos

def correct_helper(df):
        """Decide if output is correct: is target token first, or does it come before distractor/direct"""
        target_first = np.array([outp.split()[0].translate(str.maketrans('', '', string.punctuation)) == target_country for outp, target_country in zip(df['outp'], df['target_country'])])
        target_before_distractor = np.array([x_before_y(outp, target_country, distractor_country) for outp, target_country, distractor_country in zip(df['outp'], df['target_country'], df['distractor_country'])])
        target_before_direct = np.array([x_before_y(outp, target_country, target_city) for outp, target_country, target_city in zip(df['outp'], df['target_country'], df['target_city'])])
        df['correct'] = target_first | target_before_distractor
        df['direct'] = target_first | target_before_direct
        return df

In [3]:
model, tokenizer = load_model('Qwen/Qwen3-4B')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/726 [00:00<?, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Downloading (incomplete total...): 0.00B [00:00, ?B/s]

Fetching 3 files:   0%|          | 0/3 [00:00<?, ?it/s]



Loading weights:   0%|          | 0/398 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

In [56]:
dct = {
    'outp': [],
    'target_country': [],
    'distractor_country': [],
    'target_city': [],
    'distractor_city': []
}
for _ in tqdm(range(100)):
  prompt, target_country, distractor_country, target_city, distractor_city = generate_prompt()
  inputs = tokenizer.encode(prompt, return_tensor='pt')
  outp = model.generate(torch.tensor(inputs).cuda().unsqueeze(0), max_new_tokens=10)
  outp = tokenizer.batch_decode(outp[0][len(inputs):])
  dct['outp'].append(outp)
  dct['target_country'].append(target_country)
  dct['distractor_country'].append(distractor_country)
  dct['target_city'].append(target_city)
  dct['distractor_city'].append(distractor_city)
df = pd.DataFrame(dct)
df['outp'] = [x[0] for x in df['outp']]

df = correct_helper(df)
df.correct.mean()

np.float64(0.69)

# Chris additions


In [53]:
fictional_sci_fi_cities = ["Arrakeen","Coruscant","Theed","Omelas",
                           "Trantor","Giedi Prime","Ankh-Morpork",
                           "Minas Tirith","Caprica City","Terminus"]
real_countries = ["Japan","Brazil","Germany","Canada","Kenya",
                  "Australia","India","Norway","Mexico","South Africa"]

names = ["Ava","Liam","Noah","Emma","Maya",
         "Ethan","Sofia","Lucas","Isabella","Arjun"]


def generate_prompt(n_sets=2, fan_type='none', fan_degree=2, add_ac_pairs=False):

    n_names = n_sets + (int(fan_type == 'in') * fan_degree * n_sets)
    n_cities = n_sets + (int(fan_type == 'out') * fan_degree * n_sets)

    all_names = np.random.choice(names, size=n_names, replace=False)
    all_cities = np.random.choice(fictional_sci_fi_cities, size=n_cities, replace=False)
    all_countries = np.random.choice(real_countries, size=n_sets, replace=False)

    ab_pairs = []
    bc_pairs = []
    ac_map = {}
    ab_map = {}

    if fan_type == 'in': # N names from each city
        for i in range(n_sets):
            city = all_cities[i]
            country = all_countries[i]
            bc_pairs.append((city, country))
            for j in range(fan_degree):
                name = all_names[i * fan_degree + j]
                ab_pairs.append((name, city))
                ac_map[name] = country
                ab_map[name] = city

    elif fan_type == 'out':
        for i in range(n_sets):
            name = all_names[i]
            country = all_countries[i]
            ac_map[name] = country
            current_cities = []
            for j in range(fan_degree):
                city = all_cities[i * fan_degree + j]
                ab_pairs.append((name, city))
                bc_pairs.append((city, country))
                current_cities.append(city)
            ab_map[name] = current_cities # List for fan-out

    else:
        for i in range(n_sets):
            name = all_names[i]
            city = all_cities[i]
            country = all_countries[i]
            ab_pairs.append((name, city))
            bc_pairs.append((city, country))
            ac_map[name] = country
            ab_map[name] = city

    # choose target
    target_name = np.random.choice(list(ac_map.keys()))
    target_country = ac_map[target_name]
    target_city = '' if fan_type == 'out' else ab_map[target_name] # fan out means name is from multiple cities, just ignore

    # if zero or multiple distractors, just ignore; else find the other one
    distractor_country = [c for c in all_countries if c != target_country][0] if n_sets == 2 else ''
    distractor_city = [pair[0] for pair in bc_pairs if pair[1] == distractor_country][0] if n_sets == 2 else ''

    np.random.shuffle(ab_pairs)
    np.random.shuffle(bc_pairs)

    sentences = []
    for n, c in ab_pairs:
        sentences.append(f"{n} is from {c}.")
    for ci, co in bc_pairs:
        sentences.append(f"{ci} is in {co}.")
    if add_ac_pairs:
        for name, country in ac_map.items():
            if name != target_name:
                sentences.append(f"{name} is from {country}.")

    prompt_body = " ".join(sentences)
    prompt = f"{prompt_body} Therefore, {target_name} is from"

    return prompt, target_country, distractor_country, target_city, distractor_city

In [38]:
def testing_helper(**kwargs):
    dct = {
        'outp': [],
        'target_country': [],
        'distractor_country': [],
        'target_city': [],
        'distractor_city': []
    }
    for _ in tqdm(range(100)):
      prompt, target_country, distractor_country, target_city, distractor_city = generate_prompt(**kwargs)
      inputs = tokenizer.encode(prompt, return_tensor='pt')
      outp = model.generate(torch.tensor(inputs).cuda().unsqueeze(0), max_new_tokens=10)
      outp = tokenizer.batch_decode(outp[0][len(inputs):])
      dct['outp'].append(outp)
      dct['target_country'].append(target_country)
      dct['distractor_country'].append(distractor_country)
      dct['target_city'].append(target_city)
      dct['distractor_city'].append(distractor_city)
    df = pd.DataFrame(dct)
    df['outp'] = [x[0] for x in df['outp']]

    return correct_helper(df)

### Try 3 sets

In [24]:
three_sets = testing_helper(n_sets=3)
three_sets.correct.mean()

np.float64(0.81)

### Add distractor ACs into prompt

In [54]:
add_ac = testing_helper(add_ac_pairs=True)
add_ac.correct.mean()

100%|██████████| 100/100 [01:26<00:00,  1.16it/s]


np.float64(0.72)

### Fan in (2 names to 1 city)

In [55]:
fan_in = testing_helper(fan_type='in')
fan_in.correct.mean()

100%|██████████| 100/100 [01:28<00:00,  1.14it/s]


np.float64(0.72)

### Fan out (1 name from 2 cities)

In [57]:
fan_out = testing_helper(fan_type='out')
fan_out.correct.mean()

100%|██████████| 100/100 [01:28<00:00,  1.14it/s]


np.float64(0.66)

# Testing memory_inference.py

In [6]:
"""
MEMORY INFERENCE EXPERIMENTS
This implements something like a paired-associate inference experiment.
Given A-R1-B and B-R2-C we test if the model can infer A-R1+R2-C.

It is implemented for the following stimuli:
- Single token nouns
- Real world geography (names, cities, countries)
- Swapped (incorrect) real world geography (names, countries, cities)
- Sci-fi geography (names, sci-fi cities, real countries)
- Properties (objects, materials, properties)
- Formal stimuli (A1, B1, C1)
- Fake world geography (names, fake cities, real countries)
"""
import random
import string
import numpy as np
import nltk
from nltk.corpus import wordnet
from typing import List, Tuple, Dict, Any, Optional
import torch
import pandas as pd
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
def setup_nltk():
    try:
        wordnet.all_synsets('n')
    except LookupError:
        nltk.download('wordnet')
setup_nltk()
from stimuli import NAMES, CITY_COUNTRY_PAIRS, SCIFI_LOCATIONS, PROPERTIES


# --- Stimulus Generator Helpers (Pools) ---

def get_fake_words(n, length=5, seed=None):
    """Pronounceable fake words alternating consonants and vowels"""
    if seed is not None:
        random.seed(seed)
    vowels = "aeiou"
    consonants = "".join(set(string.ascii_lowercase) - set(vowels))
    words = []
    while len(words) < n:
        word = ""
        for i in range(length):
            if i % 2 == 0:
                word += random.choice(consonants)
            else:
                word += random.choice(vowels)
        if word not in words:
            words.append(word.capitalize())
    return words

def get_single_token_nouns(tokenizer, n):
    """Single token nouns from wordnet in a given tokenizer"""
    nouns = list(set([lemma.name() for syn in wordnet.all_synsets("n") for lemma in syn.lemmas() if lemma.name().isalpha()]))
    if tokenizer:
        single_tokens = [n for n in nouns if len(tokenizer.encode(n, add_special_tokens=False)) == 1]
    else:
        single_tokens = [n for n in nouns if len(n) < 6]
    return random.sample(single_tokens, n)

def get_names(n):
    """Names from stimuli module"""
    return random.sample(NAMES, n)

def get_scifi_cities(n):
    """Sci-Fi locations from stimuli module"""
    return random.sample(SCIFI_LOCATIONS, n)

def get_cities_countries(n, swap_pairs=False):
    """Verified City-Country pairs from stimuli module"""
    selected_pairs = random.sample(CITY_COUNTRY_PAIRS, n)
    cities = [p[0] for p in selected_pairs]
    countries = [p[1] for p in selected_pairs]
    if swap_pairs:
        # Guarantee no city matches its country
        shuffled_countries = countries[:]
        if len(countries) > 1:
            while any(shuffled_countries[i] == countries[i] for i in range(len(countries))):
                random.shuffle(shuffled_countries)
        countries = shuffled_countries
    return cities, countries


# --- Stimulus Generators (return A, B, C, relations) ---

def get_noun_stimuli(tokenizer, n):
    # Need 3n unique nouns total for A, B, C
    pool = get_single_token_nouns(tokenizer, 3*n)
    return pool[:n], pool[n:2*n], pool[2*n:3*n], [" -> ", " -> ", " -> "]

def get_real_geography_stimuli(n):
    a = get_names(n)
    b, c = get_cities_countries(n, swap_pairs=False)
    return a, b, c, [" is from ", " is in ", " is from the country of "]

def get_wrong_geography_stimuli(n):
    a = get_names(n)
    b, c = get_cities_countries(n, swap_pairs=True)
    return a, b, c, [" is from ", " is in ", " is from the country of "]

def get_fake_geography_stimuli(n):
    a = get_names(n)
    b = get_fake_words(n, length=6)
    _, c = get_cities_countries(n)
    return a, b, c, [" is from ", " is in ", " is from the country of "]

def get_scifi_geography_stimuli(n):
    a = get_names(n)
    b = get_scifi_cities(n)
    _, c = get_cities_countries(n) # Random real countries
    return a, b, c, [" is from ", " is in ", " is from the country of "]

def get_object_property_stimuli(n):
    # Need 2n fake words for objects and materials
    words = get_fake_words(2*n, length=6)
    objs = words[:n]
    mats = words[n:]
    prop_list = random.sample(PROPERTIES, n)
    return objs, mats, prop_list, [" is made of ", " has the property of ", " has the property of "]

def get_formal_stimuli(n, offset=0):
    a_items = [f"A{i+offset}" for i in range(1, n + 1)]
    b_items = [f"B{i+offset}" for i in range(1, n + 1)]
    c_items = [f"C{i+offset}" for i in range(1, n + 1)]
    relations = [" R1 ", " R2 ", " (R1+R2) "]
    return a_items, b_items, c_items, relations


# --- Prompt Assembly Logic ---

def assemble_inference_prompt(a_items, b_items, c_items, relations, 
    fan_type='none', fan_degree=1, add_ac_pairs=False, n_sets=1):

    r1, r2, r1r2 = relations
    ab_pairs = []
    bc_pairs = []
    ac_map = {}
    ab_map = {}
    
    if fan_type == 'in':
        # Many A's to one B. B -> C is 1:1.
        for i in range(n_sets):
            b = b_items[i]
            c = c_items[i]
            bc_pairs.append((b, c))
            for j in range(fan_degree):
                idx = i * fan_degree + j
                if idx < len(a_items):
                    a = a_items[idx]
                    ab_pairs.append((a, b))
                    ac_map[a] = c
                    ab_map[a] = b
                
    elif fan_type == 'out':
        # One A to many B's. Each B -> C is 1:1.
        for i in range(n_sets):
            if i < len(a_items) and i < len(c_items):
                a = a_items[i]
                c = c_items[i]
                current_cities = []
                for j in range(fan_degree):
                    idx = i * fan_degree + j
                    if idx < len(b_items):
                        b = b_items[idx]
                        ab_pairs.append((a, b))
                        bc_pairs.append((b, c))
                        current_cities.append(b)
                ac_map[a] = c
                ab_map[a] = current_cities
            
    else: # none
        for i in range(n_sets):
            if i < len(a_items) and i < len(b_items) and i < len(c_items):
                a, b, c = a_items[i], b_items[i], c_items[i]
                ab_pairs.append((a, b))
                bc_pairs.append((b, c))
                ac_map[a] = c
                ab_map[a] = b

    random.shuffle(ab_pairs)
    random.shuffle(bc_pairs)
    
    # Target Selection (use first a_item as target)
    target_a = a_items[0]
    target_c = ac_map.get(target_a, "")
    target_b = "" if fan_type == 'out' else ab_map.get(target_a, "")
    
    # Distractor Selection (only if n_sets == 2)
    distractor_c = [c for c in c_items if c != target_c][0] if n_sets == 2 else ""
    # Find a city associated with the distractor country
    distractor_b = [b for b, c in bc_pairs if c == distractor_c][0] if n_sets == 2 and distractor_c else ""
    
    sentences = []
    for a, b in ab_pairs:
        sentences.append(f"{a}{r1}{b}.")
    for b, c in bc_pairs:
        sentences.append(f"{b}{r2}{c}.")
        
    if add_ac_pairs:
        for a, c in ac_map.items():
            if a != target_a:
                sentences.append(f"{a}{r1r2}{c}.")
    
    prompt_body = " ".join(sentences)
    prompt = f"{prompt_body} Therefore, {target_a}{r1r2}"
    
    return {
        "prompt": prompt,
        "target": target_c,
        "target_c": target_c,
        "distractor_c": distractor_c,
        "target_b": target_b,
        "distractor_b": distractor_b,
        "target_a": target_a,
        "a_items": a_items,
        "b_items": b_items,
        "c_items": c_items
    }

def get_stimuli_by_type(stim_type, n, tokenizer=None, seed=None, offset=0):
    if seed is not None: random.seed(seed)
    if stim_type == "formal":
        return get_formal_stimuli(n, offset=offset)
    elif stim_type == "single_token_nouns":
        return get_noun_stimuli(tokenizer, n)
    elif stim_type == "real_geography":
        return get_real_geography_stimuli(n)
    elif stim_type == "wrong_geography":
        return get_wrong_geography_stimuli(n)
    elif stim_type == "scifi_geography":
        return get_scifi_geography_stimuli(n)
    elif stim_type == "fake_geography":
        return get_fake_geography_stimuli(n)
    elif stim_type == "properties":
        return get_object_property_stimuli(n)
    else:
        raise ValueError(f"Unknown stim_type: {stim_type}")

def generate_trials(stim_type, n_trials=100, tokenizer=None, 
    fan_type='none', fan_degree=1, n_sets=2, add_ac_pairs=False):

    trials = []
    for i in range(n_trials):
        n_needed = max(n_sets * fan_degree, n_sets)
        a, b, c, rels = get_stimuli_by_type(stim_type, n_needed, tokenizer, seed=i, offset=i*n_needed)
        trial = assemble_inference_prompt(
            a, b, c, rels,
            fan_type=fan_type,
            fan_degree=fan_degree,
            add_ac_pairs=add_ac_pairs,
            n_sets=n_sets
        )
        trials.append(trial)
    return trials


def load_model(model_id):
    """Load model and tokenizer with bfloat16"""
    model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.bfloat16, device_map="auto")
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    return model, tokenizer

# helpers for assessing correctness
def x_before_y(text: str, x: str, y: str) -> bool:
    """Check if string x appears before y in text"""
    x_pos = text.find(x)
    y_pos = text.find(y)
    if x_pos == -1: return False
    if y_pos == -1: return True
    return x_pos < y_pos

def correct_helper(df):
    """Decide if output is correct: is target token first, or does it come before distractor/direct"""
    target_first = np.array([
        outp.split()[0].translate(str.maketrans('', '', string.punctuation)) == target_c 
        for outp, target_c in zip(df['outp'], df['target_c'])
    ])
    target_before_distractor = np.array([
        x_before_y(outp, target_c, distractor_c) 
        for outp, target_c, distractor_c in zip(df['outp'], df['target_c'], df['distractor_c'])
    ])
    target_before_direct = np.array([
        x_before_y(outp, target_c, target_b) 
        for outp, target_c, target_b in zip(df['outp'], df['target_c'], df['target_b'])
    ])
    df['correct'] = target_first | target_before_distractor
    df['direct'] = target_first | target_before_direct
    return df


### run some trials
def testing_helper(model, tokenizer, stim_type="scifi", n_trials=100, **kwargs):
    """Run model inference on a set of trials and return evaluation results"""
    dct = {
        'outp': [],
        'target_c': [],
        'distractor_c': [],
        'target_b': [],
        'distractor_b': []
    }
    for _ in tqdm(range(n_trials)):
        trial = generate_trials(stim_type, n_trials=1, tokenizer=tokenizer, **kwargs)[0]
        inputs = tokenizer.encode(trial['prompt'], return_tensors='pt').to(model.device)
        outp_ids = model.generate(inputs, max_new_tokens=10)
        outp = tokenizer.batch_decode(outp_ids[:, inputs.shape[-1]:], skip_special_tokens=True)
        
        dct['outp'].append(outp)
        dct['target_c'].append(trial['target_c'])
        dct['distractor_c'].append(trial['distractor_c'])
        dct['target_b'].append(trial['target_b'])
        dct['distractor_b'].append(trial['distractor_b'])
        
    df = pd.DataFrame(dct)
    df['outp'] = [x[0] for x in df['outp']]
    return correct_helper(df)


In [10]:
for stim in ["scifi_geography", "properties", "real_geography", "wrong_geography", "formal", "single_token_nouns"]:
    trial = generate_trials(stim, n_trials=1, n_sets=2, fan_type='in', fan_degree=2)[0]
    print(trial['prompt'],'\n')

Maya is from Rapture. Ximena is from Aura. Elsa is from Rapture. Hilda is from Aura. Aura is in Nigeria. Rapture is in Germany. Therefore, Ximena is from the country of  

Tigine is made of Ciguwi. Kexewu is made of Bileqo. Mulexa is made of Bileqo. Toyiko is made of Ciguwi. Bileqo has the property of explosivity. Ciguwi has the property of pyroelectricity. Therefore, Toyiko has the property of  

Ximena is from Cape Town. Maya is from Incheon. Hilda is from Cape Town. Elsa is from Incheon. Incheon is in South Korea. Cape Town is in South Africa. Therefore, Ximena is from the country of  

Hilda is from Cape Town. Maya is from Incheon. Ximena is from Cape Town. Elsa is from Incheon. Cape Town is in Iraq. Incheon is in South Africa. Therefore, Ximena is from the country of  

A3 R1 B2. A1 R1 B1. A2 R1 B1. A4 R1 B2. B1 R2 C1. B2 R2 C2. Therefore, A1 (R1+R2)  

Weser -> CF. dolly -> miaow. Chain -> CF. wise -> miaow. CF -> Serb. miaow -> hobby. Therefore, dolly ->  

