Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 2: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed) inside it

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [74]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import numpy as np
from tqdm.notebook import tqdm


## Constants

In [75]:
# Number of rows of the Nim board used when evaluating agents.
N_ROWS = 5
# Number of CustomAgent opponents placed in the evaluation pool.
N_AGENTS_IN_ENVIRONMENT = 100
# Number of times the whole pool is replayed inside one fitness evaluation.
N_REPETITION = 5

## The *Nim* and *Nimply* classes

In [76]:
# A move: take `num_objects` objects from row `row` (0-based index).
Nimply = namedtuple("Nimply", "row, num_objects")


class Nim:
    """Board of a Nim game.

    Row ``i`` (0-based) starts with ``2 * i + 1`` objects. An instance is
    truthy while at least one object is left on the board.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        """Create a board with *num_rows* rows.

        Args:
            num_rows: number of rows on the board.
            k: optional upper bound on the number of objects that can be
                removed in one move; ``None`` means no bound.
        """
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        """Return True while the game is still running (objects remain)."""
        return sum(self._rows) > 0

    def __str__(self):
        """Return the row sizes as ``<a b c ...>``."""
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Read-only snapshot of the current row sizes."""
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        """Apply the move *ply* to the board in place.

        Args:
            ply: a ``(row, num_objects)`` pair.

        Raises:
            AssertionError: if the move removes fewer than one object, more
                objects than the row holds, or more than the bound ``k``.
        """
        row, num_objects = ply
        # A move has to remove at least one object: a zero move would pass
        # the turn and a negative one would add objects to the row.
        assert num_objects >= 1
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) strategies

In [77]:
def pure_random(state: Nim) -> Nimply:
    """Pick a random non-empty row, then a random number of objects from it.

    NOTE: only the row sizes are consulted; an upper bound ``k`` on the
    move size is not taken into account here.
    """
    heaps = state.rows
    non_empty = [index for index, count in enumerate(heaps) if count > 0]
    row = random.choice(non_empty)
    return Nimply(row, random.randint(1, heaps[row]))

def gabriele(state: Nim) -> Nimply:
    """Empty the first (lowest-index) row that still holds objects.

    Raises:
        ValueError: if every row is empty.
    """
    # Tuples compare by row index first, so min() yields the lowest
    # non-empty row together with its full size.
    row, count = min((r, c) for r, c in enumerate(state.rows) if c > 0)
    return Nimply(row, count)


## Optimal strategy

In [78]:

def nim_sum(state: Nim) -> int:
    """Return the nim-sum (bitwise XOR) of all row sizes of *state*.

    The XOR is folded directly on the integers, so there is no fixed bit
    width, and a board without rows yields 0.
    """
    result = 0
    for count in state.rows:
        # int() keeps the return type a plain int even for numpy integers.
        result ^= int(count)
    return result


def analize(raw: Nim) -> dict:
    """Evaluate every legal move from the position *raw*.

    Returns a dict with a single key, ``"possible_moves"``, mapping each
    ``Nimply`` to the nim-sum of the board obtained after playing it.
    *raw* itself is left untouched: every move is tried on a deep copy.
    """
    outcomes = {}
    for row_index, count in enumerate(raw.rows):
        for taken in range(1, count + 1):
            move = Nimply(row_index, taken)
            successor = deepcopy(raw)
            successor.nimming(move)
            outcomes[move] = nim_sum(successor)
    return {"possible_moves": outcomes}


def optimal(state: Nim) -> Nimply:
    """Choose a move with the nim-sum rule adapted to the "avoid the last object" game.

    * Exactly one heap with two or more objects: that heap is removed
      entirely when the number of non-empty heaps is even, otherwise it is
      reduced to a single object.
    * Any other position: a random move leading to nim-sum 0 is played; if
      no such move exists, a random legal move is played.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")

    heaps = np.array(state.rows)
    big_heaps = np.flatnonzero(heaps >= 2)

    if len(big_heaps) == 1:
        target = big_heaps[0]
        size = heaps[target]
        non_empty_heaps = int((heaps > 0).sum())
        # Even count of non-empty heaps -> take the whole big heap;
        # odd count -> leave exactly one object in it.
        leftover = 0 if non_empty_heaps % 2 == 0 else 1
        return Nimply(int(target), int(size - leftover))

    moves = analysis["possible_moves"]
    candidates = [move for move, value in moves.items() if value == 0]
    if not candidates:
        candidates = list(moves.keys())
    return random.choice(candidates)


## Adaptive Agent and Customized Agent based on predefined strategy


In [79]:
class AdaptiveAgent():
    """Agent that ranks its candidate moves with a linear map of the board.

    The weight matrix ``W`` has four rows per board row; each of them scores
    one of the move kinds understood by ``interpret``.
    """

    def __init__(self, num_rows, W=None):
        """Store *W*, or draw a random ``(4 * num_rows, num_rows)`` matrix when it is None."""
        self.num_rows = num_rows
        self.W = np.random.random(size=(4 * num_rows, num_rows)) if W is None else W

    def get_action(self, state: Nim) -> Nimply:
        """Score all candidate moves for *state* and return the best legal one."""
        scores = np.dot(self.W, np.array(state.rows))
        return self.interpret(scores, state)

    def interpret(self, output, state: Nim):
        """Translate the score vector *output* into a legal move.

        Candidates are visited from the highest score to the lowest. Entry
        ``4 * r + a`` of *output* refers to row ``r`` and move kind ``a``:
        0 = take one, 1 = take two, 2 = take all but one, 3 = take all.
        The first candidate that is legal on *state* is returned; when none
        is legal the method returns None.
        """
        rows = state.rows

        for chosen_action in np.flip(np.argsort(output)):
            index, action = divmod(int(chosen_action), 4)
            heap = rows[index]

            # (is the move legal?, number of objects to take), by move kind.
            options = (
                (heap >= 1, 1),
                (heap >= 2, 2),
                (heap > 1, int(heap) - 1),
                (heap > 0, int(heap)),
            )
            legal, amount = options[action]
            if legal:
                return Nimply(index, amount)

        return None
        
class CustomAgent():
    """Opponent that mixes the ``optimal`` and ``pure_random`` strategies.

    ``intelligence`` is the probability of playing ``optimal`` on any given
    move; ``pure_random`` is used otherwise.
    """

    def __init__(self, intelligence):
        self.intelligence = intelligence

    def get_action(self, state: Nim):
        """Return a move for *state*, drawn from one of the two strategies."""
        play_smart = random.random() < self.intelligence
        chooser = optimal if play_smart else pure_random
        return chooser(state)
    

## Oversimplified match

In [80]:
# Show INFO-level messages so that every move of the match is logged.
logging.getLogger().setLevel(logging.INFO)

# Player 0 uses `optimal`, player 1 uses `pure_random`.
strategy = (optimal, pure_random)

nim = Nim(5)
logging.info(f"init : {nim}")
player = 0
# Alternate moves until the board is empty (a Nim instance is falsy then).
while nim:
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim}")
    player = 1 - player
# `player` was flipped after the final move, so it now names the player who
# did NOT take the last object.
logging.info(f"status: Player {player} won!")


INFO:root:init : <1 3 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=4, num_objects=9)
INFO:root:status: <1 3 5 7 0>
INFO:root:ply: player 1 plays Nimply(row=0, num_objects=1)
INFO:root:status: <0 3 5 7 0>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=1)
INFO:root:status: <0 2 5 7 0>
INFO:root:ply: player 1 plays Nimply(row=1, num_objects=2)
INFO:root:status: <0 0 5 7 0>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=2)
INFO:root:status: <0 0 5 5 0>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=3)
INFO:root:status: <0 0 5 2 0>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=3)
INFO:root:status: <0 0 2 2 0>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=2)
INFO:root:status: <0 0 2 0 0>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=1)
INFO:root:status: <0 0 1 0 0>
INFO:root:ply: player 1 plays Nimply(row=2, num_objects=1)
INFO:root:status: <0 0 0 0 0>
INFO:root:status: Player 0 won!


## Fitness function & Environment


In [81]:
# Opponent pool used by the fitness function: N_AGENTS_IN_ENVIRONMENT
# CustomAgent instances whose `intelligence` grows linearly from 0 up to
# (N_AGENTS_IN_ENVIRONMENT - 1) / N_AGENTS_IN_ENVIRONMENT.
pool = []

for n in range(N_AGENTS_IN_ENVIRONMENT):
    pool.append(CustomAgent(n / N_AGENTS_IN_ENVIRONMENT))

def fitness(adaptive_agent):
    """Return the average number of wins of *adaptive_agent* against the pool.

    The agent plays one game on an ``N_ROWS`` board against every opponent
    in ``pool``, always moving first, and the whole round is repeated
    ``N_REPETITION`` times. The result is the total number of wins divided
    by ``N_REPETITION``.
    """
    wins = 0

    for _ in range(N_REPETITION):
        for opponent in pool:
            # Index 0 is the adaptive agent, index 1 the pool opponent.
            movers = (adaptive_agent.get_action, opponent.get_action)

            board = Nim(N_ROWS)
            turn = 0
            while board:
                board.nimming(movers[turn](board))
                turn = 1 - turn

            # `turn` was flipped after the final move: 0 means the opponent
            # took the last object, which counts as a win for the agent.
            if turn == 0:
                wins += 1

    return wins / N_REPETITION

def eval(offsprings):
    """Score every weight matrix in *offsprings* with ``fitness``.

    Returns the bare score when exactly one matrix is given, otherwise the
    list of scores (an empty list for empty input).

    NOTE: this function shadows the builtin ``eval`` in this module.
    """
    scores = [fitness(AdaptiveAgent(num_rows=N_ROWS, W=W)) for W in offsprings]
    return scores[0] if len(scores) == 1 else scores


## 1 + Lambda Strategy 

In [86]:
ld = 20  # lambda: number of offspring generated per generation
sigma = 0.1  # standard deviation of the Gaussian mutation

# The parent must have the same shape as each offspring built in the loop
# (four weight rows per board row), so it is derived from N_ROWS as well.
solution = np.random.random(size=(4 * N_ROWS, N_ROWS))
history = []
best = np.copy(solution)
best_value = eval([best])

# ld offspring per generation, 1000 offspring evaluations in total.
for n in tqdm(range(1000 // ld)):
    offsprings = (
        np.random.normal(loc=0, scale=sigma, size=(ld, 4 * N_ROWS, N_ROWS)) + solution
    )

    evals = eval(offsprings)
    winner = np.argmax(evals)
    # NOTE(review): the parent is always replaced by the best offspring, even
    # when that offspring scores lower; the best individual seen so far is
    # kept separately in `best` / `best_value`.
    solution = offsprings[winner]
    solution_value = evals[winner]

    if best_value < solution_value:
        best_value = solution_value
        best = np.copy(solution)
        # Record the generation index and score of each improvement.
        history.append((n, solution_value))

#logging.info(f"Best solution: {rastrigin(best_so_far)}")

#history = np.array(history)
#plt.figure(figsize=(14, 4))
#plt.plot(history[:, 0], history[:, 1], marker=".")


  0%|          | 0/50 [00:00<?, ?it/s]

In [85]:
history

[(0, 36.2), (1, 42.4)]