# Reinforcement Learning

# Q-Learning

This notebook presents SARSA and Q-learning.

Credits: T. Bonald, Telecom Paris

In [None]:
import numpy as np
from matplotlib import pyplot as plt
import sys
PATH = ''

In [None]:
# Mount Google Drive when running on Google Colab; outside Colab this cell does nothing,
# so the notebook can be executed top to bottom in any environment.
try:
    from google.colab import drive
except ImportError:
    # google.colab is unavailable: keep the default PATH defined in the first cell
    pass
else:
    drive.mount('/content/drive')
    PATH = "/content/drive/MyDrive/Colab Notebooks/RL/ENSAI-smart-data/" #Put here the correct path
    # Make the course modules importable, without duplicating the entry when the cell is re-run
    if PATH not in sys.path:
        sys.path.append(PATH)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from model import Walk, Maze, TicTacToe, Nim, ConnectFour
from agent import Agent, OnlineControl

## Handling (state,action) value functions
We first run some basic experiments using the walk environment.

In [None]:
# Walk environment used for the first experiments on (state, action) values
walk = Walk()

In [None]:
# Check the OnlineControl class in agent.py
# Online control agent attached to the walk environment; its state_action_value
# table is filled in and queried in the next cells
control = OnlineControl(walk)

In [None]:
# Generate an arbitrary (randomly chosen) (state, action) value function
for state in walk.get_states():
  for action in walk.get_actions(state):
    # Any random initialisation works here; a standard normal draw is used as an example
    control.state_action_value[walk.encode(state)][action] = np.random.normal()


In [None]:
# Choose an arbitrary state and display the values of its actions: what is the corresponding best action?
# (the state at index 2 of the list returned by get_states is used as the example)
state = walk.get_states()[2]
# Your code here

In [None]:
# For the same state, compute the best action using get_best_action
# Your code here

In [None]:
# For the same state, compute the epsilon-greedy best action for a given epsilon
control.eps = 0.1  # Choose epsilon value (0.1 is only an example)
control.get_best_action_randomized(state)
# Discuss the impact of epsilon

## SARSA

In [None]:
class SARSA(OnlineControl):
    """Online control by SARSA.

    On-policy temporal-difference control: the update target uses the action that is
    actually selected by ``get_best_action_randomized`` in the next state.
    """

    def update_values(self):
        """Learn the state-action value online over one episode.

        The episode starts from the re-initialised environment state and lasts at most
        ``self.n_steps`` steps. After each step, the value of the visited (state, action)
        pair is moved towards the target ``reward + gamma * Q(new_state, new_action)``
        (just ``reward`` when the episode stops), with a step size equal to the inverse
        of the number of visits of that pair.
        """
        self.environment.reinit_state()
        state = self.environment.state
        action = self.get_best_action_randomized(state)
        self.add_state_action(state, action)
        for t in range(self.n_steps):
            state_code = self.environment.encode(state)
            self.state_action_count[state_code][action] += 1
            reward, stop = self.environment.step(action)
            if stop:
                # No successor: the target reduces to the immediate reward
                gain = reward
            else:
                new_state = self.environment.state
                new_action = self.get_best_action_randomized(new_state)
                self.add_state_action(new_state, new_action)
                new_state_code = self.environment.encode(new_state)
                # Bootstrapped SARSA target.
                # Assumes OnlineControl stores the discount factor as self.gamma
                # (it is passed as gamma=... at construction) - confirm in agent.py
                gain = reward + self.gamma * self.state_action_value[new_state_code][new_action]
            # Temporal-difference error between the target and the current estimate
            diff = gain - self.state_action_value[state_code][action]
            self.state_action_value[state_code][action] += diff / self.state_action_count[state_code][action]
            if stop:
                break
            state = self.environment.state
            action = new_action

## Q-learning

In [None]:
class QLearning(OnlineControl):
    """Online control by Q-learning.

    Off-policy temporal-difference control: actions are selected with
    ``get_best_action_randomized`` but the update target uses the greedy action
    in the next state.
    """

    def update_values(self):
        """Learn the state-action value online over one episode.

        The episode starts from the re-initialised environment state and lasts at most
        ``self.n_steps`` steps. After each step, the value of the visited (state, action)
        pair is moved towards the target ``reward + gamma * Q(new_state, best_action)``
        (just ``reward`` when the episode stops), with a step size equal to the inverse
        of the number of visits of that pair.
        """
        self.environment.reinit_state()
        state = self.environment.state
        for t in range(self.n_steps):
            # Behaviour policy: randomized (epsilon-greedy) action selection
            action = self.get_best_action_randomized(state)
            self.add_state_action(state, action)
            state_code = self.environment.encode(state)
            self.state_action_count[state_code][action] += 1
            reward, stop = self.environment.step(action)
            if stop:
                # No successor: the target reduces to the immediate reward
                gain = reward
            else:
                new_state = self.environment.state
                # Target policy: greedy action in the new state.
                # Assumes get_best_action returns the action of highest current value - confirm in agent.py
                best_action = self.get_best_action(new_state)
                self.add_state_action(new_state, best_action)
                new_state_code = self.environment.encode(new_state)
                # Assumes OnlineControl stores the discount factor as self.gamma - confirm in agent.py
                gain = reward + self.gamma * self.state_action_value[new_state_code][best_action]
            # Temporal-difference error between the target and the current estimate
            diff = gain - self.state_action_value[state_code][action]
            self.state_action_value[state_code][action] += diff / self.state_action_count[state_code][action]
            if stop:
                break
            state = new_state
        

## Walk

In [None]:
# Fresh walk environment for the learning experiment
walk = Walk()

In [None]:
# SARSA learner on the walk; n_steps bounds the number of steps of each episode in update_values
algo = SARSA(walk, gamma=0.9, eps=0.1, n_steps=100)

In [None]:
# Train the learner: each call to update_values runs one episode
n_episodes = 100
for t in range(n_episodes):
    algo.update_values()

In [None]:
# Policy obtained from the learner after training
policy = algo.get_policy()

In [None]:
# Visualise the learned policy on the walk environment
walk.display_policy(policy)

## Maze

In [None]:
# Configure the maze environment from the map file located under PATH
maze = Maze()
# set parameters
maze_map = np.load(PATH+'maze_small.npy')
maze.set_parameters(maze_map, (1, 0), [(3, 8)])
# init
# NOTE(review): the environment is instantiated again after set_parameters; presumably the
# parameters are stored at class level and only take effect for new instances - confirm in model.py
maze = Maze()

In [None]:
# display the maze

In [None]:
# Run SARSA and/or Qlearning

In [None]:
# display the policy

In [None]:
# COMPLETE AT HOME: Run an episode with your policy and show the animation. Can you escape the maze? 

## Tic-Tac-Toe

In [None]:
# Tic-Tac-Toe game with its default settings, played by a default Agent
game = TicTacToe()
agent = Agent(game)

In [None]:
# Explain the following line of code, and explain the result: what does it show?
# (np.unique with return_counts=True returns the sorted distinct values and how often each occurs)
np.unique(agent.get_gains(), return_counts=True)

In [None]:
# Run SARSA or Q-learning

In [None]:
# Discuss the output of the following code
# (distinct gains returned by agent.get_gains() and the number of occurrences of each)
np.unique(agent.get_gains(), return_counts=True)

In [None]:
# Play against the one-step best policy defined in the first lab. Who's winning?


## Perfect adversary

Let's get a perfect adversary by Value Iteration.

In [None]:
from scipy import sparse
from dp import PolicyEvaluation

In [None]:
def dot_max(matrix: sparse.csr_matrix, vector: np.ndarray):
    """Row-wise "max-product" of a CSR matrix and a vector.

    For each row, the stored entries are multiplied by the matching components of
    ``vector`` and the maximum of these products is returned (instead of their sum,
    as in a regular matrix-vector product).

    Every row is expected to hold at least one stored entry: ``reduceat`` does not
    treat an empty segment as empty.
    """
    # One product per stored entry, laid out row after row in CSR order
    products = vector[matrix.indices] * matrix.data
    # Position of the first stored entry of each row
    row_starts = matrix.indptr[:-1]
    return np.maximum.reduceat(products, row_starts)

In [None]:
class ValueIteration(PolicyEvaluation):
    """Value iteration.

    Parameters
    ----------
    environment:
        The environment.
    player:
        Player for games (1 or -1, default = 1).
    gamma:
        Discount factor (between 0 and 1).
    n_iter:
        Number of value iterations.
    tol:
        Tolerance = maximum difference between two iterations for early stopping.
    verbose:
        If True, print the number of iterations when early stopping occurs.
    """

    def __init__(self, environment, player=1, gamma=1, n_iter=100, tol=0, verbose=True):
        # The parent class is initialised with the policy of a default Agent on this environment
        initial_policy = Agent(environment, player=player).policy
        super().__init__(environment, initial_policy, player, gamma)
        self.n_iter = n_iter
        self.tol = tol
        self.verbose = verbose

    def get_optimal_policy(self):
        """Compute the optimal policy by iterating Bellman's optimality equation.

        Environments exposing a ``player`` attribute are handled by
        ``get_optimal_policy_game``.
        """
        if hasattr(self.environment, 'player'):
            return self.get_optimal_policy_game()
        self.values = np.zeros(self.n_states)
        # Boolean transition structure: which states can be reached from each state
        moves = self.get_transitions().astype(bool)
        reachable = moves[self.non_terminal]
        for t in range(self.n_iter):
            targets = self.rewards + self.gamma * self.values
            updated = self.values.copy()
            # Only non-terminal states are updated, with the best value among their successors
            updated[self.non_terminal] = dot_max(reachable, targets)
            gap = np.max(np.abs(updated - self.values))
            self.values = updated
            if gap <= self.tol:
                if self.verbose:
                    print(f"Convergence after {t+1} iterations.")
                break
        return self.get_policy()

    def get_optimal_policy_game(self):
        """Compute the optimal policy for games, assuming the best response of the adversary."""
        self.values = np.zeros(self.n_states)
        moves = self.get_transitions().astype(bool)
        # Non-terminal states split according to whose turn it is (first component of the state)
        own_turn = np.array([state[0] == self.player for state in self.states]) & self.non_terminal
        adversary_turn = np.array([state[0] == -self.player for state in self.states]) & self.non_terminal
        own_moves = moves[own_turn]
        adversary_moves = moves[adversary_turn]
        for t in range(self.n_iter):
            targets = self.rewards + self.gamma * self.values
            updated = self.values.copy()
            # Each side picks the successor that is best from its own point of view;
            # multiplying by the player's sign before and after dot_max expresses this with a max
            updated[own_turn] = self.player * dot_max(own_moves, self.player * targets)
            updated[adversary_turn] = -self.player * dot_max(adversary_moves, -self.player * targets)
            gap = np.max(np.abs(updated - self.values))
            self.values = updated
            if gap <= self.tol:
                if self.verbose:
                    print(f"Convergence after {t+1} iterations.")
                break
        return self.get_policy()

In [None]:
# get a perfect adversary
game = TicTacToe(play_first=False, player=-1)
algo = ValueIteration(game, player=-1, n_iter=10)
adversary_policy = algo.get_optimal_policy()

Convergence after 6 iterations.


In [None]:
# Define a TicTacToe game with this player
game = TicTacToe(adversary_policy)
agent = Agent(game)

In [None]:
# Before training: how many times does the random agent win or get a draw?


(array([-1,  0]), array([84, 16]))

In [None]:
# Now train the agent

In [None]:
# After training, evaluate the performance of the agent (frequency of wins/draws)

## Nim

In [None]:
# Play the Nim game with RL agent, evaluate the improvement over the random agent.

## Connect Four

In [None]:
# Play the Connect Four game with RL agent, evaluate the improvement over the random agent.