# Lab 3.4 Reinforced Learning


## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The player **taking the last object wins**.


An agent using reinforcement learning


## Util functions from previous task
Nim class has been changed a little: we add avaible_actions that are all tha actions avaible for each state, it has been taken by the old function 'cooked' in the previous lab.

In [95]:
from collections import namedtuple
import random
from copy import deepcopy
from typing import Callable
from itertools import product, accumulate
import numpy as np
import logging

#Variables
NUM_MATCHES = 1000
NUM_TRAINING_TRIALS = 10000
NIM_SIZE = 5


Nimply = namedtuple("Nimply", "row, num_objects")

class Nim():
    def __init__(self, num_rows: int, player=0):
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._player = player
        
    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        self._rows[row] -= num_objects
    
    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    @property
    def k(self) -> int:
        return self._k
       
        
    def available_actions(rows):
        '''
        computes all the avaible actions as tuples (rowline,numobjectstoremove)
        '''
        actions = set()
        for r, c in enumerate(rows):
            for o in range(1, c + 1):
                actions.add((r, o))
        return actions


def cookStatus(state: Nim) -> dict:
    cooked = dict()
    cooked["possibleMoves"] = [
        (r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) 
    ]
    cooked["activeRowsNumber"] = sum(o > 0 for o in state.rows)
    cooked["totalElements"] = sum(state.rows)
    cooked["shortestRow"] = min((x for x in enumerate(state.rows) if x[1] > 0), key=lambda y: y[1])[0]
    cooked["longestRow"] = max((x for x in enumerate(state.rows)), key=lambda y: y[1])[0]

    bruteForce = list()
    for m in cooked["possibleMoves"]:
        tmp = deepcopy(state)
        tmp.nimming(m)
        bruteForce.append((m, tuple(tmp._rows)))
    cooked["bruteForce"] = bruteForce

    return cooked


# Agent using Reinforcement Learning (Q-Learning)

https://en.wikipedia.org/wiki/Q-learning

The agent is using a model-free method: it will learn exclusively from trial and error (no modelling of the environment). 
Reinforced learning involves an agent, a set of states S and a set of actions A per state.

In [96]:
class Agent_RL():
    '''
    Class based on Q-Learning: For any finite Markov decision process (FMDP), 
    Q-learning finds an optimal policy in the sense of maximizing the expected 
    value of the total reward over any and all successive steps, 
    starting from the current state.
    Q-learning can identify an optimal action-selection policy for any given FMDP
    from wikipedia
    '''
    
    def __init__(self, alpha = 0.15, random_factor = 0.2): # 80% explore, 20% exploit 
        ''' We start from an empty Q dictionary that will contain all the tuples 
        state/action mapped as Q-values
        '''
        # DICTIONARY Q:  
        self.q = dict()

        # ALPHA: The learning rate or step size determines 
        # to what extent newly acquired information overrides old information
        self.alpha = alpha 
        self.random_factor = random_factor

    def get_q_value(self, state, action):
        '''
        We get the value of Q taking as inputs (state and action), if there is not yet we give q=0
        '''
        state = tuple(state)

        q_value = 0 # since Q-Learning is a iterative algorithm we must initializate the q_value to 0 
        if (state, action) in self.q: # finds the q_value in the dict of q values
            q_value = self.q[state, action]
        return q_value

    def best_future_reward(self, state):
        '''
        As input we take the Board state and then we compute all the possible actions
        '''
        actions = Nim.available_actions(state)

        # Check if there are no action possible, else go on
        if len(actions) == 0:
            return 0
        else:
            scores = list()
            for action in actions:
                score = self.get_q_value(state, action) #we can take the Q dictionary values 
                scores.append(score) 
        
        # The goal of the agent is to maximize its total reward. It does this by adding the maximum reward
        # attainable from future states to the reward for achieving its current state
        return max(scores)

    def choose_action(self, state, random_factor=True):
        '''
        returns an action to take once receive a state as input
        '''
        actions = Nim.available_actions(state)

        # If random_factor mode is enabled with probability random_factor, choose a random item
        if random.random() < self.random_factor:
            return random.choice(list(actions))
        else:
            actions_by_score = dict()
            for action in actions:
                score = self.get_q_value(state, action)
                actions_by_score[score] = action
            scores = list(actions_by_score.keys())  # Find the largest key in dictionary
            best_action = actions_by_score[max(scores)]             
            return best_action # We return the best action (q max) if the random state is disabled
        

    def update_q_value(self, state, action, old_q, reward, future_rewards):
        '''
        update w value once the action has been taken, there's also an estimate of the future reward

        '''
        state = tuple(state)
        # Q (state,action) = previous Q value + alpha_learning_rate * (new value estimate - olq Q value)
        # with new value estimate that is the current reward + the future reward estimation
        q_value = old_q + self.alpha*(reward + future_rewards)
        self.q[(state, action)] = q_value
        return

    def update(self, old_state, action, new_state, reward):
        
        '''
        Q-learning mode updates when performing the action on the old board state.
        '''

        old_q_value = self.get_q_value(old_state, action)
        best_future = self.best_future_reward(new_state)
        self.update_q_value(old_state, action, old_q_value, reward, best_future)

def Learn(num_games: int):
    '''Strategy Q-learning by playing free methods vs itself'''
    
    player = Agent_RL()
    
    for game in range(num_games):
        #print(f"Playing training game {game + 1}")
        game = Nim(NIM_SIZE)

        # History of the actions taken by the players, erased for each game
        last = {
            0: {"state": None, "action": None},
            1: {"state": None, "action": None}
        }

        # Play
        while True:
            state = game._rows.copy()
            action = player.choose_action(game._rows)
            
            # Keep track of last state and action
            last[game._player]["state"] = state
            last[game._player]["action"] = action
          
            # Take actions
            game.nimming(action)
            new_state = game._rows.copy()
            
            # When we got a winner we can update rewards
            if game:
                player.update(state, action, new_state, -1)
                player.update(last[game._player]["state"],last[game._player]["action"],new_state,1)
                break
            elif last[game._player]["state"] is not None:
                player.update(last[game._player]["state"],last[game._player]["action"],new_state,0)
            
            break
            
    print(f'Training done with {NUM_TRAINING_TRIALS} games')
    
    return player


## Strategy evaluations

In [97]:
def pure_random(state: Nim) -> Nimply:
    '''simple strategy, it just choose randomly from the rows'''
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)

def renforced(state: Nim, R: Agent_RL) -> Nimply:
    row, num_objects = R.choose_action(state._rows, random_factor=False)
    #row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    #num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)

def evaluate_LR(strategy1: Callable, strategy2: Callable, R: Agent_RL) -> float:
    '''evaluate put a first strategy versus a second one for a certain number of times, the results is the ratio 
    between the won matches and the total number of played matches'''
    won = 0

    for m in range(NUM_MATCHES):
        nim = Nim(NIM_SIZE)
        #player = 1
        player = random.randint(0,1) #make a random start
        while nim:
            if player == 0:
                ply = strategy1(nim, R)
            else:
                ply = strategy2(nim)
            #ply = opponent[player](nim)
            nim.nimming(ply)
            player = 1 - player
        if player == 1:
            won += 1
    return round(won / NUM_MATCHES, 2)

def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))


In [98]:
PlayerLR = Learn(NUM_TRAINING_TRIALS)
ev_RLvsRandom = evaluate_LR(renforced, pure_random, PlayerLR)
logging.info(f"Expert vs optimal => {ev_RLvsRandom}")
print(ev_RLvsRandom)

Training done with 10000 games
0.52


# Strategy from MAZE Project



In [99]:
class Agent_MZ(object):
        def __init__(self, state, alpha=0.15, random_factor=0.2):
            self.state_history = [(tuple(state._rows), 0)]  # 80% explore, 20% exploit
            self.alpha = alpha
            self.random_factor = random_factor
            self.G = {}
            self.init_reward(state)

        def init_reward(self, state):
            rows = list()
            # we do not have state as in maze so we have to use the _rows in nim
            state = state._rows
            for i in state:
                rows.append(list(range(i+1)))
            
            for row in product(*rows):
                self.G[row] = np.random.uniform(low=1.0, high=0.1)

        def choose_action(self, state, allowedpossibleMoves):
            #allowed possibleMoves arrives from cook functions as a list
            maxG = -10e15
            next_move = None
            randomN = np.random.random()
            if randomN < self.random_factor:
                # if random number below random factor, choose random action
                random_idx = np.random.choice(len(allowedpossibleMoves))
                return allowedpossibleMoves[random_idx]
            else:
                # if exploiting, gather all possible actions and choose one with the highest G (reward)
                for action in allowedpossibleMoves:
                    new_state = deepcopy(state)
                    new_state.nimming(action)
                    if self.G[tuple(new_state._rows)] >= maxG:
                        maxG = self.G[tuple(new_state._rows)]
                        return action

            return next_move

        def update_state_history(self, state, reward):
            self.state_history.append((tuple(state._rows), reward))

        def learn(self):
            target = 0

            for prev, reward in reversed(self.state_history):
                self.G[prev] = self.G[prev] + self.alpha * (target - self.G[prev])
                target += reward

            self.state_history = []

            self.random_factor -= 10e-5  # decrease random factor each episode of play

def RLAgent(G: dict) -> Nimply:
    def evolvable(state: Nim) -> Nimply:
        #possibleStates = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
        possibleStates = cookStatus(state)["bruteForce"]
        ply = max(
            ((state[0], G[state[1]]) for state in possibleStates if state[1] in G), 
            key=lambda i: i[1]
            )[0]
        return Nimply(ply[0], ply[1])
    return evolvable

def get_rewards(state):
    sum_row = sum(i > 0 for i in state._rows)
    if sum_row == 1:
        return -1 # losing situation
    elif sum_row >1:
        return -0.5 
    else:
        return 0


## Evaluation Function for the Reinforced Learnign from the Maze


In [100]:
def evaluate_MZ(strategy1: Callable, strategy2: Callable) -> float:
    '''evaluate put a first strategy versus a second one for a certain number of times, the results is the ratio 
    between the won matches and the total number of played matches'''
    opponent = (strategy1, strategy2)
    won = 0

    for m in range(NUM_MATCHES):
        nim = Nim(NIM_SIZE)
        #player = 1
        player = random.randint(0,1) #make a random start
        while nim:
            ply = opponent[player](nim)
            nim.nimming(ply)
            player = 1 - player
        if player == 1:
            won += 1
    return round(won / NUM_MATCHES, 2)

In [102]:
state = Nim(5)
agentRL = Agent_MZ(state, 0.2, 0.15)


for i in range(5000):
        #we start with a new istance of the game
        new_state = deepcopy(state)
        while new_state: # boolean of nim
            possibleMoves = cookStatus(new_state)["possibleMoves"]
            action = agentRL.choose_action(new_state, possibleMoves)
            new_state.nimming(action)

            reward = get_rewards(new_state)
            agentRL.update_state_history(new_state, reward)
            
            if sum(new_state._rows) == 0:
                # we have a winner
                break
            new_state.nimming(pure_random(new_state))
        agentRL.learn()


ev_RLvsRandom = evaluate_MZ(RLAgent(agentRL.G), pure_random)
logging.info(f"Expert vs optimal => {ev_RLvsRandom}")
print(ev_RLvsRandom)

0.48
