# Chapter 5: Tabular Learning and the Bellman Equation

Building towards Q-learning, via value iteration. Cool maths omitted in this notebook because I can't be bothered to typeset it all.

In [1]:
import gym
import collections
from torch.utils.tensorboard import SummaryWriter

ENV_NAME = "FrozenLake-v1"
GAMMA = 0.9
TEST_EPISODES = 20

In [2]:
class Agent:
    """Tabular agent that learns state values from observed transitions.

    The agent keeps three tables:

    * ``rewards``  -- ``(state, action, new_state)`` -> last observed reward
    * ``transits`` -- ``(state, action)`` -> Counter of reached ``new_state``
    * ``values``   -- ``state`` -> current value estimate
    """

    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state = self.env.reset()
        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count):
        """Take ``count`` random actions in ``self.env`` and record what happened.

        The environment is reset whenever an episode finishes, so the steps
        may span several episodes.
        """
        env = self.env
        for _ in range(count):
            origin = self.state
            action = env.action_space.sample()
            landed, reward, finished, _info = env.step(action)
            # Tuples serve as dictionary keys for both tables.
            self.rewards[(origin, action, landed)] = reward
            self.transits[(origin, action)][landed] += 1
            if finished:
                self.state = env.reset()
            else:
                self.state = landed

    def calc_action_value(self, state, action):
        """Return the estimated value of taking ``action`` in ``state``.

        Each observed target state contributes its Bellman value (immediate
        reward plus discounted value of the target), weighted by how often it
        was reached. Returns 0.0 when the pair has never been observed.
        """
        outcomes = self.transits[(state, action)]
        n_observed = sum(outcomes.values())
        expected = 0.0
        for landed, hits in outcomes.items():
            immediate = self.rewards[(state, action, landed)]
            backed_up = immediate + GAMMA*self.values[landed]
            # hits / n_observed approximates the transition probability.
            expected += (hits/n_observed)*backed_up
        return expected

    def select_action(self, state):
        """Return the greedy action for ``state`` (first one wins on ties)."""
        return max(
            range(self.env.action_space.n),
            key=lambda action: self.calc_action_value(state, action),
            default=None,
        )

    def play_episode(self, env):
        """Play one greedy episode in ``env`` and return its total reward.

        A separate environment is passed in so that the state of the
        "training" environment is not affected. Observed transitions and
        rewards are still recorded in the agent's tables.
        """
        episode_return = 0.0
        state = env.reset()
        finished = False
        while not finished:
            action = self.select_action(state)
            landed, reward, finished, _info = env.step(action)
            self.rewards[(state, action, landed)] = reward
            self.transits[(state, action)][landed] += 1
            episode_return += reward
            state = landed
        return episode_return

    def value_iteration(self):
        """Update every state's value to the best action value available."""
        for state in range(self.env.observation_space.n):
            self.values[state] = max(
                self.calc_action_value(state, action)
                for action in range(self.env.action_space.n)
            )

In [3]:
# Train the value-iteration agent until the average test reward exceeds 0.80.
test_env = gym.make(ENV_NAME)
agent = Agent()
writer = SummaryWriter(comment="-v-iteration")

iter_no = 0
best_reward = 0.0
try:
    while True:
        iter_no += 1
        agent.play_n_random_steps(100) # get fresh data
        agent.value_iteration() # do the thing

        reward = 0.0
        for _ in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)
        reward /= TEST_EPISODES # average reward per episode
        writer.add_scalar("reward", reward, iter_no)
        if reward > best_reward:
            print("Best reward updated %.3f -> %.3f" % (best_reward, reward))
            best_reward = reward
        if reward > 0.80:
            print("Solved in %d iterations!" % iter_no)
            break
finally:
    # The loop has no iteration cap, so it may be interrupted by hand or fail
    # part-way; close the writer on every exit path, not only on "solved".
    writer.close()

Best reward updated 0.000 -> 0.050
Best reward updated 0.050 -> 0.100
Best reward updated 0.100 -> 0.350
Best reward updated 0.350 -> 0.450
Best reward updated 0.450 -> 0.600
Best reward updated 0.600 -> 0.650
Best reward updated 0.650 -> 0.700
Best reward updated 0.700 -> 0.750
Best reward updated 0.750 -> 0.800
Best reward updated 0.800 -> 0.900
Solved in 66 iterations!


## Q-learning

A simple adaptation of value iteration... now we learn using Q-values, $Q(s, a)$, rather than V-values, $V(s)$.

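We need more memory to store all the Q-values - as in, we now have to store `env.action_space.n` times as many values - but in return we no longer need the reward or transition-probability tables to *choose* the best action: we simply pick the action with the largest stored $Q(s, a)$. (Those tables are still used when updating the Q-values.)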

In [4]:
class QAgent(Agent):
    """Variant of Agent whose ``values`` table is keyed by ``(state, action)``.

    Storing Q-values lets action selection read the table directly instead of
    computing each action's value from the reward and transition tables.
    """

    def __init__(self):
        super().__init__()

    # Override functions from Agent
    def select_action(self, state):
        """Return the action with the largest stored Q-value (first wins ties)."""
        return max(
            range(self.env.action_space.n),
            key=lambda action: self.values[(state, action)],  # stored, not calculated
            default=None,
        )

    def value_iteration(self):
        """Recompute the Q-value of every (state, action) pair in place.

        The value of a target state is taken to be the Q-value of its greedy
        action, as chosen by ``select_action``.
        """
        for state in range(self.env.observation_space.n):
            for action in range(self.env.action_space.n):
                outcomes = self.transits[(state, action)]
                n_observed = sum(outcomes.values())
                q_value = 0.0
                for landed, hits in outcomes.items():
                    immediate = self.rewards[(state, action, landed)]
                    greedy = self.select_action(landed)
                    backed_up = immediate + GAMMA*self.values[(landed, greedy)]
                    # hits / n_observed approximates the transition probability.
                    q_value += (hits/n_observed)*backed_up
                self.values[(state, action)] = q_value

In [5]:
# Train the Q-value agent until the average test reward exceeds 0.80.
test_env = gym.make(ENV_NAME)
agent = QAgent()
writer = SummaryWriter(comment="-q-iteration")

iter_no = 0
best_reward = 0.0
try:
    while True:
        iter_no += 1
        agent.play_n_random_steps(100) # get fresh data
        agent.value_iteration() # do the thing

        reward = 0.0
        for _ in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)
        reward /= TEST_EPISODES # average reward per episode
        writer.add_scalar("reward", reward, iter_no)
        if reward > best_reward:
            print("Best reward updated %.3f -> %.3f" % (best_reward, reward))
            best_reward = reward
        if reward > 0.80:
            print("Solved in %d iterations!" % iter_no)
            break
finally:
    # The loop has no iteration cap, so it may be interrupted by hand or fail
    # part-way; close the writer on every exit path, not only on "solved".
    writer.close()

Best reward updated 0.000 -> 0.050
Best reward updated 0.050 -> 0.250
Best reward updated 0.250 -> 0.400
Best reward updated 0.400 -> 0.500
Best reward updated 0.500 -> 0.550
Best reward updated 0.550 -> 0.600
Best reward updated 0.600 -> 0.800
Best reward updated 0.800 -> 0.850
Solved in 99 iterations!
