# Winter 2026 COMP 579 Assignment 2 Starter Code

In this assignment, you will experiment with different algorithms to evaluate a given fixed policy.

Following the instructions in the questions document, fill in the missing code indicated by "TODO" marks.

In [None]:
%matplotlib inline
import random
import numpy as np
import matplotlib.pyplot as plt
import math
import seaborn as sns
import gymnasium as gym
from collections import defaultdict
from typing import Dict, Tuple, List

## Question 1

Do NOT change the RANDOM_SEED value set in the next code cell. This ensures reproducibility.

In [None]:
RANDOM_SEED = 42

np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

We will start by initialising our environment:

> Before you proceed, we recommend that you check the Gymnasium Documentation: https://gymnasium.farama.org/ and play around with the Frozen Lake environment https://gymnasium.farama.org/environments/toy_text/frozen_lake/ to get more comfortable with Gym, as you will be frequently using it in RL and the next assignment.





In [None]:
env = gym.make("FrozenLake-v1", is_slippery=True)
env.reset(seed=RANDOM_SEED)

In [None]:
number_of_states = env.observation_space.n
number_of_actions = env.action_space.n

states = list(range(number_of_states))
actions = list(range(number_of_actions))

print("States:", number_of_states)
print("Actions:", number_of_actions)

In Frozen Lake, the action space is defined as:

```
0: Move left
1: Move down
2: Move right
3: Move up
```




In [None]:
P = env.unwrapped.P

`P` gives the transition information for the Markov Decision Process (MDP). In Gym’s tabular environments (like FrozenLake), `P` is a dictionary of dictionaries of lists

In [None]:
P



`P[state][action]` is a list of possible outcomes when taking "action" in a particular "state", because taking an action in a stochastic environment can lead to multiple next states with different probabilities.
Each element in this list is a tuple of the form:
`(transition probability, next state, reward, is the next next a terminal state?)`


In [None]:
P[0][1]

So `P[0][1]` shows all possible outcomes of taking action 1 in state 0, including how likely each outcome is, the resulting state, the reward, and whether the episode ends.

We can visualize our grid by checking the "desc" attribute

In [None]:
env.unwrapped.desc

```
“S” for Start tile
“G” for Goal tile
“F” for frozen tile
“H” for a tile with a hole
```




### Configs

In [None]:
GAMMA = 0.9
CONVERGENCE_THRESHOLD = 1e-8

### Implementing the Policy

In [None]:
def get_terminal_states(P):
  terminal_states = set()

  #TODO


  return terminal_states

In [None]:
terminal_states = get_terminal_states(P)
terminal_states

In [None]:
desc = env.unwrapped.desc
print(desc)

In [None]:
RIGHT = 2

In [None]:
def get_valid_actions(env, s):
  """
  Given an env and a state, return a list of valid actions at this state.
  An action is valid if it leads to a different state with non-zero probability
  """
  valid_actions = []
  for a in range(env.action_space.n):
      for (prob, next_s, _, _) in P[s][a]:
          if prob > 0 and next_s != s:
              valid_actions.append(a)
              break
  return valid_actions

In [None]:
def right_favoring_policy(env, s):
  pi = np.zeros(number_of_actions)
   # TODO

  return pi

### Deriving $v_\pi$ with Matrix Inversion

In [None]:
def policy_evaluation_analytical(env, policy, gamma):
  # TODO

  return V

In [None]:
V_analytical = policy_evaluation_analytical(env, right_favoring_policy, gamma=GAMMA)
V_analytical

### Synchronous DP

In [None]:
def synchronous_policy_evaluation(P, policy, gamma, tol):
  V = np.zeros(number_of_states)
  residuals = []

  # TODO

  return V, residuals

In [None]:
V_sync, residuals = synchronous_policy_evaluation(P, right_favoring_policy, GAMMA, CONVERGENCE_THRESHOLD)

In [None]:
V_sync

In [None]:
plt.figure()
plt.plot(residuals)
plt.yscale("log")
plt.title("Bellman Residual over Iterations")
plt.xlabel("Iteration")
plt.ylabel("Bellman Residual")
plt.show()

### Value Function Visualization

In [None]:
def plot_value_heatmap(env, V, title):
  # TODO

In [None]:
plot_value_heatmap(env, V_sync, title="State-Value Function")

### Non-stationary

In [None]:
def uniform_random_policy(env, s):
  pi = np.zeros(number_of_actions) # action-probability vector at state s

  #TODO

  return pi

In [None]:
def non_stationary_policy_evaluation(P, policy_A, policy_B, gamma, total_steps, switch_interval):
  V = np.zeros(number_of_states)
  residuals = []
  V_norms = []

  # TODO

  return V, residuals, V_norms

In [None]:
non_stationary_V, non_stationary_residuals, non_stationary_V_norms = non_stationary_policy_evaluation(P, right_favoring_policy, uniform_random_policy, GAMMA, 40, 4)

In [None]:
plt.figure()
plt.plot(non_stationary_residuals)
plt.yscale("log")
plt.xlabel("Iteration")
plt.xlabel("Bellman Residual")
plt.title("Non-Stationary Bellman Residual over Iterations")
plt.show()

## Question 2

### 2D Random Walk Environment

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
from math import floor
from itertools import zip_longest

In [None]:
class RandomWalk2DEnv:

    def __init__(self, seed=None):
        if seed is not None:
            np.random.seed(seed)
            random.seed(seed)
        self.reset()

    def reset(self):
        self.state = np.array([0.5, 0.5])
        return self.state.copy()

    def sample_action(self):
        return np.array([
            random.uniform(-0.2, 0.2),
            random.uniform(-0.2, 0.2)
        ])

    def step(self, action):
        next_state = self.state + action

        # Check termination
        done = (
            next_state[0] < 0 or next_state[0] > 1 or
            next_state[1] < 0 or next_state[1] > 1
        )

        # Reward function
        reward = next_state[0] + next_state[1] if done else 0.0

        self.state = next_state
        return next_state.copy(), reward, done


### Tile coding software (from incompleteideas.net)

In [None]:
basehash = hash

class IHT:
    "Structure to handle collisions"
    def __init__(self, sizeval):
        self.size = sizeval
        self.overfullCount = 0
        self.dictionary = {}

    def __str__(self):
        return (
            "Collision table:"
            + " size:" + str(self.size)
            + " overfullCount:" + str(self.overfullCount)
            + " dictionary:" + str(len(self.dictionary)) + " items"
        )

    def count(self):
        return len(self.dictionary)

    def fullp(self):
        return len(self.dictionary) >= self.size

    def getindex(self, obj, readonly=False):
        d = self.dictionary
        if obj in d:
            return d[obj]
        elif readonly:
            return None
        size = self.size
        count = self.count()
        if count >= size:
            if self.overfullCount == 0:
                print("IHT full, starting to allow collisions")
            self.overfullCount += 1
            return basehash(obj) % self.size
        else:
            d[obj] = count
            return count

def hashcoords(coordinates, m, readonly=False):
    if type(m) == IHT:
        return m.getindex(tuple(coordinates), readonly)
    if type(m) == int:
        return basehash(tuple(coordinates)) % m
    if m is None:
        return coordinates


def tiles(ihtORsize, numtilings, floats, ints=[], readonly=False):
    """Returns num-tilings tile indices corresponding to the floats and ints"""
    qfloats = [floor(f * numtilings) for f in floats]
    Tiles = []
    for tiling in range(numtilings):
        tilingX2 = tiling * 2
        coords = [tiling]
        b = tiling
        for q in qfloats:
            coords.append((q + b) // numtilings)
            b += tilingX2
        coords.extend(ints)
        Tiles.append(hashcoords(coords, ihtORsize, readonly))
    return Tiles


def tileswrap(ihtORsize, numtilings, floats, wrapwidths, ints=[], readonly=False):
    """Returns num-tilings tile indices with wrapping"""
    qfloats = [floor(f * numtilings) for f in floats]
    Tiles = []
    for tiling in range(numtilings):
        tilingX2 = tiling * 2
        coords = [tiling]
        b = tiling
        for q, width in zip_longest(qfloats, wrapwidths):
            c = (q + b % numtilings) // numtilings
            coords.append(c % width if width else c)
            b += tilingX2
        coords.extend(ints)
        Tiles.append(hashcoords(coords, ihtORsize, readonly))
    return Tiles


### Fixed tiling parameters

In [None]:
NUM_TILINGS = 8
TILES_PER_DIM = 4          # 4 x 4 per tiling
IHT_SIZE = 4096

# Global index hash table
iht = IHT(IHT_SIZE)

In [None]:
def tile_coding_indices(state):

    x, y = state

    return tiles(
        ihtORsize=iht,
        numtilings=NUM_TILINGS,
        floats=[x * TILES_PER_DIM, y * TILES_PER_DIM]
    )

def tile_coding_feature_vector(state):

    phi = np.zeros(IHT_SIZE)
    for idx in tile_coding_indices(state):
        phi[idx] = 1.0
    return phi


### Test

In [None]:
s = (0.5, 0.5)
idx = tile_coding_indices(s)

print("Active tiles:", idx)
print("Number of active tiles:", len(idx))  # should be 8


Active tiles: [0, 1, 2, 3, 4, 5, 6, 7]
Number of active tiles: 8


### Part b)

#### Every-visit Monte Carlo Agent

In [None]:
class MCAgentTileCoding:
    def __init__(self, alpha=0.1):
        ## TODO: initialize alpha and weights
        pass

    def value(self, state):
        ## TODO: compute sum of weights for active tiles
        pass

    def update_episode(self, episode):
        ## TODO: compute returns Gt
        ## TODO: update weights for each state in the episode
        pass


#### Run MC experiments

In [None]:
def run_mc_experiment(
    learning_rates=[0.1, 0.03, 0.01, 0.003, 0.001],
    num_episodes=1000,
    num_runs=30
):
    """
    Run every-visit Monte Carlo prediction experiments using Tile Coding.
    Returns mean and std of MSVE for each learning rate.
    """
    mean_msve = {}  # placeholder
    std_msve = {}   # placeholder

    ## TODO: Implement experiment loop over alphas, runs, episodes

    return mean_msve, std_msve

#### MSVE Evaluation

In [None]:
def compute_msve(agent):
    """
    Compute the Mean Squared Value Error (MSVE) over a 21x21 grid
    in [0,1] x [0,1] using the true value function V(x, y) = x + y.
    """
    msve = 0.0  # placeholder

    ## TODO: Compute MSVE over grid for the given agent

    return msve

#### Main execution

In [None]:
def plot_mc_results(mean_msve, std_msve):
    alphas = list(mean_msve.keys())
    means = np.array(list(mean_msve.values()))
    stds = np.array(list(std_msve.values()))

    plt.figure(figsize=(8,6))
    plt.errorbar(alphas, means, yerr=stds, fmt='o-', capsize=5)
    plt.xscale('log')
    plt.xlabel("Learning rate α")
    plt.ylabel("MSVE ± 1 std")
    plt.title("Every-Visit MC Prediction with Tile Coding (30 runs)")
    plt.grid(True)
    plt.show()

if __name__ == "__main__":
    learning_rates = [0.1, 0.03, 0.01, 0.003, 0.001]

    mean_msve, std_msve = run_mc_experiment(
        learning_rates=learning_rates,
        num_episodes=1000,
        num_runs=30
    )

    print("Mean MSVE ± Std Dev over 30 runs after 1000 episodes:")
    for alpha in learning_rates:
        print(f"α={alpha}: {mean_msve[alpha]:.5f} ± {std_msve[alpha]:.5f}")

    plot_mc_results(mean_msve, std_msve)


### Part c)

#### TD(λ) Agent (Accumulating Traces)

In [None]:
class TDAgentLambda:
    def __init__(self, alpha=0.01, gamma=1.0, lam=0.5):
        """
        alpha: learning rate
        gamma: discount factor
        lam: trace decay parameter
        """
        ## TODO: initialize weights and eligibility traces ##

    def value(self, state):
        """
        Return the current value estimate for a given state.
        """
        ## TODO: compute value from active tile indices ##

    def reset_traces(self):
        """
        Reset eligibility traces at the start of each episode.
        """
        ## TODO: reset eligibility traces ##

    def update(self, state, reward, next_state, done):
        """
        Perform a TD(lambda) update for a single step.
        """
        ## TODO: update weights using accumulating traces ##



#### Run TD(λ) Experiments

In [None]:
def run_td_lambda_experiment(
    lambdas=[0, 0.25, 0.5, 0.75, 0.9],
    alphas=[0.001, 0.003, 0.01, 0.03, 0.1],
    num_episodes=1000,
    num_runs=30
):
    results = {}

    ## TODO:
    # Loop over lambdas and alphas
    # For each seed, initialize env with seed and TDAgentLambda
    # Run num_episodes, update agent each step
    # Compute MSVE over 21x21 grid
    # Average MSVE over runs and store in results

    return results


#### Plot 1: MSVE vs α (one curve per λ)

In [None]:
def plot_msve_vs_alpha(results, lambdas, alphas):
    plt.figure(figsize=(8, 6))

    for lam in lambdas:
        msves = [results[(lam, alpha)] for alpha in alphas]
        plt.plot(alphas, msves, marker='o', label=f"λ={lam}")

    plt.xscale("log")
    plt.xlabel("Learning rate α")
    plt.ylabel("Average MSVE")
    plt.title("TD(λ): MSVE vs α")
    plt.legend()
    plt.grid(True)
    plt.show()


#### Plot 2: MSVE vs λ (one curve per α)

In [None]:
def plot_msve_vs_lambda(results, lambdas, alphas):
    plt.figure(figsize=(8, 6))

    for alpha in alphas:
        msves = [results[(lam, alpha)] for lam in lambdas]
        plt.plot(lambdas, msves, marker='o', label=f"α={alpha}")

    plt.xlabel("λ")
    plt.ylabel("Average MSVE")
    plt.title("TD(λ): MSVE vs λ")
    plt.legend()
    plt.grid(True)
    plt.show()


#### Plot 3: Heatmap (λ × α)

In [None]:
def plot_heatmap(results):
    df = pd.DataFrame(
        [(lam, alpha, msve) for (lam, alpha), msve in results.items()],
        columns=["lambda", "alpha", "MSVE"]
    )

    heatmap_data = df.pivot(index="lambda", columns="alpha", values="MSVE")

    plt.figure(figsize=(8, 6))
    sns.heatmap(heatmap_data, annot=True, fmt=".4f", cmap="viridis")
    plt.xlabel("Learning rate α")
    plt.ylabel("λ")
    plt.title("TD(λ) with Accumulating Traces: MSVE")
    plt.show()


#### Main Execution

In [None]:
lambdas = [0, 0.25, 0.5, 0.75, 0.9]
alphas = [0.001, 0.003, 0.01, 0.03, 0.08]

results = run_td_lambda_experiment(
    lambdas=lambdas,
    alphas=alphas,
    num_episodes=1000,
    num_runs=30
)

print("Average MSVE over 30 runs after 1000 episodes:")
for (lam, alpha), msve in results.items():
    print(f"λ={lam}, α={alpha}: MSVE={msve:.5f}")

plot_msve_vs_alpha(results, lambdas, alphas)
plot_msve_vs_lambda(results, lambdas, alphas)
plot_heatmap(results)


### Part d)

#### TD(0) agent with Tile Coding

In [None]:
class TDAgentTileCoding:
    def __init__(self, alpha=0.1, gamma=1.0):
        ## TODO: initialize alpha, gamma, and weights for tile coding


    def value(self, state):
        """Estimate value of a state using tile coding."""
        ## TODO: return estimated value using active tile indices

    def update(self, state, reward, next_state, done):
        """TD(0) update for a single transition."""
        ## TODO: compute TD error and update weights for active tiles

#### TD(0) agent with 2-layer Neural Network

In [None]:
class NNValueFunction:
    def __init__(self, input_dim=2, hidden_dim=64, alpha=1e-3, gamma=1.0):
        self.alpha = alpha
        self.gamma = gamma
        self.W1 = np.random.randn(input_dim, hidden_dim) / np.sqrt(input_dim)
        self.b1 = np.zeros(hidden_dim)
        self.W2 = np.random.randn(hidden_dim, 1) / np.sqrt(hidden_dim)
        self.b2 = np.zeros(1)

    def forward(self, state):
        self.z1 = np.dot(state, self.W1) + self.b1
        self.h1 = np.tanh(self.z1)
        value = np.dot(self.h1, self.W2) + self.b2
        return value.item()

    def td_update(self, state, reward, next_state, done):
        ## TODO: compute TD target
        ## TODO: compute TD error
        ## TODO: backpropagate TD error through network
        ## TODO: update weights W1, W2 and biases b1, b2 using alpha



#### Training

In [None]:
def train_agents(env, num_episodes=2000):

    tile_agent = TDAgentTileCoding(alpha=0.1)
    nn_agent = NNValueFunction(alpha=1e-3)

    tile_values = []
    nn_values = []

    # ===========================
    # TODO: Implement training loop
    # ===========================
    # For each episode:
    # 1. Reset environment
    # 2. For Tile Coding agent:
    #    - Take steps until termination
    #    - Update the tile_agent using TD(0)
    # 3. For Neural Network agent:
    #    - Reset environment again
    #    - Take steps until termination
    #    - Update the nn_agent using td_update
    # 4. Record the value of the center state [0.5, 0.5] for both agents

    return tile_values, nn_values


#### Run training and plotting

In [None]:
env = RandomWalk2DEnv(seed=42)
tile_vals, nn_vals = train_agents(env, num_episodes=2000)

plt.plot(tile_vals, label="Tile Coding")
plt.plot(nn_vals, label="Neural Network")
plt.xlabel("Episode")
plt.ylabel("Estimated value at center")
plt.title("2D Random Walk: Tile Coding vs Neural Network")
plt.legend()
plt.show()

# Question 3

In [None]:
GAMMA = 0.9
N = 10
r_non_terminal = 0
r_terminal = 1

In [None]:
def linear_transiton_matrix(N):
  """
  A simple linear transition matrix for N states
  At every state (except the terminal one), the agent deterministically moves
  to s+1. At the terminal state, the agent stays in the terminal state.
  """
  P = np.zeros((N, N))

  for s in range(N-1):
        P[s, s+1] = 1.0

  P[N-1, N-1] = 1.0

  return P

In [None]:
P = linear_transiton_matrix(N)

In [None]:
P

array([[0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.]])

In [None]:
class LinearChainEnv:
  def __init__(self, N, reward_noise=0):
    self.N = N
    self.terminal = N-1
    self.P = linear_transiton_matrix(N)

    self.reward_noise = reward_noise

    self.reset()

  def reset(self):
    self.state = 0
    return self.state

  def step(self):
    if self.state == self.terminal:
      return self.state, 0.0, True

    next_state_prob = self.P[self.state]
    next_state = np.random.choice(self.N, p=next_state_prob)

    if next_state == self.terminal:
      reward = r_terminal + np.random.randn() * self.reward_noise
    else:
      reward = r_non_terminal + np.random.randn() * self.reward_noise

    self.state = next_state
    done = self.state == self.terminal
    return self.state, reward, done

In [None]:
class TDAgent:
  """
  Update your implementation from Q2 to work with the new linear chain env
  Hint: Instead of tile coding, use one-hot encoding for the state (every state is a feature)
  """
  def __init__(self, num_states, alpha=0.01, gamma=GAMMA):
    pass

  def value(self, state):
    pass

  def update(self, state, reward, next_state, done):
    pass

In [None]:
class MCAgent:
  """
  Update your implementation from Q2 to work with the new linear chain env
  Hint: Instead of tile coding, use one-hot encoding for the state (every state is a feature)
  """
  def __init__(self, num_states, alpha=0.01, gamma=GAMMA):
    pass

  def value(self, state):
    pass

  def update_episode(self, episode):
    pass

In [None]:
def true_value_linear_chain(N, gamma, r_non_terminal=0, r_terminal=1.0):
    V = np.zeros(N)
    #TODO: Implement the true value function
    return V

In [None]:
V_true = true_value_linear_chain(N, GAMMA, r_non_terminal, r_terminal)

In [None]:
def compute_msve(agent, V_true):
  # TODO: Implement the MSVE function
  pass

In [None]:
def train_td0_and_mc(env, V_true, num_episodes=10000, lr=0.003):
  num_states = env.N

  td_agent = TD0Agent(num_states, alpha=lr, gamma=GAMMA)
  mc_agent = MCAgent(num_states, alpha=lr, gamma=GAMMA)

  td_values = []
  mc_values = []
  td_msve = []
  mc_msve = []

  # TODO: Implement the training logic for the two agents

  return td_agent, mc_agent, td_values, mc_values, td_msve, mc_msve

In [None]:
# TODO: create the two versions of the LinearChainEnv

# TODO: For each env, train your agents and use the below code to plot your values and calculate mean MSVE

# plt.figure()
# plt.plot(td_values, label="TD(0)")
# plt.plot(mc_values, label="MC")
# plt.xlabel("Episode")
# plt.ylabel("V(start state)")
# plt.title("Start state value over episodes")
# plt.legend()
# plt.show()

# plt.figure()
# plt.plot(td_msve, label="TD(0)")
# plt.plot(mc_msve, label="MC")
# plt.xlabel("Episode")
# plt.ylabel("MSVE")
# plt.title("Mean Squared Value Error over episodes")
# plt.legend()
# plt.show()

# mean_td_msve = sum(td_msve) / len(td_msve)
# mean_mc_msve = sum(mc_msve) / len(mc_msve)

# print("Mean MSVE TD(0):", mean_td_msve)
# print("Mean MSVE MC:", mean_mc_msve)

### **Explain your choice of parameter value and how it affects performance in TD0 and MC:**