# Lab 3: Policy Search

## Task

Write agents able to play [_Nim_](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., _subtraction game_).

The player **taking the last object wins**.

-  Task3.1: An agent using fixed rules based on _nim-sum_ (i.e., an _expert system_)
-  Task3.2: An agent using evolved rules
-  Task3.3: An agent using minmax
-  Task3.4: An agent using reinforcement learning


### ⚙️ Imports


In [124]:
import logging
from collections import namedtuple
import random
import math
from typing import Callable
from copy import deepcopy
from itertools import accumulate
from operator import xor

logging.getLogger().setLevel(logging.DEBUG)


### 🧩 _Nim_ and _Nimply_ classes


In [125]:
# A move: take `num_objects` objects from the row with index `row`.
Nimply = namedtuple("Nimply", "row, num_objects")


class Nim:
    """Board of a Nim game.

    Row ``i`` starts with ``2 * i + 1`` objects. ``k`` is the optional upper
    bound on how many objects a single move may remove; ``None`` means no bound.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        """Return ``True`` while at least one object is left on the board."""
        return sum(self._rows) > 0

    def __str__(self):
        """Render the row sizes as ``[a b c ...]``."""
        return f"[{' '.join(str(i) for i in self.rows)}]"

    @property
    def rows(self) -> tuple:
        """Immutable snapshot of the current row sizes."""
        return tuple(self._rows)

    @property
    def k(self) -> int:
        """Upper bound on objects removable per move, or ``None`` if unbounded."""
        return self._k

    def nimming(self, ply: Nimply) -> None:
        """Apply the move ``ply`` (a ``(row, num_objects)`` pair) to the board in place.

        Raises:
            AssertionError: if the move takes more objects than the row holds,
                exceeds ``k``, or does not take a strictly positive number of objects.
        """
        row, num_objects = ply
        assert self._rows[row] >= num_objects, "cannot take more objects than the row holds"
        assert self._k is None or num_objects <= self._k, "move exceeds the per-turn bound k"
        # Strictly positive: the former `!= 0` check let a negative count through,
        # which silently ADDED objects to the row.
        assert num_objects > 0, "a move must take at least one object"
        self._rows[row] -= num_objects


In [126]:
def cook_status(state: Nim) -> dict:
    """Collect the derived facts about ``state`` that the strategies rely on.

    Returns a dict with the keys:
      - "possible_moves": every legal ``(row, num_objects)`` pair, honouring ``state.k``;
      - "active_rows_number": how many rows still hold objects;
      - "active_rows": ``(index, size)`` for each non-empty row;
      - "shortest_row": index of the smallest non-empty row, or ``None`` when
        every row is empty;
      - "longest_row": index of the row holding the most objects;
      - "nim_sum": nim-sum of the current board;
      - "brute_force": ``(move, nim_sum_after_move)`` for every legal move.
    """
    rows = state.rows
    k = state.k

    cooked = dict()
    cooked["possible_moves"] = [
        (r, o) for r, c in enumerate(rows) for o in range(1, c + 1) if k is None or o <= k
    ]
    cooked["active_rows_number"] = sum(o > 0 for o in rows)
    cooked["active_rows"] = [(i, o) for i, o in enumerate(rows) if o > 0]
    # `default` keeps a finished board (no non-empty row) from raising ValueError.
    cooked["shortest_row"] = min(
        (x for x in enumerate(rows) if x[1] > 0), key=lambda y: y[1], default=(None, 0))[0]
    cooked["longest_row"] = max(enumerate(rows), key=lambda y: y[1])[0]
    current = nim_sum(state)
    cooked["nim_sum"] = current

    # Taking `o` objects from row `r` turns rows[r] into rows[r] - o. XOR-ing the old
    # size out and the new size in gives the nim-sum after the move, without
    # deep-copying the whole board once per candidate move.
    cooked["brute_force"] = [
        (m, current ^ rows[m[0]] ^ (rows[m[0]] - m[1])) for m in cooked["possible_moves"]
    ]
    return cooked


def nim_sum(state: Nim) -> int:
    """Return the nim-sum (bitwise XOR of all row sizes) of ``state``.

    A board without any row has nim-sum 0.
    """
    # `initial=0` is the XOR identity: it leaves the result unchanged and guarantees
    # at least one accumulated value, so the unpacking cannot fail on an empty board.
    *_, result = accumulate(state.rows, xor, initial=0)
    return result


### 🕹️ Strategies

-  Optimal


In [127]:
def optimal_strategy(state: Nim) -> Nimply:
    """Play the first move that leaves a zero nim-sum, else a random legal move."""
    outcomes = cook_status(state)["brute_force"]
    # The fallback is drawn up front, even when a zero-nim-sum move exists.
    fallback = random.choice(outcomes)
    for outcome in outcomes:
        move, nim_sum_after = outcome
        if nim_sum_after == 0:
            return Nimply(*move)
    return Nimply(*fallback[0])


-  Random


In [128]:
def random_strategy(state: Nim) -> Nimply:
    """Pick a random non-empty row, then a random legal number of objects to take from it."""
    non_empty = [index for index, size in enumerate(state.rows) if size > 0]
    row = random.choice(non_empty)

    # The most that may be taken: the whole row, capped by k when a bound is set.
    limit = state.rows[row]
    if state.k is not None:
        limit = min(limit, state.k)

    return Nimply(row, random.randint(1, limit))


-  3.1 - Fixed rules


In [129]:

def fixed_strategy(state: Nim) -> Nimply:
    """Hand-written rule set.

    With exactly three non-empty rows, one of which holds a single object, that
    single object is taken. Otherwise the move is made on the longest row: one
    object when the number of non-empty rows is even, the whole row (capped by
    ``state.k``) when it is odd.
    """
    info = cook_status(state)
    active_count = info["active_rows_number"]

    # endgame rule
    if active_count == 3:
        singletons = [entry for entry in info["active_rows"] if entry[1] == 1]
        if len(singletons) == 1:
            return Nimply(singletons[0][0], 1)

    # main rule: both parities play on the longest row, only the amount differs
    row = info["longest_row"]
    num_objects = 1 if active_count % 2 == 0 else state.rows[row]

    if state.k is not None:
        num_objects = min(num_objects, state.k)

    return Nimply(row, num_objects)


-  ⚠️ 3.2 - Evolved rules


In [130]:
POP_SIZE = 10
GEN = 100
OFF_SIZE = 10


def evolved_strategy(state: Nim) -> Nimply:
    """Unfinished evolutionary strategy.

    Only the genetic operators are defined so far: no population is created or
    evolved, and the function falls through without a ``return``, so it yields
    ``None`` rather than a ``Nimply``.

    TODO: implement the evolution loop; POP_SIZE, GEN and OFF_SIZE are not used yet.
    """
    Individual = namedtuple('Individual', ('genome', 'fitness'))

    def mutation(g):
        """Return a copy of genome ``g`` with one randomly chosen gene ``x`` replaced by ``1 - x``."""
        # NOTE(review): assumes the genome is a tuple of 0/1 genes -- confirm once the
        # evolution loop exists.
        point = random.randint(0, len(g) - 1)
        # Was `(1 - g[point,])`: the misplaced comma indexed the genome with a tuple
        # (TypeError on a tuple/list) instead of building the one-element tuple
        # needed for the concatenation.
        return g[:point] + (1 - g[point],) + g[point + 1:]

    def crossover(g1, g2):
        """One-point crossover: head of ``g1`` up to a random cut, tail of ``g2`` from it."""
        cut = random.randint(0, min(len(g1), len(g2)) - 1)
        return g1[:cut] + g2[cut:]

    def tournament(population, size):
        """Return the fittest of ``size`` individuals sampled with replacement."""
        return max(random.choices(population, k=size), key=lambda i: i.fitness)


-  3.3 - MinMax


In [131]:
MAX_DEPTH = 5


def minmax_strategy(state: Nim) -> Nimply:
    """Choose a move with depth-limited minmax search and alpha-beta pruning.

    Scores are +1 / -1. The player to move at the root has ``turn == 1`` and is
    the *minimising* side: a leaf reports the ``turn`` of the player who is to
    move there, so -1 means the root player took the last object.

    Raises:
        ValueError: if ``state`` offers no legal move (e.g. the game is over).
    """

    def minmax(state: Nim, turn: int, alpha: float = -1, beta: float = 1, depth: int = 0) -> tuple:
        """Return ``(move, score)`` for the side ``turn`` to move in ``state``."""
        # Leaf: the game is over, or the depth budget is spent. Both are scored as a
        # loss for the side to move.
        if not state or (MAX_DEPTH is not None and depth >= MAX_DEPTH):
            return None, turn

        minimising = turn == 1
        best_ply = None
        score = turn * math.inf
        for ply in cook_status(state)["possible_moves"]:
            new_state = deepcopy(state)
            new_state.nimming(ply)
            _, val = minmax(new_state, -turn, alpha, beta, depth + 1)
            # The best move is tracked explicitly instead of relying on the loop
            # variable leaking out of the `for`. Ties go to the later move, which
            # keeps the selected move identical to the previous behaviour.
            if minimising:
                if val <= score:
                    score, best_ply = val, ply
                if score <= alpha:
                    break
                beta = min(beta, score)
            else:
                if val >= score:
                    score, best_ply = val, ply
                if score >= beta:
                    break
                alpha = max(alpha, score)
        return (best_ply, score)

    ply, _ = minmax(state, 1)
    if ply is None:
        raise ValueError("minmax_strategy needs a board with at least one legal move")
    return Nimply(*ply)


-  3.4 - Reinforcement learning


In [132]:
EPOCHS = 500
ALPHA = 0.1
RANDOM_FACTOR = 0.2
TEACHER = optimal_strategy


def rl_strategy(state: Nim) -> Nimply:
    """Train a fresh value-table agent starting from ``state`` and return its move.

    Every call plays EPOCHS training games against TEACHER, each starting from a
    copy of ``state`` with a randomly chosen first mover, and then answers with
    the move that leads to the best-valued successor board.

    Raises:
        ValueError: if ``state`` offers no legal move.
    """

    class RLAgent(object):
        """Tabular agent.

        ``G`` maps a board (tuple of row sizes) reached right after one of the
        agent's own moves to an estimated value.
        """

        def __init__(self, alpha=0.1, random_factor=0.2):
            self.state_history = []  # (rows, reward) for each agent move of the current game
            self.alpha = alpha
            self.random_factor = random_factor
            self.G = {}

        def _value_after(self, state, action):
            """Return the estimate of the board reached by ``action``, seeding unseen boards randomly."""
            new_state = deepcopy(state)
            new_state.nimming(action)
            if new_state.rows not in self.G:
                self.G[new_state.rows] = random.uniform(0.1, 1.0)
            return self.G[new_state.rows]

        def choose_action(self, state, explore=True):
            """Return a legal move: random with probability ``random_factor`` when
            ``explore`` is true, otherwise the one with the best-valued successor."""
            moves = cook_status(state)["possible_moves"]
            if not moves:
                raise ValueError("rl_strategy needs a board with at least one legal move")

            if explore and random.random() < self.random_factor:
                next_move = random.choice(moves)
                # Register the successor so that learn() finds it in G.
                self._value_after(state, next_move)
                return next_move

            next_move, max_g = None, -math.inf
            for action in moves:
                value = self._value_after(state, action)
                if value >= max_g:  # ties go to the later move
                    next_move, max_g = action, value
            return next_move

        def update_state_history(self, state, reward):
            self.state_history.append((state, reward))

        def learn(self):
            """Move the value of every visited board towards the return that followed it."""
            target = 0

            for prev, reward in reversed(self.state_history):
                # The reward is added BEFORE the update so that the board on which it
                # was earned is credited too. Updating first pulled the winning final
                # board towards 0 and taught the agent to avoid the winning move.
                target += reward
                self.G[prev] += self.alpha * (target - self.G[prev])

            self.state_history = []
            self.random_factor -= 10e-5  # explore slightly less after every game

    agent = RLAgent(ALPHA, RANDOM_FACTOR)
    for _ in range(EPOCHS):
        copy_state = deepcopy(state)
        player = random.choice([0, 1])
        while copy_state:
            if player == 0:
                ply = agent.choose_action(copy_state)
                copy_state.nimming(ply)
                # reward 1 only when this move took the last object
                agent.update_state_history(
                    copy_state.rows, 0 if copy_state else 1)
            else:
                ply = TEACHER(copy_state)
                copy_state.nimming(ply)
            player = 1 - player
        agent.learn()
    # The move actually played is always greedy: exploration is only useful while training.
    return Nimply(*agent.choose_action(state, explore=False))


### 🏆 Single Match


In [None]:
# Board size and per-turn bound used for the single match.
NIM_SIZE = 5
K = None

# players[0] moves first.
players = (
    minmax_strategy,
    optimal_strategy,
)


def match(players: tuple[Callable, Callable]) -> Callable:
    """Play one game between the two strategies, logging every move, and return the winner.

    ``players[0]`` moves first; the strategy that takes the last object wins.
    """
    player = 0
    nim = Nim(NIM_SIZE, K)
    logging.debug(f"  -  {nim}")

    while nim:
        strategy = players[player]
        ply = strategy(nim)
        nim.nimming(ply)
        logging.debug(
            f"  -  {players[player].__name__} -> row: {ply.row} / obj: {ply.num_objects}")
        logging.debug(f"  -  {nim}")
        player = 1 - player

    # `player` was flipped after the last move, so the winner is the other index.
    return players[1 - player]


winner = match(players)
logging.info(f" {winner.__name__} won!")


### 📊 Evaluation


In [None]:
# Board size, per-turn bound and number of games used for the evaluation.
NIM_SIZE = 5
K = None
NUM_MATCHES = 100

# The winning rate is measured for players[0].
players = (
    minmax_strategy,
    rl_strategy,
)


def evaluate(players: tuple[Callable, Callable]) -> int:
    """Play NUM_MATCHES games with a random first mover and count the wins of ``players[0]``."""

    def first_strategy_wins() -> bool:
        nim = Nim(NIM_SIZE, K)
        player = random.choice([0, 1])
        while nim:
            ply = players[player](nim)
            nim.nimming(ply)
            player = 1 - player
        # `player` was flipped after the last move: index 1 here means that
        # players[0] took the last object.
        return player == 1

    return sum(first_strategy_wins() for _ in range(NUM_MATCHES))


won = evaluate(players)
rate = won / NUM_MATCHES * 100
logging.info(
    f"  -  the winning rate of {players[0].__name__} against {players[1].__name__} was {rate}% ({won}/{NUM_MATCHES})")
