Source: https://www.kaggle.com/lucabasa/a-shameless-journey-into-rl-from-scratch

This notebook records my process of learning and practicing the concepts of Reinforcement Learning. My current level in this field is close to zero, and I am struggling to acquire skills from online courses in a way that I can reproduce without guidance. My professional experience never gave me the chance to develop knowledge and skills in this field, and I would like to make that possible in the future.

Similarly to what [I am doing for NLP](https://www.kaggle.com/lucabasa/a-shameless-journey-into-nlp-from-scratch), the notebook is meant to be a diary for myself, and it is public to hold myself accountable for making significant progress over the next few months. I will follow [the Kaggle course](https://www.kaggle.com/learn/intro-to-game-ai-and-reinforcement-learning) and try to supplement it with further research to make those concepts stick.

**Do not expect a brilliant notebook, nor a high-scoring one. It will most likely be a fairly pedantic exploration of functionalities I don't know yet. Feel free to drop a suggestion in the comments.**

***Day count = 4***

In [1]:
import numpy as np 
import pandas as pd 

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Environment and Agents

Kaggle has already implemented a game environment for us to play with. I will need to play around with it a bit to learn more about what this environment does. I assume it provides the board and the game rules.

In [4]:
from kaggle_environments import make, evaluate

env = make("connectx", debug=True)

The first thing I see is `make`, which is a good name for something that creates the environment. I can't find the code at the moment, so the help will do.

In [5]:
help(make)

Help on function make in module kaggle_environments.core:

make(environment, configuration={}, info={}, steps=[], logs=[[], [], []], debug=False, state=None)
    Creates an instance of an Environment.
    
    Args:
        environment (str|Environment):
        configuration (dict, optional):
        info (dict, optional):
        steps (list, optional):
        debug (bool=False, optional): Render print() statments to stdout
        state (optional):
    
    Returns:
        Environment: Instance of a specific environment.



I see that in the `envs` folder there are `connectx`, `tictactoe` and `identity`. I wonder if I can simply load a different environment

In [6]:
env = make("tictactoe", debug=True)

Apparently yes, good to know. Moving on, what is this environment?

In [7]:
env = make("connectx", debug=True)

dir(env)

['_Environment__agent_runner',
 '_Environment__get_shared_state',
 '_Environment__get_state',
 '_Environment__process_specification',
 '_Environment__run_interpreter',
 '_Environment__set_state',
 '_Environment__state_schema',
 '_Environment__state_schema_value',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__state_schema_0',
 '__state_schema_1',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 'agents',
 'clone',
 'configuration',
 'debug',
 'debug_print',
 'done',
 'html_renderer',
 'id',
 'info',
 'interpreter',
 'logs',
 'name',
 'play',
 'pool',
 'render',
 'renderer',
 'reset',
 'run',
 'specification',
 'state',
 'step',
 'steps',
 'toJSON',
 'train',
 'version']

In [8]:
env.__state_schema_0

{'type': 'object',
 'additionalProperties': False,
 'properties': {'action': {'description': 'Column to drop a checker onto the board.',
   'type': 'integer',
   'default': 0,
   'minimum': 0},
  'reward': {'description': '-1 = Lost, 0 = Draw/Ongoing, 1 = Won',
   'type': ['number', 'null'],
   'default': 0,
   'enum': [-1, 0, 1]},
  'info': {'description': 'Additional infomation; not available for evaluation.',
   'type': 'object',
   'default': {},
   'properties': {}},
  'observation': {'description': 'Observation to create an action based upon.',
   'type': 'object',
   'additionalProperties': True,
   'properties': {'remainingOverageTime': {'description': 'Total remaining banked time (seconds) that can be used in excess of per-step actTimeouts -- agent is disqualified with TIMEOUT status when this drops below 0.',
     'shared': False,
     'type': 'number',
     'minimum': 0,
     'default': 60},
    'step': {'description': 'Current step within the episode.',
     'type': 'intege

Indeed, it seems to contain all the methods we need to play a game. In particular, it comes with some **agents**

In [9]:
env.agents

{'random': <function kaggle_environments.envs.connectx.connectx.random_agent(obs, config)>,
 'negamax': <function kaggle_environments.envs.connectx.connectx.negamax_agent(obs, config)>}

And I can find the code of these agents in the file `connectx.py`, nice. In this context, the agent is a Python function that we program or train to make decisions given the situation, which is the essence of RL.

The agents must take 2 arguments:

* `obs`, which has information about the board (`obs.board`) and the player (`obs.mark`, which is 1 for one player and 2 for the other)
* `config`, which defines the board (rows and columns) and the rule to win (in this case, 4 in a row)

The agent returns the number of the column where the next piece will be played. Therefore, I expect the environment to know where the pieces are and to put the new piece in the only available spot in that column.

So the random agent takes a random valid move. 

In [12]:
import random
import numpy as np

def agent_random(obs, config):
    print(config)  # I want to see what is inside
    # print(obs.board)
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    return random.choice(valid_moves)

Let's see how it plays

In [13]:
# Agents play one game round
env.run([agent_random, agent_random])

# Show the game
env.render(mode="ipython")

{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'rows': 6, 'inarow': 4, 'agentTimeout': 60, 'timeout': 2}
{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'rows': 6, 'inarow': 4, 'agentTimeout': 60, 'timeout': 2}
{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'rows': 6, 'inarow': 4, 'agentTimeout': 60, 'timeout': 2}
{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'rows': 6, 'inarow': 4, 'agentTimeout': 60, 'timeout': 2}
{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'rows': 6, 'inarow': 4, 'agentTimeout': 60, 'timeout': 2}
{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'rows': 6, 'inarow': 4, 'agentTimeout': 60, 'timeout': 2}
{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'rows': 6, 'inarow': 4, 'agentTimeout': 60, 'timeout': 2}
{'episodeSteps': 1000, 'actTimeout': 2, 'runTimeout': 1200, 'columns': 7, 'r

Alright, the config is just a dictionary coming from the environment, nothing special there. The board is a flat list with one entry per cell, read row by row from the top: making a move corresponds to writing a number in the list, 1 for one player and 2 for the other. Therefore, the first 7 entries correspond to the top row of the board, and it makes sense that a move is available if the corresponding entry is 0, because that means the column is not full.
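
To make the flat-list layout concrete, here is a small sketch. The helper names `cell_index` and `valid_columns` are mine and are not part of `kaggle_environments`; the only assumption is the one stated above, namely that the list is read row by row and row 0 is the top row.

```python
ROWS, COLUMNS = 6, 7  # default ConnectX board size, as in the config printed above

def cell_index(row, col, columns=COLUMNS):
    """Position of cell (row, col) in the flat board list; row 0 is the top row."""
    return row * columns + col

def valid_columns(board, columns=COLUMNS):
    """A column still accepts a piece as long as its top cell is empty."""
    return [col for col in range(columns) if board[col] == 0]

board = [0] * (ROWS * COLUMNS)
board[cell_index(5, 0)] = 1   # a piece dropped in an empty column 0 lands on the bottom row
print(cell_index(5, 0))       # 35

for row in range(ROWS):       # fill column 3 from top to bottom
    board[cell_index(row, 3)] = 1 + row % 2
print(valid_columns(board))   # [0, 1, 2, 4, 5, 6]
```

Only the first `COLUMNS` entries are inspected to find the valid moves, which is exactly what the agents in this notebook do with `obs.board[col] == 0`.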

I am curious about how to break this. Let's say I want to overflow the first column

In [10]:
def agent_overflow(obs, config):
    print(obs.board)
    return 0

env.run([agent_overflow, agent_overflow])

# Show the game
env.render(mode="ipython")

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
[2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
Invalid Action: Invalid column: 0


Alright, the game stops, good to know. I might want to build in some exception handling in case my agent goes crazy. The second way I want to break it is to miss the board entirely.

In [11]:
def agent_drunk(obs, config):
    print(obs.board)
    return 7 # which doesn't exist

env.run([agent_drunk, agent_overflow])

# Show the game
env.render(mode="ipython")

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Invalid Action: Invalid column: 7


Nice, it breaks differently: the game stops at the very first move. Thus the list is just a representation of the board, and the rules are enforced in such a way that, even though position 7 exists in the list, column 7 is not recognized as a valid move.
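
Both failure modes (a full column and a column that does not exist) could be guarded against with a thin wrapper around the agent. This is only a sketch of the idea: `make_safe` is a hypothetical helper of mine, and the `SimpleNamespace` objects are stand-ins for the `obs` and `config` that the environment passes in, assuming they only need attribute access.

```python
import random
from types import SimpleNamespace

def make_safe(agent):
    """Wrap an agent so that an illegal or failing move is replaced by a random legal one."""
    def safe_agent(obs, config):
        valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
        try:
            move = agent(obs, config)
        except Exception:
            move = None  # treat a crash like an illegal move
        if move not in valid_moves:
            move = random.choice(valid_moves)
        return move
    return safe_agent

# Stand-ins for the objects passed by the environment (assumption: attribute access only)
config = SimpleNamespace(columns=7, rows=6, inarow=4)
obs = SimpleNamespace(board=[2] + [0] * 41, mark=1)  # column 0 is already full

def always_zero(obs, config):   # same idea as agent_overflow
    return 0

def off_board(obs, config):     # same idea as agent_drunk
    return 7

print(make_safe(always_zero)(obs, config) in range(1, 7))  # True
print(make_safe(off_board)(obs, config) in range(1, 7))    # True
```

Whether silently replacing a bad move is desirable is a design choice: it keeps the game going, but it can also hide bugs in the agent, so logging the fallback would probably be wise.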

The function that makes everything happen is `run`

In [12]:
help(env.run)

Help on method run in module kaggle_environments.core:

run(agents) method of kaggle_environments.core.Environment instance
    Steps until the environment is "done" or the runTimeout was reached.
    
    Args:
        agents (list of any): List of agents to obtain actions from.
    
    Returns:
        tuple of:
            list of list of dict: The agent states of all steps executed.
            list of list of dict: The agent logs of all steps executed.



Given what I saw in the `env` object, I would have expected `play` to be the function that plays a game.

In [13]:
help(env.play)

Help on method play in module kaggle_environments.core:

play(agents=[], **kwargs) method of kaggle_environments.core.Environment instance
    Renders a visual representation of the environment and allows interactive action selection.
    
    Args:
        **kwargs (dict): Args directly passed into render().  Mode is fixed to ipython.
    
    Returns:
        None: prints directly to an IPython notebook



In [14]:
env.play([agent_random, agent_random])

InvalidArgument: One agent must be marked 'None' to train.

It is not quite the case: `play` renders the game and allows interactive action selection, and judging by the error it wants one of the agents to be `None`. The error also mentions training, which the documentation of `play` doesn't cover. I will come back to it.

To evaluate an agent, the course suggests running the game many times to get the percentage of wins.

In [15]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    import time
    from joblib import Parallel, delayed
    from kaggle_environments import evaluate
    import multiprocessing as mp
 
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    
    cores = mp.cpu_count()
    half_rounds_per_core = n_rounds // cores // 2

    def evaluate_per_core(half_rounds):
        outcomes = evaluate("connectx", [agent1, agent2], config, [], half_rounds)
        outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], half_rounds)]
        return outcomes  
    
    start_time=time.time()
    results = Parallel(n_jobs=mp.cpu_count())(delayed(evaluate_per_core)(half_rounds_per_core) for i in range(cores))
    total_time=time.time()-start_time

    outcomes = [result for results_per_job in results for result in results_per_job]

    print("In total, {} episodes have been evaluated using {} CPU's cores.".format(len(outcomes), cores))
    print("Total time: {:.2f} minutes ({:.2f} seconds per match on average)".format(total_time/60, total_time/n_rounds))
    print("Agent 1 Won: {:.2%}".format(outcomes.count([1,-1])/len(outcomes)))
    print("Agent 2 Won: {:.2%}".format(outcomes.count([-1,1])/len(outcomes)))
    print("Ties:        {:.2%}".format(outcomes.count([0,0])/len(outcomes)))
    print("Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Invalid Plays by Agent 2:", outcomes.count([0, None]))
    
    
def agent_random(obs, config):
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    return random.choice(valid_moves)

# Selects leftmost valid column
def agent_leftmost(obs, config):
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    return valid_moves[0]

get_win_percentages(agent1=agent_leftmost, agent2=agent_random)

In total, 96 episodes have been evaluated using 4 CPU's cores.
Total time: 0.03 minutes (0.02 seconds per match on average)
Agent 1 Won: 81.25%
Agent 2 Won: 18.75%
Ties:        0.00%
Invalid Plays by Agent 1: 0
Invalid Plays by Agent 2: 0


The trick is done by `evaluate`, a function I imported at the beginning. I don't know what some of its inputs are, and I don't like those lists as default arguments.

In [16]:
help(evaluate)

Help on function evaluate in module kaggle_environments.core:

evaluate(environment, agents=[], configuration={}, steps=[], num_episodes=1, debug=False, state=None)
    Evaluate and return the rewards of one or more episodes (environment and agents combo).
    
    Args:
        environment (str|Environment):
        agents (list):
        configuration (dict, optional):
        steps (list, optional):
        num_episodes (int=1, optional): How many episodes to execute (run until done).
        debug (bool=False, optional): Render print() statments to stdout
        state (optional)
    
    Returns:
        list of list of int: List of final rewards for all agents for all episodes.
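
The reason I am wary of signatures like `agents=[]` and `configuration={}` is a general Python behavior: a default value is created once, when the function is defined, and is then shared by every call. I have not checked whether `make` or `evaluate` ever modify their defaults, so the sketch below is a generic illustration with made-up function names, not a bug report.

```python
def append_bad(item, bucket=[]):
    # The default list is created once, at definition time,
    # so every call that omits `bucket` shares the same list.
    bucket.append(item)
    return bucket

def append_good(item, bucket=None):
    # Using None as a sentinel gives each call its own fresh list.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket

first_bad = list(append_bad(1))    # copy the result to freeze what the call returned
second_bad = list(append_bad(2))
print(first_bad, second_bad)       # [1] [1, 2]

print(append_good(1), append_good(2))  # [1] [2]
```

As long as a function only reads its default and never mutates it, nothing goes wrong, which is probably why this pattern survives in many libraries.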



# Making better agents

The core functioning of the environment feels clear so far, which is a testament to the effort Kaggle put into it. Thanks to that, I can focus almost exclusively on creating a better agent. The first exercise suggests creating an agent that looks one move ahead to make a decision, which feels like a good greedy approach (take the best move available now, defined as the one that gives the highest immediate reward).

Intuitively, I think it should not be difficult to then extend this to looking several moves ahead. 

Let's go in order. The first example creates an agent that picks a winning move, if one is available, or a random one otherwise. To do that, we need to simulate the board one step ahead and test if it is a winning board.

In [17]:
# Gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, piece, config):
    next_grid = grid.copy()
    for row in range(config.rows-1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = piece
    return next_grid

# Returns True if dropping piece in column results in game win
def check_winning_move(obs, config, col, piece):
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    next_grid = drop_piece(grid, col, piece, config)
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(next_grid[row,col:col+config.inarow])
            if window.count(piece) == config.inarow:
                return True
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(next_grid[row:row+config.inarow,col])
            if window.count(piece) == config.inarow:
                return True
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(next_grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if window.count(piece) == config.inarow:
                return True
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(next_grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if window.count(piece) == config.inarow:
                return True
    return False

OK, let's go in order because there is a lot going on here.

The board is a list of numbers; we reshape it to make it 2-dimensional.

In [18]:
a = [0,0,0,0,0,0,0,0,0,0,0,0]

np.asarray(a).reshape(3, 4) # a simpler board 

array([[0, 0, 0, 0],
       [0, 0, 0, 0],
       [0, 0, 0, 0]])

Then we drop a piece in the board

In [19]:
b = np.asarray(a).reshape(3, 4)

def drop_piece(grid, col, piece, config):
    next_grid = grid.copy()
    for row in range(config['rows']-1, -1, -1): # simplified this to work in this experiment
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = piece
    return next_grid

drop_piece(b, 1, 1, {'rows':3})

array([[0, 0, 0, 0],
       [0, 0, 0, 0],
       [0, 1, 0, 0]])

This is very reasonable:
* try a move by picking a column,
* check the rows in this column backwards (starting from the highest index, which is the bottom row, and moving up),
* stop at the first 0 you find because that is where the piece is going to land,
* fill the spot.

Then it just checks if the new board shows positions that have 4 pieces in a row and returns True if it does. It does so by moving a window of 4 cells over the board and counting how many cells in the window hold the player's piece (`1` or `2`).
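
The part of `check_winning_move` I found least obvious is how the windows are extracted, especially the diagonal ones, so here is a small experiment on a toy grid. The grid values and the window length of 3 are my own choices to keep the output readable; the indexing expressions are the same ones used in the function.

```python
import numpy as np

grid = np.arange(12).reshape(3, 4)  # values 0..11, so every cell is recognizable
inarow = 3                          # shorter than the real game, to fit the toy grid
row, col = 0, 1

# Slices give the horizontal and vertical windows
horizontal = grid[row, col:col + inarow].tolist()
vertical = grid[row:row + inarow, col].tolist()

# Two index sequences of equal length pick the cells (row, col), (row+1, col+1), ...
pos_diagonal = grid[range(row, row + inarow), range(col, col + inarow)].tolist()

# For the other diagonal the rows run upwards while the columns still move right
bottom = 2
neg_diagonal = grid[range(bottom, bottom - inarow, -1), range(col, col + inarow)].tolist()

print(horizontal)    # [1, 2, 3]
print(vertical)      # [1, 5, 9]
print(pos_diagonal)  # [1, 6, 11]
print(neg_diagonal)  # [9, 6, 3]
```

Once a window is a plain list, `window.count(piece) == config.inarow` is all that is needed to detect a win in it.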

Alright, then if I want an agent that plays with this strategy, I can do the following.

In [20]:
# redefine drop_piece with attribute access (config.rows), since the experiment above changed it to config['rows']
def drop_piece(grid, col, piece, config):
    next_grid = grid.copy()
    for row in range(config.rows-1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = piece
    return next_grid

In [21]:
def agent_winmove(obs, config):
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    # for each valid move, check if it is a winning one
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark):
            return col
    # otherwise, pick randomly
    return random.choice(valid_moves)

In [22]:
get_win_percentages(agent1=agent_leftmost, agent2=agent_winmove)
print('_'*40)
get_win_percentages(agent1=agent_random, agent2=agent_winmove)

In total, 96 episodes have been evaluated using 4 CPU's cores.
Total time: 0.03 minutes (0.02 seconds per match on average)
Agent 1 Won: 65.62%
Agent 2 Won: 34.38%
Ties:        0.00%
Invalid Plays by Agent 1: 0
Invalid Plays by Agent 2: 0
________________________________________
In total, 96 episodes have been evaluated using 4 CPU's cores.
Total time: 0.05 minutes (0.03 seconds per match on average)
Agent 1 Won: 29.17%
Agent 2 Won: 70.83%
Ties:        0.00%
Invalid Plays by Agent 1: 0
Invalid Plays by Agent 2: 0


So it wins solidly against random, but it still loses about two thirds of its games against leftmost.

Leftmost seems to be the easiest player to beat, but apparently it is still outsmarting my agent. I need to make my agent capable of stopping a winning move.

In [23]:
def agent_winmove_nolose(obs, config):  # the naming might overstate the quality of the agent
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    # for each valid move, check if it is a winning one
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark):
            return col
    # if no winning move is available, check if the other player has a winning move and stop it
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark%2+1): # this is the mark of the opponent
            return col
    # otherwise, pick randomly
    return random.choice(valid_moves)
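
A quick check of the `obs.mark%2+1` trick used above to get the opponent's mark, assuming marks are always 1 or 2 as seen in the board printouts. The function name `opponent` is just for this illustration.

```python
def opponent(mark):
    """Map mark 1 to 2 and mark 2 to 1."""
    return mark % 2 + 1

print(opponent(1))  # 2
print(opponent(2))  # 1
# For marks restricted to 1 and 2, this is the same as 3 - mark
print(all(opponent(m) == 3 - m for m in (1, 2)))  # True
```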

In [24]:
env.run([agent_leftmost, agent_winmove_nolose])

# Show the game
env.render(mode="ipython")

It seems the days we lose against leftmost are over

In [25]:
get_win_percentages(agent1=agent_leftmost, agent2=agent_winmove_nolose)

In total, 96 episodes have been evaluated using 4 CPU's cores.
Total time: 0.06 minutes (0.03 seconds per match on average)
Agent 1 Won: 0.00%
Agent 2 Won: 100.00%
Ties:        0.00%
Invalid Plays by Agent 1: 0
Invalid Plays by Agent 2: 0


However, this agent can still lose if the other player has 2 options to win the game in one move. It feels like I need to look further ahead, but then the problem would present itself again when meeting an agent that looks 3 steps ahead, and so on. The tutorial suggests another way to tackle the problem.
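
The two-threat situation can be reproduced on a hand-made board. This is a self-contained sketch in plain Python with my own helper names (`drop`, `wins`, `winning_columns`), not the functions defined earlier, and the position is artificial: the piece counts are not realistic, only the pattern matters.

```python
ROWS, COLUMNS, INAROW = 6, 7, 4

def drop(grid, col, piece):
    """Return a copy of grid with piece dropped in col (grid[0] is the top row)."""
    new = [row[:] for row in grid]
    for row in range(ROWS - 1, -1, -1):
        if new[row][col] == 0:
            new[row][col] = piece
            break
    return new

def wins(grid, piece):
    """True if piece occupies INAROW aligned cells in any direction."""
    for r in range(ROWS):
        for c in range(COLUMNS):
            for dr, dc in ((0, 1), (1, 0), (1, 1), (-1, 1)):
                cells = [(r + i * dr, c + i * dc) for i in range(INAROW)]
                if all(0 <= rr < ROWS and 0 <= cc < COLUMNS and grid[rr][cc] == piece
                       for rr, cc in cells):
                    return True
    return False

def winning_columns(grid, piece):
    """Columns where dropping piece wins immediately."""
    return [c for c in range(COLUMNS)
            if grid[0][c] == 0 and wins(drop(grid, c, piece), piece)]

grid = [[0] * COLUMNS for _ in range(ROWS)]
grid[5][2:5] = [2, 2, 2]  # the opponent (mark 2) has three in the bottom row, both ends open

threats = winning_columns(grid, 2)
print(threats)                           # [1, 5]
after_block = drop(grid, threats[0], 1)  # we block one end...
print(winning_columns(after_block, 2))   # [5], the other end still wins
```

A one-move-ahead blocker can only remove one of the two threats, so the loss has to be prevented one move earlier.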

# Heuristics

This approach assigns points to game boards and makes the move that gives the best score. I am guessing that the quality of my agent will heavily depend on how the points are assigned.

The first agent I will build following the tutorial still looks only one step ahead, but it also scores how valuable it is, for example, to get to 3 pieces in a row. With the right point assignment, the agent might behave more like a player that forecasts the game by more than one move.

First, I need a function that creates a board after a piece is dropped, and we have that in `drop_piece` defined above. Then I need a function that counts how many windows of the board contain a given number of pieces of my agent or of its opponent.


In [26]:
def check_window(window, num_discs, piece, config):
    return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)

def count_windows(grid, num_discs, piece, config):
    num_windows = 0
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    return num_windows

This is basically scanning the board for horizontal, vertical, and diagonal patterns, counting how many windows contain a given number of pieces (with the rest of the window empty) and returning the count. The window always has size `config.inarow`, which is 4 here (given that we need to create 4 in a row), and `num_discs` tells the function how many pieces of a given player to look for.
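
A few windows passed through the check make the rule clearer. This sketch uses a simplified copy of `check_window` that takes `inarow` directly instead of the `config` object, so it can run on its own.

```python
def check_window(window, num_discs, piece, inarow=4):
    """True if the window holds exactly num_discs of piece and is otherwise empty."""
    return window.count(piece) == num_discs and window.count(0) == inarow - num_discs

print(check_window([1, 1, 0, 1], 3, 1))  # True: three pieces and one free cell
print(check_window([1, 1, 2, 1], 3, 1))  # False: the fourth cell is taken by the opponent
print(check_window([1, 1, 0, 0], 3, 1))  # False: only two pieces
print(check_window([2, 2, 2, 2], 4, 2))  # True: a completed four in a row
```

The requirement that the remaining cells are empty is what makes a "three" meaningful: a three that is already blocked can never become a four in that window.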

In [27]:
def get_heuristic(grid, mark, config):
    num_threes = count_windows(grid, 3, mark, config)
    num_fours = count_windows(grid, 4, mark, config)
    num_threes_opp = count_windows(grid, 3, mark%2+1, config)
    score = num_threes - 1e2*num_threes_opp + 1e6*num_fours
    return score
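
To get a feel for how the three weights interact, here is a small worked example with made-up window counts. The function `heuristic_score` and the counts are hypothetical; only the formula is taken from `get_heuristic` above.

```python
def heuristic_score(num_threes, num_threes_opp, num_fours):
    """Same weights as get_heuristic, applied to window counts given by hand."""
    return num_threes - 1e2 * num_threes_opp + 1e6 * num_fours

# Board A: I build two threes but leave one open three to the opponent
print(heuristic_score(2, 1, 0))   # -98.0
# Board B: nothing built, but the opponent has no open three either
print(heuristic_score(0, 0, 0))   # 0.0
# Board C: I complete a four, even though the opponent keeps an open three
print(heuristic_score(0, 1, 1))   # 999900.0
```

With these weights the agent prefers B over A (denying a three matters more than building two), and a win dominates everything else.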

Those weights are something I can tune to find a better strategy. Now I need a function that, given a move, scores the resulting board.

In [28]:
def score_move(grid, col, mark, config):
    next_grid = drop_piece(grid, col, mark, config)
    score = get_heuristic(next_grid, mark, config)
    return score

And now I have everything for my next agent.

In [29]:
def agent_heuristic(obs, config):
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next turn
    scores = dict(zip(valid_moves, [score_move(grid, col, obs.mark, config) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

Let's see how it performs against the best agent we have so far

In [30]:
env.run([agent_heuristic, agent_winmove_nolose])

# Show the game
env.render(mode="ipython")

In [31]:
get_win_percentages(agent1=agent_heuristic, agent2=agent_winmove_nolose)

In total, 96 episodes have been evaluated using 4 CPU's cores.
Total time: 0.23 minutes (0.14 seconds per match on average)
Agent 1 Won: 72.92%
Agent 2 Won: 27.08%
Ties:        0.00%
Invalid Plays by Agent 1: 0
Invalid Plays by Agent 2: 0


This is a decent performance. I wonder what happens if I add more rules: for example, can I make it more aggressive, or more conservative? It feels like I can try several different combinations of scores and eventually find a better one, but it already starts to feel like something that should be learned by playing the game.

In [32]:
def get_heuristic_alt(grid, mark, config):
    num_twos = count_windows(grid, 2, mark, config)
    num_threes = count_windows(grid, 3, mark, config)
    num_fours = count_windows(grid, 4, mark, config)
    num_twos_opp = count_windows(grid, 2, mark%2+1, config)
    num_threes_opp = count_windows(grid, 3, mark%2+1, config)
    score = 1e6*num_fours + 10*num_threes + 1*num_twos - 1*num_twos_opp - 1e3*num_threes_opp
    return score

def score_move_alt(grid, col, mark, config):
    next_grid = drop_piece(grid, col, mark, config)
    score = get_heuristic_alt(next_grid, mark, config)
    return score

def agent_heuristic_alt(obs, config):
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next turn
    scores = dict(zip(valid_moves, [score_move_alt(grid, col, obs.mark, config) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [33]:
get_win_percentages(agent1=agent_heuristic, agent2=agent_heuristic_alt)

In total, 96 episodes have been evaluated using 4 CPU's cores.
Total time: 0.31 minutes (0.19 seconds per match on average)
Agent 1 Won: 31.25%
Agent 2 Won: 68.75%
Ties:        0.00%
Invalid Plays by Agent 1: 0
Invalid Plays by Agent 2: 0


Before diving into how to make my agent learn, it is worth exploring a more complex use of heuristics.

# MinMax algorithm

Now that my agent looks ahead one step, I wonder how to make it look several steps ahead. This will likely include guessing what the opponent will do after my agent's move. Moreover, I wonder how to enforce a strategy that not only picks the best move at this moment (which is what I did so far), but also the best move to eventually win the game.

The **MinMax algorithm** chooses the move that increases the agent's score the most, assuming that the opponent will choose the move that lowers that score the most.

Let's start with a simple heuristic.

In [34]:
def get_heuristic(grid, mark, config):
    num_threes = count_windows(grid, 3, mark, config)
    num_fours = count_windows(grid, 4, mark, config)
    num_threes_opp = count_windows(grid, 3, mark%2+1, config)
    num_fours_opp = count_windows(grid, 4, mark%2+1, config)
    score = num_threes - 1e2*num_threes_opp - 1e4*num_fours_opp + 1e6*num_fours
    return score

Following [Wikipedia's explanation](https://en.wikipedia.org/wiki/Minimax), I need a few functions.

The first one checks if a game has ended.

In [35]:
def is_terminal_window(window, config):
    return window.count(1) == config.inarow or window.count(2) == config.inarow

def is_terminal_node(grid, config):
    # Check for draw 
    if list(grid[0, :]).count(0) == 0:
        return True
    # Check for win: horizontal, vertical, or diagonal
    # horizontal 
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if is_terminal_window(window, config):
                return True
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if is_terminal_window(window, config):
                return True
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if is_terminal_window(window, config):
                return True
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if is_terminal_window(window, config):
                return True
    return False

Then the MinMax implementation will look like this

In [36]:
def minimax(node, depth, maximizingPlayer, mark, config):
    is_terminal = is_terminal_node(node, config)
    valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
    if depth == 0 or is_terminal:
        return get_heuristic(node, mark, config)
    if maximizingPlayer:
        value = -np.inf  # lowercase: the np.Inf alias was removed in NumPy 2.0
        for col in valid_moves:
            child = drop_piece(node, col, mark, config)
            value = max(value, minimax(child, depth-1, False, mark, config))
        return value
    else:
        value = np.inf
        for col in valid_moves:
            child = drop_piece(node, col, mark%2+1, config)
            value = min(value, minimax(child, depth-1, True, mark, config))
        return value

In other words

* if we reached the maximum depth (i.e. we have looked the predetermined number of steps ahead) or the game is over, get the heuristic and thus score the board
* if we are trying to maximize the score, for every valid move, call the function recursively with the depth reduced by one and keep the maximum value
* otherwise (i.e. when guessing the opponent's move) do the same but keep the minimum value
* note that in the recursion, we alternate between maximizing and minimizing the score (simulating one move per player)
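
The alternation is easier to see on a toy tree than on a Connect Four board. In this sketch, which is my own simplification and not the course code, a tree is a nested list and a leaf is a number that plays the role of the heuristic score.

```python
def minimax_toy(node, maximizing):
    """Minimax on a nested list: numbers are leaf scores, lists are positions to expand."""
    if isinstance(node, (int, float)):
        return node  # leaf: the "heuristic" is the number itself
    values = [minimax_toy(child, not maximizing) for child in node]
    return max(values) if maximizing else min(values)

# Three moves for me, each followed by two possible replies of the opponent
tree = [[3, 5], [2, 9], [0, 7]]

# The opponent picks the minimum in each branch: 3, 2 and 0
print([minimax_toy(branch, False) for branch in tree])  # [3, 2, 0]
# I pick the branch whose worst case is best
print(minimax_toy(tree, True))                          # 3
```

Note that the branch containing the tempting 9 is not chosen, because the opponent would answer with the 2.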

Before continuing, it is not difficult to imagine that this algorithm will run for a long time. To speed it up, I can also implement [alpha-beta pruning](https://en.wikipedia.org/wiki/Alpha%E2%80%93beta_pruning), which stops evaluating a node as soon as there is proof that it cannot lead to a better result than a move previously evaluated.

In [37]:
def minimax_ab(node, depth, a, b, maximizingPlayer, mark, config):
    is_terminal = is_terminal_node(node, config)
    valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
    if depth == 0 or is_terminal:
        return get_heuristic(node, mark, config)
    if maximizingPlayer:
        value = -np.inf  # lowercase: the np.Inf alias was removed in NumPy 2.0
        for col in valid_moves:
            child = drop_piece(node, col, mark, config)
            value = max(value, minimax_ab(child, depth-1, a, b, False, mark, config))
            a = max(a, value)
            if a >= b:
                break # β cutoff
        return value
    else:
        value = np.inf
        for col in valid_moves:
            child = drop_piece(node, col, mark%2+1, config)
            value = min(value, minimax_ab(child, depth-1, a, b, True, mark, config))
            b = min(b, value)
            if b <= a:
                break # α cutoff
        return value

Then we again need a function to score each move

In [38]:
def score_move(grid, col, mark, config, nsteps):
    next_grid = drop_piece(grid, col, mark, config)
    score = minimax_ab(next_grid, nsteps-1, -np.inf, np.inf, False, mark, config)
    return score

The agent will then look like this

In [39]:
def agent_minmax3(obs, config):
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next step
    scores = dict(zip(valid_moves, [score_move(grid, col, obs.mark, config, 3) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In other words,

* For each valid move, start by simulating the new board 
* Score the new board (1 step ahead) with the minmax algorithm. This will simulate the opponent move and then one more move for the player (because we made it look 3 steps ahead).
* Return the score of each valid move and pick the move with the highest score

So MinMax is still scoring the next move, but it is looking ahead a given amount of steps to do so.

In [40]:
env.run([agent_heuristic_alt, agent_minmax3])

# Show the game
env.render(mode="ipython")

In [41]:
get_win_percentages(agent1=agent_heuristic_alt, agent2=agent_minmax3)

In total, 96 episodes have been evaluated using 4 CPU's cores.
Total time: 5.08 minutes (3.05 seconds per match on average)
Agent 1 Won: 36.46%
Agent 2 Won: 61.46%
Ties:        2.08%
Invalid Plays by Agent 1: 0
Invalid Plays by Agent 2: 0


Our new agent wins about 61% of the matches against our previous champion. I want to start making some submissions for the leaderboard (LB).

In [42]:
# redefining as one function for the submission

def agent_minmax3(obs, config):
    import numpy as np
    import random
    
    def drop_piece(grid, col, piece, config):
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = piece
        return next_grid
    
    def check_window(window, num_discs, piece, config):
        return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)

    def count_windows(grid, num_discs, piece, config):
        num_windows = 0
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[row, col:col+config.inarow])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # vertical
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # positive diagonal
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # negative diagonal
        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        return num_windows
    
    def score_move(grid, col, mark, config, nsteps):
        next_grid = drop_piece(grid, col, mark, config)
        score = minimax_ab(next_grid, nsteps-1, -np.inf, np.inf, False, mark, config)
        return score
    
    def minimax_ab(node, depth, a, b, maximizingPlayer, mark, config):
        is_terminal = is_terminal_node(node, config)
        valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
        if depth == 0 or is_terminal:
            return get_heuristic(node, mark, config)
        if maximizingPlayer:
            value = -np.inf  # np.Inf was removed in NumPy 2.0
            for col in valid_moves:
                child = drop_piece(node, col, mark, config)
                value = max(value, minimax_ab(child, depth-1, a, b, False, mark, config))
                a = max(a, value)
                if a >= b:
                    break # β cutoff
            return value
        else:
            value = np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark%2+1, config)
                value = min(value, minimax_ab(child, depth-1, a, b, True, mark, config))
                b = min(b, value)
                if b <= a:
                    break # α cutoff
            return value
    
    def is_terminal_window(window, config):
        return window.count(1) == config.inarow or window.count(2) == config.inarow

    def is_terminal_node(grid, config):
        # Check for draw 
        if list(grid[0, :]).count(0) == 0:
            return True
        # Check for win: horizontal, vertical, or diagonal
        # horizontal 
        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[row, col:col+config.inarow])
                if is_terminal_window(window, config):
                    return True
        # vertical
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                if is_terminal_window(window, config):
                    return True
        # positive diagonal
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if is_terminal_window(window, config):
                    return True
        # negative diagonal
        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if is_terminal_window(window, config):
                    return True
        return False
    
    def get_heuristic(grid, mark, config):
        num_threes = count_windows(grid, 3, mark, config)
        num_fours = count_windows(grid, 4, mark, config)
        num_threes_opp = count_windows(grid, 3, mark%2+1, config)
        num_fours_opp = count_windows(grid, 4, mark%2+1, config)
        score = num_threes - 1e2*num_threes_opp - 1e5*num_fours_opp + 1e9*num_fours
        return score
    
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next step
    scores = dict(zip(valid_moves, [score_move(grid, col, obs.mark, config, 3) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [43]:
# redefining as one function for the submission

def agent_heuristic_alt(obs, config):
    import numpy as np
    import random
    
    def drop_piece(grid, col, piece, config):
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = piece
        return next_grid    
    
    def check_window(window, num_discs, piece, config):
        return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)

    def count_windows(grid, num_discs, piece, config):
        num_windows = 0
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[row, col:col+config.inarow])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # vertical
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # positive diagonal
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        # negative diagonal
        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        return num_windows
    
    def get_heuristic_alt(grid, mark, config):
        num_twos = count_windows(grid, 2, mark, config)
        num_threes = count_windows(grid, 3, mark, config)
        num_fours = count_windows(grid, 4, mark, config)
        num_twos_opp = count_windows(grid, 2, mark%2+1, config)
        num_threes_opp = count_windows(grid, 3, mark%2+1, config)
        if num_threes_opp > 1:
            mult_threes_opp = 1e5
        else:
            mult_threes_opp = 1e3
        if num_threes > 1:
            mult_threes = 1e4
        else:
            mult_threes = 10
        score = 1e6*num_fours + mult_threes*num_threes + 1*num_twos - 1*num_twos_opp - mult_threes_opp*num_threes_opp
        return score

    def score_move_alt(grid, col, mark, config):
        next_grid = drop_piece(grid, col, mark, config)
        score = get_heuristic_alt(next_grid, mark, config)
        return score
    
    
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next turn
    scores = dict(zip(valid_moves, [score_move_alt(grid, col, obs.mark, config) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [44]:
import inspect
import os

def write_agent_to_file(function, file):
    with open(file, "a" if os.path.exists(file) else "w") as f:
        f.write(inspect.getsource(function))
        print(function, "written to", file)

write_agent_to_file(agent_minmax3, "minmax3_agent.py")
write_agent_to_file(agent_heuristic_alt, "heuristic_agent.py")

<function agent_minmax3 at 0x7f287c66f830> written to minmax3_agent.py
<function agent_heuristic_alt at 0x7f287c66fef0> written to heuristic_agent.py
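
Before submitting, a quick sanity check I could run is to load a written file back and call the agent once on a fake observation. This is only a sketch using the standard library: `load_agent`, the toy agent `first_free_column` and the fake `obs`/`config` objects are my own stand-ins (assumptions), not part of `kaggle_environments`.

```python
import os
import tempfile
from types import SimpleNamespace

def load_agent(path, name):
    """Execute the source file at `path` and return the function called `name`."""
    namespace = {}
    with open(path) as f:
        exec(f.read(), namespace)
    return namespace[name]

# Hypothetical stand-in agent, written to a temporary file just for this demo
source = (
    "def first_free_column(obs, config):\n"
    "    return [c for c in range(config.columns) if obs.board[c] == 0][0]\n"
)
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_agent.py")
    with open(path, "w") as f:
        f.write(source)
    agent = load_agent(path, "first_free_column")

# Fake observation: a 6x7 board whose column 0 is already full at the top
config = SimpleNamespace(rows=6, columns=7, inarow=4)
obs = SimpleNamespace(board=[1] + [0] * 41, mark=1)
move = agent(obs, config)
print(move)  # 1
```

The same `load_agent` call should work on `minmax3_agent.py`, since that function carries its own imports and helpers; it also shows why everything had to be redefined inside a single function.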


# Deep Reinforcement Learning

So far, I have been telling my agent how to pick a move. In principle, I can look ahead dozens of moves to pick the best one, but the search gets slower with every extra step, so the strategy becomes very inefficient.

The basic concept of reinforcement learning I can leverage is instead to let the agent play the game, record if it wins or loses, and backpropagate this information to inform future games and have it make better decisions.

In this way, I would only need to score wins, losses, and draws rather than every possible configuration of the board.

I must say I am not comfortable jumping into deep learning right now, as it is not clear to me how to train and monitor what my agent is going to do. Google can help me find the basic concept (or what looks like a building block of what I want to achieve).

## Q-Learning (and simpler games)

[Q-Learning](https://en.wikipedia.org/wiki/Q-learning) is a technique to learn the value of an action in a particular state. The goal is thus to create a policy that, given the configuration of the environment, tells what the best action to take is. The interesting thing is that it does not require a model of the environment. The version I use here works by playing several steps (for example, until the end of the game), receiving a reward for the outcome of those steps, and propagating this information back to each of the steps.

In other words, 
* I create a dictionary Q that, for each possible state, has a value for each of the possible actions. At first the values are random (or 0)
* I let the agent play and record each state and action. 
* At the end of the game, I know if the agent won or lost and give it a reward accordingly.
* I update the dictionary by using the [Bellman equation](https://en.wikipedia.org/wiki/Bellman_equation) to essentially pass the information that *with this set of actions in these situations you won/lost the game*
* Eventually, the iterative process converges to the optimal policy, i.e. for each configuration there is an optimal action to win the game.
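
The steps above can be sketched on a problem even smaller than a grid. Note that this is the textbook one-step update, `Q[s][a] += lr * (reward + gamma * max(Q[s_next]) - Q[s][a])`, applied after every move, which is not the same as the end-of-game backward pass used in the classes further down. The 1-D chain world and the names `train_chain`, `lr`, `gamma` and `exp_rate` are assumptions of mine for illustration.

```python
import random

def train_chain(n_states=5, episodes=500, lr=0.2, gamma=0.9, exp_rate=0.3, seed=0):
    """Tabular Q-learning on a hypothetical chain: start in cell 0, +1 for reaching the last cell."""
    rng = random.Random(seed)
    actions = ("left", "right")
    Q = {s: {a: 0.0 for a in actions} for s in range(n_states)}
    goal = n_states - 1
    for _ in range(episodes):
        s = 0
        while s != goal:
            # Explore with probability exp_rate, otherwise take the best known action
            if rng.random() < exp_rate:
                a = rng.choice(actions)
            else:
                best = max(Q[s].values())
                a = rng.choice([x for x in actions if Q[s][x] == best])
            s_next = max(s - 1, 0) if a == "left" else s + 1
            reward = 1.0 if s_next == goal else 0.0
            # The goal is terminal, so nothing can be gained after reaching it
            future = 0.0 if s_next == goal else max(Q[s_next].values())
            Q[s][a] += lr * (reward + gamma * future - Q[s][a])
            s = s_next
    return Q

Q = train_chain()
policy = {s: max(Q[s], key=Q[s].get) for s in range(4)}
print(policy)
```

After training, the greedy action should be `right` in every non-terminal cell, with values shrinking by roughly a factor `gamma` for each step away from the goal.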

The simplest application of this concept I could find is to create an agent able to solve a small grid world puzzle. The rules are simple: there is a world on a grid. You start on a cell, you win if you end up on another cell, you lose if you end up on yet another cell, and there might be cells that are simply inaccessible. A well-trained agent should be able to reach the winning cell via the shortest path.

[The code](https://github.com/lucabasa/reinforcement_learning_training/tree/main/GridWorld) is inspired by [this article](https://towardsdatascience.com/reinforcement-learning-implement-grid-world-from-scratch-c5963765ebff) and hidden for readability.

In [45]:
class State:
    def __init__(self, rows=3, cols=4, 
                 state=None, # default ignored in the init
                 win_state=None, lose_state=None, # default ignored in the init
                 forbidden_blocks=None,
                 deterministic=True, **kwargs):
        
        if win_state is None:
            win_state = (0, cols-1)
        if lose_state is None:
            lose_state = (1, cols-1)
        if state is None:
            state = (rows-1, 0)
        if forbidden_blocks is None:
            forbidden_blocks = []
        
        self.rows = rows
        self.cols = cols
        self.win_state = win_state
        self.lose_state = lose_state
        self.state = state
        self.board = np.zeros([rows, cols])
        
        if not isinstance(forbidden_blocks, list):
            forbidden_blocks = [forbidden_blocks]
        if self.win_state in forbidden_blocks:
            forbidden_blocks = [b for b in forbidden_blocks if b != self.win_state]
        if self.lose_state in forbidden_blocks:
            forbidden_blocks = [b for b in forbidden_blocks if b != self.lose_state]
        if self.state in forbidden_blocks:
            forbidden_blocks = [b for b in forbidden_blocks if b != self.state] 
        for block in forbidden_blocks:
            self.board[block] = -1  # Forbidden block 
        self.forbidden_blocks = forbidden_blocks 
        
        self.isEnd = False
        self.deterministic = deterministic
        
        
    def giveReward(self):
        if self.state == self.win_state:
            return 1
        elif self.state == self.lose_state:
            return -1
        else:
            return 0
    
    
    def isEndFunc(self):
        if (self.state == self.win_state) or (self.state == self.lose_state):
            self.isEnd = True
    
    
    def _chooseActionProb(self, action):
        # The chosen action is kept with probability 0.8; each of the two
        # perpendicular directions is taken instead with probability 0.1
        if action == "up":
            return np.random.choice(["up", "left", "right"], p=[0.8, 0.1, 0.1])
        if action == "down":
            return np.random.choice(["down", "left", "right"], p=[0.8, 0.1, 0.1])
        if action == "left":
            return np.random.choice(["left", "up", "down"], p=[0.8, 0.1, 0.1])
        if action == "right":
            return np.random.choice(["right", "up", "down"], p=[0.8, 0.1, 0.1])
    
    
    def nxtPosition(self, action):
        """
        action: up, down, left, right
        -------------
        0 | 1 | 2| 3|
        1 |
        2 |
        return next position
        """
        if self.deterministic:
            if action == "up":
                nxtState = (self.state[0]-1, self.state[1])
            elif action == "down":
                nxtState = (self.state[0]+1, self.state[1])
            elif action == "left":
                nxtState = (self.state[0], self.state[1]-1)
            else:
                nxtState = (self.state[0], self.state[1]+1)
        else:
            # non-deterministic
            action = self._chooseActionProb(action)
            self.deterministic = True
            nxtState = self.nxtPosition(action)
            self.deterministic = False
            
        # if next state legal
        if (nxtState[0] >= 0) and (nxtState[0] <= self.rows-1):  # board boundaries
            if (nxtState[1] >= 0) and (nxtState[1] <= self.cols-1): 
                if nxtState not in self.forbidden_blocks:  # forbidden block
                    return nxtState
        return self.state
    
    
    def showBoard(self):
        self.board[self.state] = 1
        for i in range(0, self.rows):
            print('-----'*(self.cols-1)+'--')
            out = '| '
            for j in range(0, self.cols):
                if self.board[i, j] == 1:
                    token = '*'
                if self.board[i, j] == -1:
                    token = 'z'
                if self.board[i, j] == 0:
                    token = '0'
                out += token + ' | '
            print(out)
        print('-----'*(self.cols-1)+'--')
        
        
class Agent:
    def __init__(self, *, state=None, lr=0.2, exp_rate=0.3, verbose=False):
        self.states = []
        self.actions = ["up", "down", "left", "right"]
        if state is None:
            state = State()
        self.State = state
        self.isEnd = self.State.isEnd
        self.lr = lr
        self.exp_rate = exp_rate
        self.orig_state = state
        self.verbose = verbose
        
        np.random.seed(234)
        
        # initial state reward
        self.state_values = {}
        for i in range(state.rows):  
            for j in range(state.cols):
                self.state_values[(i, j)] = 0  # init_reward[i, j]
    
    
    def chooseAction(self):
        # choose action with most expected value
        mx_nxt_reward = 0
        action = ""
        
        if np.random.uniform(0, 1) <= self.exp_rate:
            action = np.random.choice(self.actions)
        else:
            # greedy action
            for a in self.actions:
                # if the action is deterministic
                nxt_reward = self.state_values[self.State.nxtPosition(a)]
                if nxt_reward > mx_nxt_reward:
                    action = a
                    mx_nxt_reward = nxt_reward
            # print("current pos: {}, greedy action: {}".format(self.State.state, action))
        if action == '':
            action = np.random.choice(self.actions)
        return action
    
    
    def takeAction(self, action):
        position = self.State.nxtPosition(action)
        st_dict = self.State.__dict__.copy()
        st_dict['state'] = position
        return State(**st_dict)    
    
    
    def reset(self):
        self.states = []
        self.State = self.orig_state
        self.isEnd = self.State.isEnd
        
        
    def _backpropagate(self):
        # back propagate
        reward = self.State.giveReward()
        # explicitly assign end state to reward values
        self.state_values[self.State.state] = reward
        if self.verbose:
            print("Game End Reward", reward)
        for s in reversed(self.states):
            reward = self.state_values[s] + self.lr * (reward - self.state_values[s])
            self.state_values[s] = round(reward, 3)
        self.reset()
        
    
    def _find_solution(self):
        action = self.chooseAction()
        # append trace
        self.states.append(self.State.nxtPosition(action))
        if self.verbose:
            print("current position {} action {}".format(self.State.state, action))
        # by taking the action, it reaches the next state
        self.State = self.takeAction(action)
        # mark is end
        self.State.isEndFunc()
        if self.verbose:
            print("nxt state", self.State.state)
            print("---------------------")
        self.isEnd = self.State.isEnd
    
    
    def train(self, rounds=10):
        i = 0
        while i < rounds:
            # to the end of game back propagate reward
            if self.State.isEnd:
                self._backpropagate()
                i += 1
            else:
                self._find_solution()
                
        self.reset()
    
    
    def play(self, max_steps=100):
        # We don't need it to explore anymore
        self.exp_rate_bak = self.exp_rate
        self.exp_rate = 0
        self.verbose_bak = self.verbose
        self.verbose = True
        i = 0
        while i < max_steps:
            if self.State.isEnd:
                print(f'Solution found in {i} steps')
                self.exp_rate = self.exp_rate_bak
                self.verbose = self.verbose_bak
                self.reset()
                break
            else:
                self._find_solution()
                i += 1
    
    
    def showValues(self):
        for i in range(0, self.State.rows):
            print('----------------------------------')
            out = '| '
            for j in range(0, self.State.cols):
                out += str(self.state_values[(i, j)]) + ' | '
            print(out)
        print('----------------------------------')
        


class Agent_Q(Agent):
    def __init__(self, *, state=None, decay_gamma=0.9, lr=0.2, exp_rate=0.3, verbose=False):
        if state is None:
            state = State(deterministic=False)
        # hand the state to the parent (it sets self.State and self.orig_state)
        # so it does not build a default 3x4 board first
        super().__init__(state=state, lr=lr, exp_rate=exp_rate, verbose=verbose)
        self.decay_gamma = decay_gamma
        
        np.random.seed(234)
        
        # initial Q values
        self.Q_values = {}
        for i in range(self.State.rows):
            for j in range(self.State.cols):
                self.Q_values[(i, j)] = {}
                for a in self.actions:
                    self.Q_values[(i, j)][a] = 0  # Q value is a dict of dict
    
    
    def chooseAction(self):
        # choose action with most expected value
        mx_nxt_reward = 0
        action = ""

        if np.random.uniform(0, 1) <= self.exp_rate:
            action = np.random.choice(self.actions)
        else:
            # greedy action
            for a in self.actions:
                current_position = self.State.state
                nxt_reward = self.Q_values[current_position][a]
                if nxt_reward > mx_nxt_reward:
                    action = a
                    mx_nxt_reward = nxt_reward
            # print("current pos: {}, greedy action: {}".format(self.State.state, action))
        if action == '':
            action = np.random.choice(self.actions)
        return action
    
    
    def _backpropagate(self):
        # back propagate
        reward = self.State.giveReward()
        for a in self.actions:
            self.Q_values[self.State.state][a] = reward
        if self.verbose:
            print("Game End Reward", reward)
        for s in reversed(self.states):
            current_q_value = self.Q_values[s[0]][s[1]]
            reward = current_q_value + self.lr * (self.decay_gamma * reward - current_q_value)
            self.Q_values[s[0]][s[1]] = round(reward, 3)
        self.reset()
        
    
    def _find_solution(self):
        action = self.chooseAction()
        # append trace
        self.states.append([(self.State.state), action])
        if self.verbose:
            print("current position {} action {}".format(self.State.state, action))
        # by taking the action, it reaches the next state
        self.State = self.takeAction(action)
        # mark is end
        self.State.isEndFunc()
        if self.verbose:
            print("nxt state", self.State.state)
            print("---------------------")
        self.isEnd = self.State.isEnd
        

    def showValues(self):
        for pos in self.Q_values.keys():
            print(pos, self.Q_values[pos])

Let's start with the board. We start on the bottom left, the winning cell is by default on the top right, and there are 2 forbidden cells.

In [46]:
s = State(forbidden_blocks=[(1, 1), (1,2)])
s.showBoard()

-----------------
| 0 | 0 | 0 | 0 | 
-----------------
| 0 | z | z | 0 | 
-----------------
| * | 0 | 0 | 0 | 
-----------------


If I move right, it moves (the starting cell stays marked because `showBoard` never clears the previous position)

In [47]:
s.state = s.nxtPosition('right')
s.showBoard()

-----------------
| 0 | 0 | 0 | 0 | 
-----------------
| 0 | z | z | 0 | 
-----------------
| * | * | 0 | 0 | 
-----------------


If I move up from there, I can't because that cell is forbidden

In [48]:
s.state = s.nxtPosition('up')
s.showBoard()

-----------------
| 0 | 0 | 0 | 0 | 
-----------------
| 0 | z | z | 0 | 
-----------------
| * | * | 0 | 0 | 
-----------------


First, I can set up an agent that makes use of value iteration to learn the optimal policy. In short, the agent keeps one value for each of the cells.

In [49]:
s = State(rows=5, cols=5, forbidden_blocks=[(1,2), (2,3), (3,4)])
s.showBoard()
ag = Agent(state=s)
ag.showValues()

----------------------
| 0 | 0 | 0 | 0 | 0 | 
----------------------
| 0 | 0 | z | 0 | 0 | 
----------------------
| 0 | 0 | 0 | z | 0 | 
----------------------
| 0 | 0 | 0 | 0 | z | 
----------------------
| * | 0 | 0 | 0 | 0 | 
----------------------
----------------------------------
| 0 | 0 | 0 | 0 | 0 | 
----------------------------------
| 0 | 0 | 0 | 0 | 0 | 
----------------------------------
| 0 | 0 | 0 | 0 | 0 | 
----------------------------------
| 0 | 0 | 0 | 0 | 0 | 
----------------------------------
| 0 | 0 | 0 | 0 | 0 | 
----------------------------------


The agent doesn't know which blocks are forbidden (well, it does, indirectly, via the board that is provided), and the board will only tell it when it wins or loses by awarding +1 or -1.

I let the agent train and I backpropagate the reward on each cell. Eventually, I get the following values

In [50]:
ag.train(20)
ag.showValues()

----------------------------------
| 0.563 | 0.7 | 0.833 | 0.847 | 1.0 | 
----------------------------------
| 0.406 | 0.375 | 0 | 0.44 | 0 | 
----------------------------------
| 0.226 | 0.197 | 0.0 | 0 | 0 | 
----------------------------------
| 0.128 | 0.084 | 0.0 | 0.0 | 0 | 
----------------------------------
| 0.003 | 0.026 | 0.0 | 0.0 | 0.0 | 
----------------------------------
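
The values fall off with the distance from the winning cell because of how the reward is passed backwards along the visited cells. Below is a stripped-down sketch of that update, modeled on `_backpropagate` above; the standalone function `backpropagate_values` and the 3-cell path are hypothetical, written only to follow the numbers by hand.

```python
def backpropagate_values(trace, final_reward, values, lr=0.2):
    """Walk the visited cells from last to first, nudging each value toward the one after it."""
    # The end cell gets the reward directly, as in the class above
    values[trace[-1]] = final_reward
    reward = final_reward
    for cell in reversed(trace):
        reward = values[cell] + lr * (reward - values[cell])
        values[cell] = round(reward, 3)
    return values

# Hypothetical path along the top row that ends on the winning cell (reward +1)
trace = [(0, 2), (0, 3), (0, 4)]
values = {cell: 0.0 for cell in trace}

backpropagate_values(trace, 1, values)
first_pass = dict(values)
print(first_pass)   # {(0, 2): 0.04, (0, 3): 0.2, (0, 4): 1}

backpropagate_values(trace, 1, values)
print(values)       # {(0, 2): 0.104, (0, 3): 0.36, (0, 4): 1.0}
```

Each cell only receives a fraction `lr` of the gap to the cell after it, so a cell far from the goal needs many winning games before its value becomes noticeable. That matches the tiny values in the bottom rows after only 20 rounds.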


Given these values, and following the intuition we can get by looking at the board, the agent will go up to the top of the board and then right (or step right by one cell before reaching the top). The `play` method applies what the agent has learned and follows the best path, let's see

In [51]:
ag.play()

current position (4, 0) action up
nxt state (3, 0)
---------------------
current position (3, 0) action up
nxt state (2, 0)
---------------------
current position (2, 0) action up
nxt state (1, 0)
---------------------
current position (1, 0) action up
nxt state (0, 0)
---------------------
current position (0, 0) action right
nxt state (0, 1)
---------------------
current position (0, 1) action right
nxt state (0, 2)
---------------------
current position (0, 2) action right
nxt state (0, 3)
---------------------
current position (0, 3) action right
nxt state (0, 4)
---------------------
Solution found in 8 steps


There we go. However, this is not quite what I wanted: I want an agent that maps the state of the board to an action. Here I can finally implement Q-learning for this problem.

In [52]:
s = State(rows=5, cols=5, forbidden_blocks=[(1,2), (2,3), (3,4)])
s.showBoard()
ag = Agent_Q(state=s)
ag.showValues()

----------------------
| 0 | 0 | 0 | 0 | 0 | 
----------------------
| 0 | 0 | z | 0 | 0 | 
----------------------
| 0 | 0 | 0 | z | 0 | 
----------------------
| 0 | 0 | 0 | 0 | z | 
----------------------
| * | 0 | 0 | 0 | 0 | 
----------------------
(0, 0) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(0, 1) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(0, 2) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(0, 3) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(0, 4) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(1, 0) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(1, 1) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(1, 2) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(1, 3) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(1, 4) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(2, 0) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(2, 1) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(2, 2) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(2, 3) {'up': 0, 'down': 0, 'left': 0, 'right': 0}
(2, 4) {'up': 0, 'down': 0, 'left

At this stage, the agent has no reason to prefer one action over another when it finds itself on a cell. The training is very similar, except that now we give a value to each action at a given cell

In [53]:
ag.train(20)
ag.Q_values

{(0, 0): {'up': 0.001, 'down': 0.0, 'left': 0.0, 'right': 0.007},
 (0, 1): {'up': 0.039, 'down': 0.06, 'left': 0.0, 'right': 0.447},
 (0, 2): {'up': 0.144, 'down': 0.113, 'left': 0.089, 'right': 0.623},
 (0, 3): {'up': 0.285, 'down': 0, 'left': 0, 'right': 0.889},
 (0, 4): {'up': 1, 'down': 1, 'left': 1, 'right': 1},
 (1, 0): {'up': 0.0, 'down': 0.004, 'left': 0.0, 'right': 0.0},
 (1, 1): {'up': 0.302, 'down': 0.0, 'left': 0.001, 'right': 0.046},
 (1, 2): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 3): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 4): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (2, 0): {'up': 0.0, 'down': 0.0, 'left': 0.001, 'right': 0.015},
 (2, 1): {'up': 0.221, 'down': 0.006, 'left': 0.0, 'right': 0.001},
 (2, 2): {'up': 0.0, 'down': 0.006, 'left': 0.0, 'right': 0.0},
 (2, 3): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (2, 4): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (3, 0): {'up': 0.003, 'down': 0.0, 'left': 0.0, 'right': 0.041},
 (3, 1): {'up':

In [54]:
ag.play()

current position (4, 0) action up
nxt state (3, 0)
---------------------
current position (3, 0) action right
nxt state (3, 1)
---------------------
current position (3, 1) action up
nxt state (2, 1)
---------------------
current position (2, 1) action up
nxt state (1, 1)
---------------------
current position (1, 1) action up
nxt state (0, 1)
---------------------
current position (0, 1) action right
nxt state (0, 2)
---------------------
current position (0, 2) action right
nxt state (0, 3)
---------------------
current position (0, 3) action right
nxt state (0, 4)
---------------------
Solution found in 8 steps
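
The path is nothing more than "pick the action with the highest Q-value in the current cell". As a sketch, I can copy a few rows from the `ag.Q_values` output above (only cells that appear in full there) and read the greedy action off each of them. The helper `greedy_policy` is a hypothetical name of mine, not a method of `Agent_Q`.

```python
# Q-values copied from the training output above (a subset of the cells on the path)
q_values = {
    (3, 0): {'up': 0.003, 'down': 0.0, 'left': 0.0, 'right': 0.041},
    (2, 1): {'up': 0.221, 'down': 0.006, 'left': 0.0, 'right': 0.001},
    (1, 1): {'up': 0.302, 'down': 0.0, 'left': 0.001, 'right': 0.046},
    (0, 1): {'up': 0.039, 'down': 0.06, 'left': 0.0, 'right': 0.447},
    (0, 2): {'up': 0.144, 'down': 0.113, 'left': 0.089, 'right': 0.623},
    (0, 3): {'up': 0.285, 'down': 0, 'left': 0, 'right': 0.889},
}

def greedy_policy(q_table):
    """Map each cell to the action with the largest Q-value."""
    return {cell: max(actions, key=actions.get) for cell, actions in q_table.items()}

policy = greedy_policy(q_values)
for cell, action in policy.items():
    print(cell, action)
```

This gives `right` for (3, 0), `up` for (2, 1) and (1, 1), and `right` along the top row, which is the same sequence of moves printed by `ag.play()` for those cells.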


However, this method has a big problem: in the game I want to solve, the number of states is very high (about 4.5 trillion legal positions on the standard 6x7 board), so there is no way I can really map each of them to a set of actions.
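
To put a rough number on it, here is a back-of-the-envelope sketch. Two assumptions of mine: the commonly cited count of legal positions for the standard 6x7 board (hard-coded below, not computed), and a table that stores one 8-byte float per state and column.

```python
rows, columns = 6, 7

# Crude upper bound: every cell is empty, player 1 or player 2
upper_bound = 3 ** (rows * columns)

# Assumed figure: commonly cited number of legal 6x7 positions
legal_positions = 4_531_985_219_092

# One float64 (8 bytes) per (state, column) pair in a Q table
bytes_needed = legal_positions * columns * 8

print(f"upper bound on boards: {upper_bound:.2e}")      # 1.09e+20
print(f"Q table size: {bytes_needed / 1e12:.0f} TB")    # 254 TB
```

Even with the smaller, legal-positions-only figure, the table would need hundreds of terabytes, and the agent would have to visit each state many times to fill it with meaningful values.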

## Deep Q-Learning