An implementation of the 'soccer'-game proposed in (Littman, 94)
----------------------------------------------------------------------------------

# Soccer

[Markov games as a framework for multi-agent reinforcement learning (Littman, 94)](https://www2.cs.duke.edu/courses/spring07/cps296.3/littman94markov.pdf)

Rules of the game:
* played by two players A & B on a 4x5 grid
* A & B occupy distinct squares of the grid and can choose from 5 actions on each turn: N, S, E, W and stand. Once both players have selected their actions, the two moves are executed in random order.
* One player has the ball. If he steps with it into the appropriate goal, that player scores a goal and the board is reset to the original configuration. Possession of the ball goes to one or the other player at random.
* When a player takes an action that would take him to the square occupied by the other player (remember: random order of execution AFTER action choice!), possession of the ball goes to the stationary player and the move doesn't take place. Goals are worth 1 point and the discount factor is set to 0.9, which makes scoring sooner somewhat better than scoring later.

In [2]:
import random
from numpy.random import multinomial # for policy sampling
import numpy as np

In [3]:
class Soccer:
    """Two-player grid soccer on a board 5 squares wide and 4 squares high.

    Squares are addressed as ``(x, y)`` with ``x`` in ``0..4`` and ``y`` in
    ``0..3``; moving 'N' decreases ``y`` and moving 'W' decreases ``x``.
    Player 'A' scores by moving 'W' with the ball from ``(0, 1)`` or
    ``(0, 2)``; player 'B' scores by moving 'E' with the ball from ``(4, 1)``
    or ``(4, 2)``.

    Attributes:
        players: the tuple ``('A', 'B')``.
        board: dict mapping every ``(x, y)`` square to ``None`` or to the
            name of the player standing on it.
        has_ball: name of the player currently holding the ball.
        winner: ``None`` until a goal is scored, then the scorer's name.
    """

    # Unit step (dx, dy) for each supported direction.
    directions = {'N': (0, -1), 'E': (1, 0), 'S': (0, 1), 'W': (-1, 0)}

    def __init__(self, **kwargs):
        # kwargs are accepted for call compatibility but are not used.
        self.players = ('A', 'B')
        self.initialize()

    def get_state(self):
        """Return the tuple ``(position of A, position of B, ball holder)``."""
        return (self.find_player('A'), self.find_player('B'), self.has_ball)

    def show(self):
        """Print the board; an 'o' after a player's name marks the ball holder."""
        rule = '----------------\n'
        pieces = [rule]
        for y in range(4):
            for x in range(5):
                occupant = self.board[(x, y)]
                if occupant is None:
                    pieces.append('|  ')
                else:
                    marker = 'o' if self.has_ball == occupant else ' '
                    pieces.append('|' + occupant + marker)
            pieces.append('|\n')
            pieces.append(rule)
        print(''.join(pieces))

    def initialize(self):
        """Reset the board: A on (3, 1), B on (1, 2), ball holder random, no winner."""
        self.board = {(x, y): None for x in range(5) for y in range(4)}
        self.board[(3, 1)] = self.players[0]
        self.board[(1, 2)] = self.players[1]
        self.has_ball = random.choice(self.players)
        self.winner = None

    def find_player(self, player):
        """Return the ``(x, y)`` square occupied by ``player``.

        Raises:
            KeyError: if ``player`` is not on the board.
        """
        for y in range(4):
            for x in range(5):
                if self.board[(x, y)] == player:
                    return (x, y)
        raise KeyError('Player {} not found'.format(player))

    def other_player(self, player):
        """Return the opponent of ``player``.

        Raises:
            ValueError: if ``player`` is neither 'A' nor 'B'.
        """
        if player == 'A':
            return 'B'
        elif player == 'B':
            return 'A'
        else:
            raise ValueError('Player {} not found'.format(player))

    def move(self, player, direction):
        """Move ``player`` one square in ``direction`` and return the outcome.

        A move off the board leaves the player in place. A move onto the
        other player's square does not take place, and if the mover held the
        ball it passes to the other player.

        Args:
            player: 'A' or 'B'.
            direction: one of 'N', 'E', 'S', 'W'.

        Returns:
            ``(reward, board)`` where ``reward`` is 0 while nobody has won,
            1 if ``player`` is the winner and -1 if the other player is, and
            ``board`` is the live board dict (not a copy).

        Raises:
            KeyError: if ``player`` is not on the board or ``direction`` is
                not a valid direction. The game state is left untouched in
                both cases.
        """
        old_square = self.find_player(player)
        # Validate before touching the board: rejecting the direction after
        # the player's square has been cleared would lose the player.
        if direction not in self.directions:
            raise KeyError('Direction {} not valid'.format(direction))

        # check if this leads to a win
        if (player == 'A' and old_square in [(0, 1), (0, 2)]
                and self.has_ball == 'A' and direction == 'W'):
            self.winner = 'A'
        if (player == 'B' and old_square in [(4, 1), (4, 2)]
                and self.has_ball == 'B' and direction == 'E'):
            self.winner = 'B'

        # compute rewards from the point of view of the moving player
        if self.winner is None:
            reward = 0
        elif self.winner == player:
            reward = 1
        else:
            reward = -1

        # Clamp the target to the board so that walking into a wall is a no-op.
        dx, dy = self.directions[direction]
        new_square = (min(4, max(0, old_square[0] + dx)),
                      min(3, max(0, old_square[1] + dy)))

        occupant = self.board[new_square]
        if occupant is None or occupant == player:
            self.board[old_square] = None
            self.board[new_square] = player
        elif self.has_ball == player:
            # player walked into other player's square: stay put, lose the ball
            self.has_ball = self.other_player(player)

        return reward, self.board

In [4]:
# Build a fresh game and print its starting board.
soccer = Soccer()
soccer.show()

----------------
|  |  |  |  |  |
----------------
|  |  |  |Ao|  |
----------------
|  |B |  |  |  |
----------------
|  |  |  |  |  |
----------------



In [5]:
# Move B one square east, then print the board and the winner
# (winner stays None until a goal has been scored).
soccer.move('B', 'E')
soccer.show()
print(soccer.winner)

----------------
|  |  |  |  |  |
----------------
|  |  |  |Ao|  |
----------------
|  |  |B |  |  |
----------------
|  |  |  |  |  |
----------------

None


In [6]:
def choose_action(actions, player, policy):
    """Sample one action from the distribution returned by ``policy``.

    Args:
        actions: sequence of candidate actions.
        player: context object forwarded unchanged as the second argument
            of ``policy``.
        policy: callable ``policy(actions, player)`` returning one
            probability per entry of ``actions``.

    Returns:
        The element of ``actions`` selected by a single multinomial draw.
    """
    # Use the arguments that were passed in rather than notebook globals,
    # so the function works for any action list and any game object.
    pvals = policy(actions, player)
    draw = multinomial(1, pvals)
    # The draw is a one-hot vector; argmax gives the index of the single 1.
    return actions[int(np.argmax(draw))]

In [7]:
class Policy:
    """Policy that assigns the same probability to every move."""

    def __init__(self, moves, game):
        # Both arguments are only stored; __call__ does not read them.
        self.moves = moves
        self.game = game

    def __call__(self, moves, soccer):
        """Return a list with one equal probability per entry of ``moves``.

        ``soccer`` is accepted but not used.
        """
        count = len(moves)
        share = 1 / count
        return [share] * count

In [8]:
# Set up a game, the four movement actions, and a policy object for them.
soccer = Soccer()
moves = ['N', 'E', 'S', 'W']
policy = Policy(moves, soccer)

In [9]:
# Play one game to completion with A and B alternating moves.
soccer = Soccer()
moves = ['N', 'E', 'S', 'W']
p = Policy(moves, soccer)

# i counts the rounds in which both players moved without a goal being scored.
i = 0
while True:
    # A's move is sampled through choose_action with policy p.
    _,_ = soccer.move('A', choose_action(moves, soccer, p))
    if soccer.winner: break
    # B's move is drawn directly with random.choice.
    _,_ = soccer.move('B', random.choice(moves))
    if soccer.winner: break
    i += 1
soccer.show()
print('# moves = {}, winner = {}'.format(i, soccer.winner))

----------------
|  |  |  |  |  |
----------------
|Ao|  |  |  |  |
----------------
|  |  |  |  |  |
----------------
|  |B |  |  |  |
----------------

# moves = 55, winner = A


In [10]:
def play_game(game, moves, policy_A, policy_B):
    """Alternate moves of A and B until ``game.winner`` is set, then report.

    A always moves first. Each move is sampled with
    ``choose_action(moves, game, policy)``. When the game is over the board
    is shown and the number of completed rounds (both players moved without
    a goal) is printed together with the winner.
    """
    schedule = [('A', policy_A), ('B', policy_B)]
    turns_without_goal = 0
    while True:
        mover, mover_policy = schedule[turns_without_goal % 2]
        game.move(mover, choose_action(moves, game, mover_policy))
        if game.winner:
            break
        turns_without_goal += 1
    # Two goal-less turns make up one full round.
    i = turns_without_goal // 2
    game.show()
    print('# moves = {}, winner = {}'.format(i, game.winner))

In [11]:
# Play one full game in which both players use their own Policy object.
soccer = Soccer()
moves = ['N', 'E', 'S', 'W']
policy_A = Policy(moves, soccer)
policy_B = Policy(moves, soccer)
play_game(soccer, moves, policy_A, policy_B)

----------------
|  |  |  |  |  |
----------------
|Ao|  |  |B |  |
----------------
|  |  |  |  |  |
----------------
|  |  |  |  |  |
----------------

# moves = 22, winner = A


# Minimax-Q

 ![](./minimax_Q.png)

How to store Q[s,a,o]? 
1. As a 'dict' => allows for easy and intuitive lookup
2. As a $N_s \times N_a \times N_o$ matrix => allows for 'easy' computation when a sum/max has to be taken
For now the 'dict' approach is implemented; the computation of V still has to be resolved.


In [21]:
def generate_init_dicts(moves):
    """Build the initial Q and V tables for the 5x4 soccer board.

    A state is ``((xa, ya), (xb, yb), has_ball)`` with ``has_ball`` in
    'A'/'B'. Every combination of the two positions is enumerated, including
    those where both players share a square.

    Returns:
        ``(Q, V)`` where ``Q[(state, action_a, action_b)]`` and ``V[state]``
        are all initialised to 1.
    """
    squares = [(x, y) for x in range(5) for y in range(4)]
    states = [
        (square_a, square_b, owner)
        for square_a in squares
        for square_b in squares
        for owner in ['A', 'B']
    ]
    Q = {
        (state, action_a, action_b): 1
        for state in states
        for action_a in moves
        for action_b in moves
    }
    V = {state: 1 for state in states}
    return Q, V

In [22]:
from scipy import optimize
def compute_best_policy(moves, state, Q):
    """Return the maximin mixed strategy over ``moves`` for one state.

    Solves the linear program

        maximise    v
        subject to  sum_a pi[a] * Q[state, a, o] >= v   for every o in moves
                    sum_a pi[a] == 1,  pi[a] >= 0

    i.e. the distribution over the first player's actions that maximises
    the expected Q-value against the worst-case opposing action.

    Args:
        moves: list of actions; used for both the own action ``a`` and the
            opposing action ``o``.
        state: state key used in ``Q``.
        Q: mapping ``(state, a, o) -> value``.

    Returns:
        Array with one probability per entry of ``moves``, in the same order.

    Raises:
        RuntimeError: if the solver does not report success.
    """
    n_moves = len(moves)

    # payoff[io, ia] is the value of playing moves[ia] against moves[io].
    payoff = np.array(
        [[Q[state, action_a, action_b] for action_a in moves]
         for action_b in moves],
        dtype=float,
    )

    # Decision vector is (pi_1, ..., pi_n, v).
    # linprog minimises, so maximising v means minimising -v.
    c = np.zeros(n_moves + 1)
    c[-1] = -1.0

    # "payoff @ pi >= v" rewritten in linprog's "A_ub @ x <= b_ub" form.
    A_ub = np.hstack([-payoff, np.ones((n_moves, 1))])
    b_ub = np.zeros(n_moves)

    # The probabilities sum to one; v takes no part in that constraint.
    A_eq = np.ones((1, n_moves + 1))
    A_eq[0, -1] = 0.0
    b_eq = np.array([1.0])

    # linprog bounds every variable to [0, inf) by default. The game value v
    # may be negative, so it has to be declared free explicitly.
    bounds = [(0, None)] * n_moves + [(None, None)]

    result = optimize.linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq,
                              bounds=bounds)
    if not result.success:
        raise RuntimeError(
            'Linear program for state {} failed: {}'.format(state, result.message))

    return result.x[:-1]

In [23]:
# Sanity check of compute_best_policy on rock-paper-scissors with a single
# state 0. Keys are (state, own action, opposing action); the value is +1
# when the own action wins, -1 when it loses and 0 on a tie.
Q = {}
Q[(0, 'R', 'R')] =  0
Q[(0, 'R', 'P')] = -1
Q[(0, 'R', 'S')] =  1

Q[(0, 'P', 'R')] =  1
Q[(0, 'P', 'P')] =  0
Q[(0, 'P', 'S')] = -1

Q[(0, 'S', 'R')] = -1
Q[(0, 'S', 'P')] =  1
Q[(0, 'S', 'S')] =  0

result = compute_best_policy(['R', 'P', 'S'], 0, Q)

print(result)

[0.33333333 0.33333333 0.33333333]


In [24]:
# max number of (s, a, o) entries in the Q table:
# (4*5 squares for A) * (4*5 squares for B) * 2 ball holders,
# times 4 own actions, times 4 opposing actions
((4 * 5) * (4 * 5) * 2) * 4 * 4 

12800

In [25]:
class Policy:
    """Per-state action distribution that falls back to uniform.

    ``probs`` maps a state to its stored distribution; states that were
    never updated get equal probability for every move.
    """

    def __init__(self, moves, game):
        self.moves = moves
        self.game = game
        self.probs = {}

    def __call__(self, state):
        """Return the distribution stored for ``state``, or a uniform list."""
        if state not in self.probs:
            n_moves = len(self.moves)
            return [1 / n_moves] * n_moves
        return self.probs[state]

    def update(self, state, probs):
        """Store ``probs`` as the distribution for ``state``."""
        self.probs[state] = probs

In [26]:
def choose_action(actions, player, state, policy):
    """Sample one action for ``state`` from the distribution given by ``policy``.

    Args:
        actions: sequence of candidate actions.
        player: accepted for call compatibility; not used.
        state: state passed to ``policy``.
        policy: callable ``policy(state)`` returning one probability per
            entry of ``actions``.

    Returns:
        The element of ``actions`` selected by a single multinomial draw.
    """
    pvals = policy(state)
    draw = multinomial(1, pvals)
    # The draw is a one-hot vector; argmax yields the index of the single 1
    # as a scalar, which avoids the deprecated conversion of a one-element
    # array to int.
    return actions[int(np.argmax(draw))]

In [43]:
# Per-step multiplicative decay equal to 0.01 ** (1 / 10e6).
# NOTE(review): 10e6 is 1e7, not 1e6 -- confirm which horizon is intended.
10**(np.log10(0.01)/10e6)

0.9999995394830874

In [44]:
def minimaxQ(game, moves, player='A', opponent='B', **kwargs):
    """Run minimax-Q learning for ``player`` over ``N`` games.

    In every step both players pick their action for the current state
    ``s``; ``player`` moves first and ``opponent`` moves only if that did
    not end the game. The table entry ``Q[(s, a, o)]`` is then updated,
    the policy of ``player`` in ``s`` is recomputed with
    ``compute_best_policy`` and ``V[s]`` is set to the worst-case expected
    Q-value of that policy. The opponent's policy is never updated.

    !!TODO: check if everything works for player B

    Arguments:  * game
                * moves: a list of allowed moves
                * player: the player for which the policy is learned
                * opponent: the other player
    Parameters: * N : the number of games to play (default 10)
                * explor : prob that random move (not from policy) is picked
                  (default 0.2)
                * alpha : determines weight of the new sample in the Q
                  update (default 0.1)
                * gamma : discount factor (default 0.1)
                * decay : factor applied to alpha after every step
                  (default 10**(log10(0.01)/10e6))
    Returns:    * Q : table of Q[(s,a,o)] values
                * V[s] : expected return from state s under policy
                * policies : policies of both player and opponent
    """
    N      = kwargs.get('N', 10)
    explor = kwargs.get('explor', 0.2)
    alpha  = kwargs.get('alpha',  0.1)
    # NOTE(review): the game description in this notebook states a discount
    # factor of 0.9; the default here is 0.1 -- confirm which is intended.
    gamma  = kwargs.get('gamma',  0.1)
    # NOTE(review): 10e6 is 1e7, not 1e6 -- confirm against the article.
    decay  = kwargs.get('decay',  10**(np.log10(0.01)/10e6)) # see article

    policies = {}
    policies[player]   = Policy(moves, game)
    policies[opponent] = Policy(moves, game)

    # initialization (this makes the function application-specific!)
    Q, V = generate_init_dicts(moves)

    for _ in range(N):
        game.initialize()
        while game.winner is None:
            # get current state
            s = game.get_state()

            # Both players choose for state s before anything is executed,
            # so (s, a, o) is always defined -- also when the player scores
            # before the opponent gets to move.
            if random.uniform(0, 1) < explor:
                a = random.choice(moves)
            else:
                a = choose_action(moves, player, s, policies[player])
            o = choose_action(moves, opponent, s, policies[opponent])

            reward, _ = game.move(player, a)
            if game.winner is None: # game is not yet won, play other player
                # move() reports the reward from the mover's point of view;
                # a goal by the opponent is a loss for the learning player.
                opponent_reward, _ = game.move(opponent, o)
                reward = -opponent_reward

            s_next = game.get_state()

            # Learn
            Q[(s,a,o)] = (1 - alpha) * Q[(s,a,o)] + \
                         alpha * (reward + gamma * V[s_next])

            # Clip and renormalise so that solver round-off cannot produce
            # probabilities the sampler would reject.
            pi = np.clip(compute_best_policy(moves, s, Q), 0.0, None)
            pi = pi / pi.sum()
            policies[player].update(s, pi)

            # V[s] = min over o of sum over a of pi[a] * Q[s, a, o]
            V[s] = min(
                sum(p * Q[(s, own, other)] for p, own in zip(pi, moves))
                for other in moves
            )

            alpha = alpha * decay # decrease step size
    return Q, V, policies

In [36]:
# Train player A with minimaxQ for 20 games and an exploration rate of 0.1.
game = Soccer()
moves = ['N', 'E', 'S', 'W']
# policy_A / policy_B are not passed to minimaxQ, which builds its own
# Policy objects and returns them in `policies`.
policy_A = Policy(moves, game)
policy_B = Policy(moves, game)
Q, V, policies = minimaxQ(game, moves, player='A', opponent='B', N=20, explor=.1)

In [37]:
Q

{(((0, 0), (0, 0), 'A'), 'N', 'N'): 1,
 (((0, 0), (0, 0), 'A'), 'N', 'E'): 1,
 (((0, 0), (0, 0), 'A'), 'N', 'S'): 1,
 (((0, 0), (0, 0), 'A'), 'N', 'W'): 1,
 (((0, 0), (0, 0), 'A'), 'E', 'N'): 1,
 (((0, 0), (0, 0), 'A'), 'E', 'E'): 1,
 (((0, 0), (0, 0), 'A'), 'E', 'S'): 1,
 (((0, 0), (0, 0), 'A'), 'E', 'W'): 1,
 (((0, 0), (0, 0), 'A'), 'S', 'N'): 1,
 (((0, 0), (0, 0), 'A'), 'S', 'E'): 1,
 (((0, 0), (0, 0), 'A'), 'S', 'S'): 1,
 (((0, 0), (0, 0), 'A'), 'S', 'W'): 1,
 (((0, 0), (0, 0), 'A'), 'W', 'N'): 1,
 (((0, 0), (0, 0), 'A'), 'W', 'E'): 1,
 (((0, 0), (0, 0), 'A'), 'W', 'S'): 1,
 (((0, 0), (0, 0), 'A'), 'W', 'W'): 1,
 (((0, 0), (0, 0), 'B'), 'N', 'N'): 1,
 (((0, 0), (0, 0), 'B'), 'N', 'E'): 1,
 (((0, 0), (0, 0), 'B'), 'N', 'S'): 1,
 (((0, 0), (0, 0), 'B'), 'N', 'W'): 1,
 (((0, 0), (0, 0), 'B'), 'E', 'N'): 1,
 (((0, 0), (0, 0), 'B'), 'E', 'E'): 1,
 (((0, 0), (0, 0), 'B'), 'E', 'S'): 1,
 (((0, 0), (0, 0), 'B'), 'E', 'W'): 1,
 (((0, 0), (0, 0), 'B'), 'S', 'N'): 1,
 (((0, 0), (0, 0), 'B'), 

In [225]:
policies

{'A': <__main__.Policy at 0x7f1a57f16358>,
 'B': <__main__.Policy at 0x7f1a57d087b8>}

# Test the Trained Policy

In [215]:
class Policy:
    """Lookup table from state to action probabilities, uniform by default."""

    def __init__(self, moves, game, Q=None):
        # Q is accepted but not stored or used.
        self.probs = {}
        self.moves = moves
        self.game = game

    def __call__(self, state):
        """Return the probabilities stored for ``state``.

        Falls back to equal probability for every move when nothing has
        been stored for ``state``.
        """
        try:
            return self.probs[state]
        except KeyError:
            share = 1 / len(self.moves)
            return [share] * len(self.moves)

    def update(self, state, probs):
        """Record ``probs`` as the probabilities to use in ``state``."""
        self.probs[state] = probs

In [226]:
def play_game(game, moves, policy_A, policy_B):
    """Let A and B take turns until ``game.winner`` is set, then report.

    A moves first in every round. Each move is sampled with
    ``choose_action`` from the mover's policy for the current game state.
    Afterwards the board is shown and the number of completed rounds (both
    players moved without a goal) is printed together with the winner.
    """
    turn_order = (('A', policy_A), ('B', policy_B))
    i = 0
    playing = True
    while playing:
        for name, name_policy in turn_order:
            game.move(name, choose_action(moves, name, game.get_state(), name_policy))
            if game.winner:
                playing = False
                break
        else:
            # Reached only when neither move of this round ended the game.
            i += 1
    game.show()
    print('# moves = {}, winner = {}'.format(i, game.winner))

In [263]:
# Pit the policy returned for A by minimaxQ against a fresh Policy for B.
soccer = Soccer()
moves = ['N', 'E', 'S', 'W']
policy_A = policies['A']
policy_B = Policy(moves, soccer)
#play_game(soccer, moves, policy_A, policy_B)

In [262]:
# Play a single round (A, then B) and print the board.
# Note: this steps `game`, not the `soccer` object created in the setup cell.
game.move('A', choose_action(moves, 'A', game.get_state(), policy_A))
game.move('B', choose_action(moves, 'B', game.get_state(), policy_B))
game.show()

----------------
|Ao|  |  |  |  |
----------------
|  |  |  |  |  |
----------------
|  |  |  |  |  |
----------------
|B |  |  |  |  |
----------------



In [264]:
policy_A(game.get_state())

array([1., 0., 0., 0.])