# Reinforcement Learning

# 5. Gradient Methods

This notebook presents gradient methods, useful for learning in some environment with a large state space.

We use a neural network with a single hidden layer.

In [1]:
import numpy as np
from matplotlib import pyplot as plt

In [2]:
from model import TicTacToe, Nim, ConnectFour
from agent import Agent, OnlineEvaluation
from dynamic import ValueIteration

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim

## Value gradient

We start with value-based methods. The neural network is a regressor that approximates the value function. Note that the model is supposed to be known, so that we don't need the action-value function.

In [4]:
class Regressor(nn.Module):
    """Neural network for value function approximation. Return the value of each state."""
    def __init__(self, model, hidden_size):
        if not hasattr(model, 'one_hot_encode'):
            raise ValueError("The environment must have a one-hot encoding of states.")   
        super(Regressor, self).__init__()
        self.model = model
        state = model.init_state()
        code = model.one_hot_encode(state)
        input_size = len(code)
        self.nn = nn.Sequential(
            nn.Linear(input_size, hidden_size), 
            nn.GELU(), 
            nn.Linear(hidden_size, 1))

    def forward(self, code):
        """Forward pass."""
        return self.nn(code)

In [5]:
game = TicTacToe()

In [6]:
regressor = Regressor(model=game, hidden_size=100)

In [7]:
state = game.state
# one-hot encoding
code = game.one_hot_encode(state)
# tensor
code = torch.tensor(code).float()

In [8]:
value = regressor.forward(code).detach()

In [9]:
# value of the state
print(value)

tensor([-0.0719])


## To do

* Complete the method get_best_actions of the class ValueGradient. 
* Test the agent on TicTacToe, against (1) a random adversary and (2) a perfect adversary.
* Test the agent on ConnectFour, against (1) a random adversary and (2) an adversary with the one-step policy.
* Compare your results to another learning strategy (e.g., Monte-Carlo learning) and interpret the results.

In [11]:
class ValueGradient(OnlineEvaluation):
    """Agent learning by value gradient. The model is supposed to be known.
    
    Parameters
    ----------
    model : object of class Environment
        Model.
    player : int
        Player for games (1 or -1, default = default player of the game).
    gamma : float
        Discount rate (in [0, 1], default = 1).
    hidden_size : int
        Size of the hidden layer (default = 100).
    """
    def __init__(self, model, player=None, gamma=1, hidden_size=100):
        super(ValueGradient, self).__init__(model, player=player)  
        if not hasattr(model, "get_next_state"):
            raise ValueError("The model must be known, with a 'get_next_state' method.")
        self.nn = Regressor(model, hidden_size)
        self.gamma = gamma
        
    def get_best_actions(self, state):
        """Get the best actions in some state according to the value function.""" 
        actions = self.get_actions(state)
        if len(actions) > 1:
            values = []
            for action in actions:
                next_state = self.model.get_next_state(state, action)
                if self.model.is_terminal(next_state):
                    values.append(self.model.get_reward(next_state))
                else:
                    next_code = self.model.one_hot_encode(next_state)
                    next_code_preprocessed = torch.tensor(next_code).float()
                    values.append(self.nn.forward(next_code_preprocessed).item())
                
                if self.player == 1:
                    best_value = max(values)
                else:
                    best_value = min(values)
            actions = [action for action, value in zip(actions, values) if value==best_value]
        return actions        
    
    def update_policy(self):
        self.policy = self.get_policy()
    
    def get_samples(self, horizon, epsilon):
        """Get samples from one episode under the epsilon-greedy policy."""
        self.policy = self.randomize_policy(epsilon=epsilon)
        self.model.reset()
        state = self.model.state
        states = []
        rewards = []
        for t in range(horizon):
            action = self.get_action(state)
            reward, stop = self.model.step(action)
            states.append(state)
            rewards.append(reward)
            if stop:
                break
            state = self.model.state
        gains = []
        gain = 0
        for reward in reversed(rewards):
            gain = reward + self.gamma * gain
            gains.append(gain)
        return reversed(states), gains
        
    def train(self, horizon=100, n_episodes=1000, learning_rate=0.01, epsilon=0.1):
        """Train the neural network with samples drawn from the epsilon-greedy policy."""
        optimizer = optim.Adam(self.nn.parameters(), lr=learning_rate)
        for t in range(n_episodes):
            states, gains = self.get_samples(horizon, epsilon)
            codes = [self.model.one_hot_encode(state) for state in states]
            codes = np.array(codes)
            codes = torch.tensor(codes).float()
            values = self.nn.forward(codes)
            gains = torch.tensor(gains).float().reshape(-1, 1)
            mse = nn.MSELoss()
            loss = mse(values, gains)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

#### TicTacToe against random player ####

In [12]:
game = TicTacToe()
agent = ValueGradient(game)

In [13]:
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([-1,  0,  1]), array([4, 1, 5], dtype=int64))

In [17]:
agent.train(n_episodes=1000)
agent.update_policy()
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([0, 1]), array([1, 9], dtype=int64))

The trained agent wins 9 times and loses once. If we increase n_episodes, it increases this number. For instance, for n_episodes = 100 it can win just 5 or 6 times.

#### TicTacToe against perfect player

In [18]:
game = TicTacToe()
algo = ValueIteration(game)
policy, adversary_policy = algo.get_perfect_players()

In [19]:
game = TicTacToe(adversary_policy=adversary_policy)
agent = ValueGradient(game)

In [20]:
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([-1,  0]), array([8, 2], dtype=int64))

In [21]:
agent.train(n_episodes=1000)
agent.update_policy()
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([0]), array([10], dtype=int64))

Because the perfect player always strives to avoid losing, the trained agent can only achieve ties in the end.

#### Connect Four against random adversary

In [22]:
game = ConnectFour()
agent = ValueGradient(game)
gain = agent.get_gains(n_runs=10)
print(np.unique(gains, return_counts=True))


(array([0]), array([10], dtype=int64))


In [23]:
agent.train(n_episodes=1000)
agent.update_policy()
gain = agent.get_gains(n_runs=10)
print(np.unique(gains, return_counts=True))

(array([0]), array([10], dtype=int64))


We get ties only. Connect Four appears too challenging for the agent to train sufficiently to consistently win every game, even against a random opponent.

## Policy gradient

We now consider a policy-based method. The neural network is a classifier that approximates the optimal policy. It returns the probability of each action.

In [24]:
class Classifier(nn.Module):
    """Neural network for policy gradient. Return the distribution of actions in each state."""
    def __init__(self, model, hidden_size):
        if not hasattr(model, 'one_hot_encode'):
            raise ValueError("The environment must have a one-hot encoding of states.")   
        super(Classifier, self).__init__()
        self.model = model
        actions = model.get_all_actions()
        if self.model.is_game():
            # remove action when passing
            actions.pop()
        self.actions = actions
        state = model.init_state()
        code = model.one_hot_encode(state)
        input_size = len(code)
        output_size = len(actions)
        self.nn = nn.Sequential(
            nn.Linear(input_size, hidden_size), 
            nn.GELU(), 
            nn.Linear(hidden_size, output_size), 
            nn.Softmax(dim=0))

    def forward(self, code):
        """Forward pass."""
        return self.nn(code)

In [25]:
game = TicTacToe()

In [26]:
classifier = Classifier(model=game, hidden_size=100)

In [27]:
state = game.state
# one-hot encoding
code = game.one_hot_encode(state)
# tensor
code = torch.tensor(code).float()

In [28]:
probs = classifier.forward(code).detach()

In [29]:
print(probs)

tensor([0.1261, 0.1115, 0.1253, 0.0992, 0.1072, 0.1031, 0.1200, 0.1119, 0.0957])


In [30]:
len(probs)

9

In [31]:
torch.sum(probs)

tensor(1.)

## To do

* Complete the method 'train' of the class PolicyGradient. Observe that a penalty is assigned for illegal actions.
* Test the agent on TicTacToe, against (1) a random adversary and (2) a perfect adversary.
* Test the agent on ConnectFour, against (1) a random adversary and (2) an adversary with the one-step policy.
* Compare your results to another learning strategy (e.g., Monte-Carlo learning) and interpret the results.
* (bonus) Try to improve policy gradient on TicTacToe with a perfect adversary.

In [32]:
class PolicyGradient(Agent):
    """Agent learning by policy gradient.
    
    Parameters
    ----------
    model : object of class Environment
        Model.a
    player : int
        Player for games (1 or -1, default = default player of the game).
    gamma : float
        Discount rate (in [0, 1], default = 1).
    hidden_size : int
        Size of the hidden layer (default = 100).
    penalty : float
        Penalty for illegal actions (default = -5).
    min_log : float
        Minimal value to compute the log (default = 1e-10)
    """
    def __init__(self, model, player=None, gamma=1, hidden_size=100, penalty=-1, min_log=1e-10):
        super(PolicyGradient, self).__init__(model, player=player)  
        self.nn = Classifier(model, hidden_size)
        self.action_id = {action: i for i, action in enumerate(self.nn.actions)}
        self.gamma = gamma
        self.penalty = penalty
        self.min_log = min_log
        
    def get_policy(self):
        """Get the current policy."""
        def policy(state):
            actions = self.model.get_actions(state)
            if len(actions) > 1:
                win_actions = []
                # check win actions
                if self.model.is_game():
                    next_states = [self.model.get_next_state(state, action) for action in actions]
                    win_actions = [self.model.get_reward(next_state) == self.player for next_state in next_states]
                if any(win_actions):
                    probs = np.array(win_actions).astype(float)
                else:
                    # get prob of each action
                    code = self.model.one_hot_encode(state)
                    code = torch.tensor(code).float()
                    probs = self.nn.forward(code)
                    probs = probs.detach().numpy()
                    # restrict to available actions
                    indices = [self.action_id[action] for action in actions]
                    probs = probs[indices]                    
                # renormalize
                if np.sum(probs) > 0:
                    probs = probs / np.sum(probs)
                else:
                    probs = np.ones(len(actions)) / len(actions)
            else:
                probs = [1]
            return probs, actions
        return policy
    
    def update_policy(self):
        """Update the policy."""
        self.policy = self.get_policy()
    
    def get_samples(self, horizon):
        """Get samples from one episode."""
        self.update_policy()
        rewards = []
        log_probs = []
        log_probs_illegal = []
        self.model.reset()
        state = self.model.state
        for t in range(horizon):
            action = self.get_action(state)
            if action is not None:
                i = self.action_id[action]
                code = self.model.one_hot_encode(state)
                code = torch.tensor(code).float()
                probs = self.nn.forward(code)
                prob = torch.clip(probs[i], self.min_log, 1 - self.min_log)
                log_prob = torch.log(prob).reshape(1)

                actions = self.model.get_actions(state)
                if action in actions:
                    reward, stop = self.model.step(action)
                    state = self.model.state
                    rewards.append(reward)
                    log_probs.append(log_prob)
                else:
                    log_probs_illegal.append(log_prob)
            else:
                reward, stop = self.model.step(action)
                state = self.model.state
            if stop:
                break
        gain = 0
        for reward in reversed(rewards):
            gain = reward + self.gamma * gain
        return gain, log_probs, log_probs_illegal
        
    def train(self, horizon=100, n_episodes=1000, learning_rate=0.01):
        """Train the neural network."""
        optimizer = optim.Adam(self.nn.parameters(), lr=learning_rate)
        for t in range(n_episodes):
            gain, log_probs, log_probs_illegal = self.get_samples(horizon)
            loss = 0
            if len(log_probs):
                loss += -torch.cat(log_probs).sum() * gain
            if len(log_probs_illegal):
                penalty_loss = -torch.cat(log_probs_illegal).sum() * self.penalty
                loss += penalty_loss
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()


#### PolicyGradient on TicTacToe against a random aversary

In [33]:
game = TicTacToe()
agent = PolicyGradient(game)
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)


(array([-1,  1]), array([3, 7], dtype=int64))

In [34]:
agent.train()
agent.update_policy()
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([1]), array([10], dtype=int64))

The PolicyGradient agent, once trained, wins every Tic Tac Toe game against a random opponent.

#### PolicyGradient on TicTacToe against a perfect adversary

In [35]:
game = TicTacToe()
algo = ValueIteration(game)
policy, avdersary_policy = algo.get_perfect_players()

In [36]:
game = TicTacToe(adversary_policy=adversary_policy)
agent = PolicyGradient(game)
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([-1,  0]), array([7, 3], dtype=int64))

In [37]:
agent.train(n_episodes=1000)
agent.update_policy()
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([-1,  0]), array([7, 3], dtype=int64))

After different runs, the situation remains the same or slightly improves. The PolicyGradient agent shows only slight improvement when facing a perfect Tic Tac Toe opponent when it is trained

#### PolicyGradient on Connect Four against a random adversary

In [38]:
game = ConnectFour()
agent = PolicyGradient(game)
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([-1,  1]), array([7, 3], dtype=int64))

In [39]:
agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=10)
np.unique(gains, return_counts=True)

(array([1]), array([10], dtype=int64))

We get only wins, therefore PolicyGradient is a good algorithm to play Connect Four.