<a href="https://colab.research.google.com/github/ychervonyi/reinforcement-learning-learning/blob/main/cliff_walking_chapter6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np


np.random.seed(1231231)


class Environment:
    def __init__(self, grid):
        self.parse_grid(grid)
        self.all_actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        self.n_actions = len(self.all_actions)

    def parse_grid(self, grid):
        self._grid = np.array([list(s.lstrip()) for s in grid.split("\n")[1:-1]])
        self._n_rows, self._n_cols = self._grid.shape
        self.state_tuple = self._grid.shape
        self._start_points = []
        self._finish_points = []
        for r in range(self._n_rows):
            for c in range(self._n_cols):
                if self._grid[r, c] == "S":
                    self._start_points.append((r, c))
                elif self._grid[r, c] == "F":
                    self._finish_points.append((r, c))

    def _is_valid_point(self, r, c):
        return 0 <= r < self._n_rows and 0 <= c < self._n_cols

    def get_start_point(self):
        return self._start_points[np.random.randint(0, len(self._start_points))]

    def get_possible_action_indices(self, r, c):
        possible_actions = []
        for i, a_i in enumerate(self.all_actions):
            dr, dc = a_i
            if self._is_valid_point(r + dr, c + dc):
                possible_actions.append(i)
        return possible_actions

    def step(self, r, c, policy_fn):
        action_i = policy_fn(r, c)
        dr, dc = self.all_actions[action_i]
        new_r, new_c = r + dr, c + dc
        if not self._is_valid_point(new_r, new_c):
            reward = -1
            new_r, new_c = r, c
        elif self._grid[new_r, new_c] == "F":
            reward = 0
        elif self._grid[new_r, new_c] == "X":
            reward = -100
            new_r, new_c = self.get_start_point()
        else:
            reward = -1
        return action_i, reward, new_r, new_c

    def plot_trajectory(self, traj):
        grid = np.zeros(self._grid.shape)
        for r, c in traj:
            grid[r][c] += 1
        print(grid)

In [2]:
class Agent:
    """
    Agent contains Q function, policy and C. Policy is determined
    from Q function.
    Agent also contains policies.
    """
    def __init__(self, env, epsilon=0.1):
        self.env = env
        # Value function
        self.Q = np.random.uniform(
            low=-1,
            high=1,
            size=self.env.state_tuple + (self.env.n_actions,)
        )
        # self.Q = np.zeros(self.env.state_tuple + (self.env.n_actions,))
        for r, c in self.env._finish_points:
            self.Q[r, c] = np.zeros(self.env.n_actions)

        # Policy
        self.policy = np.argmax(self.Q, axis=2)
        # self.policy = np.zeros((rows, cols))
        # self.policy_init_value = -1
        # self.policy = np.full(self.env.state_tuple, self.policy_init_value)

        self.epsilon = epsilon

    def epsilon_greedy_policy(self, r, c):
        """
        Get possible actions from the environment,
        choose one random action out of the possible actions with
        probability `epsilon` or return learned action if it is allowed.

        Returns action index.
        """
        if np.random.rand() > self.epsilon:
            a_index = self.policy[r, c]
        else:
            a_index = np.random.randint(0, len(self.env.all_actions))
        return a_index

In [10]:
class QLearningControl:
    def __init__(self, env, agent):
        self.gamma = 1
        self.alpha = 0.1
        self.env = env
        self.agent = agent

    def optimize(self, train_episodes):
        total_steps = ep_count = 0
        for ep in range(train_episodes):
            if ep == 10 or ep % 1000 == 0 and total_steps != 0:
                print(f"Average trajectory length: {round(total_steps / ep_count, 2)}")
                total_steps = ep_count = 0
                self.env.plot_trajectory(traj)
            r, c = self.env.get_start_point()
            steps = 0
            traj = []
            while True:
                new_state = self.env.step(r, c, self.agent.epsilon_greedy_policy)
                traj.append((r, c))
                action_i, reward, new_r, new_c = new_state
                if reward == 0:
                    traj.append((new_r, new_c))
                    break
                self.agent.Q[r, c, action_i] += self.alpha * (reward + self.gamma * np.amax(self.agent.Q[new_r, new_c]) - self.agent.Q[r, c, action_i])
                self.agent.policy[r, c] = np.argmax(self.agent.Q[r, c])
                steps += 1
                r, c = new_r, new_c
            total_steps += steps
            ep_count += 1

In [12]:
grid = """
OOOOOOOOOOOOOOOOOOOO
OOOOOOOOOOOOOOOOOOOO
OOOOOOOOOOOOOOOOOOOO
OOOOOOOOOOOOOOOOOOOO
SXXXXXXXXXXXXXXXXXXF
"""
env = Environment(grid)
agent = Agent(env)
td_control = OffPolicyTDControl(env, agent)
td_control.optimize(20000)

Average trajectory length: 692.9
[[12.  9.  5.  4.  5.  4.  5.  5.  5.  5.  7.  6.  7.  7.  5.  3.  0.  0.
   0.  0.]
 [10.  9.  7.  5.  8.  7.  6.  5.  6.  5.  7.  6.  7.  8.  7.  5.  3.  1.
   0.  0.]
 [ 8.  8.  9.  7.  8.  7.  7.  5.  7.  7.  8.  8.  7.  7.  6.  5.  3.  0.
   0.  0.]
 [11.  7.  6.  5.  5.  4.  6.  6.  5.  5.  5.  5.  5.  5.  4.  4.  2.  1.
   1.  2.]
 [ 9.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  1.]]
Average trajectory length: 46.24
[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 1. 1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 2. 2. 0. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]
Average trajectory length: 29.51
[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0