In [None]:
import torch
# Load model directly
from transformers import AutoTokenizer, AutoModelForCausalLM
# Checkpoint to load. Alternatives tried previously:
#   "allenai/OLMo-7B", "meta-llama/Llama-3.1-8B"
model_name = "allenai/OLMo-2-1124-7B-Instruct"

# Prefer Apple's Metal backend (the original hard-coded choice), but fall back
# to CUDA or CPU so this cell does not fail on machines without MPS support.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# Load the causal-LM weights and move them to the selected device.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    # torch_dtype=torch.float16,
    trust_remote_code=True
).to(device)

In [None]:
# Load model directly
from transformers import AutoModelForCausalLM
# Rebinds `model` to a different checkpoint than the one loaded in the first
# cell; unlike that cell, the result is not moved to `device` here.
model = AutoModelForCausalLM.from_pretrained(
    "allenai/OLMo-7B-0724-hf",
    trust_remote_code=True,
)

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM

# Tokenizer used by every analysis cell below.
tokenizer = AutoTokenizer.from_pretrained(
    "allenai/OLMo-7B-0724-hf",
    trust_remote_code=True,
)

In [None]:
# Probe the tokenizer's backend model for its merge table via whichever
# accessor it happens to expose.
bpe_model = tokenizer.backend_tokenizer.model
if hasattr(bpe_model, 'merges'):
    merges = bpe_model.merges  # This is likely a list of merge pairs in order.
elif hasattr(bpe_model, 'get_merges'):
    merges = bpe_model.get_merges()
else:
    # Without this branch `merges` stayed unbound (or kept a stale value from
    # an earlier run) whenever neither accessor existed.
    merges = None
    print("Backend model exposes neither `merges` nor `get_merges()`; merges set to None.")

In [None]:
# Fetch the model object that sits behind the tokenizer's backend.
bpe_model = tokenizer.backend_tokenizer.model

# Read `merges` straight off that object; an AttributeError here means the
# attribute is not exposed.
merges = getattr(bpe_model, "merges")
print(merges)

In [None]:
# Fetch the model object that sits behind the tokenizer's backend.
bpe_model = tokenizer.backend_tokenizer.model

# Look for a private `_merges` attribute, tolerating its absence.
merges = getattr(bpe_model, "_merges", None)
if merges is None:
    print("No _merges attribute found.")
else:
    print("Merges (private):", merges)

In [None]:
# First 100 vocabulary entries as (token, id) pairs, ordered by ascending id.
sorted(tokenizer.get_vocab().items(), key=lambda entry: entry[1])[:100]

In [None]:
# import os
# from transformers import AutoTokenizer
# from huggingface_hub import hf_hub_download
#
# # Load your tokenizer (make sure trust_remote_code is True if needed)
# model_name = "allenai/OLMo-7B-0724-hf"
# tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
#
# # -----------------------------------------------------------------------------
# # Step 1: Locate the merges file using hf_hub_download
# # -----------------------------------------------------------------------------
# # The tokenizer object has a mapping of file names in its vocab_files_names.
# vocab_files = tokenizer.vocab_files_names
# print("Tokenizer file names:", vocab_files)
#
# # Get the merges file name from the mapping
# merges_filename = vocab_files.get("merges_file")
# if not merges_filename:
#     raise ValueError("merges_file not found in tokenizer.vocab_files_names.")
#
# # Use hf_hub_download to get the local path of the merges file.
# # This downloads the file from the model repository (or retrieves it from cache)
# merges_file_path = hf_hub_download(repo_id=model_name, filename=merges_filename)
# print("Merges file path:", merges_file_path)
#
# # -----------------------------------------------------------------------------
# # Step 2: Read and parse the merges file to obtain merge rules
# # -----------------------------------------------------------------------------
# def load_merges(file_path):
#     merge_rules = []
#     with open(file_path, "r", encoding="utf-8") as f:
#         for line in f:
#             line = line.strip()
#             # Skip header/comment lines (often start with "#")
#             if line.startswith("#") or not line:
#                 continue
#             # Expect each non-comment line to have two tokens separated by whitespace
#             parts = line.split()
#             if len(parts) != 2:
#                 continue  # or handle error if necessary
#             merge_rules.append(tuple(parts))
#     return merge_rules
import json

# Location of the serialized tokenizer whose merge table is analysed below.
# NOTE: this is a machine-specific absolute path; edit it when running elsewhere.
json_path = '/Users/guykaplan/Dev/OLMo/test_fixtures/test-olmo-model/tokenizer.json'

# Read and parse the file. Failures are reported and then re-raised: previously
# they were only printed, which left `data` unbound (or stale from an earlier
# run) and made the lookup below fail or silently use old data.
try:
    with open(json_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
except FileNotFoundError:
    print(f"File not found at path: {json_path}")
    raise
except json.JSONDecodeError as e:
    print(f"Error decoding JSON: {e}")
    raise

# Summarise the document instead of printing all of it.
print("Parsed JSON top-level keys:", list(data))

# The merge table is stored under model -> merges.
merge_rules = data['model']['merges']
print(f"Loaded {len(merge_rules)} merge rules.")

# Map each merge rule to its rank (position in the merge list); a lower rank
# means the rule appears earlier in the list.
# Keys are always "left right" strings, the format decompose_token looks up.
# Entries that are already strings are used unchanged; entries stored as
# [left, right] pairs (unhashable lists) are joined with a single space.
bpe_ranks = {
    (pair if isinstance(pair, str) else " ".join(pair)): rank
    for rank, pair in enumerate(merge_rules)
}

# -------------------------------
# Step 3: Define a function to decompose a token using the merge rules
# -------------------------------
def decompose_token(token, bpe_ranks):
    """
    Greedily apply BPE merge rules to the characters of `token`.

    The token is first split into single characters. On every pass, each pair
    of neighbouring pieces is looked up in `bpe_ranks` under the key
    "left right"; the pair with the lowest rank (leftmost on ties) is fused
    into one piece. The process stops when no neighbouring pair is a known
    merge rule.

    Parameters:
      token: The string to process.
      bpe_ranks: Mapping from "left right" merge keys to their rank.

    Returns:
      - components: The list of pieces remaining after the last merge.
      - merge_count: How many individual merges were performed.
    """
    pieces = list(token)
    merges_applied = 0

    while True:
        # (rank, position) for every neighbouring pair that is a known rule.
        ranked = [
            (bpe_ranks[key], pos)
            for pos, key in enumerate(
                f"{left} {right}" for left, right in zip(pieces, pieces[1:])
            )
            if key in bpe_ranks
        ]
        if not ranked:
            return pieces, merges_applied

        # Tuple ordering picks the lowest rank, then the leftmost position.
        _, pos = min(ranked)
        # Fuse the two pieces at pos / pos+1 into a single piece.
        pieces[pos:pos + 2] = [pieces[pos] + pieces[pos + 1]]
        merges_applied += 1

# -------------------------------
# Step 4: Compute and rank tokens by merge depth
# -------------------------------
# Token string -> token id for every entry in the tokenizer vocabulary.
vocab = tokenizer.get_vocab()

# Number of merges decompose_token performs for each vocabulary entry.
token_merge_depth = {tok: decompose_token(tok, bpe_ranks)[1] for tok in vocab}
# A token is a leaf when no merge rule applied to it at all.
token_is_leaf = {tok: n_merges == 0 for tok, n_merges in token_merge_depth.items()}

# Order (token, merge count) pairs with the most-merged tokens first.
sorted_tokens = sorted(token_merge_depth.items(), key=lambda entry: entry[1], reverse=True)

print("Top 20 tokens by merge depth:")
for token, depth in sorted_tokens[:20]:
    status = "Non-leaf" if not token_is_leaf[token] else "Leaf"
    print(f"Token: {token:20s} | Merge Depth: {depth:3d} | {status}")

In [None]:
def get_top_leaf_tokens(tokenizer, token_is_leaf, x):
    """
    Pick the leaf tokens with the largest token IDs.

    Parameters:
      tokenizer: Object whose get_vocab() returns a token -> token_id mapping.
      token_is_leaf: Mapping from token string to a truthy value when the token is a leaf.
      x: Maximum number of tokens to return.

    Returns:
      A list of at most x leaf token strings, ordered from highest to lowest token ID.
    """
    token_ids = tokenizer.get_vocab()

    # Keep only the tokens flagged as leaves.
    leaves = (tok for tok, flag in token_is_leaf.items() if flag)

    # Highest token ID first.
    ranked = sorted(leaves, key=token_ids.__getitem__, reverse=True)

    # Slicing returns fewer than x items when there are not enough leaves.
    return ranked[:x]

# Example usage: collect up to 10,000 leaf tokens, highest token IDs first.
# Requires `tokenizer` and `token_is_leaf` to have been computed already.
top_leaf_tokens = get_top_leaf_tokens(tokenizer, token_is_leaf, 10000)

# Build the vocabulary once; the loop previously called get_vocab() again for
# every printed token.
vocab = tokenizer.get_vocab()

# Report the real number of tokens instead of a hard-coded "Top 10".
print(f"Top {len(top_leaf_tokens)} leaf tokens by highest token_id:")
for token in top_leaf_tokens:
    print(f"Token: {token:20s} | Token ID: {vocab[token]}")

- custom list
- characters
- frequency criteria:
    - X most frequent words in English
    - X most frequent tokens in the tokenizer (is_full_word == True)
    - X most frequent tokens in the dataset (WIMBD)
- POS / syntactic criteria:
    - verbs
    - nouns
    - adjectives
    - numbers
    - punctuation
    - capitalization
    - whitespace
- semantic criteria:
    - No abstract nouns
    - No negation
    - No ... (TODO: this criterion was left unfinished)



In [None]:
# Display the whole token -> is-leaf mapping as the cell output.
# It holds one entry per vocabulary token, so the rendered output can be very long.
token_is_leaf

In [None]:
# Inspect a window of up to 200 (token, merge count) pairs near the end of
# sorted_tokens; the list is sorted with the highest merge counts first, so
# this window lies towards the low-merge-count end.
sorted_tokens[-2000:-1800]

In [None]:
# sorted_tokens is ordered from highest to lowest merge count, so the last 20
# entries are the least-merged tokens; the header is labelled accordingly
# (it previously said "Top 20").
print("Bottom 20 tokens by merge depth:")
for token, depth in sorted_tokens[-20:]:
    print(f"Token: {token:20s} | Merge Depth: {depth}")

In [None]:
# Display the complete list of (token, merge count) pairs, highest merge count first.
# The list has one entry per vocabulary token, so the rendered output can be very long.
sorted_tokens

In [None]:
import json

# Location of the serialized tokenizer file to inspect.
# NOTE: this is a machine-specific absolute path; edit it when running elsewhere.
json_path = '/Users/guykaplan/Dev/OLMo/test_fixtures/test-olmo-model/tokenizer.json'

# Read and parse the file. Failures are reported and then re-raised: previously
# they were only printed, which left `data` unbound (or stale from an earlier
# run) for the cells that read it afterwards.
try:
    with open(json_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
except FileNotFoundError:
    print(f"File not found at path: {json_path}")
    raise
except json.JSONDecodeError as e:
    print(f"Error decoding JSON: {e}")
    raise

# Summarise the document instead of printing all of it.
print("Parsed JSON top-level keys:", list(data))

In [None]:
# Preview the first 100 entries of the merge table from the parsed tokenizer JSON.
data['model']['merges'][:100]