In [None]:
# pip install gymnasium[toy_text] numpy tensorboard tensorboardX
import time
import random
import numpy as np
import gymnasium as gym
from tensorboardX import SummaryWriter

ARROW_MAP = {0:"←", 1:"↓", 2:"→", 3:"↑"}  # Gymnasium: 0=L,1=D,2=R,3=U

# -------------------- Seeding Utils --------------------
def set_global_seed(seed: int):
    """Seed NumPy's global RNG and Python's ``random`` module with ``seed``."""
    # Same order as before: NumPy first, then the stdlib generator.
    for seed_fn in (np.random.seed, random.seed):
        seed_fn(seed)

def make_env(desc=None, map_name=None, is_slippery=False, seed: int | None = None):
    """Create a ``FrozenLake-v1`` environment and reset it once.

    Args:
        desc: Custom map layout forwarded to ``gym.make``; omitted when ``None``.
        map_name: Predefined map name forwarded to ``gym.make``; omitted when
            ``None``.
        is_slippery: Forwarded to ``gym.make`` unchanged.
        seed: When given, used for the initial ``reset`` and to seed the
            action and observation spaces.

    Returns:
        The created environment, already reset once.
    """
    # Local import: only needed on the (rare) space-seeding failure path.
    import warnings

    # Note: do not set desc and map_name at the same time.
    kwargs = {"is_slippery": is_slippery}
    if desc is not None:
        kwargs["desc"] = desc
    if map_name is not None:
        kwargs["map_name"] = map_name

    env = gym.make("FrozenLake-v1", **kwargs)

    if seed is None:
        env.reset()
        return env

    # IMPORTANT: the seed is applied via reset; the spaces are seeded in addition.
    env.reset(seed=seed)
    try:
        env.action_space.seed(seed)
        env.observation_space.seed(seed)
    except Exception as exc:
        # Seeding the spaces stays best-effort, but a failure is reported
        # instead of being swallowed so reproducibility issues are visible.
        warnings.warn(
            f"Could not seed environment spaces: {exc!r}",
            RuntimeWarning,
            stacklevel=2,
        )
    return env

# -------------------- Algo + Eval --------------------
def run_episode(env, policy, reset_seed: int | None = None, max_steps=10_000):
    if reset_seed is None:
        obs, _ = env.reset()
    else:
        obs, _ = env.reset(seed=reset_seed)

    total_r, steps, done = 0.0, 0, False
    while not done and steps < max_steps:
        a = int(policy[obs])
        obs, r, done, truncated, _ = env.step(a)
        total_r += r
        steps += 1
        if truncated:
            break
    return total_r, steps, done

def evaluate_policy(env, policy, episodes=50, base_seed: int | None = None):
    """Run ``episodes`` rollouts of ``policy`` and average the results.

    Args:
        env: Environment handed to ``run_episode``.
        policy: Observation-to-action mapping handed to ``run_episode``.
        episodes: Number of rollouts; also the divisor of every average.
        base_seed: When given, seeds a NumPy generator that produces one reset
            seed per episode, making the seed sequence deterministic. When
            ``None``, episodes are reset without a seed.

    Returns:
        Tuple ``(mean_return, success_rate, mean_steps)``. An episode counts as
        a success when it terminated and its return is positive.
    """
    # Deterministic sequence of per-episode seeds (only if a base seed is set).
    seed_stream = None if base_seed is None else np.random.default_rng(base_seed)

    return_sum = 0.0
    step_sum = 0
    successes = 0
    for _ in range(episodes):
        if seed_stream is None:
            episode_seed = None
        else:
            episode_seed = int(seed_stream.integers(0, 2**31 - 1))

        episode_return, episode_steps, terminated = run_episode(
            env, policy, reset_seed=episode_seed
        )
        return_sum += episode_return
        step_sum += episode_steps
        if terminated and episode_return > 0:
            successes += 1

    return return_sum / episodes, successes / episodes, step_sum / episodes

def value_iteration(env, gamma=0.99, theta=1e-8, max_iters=10_000,
                    writer: SummaryWriter | None = None,
                    eval_every: int = 0, eval_episodes: int = 50,
                    eval_seed: int | None = None,
                    verbose=False):
    P = env.unwrapped.P
    nS = env.observation_space.n
    nA = env.action_space.n
    V = np.zeros(nS, dtype=np.float64)

    for it in range(max_iters):
        delta = 0.0
        for s in range(nS):
            v_old = V[s]
            best = -np.inf
            for a in range(nA):
                q = 0.0
                for prob, s_next, r, done in P[s][a]:
                    q += prob * (r + (0.0 if done else gamma * V[s_next]))
                if q > best:
                    best = q
            V[s] = best
            delta = max(delta, abs(v_old - V[s]))

        if writer is not None:
            writer.add_scalar("train/delta", float(delta), it)
            writer.add_scalar("train/mean_V", float(np.mean(V)), it)
            writer.add_scalar("train/max_V", float(np.max(V)), it)

        if eval_every and (it % eval_every == 0 or it == max_iters - 1):
            policy = extract_policy(env, V, gamma=gamma)
            avg_ret, success, avg_steps = evaluate_policy(env, policy, episodes=eval_episodes, base_seed=eval_seed)
            if writer is not None:
                writer.add_scalar("eval/avg_return", float(avg_ret), it)
                writer.add_scalar("eval/success_rate", float(success), it)
                writer.add_scalar("eval/avg_steps", float(avg_steps), it)

        if verbose and it % 50 == 0:
            print(f"Iter {it:4d}  delta={delta:.3e}  meanV={np.mean(V):.4f}")
        if delta < theta:
            if verbose:
                print(f"Konvergiert nach {it} Iterationen (delta={delta:.3e}).")
            break
    return V

def extract_policy(env, V, gamma=0.99):
    """Return the greedy policy with respect to the state values ``V``.

    For every state the expected one-step return of each action is computed
    from ``env.unwrapped.P`` (tuples of ``(prob, next_state, reward, done)``)
    and the action with the largest value is selected; ties resolve to the
    lowest action index (``numpy.argmax``).

    Args:
        env: Environment exposing ``unwrapped.P``, ``observation_space.n`` and
            ``action_space.n``.
        V: State values, indexable by state.
        gamma: Discount factor applied to the next state's value.

    Returns:
        ``numpy.ndarray`` of dtype ``int64`` with one action per state.
    """
    transitions = env.unwrapped.P
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    greedy = np.zeros(n_states, dtype=np.int64)

    for state in range(n_states):
        action_values = np.zeros(n_actions, dtype=np.float64)
        for action in range(n_actions):
            for prob, next_state, reward, terminal in transitions[state][action]:
                # Terminal transitions contribute the reward only.
                future = 0.0 if terminal else gamma * V[next_state]
                action_values[action] += prob * (reward + future)
        greedy[state] = int(np.argmax(action_values))
    return greedy

def render_policy_grid(env, policy):
    """Print the map with every free cell replaced by the policy's arrow.

    Cells marked ``S``, ``G`` or ``H`` keep their letter; all other cells show
    the ``ARROW_MAP`` glyph of the action chosen for that state. The grid
    dimensions are taken from ``env.unwrapped.desc`` so non-square maps are
    rendered correctly; states are assumed to be numbered row by row.

    Args:
        env: Environment exposing a 2-D ``unwrapped.desc`` array.
        policy: Iterable with one action index per state.
    """
    # astype returns a fresh array, so the environment's own map is untouched.
    grid = env.unwrapped.desc.astype(str)
    _, n_cols = grid.shape

    for s, a in enumerate(policy):
        r, c = divmod(s, n_cols)
        if grid[r, c] in ("H", "G", "S"):
            continue
        grid[r, c] = ARROW_MAP[int(a)]

    print("\nPolicy (Pfeile), S=Start, G=Goal, H=Hole:")
    for row in grid:
        print(" ".join(row))

# -------------------- Main --------------------
def main():
    """Run value iteration on a fixed FrozenLake map and report the results.

    Seeds the RNGs, builds the environment, runs ``value_iteration`` with
    periodic evaluation logged under ``runs/<run_name>``, evaluates the final
    greedy policy over 200 episodes, and prints the value function and the
    policy grid. The writer and the environment are closed even if a step
    raises.
    """
    SEED = 12345  # <- change here to get a different reproducible run
    set_global_seed(SEED)

    # Choose the environment.
    custom_map = ["SFFF","FHFH","FFFH","HFFG"]
    is_slippery = False
    env = make_env(desc=custom_map, is_slippery=is_slippery, seed=SEED)
    # Alternatives:
    # env = make_env(map_name="4x4", is_slippery=True, seed=SEED)
    # env = make_env(map_name="4x4", is_slippery=False, seed=SEED)

    try:
        gamma = 0.99
        theta = 1e-8

        run_name = f"VI_FrozenLake_seed{SEED}_{int(time.time())}"
        writer = SummaryWriter(logdir=f"runs/{run_name}")
        try:
            writer.add_text("meta/run", run_name, 0)
            writer.add_text("meta/seed", str(SEED), 0)
            # Log the value that was passed to make_env rather than reading an
            # attribute back from the environment object.
            writer.add_text("meta/env", f"is_slippery={is_slippery}, map={custom_map}", 0)
            writer.add_text("meta/hparams", f"gamma={gamma}, theta={theta}", 0)

            V = value_iteration(
                env,
                gamma=gamma,
                theta=theta,
                max_iters=10_000,
                writer=writer,
                eval_every=5,          # evaluate every 5 iterations
                eval_episodes=50,
                eval_seed=SEED,        # makes the evaluation episodes reproducible
                verbose=True
            )
            policy = extract_policy(env, V, gamma=gamma)

            # Final evaluation with a larger sample.
            avg_ret, success, avg_steps = evaluate_policy(env, policy, episodes=200, base_seed=SEED)
            writer.add_scalar("final/avg_return", float(avg_ret))
            writer.add_scalar("final/success_rate", float(success))
            writer.add_scalar("final/avg_steps", float(avg_steps))
        finally:
            # Flush and release the event file even when training/eval fails.
            writer.close()

        print("\nValue-Funktion (reshaped):")
        # Use the map's own shape so non-square maps print correctly too.
        print(V.reshape(env.unwrapped.desc.shape))
        render_policy_grid(env, policy)
    finally:
        env.close()

# Run the experiment only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


In [None]:
python main.py
tensorboard --logdir runs
