In [None]:
import gymnasium as gym

# Create the FrozenLake environment
env = gym.make("FrozenLake-v1", is_slippery=False)

# Reset the environment to get the initial observation
observation, info = env.reset()

print("Environment created: FrozenLake-v1")
print(f"Observation Space: {env.observation_space}")
print(f"Action Space: {env.action_space}")

# You can close the environment when you're done
# env.close()

Environment created: FrozenLake-v1
Observation Space: Discrete(16)
Action Space: Discrete(4)


###1

Gt = sum k=0 to T-t-1 gamma*Rt+k+1

V(s) = V(s) + alpha*(Gt - V(s)) # first visit

In [None]:
import numpy as np

# Number of states and actions
n_states = env.observation_space.n
n_actions = env.action_space.n

# Initialize value function
V = np.zeros(n_states)

# Policy: choose each action uniformly
policy = np.ones((n_states, n_actions)) / n_actions

# Hyperparameters
gamma = 0.99
alpha = 0.1
episodes = 5000

def run_episode():
    """Generate an episode following the policy: returns [(s,a,r), ...]."""
    episode = []
    state, _ = env.reset()

    done = False
    while not done:
        action = np.random.choice(n_actions, p=policy[state])
        next_state, reward, terminated, truncated, _ = env.step(action)
        episode.append((state, action, reward))
        state = next_state
        done = terminated or truncated
    return episode

Returns = {s: [] for s in range(n_states)}

for ep in range(episodes):
    episode = run_episode()

    # Compute returns from the episode
    G = 0
    visited = set()
    for t in reversed(range(len(episode))):
        s, a, r = episode[t]
        G = r + gamma * G

        # First-visit MC
        if s not in visited:
            visited.add(s)
            Returns[s].append(G)
            V[s] += alpha * (G - V[s])  # incremental mean update

print("\nMonte Carlo Policy Evaluation DONE.")
print("Estimated V(s):")
print(V.reshape(4, 4))



Monte Carlo Policy Evaluation DONE.
Estimated V(s):
[[2.58979844e-06 1.43355892e-04 1.12450282e-02 2.32203828e-02]
 [2.54643482e-11 0.00000000e+00 4.73198985e-02 0.00000000e+00]
 [2.35150840e-05 5.90534888e-02 1.10505890e-01 0.00000000e+00]
 [0.00000000e+00 1.96035920e-01 3.75237594e-01 0.00000000e+00]]


###2

Vk+1(s) = max(a) sum s',r P(s',r|s,a)[r+ gamma * Vk(s')]

pi(s) = arg max(a) sum s',r P(s',r|s,a)[r+ gamma * V(s')]

In [None]:
import numpy as np

# ---------- Hyperparameters ----------
gamma = 0.99        # discount factor
theta = 1e-6        # small threshold for stopping
max_iterations = 10000

# ---------- Helpers ----------
n_states = env.observation_space.n
n_actions = env.action_space.n

# mapping action index -> human readible
action_map = {0: "Left", 1: "Down", 2: "Right", 3: "Up"}

# Value array
V = np.zeros(n_states)

# Access the MDP model from gym's FrozenLake (env.P)
# env.P[state][action] -> list of (prob, next_state, reward, done, info)
P = env.unwrapped.P

# ---------- Value Iteration ----------
for it in range(max_iterations):
    delta = 0.0
    for s in range(n_states):
        v_old = V[s]
        q_values = np.zeros(n_actions)
        for a in range(n_actions):
            for transition in P[s][a]:
                prob, s_next, reward, done = transition[:4]
                q_values[a] += prob * (reward + gamma * V[s_next])
        V[s] = np.max(q_values)
        delta = max(delta, abs(v_old - V[s]))
    if delta < theta:
        print(f"Value iteration converged in {it+1} iterations (delta={delta:.2e}).")
        break
else:
    print(f"Value iteration stopped after max iterations ({max_iterations}). delta={delta:.2e}")

# ---------- Extract deterministic greedy policy ----------
policy = np.zeros(n_states, dtype=int)  # store best action index per state
for s in range(n_states):
    q_values = np.zeros(n_actions)
    for a in range(n_actions):
        for transition in P[s][a]:
            prob, s_next, reward, done = transition[:4]
            q_values[a] += prob * (reward + gamma * V[s_next])
    best_a = np.argmax(q_values)
    policy[s] = best_a

# ---------- Print results ----------
print("\nEstimated optimal V(s) (4x4):")
print(V.reshape(4, 4))

print("\nGreedy policy (as action names) for each state (0..15):")
policy_actions = [action_map[a] for a in policy]
for r in range(4):
    row = policy_actions[r*4:(r+1)*4]
    print(row)

# Example: rollout from initial state (0) following the greedy policy
def rollout_from(start_state=0, max_steps=50, render=False):
    state = start_state
    # set env to the given state by resetting and stepping until that state is reached
    # simpler approach for gymnasium: use env.reset() and step using policy until you reach start_state
    # but easiest: use env.unwrapped.s = start_state (works for FrozenLake)
    try:
        env.unwrapped.s = start_state
    except Exception:
        # fallback: perform steps from reset
        state, _ = env.reset()
        while state != start_state:
            a = 0  # move left until wraps (cheap fallback; deterministic FrozenLake may reach target)
            state, _, terminated, truncated, _ = env.step(a)
            if terminated or truncated:
                state, _ = env.reset()
        env.unwrapped.s = start_state

    state = start_state
    traj = [state]
    for t in range(max_steps):
        a = policy[state]
        next_state, reward, terminated, truncated, _ = env.step(a)
        traj.append(next_state)
        state = next_state
        if terminated or truncated:
            break
    return traj

print("\nRollout following greedy policy from state 0:")
print(rollout_from(0))


Value iteration converged in 7 iterations (delta=0.00e+00).

Estimated optimal V(s) (4x4):
[[0.95099005 0.96059601 0.970299   0.96059601]
 [0.96059601 0.         0.9801     0.        ]
 [0.970299   0.9801     0.99       0.        ]
 [0.         0.99       1.         0.        ]]

Greedy policy (as action names) for each state (0..15):
['Down', 'Right', 'Down', 'Left']
['Down', 'Left', 'Down', 'Left']
['Right', 'Down', 'Down', 'Left']
['Left', 'Right', 'Right', 'Left']

Rollout following greedy policy from state 0:
[0, 4, 8, 9, 13, 14, 15]


###3

at = pi(st)

In [None]:
import numpy as np

# --- Assumptions (set these if not already present) ---
n_states = env.observation_space.n
n_actions = env.action_space.n
gamma = 0.99

# If you already have `policy` from value-iteration, keep it.
# Otherwise, if you have V, compute greedy policy:
# policy = np.zeros(n_states, dtype=int)
# for s in range(n_states):
#     q_vals = np.zeros(n_actions)
#     for a in range(n_actions):
#         for (prob, s_next, reward, done, info) in env.P[s][a]:
#             q_vals[a] += prob * (reward + gamma * V[s_next])
#     policy[s] = np.argmax(q_vals)

# ---------- Rollout utilities ----------
def set_env_state(env, start_state):
    """
    Try to set the FrozenLake internal state to `start_state`.
    Gym environments vary; FrozenLake typically supports env.unwrapped.s assignment.
    If that fails, the function returns False and we fallback to resetting only.
    """
    try:
        env.reset()  # ensure env is initialized using new API (returns obs, info)
        env.unwrapped.s = int(start_state)
        return True
    except Exception:
        return False

def rollout_once(env, policy, start_state=0, render=False, max_steps=100):
    """
    Perform one rollout following the (deterministic) policy from `start_state`.
    Returns: trajectory (states), actions, total_reward, discounted_return, steps, terminated_flag
    """
    # Try to put env in given start state
    ok = set_env_state(env, start_state)
    if not ok:
        # fallback: reset env and start from whatever initial state we get
        obs, info = env.reset()
        current_state = int(obs)
        if current_state != start_state:
            # warn and proceed from actual reset state (safe fallback)
            print(f"[warning] couldn't set start_state to {start_state}. Beginning from reset state {current_state}.")
    else:
        # In this case, env.unwrapped.s is set; but we should still call reset to sync episode internals
        obs, info = env.reset()
        try:
            # some Gym versions require stepping after manual s set, but generally this is fine
            current_state = start_state
        except Exception:
            current_state = int(obs)

    traj_states = [int(current_state)]
    traj_actions = []
    total_reward = 0.0
    discounted_return = 0.0
    discount = 1.0
    terminated = False
    truncated = False

    for t in range(max_steps):
        a = int(policy[int(current_state)])
        traj_actions.append(a)
        next_obs, reward, terminated, truncated, info = env.step(a)
        next_state = int(next_obs)
        traj_states.append(next_state)

        total_reward += reward
        discounted_return += discount * reward
        discount *= gamma

        current_state = next_state
        if render:
            env.render()  # FrozenLake prints ascii grid

        if terminated or truncated:
            break

    return {
        "states": traj_states,
        "actions": traj_actions,
        "total_reward": total_reward,
        "discounted_return": discounted_return,
        "steps": len(traj_actions),
        "done": terminated or truncated
    }

def rollout_stats(env, policy, start_state=0, n_episodes=20, render=False):
    """Run multiple rollouts and print aggregate statistics."""
    results = []
    for i in range(n_episodes):
        res = rollout_once(env, policy, start_state=start_state, render=render)
        results.append(res)

    avg_total = np.mean([r["total_reward"] for r in results])
    avg_disc = np.mean([r["discounted_return"] for r in results])
    avg_steps = np.mean([r["steps"] for r in results])
    success_rate = np.mean([1.0 if r["total_reward"] > 0 else 0.0 for r in results])

    print(f"\nRollout stats over {n_episodes} episodes (start_state={start_state}):")
    print(f"  Avg total reward      : {avg_total:.3f}")
    print(f"  Avg discounted return : {avg_disc:.3f}")
    print(f"  Avg steps per episode : {avg_steps:.2f}")
    print(f"  Success rate (reward>0): {success_rate*100:.1f}%")
    return results

# ---------- Example usage ----------
# ensure `policy` exists here. If you ran Value Iteration earlier you already have it.
# Example: policy = ... (array length 16)

# Replace with your policy if needed:
# policy = np.array([0,0,1,1, ...])  # example length 16

# Run and inspect one rollout:
res = rollout_once(env, policy, start_state=0, render=False)
print("\nSingle rollout result:")
print(" States:", res["states"])
print(" Actions:", res["actions"])
print(f" Total reward: {res['total_reward']}, Discounted return: {res['discounted_return']:.3f}, Steps: {res['steps']}, Done: {res['done']}")

# Run aggregate statistics (useful when policy may be stochastic)
rollout_stats(env, policy, start_state=0, n_episodes=50)



Single rollout result:
 States: [0, 4, 8, 9, 13, 14, 15]
 Actions: [1, 1, 2, 1, 2, 2]
 Total reward: 1.0, Discounted return: 0.951, Steps: 6, Done: True

Rollout stats over 50 episodes (start_state=0):
  Avg total reward      : 1.000
  Avg discounted return : 0.951
  Avg steps per episode : 6.00
  Success rate (reward>0): 100.0%


[{'states': [0, 4, 8, 9, 13, 14, 15],
  'actions': [1, 1, 2, 1, 2, 2],
  'total_reward': 1.0,
  'discounted_return': 0.9509900498999999,
  'steps': 6,
  'done': True},
 {'states': [0, 4, 8, 9, 13, 14, 15],
  'actions': [1, 1, 2, 1, 2, 2],
  'total_reward': 1.0,
  'discounted_return': 0.9509900498999999,
  'steps': 6,
  'done': True},
 {'states': [0, 4, 8, 9, 13, 14, 15],
  'actions': [1, 1, 2, 1, 2, 2],
  'total_reward': 1.0,
  'discounted_return': 0.9509900498999999,
  'steps': 6,
  'done': True},
 {'states': [0, 4, 8, 9, 13, 14, 15],
  'actions': [1, 1, 2, 1, 2, 2],
  'total_reward': 1.0,
  'discounted_return': 0.9509900498999999,
  'steps': 6,
  'done': True},
 {'states': [0, 4, 8, 9, 13, 14, 15],
  'actions': [1, 1, 2, 1, 2, 2],
  'total_reward': 1.0,
  'discounted_return': 0.9509900498999999,
  'steps': 6,
  'done': True},
 {'states': [0, 4, 8, 9, 13, 14, 15],
  'actions': [1, 1, 2, 1, 2, 2],
  'total_reward': 1.0,
  'discounted_return': 0.9509900498999999,
  'steps': 6,
  'done':

###4

Q(s,a) = Q(s,a) + alpha * (r + gamma * max(a') Q(s',a') - Q(s,a))

In [None]:
import numpy as np
import random
import time
import os
import pickle

# ---------- Hyperparameters ----------
n_states = env.observation_space.n
n_actions = env.action_space.n

alpha = 0.8          # learning rate (0 < alpha <= 1)
gamma = 0.99         # discount factor
epsilon_start = 1.0  # initial exploration prob
epsilon_min = 0.01
epsilon_decay = 0.9995  # multiplicative decay per episode

episodes = 10000     # training episodes
max_steps = 100      # max steps per episode

eval_every = 500     # evaluate policy every N episodes
eval_episodes = 200  # episodes for evaluation

q_table_path = "q_table.pkl"

# ---------- Initialize Q-table ----------
Q = np.zeros((n_states, n_actions), dtype=float)

# ---------- Utilities ----------
def epsilon_greedy_action(state, epsilon):
    """Return action using epsilon-greedy policy derived from Q."""
    if random.random() < epsilon:
        return random.randint(0, n_actions - 1)
    else:
        return int(np.argmax(Q[state]))

def save_q_table(path=q_table_path):
    with open(path, "wb") as f:
        pickle.dump(Q, f)
    print(f"[info] Q-table saved to {path} (shape={Q.shape})")

def load_q_table(path=q_table_path):
    global Q
    if os.path.exists(path):
        with open(path, "rb") as f:
            Q = pickle.load(f)
        print(f"[info] Q-table loaded from {path}")
    else:
        print(f"[info] No Q-table found at {path}. Starting fresh.")

def evaluate_policy(n_eval=100, render=False):
    """Evaluate greedy policy derived from Q (no exploration)."""
    returns = []
    successes = 0
    for _ in range(n_eval):
        obs, info = env.reset()
        state = int(obs)
        total_reward = 0.0
        for t in range(max_steps):
            action = int(np.argmax(Q[state]))
            next_obs, reward, terminated, truncated, info = env.step(action)
            state = int(next_obs)
            total_reward += reward
            if terminated or truncated:
                break
        returns.append(total_reward)
        if total_reward > 0:
            successes += 1
        if render:
            env.render()
    avg_return = np.mean(returns)
    success_rate = successes / n_eval
    return avg_return, success_rate

# ---------- Training Loop ----------
def train_q_learning(episodes=episodes):
    epsilon = epsilon_start
    training_start = time.time()
    best_eval = -np.inf

    for ep in range(1, episodes + 1):
        obs, info = env.reset()                 # Gymnasium reset -> obs, info
        state = int(obs)
        total_reward = 0.0

        for t in range(max_steps):
            action = epsilon_greedy_action(state, epsilon)
            next_obs, reward, terminated, truncated, info = env.step(action)
            next_state = int(next_obs)

            # Q-learning update (off-policy)
            td_target = reward + gamma * np.max(Q[next_state])
            td_error = td_target - Q[state, action]
            Q[state, action] += alpha * td_error

            state = next_state
            total_reward += reward

            if terminated or truncated:
                break

        # decay epsilon
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # periodic evaluation & logging
        if ep % eval_every == 0 or ep == 1:
            avg_return, success_rate = evaluate_policy(n_eval=eval_episodes)
            elapsed = time.time() - training_start
            print(f"[ep {ep}/{episodes}] eps={epsilon:.4f}  avg_return={avg_return:.3f}  success_rate={success_rate*100:.1f}%  elapsed={elapsed:.1f}s")
            # optionally save best Q
            if avg_return > best_eval:
                best_eval = avg_return
                save_q_table()

    print("Training complete.")
    return Q

# ---------- Example usage ----------
if __name__ == "__main__":
    # (optional) warm start from saved Q-table
    load_q_table()

    # Train
    trained_Q = train_q_learning(episodes=episodes)

    # Final evaluation
    avg_return, success_rate = evaluate_policy(n_eval=1000)
    print(f"\nFinal evaluation over 1000 episodes: avg_return={avg_return:.3f}, success_rate={success_rate*100:.2f}%")

    # Show derived greedy policy (action indices)
    greedy_policy = np.argmax(trained_Q, axis=1)
    print("\nGreedy policy (state 0..15):")
    print(greedy_policy.reshape(4, 4))


[info] Q-table loaded from q_table.pkl
[ep 1/10000] eps=0.9995  avg_return=1.000  success_rate=100.0%  elapsed=0.0s
[info] Q-table saved to q_table.pkl (shape=(16, 4))
[ep 500/10000] eps=0.7788  avg_return=1.000  success_rate=100.0%  elapsed=0.1s
[ep 1000/10000] eps=0.6065  avg_return=1.000  success_rate=100.0%  elapsed=0.3s
[ep 1500/10000] eps=0.4723  avg_return=1.000  success_rate=100.0%  elapsed=0.4s
[ep 2000/10000] eps=0.3678  avg_return=1.000  success_rate=100.0%  elapsed=0.5s
[ep 2500/10000] eps=0.2864  avg_return=1.000  success_rate=100.0%  elapsed=0.6s
[ep 3000/10000] eps=0.2230  avg_return=1.000  success_rate=100.0%  elapsed=0.8s
[ep 3500/10000] eps=0.1737  avg_return=1.000  success_rate=100.0%  elapsed=0.9s
[ep 4000/10000] eps=0.1353  avg_return=1.000  success_rate=100.0%  elapsed=1.0s
[ep 4500/10000] eps=0.1053  avg_return=1.000  success_rate=100.0%  elapsed=1.1s
[ep 5000/10000] eps=0.0820  avg_return=1.000  success_rate=100.0%  elapsed=1.2s
[ep 5500/10000] eps=0.0639  avg_r

###5

V(s) = V(s) + alpha * (r + gamma * V(s') - V(s))

TD target = r + gamma * V(s')

In [None]:
import numpy as np

# ---------- TD(0) hyperparameters ----------
n_states = env.observation_space.n
n_actions = env.action_space.n

gamma = 0.99    # discount factor
alpha = 0.1     # learning rate
episodes = 10000
max_steps = 100

# Use a fixed policy for prediction. Here: uniform random policy π(a|s) = 1/|A|
policy = np.ones((n_states, n_actions)) / n_actions

# Initialize value function V(s)
V = np.zeros(n_states, dtype=float)

def td0_episode(V, alpha, gamma, policy, max_steps=100):
    """
    Run one episode following 'policy' and do online TD(0) updates for V.
    Returns cumulative (undiscounted) reward for logging.
    """
    obs, info = env.reset()
    state = int(obs)
    total_reward = 0.0

    for t in range(max_steps):
        # Sample action according to policy[state]
        action = np.random.choice(n_actions, p=policy[state])

        next_obs, reward, terminated, truncated, info = env.step(action)
        next_state = int(next_obs)

        # TD(0) update: V(s) += alpha * (r + gamma * V(s') - V(s))
        td_target = reward + gamma * V[next_state]
        td_error = td_target - V[state]
        V[state] += alpha * td_error

        total_reward += reward
        state = next_state

        if terminated or truncated:
            break

    return total_reward

# ---------- Training loop ----------
print("[TD(0)] Starting training...")
rewards = []
for ep in range(1, episodes + 1):
    r = td0_episode(V, alpha, gamma, policy, max_steps=max_steps)
    rewards.append(r)

    # simple logging
    if ep % 500 == 0 or ep == 1:
        avg_recent = np.mean(rewards[-500:]) if len(rewards) >= 500 else np.mean(rewards)
        print(f"[ep {ep}/{episodes}] avg_reward(last up to 500)={avg_recent:.3f}")

print("[TD(0)] Training finished.")
print("Estimated state values V(s) (reshaped 4x4):")
print(V.reshape(4, 4))

# ---------- Quick evaluation: compare to Monte Carlo approximate values ----------
# (optional) If you have MC returns or a ground truth V from value iteration, compare here.
# Example: if you have V_true from value iteration, compute MSE:
# mse = np.mean((V - V_true)**2)
# print(f"MSE vs value-iteration: {mse:.6f}")


[TD(0)] Starting training...
[ep 1/10000] avg_reward(last up to 500)=0.000
[ep 500/10000] avg_reward(last up to 500)=0.010
[ep 1000/10000] avg_reward(last up to 500)=0.014
[ep 1500/10000] avg_reward(last up to 500)=0.014
[ep 2000/10000] avg_reward(last up to 500)=0.010
[ep 2500/10000] avg_reward(last up to 500)=0.016
[ep 3000/10000] avg_reward(last up to 500)=0.012
[ep 3500/10000] avg_reward(last up to 500)=0.024
[ep 4000/10000] avg_reward(last up to 500)=0.014
[ep 4500/10000] avg_reward(last up to 500)=0.016
[ep 5000/10000] avg_reward(last up to 500)=0.016
[ep 5500/10000] avg_reward(last up to 500)=0.016
[ep 6000/10000] avg_reward(last up to 500)=0.010
[ep 6500/10000] avg_reward(last up to 500)=0.018
[ep 7000/10000] avg_reward(last up to 500)=0.002
[ep 7500/10000] avg_reward(last up to 500)=0.022
[ep 8000/10000] avg_reward(last up to 500)=0.018
[ep 8500/10000] avg_reward(last up to 500)=0.016
[ep 9000/10000] avg_reward(last up to 500)=0.014
[ep 9500/10000] avg_reward(last up to 500)=0

###6

Eligibility trace update:

e(s) = gamma * lambda * e(s) + 1(s=st)

TD error:

delta_t = r + gamma * V(s') - V(s)

Value update:

V(s) = V(s) + alpha * delta_t * e(s)

In [None]:
import numpy as np

# ---------- Hyperparameters ----------
n_states = env.observation_space.n
n_actions = env.action_space.n

gamma = 0.99          # discount factor
alpha = 0.1           # learning rate for the value function
_lambda = 0.8         # trace-decay parameter (often called lambda)
episodes = 10000
max_steps = 200

# Choose trace type: 'accumulating' or 'replacing'
trace_type = "accumulating"  # or "replacing"

# Fixed policy for prediction: uniform random (change if you want to evaluate another policy)
policy = np.ones((n_states, n_actions)) / n_actions

# Initialize state-value function
V = np.zeros(n_states, dtype=float)

def run_episode_td_lambda(V, alpha, gamma, _lambda, policy, trace_type="accumulating", max_steps=200):
    """
    Run one episode and update V using backward-view TD(lambda) with eligibility traces.
    Returns total undiscounted reward for logging.
    """
    obs, info = env.reset()
    state = int(obs)

    # Eligibility traces (one per state)
    e = np.zeros_like(V, dtype=float)

    total_reward = 0.0

    for t in range(max_steps):
        action = np.random.choice(n_actions, p=policy[state])
        next_obs, reward, terminated, truncated, info = env.step(action)
        next_state = int(next_obs)

        # TD error
        td_target = reward + gamma * V[next_state]
        delta = td_target - V[state]

        # Update eligibility traces (backward view)
        if trace_type == "accumulating":
            e = gamma * _lambda * e
            e[state] += 1.0
        elif trace_type == "replacing":
            e = gamma * _lambda * e
            # replacing: set trace for current state to 1 (not add)
            e[state] = 1.0
        else:
            raise ValueError("trace_type must be 'accumulating' or 'replacing'")

        # Update all state values
        V += alpha * delta * e

        total_reward += reward
        state = next_state

        if terminated or truncated:
            break

    return total_reward

# ---------- Training loop ----------
print(f"[TD(lambda)] Starting training (trace_type={trace_type})...")
rewards = []
for ep in range(1, episodes + 1):
    r = run_episode_td_lambda(V, alpha, gamma, _lambda, policy, trace_type=trace_type, max_steps=max_steps)
    rewards.append(r)

    if ep % 500 == 0 or ep == 1:
        avg_recent = np.mean(rewards[-500:]) if len(rewards) >= 500 else np.mean(rewards)
        print(f"[ep {ep}/{episodes}] avg_reward(last up to 500)={avg_recent:.3f}")

print("[TD(lambda)] Training finished.")
print("Estimated state values V(s) (reshaped 4x4):")
print(V.reshape(4, 4))

# ---------- Optional comparisons ----------
# - Compare V to TD(0) or Monte Carlo estimates if you computed them earlier
# Example:
# if 'V_td0' in globals():
#     mse_td0 = np.mean((V - V_td0)**2)
#     print(f"MSE vs TD(0): {mse_td0:.6f}")
#
# if 'V_mc' in globals():
#     mse_mc = np.mean((V - V_mc)**2)
#     print(f"MSE vs MC: {mse_mc:.6f}")


[TD(lambda)] Starting training (trace_type=accumulating)...
[ep 1/10000] avg_reward(last up to 500)=0.000
[ep 500/10000] avg_reward(last up to 500)=0.008
[ep 1000/10000] avg_reward(last up to 500)=0.010
[ep 1500/10000] avg_reward(last up to 500)=0.026
[ep 2000/10000] avg_reward(last up to 500)=0.018
[ep 2500/10000] avg_reward(last up to 500)=0.018
[ep 3000/10000] avg_reward(last up to 500)=0.012
[ep 3500/10000] avg_reward(last up to 500)=0.012
[ep 4000/10000] avg_reward(last up to 500)=0.004
[ep 4500/10000] avg_reward(last up to 500)=0.016
[ep 5000/10000] avg_reward(last up to 500)=0.014
[ep 5500/10000] avg_reward(last up to 500)=0.010
[ep 6000/10000] avg_reward(last up to 500)=0.008
[ep 6500/10000] avg_reward(last up to 500)=0.026
[ep 7000/10000] avg_reward(last up to 500)=0.014
[ep 7500/10000] avg_reward(last up to 500)=0.016
[ep 8000/10000] avg_reward(last up to 500)=0.016
[ep 8500/10000] avg_reward(last up to 500)=0.014
[ep 9000/10000] avg_reward(last up to 500)=0.006
[ep 9500/1000

###7

Q(s,a) = Q(s,a) + alpha * (r+ gamma * Q(s', a') - Q(s,a))

In [None]:
import numpy as np
import random
import pickle
import os
import time

# ---------- Hyperparameters ----------
n_states = env.observation_space.n
n_actions = env.action_space.n

alpha = 0.8            # learning rate
gamma = 0.99           # discount factor
epsilon_start = 1.0
epsilon_min = 0.01
epsilon_decay = 0.9995

episodes = 8000
max_steps = 100

eval_every = 400
eval_episodes = 200

q_table_path = "sarsa_q_table.pkl"

# ---------- Initialize Q-table ----------
Q = np.zeros((n_states, n_actions), dtype=float)

# ---------- Utilities ----------
def epsilon_greedy_action(state, epsilon):
    if random.random() < epsilon:
        return random.randint(0, n_actions - 1)
    else:
        return int(np.argmax(Q[state]))

def save_q_table(path=q_table_path):
    with open(path, "wb") as f:
        pickle.dump(Q, f)
    print(f"[info] SARSA Q-table saved to {path}")

def load_q_table(path=q_table_path):
    global Q
    if os.path.exists(path):
        with open(path, "rb") as f:
            Q = pickle.load(f)
        print(f"[info] SARSA Q-table loaded from {path}")
    else:
        print(f"[info] No SARSA Q-table found at {path}. Starting fresh.")

def evaluate_policy(n_eval=100, render=False):
    """Evaluate greedy policy derived from Q (no exploration)."""
    returns = []
    successes = 0
    for _ in range(n_eval):
        obs, info = env.reset()
        state = int(obs)
        total_reward = 0.0
        for t in range(max_steps):
            action = int(np.argmax(Q[state]))
            next_obs, reward, terminated, truncated, info = env.step(action)
            state = int(next_obs)
            total_reward += reward
            if terminated or truncated:
                break
        returns.append(total_reward)
        if total_reward > 0:
            successes += 1
        if render:
            env.render()
    return np.mean(returns), successes / n_eval

# ---------- Training loop (SARSA) ----------
def train_sarsa(episodes=episodes):
    epsilon = epsilon_start
    start_time = time.time()
    best_eval = -np.inf

    for ep in range(1, episodes + 1):
        obs, info = env.reset()
        state = int(obs)
        action = epsilon_greedy_action(state, epsilon)
        total_reward = 0.0

        for t in range(max_steps):
            next_obs, reward, terminated, truncated, info = env.step(action)
            next_state = int(next_obs)

            # pick next action using current policy (epsilon-greedy)
            next_action = epsilon_greedy_action(next_state, epsilon)

            # SARSA update (on-policy)
            td_target = reward + gamma * Q[next_state, next_action]
            td_error = td_target - Q[state, action]
            Q[state, action] += alpha * td_error

            state = next_state
            action = next_action
            total_reward += reward

            if terminated or truncated:
                break

        # decay epsilon
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # periodic evaluation
        if ep % eval_every == 0 or ep == 1:
            avg_return, success_rate = evaluate_policy(n_eval=eval_episodes)
            elapsed = time.time() - start_time
            print(f"[ep {ep}/{episodes}] eps={epsilon:.4f} avg_return={avg_return:.3f} success_rate={success_rate*100:.1f}% elapsed={elapsed:.1f}s")
            if avg_return > best_eval:
                best_eval = avg_return
                save_q_table()

    print("SARSA training complete.")
    return Q

# ---------- Example usage ----------
if __name__ == "__main__":
    # optionally load from previous run
    load_q_table()

    trained_Q = train_sarsa(episodes=episodes)

    avg_return, success_rate = evaluate_policy(n_eval=1000)
    print(f"\nFinal evaluation (1000 eps): avg_return={avg_return:.3f}, success_rate={success_rate*100:.2f}%")
    print("Derived greedy policy (action indices, reshaped 4x4):")
    print(np.argmax(trained_Q, axis=1).reshape(4, 4))


[info] SARSA Q-table loaded from sarsa_q_table.pkl
[ep 1/8000] eps=0.9995 avg_return=1.000 success_rate=100.0% elapsed=0.0s
[info] SARSA Q-table saved to sarsa_q_table.pkl
[ep 400/8000] eps=0.8187 avg_return=0.000 success_rate=0.0% elapsed=0.3s
[ep 800/8000] eps=0.6703 avg_return=1.000 success_rate=100.0% elapsed=0.4s
[ep 1200/8000] eps=0.5487 avg_return=1.000 success_rate=100.0% elapsed=0.4s
[ep 1600/8000] eps=0.4492 avg_return=0.000 success_rate=0.0% elapsed=0.7s
[ep 2000/8000] eps=0.3678 avg_return=0.000 success_rate=0.0% elapsed=1.1s
[ep 2400/8000] eps=0.3011 avg_return=1.000 success_rate=100.0% elapsed=1.2s
[ep 2800/8000] eps=0.2465 avg_return=0.000 success_rate=0.0% elapsed=1.5s
[ep 3200/8000] eps=0.2018 avg_return=0.000 success_rate=0.0% elapsed=1.8s
[ep 3600/8000] eps=0.1652 avg_return=0.000 success_rate=0.0% elapsed=2.2s
[ep 4000/8000] eps=0.1353 avg_return=1.000 success_rate=100.0% elapsed=2.3s
[ep 4400/8000] eps=0.1107 avg_return=1.000 success_rate=100.0% elapsed=2.4s
[ep 48

###8

Q(s,a) = Q(s,a) + alpha * (r + gamma * max(a') Q(s',a') - Q(s,a))

In [None]:
import numpy as np
import random
import time
import os
import pickle

# ---------- Hyperparameters ----------
n_states = env.observation_space.n
n_actions = env.action_space.n

alpha = 0.8          # learning rate (0 < alpha <= 1)
gamma = 0.99         # discount factor
epsilon_start = 1.0  # initial exploration prob
epsilon_min = 0.01
epsilon_decay = 0.9995  # multiplicative decay per episode

episodes = 10000     # training episodes
max_steps = 100      # max steps per episode

eval_every = 500     # evaluate policy every N episodes
eval_episodes = 200  # episodes for evaluation

q_table_path = "q_table.pkl"

# ---------- Initialize Q-table ----------
Q = np.zeros((n_states, n_actions), dtype=float)

# ---------- Utilities ----------
def epsilon_greedy_action(state, epsilon):
    """Return action using epsilon-greedy policy derived from Q."""
    if random.random() < epsilon:
        return random.randint(0, n_actions - 1)
    else:
        return int(np.argmax(Q[state]))

def save_q_table(path=q_table_path):
    with open(path, "wb") as f:
        pickle.dump(Q, f)
    print(f"[info] Q-table saved to {path} (shape={Q.shape})")

def load_q_table(path=q_table_path):
    global Q
    if os.path.exists(path):
        with open(path, "rb") as f:
            Q = pickle.load(f)
        print(f"[info] Q-table loaded from {path}")
    else:
        print(f"[info] No Q-table found at {path}. Starting fresh.")

def evaluate_policy(n_eval=100, render=False):
    """Evaluate greedy policy derived from Q (no exploration)."""
    returns = []
    successes = 0
    for _ in range(n_eval):
        obs, info = env.reset()
        state = int(obs)
        total_reward = 0.0
        for t in range(max_steps):
            action = int(np.argmax(Q[state]))
            next_obs, reward, terminated, truncated, info = env.step(action)
            state = int(next_obs)
            total_reward += reward
            if terminated or truncated:
                break
        returns.append(total_reward)
        if total_reward > 0:
            successes += 1
        if render:
            env.render()
    avg_return = np.mean(returns)
    success_rate = successes / n_eval
    return avg_return, success_rate

# ---------- Training Loop ----------
def train_q_learning(episodes=episodes):
    epsilon = epsilon_start
    training_start = time.time()
    best_eval = -np.inf

    for ep in range(1, episodes + 1):
        obs, info = env.reset()                 # Gymnasium reset -> obs, info
        state = int(obs)
        total_reward = 0.0

        for t in range(max_steps):
            action = epsilon_greedy_action(state, epsilon)
            next_obs, reward, terminated, truncated, info = env.step(action)
            next_state = int(next_obs)

            # Q-learning update (off-policy)
            td_target = reward + gamma * np.max(Q[next_state])
            td_error = td_target - Q[state, action]
            Q[state, action] += alpha * td_error

            state = next_state
            total_reward += reward

            if terminated or truncated:
                break

        # decay epsilon
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # periodic evaluation & logging
        if ep % eval_every == 0 or ep == 1:
            avg_return, success_rate = evaluate_policy(n_eval=eval_episodes)
            elapsed = time.time() - training_start
            print(f"[ep {ep}/{episodes}] eps={epsilon:.4f}  avg_return={avg_return:.3f}  success_rate={success_rate*100:.1f}%  elapsed={elapsed:.1f}s")
            # optionally save best Q
            if avg_return > best_eval:
                best_eval = avg_return
                save_q_table()

    print("Training complete.")
    return Q

# ---------- Example usage ----------
if __name__ == "__main__":
    # (optional) warm start from saved Q-table
    load_q_table()

    # Train
    trained_Q = train_q_learning(episodes=episodes)

    # Final evaluation
    avg_return, success_rate = evaluate_policy(n_eval=1000)
    print(f"\nFinal evaluation over 1000 episodes: avg_return={avg_return:.3f}, success_rate={success_rate*100:.2f}%")

    # Show derived greedy policy (action indices)
    greedy_policy = np.argmax(trained_Q, axis=1)
    print("\nGreedy policy (state 0..15):")
    print(greedy_policy.reshape(4, 4))


[info] Q-table loaded from q_table.pkl
[ep 1/10000] eps=0.9995  avg_return=1.000  success_rate=100.0%  elapsed=0.0s
[info] Q-table saved to q_table.pkl (shape=(16, 4))
[ep 500/10000] eps=0.7788  avg_return=1.000  success_rate=100.0%  elapsed=0.1s
[ep 1000/10000] eps=0.6065  avg_return=1.000  success_rate=100.0%  elapsed=0.3s
[ep 1500/10000] eps=0.4723  avg_return=1.000  success_rate=100.0%  elapsed=0.4s
[ep 2000/10000] eps=0.3678  avg_return=1.000  success_rate=100.0%  elapsed=0.5s
[ep 2500/10000] eps=0.2864  avg_return=1.000  success_rate=100.0%  elapsed=0.6s
[ep 3000/10000] eps=0.2230  avg_return=1.000  success_rate=100.0%  elapsed=0.7s
[ep 3500/10000] eps=0.1737  avg_return=1.000  success_rate=100.0%  elapsed=0.8s
[ep 4000/10000] eps=0.1353  avg_return=1.000  success_rate=100.0%  elapsed=0.9s
[ep 4500/10000] eps=0.1053  avg_return=1.000  success_rate=100.0%  elapsed=1.0s
[ep 5000/10000] eps=0.0820  avg_return=1.000  success_rate=100.0%  elapsed=1.0s
[ep 5500/10000] eps=0.0639  avg_r

###9

L = (r + gamma max(a') Qtarget(s',a') - Qonline(s,a))^2

In [None]:
# DQN for FrozenLake-v1 (Gymnasium)
# Paste this after your env creation and reset code.

import numpy as np
import random
from collections import deque, namedtuple
import time
import torch
import torch.nn as nn
import torch.optim as optim

# ----------------- Hyperparameters -----------------
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

n_states = env.observation_space.n   # 16
n_actions = env.action_space.n       # 4

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# DQN params
hidden_dim = 64
lr = 1e-3
gamma = 0.99
batch_size = 64
buffer_size = 10000
min_buffer_size = 500        # start training only after this many transitions
target_update_freq = 500     # update target network every N training steps
max_frames = 40000           # total environment steps to run
epsilon_start = 1.0
epsilon_final = 0.01
epsilon_decay = 15000        # decay schedule (in frames)

# ----------------- Utilities -----------------
Transition = namedtuple('Transition', ('s', 'a', 'r', 's2', 'done'))

def one_hot_state(s, n=n_states):
    """Convert scalar state to one-hot torch tensor on device."""
    vec = np.zeros(n, dtype=np.float32)
    vec[int(s)] = 1.0
    return torch.from_numpy(vec).to(device)

# ----------------- Replay Buffer -----------------
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, *args):
        self.buffer.append(Transition(*args))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        return Transition(*zip(*batch))

    def __len__(self):
        return len(self.buffer)

# ----------------- Q-Network -----------------
class QNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.net(x)

# ----------------- Helpers -----------------
def epsilon_by_frame(frame_idx):
    # exponential decay schedule
    return epsilon_final + (epsilon_start - epsilon_final) * np.exp(-1. * frame_idx / epsilon_decay)

def select_action(policy_net, state_scalar, epsilon):
    if random.random() < epsilon:
        return random.randrange(n_actions)
    else:
        with torch.no_grad():
            s = one_hot_state(state_scalar).unsqueeze(0)  # shape [1, n_states]
            qvals = policy_net(s)
            return int(qvals.argmax(dim=1).item())

def compute_loss(policy_net, target_net, batch):
    # batch: Transition of tuples
    s_batch = torch.stack([one_hot_state(s) for s in batch.s]).to(device)        # [B, n_states]
    a_batch = torch.tensor(batch.a, dtype=torch.int64, device=device).unsqueeze(1)  # [B,1]
    r_batch = torch.tensor(batch.r, dtype=torch.float32, device=device).unsqueeze(1) # [B,1]
    s2_batch = torch.stack([one_hot_state(s) for s in batch.s2]).to(device)
    done_batch = torch.tensor(batch.done, dtype=torch.float32, device=device).unsqueeze(1) # 1.0 if done else 0.0

    # Q(s,a)
    q_values = policy_net(s_batch).gather(1, a_batch)  # [B,1]

    # target: r + gamma * max_a' Q_target(s', a') * (1 - done)
    with torch.no_grad():
        q_next = target_net(s2_batch).max(dim=1, keepdim=True)[0]
        q_target = r_batch + gamma * q_next * (1.0 - done_batch)

    loss = nn.functional.mse_loss(q_values, q_target)
    return loss

# ----------------- Build nets and optimizer -----------------
policy_net = QNet(n_states, hidden_dim, n_actions).to(device)
target_net = QNet(n_states, hidden_dim, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=lr)
replay_buffer = ReplayBuffer(buffer_size)

# ----------------- Training loop -----------------
print("Starting DQN training on FrozenLake (one-hot input). Device:", device)
frame_idx = 0
train_losses = []
episode_rewards = []
state, info = env.reset()
state = int(state)

start_time = time.time()

while frame_idx < max_frames:
    epsilon = epsilon_by_frame(frame_idx)
    action = select_action(policy_net, state, epsilon)
    next_obs, reward, terminated, truncated, info = env.step(action)
    next_state = int(next_obs)
    done = bool(terminated or truncated)

    # store transition
    replay_buffer.push(state, action, reward, next_state, done)

    state = next_state
    frame_idx += 1

    # start new episode if done
    if done:
        obs, info = env.reset()
        state = int(obs)

    # training step
    if len(replay_buffer) >= min_buffer_size:
        batch = replay_buffer.sample(batch_size)
        loss = compute_loss(policy_net, target_net, batch)

        optimizer.zero_grad()
        loss.backward()
        # ⚠️ gradient clipping can help if training is unstable (tiny network here, optional)
        torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 10.0)
        optimizer.step()

        train_losses.append(loss.item())

        # update target network periodically
        if frame_idx % target_update_freq == 0:
            target_net.load_state_dict(policy_net.state_dict())

    # optional logging
    if frame_idx % 2000 == 0:
        avg_loss = np.mean(train_losses[-200:]) if train_losses else 0.0
        elapsed = time.time() - start_time
        print(f"[frame {frame_idx}/{max_frames}] epsilon={epsilon:.3f} avg_loss={avg_loss:.4f} elapsed={elapsed:.1f}s")

# ----------------- Evaluation -----------------
def evaluate_policy_net(policy_net, n_episodes=200):
    policy_net.eval()
    returns = []
    successes = 0
    for _ in range(n_episodes):
        obs, info = env.reset()
        s = int(obs)
        total_r = 0.0
        done = False
        steps = 0
        while not done and steps < 100:
            a = select_action(policy_net, s, epsilon=0.0)  # greedy
            next_obs, reward, terminated, truncated, info = env.step(a)
            s = int(next_obs)
            total_r += reward
            done = bool(terminated or truncated)
            steps += 1
        returns.append(total_r)
        if total_r > 0:
            successes += 1
    policy_net.train()
    return np.mean(returns), successes / n_episodes

avg_return, success_rate = evaluate_policy_net(policy_net, n_episodes=500)
print(f"\nDQN evaluation -> avg_return={avg_return:.3f}, success_rate={success_rate*100:.2f}%")


Starting DQN training on FrozenLake (one-hot input). Device: cpu
[frame 2000/40000] epsilon=0.876 avg_loss=0.0000 elapsed=7.9s
[frame 4000/40000] epsilon=0.768 avg_loss=0.0000 elapsed=16.1s
[frame 6000/40000] epsilon=0.674 avg_loss=0.0000 elapsed=23.2s
[frame 8000/40000] epsilon=0.591 avg_loss=0.0000 elapsed=29.8s
[frame 10000/40000] epsilon=0.518 avg_loss=0.0000 elapsed=37.0s
[frame 12000/40000] epsilon=0.455 avg_loss=0.0000 elapsed=44.1s
[frame 14000/40000] epsilon=0.399 avg_loss=0.0000 elapsed=50.8s
[frame 16000/40000] epsilon=0.351 avg_loss=0.0000 elapsed=59.0s
[frame 18000/40000] epsilon=0.308 avg_loss=0.0000 elapsed=65.3s
[frame 20000/40000] epsilon=0.271 avg_loss=0.0000 elapsed=71.5s
[frame 22000/40000] epsilon=0.238 avg_loss=0.0000 elapsed=77.9s
[frame 24000/40000] epsilon=0.210 avg_loss=0.0000 elapsed=84.1s
[frame 26000/40000] epsilon=0.185 avg_loss=0.0000 elapsed=90.3s
[frame 28000/40000] epsilon=0.163 avg_loss=0.0000 elapsed=96.6s
[frame 30000/40000] epsilon=0.144 avg_loss=0

###10

Gt = sum k=0 T-t-1 gamma^k * Rt+k+1

Policy Gradient:

theta = theta + alpha * grad_theta * log pi_theta(at|st)(Gt-b)

In [None]:
# Colab cell - run once
!pip install gymnasium==0.28.1   # or latest compatible
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
# If you want GPU, adjust torch install accordingly (Colab/GPU).
# PettingZoo not used here; Lab13 uses a simple custom multi-agent env


In [None]:
# Lab 10 - REINFORCE with baseline (policy + value network)
# Run after Colab setup
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

env = gym.make("CartPole-v1", render_mode=None)
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Policy network -> outputs logits
class PolicyNet(nn.Module):
    def __init__(self, obs_dim, n_actions, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_actions)
        )
    def forward(self, x):
        return self.net(x)

# Value network (baseline)
class ValueNet(nn.Module):
    def __init__(self, obs_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1)
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)

policy_net = PolicyNet(obs_dim, n_actions).to(device)
value_net = ValueNet(obs_dim).to(device)

policy_opt = optim.Adam(policy_net.parameters(), lr=1e-3)
value_opt = optim.Adam(value_net.parameters(), lr=1e-3)

gamma = 0.99
max_episodes = 800
batch_log_every = 10  # print avg reward

def run_episode(policy_net):
    obs, info = env.reset()
    obs = torch.tensor(obs, dtype=torch.float32, device=device)
    states = []
    actions = []
    rewards = []
    done = False
    while True:
        logits = policy_net(obs.unsqueeze(0))
        dist = torch.distributions.Categorical(logits=logits)
        a = dist.sample().item()
        next_obs, r, terminated, truncated, info = env.step(a)
        states.append(obs)
        actions.append(a)
        rewards.append(r)
        obs = torch.tensor(next_obs, dtype=torch.float32, device=device)
        if terminated or truncated:
            break
    return states, actions, rewards

# Compute discounted returns
def discounted_returns(rewards, gamma):
    G = []
    R = 0
    for r in reversed(rewards):
        R = r + gamma * R
        G.insert(0, R)
    return torch.tensor(G, dtype=torch.float32, device=device)

# Training loop
all_rewards = []
for ep in range(1, max_episodes+1):
    states, actions, rewards = run_episode(policy_net)
    G = discounted_returns(rewards, gamma)
    states_tensor = torch.stack(states)
    actions_tensor = torch.tensor(actions, device=device)

    # Baseline (value estimates)
    values = value_net(states_tensor).detach()
    advantages = G - values  # baseline reduces variance

    # Policy loss: - E[ log pi(a|s) * advantage ]
    logits = policy_net(states_tensor)
    dist = torch.distributions.Categorical(logits=logits)
    logp = dist.log_prob(actions_tensor)
    policy_loss = -(logp * advantages).sum()

    # Value loss: MSE to returns
    value_loss = nn.functional.mse_loss(value_net(states_tensor), G)

    # Update policy
    policy_opt.zero_grad()
    policy_loss.backward()
    policy_opt.step()

    # Update baseline
    value_opt.zero_grad()
    value_loss.backward()
    value_opt.step()

    episode_reward = sum(rewards)
    all_rewards.append(episode_reward)

    if ep % batch_log_every == 0:
        print(f"EP {ep}\tavg_reward(last {batch_log_every})={np.mean(all_rewards[-batch_log_every:]):.2f}")

print("Training finished. Final avg reward (last 50):", np.mean(all_rewards[-50:]))
env.close()


EP 10	avg_reward(last 10)=23.40
EP 20	avg_reward(last 10)=21.90
EP 30	avg_reward(last 10)=31.40
EP 40	avg_reward(last 10)=18.50
EP 50	avg_reward(last 10)=27.60
EP 60	avg_reward(last 10)=28.80
EP 70	avg_reward(last 10)=36.10
EP 80	avg_reward(last 10)=29.80
EP 90	avg_reward(last 10)=37.80
EP 100	avg_reward(last 10)=34.60
EP 110	avg_reward(last 10)=38.90
EP 120	avg_reward(last 10)=40.00
EP 130	avg_reward(last 10)=52.50
EP 140	avg_reward(last 10)=65.00
EP 150	avg_reward(last 10)=43.50
EP 160	avg_reward(last 10)=62.50
EP 170	avg_reward(last 10)=53.80
EP 180	avg_reward(last 10)=63.70
EP 190	avg_reward(last 10)=85.80
EP 200	avg_reward(last 10)=85.00
EP 210	avg_reward(last 10)=124.40
EP 220	avg_reward(last 10)=129.20
EP 230	avg_reward(last 10)=152.90
EP 240	avg_reward(last 10)=158.80
EP 250	avg_reward(last 10)=121.10
EP 260	avg_reward(last 10)=217.90
EP 270	avg_reward(last 10)=206.30
EP 280	avg_reward(last 10)=193.20
EP 290	avg_reward(last 10)=155.30
EP 300	avg_reward(last 10)=147.80
EP 310	av

###11

Advantage:

A(s,a) = Q(s,a) - V(s) or A(s) = Gt - V(s)

Update:

theta = theta + alpha * grad_theta * log pi_theta(at|st)A(st,at)

In [None]:
# Lab 10 - REINFORCE with baseline (policy + value network)
# Run after Colab setup
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

env = gym.make("CartPole-v1", render_mode=None)
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Policy network -> outputs logits
class PolicyNet(nn.Module):
    def __init__(self, obs_dim, n_actions, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_actions)
        )
    def forward(self, x):
        return self.net(x)

# Value network (baseline)
class ValueNet(nn.Module):
    def __init__(self, obs_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1)
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)

policy_net = PolicyNet(obs_dim, n_actions).to(device)
value_net = ValueNet(obs_dim).to(device)

policy_opt = optim.Adam(policy_net.parameters(), lr=1e-3)
value_opt = optim.Adam(value_net.parameters(), lr=1e-3)

gamma = 0.99
max_episodes = 800
batch_log_every = 10  # print avg reward

def run_episode(policy_net):
    obs, info = env.reset()
    obs = torch.tensor(obs, dtype=torch.float32, device=device)
    states = []
    actions = []
    rewards = []
    done = False
    while True:
        logits = policy_net(obs.unsqueeze(0))
        dist = torch.distributions.Categorical(logits=logits)
        a = dist.sample().item()
        next_obs, r, terminated, truncated, info = env.step(a)
        states.append(obs)
        actions.append(a)
        rewards.append(r)
        obs = torch.tensor(next_obs, dtype=torch.float32, device=device)
        if terminated or truncated:
            break
    return states, actions, rewards

# Compute discounted returns
def discounted_returns(rewards, gamma):
    G = []
    R = 0
    for r in reversed(rewards):
        R = r + gamma * R
        G.insert(0, R)
    return torch.tensor(G, dtype=torch.float32, device=device)

# Training loop
all_rewards = []
for ep in range(1, max_episodes+1):
    states, actions, rewards = run_episode(policy_net)
    G = discounted_returns(rewards, gamma)
    states_tensor = torch.stack(states)
    actions_tensor = torch.tensor(actions, device=device)

    # Baseline (value estimates)
    values = value_net(states_tensor).detach()
    advantages = G - values  # baseline reduces variance

    # Policy loss: - E[ log pi(a|s) * advantage ]
    logits = policy_net(states_tensor)
    dist = torch.distributions.Categorical(logits=logits)
    logp = dist.log_prob(actions_tensor)
    policy_loss = -(logp * advantages).sum()

    # Value loss: MSE to returns
    value_loss = nn.functional.mse_loss(value_net(states_tensor), G)

    # Update policy
    policy_opt.zero_grad()
    policy_loss.backward()
    policy_opt.step()

    # Update baseline
    value_opt.zero_grad()
    value_loss.backward()
    value_opt.step()

    episode_reward = sum(rewards)
    all_rewards.append(episode_reward)

    if ep % batch_log_every == 0:
        print(f"EP {ep}\tavg_reward(last {batch_log_every})={np.mean(all_rewards[-batch_log_every:]):.2f}")

print("Training finished. Final avg reward (last 50):", np.mean(all_rewards[-50:]))
env.close()


EP 10	avg_reward(last 10)=22.60
EP 20	avg_reward(last 10)=40.50
EP 30	avg_reward(last 10)=32.80
EP 40	avg_reward(last 10)=24.60
EP 50	avg_reward(last 10)=25.30
EP 60	avg_reward(last 10)=25.60
EP 70	avg_reward(last 10)=21.30
EP 80	avg_reward(last 10)=29.50
EP 90	avg_reward(last 10)=33.30
EP 100	avg_reward(last 10)=51.20
EP 110	avg_reward(last 10)=28.60
EP 120	avg_reward(last 10)=45.40
EP 130	avg_reward(last 10)=47.10
EP 140	avg_reward(last 10)=56.20
EP 150	avg_reward(last 10)=67.50
EP 160	avg_reward(last 10)=52.90
EP 170	avg_reward(last 10)=47.90
EP 180	avg_reward(last 10)=50.80
EP 190	avg_reward(last 10)=71.70
EP 200	avg_reward(last 10)=79.50
EP 210	avg_reward(last 10)=87.70
EP 220	avg_reward(last 10)=88.50
EP 230	avg_reward(last 10)=108.40
EP 240	avg_reward(last 10)=154.00
EP 250	avg_reward(last 10)=152.30
EP 260	avg_reward(last 10)=182.60
EP 270	avg_reward(last 10)=187.50
EP 280	avg_reward(last 10)=194.90
EP 290	avg_reward(last 10)=168.10
EP 300	avg_reward(last 10)=179.90
EP 310	avg_

###12

TD error:

theta = r + gamma* V(s') - V(s)

Actor update:

theta_actor = theta_actor + alpha * delta * grad_theta log pi_theta(a|s)

Critic update:

V(s) = V(s) + beta * delta

In [None]:
# Lab 12 - Actor-Critic (A2C-style lightweight) for Pendulum (continuous control)
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

env = gym.make("Pendulum-v1", render_mode=None)
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]  # 1
act_low = env.action_space.low[0]
act_high = env.action_space.high[0]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class Actor(nn.Module):
    def __init__(self, obs_dim, act_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU()
        )
        self.mu = nn.Linear(hidden, act_dim)
        self.logstd = nn.Parameter(torch.zeros(act_dim))

    def forward(self, x):
        h = self.net(x)
        mu = self.mu(h)
        std = torch.exp(self.logstd)
        return mu, std

class Critic(nn.Module):
    def __init__(self, obs_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1)
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)

actor = Actor(obs_dim, act_dim).to(device)
critic = Critic(obs_dim).to(device)
actor_opt = optim.Adam(actor.parameters(), lr=3e-4)
critic_opt = optim.Adam(critic.parameters(), lr=1e-3)

gamma = 0.99
max_episodes = 600

def select_action(state):
    state_t = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    mu, std = actor(state_t)
    dist = torch.distributions.Normal(mu, std)
    action = dist.sample()
    logp = dist.log_prob(action).sum(dim=-1)
    action_clipped = action.cpu().detach().numpy()[0]
    # map to action range if needed (Pendulum expects [-2,2]), dist gives raw values
    action_clipped = np.clip(action_clipped, act_low, act_high)
    return action_clipped, logp, mu, std

for ep in range(1, max_episodes+1):
    obs, info = env.reset()
    obs = np.array(obs, dtype=np.float32)
    traj_logp = []
    traj_values = []
    traj_rewards = []
    done = False
    while True:
        a, logp, mu, std = select_action(obs)
        next_obs, r, terminated, truncated, info = env.step(a)
        traj_logp.append(logp)
        v = critic(torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)).squeeze(0)
        traj_values.append(v)
        traj_rewards.append(r)
        obs = np.array(next_obs, dtype=np.float32)
        if terminated or truncated:
            break

    # compute returns and advantages
    returns = []
    R = 0
    for r in reversed(traj_rewards):
        R = r + gamma * R
        returns.insert(0, R)
    returns = torch.tensor(returns, dtype=torch.float32, device=device)
    values = torch.stack(traj_values).squeeze(-1)
    advantages = returns - values.detach()

    # actor loss (policy gradient with advantage)
    logps = torch.stack(traj_logp)
    actor_loss = -(logps * advantages).sum()

    # critic loss
    critic_loss = nn.functional.mse_loss(values, returns)

    actor_opt.zero_grad(); actor_loss.backward(); actor_opt.step()
    critic_opt.zero_grad(); critic_loss.backward(); critic_opt.step()

    if ep % 20 == 0:
        print(f"EP {ep} total_reward={sum(traj_rewards):.2f} actor_loss={actor_loss.item():.3f} critic_loss={critic_loss.item():.3f}")

print("Finished Lab 12")
env.close()


EP 20 total_reward=-1390.55 actor_loss=-21876718.000 critic_loss=181680.016
EP 40 total_reward=-1155.40 actor_loss=-19086550.000 critic_loss=123829.570
EP 60 total_reward=-1174.53 actor_loss=-15659655.000 critic_loss=106847.789
EP 80 total_reward=-923.38 actor_loss=-9483645.000 critic_loss=45308.898
EP 100 total_reward=-1483.38 actor_loss=-18272346.000 critic_loss=143914.422
EP 120 total_reward=-1263.78 actor_loss=-4216532.000 critic_loss=40885.027
EP 140 total_reward=-1166.78 actor_loss=-3521658.000 critic_loss=47201.289
EP 160 total_reward=-1070.54 actor_loss=-1402875.750 critic_loss=37810.219
EP 180 total_reward=-1036.27 actor_loss=2729474.500 critic_loss=36765.191
EP 200 total_reward=-1199.98 actor_loss=-2044521.250 critic_loss=38040.785
EP 220 total_reward=-1325.77 actor_loss=-3222353.500 critic_loss=44355.395
EP 240 total_reward=-1208.32 actor_loss=-2541910.250 critic_loss=36420.816
EP 260 total_reward=-1114.31 actor_loss=-3161469.000 critic_loss=45936.922
EP 280 total_reward=-13

###13

Qi(s,a1,a2)

In [None]:
# Lab 13 - Simple multi-agent GridWorld (2 agents) - Colab compatible
# Two independent Q-learning agents (Independent Q-Learning)
import numpy as np
import random
from collections import defaultdict

# Custom GridWorld environment
class TwoAgentGrid:
    def __init__(self, size=5):
        self.size = size
        # agents: positions (r,c)
        self.reset()
    def reset(self):
        # place agent0 at top-left, agent1 at bottom-right
        self.pos = {0: (0,0), 1: (self.size-1, self.size-1)}
        # goals: agent0 -> bottom-right, agent1 -> top-left (cross-goal)
        self.goals = {0: (self.size-1, self.size-1), 1: (0,0)}
        # return observations as tuple of flattened states
        return self._get_obs()
    def _get_obs(self):
        # each obs = (r, c) flattened into int index
        obs0 = self.pos[0][0]*self.size + self.pos[0][1]
        obs1 = self.pos[1][0]*self.size + self.pos[1][1]
        return (obs0, obs1)
    def step(self, actions):
        # actions: dict {agent_id: action_idx} where action: 0=up,1=right,2=down,3=left,4=stay
        for aid, a in actions.items():
            r,c = self.pos[aid]
            if a == 0 and r>0: r -= 1
            elif a == 1 and c<self.size-1: c += 1
            elif a == 2 and r<self.size-1: r += 1
            elif a == 3 and c>0: c -= 1
            elif a == 4: pass
            self.pos[aid] = (r,c)
        # compute rewards and done
        rewards = {}
        done = False
        # collision penalty
        if self.pos[0] == self.pos[1]:
            rewards[0] = -1.0
            rewards[1] = -1.0
        else:
            # reward +1 for reaching respective goal, else 0
            rewards[0] = 1.0 if self.pos[0] == self.goals[0] else 0.0
            rewards[1] = 1.0 if self.pos[1] == self.goals[1] else 0.0
        # episode ends when both reach their goals or after some external cap (handled by trainer)
        if rewards[0] == 1.0 and rewards[1] == 1.0:
            done = True
        return self._get_obs(), rewards, done, {}
    def render(self):
        grid = [["." for _ in range(self.size)] for _ in range(self.size)]
        for aid, (r,c) in self.pos.items():
            grid[r][c] = str(aid)
        for row in grid:
            print(" ".join(row))
        print()

# Independent Q-learning for each agent (tabular)
def train_independent_q(env, episodes=2000, max_steps=50):
    n_states = env.size * env.size
    n_actions = 5
    alpha = 0.5
    gamma = 0.99
    epsilon = 1.0
    eps_min = 0.05
    eps_decay = 0.995

    Q0 = np.zeros((n_states, n_actions))
    Q1 = np.zeros((n_states, n_actions))

    for ep in range(1, episodes+1):
        obs0, obs1 = env.reset()
        total_r0 = 0.0; total_r1 = 0.0
        for t in range(max_steps):
            # epsilon-greedy for each agent
            if random.random() < epsilon:
                a0 = random.randrange(n_actions)
            else:
                a0 = int(np.argmax(Q0[obs0]))
            if random.random() < epsilon:
                a1 = random.randrange(n_actions)
            else:
                a1 = int(np.argmax(Q1[obs1]))

            (nobs0, nobs1), rewards, done, _ = env.step({0:a0, 1:a1})
            r0 = rewards[0]; r1 = rewards[1]

            # Q updates (independent)
            Q0[obs0, a0] += alpha * (r0 + gamma * np.max(Q0[nobs0]) - Q0[obs0, a0])
            Q1[obs1, a1] += alpha * (r1 + gamma * np.max(Q1[nobs1]) - Q1[obs1, a1])

            obs0, obs1 = nobs0, nobs1
            total_r0 += r0; total_r1 += r1

            if done:
                break

        epsilon = max(eps_min, epsilon * eps_decay)
        if ep % 200 == 0:
            print(f"EP {ep} total_r0={total_r0:.2f} total_r1={total_r1:.2f} eps={epsilon:.3f}")

    return Q0, Q1

# Example run
env = TwoAgentGrid(size=5)
Q0, Q1 = train_independent_q(env, episodes=1200)
print("Training done. Sample policy (agent0) from Q0 (reshaped):")
print(np.argmax(Q0, axis=1).reshape(env.size, env.size))
print("Agent1 policy:")
print(np.argmax(Q1, axis=1).reshape(env.size, env.size))


EP 200 total_r0=1.00 total_r1=7.00 eps=0.367
EP 400 total_r0=2.00 total_r1=1.00 eps=0.135
EP 600 total_r0=1.00 total_r1=1.00 eps=0.050
EP 800 total_r0=1.00 total_r1=1.00 eps=0.050
EP 1000 total_r0=1.00 total_r1=1.00 eps=0.050
EP 1200 total_r0=2.00 total_r1=1.00 eps=0.050
Training done. Sample policy (agent0) from Q0 (reshaped):
[[2 2 2 2 2]
 [1 2 2 1 2]
 [1 2 1 2 2]
 [1 1 1 2 2]
 [1 1 1 1 4]]
Agent1 policy:
[[3 3 3 3 3]
 [0 0 0 0 3]
 [0 0 0 0 0]
 [0 0 1 0 3]
 [0 0 1 0 3]]


In [3]:
!curl -L "https://github.com/wendigo0103/Zerodha/blob/main/README.md"







<!DOCTYPE html>
<html
  lang="en"
  
  data-color-mode="auto" data-light-theme="light" data-dark-theme="dark"
  data-a11y-animated-images="system" data-a11y-link-underlines="true"
  
  >




  <head>
    <meta charset="utf-8">
  <link rel="dns-prefetch" href="https://github.githubassets.com">
  <link rel="dns-prefetch" href="https://avatars.githubusercontent.com">
  <link rel="dns-prefetch" href="https://github-cloud.s3.amazonaws.com">
  <link rel="dns-prefetch" href="https://user-images.githubusercontent.com/">
  <link rel="preconnect" href="https://github.githubassets.com" crossorigin>
  <link rel="preconnect" href="https://avatars.githubusercontent.com">

  


  <link crossorigin="anonymous" media="all" rel="stylesheet" href="https://github.githubassets.com/assets/light-8e973f836952.css" /><link crossorigin="anonymous" media="all" rel="stylesheet" href="https://github.githubassets.com/assets/light_high_contrast-34b642d57214.css" /><link crossorigin="anonymous" media="all" rel=