Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using rules evolved with an evolution strategy (ES)

## Instructions

* Create the directory `lab2` inside your personal course repository
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [1]:
from tqdm import tqdm
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import numpy as np

# Number of rows on the Nim board used for every match.
N_ROWS = 5
# Maximum number of generations run by the evolutionary loop.
N_GENERATIONS = 100
# Number of individuals kept after each selection step.
POPULATION_SIZE = 20
# Offspring generated per generation (OFFSPRING_SIZE // POPULATION_SIZE per parent).
OFFSPRING_SIZE = 40
# Matches played to score one individual during training.
TRAIN_MATCHES = 100
# Matches played for the final evaluation of the best individual.
TEST_MATCHES = 200
# Upper bound on the number of objects that may be removed in a single move.
K = 4

## The *Nim* and *Nimply* classes

In [2]:
# A single move: the index of the row to take from and how many objects to remove.
Nimply = namedtuple("Nimply", ["row", "num_objects"])


In [3]:
class Nim:
    """Board of a Nim game: a list of rows, each holding some objects.

    Row ``i`` starts with ``2 * i + 1`` objects. When ``k`` is given, a single
    move may remove at most ``k`` objects.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        # Odd numbers 1, 3, 5, ... -- one entry per row.
        self._rows = list(range(1, 2 * num_rows, 2))
        self._k = k

    def __bool__(self):
        """Return True while at least one object is left on the board."""
        remaining = sum(self._rows)
        return remaining > 0

    def __str__(self):
        """Render the row sizes as ``<a b c ...>``."""
        return f"<{' '.join(map(str, self._rows))}>"

    @property
    def rows(self) -> tuple:
        """Read-only snapshot of the current row sizes."""
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        """Apply a move, removing ``num_objects`` from the chosen row.

        The move is checked with assertions: the row must hold at least that
        many objects, and the move must not exceed ``k`` when a bound is set.
        """
        index, taken = ply
        assert taken <= self._rows[index]
        assert not (self._k is not None and taken > self._k)
        self._rows[index] -= taken


## Sample (and silly) strategies

In [4]:
def pure_random(state: Nim) -> Nimply:
    """Return a random legal move.

    A non-empty row is picked at random, then a random number of objects is
    taken from it. When the board carries an upper bound ``k`` on the objects
    removable per move, the amount is capped at ``k`` so that the move is
    accepted by ``Nim.nimming``.

    Raises:
        IndexError: if every row is empty (``random.choice`` on an empty list).
    """
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    # The bound lives in the board's private ``_k`` attribute (None = unbounded).
    limit = getattr(state, "_k", None)
    most = state.rows[row] if limit is None else min(state.rows[row], limit)
    num_objects = random.randint(1, most)
    return Nimply(row, num_objects)


In [5]:
def gabriele(state: Nim) -> Nimply:
    """Take as many objects as allowed from the non-empty row with the lowest index.

    When the board carries an upper bound ``k`` on the objects removable per
    move, the amount taken is capped at ``k`` so that the move is accepted by
    ``Nim.nimming``.

    Raises:
        ValueError: if there is no legal move (every row is empty).
    """
    # The bound lives in the board's private ``_k`` attribute (None = unbounded).
    limit = getattr(state, "_k", None)
    possible_moves = [
        (r, o)
        for r, c in enumerate(state.rows)
        for o in range(1, (c if limit is None else min(c, limit)) + 1)
    ]
    if not possible_moves:
        raise ValueError("no legal move available")
    # Prefer the smallest row index first, then the largest number of objects.
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))


In [6]:
def nim_sum(state: "Nim") -> int:
    """Return the nim-sum of ``state``: the bitwise XOR of all its row sizes.

    The XOR is folded directly over the integers, so there is no limit on the
    size of a row and a board without rows yields 0.

    Args:
        state: any object exposing a ``rows`` iterable of non-negative ints.

    Returns:
        The XOR of every entry in ``state.rows``.
    """
    total = 0
    for count in state.rows:
        total ^= count
    return total

def analize(raw: Nim) -> dict:
    """Enumerate the legal moves from ``raw`` and the nim-sum each one leaves.

    A move removes between 1 and ``min(row size, k)`` objects from one row
    (both ends included); without a bound ``k`` the whole row may be taken.

    Returns:
        A dict with the single key ``"possible_moves"``, mapping every legal
        ``Nimply`` to the nim-sum of the board after that move is applied.
    """
    limit = raw._k
    cooked = {"possible_moves": {}}
    for r, c in enumerate(raw.rows):
        most = c if limit is None else min(c, limit)
        # ``most + 1`` so that taking exactly ``most`` objects is included.
        for o in range(1, most + 1):
            ply = Nimply(r, o)
            # Apply the move on a copy so the caller's board is left untouched.
            tmp = deepcopy(raw)
            tmp.nimming(ply)
            cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked

def find_all_moves(state: Nim):
    """List every move that ``analize`` reports as possible on ``state``."""
    report = analize(state)
    logging.debug(f"analysis:\n{pformat(report)}")
    return list(report["possible_moves"])


def optimal(state: Nim) -> Nimply:
    """Pick at random among the moves that leave a non-zero nim-sum.

    When no such move exists, the choice is made among all possible moves.
    """
    # NOTE(review): textbook nim-sum play aims to leave a nim-sum of zero;
    # this selects the opposite -- confirm that is intended.
    report = analize(state)
    logging.debug(f"analysis:\n{pformat(report)}")
    outcomes = report["possible_moves"]
    candidates = [ply for ply, ns in outcomes.items() if ns != 0] or list(outcomes)
    return random.choice(candidates)


In [7]:
def find_spicy_moves(state: Nim) -> list:  # list of Nimply
    """Return the moves leaving a non-zero nim-sum, or all moves if there are none."""
    report = analize(state)
    logging.debug(f"analysis:\n{pformat(report)}")
    outcomes = report["possible_moves"]
    non_zero = [ply for ply, ns in outcomes.items() if ns != 0]
    return non_zero if non_zero else list(outcomes)

def choose_play(gen: dict):
    """Draw one rule name at random, weighted by the genome's rule probabilities.

    ``gen["rules"]`` maps each rule name to its probability; the values are
    passed to ``np.random.choice`` as ``p``.
    """
    rules = gen["rules"]
    names = list(rules)
    weights = list(rules.values())
    return np.random.choice(names, p=weights)
    
def make_move(mov, nim: "Nim", spicy_moves: list):
    """Select one move from ``spicy_moves`` according to the rule ``mov``.

    Supported rules:
      * ``"emptiest_row"`` -- the move leaving the fewest objects in its own row;
      * ``"fullest_row"``  -- the move leaving the most objects in its own row;
      * ``"largest_take"`` -- the move removing the most objects.

    Ties are resolved in favour of the candidate that appears first in
    ``spicy_moves``.

    Args:
        mov: name of the rule to apply.
        nim: board the moves refer to; only its ``rows`` are read.
        spicy_moves: candidate moves as ``(row, num_objects)`` pairs.

    Returns:
        The selected element of ``spicy_moves``.

    Raises:
        ValueError: if ``spicy_moves`` is empty or ``mov`` is not a known rule.
    """
    if not spicy_moves:
        raise ValueError("no candidate moves to choose from")
    rows = nim.rows

    def left_in_row(ply):
        # Objects remaining in the move's own row once it has been played.
        return rows[ply[0]] - ply[1]

    if mov == "emptiest_row":
        return min(spicy_moves, key=left_in_row)
    if mov == "fullest_row":
        return max(spicy_moves, key=left_in_row)
    if mov == "largest_take":
        return max(spicy_moves, key=lambda ply: ply[1])
    raise ValueError(f"unknown rule: {mov!r}")

def play_single_match(genome: dict, opponent_strategy):
    """Play one game between the genome-driven agent (player 0) and an opponent.

    The board is ``Nim(N_ROWS, K)`` and player 0 moves first. A side that has
    no strategy function picks a rule with ``choose_play(genome)`` and applies
    it through ``make_move`` to all the available moves.

    Returns:
        The index of the player who did not make the final move: 0 when the
        genome-driven agent wins, 1 otherwise.
    """
    board = Nim(N_ROWS, K)
    movers = (None, opponent_strategy)
    turn = 0
    while board:
        mover = movers[turn]
        if mover is None:
            candidates = find_all_moves(board)
            rule = choose_play(genome)
            ply = make_move(rule, board, candidates)
        else:
            ply = mover(board)
        board.nimming(ply)
        turn = 1 - turn
    # ``turn`` was flipped after the last move, so it names the other player.
    return turn

def play_many_matches(genome: dict, opponent_strategy, n_matches):
    """Count how many of ``n_matches`` games the genome-driven player (0) wins."""
    outcomes = (
        play_single_match(genome, opponent_strategy) for _ in range(n_matches)
    )
    return sum(1 for winner in outcomes if winner == 0)

def normalize_values(values):
    """Scale ``values`` in place so that they sum to 1, and return the same object."""
    total = sum(values)
    for idx, val in enumerate(values):
        values[idx] = val / total
    return values

def adaptive(opponent_strategy):
    """Evolve a probabilistic mix of rules to play against ``opponent_strategy``.

    Each individual is a dict with ``"rules"`` (one probability for each of
    ``"emptiest_row"``, ``"fullest_row"`` and ``"largest_take"``) and a
    ``"variance"`` used as the scale of the mutation. The fitness of an
    individual is its number of wins over ``TRAIN_MATCHES`` games against
    ``opponent_strategy``.

    Returns:
        The ``(individual, n_wins)`` pair with the highest win count in the
        final population.
    """
    population = []

    for _ in range(POPULATION_SIZE):   # initialize the population with random rule probabilities
        init_values = list(np.random.dirichlet(np.ones(3)))
        new_ind = {
            "rules": {
                "emptiest_row": init_values[0],
                "fullest_row": init_values[1],
                "largest_take": init_values[2],
            },
            "variance": 1
        }
        n_wins = play_many_matches(new_ind, opponent_strategy, TRAIN_MATCHES)
        population.append((new_ind, n_wins))

    for gen in tqdm(range(N_GENERATIONS)):
        offspring = []
        for ind in population:
            ind[0]["variance"] = np.abs(np.random.normal(loc=ind[0]["variance"], scale=0.2))   # perturb the parent's own variance in place before creating its offspring
            for _ in range(OFFSPRING_SIZE // POPULATION_SIZE):
                # Sample new probabilities around the parent's, force them non-negative and rescale to sum to 1.
                new_values = normalize_values(np.abs(np.random.normal(loc = list(ind[0]["rules"].values()), scale = ind[0]["variance"])))
                k = list(ind[0]["rules"].keys())
                new_ind = dict()
                new_ind["rules"] = dict()
                new_ind["variance"] = ind[0]["variance"]
                for i in range(len(k)):
                    new_ind["rules"][k[i]] = new_values[i]
                count_wins = play_many_matches(new_ind, opponent_strategy, TRAIN_MATCHES)
                offspring.append((new_ind, count_wins))

        # Parents compete with their offspring; parents keep the win count measured when they were created.
        population += offspring

        # Keep only the POPULATION_SIZE individuals with the most wins.
        population = sorted(population, reverse=True, key=lambda e: e[1])
        population = population[0:POPULATION_SIZE]

        # Stop early once the best individual wins more than half of its training matches.
        if population[0][1] > TRAIN_MATCHES / 2:
            print(f"Stopped at the {gen + 1}-th generation")
            break

    return max(population, key=lambda e : e[1])

## Oversimplified match

In [8]:
# logging.getLogger().setLevel(logging.INFO)

# strategy = (optimal, pure_random)

# nim = Nim(N_ROWS)
# logging.info(f"init : {nim}")
# player = 0
# while nim:
#     ply = strategy[player](nim)
#     logging.info(f"ply: player {player} plays {ply}")
#     nim.nimming(ply)
#     logging.info(f"status: {nim}")
#     player = 1 - player
# logging.info(f"status: Player {player} won!")


In [9]:
# Train against the `optimal` strategy; `winner` is the best (individual, n_wins) pair found.
winner = adaptive(optimal)
print(f"The best individual is {winner[0]} with a win rate of {winner[1]}/{TRAIN_MATCHES}")

100%|██████████| 100/100 [37:58<00:00, 22.79s/it]

The best individual is {'rules': {'emptiest_row': 0.01852002574156935, 'fullest_row': 0.24169438283232988, 'largest_take': 0.7397855914261008}, 'variance': 1.7964259939771516} with a win rate of 50/100





In [10]:
# Re-evaluate the trained individual on TEST_MATCHES new games against `optimal`.
count_wins = play_many_matches(winner[0], optimal, TEST_MATCHES)
print(f"Final win rate: {count_wins}/{TEST_MATCHES}")

Final win rate: 71/200
