# Organism: A Non-Linguistic Cognitive Agent in an OS-Like Environment

This notebook implements:
- a filesystem-like environment
- a reference cognitive agent architecture
- imagination-based planning with goal conditioning

The focus is on **grounded, non-linguistic cognition**, not language or tool calling.


The notebook is organized top-down:

environment → perception → models → planning → goals → evaluation.

# Environment

In [1]:
from pathlib import Path

# Root directory of the sandbox; a relative path, so it is resolved against
# the current working directory.
ROOT = Path("creature_world")

# Sub-directories that create_world() makes directly under ROOT.
DIRS = ["docs", "scripts", "data", "logs", "tmp"]

# Seed files: path relative to ROOT -> initial text content.
FILES = {
    "docs/notes.txt": "meeting at 3pm\n",
    "docs/todo.txt": "- buy milk\n- finish project\n",
    "scripts/sum.py": "nums = [1, 2, 3, 4]\nprint(sum(nums))\n",
    "data/numbers.txt": "1,2,3,4\n"
}

def create_world():
    """
    Build the sandbox directory tree and seed files under ROOT.

    When ROOT is already present nothing is created; a notice is printed
    instead.
    """
    if not ROOT.exists():
        ROOT.mkdir()
        for subdir in DIRS:
            (ROOT / subdir).mkdir()

        for rel_path, text in FILES.items():
            (ROOT / rel_path).write_text(text)

        print("Creature world created.")
    else:
        print("World already exists")

# Materialise the sandbox on disk (only prints a notice if it already exists).
create_world()


Creature world created.


# Perception

In [2]:
import hashlib
import numpy as np

# Width of a single hashed feature vector.
BASE_DIM = 64
# Width of every object embedding returned by project().
OBJECT_DIM = 128

def hash_to_vec(text: str, dim=BASE_DIM):
    """
    Deterministically map a string to a float32 vector with values in [0, 1].

    The SHA-256 digest bytes of `text` are repeated until `dim` entries are
    available, truncated to `dim`, and scaled by 1/255.
    """
    digest = hashlib.sha256(text.encode()).digest()
    byte_vals = np.frombuffer(digest, dtype=np.uint8).astype(np.float32)
    # The digest is shorter than most requested widths, so tile it one time
    # more than strictly needed and cut back to exactly `dim`.
    repeats = dim // len(byte_vals) + 1
    return np.tile(byte_vals, repeats)[:dim] / 255.0

def project(vec, out_dim=OBJECT_DIM):
    """
    Force `vec` to length `out_dim` by truncating or zero-padding on the right.

    A vector that already has the right length is returned unchanged.
    Padding uses the input's own dtype, so a float32 embedding stays float32
    instead of being promoted to float64 by the default dtype of np.zeros.
    """
    size = len(vec)
    if size > out_dim:
        return vec[:out_dim]
    if size < out_dim:
        # Non-array inputs (e.g. lists) keep the previous float64 padding.
        pad_dtype = getattr(vec, "dtype", np.float64)
        padding = np.zeros(out_dim - size, dtype=pad_dtype)
        return np.concatenate([vec, padding])
    return vec

def encode_file(path: Path):
    """
    Embed a file as a content hash vector followed by a metadata hash vector.

    The metadata string combines the file suffix, the parent directory name
    and the length of the text content.
    """
    text = path.read_text()
    descriptor = f"FILE|{path.suffix}|{path.parent.name}|{len(text)}"
    content_vec = hash_to_vec(text)
    meta_vec = hash_to_vec(descriptor)
    return project(np.concatenate([content_vec, meta_vec]))

def encode_dir(path: Path):
    """
    Embed a directory from a hash of its name and its number of direct children.
    """
    child_count = sum(1 for _ in path.iterdir())
    descriptor = f"DIR|{path.name}|{child_count}"
    return project(hash_to_vec(descriptor))

def encode_world(root: Path):
    """
    Encode every file and directory below `root`.

    Returns:
        objects: dict mapping "dir:<relative path>" / "file:<relative path>"
            to that object's embedding, in sorted path order.
        global_vec: element-wise mean of all object embeddings, or an
            all-zero vector of length OBJECT_DIM when `root` has no entries.
    """
    objects = {}

    for item in sorted(root.rglob("*")):
        # as_posix() keeps keys slash-separated on every platform, so they
        # match the "docs/notes.txt"-style paths used as action arguments.
        rel = item.relative_to(root).as_posix()
        if item.is_dir():
            objects[f"dir:{rel}"] = encode_dir(item)
        else:
            objects[f"file:{rel}"] = encode_file(item)

    if not objects:
        # Nothing to average; np.stack on an empty list would raise ValueError.
        return objects, np.zeros(OBJECT_DIM, dtype=np.float32)

    # Keys are unique per path, so the dict values are exactly the vectors
    # encountered during the walk, in the same order.
    global_vec = np.mean(np.stack(list(objects.values())), axis=0)

    return objects, global_vec

# Sanity check: number of encoded objects and the shape of the pooled world vector.
objects, global_vec = encode_world(ROOT)
print(len(objects), global_vec.shape)


9 (128,)


In [3]:
import subprocess

def read_file(path: Path):
    """Return the full text content of `path`."""
    return path.read_text()

def write_file(path: Path, text: str):
    """
    Append `text` to an existing file.

    The current content is read first, so a missing file raises the error
    from read_text rather than being created.
    """
    existing = path.read_text()
    combined = existing + text
    path.write_text(combined)

def replace_file(path: Path, text: str):
    """Overwrite `path` with `text`, discarding any previous content."""
    path.write_text(text)

def create_file(dir_path: Path, name: str, text: str = ""):
    """
    Create a new file `name` inside `dir_path` with initial content `text`.

    Raises:
        RuntimeError: if the target path already exists.
    """
    target = dir_path / name
    if not target.exists():
        target.write_text(text)
        return
    raise RuntimeError("File already exists")

def move_file(src: Path, dst_dir: Path):
    """Move `src` into `dst_dir`, keeping its file name."""
    destination = dst_dir / src.name
    src.rename(destination)

def run_file(path: Path):
    """
    Execute a Python script with ROOT as working directory and log its stdout.

    The stripped standard output is written to ROOT/logs/<script stem>.log.
    stderr and the exit status are captured but not recorded, so a failing
    script still produces a (possibly empty) log file.
    """
    import sys  # local import: only this function needs the interpreter path

    result = subprocess.run(
        # The child resolves a relative script path against cwd=ROOT, so a
        # path that already starts with ROOT would point at ROOT/ROOT/... and
        # never run. Pass an absolute path instead. sys.executable keeps the
        # child on the same interpreter as the notebook kernel.
        [sys.executable or "python3", str(path.resolve())],
        capture_output=True,
        text=True,
        cwd=ROOT
    )
    log_path = ROOT / "logs" / f"{path.stem}.log"
    log_path.write_text(result.stdout.strip())


In [4]:
from dataclasses import dataclass

@dataclass(frozen=True)
class Action:
    """Immutable description of one environment action."""
    # Action kind; apply_action() handles READ, WRITE, REPLACE, CREATE, MOVE and RUN.
    type: str
    # Positional arguments for that kind (paths are joined onto ROOT by apply_action).
    args: tuple

def apply_action(action: Action):
    """
    Execute `action` against the sandbox rooted at ROOT.

    Returns the file text for READ and None for every other action type.

    Raises:
        ValueError: if the action type is not recognised.
    """
    kind, params = action.type, action.args

    if kind == "READ":
        return read_file(ROOT / params[0])

    if kind == "WRITE":
        write_file(ROOT / params[0], params[1])
        return None

    if kind == "REPLACE":
        replace_file(ROOT / params[0], params[1])
        return None

    if kind == "CREATE":
        create_file(ROOT / params[0], params[1], params[2])
        return None

    if kind == "MOVE":
        move_file(ROOT / params[0], ROOT / params[1])
        return None

    if kind == "RUN":
        run_file(ROOT / params[0])
        return None

    raise ValueError("Unknown action")


# Experience & Dataset Generation

In [5]:
import shutil

def reset_world():
    """Delete the sandbox tree if it exists, then rebuild it from the seed files."""
    world_present = ROOT.exists()
    if world_present:
        shutil.rmtree(ROOT)
    create_world()


In [6]:
import random
import string

def random_text(n=20):
    """Return `n` characters drawn one at a time from lowercase letters and space."""
    alphabet = string.ascii_lowercase + " "
    chars = []
    for _ in range(n):
        chars.append(random.choice(alphabet))
    return "".join(chars)

def traj_append_random():
    """One-step trajectory: append a line of random text to one of the two docs files."""
    candidates = ["docs/notes.txt", "docs/todo.txt"]
    target = random.choice(candidates)
    appended = "\n" + random_text()
    return [Action("WRITE", (target, appended))]


In [7]:
def traj_run_and_store():
    """
    Three-step trajectory: run the sum script, create a randomly numbered
    report in docs, then append a pointer to the log file to that report.
    """
    report_name = f"report_{random.randint(0,999)}.txt"
    run_step = Action("RUN", ("scripts/sum.py",))
    create_step = Action("CREATE", ("docs", report_name, "Result:\n"))
    note_step = Action("WRITE", (f"docs/{report_name}", "See logs/sum.log\n"))
    return [run_step, create_step, note_step]


In [8]:
def traj_make_summary():
    """
    Four-step trajectory: create a randomly numbered summary file in docs and
    append the current notes, a newline, and the current todo list to it.

    The notes and todo texts are read from disk when the trajectory is built.
    """
    notes_text = (ROOT / "docs/notes.txt").read_text()
    todo_text = (ROOT / "docs/todo.txt").read_text()

    summary_name = f"summary_{random.randint(0,999)}.txt"
    summary_path = f"docs/{summary_name}"

    steps = [Action("CREATE", ("docs", summary_name, "SUMMARY\n"))]
    for chunk in (notes_text, "\n", todo_text):
        steps.append(Action("WRITE", (summary_path, chunk)))
    return steps


In [9]:
def snapshot_world():
    """
    Encode the current sandbox state.

    Returns a dict with "objects" (per-object embeddings) and "global"
    (the pooled world embedding), both taken from encode_world(ROOT).
    """
    snapshot = {}
    snapshot["objects"], snapshot["global"] = encode_world(ROOT)
    return snapshot


In [10]:
def rollout_trajectory(action_sequence):
    """
    Apply each action in order and record the transition it causes.

    Returns a list with one dict per action containing "state_before",
    "action" and "state_after". An empty sequence returns an empty list
    without touching the world.

    Nothing changes the world between two consecutive actions, so the
    snapshot taken after step i is reused as the "before" snapshot of step
    i + 1 instead of re-encoding the whole tree. Consecutive samples therefore
    share that snapshot object; treat snapshots as read-only.
    """
    data = []
    before = None

    for action in action_sequence:
        if before is None:
            # Only the first step needs a fresh "before" snapshot.
            before = snapshot_world()
        apply_action(action)
        after = snapshot_world()

        data.append({
            "state_before": before,
            "action": action,
            "state_after": after
        })
        before = after

    return data


In [11]:
def generate_dataset(n=100, seed=None):
    """
    Collect transitions from `n` randomly chosen scripted trajectories.

    Each trajectory starts from a freshly reset world. The trajectory is
    built after the reset because some generators read files when called.

    Args:
        n: number of trajectories to roll out.
        seed: optional seed for the `random` module. When given, trajectory
            choice and the random content drawn by the generators become
            reproducible. None leaves the global RNG state untouched.

    Returns:
        A flat list of transition dicts as produced by rollout_trajectory().
    """
    if seed is not None:
        random.seed(seed)

    generators = (
        traj_append_random,
        traj_run_and_store,
        traj_make_summary,
    )

    dataset = []
    for _ in range(n):
        reset_world()
        build_trajectory = random.choice(generators)
        dataset.extend(rollout_trajectory(build_trajectory()))

    return dataset


In [12]:
# Roll out 200 scripted trajectories; the bare expression displays the transition count.
dataset = generate_dataset(200)
len(dataset)


Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world created.
Creature world c

526

# Planning

In [13]:
import numpy as np

# ---- Action vocabulary (LOCKED) ----
# List position is the class id: it selects the one-hot slot in encode_action()
# and is the integer stored in ACTION_TO_ID, so the order must not change.
ACTION_TYPES = ["READ", "WRITE", "REPLACE", "CREATE", "MOVE", "RUN"]
ACTION_TO_ID = {a: i for i, a in enumerate(ACTION_TYPES)}

def encode_action(action):
    """
    Encode an action as a float32 vector.

    Layout: one-hot over ACTION_TYPES, followed by two raw argument features
    (number of arguments, total character length of their string forms).
    """
    type_vec = np.zeros(len(ACTION_TYPES), dtype=np.float32)
    type_vec[ACTION_TO_ID[action.type]] = 1.0

    # Deliberately cheap argument summary; the values are not normalised.
    n_args = len(action.args)
    total_chars = sum(len(str(arg)) for arg in action.args)
    arg_vec = np.array([n_args, total_chars], dtype=np.float32)

    return np.concatenate([type_vec, arg_vec])


def build_training_pairs(dataset):
    """
    Turn transitions into world-model training arrays.

    Returns (X, Y): each X row is the global state before the action
    concatenated with the encoded action; each Y row is the global state after.
    """
    pairs = [
        (
            np.concatenate([
                sample["state_before"]["global"],
                encode_action(sample["action"]),
            ]),
            sample["state_after"]["global"],
        )
        for sample in dataset
    ]

    inputs = [features for features, _ in pairs]
    targets = [next_state for _, next_state in pairs]
    return np.stack(inputs), np.stack(targets)


# World-model training arrays: inputs (state + action) and next-state targets.
X, Y = build_training_pairs(dataset)
print(X.shape, Y.shape)


(526, 136) (526, 128)


In [14]:
import torch
import torch.nn as nn
import torch.optim as optim

# Use the GPU when PyTorch can see one, otherwise the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Layer widths come from the training arrays: X columns in, Y columns out.
# WorldModel reads these two module-level names when it is instantiated.
input_dim = X.shape[1]
output_dim = Y.shape[1]

class WorldModel(nn.Module):
    """
    MLP with two hidden layers mapping a (state, action) vector to a
    predicted next-state vector.

    Input and output widths are read from the module-level `input_dim` and
    `output_dim` when the model is constructed.
    """

    def __init__(self):
        super().__init__()
        hidden = 256
        layers = [
            nn.Linear(input_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, output_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        prediction = self.net(x)
        return prediction


# World model with its optimizer and regression loss.
# NOTE: `optimizer` and `loss_fn` are rebound by later cells, so re-run this
# cell before re-running the world-model training loop.
model = WorldModel().to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()


In [15]:
# Convert to tensors
Xt = torch.tensor(X, dtype=torch.float32).to(DEVICE)
Yt = torch.tensor(Y, dtype=torch.float32).to(DEVICE)

batch_size = 64
epochs = 20

for epoch in range(epochs):
    perm = torch.randperm(len(Xt))
    total_loss = 0.0

    for i in range(0, len(Xt), batch_size):
        idx = perm[i:i+batch_size]
        xb, yb = Xt[idx], Yt[idx]

        pred = model(xb)
        loss = loss_fn(pred, yb)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * len(xb)

    avg_loss = total_loss / len(Xt)
    print(f"Epoch {epoch:02d} | loss = {avg_loss:.6f}")


Epoch 00 | loss = 0.090259
Epoch 01 | loss = 0.018719
Epoch 02 | loss = 0.009035
Epoch 03 | loss = 0.003714
Epoch 04 | loss = 0.002105
Epoch 05 | loss = 0.001615
Epoch 06 | loss = 0.001351
Epoch 07 | loss = 0.001230
Epoch 08 | loss = 0.001163
Epoch 09 | loss = 0.001125
Epoch 10 | loss = 0.001100
Epoch 11 | loss = 0.001085
Epoch 12 | loss = 0.001060
Epoch 13 | loss = 0.001044
Epoch 14 | loss = 0.001025
Epoch 15 | loss = 0.001015
Epoch 16 | loss = 0.001008
Epoch 17 | loss = 0.000987
Epoch 18 | loss = 0.000976
Epoch 19 | loss = 0.000962


In [16]:
# Spot-check the world model on one randomly chosen sample.
# NOTE: the sample is drawn from the training tensors, so this is not a
# held-out evaluation.
model.eval()

with torch.no_grad():
    i = np.random.randint(len(X))
    x = Xt[i:i+1]
    y_true = Yt[i]
    y_pred = model(x)[0]

    mse = torch.mean((y_true - y_pred) ** 2).item()
    cos = torch.nn.functional.cosine_similarity(y_true, y_pred, dim=0).item()

print("MSE:", mse)
print("Cosine similarity:", cos)


MSE: 0.0025178391952067614
Cosine similarity: 0.9917025566101074


In [17]:
# Build (state -> action_type) pairs
def build_policy_data(dataset):
    """
    Build (state -> action type id) pairs for the policy classifier.

    Returns (X, Y): X stacks the global state before each action; Y holds the
    ACTION_TO_ID index of the action that was taken.
    """
    states = []
    labels = []

    for sample in dataset:
        states.append(sample["state_before"]["global"])
        labels.append(ACTION_TO_ID[sample["action"].type])

    return np.stack(states), np.array(labels)


# Policy training arrays: states and the id of the action taken in each.
Xp, Yp = build_policy_data(dataset)
print(Xp.shape, Yp.shape)


(526, 128) (526,)


In [18]:
class PolicyNet(nn.Module):
    """
    Single-hidden-layer classifier from a 128-wide state vector to one logit
    per entry of ACTION_TYPES.
    """

    def __init__(self):
        super().__init__()
        width = 128
        layers = [
            nn.Linear(width, width),
            nn.ReLU(),
            nn.Linear(width, len(ACTION_TYPES)),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        logits = self.net(x)
        return logits


# Policy network with its own optimizer and classification loss.
# NOTE: this rebinds `optimizer` and `loss_fn`, which were previously bound
# to the world model's Adam instance and MSE loss.
policy = PolicyNet().to(DEVICE)
optimizer = optim.Adam(policy.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()


In [19]:
# Policy inputs as float32 and class ids as int64, as CrossEntropyLoss expects.
Xp_t = torch.tensor(Xp, dtype=torch.float32).to(DEVICE)
Yp_t = torch.tensor(Yp, dtype=torch.long).to(DEVICE)

batch_size = 64
epochs = 20

# Mini-batch training of the policy; accuracy is measured on the same batches
# that are being trained on.
# NOTE(review): the recorded run plateaus at acc = 0.608 from epoch 1 onward;
# presumably the net predicts one dominant class. Verify against label counts.
for epoch in range(epochs):
    perm = torch.randperm(len(Xp_t))
    total_loss = 0.0
    correct = 0

    for i in range(0, len(Xp_t), batch_size):
        idx = perm[i:i+batch_size]
        xb, yb = Xp_t[idx], Yp_t[idx]

        logits = policy(xb)
        loss = loss_fn(logits, yb)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * len(xb)
        correct += (logits.argmax(dim=1) == yb).sum().item()

    avg_loss = total_loss / len(Xp_t)
    acc = correct / len(Xp_t)

    print(f"Epoch {epoch:02d} | loss = {avg_loss:.4f} | acc = {acc:.3f}")


Epoch 00 | loss = 1.6859 | acc = 0.392
Epoch 01 | loss = 1.3397 | acc = 0.608
Epoch 02 | loss = 1.1150 | acc = 0.608
Epoch 03 | loss = 1.0103 | acc = 0.608
Epoch 04 | loss = 0.9642 | acc = 0.608
Epoch 05 | loss = 0.9393 | acc = 0.608
Epoch 06 | loss = 0.9301 | acc = 0.608
Epoch 07 | loss = 0.9259 | acc = 0.608
Epoch 08 | loss = 0.9206 | acc = 0.608
Epoch 09 | loss = 0.9193 | acc = 0.608
Epoch 10 | loss = 0.9155 | acc = 0.608
Epoch 11 | loss = 0.9151 | acc = 0.608
Epoch 12 | loss = 0.9123 | acc = 0.608
Epoch 13 | loss = 0.9108 | acc = 0.608
Epoch 14 | loss = 0.9087 | acc = 0.608
Epoch 15 | loss = 0.9097 | acc = 0.608
Epoch 16 | loss = 0.9054 | acc = 0.608
Epoch 17 | loss = 0.9045 | acc = 0.608
Epoch 18 | loss = 0.9002 | acc = 0.608
Epoch 19 | loss = 0.9033 | acc = 0.608


In [20]:
def index_world_objects(snapshot):
    """
    Assign indices to the files and directories of one snapshot.

    Paths are sorted alphabetically and numbered from 0, separately for files
    and directories. The numbering is deterministic for a given set of
    objects, but the same path can get a different id in a snapshot that
    contains a different set of objects.

    Returns:
        (file_to_id, dir_to_id): dicts mapping relative path -> index. Keys
        with neither a "file:" nor a "dir:" prefix are ignored.
    """
    file_prefix, dir_prefix = "file:", "dir:"
    files, dirs = [], []

    for key in snapshot["objects"]:
        # Strip only the leading tag. str.replace would also remove any later
        # "file:" / "dir:" occurrence inside the path itself.
        if key.startswith(file_prefix):
            files.append(key[len(file_prefix):])
        elif key.startswith(dir_prefix):
            dirs.append(key[len(dir_prefix):])

    file_to_id = {path: i for i, path in enumerate(sorted(files))}
    dir_to_id = {path: i for i, path in enumerate(sorted(dirs))}

    return file_to_id, dir_to_id


In [21]:
def build_argument_data(dataset):
    """
    Build training data for the file-argument and directory-argument selectors.

    For every sample the target is the index, within the sample's own "before"
    snapshot, of the file or directory the action refers to. Samples whose
    target path is not present in that snapshot are skipped.

    Returns:
        (file_X, file_Y, dir_X, dir_Y) as numpy arrays.
    """
    file_actions = ("READ", "WRITE", "REPLACE", "RUN", "MOVE")
    file_X, file_Y, dir_X, dir_Y = [], [], [], []

    for sample in dataset:
        snapshot = sample["state_before"]
        state = snapshot["global"]
        action = sample["action"]
        kind = action.type

        file_to_id, dir_to_id = index_world_objects(snapshot)

        # File target: always the first argument of a file-oriented action.
        if kind in file_actions:
            file_id = file_to_id.get(action.args[0])
            if file_id is not None:
                file_X.append(state)
                file_Y.append(file_id)

        # Directory target: first argument of CREATE, second argument of MOVE.
        if kind == "CREATE":
            dir_id = dir_to_id.get(action.args[0])
        elif kind == "MOVE":
            dir_id = dir_to_id.get(action.args[1])
        else:
            dir_id = None

        if dir_id is not None:
            dir_X.append(state)
            dir_Y.append(dir_id)

    return (
        np.stack(file_X), np.array(file_Y),
        np.stack(dir_X), np.array(dir_Y)
    )


# Argument-selector training arrays; the shapes show how many samples each selector gets.
file_X, file_Y, dir_X, dir_Y = build_argument_data(dataset)
print(file_X.shape, dir_X.shape)


(393, 128) (133, 128)


In [22]:
class FileArgNet(nn.Module):
    """
    Single-hidden-layer classifier from a 128-wide state vector to `max_files`
    logits, one per file index.
    """

    def __init__(self, max_files):
        super().__init__()
        hidden = 128
        layers = [
            nn.Linear(128, hidden),
            nn.ReLU(),
            nn.Linear(hidden, max_files),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        logits = self.net(x)
        return logits


class DirArgNet(nn.Module):
    """
    Single-hidden-layer classifier from a 128-wide state vector to `max_dirs`
    logits, one per directory index.
    """

    def __init__(self, max_dirs):
        super().__init__()
        hidden = 64
        layers = [
            nn.Linear(128, hidden),
            nn.ReLU(),
            nn.Linear(hidden, max_dirs),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        logits = self.net(x)
        return logits


In [23]:
# Determine maximum sizes seen
MAX_FILES = int(file_Y.max()) + 1
MAX_DIRS = int(dir_Y.max()) + 1

file_net = FileArgNet(MAX_FILES).to(DEVICE)
dir_net = DirArgNet(MAX_DIRS).to(DEVICE)

opt_file = optim.Adam(file_net.parameters(), lr=1e-3)
opt_dir = optim.Adam(dir_net.parameters(), lr=1e-3)

loss_fn = nn.CrossEntropyLoss()

# tensors
file_Xt = torch.tensor(file_X, dtype=torch.float32).to(DEVICE)
file_Yt = torch.tensor(file_Y, dtype=torch.long).to(DEVICE)

dir_Xt = torch.tensor(dir_X, dtype=torch.float32).to(DEVICE)
dir_Yt = torch.tensor(dir_Y, dtype=torch.long).to(DEVICE)

# train file selector
for epoch in range(10):
    logits = file_net(file_Xt)
    loss = loss_fn(logits, file_Yt)

    opt_file.zero_grad()
    loss.backward()
    opt_file.step()

    acc = (logits.argmax(1) == file_Yt).float().mean().item()
    print(f"[File] epoch {epoch} | loss {loss.item():.4f} | acc {acc:.3f}")

# train dir selector
for epoch in range(10):
    logits = dir_net(dir_Xt)
    loss = loss_fn(logits, dir_Yt)

    opt_dir.zero_grad()
    loss.backward()
    opt_dir.step()

    acc = (logits.argmax(1) == dir_Yt).float().mean().item()
    print(f"[Dir ] epoch {epoch} | loss {loss.item():.4f} | acc {acc:.3f}")


[File] epoch 0 | loss 1.3227 | acc 0.186
[File] epoch 1 | loss 1.2401 | acc 0.728
[File] epoch 2 | loss 1.1686 | acc 0.728
[File] epoch 3 | loss 1.1066 | acc 0.728
[File] epoch 4 | loss 1.0530 | acc 0.728
[File] epoch 5 | loss 1.0074 | acc 0.728
[File] epoch 6 | loss 0.9671 | acc 0.728
[File] epoch 7 | loss 0.9317 | acc 0.728
[File] epoch 8 | loss 0.9018 | acc 0.728
[File] epoch 9 | loss 0.8775 | acc 0.728
[Dir ] epoch 0 | loss 0.7122 | acc 0.000
[Dir ] epoch 1 | loss 0.6337 | acc 1.000
[Dir ] epoch 2 | loss 0.5668 | acc 1.000
[Dir ] epoch 3 | loss 0.5088 | acc 1.000
[Dir ] epoch 4 | loss 0.4618 | acc 1.000
[Dir ] epoch 5 | loss 0.4207 | acc 1.000
[Dir ] epoch 6 | loss 0.3842 | acc 1.000
[Dir ] epoch 7 | loss 0.3522 | acc 1.000
[Dir ] epoch 8 | loss 0.3240 | acc 1.000
[Dir ] epoch 9 | loss 0.2976 | acc 1.000


In [24]:
# Spot-check the file selector on one random sample taken from its training data.
i = np.random.randint(len(file_X))
state = torch.tensor(file_X[i:i+1], dtype=torch.float32).to(DEVICE)

with torch.no_grad():
    pred_id = file_net(state).argmax(dim=1).item()

print("Predicted file id:", pred_id)
print("True file id:", file_Y[i])


Predicted file id: 2
True file id: 2


## Object-level change targets for the object-effect model

In [25]:
def compute_object_deltas(before, after, eps=1e-3):
    """
    Flag which objects differ between two snapshots.

    Returns a dict object_key -> 0/1. An object is flagged 1 when it exists in
    only one of the snapshots, or when the Euclidean distance between its two
    embeddings exceeds `eps`.
    """
    prev_objs = before["objects"]
    curr_objs = after["objects"]

    def changed(key):
        # Appeared or disappeared.
        if key not in prev_objs or key not in curr_objs:
            return 1
        distance = np.linalg.norm(curr_objs[key] - prev_objs[key])
        return int(distance > eps)

    return {key: changed(key) for key in set(prev_objs) | set(curr_objs)}


In [26]:
def build_object_index(dataset):
    """
    Map every object key seen in any before/after snapshot of the dataset to
    its position in the alphabetically sorted list of those keys.
    """
    seen = set()
    for sample in dataset:
        for side in ("state_before", "state_after"):
            seen.update(sample[side]["objects"].keys())

    return {key: position for position, key in enumerate(sorted(seen))}


In [27]:
# Global key -> column index over every object seen anywhere in the dataset.
OBJECT_INDEX = build_object_index(dataset)
NUM_OBJECTS = len(OBJECT_INDEX)
print("Tracked objects:", NUM_OBJECTS)


Tracked objects: 139


In [28]:
def build_object_effect_data(dataset, object_index):
    """
    Build training arrays for the object-effect model.

    Returns (X, Y): each X row is the global state before the action
    concatenated with the encoded action; each Y row is a float32 vector over
    `object_index` with 1 where the object changed during the step.
    """
    inputs = []
    targets = []

    for sample in dataset:
        before = sample["state_before"]
        state_vec = before["global"]
        action_vec = encode_action(sample["action"])

        deltas = compute_object_deltas(before, sample["state_after"])

        target = np.zeros(len(object_index), dtype=np.float32)
        for key, flag in deltas.items():
            # Keys missing from the index are ignored.
            slot = object_index.get(key)
            if slot is not None:
                target[slot] = flag

        inputs.append(np.concatenate([state_vec, action_vec]))
        targets.append(target)

    return np.stack(inputs), np.stack(targets)


# Object-effect training arrays: (state + action) inputs and per-object change flags.
Xe, Ye = build_object_effect_data(dataset, OBJECT_INDEX)
print(Xe.shape, Ye.shape)


(526, 136) (526, 139)


## Training the object-effect model

In [29]:
class ObjectEffectModel(nn.Module):
    """
    Single-hidden-layer network from an `in_dim` input vector to `out_dim`
    raw logits.
    """

    def __init__(self, in_dim, out_dim):
        super().__init__()
        hidden = 256
        layers = [
            nn.Linear(in_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, out_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        logits = self.net(x)
        return logits


In [30]:
# Object-effect model: one independent change/no-change logit per tracked
# object, trained with binary cross-entropy on raw logits.
effect_model = ObjectEffectModel(Xe.shape[1], Ye.shape[1]).to(DEVICE)
opt = optim.Adam(effect_model.parameters(), lr=1e-3)
loss_fn = nn.BCEWithLogitsLoss()

# NOTE: this rebinds `Xt`, `Yt` and `loss_fn`, which earlier cells use for the
# world model. Re-running those cells after this one would pick up these values.
Xt = torch.tensor(Xe, dtype=torch.float32).to(DEVICE)
Yt = torch.tensor(Ye, dtype=torch.float32).to(DEVICE)

# 20 full-batch gradient steps.
for epoch in range(20):
    logits = effect_model(Xt)
    loss = loss_fn(logits, Yt)

    opt.zero_grad()
    loss.backward()
    opt.step()

    print(f"epoch {epoch:02d} | loss {loss.item():.4f}")


epoch 00 | loss 0.7193
epoch 01 | loss 0.6248
epoch 02 | loss 0.5410
epoch 03 | loss 0.4666
epoch 04 | loss 0.4002
epoch 05 | loss 0.3413
epoch 06 | loss 0.2895
epoch 07 | loss 0.2445
epoch 08 | loss 0.2058
epoch 09 | loss 0.1732
epoch 10 | loss 0.1460
epoch 11 | loss 0.1239
epoch 12 | loss 0.1062
epoch 13 | loss 0.0922
epoch 14 | loss 0.0813
epoch 15 | loss 0.0730
epoch 16 | loss 0.0668
epoch 17 | loss 0.0622
epoch 18 | loss 0.0588
epoch 19 | loss 0.0565


In [31]:
def action_object_mask(action_type, object_index):
    """
    Build a float32 0/1 mask over `object_index` marking the objects an action
    type is allowed to affect.

    Allowed key prefixes per action type:
      RUN     -> files under logs/
      CREATE  -> files under docs/
      WRITE   -> files under docs/ or data/
      MOVE    -> any file
      REPLACE -> any file
    READ and any other action type get an all-zero mask.
    """
    rules = (
        ("RUN", ("file:logs/",)),
        ("CREATE", ("file:docs/",)),
        ("WRITE", ("file:docs/", "file:data/")),
        ("MOVE", ("file:",)),
        ("REPLACE", ("file:",)),
    )

    allowed_prefixes = ()
    for kind, prefixes in rules:
        if action_type == kind:
            allowed_prefixes = prefixes
            break

    mask = np.zeros(len(object_index), dtype=np.float32)
    if allowed_prefixes:
        for key, slot in object_index.items():
            if key.startswith(allowed_prefixes):
                mask[slot] = 1.0

    return mask


In [32]:
def predict_object_effects(snapshot, action_type, top_k=3):
    """
    Predict which tracked objects an action of `action_type` would change.

    The effect model is queried with the snapshot's global vector and an
    argument-less action of the given type. Its probabilities are masked by
    action_object_mask(), and the `top_k` highest-scoring objects with a
    non-zero masked probability are returned as a set of object keys.
    """
    action_vec = encode_action(Action(action_type, ()))
    features = np.concatenate([snapshot["global"], action_vec])
    batch = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        probs = torch.sigmoid(effect_model(batch)[0]).cpu().numpy()

    # Zero out objects this action type is not allowed to touch.
    probs = probs * action_object_mask(action_type, OBJECT_INDEX)

    # Rank by probability and keep the best top_k (no fixed threshold).
    ranked = probs.argsort()[::-1]
    chosen = set(ranked[:top_k].tolist())

    return {
        key for key, slot in OBJECT_INDEX.items()
        if slot in chosen and probs[slot] > 0
    }


# Imagination

In [33]:
def score_hybrid_transition(before_snap, after_global, action_type, goal):
    """
    Score an imagined transition for a goal.

    The score is the sum of:
      * 0.3 x the distance between the imagined and the current global vector,
      * for each predicted object effect, 5.0 x goal["logs"] if the object is
        under file:logs and 3.0 x goal["docs"] if it is under file:docs
        (missing goal keys count as 0),
      * a penalty of 1.0 when the action is READ.
    """
    score = 0.0

    # Weak latent term: how far the imagined state moves from the current one.
    drift = np.linalg.norm(after_global - before_snap["global"])
    score += 0.3 * drift

    # Symbolic term: goal-weighted bonus per predicted object effect.
    bonuses = (
        ("file:logs", 5.0, "logs"),
        ("file:docs", 3.0, "docs"),
    )
    for obj_key in predict_object_effects(before_snap, action_type):
        for prefix, weight, goal_key in bonuses:
            if obj_key.startswith(prefix):
                score += weight * goal.get(goal_key, 0.0)

    # Mild penalty for the action that changes nothing.
    if action_type == "READ":
        score -= 1.0

    return score


In [34]:
def policy_priors(snapshot, top_k=3):
    """
    Return the policy's `top_k` most probable action types for a snapshot as
    a list of (action_type, probability) pairs, most probable first.
    """
    state_t = torch.tensor(snapshot["global"], dtype=torch.float32)
    state_t = state_t.unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        probs = torch.softmax(policy(state_t)[0], dim=0).cpu().numpy()

    ranked = probs.argsort()[::-1]
    return [(ACTION_TYPES[idx], probs[idx]) for idx in ranked[:top_k]]


In [35]:
def imagine_next(global_state, action_type, file_id=None, dir_id=None):
    """
    Predict the next global state embedding with the world model.

    The action is encoded from its type alone, with an empty argument tuple.
    `file_id` and `dir_id` are accepted but not used.
    """
    action_vec = encode_action(Action(action_type, ()))
    features = np.concatenate([global_state, action_vec])
    batch = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        return model(batch)[0].cpu().numpy()


In [36]:
def plan_one_step_guided_goal(snapshot, goal, top_k=3):
    """
    Choose the best next action type for a goal with one step of imagination.

    Only the policy's `top_k` action types are considered. Each is scored by
    score_hybrid_transition() on the imagined next state, plus 0.5 x the log
    of its policy probability. Returns (action_type, score). On ties the
    candidate ranked higher by the policy wins.
    """
    chosen_action = None
    chosen_score = -1e9

    for candidate, prior_prob in policy_priors(snapshot, top_k=top_k):
        imagined = imagine_next(snapshot["global"], candidate)
        total = score_hybrid_transition(snapshot, imagined, candidate, goal)

        # The prior nudges the ranking without dominating the goal terms.
        total += 0.5 * np.log(prior_prob + 1e-6)

        if total > chosen_score:
            chosen_action, chosen_score = candidate, total

    return chosen_action, chosen_score


# Goal

In [37]:
# Goals are weight dicts read by score_hybrid_transition(): "logs" scales the
# bonus for predicted changes under file:logs, "docs" the bonus under file:docs.

# Reward producing or changing log files.
GOAL_LOGS = {
    "logs": 1.0,
    "docs": 0.0,
}

# Reward producing or changing documents.
GOAL_DOCS = {
    "logs": 0.0,
    "docs": 1.0,
}

# Negative weight: predicted log changes lower the score.
GOAL_QUIET = {
    "logs": -1.0,
    "docs": 0.0,
}


In [38]:
# Plan one step for each goal from the same pristine world state.
reset_world()
snap = snapshot_world()

for name, goal in {
    "LOGS": GOAL_LOGS,
    "DOCS": GOAL_DOCS,
    "QUIET": GOAL_QUIET,
}.items():
    action, score = plan_one_step_guided_goal(snap, goal)
    # The separator was a mis-encoded arrow; print the intended character.
    print(name, "→", action, "| score:", score)


Creature world created.
LOGS â†’ RUN | score: 4.262936238509378
DOCS â†’ CREATE | score: 8.604660490512888
QUIET â†’ WRITE | score: -0.008814724999130108


In [39]:
# NOTE: `snapshot` is assigned here but the loop below reads `snap` directly.
snapshot = snap

# Show the predicted object effects of every action type in the fresh world.
for action in ACTION_TYPES:
    print(action, predict_object_effects(snap, action))


READ set()
WRITE {'file:docs/report_465.txt', 'file:docs/report_148.txt', 'file:data/numbers.txt'}
REPLACE {'file:logs/sum.log', 'file:docs/notes.txt', 'file:data/numbers.txt'}
CREATE {'file:docs/report_465.txt', 'file:docs/report_148.txt', 'file:docs/notes.txt'}
MOVE {'file:docs/report_148.txt', 'file:logs/sum.log', 'file:data/numbers.txt'}
RUN {'file:logs/sum.log'}


# Task testing

In [40]:
def resolve_action_args(action_type, snapshot):
    """
    Return hard-coded arguments for an action type (heuristic, not learned).

    CREATE gets a randomly numbered task file name in docs; every other known
    type gets a fixed argument tuple. Unknown types get an empty tuple.
    `snapshot` is accepted but not consulted.
    """
    # Only CREATE draws from the numpy RNG, so handle it on its own.
    if action_type == "CREATE":
        filename = f"task_{np.random.randint(1000)}.txt"
        return ("docs", filename, "task output\n")

    fixed_args = (
        ("RUN", ("scripts/sum.py",)),
        ("READ", ("docs/notes.txt",)),
        ("WRITE", ("docs/notes.txt", "\nupdate")),
        ("REPLACE", ("docs/notes.txt", "replaced content\n")),
        ("MOVE", ("docs/notes.txt", "tmp")),
    )
    for kind, args in fixed_args:
        if action_type == kind:
            return args

    return ()


In [41]:
def run_task_with_trace(task, max_steps=5, verbose=True):
    """
    Run the one-step planner on a task from a freshly reset world.

    Args:
        task: dict with a "goal" (weights for the planner) and a "success"
            predicate that takes a snapshot.
        max_steps: maximum number of actions to execute.
        verbose: print each chosen action type and its score.

    Returns:
        dict with "success", "steps" (actions executed before success was
        detected, or max_steps), "trace" (one entry per executed action) and
        "final_snapshot".
    """
    reset_world()
    history = []

    current = snapshot_world()

    for step in range(max_steps):
        # Success is tested before acting, so a solved task needs no action.
        if task["success"](current):
            return {
                "success": True,
                "steps": step,
                "trace": history,
                "final_snapshot": current
            }

        action_type, score = plan_one_step_guided_goal(current, task["goal"])
        chosen = Action(action_type, resolve_action_args(action_type, current))

        previous = current
        apply_action(chosen)
        current = snapshot_world()

        # Record which objects existed before and after the step.
        entry = {
            "step": step,
            "action": action_type,
            "score": score,
            "objects_before": set(previous["objects"].keys()),
            "objects_after": set(current["objects"].keys())
        }
        history.append(entry)

        if verbose:
            print(f"[{step}] ACTION = {action_type:6s} | score = {score:.3f}")

    # Step budget exhausted: report whatever the final state satisfies.
    return {
        "success": task["success"](current),
        "steps": max_steps,
        "trace": history,
        "final_snapshot": current
    }

In [42]:
# Task: make logs/sum.log appear.
# The success predicate only checks that the object key exists in the
# snapshot; it does not inspect what the log file contains.
TASK_LOG = {
    "name": "Generate Log",
    "goal": GOAL_LOGS,
    "success": lambda snap: "file:logs/sum.log" in snap["objects"]
}

# Give the planner up to 3 steps and report the outcome.
result = run_task_with_trace(TASK_LOG, max_steps=3)
print("SUCCESS:", result["success"])


Creature world created.
[0] ACTION = RUN    | score = 4.263
SUCCESS: True
