### Exercise 8.4 (programming) 

**Q**

The exploration bonus described above actually changes the estimated values of states and actions. Is this necessary? Suppose the bonus $\kappa \sqrt{\tau}$ was used not in updates, but solely in action selection. That is, suppose the action selected was always that for which $Q(S_t, a) + \kappa \sqrt{\tau(S_t, a)}$ was maximal. Carry out a gridworld experiment that tests and illustrates the strengths and weaknesses of this alternate approach.

**A**

In [None]:
class BaseEnv:
    """Minimal interface for an environment with discrete states and actions."""

    def __init__(self, n_states: int, n_actions: int):
        """Record the sizes of the state and action spaces."""
        self.n_states, self.n_actions = n_states, n_actions

    def step(self, action: int) -> tuple[int, float, bool, bool]:
        """Apply `action`; concrete environments must override this."""
        raise NotImplementedError()

    def reset(self, seed: int = 0) -> int:
        """Zero the per-episode step counter.

        The base implementation ignores `seed` and returns nothing;
        subclasses are expected to return the initial state.
        """
        self.steps = 0
        return None

class BaseAgent:
    """Minimal interface for an agent bound to a single environment."""

    def __init__(self, env: BaseEnv):
        """Keep a reference to the environment the agent interacts with."""
        self.env = env

    def act(self, state: int) -> int:
        """Choose an action for `state`; concrete agents must override this."""
        raise NotImplementedError()

    def train(self, steps: int):
        """Train for `steps` steps; concrete agents must override this."""
        raise NotImplementedError()

    def reset(self):
        """Hook for clearing agent state; the base agent has nothing to clear."""
        return None

In [None]:
import random
from dataclasses import dataclass

import matplotlib.pyplot as plt
import numpy as np

# RGB colours (components in [0, 1]) used by GridWorldEnv.plot for each kind
# of cell and for the markers that trace the agent's path.
NORMAL_COLOR = np.array([1., 1., 1.])
START_COLOR = np.array([255, 99, 71]) / 255
FINISH_COLOR = np.array([152, 249, 152]) / 255
BARRIER_COLOR = np.array([0.5, 0.5, 0.5])
AGENT_COLOR = np.array([39, 116, 218]) / 255

# Integer codes stored in GridWorldEnv.grid to mark what occupies a cell.
TYPE_NORMAL = 0
TYPE_START = 1
TYPE_FINISH = 2
TYPE_BARRIER = 3

class GridWorldEnv(BaseEnv):
    """Rectangular grid world whose barrier layout changes on a schedule.

    Cells are addressed as ``(x, y)`` = (column, row) and flattened into the
    integer state ``y * cols + x``. Each of the four actions moves the agent
    by one cell; a move that would leave the grid or enter a barrier leaves
    the agent where it is. Entering the finish cell yields reward 1 and
    terminates the episode; every other transition yields reward 0.

    ``barriers_over_time`` is a list of ``(time_step, barrier_cells)`` pairs.
    The layout in force is the one from the latest entry whose ``time_step``
    does not exceed ``all_steps``, a counter that keeps running across
    episodes until ``reset_all_steps`` is called.
    """

    def __init__(
        self,
        size: tuple[int, int],
        start: tuple[int, int],
        finish: tuple[int, int],
        barriers_over_time: list[tuple[int, list[tuple[int, int]]]],
        max_steps: int | None = None,
    ):
        """Validate the layout and build the initial grid.

        Args:
            size: ``(rows, cols)`` of the grid; both must exceed 2.
            start: ``(x, y)`` of the start cell.
            finish: ``(x, y)`` of the finish cell.
            barriers_over_time: ``(time_step, [(x, y), ...])`` entries.
            max_steps: per-episode step limit after which ``step`` reports
                truncation, or ``None`` for no limit.

        Raises:
            AssertionError: if any coordinate is out of range or a barrier
                coincides with the start or finish cell.
        """
        rows, cols = size
        start_x, start_y = start
        finish_x, finish_y = finish

        assert rows > 2 and cols > 2, "Invalid grid size"
        assert (start_y >= 0 and start_y < rows), f"Invalid start row: {start_y} (row should be in [0, {rows}))"
        assert (start_x >= 0 and start_x < cols), f"Invalid start column: {start_x} (column should be in [0, {cols}))"
        assert (finish_y >= 0 and finish_y < rows), f"Invalid finish row: {finish_y} (row should be in [0, {rows}))"
        assert (finish_x >= 0 and finish_x < cols), f"Invalid finish column: {finish_x} (column should be in [0, {cols}))"

        # update_barriers stops at the first entry that lies in the future,
        # so the schedule has to be ordered by time step.
        schedule = sorted(barriers_over_time, key=lambda entry: entry[0])

        for time_step, barriers in schedule:
            for x, y in barriers:
                assert (y >= 0 and y < rows), f"Invalid barrier row at time-step {time_step}: {y} (row should be in [0, {rows}))"
                assert (x >= 0 and x < cols), f"Invalid barrier column at time-step {time_step}: {x} (column should be in [0, {cols}))"
                # Compare coordinates rather than containers so a barrier
                # given as a list is still checked against a tuple.
                assert (x, y) != (start_x, start_y), f"Invalid barrier at time-step {time_step}: should not be in the start cell"
                assert (x, y) != (finish_x, finish_y), f"Invalid barrier at time-step {time_step}: should not be in the finish cell"

        grid = np.full((rows, cols), TYPE_NORMAL)
        grid[start_y, start_x] = TYPE_START
        grid[finish_y, finish_x] = TYPE_FINISH

        # (column delta, row delta) for each action index.
        actions = [
            (0, 1),
            (1, 0),
            (0, -1),
            (-1, 0),
        ]

        super().__init__(
            n_states=rows * cols,
            n_actions=len(actions))

        self.rows = rows
        self.cols = cols
        self.start = start
        self.s_start = start_y * cols + start_x
        self.finish = finish
        self.s_finish = finish_y * cols + finish_x
        self.actions = actions
        self.grid = grid
        self.max_steps = max_steps
        self.seed: int | None = None
        self.barriers_over_time = schedule
        # Time step of the schedule entry currently drawn on the grid;
        # -1 means no entry has been applied yet.
        self.barrier_time_step = -1
        self.barriers: list[tuple[int, int]] = []

        self.steps = 0       # steps taken in the current episode
        self.all_steps = 0   # steps taken since the last reset_all_steps
        self.state: int | None = None

        self.update_barriers()

    def update_barriers(self):
        """Redraw the barriers if a different schedule entry is now in force."""
        active_step = 0
        active_barriers: list[tuple[int, int]] = []

        for time_step, barriers in self.barriers_over_time:
            if time_step > self.all_steps:
                break
            active_step, active_barriers = time_step, barriers

        # Compare with != rather than >: after reset_all_steps the active
        # entry moves back in time and the initial layout must be restored.
        if active_step == self.barrier_time_step:
            return

        self.barrier_time_step = active_step
        self.barriers = active_barriers

        grid = self.grid
        grid[grid == TYPE_BARRIER] = TYPE_NORMAL
        for x, y in active_barriers:
            grid[y, x] = TYPE_BARRIER

    def reset(self, seed: int = 0) -> int:
        """Start a new episode at the start cell and return its state.

        ``all_steps`` and the barrier layout are left untouched.
        """
        # NOTE(review): this seeds only the stdlib `random` generator;
        # NumPy's global generator is not affected by this call.
        random.seed(seed)
        self.steps = 0
        self.state = self.s_start
        return self.s_start

    def reset_all_steps(self, seed: int = 0) -> int:
        """Start a new episode and rewind the barrier schedule to time 0."""
        state = self.reset(seed=seed)
        self.all_steps = 0
        self.update_barriers()
        return state

    def invalid_position(self, row: int, col: int) -> bool:
        """Return True if (row, col) is outside the grid or holds a barrier."""
        inside = 0 <= row < self.rows and 0 <= col < self.cols
        return not inside or self.grid[row, col] == TYPE_BARRIER

    def step(self, action: int) -> tuple[int, float, bool, bool]:
        """Apply `action` and return ``(state, reward, terminated, truncated)``.

        Raises:
            AssertionError: if the environment has not been reset or the
                current state is the finish cell.
        """
        state = self.state

        assert state is not None, "The environment was not initialized"
        assert state != self.s_finish, "The environment is in a terminal state"

        steps = self.steps + 1
        row, col = divmod(state, self.cols)
        move_h, move_v = self.actions[action]
        new_row = row + move_v
        new_col = col + move_h

        new_state = state
        reward = 0.0
        terminated = False

        if not self.invalid_position(row=new_row, col=new_col):
            new_state = new_row * self.cols + new_col
            # Must be a comparison: a chained assignment here would move the
            # agent to the finish cell on every valid move.
            terminated = new_state == self.s_finish

            if terminated:
                reward = 1.0

        truncated = (not terminated) and (self.max_steps is not None) and (self.max_steps <= steps)

        self.steps = steps
        self.state = new_state
        self.all_steps += 1

        # The layout may change as a result of the step just taken.
        self.update_barriers()

        return new_state, reward, terminated, truncated

    def plot(self, title: str | None = None, history: list[int] | None = None):
        """Draw the grid and, optionally, the states visited in `history`.

        Rows are flipped vertically so that row 0 appears at the bottom.
        """
        history = [] if history is None else history
        rows, cols = self.rows, self.cols

        color_grid = np.full((rows, cols, 3), NORMAL_COLOR)
        cell_colors = {
            TYPE_START: START_COLOR,
            TYPE_FINISH: FINISH_COLOR,
            TYPE_BARRIER: BARRIER_COLOR,
        }

        for row in range(rows):
            inv_row = rows - row - 1
            for col in range(cols):
                color = cell_colors.get(int(self.grid[row, col]))
                if color is not None:
                    color_grid[inv_row, col] = color

        plt.figure(figsize=(12, 8))
        plt.imshow(color_grid, aspect='auto')
        plt.grid(which='both', color='#333', linestyle='-', linewidth=1)
        plt.xticks(np.arange(.5, self.cols, 1), [])
        plt.yticks(np.arange(.5, self.rows, 1), [])
        plt.tick_params(bottom=False, top=False, left=False, right=False, labelbottom=False, labelleft=False)

        for i, state in enumerate(history):
            row, col = divmod(state, self.cols)
            row = rows - row - 1
            # color starts black and ends as AGENT_COLOR
            shade = i / len(history)
            color = np.array(AGENT_COLOR) * shade
            plt.plot(col, row, 'o', color=color, markersize=20)

        if title:
            plt.title(title)

        plt.show()

    @classmethod
    def blocking_maze(cls, max_steps: int | None = None):
        """Build a 6x9 maze whose wall on row 2 changes at time step 1000.

        The wall initially leaves only column 8 open; from step 1000 it
        leaves only column 0 open.
        """
        rows, cols = 6, 9
        start = (8, 5)
        finish = (3, 0)
        barriers_over_time = [
            (0, [(x, 2) for x in range(8)]),
            (1000, [(x, 2) for x in range(1, 9)]),
        ]
        return cls(
            size=(rows, cols),
            start=start,
            finish=finish,
            barriers_over_time=barriers_over_time,
            max_steps=max_steps)

    @classmethod
    def shortcut_maze(cls, max_steps: int | None = None):
        """Build a 6x9 maze whose wall on row 2 changes at time step 3000.

        The wall initially leaves only column 0 open; from step 3000
        column 8 is open as well.
        """
        rows, cols = 6, 9
        start = (8, 5)
        finish = (3, 0)
        barriers_over_time = [
            (0, [(x, 2) for x in range(1, 9)]),
            (3000, [(x, 2) for x in range(1, 8)]),
        ]
        return cls(
            size=(rows, cols),
            start=start,
            finish=finish,
            barriers_over_time=barriers_over_time,
            max_steps=max_steps)

@dataclass
class GridWorldAgentParams:
    """One observed transition, as handed to ``GridWorldAgent.update``.

    Field names and order match the previous hand-written constructor, so
    both positional and keyword construction keep working; the dataclass
    additionally provides a readable ``repr`` and value equality.
    """

    # State the agent was in when it acted.
    state: int
    # Index of the action that was taken.
    action: int
    # Reward returned by the environment for this transition.
    reward: float
    # State reported by the environment after the action.
    next_state: int
    # Termination flag returned by the environment's step.
    terminated: bool
    # Truncation flag returned by the environment's step.
    truncated: bool


class GridWorldAgent(BaseAgent):
    """Base class for agents that learn online in a ``GridWorldEnv``.

    Subclasses provide ``act`` (action selection) and ``update`` (learning
    from one transition); this class supplies the interaction loop.
    """

    def __init__(self, env: GridWorldEnv, name: str, seed: int | None = None):
        """Store the environment, a display name and the seed used on resets."""
        super().__init__(env)
        self.grid_env = env
        self.name = name
        self.seed = seed

    def train(self, steps: int):
        """Run the interaction loop for `steps` steps without plotting."""
        # The keyword is `plot`; passing an unknown keyword raises TypeError.
        self.train_with_rewards(steps=steps, plot=False)

    def update(self, params: GridWorldAgentParams) -> None:
        """Learn from one transition; concrete agents must override this."""
        raise NotImplementedError

    def train_with_rewards(self, steps: int, plot=True) -> list[float]:
        """Interact with the environment for `steps` steps.

        The environment's global step counter is rewound first. Whenever an
        episode terminates or is truncated the environment is reset and a
        new episode begins.

        Args:
            steps: total number of environment steps to take.
            plot: if true, draw the grid with the path of the most recently
                completed episode (or the unfinished one if none completed).

        Returns:
            Cumulative reward after each step, preceded by an initial 0, so
            the list has ``steps + 1`` entries.
        """
        seed = self.seed

        state = self.grid_env.reset_all_steps(seed=seed)
        cumulative_reward = 0
        episode = [state]
        last_completed_episode: list[int] | None = None
        reward_history: list[float] = [0]

        for _ in range(steps):
            action = self.act(state)
            next_state, reward, terminated, truncated = self.env.step(action)
            cumulative_reward += reward

            self.update(GridWorldAgentParams(
                state=state,
                action=action,
                reward=reward,
                next_state=next_state,
                terminated=terminated,
                truncated=truncated,
            ))

            episode.append(next_state)
            reward_history.append(cumulative_reward)

            if terminated or truncated:
                last_completed_episode = episode
                state = self.env.reset(seed=seed)
                episode = [state]
            else:
                state = next_state

        if plot:
            # Prefer a whole episode: the unfinished one may be only a few
            # steps long when training stops.
            shown = episode if last_completed_episode is None else last_completed_episode
            self.grid_env.plot(title=self.name, history=shown)

        return reward_history

class TestAgent(GridWorldAgent):
    """Trivial agent that always picks action 0 and never learns."""

    def __init__(self, env: GridWorldEnv, name: str, seed: int | None = None):
        """Forward all arguments unchanged to ``GridWorldAgent``."""
        GridWorldAgent.__init__(self, env=env, name=name, seed=seed)

    def act(self, state: int) -> int:
        """Return action index 0 regardless of `state`."""
        north = 0  # Always North
        return north

    def update(self, params: GridWorldAgentParams) -> None:
        """Ignore the transition; this agent has nothing to learn."""
        return None

In [None]:
# Visual check of the blocking maze's barrier switch at time step 1000.
seed = 1  # NOTE(review): assigned but not passed to the agent or environment below
env = GridWorldEnv.blocking_maze(max_steps=300)
agent = TestAgent(env, name='Blocking Maze (999 steps)')
# Plots the grid after 999 steps, i.e. just before the scheduled change.
agent.train_with_rewards(steps=999)
# reset() starts a new episode but keeps the environment's all_steps counter,
# so the single step below is overall step number 1000.
state = env.reset()
action = agent.act(state)
env.step(action)
env.plot(title='Blocking Maze (1000 steps)')

In [None]:
# Visual check of the shortcut maze's barrier switch at time step 3000.
seed = 1  # NOTE(review): assigned but not passed to the agent or environment below
env = GridWorldEnv.shortcut_maze(max_steps=300)
agent = TestAgent(env, name='Shortcut Maze (2999 steps)')
# Plots the grid after 2999 steps, i.e. just before the scheduled change.
agent.train_with_rewards(steps=2999)
# reset() starts a new episode but keeps the environment's all_steps counter,
# so the single step below is overall step number 3000.
state = env.reset()
action = agent.act(state)
env.step(action)
env.plot(title='Shortcut Maze (3000 steps)')

In [None]:
class DynaAgent(GridWorldAgent):
    """Tabular Dyna-Q agent with an optional exploration bonus in action selection.

    Learning is one-step Q-learning on the real transition followed by
    `n_plan` Q-learning updates on transitions replayed from a learned
    deterministic model.

    Action selection depends on `kappa`:

    * ``kappa == 0``: epsilon-greedy with respect to ``Q``.
    * ``kappa > 0``: the action maximising ``Q(s, a) + kappa * sqrt(tau(s, a))``
      is chosen, where ``tau`` is the number of action selections since the
      pair was last chosen. Actions never yet chosen in ``s`` are tried
      first. The bonus influences selection only; it is never added to the
      value updates.
    """

    def __init__(
        self,
        env: GridWorldEnv,
        name: str,
        n_plan: int = 0,
        alpha: float = 0.2,
        gamma: float = 0.9,
        epsilon: float = 0.1,
        kappa: float = 0,
        seed: int | None = None,
    ):
        """Create the agent.

        Args:
            env: environment to learn in.
            name: label used for plots.
            n_plan: number of model-based updates per real step.
            alpha: step size.
            gamma: discount factor.
            epsilon: exploration rate of the epsilon-greedy policy.
            kappa: weight of the exploration bonus; 0 disables it.
            seed: seed forwarded to the environment on resets.
        """
        super().__init__(env=env, name=name, seed=seed)
        self.n_plan = n_plan
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.kappa = kappa
        self.Q = np.zeros((env.n_states, env.n_actions), dtype=float)
        # Learned model: (state, action) -> (reward, next_state).
        self.M: dict[tuple[int, int], tuple[float, int]] = {}
        # Number of actions selected so far by the bonus-based policy.
        self.actions = 0
        # (state, action) -> value of `self.actions` when the pair was last chosen.
        self.T: dict[tuple[int, int], int] = {}

    def train_with_rewards(self, steps: int, plot=True) -> list[float]:
        """Reset the selection clock and timestamps, then run the base loop."""
        self.actions = 0
        # `act` reads self.T, so this is the dictionary that must be cleared.
        self.T = {}
        return super().train_with_rewards(steps=steps, plot=plot)

    def act(self, state: int) -> int:
        """Select an action for `state` according to the configured policy."""
        n_actions = self.env.n_actions
        kappa = self.kappa

        if kappa > 0:
            now = self.actions
            action = next((a for a in range(n_actions) if (state, a) not in self.T), None)

            if action is None:
                # np.argmax needs a materialised sequence; handing it a
                # generator yields index 0 regardless of the values.
                scores = [
                    self.Q[state, a] + kappa * np.sqrt(now - self.T[(state, a)])
                    for a in range(n_actions)
                ]
                action = int(np.argmax(scores))

            self.T[(state, action)] = now
            self.actions += 1
            return action

        epsilon = self.epsilon
        probs = np.full(n_actions, epsilon / n_actions, dtype=float)
        probs[np.argmax(self.Q[state])] += 1 - epsilon
        return int(np.random.choice(n_actions, p=probs))

    def sample(self) -> tuple[int, int]:
        """Return a uniformly chosen (state, action) pair stored in the model."""
        keys = list(self.M)
        state, action = keys[np.random.choice(len(keys))]
        return state, action

    def update(self, params: GridWorldAgentParams) -> None:
        """Learn from one real transition, then plan with the model."""
        state = params.state
        action = params.action
        reward = params.reward
        next_state = params.next_state

        Q = self.Q
        alpha = self.alpha
        gamma = self.gamma

        Q[state, action] += alpha * (reward + gamma * max(Q[next_state]) - Q[state, action])
        self.M[(state, action)] = (reward, next_state)

        for _ in range(self.n_plan):
            s, a = self.sample()
            r, sp = self.M[(s, a)]
            Q[s, a] += alpha * (r + gamma * max(Q[sp]) - Q[s, a])

In [None]:
def plot_gridworld(agents: list[DynaAgent], steps: int = 6000):
    """Train each agent and plot their cumulative-reward curves together.

    Each agent in `agents` is trained as configured (its own environment,
    `n_plan`, `kappa`, ...) rather than being replaced by a freshly built
    agent with fixed settings, so the curves reflect the configurations
    the caller asked to compare.

    Args:
        agents: agents to train; each one's `name` labels its curve.
        steps: number of environment steps to train every agent for.
    """
    rewards: list[tuple[str, list[float]]] = [
        (agent.name, agent.train_with_rewards(steps=steps))
        for agent in agents
    ]

    plt.figure(figsize=(8, 8))

    for name, r in rewards:
        plt.plot(r, label=name)

    plt.xlabel('Time steps')
    plt.ylabel('Cumulative reward')
    plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
    plt.show()

In [None]:
def test_gridworld(name: str, env: GridWorldEnv):
    """Build the agents to compare on `env` and hand them to plot_gridworld.

    `name` prefixes every agent's label.
    """
    # (label suffix, n_plan, kappa); commented rows are disabled variants.
    configs = [
        ('No Exploration Bonus - n=10', 10, 0),
        # ('No Exploration Bonus - n=50', 50, 0),
        ('Exploration 10% - n=10', 10, 0.1),
        # ('Exploration 10% - n=50', 50, 0.1),
        # ('Exploration 20% - n=10', 10, 0.2),
        # ('Exploration 50% - n=50', 50, 0.2),
    ]

    agents: list[DynaAgent] = []
    for label, n_plan, kappa in configs:
        agents.append(DynaAgent(
            env,
            name=f'{name} - {label}',
            n_plan=n_plan,
            kappa=kappa,
        ))

    plot_gridworld(agents)

In [None]:
# Run the comparison on the blocking maze; episodes are truncated after 300 steps.
env = GridWorldEnv.blocking_maze(max_steps=300)
test_gridworld(name='Blocking Maze', env=env)

In [None]:
# Run the comparison on the shortcut maze; episodes are truncated after 300 steps.
env = GridWorldEnv.shortcut_maze(max_steps=300)
test_gridworld(name='Shortcut Maze', env=env)