# Reinforcement Learning

# 5. Gradient Methods

This notebook presents gradient methods, useful for learning in some environment with a large state space.

We use a neural network with a single hidden layer.

In [127]:
import numpy as np
from matplotlib import pyplot as plt

In [268]:
from model import TicTacToe, Nim, ConnectFour
from agent import Agent, OnlineEvaluation
from dynamic import ValueIteration

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim

## Value gradient

We start with value-based methods. The neural network is a regressor that approximates the value function. Note that the model is supposed to be known, so that we don't need the action-value function.

In [348]:
class Regressor(nn.Module):
    """Neural network for value function approximation. Return the value of each state."""
    def __init__(self, model, hidden_size):
        if not hasattr(model, 'one_hot_encode'):
            raise ValueError("The environment must have a one-hot encoding of states.")   
        super(Regressor, self).__init__()
        self.model = model
        state = model.init_state()
        code = model.one_hot_encode(state)
        input_size = len(code)
        self.nn = nn.Sequential(
            nn.Linear(input_size, hidden_size), 
            nn.GELU(), 
            nn.Linear(hidden_size, 1))

    def forward(self, code):
        """Forward pass."""
        return self.nn(code)

In [349]:
game = TicTacToe()

In [24]:
regressor = Regressor(model=game, hidden_size=100)

In [27]:
state

(1,
 array([[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]]))

In [25]:
state = game.state
# one-hot encoding
code = game.one_hot_encode(state)
# tensor
code = torch.tensor(code).float()

In [81]:
code

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [8]:
value = regressor.forward(code).detach()

In [9]:
# value of the state
print(value)

tensor([0.0137])


## To do

* Complete the method get_best_actions of the class ValueGradient. 
* Test the agent on TicTacToe, against (1) a random adversary and (2) a perfect adversary.
* Test the agent on ConnectFour, against (1) a random adversary and (2) an adversary with the one-step policy.
* Compare your results to another learning strategy (e.g., Monte-Carlo learning) and interpret the results.

In [361]:
class ValueGradient(OnlineEvaluation):
    """Agent learning by value gradient. The model is supposed to be known.
    
    Parameters
    ----------
    model : object of class Environment
        Model.
    player : int
        Player for games (1 or -1, default = default player of the game).
    gamma : float
        Discount rate (in [0, 1], default = 1).
    hidden_size : int
        Size of the hidden layer (default = 100).
    """
    def __init__(self, model, player=None, gamma=1, hidden_size=100):
        super(ValueGradient, self).__init__(model, player=player)  
        if not hasattr(model, "get_next_state"):
            raise ValueError("The model must be known, with a 'get_next_state' method.")
        self.nn = Regressor(model, hidden_size)
        self.gamma = gamma
        self.model = model
    
    def get_best_actions(self, state):
        """Get the best actions in some state according to the value function."""
        actions = self.get_actions(state)
        if len(actions) > 1:
            values = []

            code = self.model.one_hot_encode(state)
            code = torch.tensor(code).float()
            current_value = self.nn.forward(code).detach()

            for act in actions:
                player, next_state = self.model.get_next_state(state, act)
                next_state = (player, next_state)
                
                if self.model.is_terminal(next_state):
                    values.append(current_value)
                else:
                    next_code = self.model.one_hot_encode(next_state)
                    next_code = torch.tensor(next_code).float()
                    next_value = self.nn.forward(next_code).detach()
                    values.append(next_value)

            if self.player == 1:
                best_value = max(values)
            else:
                best_value = min(values)
            actions = [action for action, value in zip(actions, values) if value == best_value]

        return actions      
    
    def update_policy(self):
        self.policy = self.get_policy()
    
    def get_samples(self, horizon, epsilon):
        """Get samples from one episode under the epsilon-greedy policy."""
        self.policy = self.randomize_policy(epsilon=epsilon)
        self.model.reset()
        state = self.model.state
        states = []
        rewards = []
        for t in range(horizon):
            action = self.get_action(state)
            reward, stop = self.model.step(action)
            states.append(state)
            rewards.append(reward)
            if stop:
                break
            state = self.model.state
        gains = []
        gain = 0
        for reward in reversed(rewards):
            gain = reward + self.gamma * gain
            gains.append(gain)
        return reversed(states), gains
        
    def train(self, horizon=100, n_episodes=1000, learning_rate=0.01, epsilon=0.1):
        """Train the neural network with samples drawn from the epsilon-greedy policy."""
        optimizer = optim.Adam(self.nn.parameters(), lr=learning_rate)
        for t in range(n_episodes):
            states, gains = self.get_samples(horizon, epsilon)
            codes = [self.model.one_hot_encode(state) for state in states]
            codes = np.array(codes)
            codes = torch.tensor(codes).float()
            values = self.nn.forward(codes)
            gains = torch.tensor(gains).float().reshape(-1, 1)
            mse = nn.MSELoss()
            loss = mse(values, gains)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

In [362]:
Game = TicTacToe
game = TicTacToe()
agent = ValueGradient(game)

In [363]:
gains = agent.get_gains(n_runs=1000)
np.mean(gains)

0.277

In [364]:
agent.train(n_episodes=1000)
agent.update_policy()
gains = agent.get_gains(n_runs=1000)
np.mean(gains)

0.87

Game against perfect adversary

In [365]:
algo = ValueIteration(game)
policy, ad_policy = algo.get_perfect_players()


In [366]:
game = Game(adversary_policy=ad_policy)
agent = ValueGradient(game)

agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.mean(gains)

0.0

Based on the results, we see that the method works well against the random policy of the opponent, but against the perfect policy it shows poor results

ConnectFour

In [367]:
Game = ConnectFour
game = Game(adversary_policy="random")

In [368]:
agent = ValueGradient(game)

In [369]:
agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.mean(gains)

0.44

In [370]:
game = Game(adversary_policy="one_step")
agent = ValueGradient(game)
agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.mean(gains)

-0.76

similar to tic-tac-toe the results on the 1 step policy are bad

Monte-Carlo Tic-Tac-Toe

In [384]:
class MCLearning(OnlineEvaluation):
    """Online evaluation by Monte-Carlo."""
        
    def update_values(self, state=None, horizon=100):
        """Update the values from one episode."""
        stop, states, rewards = self.get_episode(state=state, horizon=horizon)
        gain = 0
        # backward update
        states.pop()
        for state, reward in zip(reversed(states), reversed(rewards)):
            self.add_state(state)
            code = self.model.encode(state)
            self.count[code] += 1
            # to be modified
            # begin
            gain = self.gamma * gain + reward
            # end 
            diff = gain - self.value[code]
            count = self.count[code]
            self.value[code] += diff / count

In [385]:
Game = TicTacToe
game = Game(adversary_policy="random")

In [386]:
algo = MCLearning(game, policy='random')

In [387]:
n_episodes = 100
values_mc = []
for t in range(n_episodes):
    algo.update_values()

In [388]:
policy = algo.get_policy()

In [389]:
agent = Agent(game, policy=policy)

In [390]:
np.mean(agent.get_gains())

0.75

In [391]:
Game = TicTacToe
game = Game(adversary_policy="one_step")

In [392]:
algo = MCLearning(game)


In [393]:
n_episodes = 1000
values_mc = []
for t in range(n_episodes):
    algo.update_values()

In [394]:
policy = algo.get_policy()

In [395]:
agent = Agent(game, policy=policy)

In [396]:
np.mean(agent.get_gains())

0.27

Unlike valuegradient, ms learning shows good results in both cases.
However, on a random policy the results of the gradient are slightly better

## Policy gradient

We now consider a policy-based method. The neural network is a classifier that approximates the optimal policy. It returns the probability of each action.

In [232]:
class Classifier(nn.Module):
    """Neural network for policy gradient. Return the distribution of actions in each state."""
    def __init__(self, model, hidden_size):
        if not hasattr(model, 'one_hot_encode'):
            raise ValueError("The environment must have a one-hot encoding of states.")   
        super(Classifier, self).__init__()
        self.model = model
        actions = model.get_all_actions()
        if self.model.is_game():
            # remove action when passing
            actions.pop()
        self.actions = actions
        state = model.init_state()
        code = model.one_hot_encode(state)
        input_size = len(code)
        output_size = len(actions)
        self.nn = nn.Sequential(
            nn.Linear(input_size, hidden_size), 
            nn.GELU(), 
            nn.Linear(hidden_size, output_size), 
            nn.Softmax(dim=0))

    def forward(self, code):
        """Forward pass."""
        return self.nn(code)

In [233]:
game = TicTacToe()

In [234]:
classifier = Classifier(model=game, hidden_size=100)

In [235]:
state = game.state
# one-hot encoding
code = game.one_hot_encode(state)
# tensor
code = torch.tensor(code).float()

In [236]:
probs = classifier.forward(code).detach()

In [237]:
print(probs)

tensor([0.1132, 0.1117, 0.1055, 0.1150, 0.1233, 0.1130, 0.1044, 0.1002, 0.1137])


In [238]:
len(probs)

9

In [239]:
torch.sum(probs)

tensor(1.)

## To do

* Complete the method 'train' of the class PolicyGradient. Observe that a penalty is assigned for illegal actions.
* Test the agent on TicTacToe, against (1) a random adversary and (2) a perfect adversary.
* Test the agent on ConnectFour, against (1) a random adversary and (2) an adversary with the one-step policy.
* Compare your results to another learning strategy (e.g., Monte-Carlo learning) and interpret the results.
* (bonus) Try to improve policy gradient on TicTacToe with a perfect adversary.

In [264]:
class PolicyGradient(Agent):
    """Agent learning by policy gradient.
    
    Parameters
    ----------
    model : object of class Environment
        Model.a
    player : int
        Player for games (1 or -1, default = default player of the game).
    gamma : float
        Discount rate (in [0, 1], default = 1).
    hidden_size : int
        Size of the hidden layer (default = 100).
    penalty : float
        Penalty for illegal actions (default = -5).
    min_log : float
        Minimal value to compute the log (default = 1e-10)
    """
    def __init__(self, model, player=None, gamma=1, hidden_size=100, penalty=-1, min_log=1e-10):
        super(PolicyGradient, self).__init__(model, player=player)  
        self.nn = Classifier(model, hidden_size)
        self.action_id = {action: i for i, action in enumerate(self.nn.actions)}
        self.gamma = gamma
        self.penalty = penalty
        self.min_log = min_log
        
    def get_policy(self):
        """Get the current policy."""
        def policy(state):
            actions = self.model.get_actions(state)
            if len(actions) > 1:
                win_actions = []
                # check win actions
                if self.model.is_game():
                    next_states = [self.model.get_next_state(state, action) for action in actions]
                    win_actions = [self.model.get_reward(next_state) == self.player for next_state in next_states]
                if any(win_actions):
                    probs = np.array(win_actions).astype(float)
                else:
                    # get prob of each action
                    code = self.model.one_hot_encode(state)
                    code = torch.tensor(code).float()
                    probs = self.nn.forward(code)
                    probs = probs.detach().numpy()
                    # restrict to available actions
                    indices = [self.action_id[action] for action in actions]
                    probs = probs[indices]                    
                # renormalize
                if np.sum(probs) > 0:
                    probs = probs / np.sum(probs)
                else:
                    probs = np.ones(len(actions)) / len(actions)
            else:
                probs = [1]
            return probs, actions
        return policy
    
    def update_policy(self):
        """Update the policy."""
        self.policy = self.get_policy()
    
    def get_samples(self, horizon):
        """Get samples from one episode."""
        self.update_policy()
        rewards = []
        log_probs = []
        log_probs_illegal = []
        self.model.reset()
        state = self.model.state
        for t in range(horizon):
            action = self.get_action(state)
            if action is not None:
                i = self.action_id[action]
                code = self.model.one_hot_encode(state)
                code = torch.tensor(code).float()
                probs = self.nn.forward(code)
                prob = torch.clip(probs[i], self.min_log, 1 - self.min_log)
                log_prob = torch.log(prob).reshape(1)

                actions = self.model.get_actions(state)
                if action in actions:
                    reward, stop = self.model.step(action)
                    state = self.model.state
                    rewards.append(reward)
                    log_probs.append(log_prob)
                else:
                    log_probs_illegal.append(log_prob)
            else:
                reward, stop = self.model.step(action)
                state = self.model.state
            if stop:
                break
        gain = 0
        for reward in reversed(rewards):
            gain = reward + self.gamma * gain
        return gain, log_probs, log_probs_illegal
        
    def train(self, horizon=100, n_episodes=1000, learning_rate=0.01):
        """Train the neural network."""
        optimizer = optim.Adam(self.nn.parameters(), lr=learning_rate)
        for t in range(n_episodes):
            gain, log_probs, log_probs_illegal = self.get_samples(horizon)
            loss = 0
            if len(log_probs):
                log_probs = torch.cat(log_probs)
                loss -= gain * torch.sum(log_probs)
            if len(log_probs_illegal):
                log_probs_illegal = torch.cat(log_probs_illegal)
                loss += self.penalty * torch.sum(log_probs_illegal)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()


In [397]:
game = TicTacToe()
agent = PolicyGradient(game)

In [398]:
agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.mean(gains)

0.82

Game with perfect adversary policy

In [399]:
algo = ValueIteration(game)
policy, ad_policy = algo.get_perfect_players()

In [400]:
Game = TicTacToe

In [401]:
game = Game(adversary_policy=ad_policy)
agent = PolicyGradient(game)

agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.mean(gains)

-0.74

ConnectFour

In [402]:
Game = ConnectFour
game = Game(adversary_policy="random")

In [403]:
agent = PolicyGradient(game)

In [404]:
agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.mean(gains)

0.82

In a game with a more complex space (connect four) the policy gradient shows results significantly better than the value gradient

One-step policy

In [405]:
game = Game(adversary_policy="one_step")
agent = PolicyGradient(game)

In [406]:
agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.mean(gains)

-0.54

however, against a more complex policy the results are the same as in the previous method

In games like Tic Tac Toe and Connect Four, rewards are sparse (they occur only at the end of the game). This sparsity can make it difficult for policy gradient methods to effectively learn from the rewards.