Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside your personal repository for the course
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [258]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy, copy
from math import ceil
import pandas as pd

## The *Nim* and *Nimply* classes

In [259]:
# A single move ("ply"): the index of the row to take from and the number of
# objects to remove from that row.
Nimply = namedtuple("Nimply", "row, num_objects")


In [260]:
class Nim:
    """Mutable board for a game of Nim.

    Row ``i`` starts with ``2 * i + 1`` objects. ``k`` is an optional upper
    bound on the number of objects one move may remove (``None`` = no bound).
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        # Odd numbers 1, 3, 5, ... - one entry per row.
        self._rows = list(range(1, 2 * num_rows, 2))
        self._k = k

    def __bool__(self):
        """Return True while the row sizes still add up to more than zero."""
        objects_left = sum(self._rows)
        return objects_left > 0

    def __str__(self):
        """Render the board as ``<a b c ...>`` with one number per row."""
        return "<" + " ".join(map(str, self._rows)) + ">"

    @property
    def rows(self) -> tuple:
        """Immutable snapshot of the current row sizes."""
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        """Apply ``ply`` in place, removing its objects from its row.

        Raises:
            AssertionError: if the row holds fewer objects than requested, or
                the amount exceeds ``k``. These checks are plain ``assert``
                statements, so they disappear when Python runs with ``-O``.
        """
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) strategies

In [261]:
def pure_random(state: Nim) -> Nimply:
    """Pick a random non-empty row, then a random amount between 1 and its size.

    NOTE(review): the per-move bound ``k`` of the board is never consulted
    here - confirm this strategy is only used on boards without a bound.
    """
    non_empty = [index for index, count in enumerate(state.rows) if count > 0]
    chosen = random.choice(non_empty)
    amount = random.randint(1, state.rows[chosen])
    return Nimply(chosen, amount)


In [262]:
def gabriele(state: Nim) -> Nimply:
    """Empty the non-empty row with the lowest index.

    Every legal ``(row, amount)`` pair is listed; the winner is the one with
    the smallest row index and, within that row, the largest amount.
    """
    candidates = []
    for index, count in enumerate(state.rows):
        candidates.extend((index, amount) for amount in range(1, count + 1))
    row, amount = max(candidates, key=lambda move: (-move[0], move[1]))
    return Nimply(row, amount)


In [263]:
def first_try(state: Nim) -> Nimply:
    """Take a single object from the non-empty row with the highest index."""
    non_empty_indices = [index for index, count in enumerate(state.rows) if count > 0]
    return Nimply(max(non_empty_indices), 1)

In [264]:
def adaptive(state: Nim) -> Nimply:
    """A strategy that can adapt its parameters"""
    # NOTE(review): unfinished stub - the genome is created but never used and
    # the function falls off the end, so it returns None despite the Nimply
    # return annotation.
    genome = {"love_small": 0.5}


In [265]:
import numpy as np


def nim_sum(state: Nim) -> int:
    """Return the nim-sum of ``state``: the bitwise XOR of all its row sizes.

    Args:
        state: board whose ``rows`` are non-negative integers.

    Returns:
        The XOR of every row size as a plain ``int``; 0 for a board with no
        rows.
    """
    total = 0
    for count in state.rows:
        # XOR is exactly the "column-wise sum modulo 2" of the binary digits.
        total ^= count
    return int(total)


def analize(raw: Nim) -> dict:
    """Evaluate every move available on ``raw``.

    Returns:
        ``{"possible_moves": {ply: nim_sum_after_ply, ...}}`` where each move
        is tried on a deep copy, so ``raw`` itself is left untouched.
    """
    outcomes = {}
    for row, count in enumerate(raw.rows):
        for amount in range(1, count + 1):
            move = Nimply(row, amount)
            board_after = deepcopy(raw)
            board_after.nimming(move)
            outcomes[move] = nim_sum(board_after)
    return {"possible_moves": outcomes}


def optimal(state: Nim) -> Nimply:
    """Pick at random among the moves that leave a non-zero nim-sum.

    When no such move exists, the choice is made among all available moves.

    NOTE(review): textbook nim-sum play aims for positions whose nim-sum is
    zero; this function does the opposite - confirm that is intended.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    outcomes = analysis["possible_moves"]
    candidates = [move for move, value in outcomes.items() if value != 0]
    if len(candidates) == 0:
        candidates = list(outcomes)
    return random.choice(candidates)


In [None]:
# Number of games played to estimate the win rate of one individual.
NUM_MATCH = 250
# Number of generations the evolution loop runs.
NUM_GEN = 10
# Number of mutated individuals created in each generation.
NUM_IND = 25

In [None]:
# Early tries
# class Nimmer:
#     def __init__(self, n: float, p: float) -> None:
#         self.n = n 
#         self.p = p
#         self.win_rate = 0

#     def __lt__(self, other):
#         return self.win_rate < other.win_rate
    
#     def __str__(self):
#         return "{" + str(self.n) + "," + str(self.p) + "}" + " " + "WIN:" + str(self.win_rate*100)

#     def valute(self, wr):
#         self.win_rate = wr

In [None]:
# Early tries
# def es_strategy(state: Nim, ind: Nimmer) -> Nimply:
#     if random.random() <= ind.p :
#         possible_rows = [(r,i) for i,r in enumerate(state.rows) if r > 0]
#         possible_rows.sort(reverse=True)
#         _ , row = possible_rows.pop(0)
#         #print("Pazzo prende max")
#     else:
#         row = random.choice([i for i, r in enumerate(state.rows) if r > 0])
#         #print("Pazzo prende random")
#     num_objects = ceil(state.rows[row] * ind.n)
#     return Nimply(row,num_objects)

In [None]:
# Early tries
# initial_n = 0.5 # How many pieces to take (0 => None 1 => All)
# initial_p = 0.5 # Chance to take from row w most pieces, otherwise random

# best = Nimmer(initial_n,initial_p)
# for gen in range(NUM_GEN):
#     new_n = np.random.normal(loc=0,scale=0.1,size=NUM_IND)+best.n
#     new_p = np.random.normal(loc=0,scale=0.1,size=NUM_IND)+best.p
#     individuals = [Nimmer(n,p) for n,p in zip(new_n,new_p)]
#     for ind in individuals:
#         ind.valute(play_matches(ind))
#     individuals.append(best)
#     best = copy(max(individuals, key=lambda i: i.win_rate))
# print(f"Best @ gen. {gen}: {best}")

In [None]:
# Genome layout: one weight per rule, named <situation>_<pile>_<amount>.
#
# Situation - parity of the board:
#   ee = even number of piles, even number of pieces
#   eo = even number of piles, odd number of pieces
#   oe = odd number of piles, even number of pieces
#   oo = odd number of piles, odd number of pieces
#
# Pile - where to take from:
#   mp = take from pile with most pieces
#   lp = take from pile with least pieces
#
# Amount - how much to take:
#   all = take all pieces
#   odd = take an odd number of pieces
#   eve = take an even number of pieces
Parameters = namedtuple(
    "Parameters",
    [
        f"{situation}_{rule}"
        for situation in ("ee", "eo", "oe", "oo")
        for rule in ("mp_all", "lp_all", "mp_odd", "mp_eve", "lp_odd", "lp_eve")
    ],
)
# Total number of weights in a genome.
NUM_PAR = len(Parameters._fields)

In [None]:
class Nimmer:
    """One individual of the evolution: a genome of rule weights and its win rate."""

    def __init__(self, *args) -> None:
        """Build an individual.

        With exactly two positional arguments ``(parent_parameters, sigma)``
        the genome is the parent's genome with Gaussian noise of standard
        deviation ``sigma`` added to every weight, each result saturated to
        the interval [0, 1]. With any other number of arguments the genome is
        drawn from scratch with ``np.random.rand``.
        """
        if len(args) != 2:
            self.par = Parameters(*np.random.rand(NUM_PAR))
        else:
            parent, sigma = args
            genes = []
            for position in range(NUM_PAR):
                gene = parent[position] + np.random.normal(loc=0, scale=sigma)
                if gene >= 1:
                    gene = 1  # saturate to 1
                elif gene <= 0:
                    gene = 0  # saturate to 0
                genes.append(gene)
            self.par = Parameters(*genes)
        self.win_rate = 0

    def __lt__(self, other):
        """Order individuals by their recorded win rate."""
        return self.win_rate < other.win_rate

    def __str__(self):
        """Render as ``{w1,w2,...} WIN:<win rate as a percentage>``."""
        genes = ",".join(map(str, self.par))
        return "{" + genes + "}" + " " + "WIN:" + str(self.win_rate * 100)

    def valute(self, wr):
        """Record ``wr`` as the win rate of this individual."""
        self.win_rate = wr

In [None]:
def take_all(row):
    """Return the move that empties ``row``, given as ``(index, n_pcs)``."""
    return Nimply._make(row)

def take_odd(row):
    """Return a move taking a random odd amount from ``row`` = ``(index, n_pcs)``.

    A row with a single piece yields that piece. Otherwise the amount is an
    odd number drawn from ``1, 3, ...`` strictly below ``n_pcs``.
    """
    index, n_pcs = row
    amount = 1 if n_pcs == 1 else random.randrange(1, n_pcs, step=2)
    return Nimply(index, amount)

def take_even(row):
    """Return a move taking a random even amount from ``row`` = ``(index, n_pcs)``.

    Args:
        row: ``(index, n_pcs)`` pair describing a non-empty row.

    Returns:
        A ``Nimply`` on that row. Rows holding 1 or 2 pieces are emptied (a
        single piece leaves no even option, and 2 is the only even amount that
        fits a row of 2). Larger rows lose an even amount drawn from
        ``2, 4, ...`` strictly below ``n_pcs``.
    """
    index, n_pcs = row
    if n_pcs in (1, 2):
        return Nimply(index, n_pcs)
    # Start at 2, not 0: removing zero objects is not a legal move.
    return Nimply(index, random.randrange(2, n_pcs, step=2))

def leave_one(row):
    """Return the move that leaves exactly one piece in ``row`` = ``(index, n_pcs)``.

    NOTE(review): for a row that already holds one piece this yields a move of
    zero objects - callers presumably guard against that; confirm.
    """
    index, n_pcs = row
    amount = n_pcs - 1
    return Nimply(index, amount)

In [None]:
def es_strategy_V2(state: Nim, ind: Nimmer) -> Nimply:
    """Choose a move by weighting six candidate moves with the genome of ``ind``.

    The board is classified by two parities: the number of non-empty piles and
    the total number of pieces ("ee", "eo", "oe" or "oo", piles first). For
    that situation each rule ``<pile>_<amount>`` proposes one move:

    * pile: ``mp`` = pile with most pieces, ``lp`` = pile with least pieces;
    * amount: ``all`` (``take_all``), ``odd`` (``take_odd``), ``eve``
      (``take_even``).

    Every rule adds its genome weight ``<situation>_<pile>_<amount>`` to the
    move it proposes; weights of rules proposing the same move are summed and
    the move with the highest total wins. Ties go to the smallest move in
    tuple order.

    Args:
        state: board to move on.
        ind: individual whose ``par`` genome supplies the rule weights.

    Returns:
        The selected move.

    Raises:
        ValueError: if ``state`` has no non-empty row.
    """
    possible_rows = [(i, r) for i, r in enumerate(state.rows) if r > 0]
    if not possible_rows:
        raise ValueError("es_strategy_V2 called on a board with no objects left")

    # Situation prefix of the genome fields: pile-count parity, then piece parity.
    rows_tag = "e" if len(possible_rows) % 2 == 0 else "o"
    pcs_tag = "e" if sum(p for _, p in possible_rows) % 2 == 0 else "o"
    situation = rows_tag + pcs_tag

    piles = {
        "mp": max(possible_rows, key=lambda t: t[1]),
        "lp": min(possible_rows, key=lambda t: t[1]),
    }
    takers = {"all": take_all, "odd": take_odd, "eve": take_even}
    rules = (
        ("mp", "all"),
        ("lp", "all"),
        ("mp", "odd"),
        ("mp", "eve"),
        ("lp", "odd"),
        ("lp", "eve"),
    )

    # TODO: optional "gold rules" (leave one piece on a lone pile; empty the
    # big pile when the only other pile holds a single piece) would need extra
    # genome fields that Parameters does not define.

    # Sum the weights of all rules that end up proposing the same move.
    totals = {}
    for pile, amount in rules:
        weight = getattr(ind.par, f"{situation}_{pile}_{amount}")
        move = takers[amount](piles[pile])
        totals[move] = totals.get(move, 0) + weight

    # Sorting first makes the tie-break deterministic: max() keeps the first
    # maximum it meets, i.e. the smallest move among equally weighted ones.
    return max(sorted(totals), key=totals.get)
    


In [None]:
def play_matches(ind):
    """Estimate the win rate of ``ind`` over ``NUM_MATCH`` games on a 5-row board.

    In every game the hero plays ``es_strategy_V2`` with ``ind``; the rival is
    drawn at random between ``optimal`` and ``pure_random``, and the first
    mover is drawn at random too. The hero scores when the rival is the one
    who removes the last object. The result is printed and returned as a
    fraction in [0, 1].
    """
    won = 0
    for _ in range(NUM_MATCH):
        board = Nim(5)
        # Test the strategy against different rivals.
        opponent = random.choice([optimal, pure_random])
        turn = random.choice([0, 1])  # 0: hero to move, 1: rival to move
        while board:
            move = opponent(board) if turn else es_strategy_V2(board, ind)
            board.nimming(move)
            turn = 1 - turn
        # `turn` names whoever would move next, so 0 means the rival moved last.
        if turn == 0:
            won += 1
    print(f"Result: {won/NUM_MATCH*100}% {ind}")
    return won / NUM_MATCH

In [None]:
# Evolution loop: one parent, NUM_IND mutated children per generation; the
# parent is replaced only when the best child has a strictly higher win rate.
best = Nimmer()
best.valute(play_matches(best))
print(f"Original : {best.win_rate}")
sigma, no_impr = 0.1, 0
for gen in range(NUM_GEN):
    # Create every child first, then evaluate them one by one.
    individuals = [Nimmer(best.par, sigma) for _ in range(NUM_IND)]
    for ind in individuals:
        ind.valute(play_matches(ind))
    rival = copy(max(individuals, key=lambda child: child.win_rate))
    if best < rival:
        best = copy(rival)
        no_impr = 0
        print(f"Best @ gen. {gen}: {best.win_rate}")
    else:
        no_impr += 1
    # Shrink the mutation step by 20% after three generations without progress.
    # NOTE(review): the counter is not reset here, so within one stagnation
    # streak this fires only once - confirm that is intended.
    if no_impr == 3:
        sigma -= sigma / 5
        print(f"Nuovo sigma: {sigma}")

In [None]:
# Number of games played when validating the best individual against `optimal`.
VALIDATION_MATCH = 1000

In [None]:
# Validation: the best individual plays VALIDATION_MATCH games against
# `optimal` on a 5-row board and the fraction of games won is printed.
hero = copy(best)
print(f"HERO: {hero}")

strategy = (es_strategy_V2, optimal)  # index 0: hero, index 1: rival
won = 0
for match in range(VALIDATION_MATCH):
    nim = Nim(5)
    player = random.choice([0, 1])  # who moves first: 0 hero, 1 rival
    while nim:
        # Only the hero's strategy takes the individual as a second argument.
        ply = strategy[player](nim) if player else strategy[player](nim, hero)
        nim.nimming(ply)
        player = 1 - player
    # `player` names whoever would move next, so 0 means the rival moved last.
    if player == 0:
        won += 1
print(f"TOTAL: {won/VALIDATION_MATCH}")