### **Run the Benchmark Test For a Language Model**

**Step01 - Copy Validation dataset (benchmark_test.json)**

In [1]:
import json

# Load the benchmark dataset and write an unmodified working copy to step01.json.
# ensure_ascii=False keeps non-ASCII characters as-is instead of \uXXXX escapes.
with open("benchmark_test.json", "r", encoding="utf-8") as file:
    data = json.load(file)

with open("step01.json", "w", encoding="utf-8") as file:
    json.dump(data, file, indent=4, ensure_ascii=False)

**Step02 - Run LLM Architecture on dataset inputs**

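Run only one of the next two model cells, matching your system: the single-model variant or the two-model variant. Both cells define `infer`, so whichever cell was run last is the one used by the processing cell.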

In [None]:
# === Use for SINGLE-MODEL VERSION ===

import json
import re
import ast
import logging
from transformers import T5Tokenizer, T5ForConditionalGeneration

# === Logging Configuration ===
# INFO-level logging with a timestamp and level prefix on every message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# === Load Model and Tokenizer ===
# Loaded once when the cell runs, from the local "./llm0_v1" directory.
# `infer` below reads these module-level objects.
llm0_tokenizer = T5Tokenizer.from_pretrained("./llm0_v1")
llm0_model = T5ForConditionalGeneration.from_pretrained("./llm0_v1")


# === LLM Helpers ===

def extract_scaling(prompt):
    """Pull a ``scaling=<value>`` setting out of *prompt*.

    Returns a ``(key, value, cleaned_prompt)`` tuple. When no scaling setting
    is present the result is ``(None, 0.0, prompt)`` with the prompt untouched.
    """
    found = re.search(r"(scaling)\s*=\s*([0-1](?:\.\d+)?)", prompt, re.IGNORECASE)
    if found is None:
        return None, 0.0, prompt

    key, number = found.group(1), found.group(2)
    # Drop every scaling setting (and the comma after it) from the prompt text.
    remainder = re.sub(r"(scaling)\s*=\s*([0-1](?:\.\d+)?)(,)?", "", prompt, flags=re.IGNORECASE)
    remainder = remainder.strip().rstrip(",")
    return key, float(number), remainder


def generate_text(model, tokenizer, query):
    """Run *query* through *model* and return the first decoded sequence.

    The query is tokenized (truncated to 512 tokens), passed to
    ``model.generate`` with the fixed decoding settings below, and the first
    returned sequence is decoded with special tokens removed.
    """
    encoded_query = tokenizer.encode(query, return_tensors="pt", max_length=512, truncation=True)
    decoding_options = dict(
        max_length=512,
        num_beams=5,
        temperature=0.3,
        top_k=50,
        top_p=0.9,
        do_sample=True,
        early_stopping=True,
    )
    sequences = model.generate(encoded_query, **decoding_options)
    first_sequence = sequences[0]
    return tokenizer.decode(first_sequence, skip_special_tokens=True)


# === Parsing Helpers ===

def parse_llm0_output(output_str):
    """Parse the single-model output into rooms, connections, and placements.

    *output_str* is wrapped in braces and evaluated as a Python dict literal.
    The keys read are ``rooms``, ``connections`` and ``assets``; ``assets`` is
    an iterable of ``(room, asset)`` pairs.

    Returns:
        ``(rooms, connections, placements)`` where ``placements`` maps each
        room to the list of assets placed in it, in order of appearance.

    Raises:
        ValueError: if the text is not a valid literal or does not evaluate
            to a dict (e.g. ``"1, 2"`` evaluates to a set).
    """
    try:
        parsed = ast.literal_eval("{" + output_str + "}")
    except (SyntaxError, ValueError, TypeError) as e:
        # Chain the cause so the original parser error stays in the traceback.
        raise ValueError(f"Invalid input format: {e}") from e

    if not isinstance(parsed, dict):
        raise ValueError(
            f"Invalid input format: expected key/value pairs, got {type(parsed).__name__}"
        )

    rooms = parsed.get("rooms", [])
    connections = parsed.get("connections", [])

    placements = {}
    for room, asset in parsed.get("assets", []):
        placements.setdefault(room, []).append(asset)

    return rooms, connections, placements


def case_shifter(asset, reference_list):
    """Return the entry of *reference_list* equal to *asset* ignoring case and spaces.

    The first matching entry is returned with its original spelling;
    ``"Unknown"`` is returned when nothing matches.
    """
    def _squash(text):
        return text.replace(" ", "").lower()

    wanted = _squash(asset)
    return next(
        (candidate for candidate in reference_list if wanted == _squash(candidate)),
        "Unknown",
    )


def build_json(rooms, connections, placements):
    """Serialize the layout to an indented JSON string.

    The result has three keys: ``connections`` (passed through), ``room names``
    (each room with its last space-separated token removed) and ``rooms``
    (room position -> assets placed there, empty list when none).
    """
    assets_by_position = {}
    for position, room in enumerate(rooms):
        assets_by_position[position] = placements.get(room, [])

    display_names = []
    for room in rooms:
        display_names.append(room.rsplit(" ", 1)[0])

    payload = {
        "connections": connections,
        "room names": display_names,
        "rooms": assets_by_position,
    }
    return json.dumps(payload, indent=4)


# === Inference ===

def infer(query):
    """Run the single-model pipeline on *query* and return the layout JSON string.

    The scaling setting is stripped from the query (and only logged), the
    ``prompt='...'`` text and the ``available=[...]`` asset list are extracted,
    LLM0 is queried once, and its output is parsed and re-serialized.
    """
    _, scaling, user_prompt = extract_scaling(query)
    logging.info(f"Scaling factor: {scaling}")

    prompt_match = re.search(r"prompt='(.*?)'", user_prompt)
    prompt = "" if prompt_match is None else prompt_match.group(1)

    assets_match = re.search(r"available=\[(.*?)\]", user_prompt)
    if assets_match:
        listed = ast.literal_eval(f"[{assets_match.group(1)}]")
        assets = [name.lower() for name in listed]
    else:
        assets = []

    llm0_query = f"prompt='{prompt}', available={assets}"
    llm0_output = generate_text(llm0_model, llm0_tokenizer, llm0_query)
    print(llm0_output)

    return build_json(*parse_llm0_output(llm0_output))

In [None]:
# === Use for TWO-MODEL VERSION ===

import json
import re
import ast
import logging
import random
import concurrent.futures
from pathlib import Path
from transformers import T5Tokenizer, T5ForConditionalGeneration

# === Logging Configuration ===
# INFO-level logging with a timestamp and level prefix on every message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# === Load Models and Tokenizers ===
# Three checkpoints are loaded from local directories and used by `infer` below:
# LLM0 produces the room list, LLM1 the connections, LLM2 the asset placements.
llm0_tokenizer = T5Tokenizer.from_pretrained("./llm0_single")
llm0_model = T5ForConditionalGeneration.from_pretrained("./llm0_single")

llm1_tokenizer = T5Tokenizer.from_pretrained("./llm1_single")
llm1_model = T5ForConditionalGeneration.from_pretrained("./llm1_single")

llm2_tokenizer = T5Tokenizer.from_pretrained("./llm2_v111")
llm2_model = T5ForConditionalGeneration.from_pretrained("./llm2_v111")


# === File Helpers ===

def load_dataset(path):
    """Read the UTF-8 JSON file at *path* and return the decoded content."""
    with open(path, mode="r", encoding="utf-8") as handle:
        contents = json.load(handle)
    return contents


def save_dataset(path, data):
    """Write *data* to *path* as indented UTF-8 JSON and print a summary line.

    *path* may be a ``str`` or a ``pathlib.Path``. Non-ASCII characters are
    written as-is (``ensure_ascii=False``), matching how Step01 writes the
    working copy of the dataset.
    """
    with open(path, "w", encoding="utf-8") as file:
        json.dump(data, file, indent=4, ensure_ascii=False)
    # Path(path) makes the summary work for plain string paths too.
    print(f"Saved {Path(path).name} with {len(data)} entries.")


# === LLM Helpers ===

def extract_scaling(prompt):
    """Split a ``scaling=<value>`` setting off *prompt*.

    Returns ``(key, value, cleaned_prompt)``; if the prompt has no scaling
    setting, returns ``(None, 0.0, prompt)`` unchanged.
    """
    hit = re.search(r"(scaling)\s*=\s*([0-1](?:\.\d+)?)", prompt, re.IGNORECASE)
    if not hit:
        return None, 0.0, prompt

    # Remove every scaling setting plus the comma directly after it.
    without_scaling = re.sub(
        r"(scaling)\s*=\s*([0-1](?:\.\d+)?)(,)?", "", prompt, flags=re.IGNORECASE
    )
    tidy = without_scaling.strip().rstrip(",")
    return hit.group(1), float(hit.group(2)), tidy


def generate_text(model, tokenizer, query):
    """Generate text for *query* and return the first decoded sequence.

    The query is tokenized with truncation at 512 tokens, passed to
    ``model.generate`` with the fixed decoding settings below, and the first
    returned sequence is decoded without special tokens.
    """
    prompt_ids = tokenizer.encode(query, return_tensors="pt", max_length=512, truncation=True)
    settings = {
        "max_length": 512,
        "num_beams": 5,
        "temperature": 0.3,
        "top_k": 50,
        "top_p": 0.9,
        "do_sample": True,
        "early_stopping": True,
    }
    generated = model.generate(prompt_ids, **settings)
    return tokenizer.decode(generated[0], skip_special_tokens=True)


# === Parsing Helpers ===

def parse_llm0_output(output_str):
    """Evaluate the LLM0 output as a Python literal and return it (the room list)."""
    rooms = ast.literal_eval(output_str)
    return rooms


def parse_llm1_output(output_str, rooms):
    """Turn the LLM1 output into groups of room indexes.

    *output_str* is evaluated as a literal holding groups of room names; each
    name is replaced by its position in *rooms*. ``list.index`` raises
    ``ValueError`` for a name that is not in *rooms*.
    """
    named_groups = ast.literal_eval(output_str)
    indexed_groups = []
    for group in named_groups:
        indexed_groups.append([rooms.index(name) for name in group])
    return indexed_groups


def parse_llm2_output(output_str, available_assets):
    """Parse the LLM2 asset-placement output into ``{room: [assets]}``.

    *output_str* is a comma-separated sequence of ``room=[asset, ...]``
    entries. Each asset is mapped onto its spelling in *available_assets* via
    ``case_shifter`` (``"Unknown"`` when it has no match).

    Raises:
        ValueError: if an entry has no ``=`` or its value is not a valid
            Python literal.
        SyntaxError: if an entry's value cannot be parsed at all.
    """
    data = {}
    # Split on commas that separate entries, not commas inside a [...] list.
    entries = re.split(r',\s*(?![^\[]*\])', output_str)
    for entry in entries:
        key, value = entry.split("=", 1)
        # SECURITY: the value is model-generated text, so it is parsed with
        # ast.literal_eval (literals only) instead of eval(), which would
        # execute arbitrary expressions produced by the model.
        assets = ast.literal_eval(value.strip())
        data[key.strip()] = [case_shifter(asset, available_assets) for asset in assets]
    return data


def case_shifter(asset, reference_list):
    """Look up *asset* in *reference_list*, ignoring letter case and spaces.

    Returns the first reference entry that matches, spelled as in the
    reference list, or ``"Unknown"`` if no entry matches.
    """
    target = asset.replace(" ", "").lower()
    matches = (
        candidate
        for candidate in reference_list
        if candidate.replace(" ", "").lower() == target
    )
    return next(matches, "Unknown")


def build_json(rooms, connections, placements):
    """Return the final layout as a JSON string indented by four spaces.

    Keys of the result: ``connections`` (unchanged), ``room names`` (each room
    minus its last space-separated token) and ``rooms`` (room position ->
    placed assets, defaulting to an empty list).
    """
    room_assets = {}
    short_names = []
    for position, room in enumerate(rooms):
        room_assets[position] = placements.get(room, [])
        short_names.append(room.rsplit(" ", 1)[0])

    layout = {"connections": connections, "room names": short_names, "rooms": room_assets}
    return json.dumps(layout, indent=4)


# === Inference ===

def infer(query):
    """Run the LLM0 → LLM1 + LLM2 pipeline on *query* and return the layout JSON.

    LLM0 turns the prompt into a room list; LLM1 (connections) and LLM2
    (asset placements) are then queried concurrently on a thread pool, and the
    three results are combined by ``build_json``. The scaling setting is
    stripped from the query and only logged.
    """
    _, scaling, user_prompt = extract_scaling(query)
    logging.info(f"Scaling factor: {scaling}")

    prompt_match = re.search(r"prompt='(.*?)'", user_prompt)
    prompt = "" if prompt_match is None else prompt_match.group(1)

    assets_match = re.search(r"available=\[(.*?)\]", user_prompt)
    if assets_match:
        listed = ast.literal_eval(f"[{assets_match.group(1)}]")
        assets = [name.lower() for name in listed]
    else:
        assets = []

    # Stage 1: room list from LLM0.
    llm0_out = generate_text(llm0_model, llm0_tokenizer, prompt)
    rooms = parse_llm0_output(llm0_out)

    # Stage 2: LLM1 receives the raw LLM0 text, LLM2 the parsed room list.
    llm1_query = f"prompt='{prompt}', rooms='{llm0_out}'"
    llm2_query = f"prompt='{prompt}', rooms={str(rooms)}, available={assets}"

    with concurrent.futures.ThreadPoolExecutor() as pool:
        connections_job = pool.submit(generate_text, llm1_model, llm1_tokenizer, llm1_query)
        placements_job = pool.submit(generate_text, llm2_model, llm2_tokenizer, llm2_query)
        llm1_out = connections_job.result()
        llm2_out = placements_job.result()

    return build_json(
        rooms,
        parse_llm1_output(llm1_out, rooms),
        parse_llm2_output(llm2_out, assets),
    )

In [None]:
import json
from pathlib import Path

# === File Paths ===
# Working copy written by Step01; process_entries reads it and saves results back into it.
DATASET_PATH = Path("step01.json")


def load_dataset(path):
    """Return the decoded content of the UTF-8 JSON file at *path*."""
    with open(path, "r", encoding="utf-8") as source:
        raw_text = source.read()
    return json.loads(raw_text)


def save_dataset(path, data):
    """Write *data* to *path* as indented UTF-8 JSON and print a summary line.

    *path* may be a ``str`` or a ``pathlib.Path``. Non-ASCII characters are
    kept as-is (``ensure_ascii=False``), matching how Step01 writes the file.
    """
    with open(path, "w", encoding="utf-8") as file:
        json.dump(data, file, indent=4, ensure_ascii=False)
    # Path(path) makes the summary work for plain string paths too.
    print(f"Saved {Path(path).name} with {len(data)} entries.")


def process_entries(path):
    """Run ``infer`` on every dataset entry that has no ``generated_output`` yet.

    The whole dataset is saved back to *path* after each successful entry, so
    an interrupted run can be resumed. An entry whose inference raises is
    reported and left without ``generated_output``.
    """
    dataset = load_dataset(path)
    total = len(dataset)

    for position, entry in enumerate(dataset, start=1):
        if "generated_output" in entry:
            continue  # already processed in an earlier run

        try:
            entry["generated_output"] = infer(entry["input"])
            save_dataset(path, dataset)
            print(f"Processed entry {position}/{total}")
        except Exception as e:
            print(f"Error processing entry {position}: {e}")

    print("Processing complete. Updated dataset saved.")


# === Run ===
# `infer` is not defined in this cell; one of the Step02 model cells above must be run first.
process_entries(DATASET_PATH)

**Step03 - Run Validation Part 1 (Room Numbers, Connection Type)**

In [None]:
import json
from pathlib import Path
from collections import defaultdict, deque


def load_dataset(path):
    """Open the UTF-8 JSON file at *path* and return its parsed content."""
    with open(path, mode="r", encoding="utf-8") as handle:
        dataset = json.load(handle)
    return dataset


def save_dataset(path, data):
    """Write *data* to *path* as indented UTF-8 JSON and print a summary line.

    *path* may be a ``str`` or a ``pathlib.Path``. Non-ASCII characters are
    kept as-is (``ensure_ascii=False``), matching how Step01 writes the file.
    """
    with open(path, "w", encoding="utf-8") as file:
        json.dump(data, file, indent=4, ensure_ascii=False)
    # Path(path) makes the summary work for plain string paths too.
    print(f"Saved {Path(path).name} with {len(data)} entries.")


# === Connection Type Checks ===

def is_fully_connected(n, connections):
    """Return True if *connections* holds exactly one edge for every pair of rooms.

    Rooms are the indexes ``0 .. n-1``. Edges are undirected, so ``[1, 0]``
    counts as the edge ``(0, 1)``, as in the other connection checks.
    """
    expected = {(i, j) for i in range(n) for j in range(i + 1, n)}
    # Sort each edge so its direction does not affect the comparison.
    actual = {tuple(sorted(edge)) for edge in connections}
    return actual == expected


def is_circular_connected(n, connections):
    """Return True if the ``n`` rooms form a single closed ring.

    A single room counts as circular. Otherwise there must be exactly ``n``
    connections, every room index must lie in ``0 .. n-1``, every room must
    have exactly two neighbours, and all rooms must be reachable from room 0.
    """
    if n == 1:
        return True
    if len(connections) != n:
        return False

    valid_rooms = range(n)
    graph = defaultdict(set)
    for a, b in connections:
        # An edge to a non-existent room must not stand in for a real room.
        if a not in valid_rooms or b not in valid_rooms:
            return False
        graph[a].add(b)
        graph[b].add(a)

    # Breadth-first search from room 0 to confirm the ring is one component.
    visited = set()
    queue = deque([0])

    while queue:
        node = queue.popleft()
        if node in visited:
            continue
        visited.add(node)
        queue.extend(graph[node] - visited)

    return len(visited) == n and all(len(graph[node]) == 2 for node in graph)


def is_linear_connected(n, connections):
    """Return True if the ``n`` rooms form one straight chain.

    A single room counts as linear. Otherwise there must be exactly ``n - 1``
    connections, every room index must lie in ``0 .. n-1``, exactly two rooms
    must have a single neighbour (the chain ends), and every room must be
    reachable from one end.
    """
    if n == 1:
        return True
    if len(connections) != n - 1:
        return False

    valid_rooms = range(n)
    graph = defaultdict(set)
    for a, b in connections:
        # An edge to a non-existent room must not stand in for a real room.
        if a not in valid_rooms or b not in valid_rooms:
            return False
        graph[a].add(b)
        graph[b].add(a)

    endpoints = [node for node in graph if len(graph[node]) == 1]
    if len(endpoints) != 2:
        return False

    # Walk the chain from one end; all n rooms must be reached.
    visited = set()
    queue = deque([endpoints[0]])

    while queue:
        node = queue.popleft()
        if node in visited:
            continue
        visited.add(node)
        queue.extend(graph[node] - visited)

    return len(visited) == n


def is_central_to_room(n, connections, room_names, connection_type):
    """Return True if some room is directly connected to all other rooms.

    A room is "central" when it has ``n - 1`` distinct neighbours among the
    rooms ``0 .. n-1``. Duplicate edges, self-loops and edges to non-existent
    rooms are ignored, so they cannot inflate a room's neighbour count.

    With ``connection_type == "central to ANY"`` any central room satisfies
    the check. Otherwise the text after ``"central to "`` names the room that
    must be central (compared to *room_names* exactly, case-sensitive).
    A single room always passes.
    """
    if n == 1:
        return True

    valid_rooms = range(n)
    neighbours = {}
    for a, b in connections:
        if a == b or a not in valid_rooms or b not in valid_rooms:
            continue
        neighbours.setdefault(a, set()).add(b)
        neighbours.setdefault(b, set()).add(a)

    central_rooms = [room for room, linked in neighbours.items() if len(linked) == n - 1]

    if connection_type == "central to ANY":
        return len(central_rooms) > 0

    expected_central = connection_type.replace("central to ", "").strip()
    return any(
        0 <= room < len(room_names) and room_names[room] == expected_central
        for room in central_rooms
    )


# === Main Evaluation ===

def validate_connections(entry):
    """Add ``number_rooms_correct`` and ``connection_type_correct`` to *entry*.

    The entry's ``generated_output`` JSON is compared against its
    ``number_rooms`` and ``connection_type`` fields. An unrecognised
    connection type is treated as correct.

    An entry without ``generated_output`` (Step02 leaves it out when inference
    fails) gets both flags set to False instead of raising ``KeyError``, so
    one failed inference does not abort validation of the whole dataset.

    Returns the same *entry* dict, updated in place.
    """
    raw_output = entry.get("generated_output")
    if raw_output is None:
        entry["number_rooms_correct"] = False
        entry["connection_type_correct"] = False
        return entry

    output = json.loads(raw_output)
    room_names = output["room names"]
    num_rooms = len(room_names)

    entry["number_rooms_correct"] = num_rooms == entry["number_rooms"]
    connections = output.get("connections", [])
    connection_type = entry["connection_type"]

    if connection_type == "fully connected":
        entry["connection_type_correct"] = is_fully_connected(num_rooms, connections)
    elif connection_type == "circular connected":
        entry["connection_type_correct"] = is_circular_connected(num_rooms, connections)
    elif connection_type == "linear connected":
        entry["connection_type_correct"] = is_linear_connected(num_rooms, connections)
    elif connection_type.startswith("central to"):
        entry["connection_type_correct"] = is_central_to_room(num_rooms, connections, room_names, connection_type)
    else:
        # No structural requirement to check for this connection type.
        entry["connection_type_correct"] = True

    return entry


def process_file(input_file, output_file):
    """Validate every entry of *input_file* and write the result to *output_file*.

    Each entry is passed through ``validate_connections``; the updated entries
    are saved as a new dataset.
    """
    source_path, target_path = Path(input_file), Path(output_file)

    updated = []
    for entry in load_dataset(source_path):
        updated.append(validate_connections(entry))

    save_dataset(target_path, updated)


# === Entry Point ===
# NOTE(review): no cell visible here writes "step02.json". Step02's
# process_entries saves its results back into "step01.json". Presumably that
# file is copied or renamed to "step02.json" before this cell runs; confirm,
# or point INPUT_FILE at "step01.json".
INPUT_FILE = "step02.json"
OUTPUT_FILE = "step03.json"
process_file(INPUT_FILE, OUTPUT_FILE)

**Step04 - Run Validation Part 2 (Assets, Rooms, Connections)**

In [None]:
import json
import re
from pathlib import Path


def load_dataset(path):
    """Parse the UTF-8 JSON file at *path* and return the resulting object."""
    with open(path, "r", encoding="utf-8") as source:
        raw_text = source.read()
    return json.loads(raw_text)


def save_dataset(path, data):
    """Write *data* to *path* as indented UTF-8 JSON and print a summary line.

    *path* may be a ``str`` or a ``pathlib.Path``. Non-ASCII characters are
    kept as-is (``ensure_ascii=False``), matching how Step01 writes the file.
    """
    with open(path, "w", encoding="utf-8") as file:
        json.dump(data, file, indent=4, ensure_ascii=False)
    # Path(path) makes the summary work for plain string paths too.
    print(f"Saved {Path(path).name} with {len(data)} entries.")


def map_connection_indexes_to_names(connection, room_names):
    """Convert a connection of room indexes into a tuple of lowercase room names.

    Indexes outside ``0 .. len(room_names) - 1`` are dropped. That includes
    negative indexes, which would otherwise wrap around and select a room
    from the end of the list.
    """
    return tuple(
        room_names[idx].lower()
        for idx in connection
        if 0 <= idx < len(room_names)
    )


def _parse_available_items(input_text):
    """Return the lowercase asset names listed in ``available=[...]`` of *input_text*."""
    available_match = re.search(r"available=\[(.*?)\]", input_text)
    if not available_match:
        return set()
    # Strip both quote styles: Python's repr uses double quotes for a string
    # that contains an apostrophe, so names are not always single-quoted.
    return {item.strip().strip("'\"").lower() for item in available_match.group(1).split(",")}


def _undirected(names):
    """Return *names* as a sorted tuple so (a, b) and (b, a) compare equal."""
    return tuple(sorted(names))


def validate_entries(dataset):
    """Validate each entry for rooms, connections, and asset placement.

    For every entry the generated output is compared with the entry's
    requirements and nine result lists are stored on the entry:
    ``items_correct/missing/over``, ``rooms_correct/missing/over`` and
    ``connections_correct/missing/over``.

    * Assets not listed in the input's ``available=[...]`` are compared as
      ``"unknown"``.
    * A required room type matches any generated room whose name contains it;
      the room type ``"somewhere"`` matches any room.
    * Connections are undirected: a generated (a, b) satisfies a required
      (b, a). Connection result lists are sorted so the output is stable.

    Returns the same *dataset* list, with its entries updated in place.
    """
    for entry in dataset:
        assets_required = entry.get("assets", [])
        include_rooms = [room.lower() for room in entry.get("include_rooms", [])]
        exclude_rooms = [room.lower() for room in entry.get("exclude_rooms", [])]
        include_connections = entry.get("include_connections", [])
        exclude_connections = entry.get("exclude_connections", [])
        generated_output = json.loads(entry.get("generated_output", "{}"))
        available_items = _parse_available_items(entry.get("input", ""))

        room_names = generated_output.get("room names", [])
        rooms = generated_output.get("rooms", {})
        connections = generated_output.get("connections", [])

        # JSON object keys are strings, so room indexes are looked up as str.
        room_index_map = {str(idx): name.lower() for idx, name in enumerate(room_names)}

        # Rebuild room -> [assets] with lowercase names; rooms sharing a name are merged.
        generated_assets = {}
        for idx, assets in rooms.items():
            room_name = room_index_map.get(idx, "")
            if room_name:
                generated_assets.setdefault(room_name, []).extend([asset.lower() for asset in assets])

        # Normalize required assets
        assets_required_lower = [(room.lower(), asset.lower()) for room, asset in assets_required]

        items_correct = []
        items_missing = []
        items_over = []
        asset_satisfied = {}

        for room_type, asset in assets_required_lower:
            check_asset = "unknown" if asset not in available_items else asset

            if room_type == "somewhere":
                found = any(check_asset in assets for assets in generated_assets.values())
            else:
                found = any(check_asset in assets for room, assets in generated_assets.items() if room_type in room)

            if found:
                asset_satisfied[(room_type, check_asset)] = True
                items_correct.append([room_type, check_asset])
            else:
                asset_satisfied.setdefault((room_type, check_asset), False)

        for (room_type, check_asset), satisfied in asset_satisfied.items():
            if not satisfied:
                items_missing.append([room_type, check_asset])

        required_assets_set = {
            (room, "unknown" if asset not in available_items else asset)
            for room, asset in assets_required_lower
        }

        # A generated asset is "over" when no requirement asks for it in that
        # exact room or "somewhere".
        for room, assets in generated_assets.items():
            for asset in assets:
                check_asset = "unknown" if asset not in available_items else asset
                if (room, check_asset) not in required_assets_set and ("somewhere", check_asset) not in required_assets_set:
                    items_over.append([room, check_asset])

        generated_rooms_set = set(generated_assets.keys())

        rooms_correct = [room for room in include_rooms if room in generated_rooms_set]
        rooms_missing = [room for room in include_rooms if room not in generated_rooms_set]
        rooms_over = [room for room in exclude_rooms if room in generated_rooms_set]

        # Map connections to room names, ignoring edge direction.
        generated_conn_names = {
            _undirected(map_connection_indexes_to_names(conn, room_names))
            for conn in connections
        }
        expected_conn_names = {tuple(map(str.lower, conn)) for conn in include_connections}
        excluded_conn_names = {tuple(map(str.lower, conn)) for conn in exclude_connections}

        connections_correct = sorted(
            conn for conn in expected_conn_names if _undirected(conn) in generated_conn_names
        )
        connections_missing = sorted(
            conn for conn in expected_conn_names if _undirected(conn) not in generated_conn_names
        )
        connections_over = sorted(
            conn for conn in excluded_conn_names if _undirected(conn) in generated_conn_names
        )

        # Update entry
        entry["items_correct"] = items_correct
        entry["items_missing"] = items_missing
        entry["items_over"] = items_over
        entry["rooms_correct"] = rooms_correct
        entry["rooms_missing"] = rooms_missing
        entry["rooms_over"] = rooms_over
        entry["connections_correct"] = connections_correct
        entry["connections_missing"] = connections_missing
        entry["connections_over"] = connections_over

    return dataset


def process_file(input_file, output_file):
    """Run ``validate_entries`` on *input_file* and save the result to *output_file*."""
    source_path, target_path = Path(input_file), Path(output_file)

    validated = validate_entries(load_dataset(source_path))
    save_dataset(target_path, validated)


# === Entry Point ===
# Reads the Step03 results and writes the fully validated dataset to step04.json.
INPUT_FILE = "step03.json"
OUTPUT_FILE = "step04.json"
process_file(INPUT_FILE, OUTPUT_FILE)