Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [1]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from dataclasses import dataclass 
from copy import deepcopy


## The *Nim* and *Nimply* classes

In [2]:
# A single move: `row` is the index of the row to take from, `num_objects` how many objects to remove.
Nimply = namedtuple("Nimply", "row, num_objects")


In [3]:
class Nim:
    """Board of a Nim game, stored as a list of row sizes.

    Row ``i`` (0-based) starts with ``2 * i + 1`` objects. ``k``, when not
    ``None``, is the largest number of objects a single move may remove.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        # Odd numbers 1, 3, 5, ... one per row.
        self._rows = list(range(1, 2 * num_rows, 2))
        self._k = k

    def __bool__(self):
        """Return True while the rows still sum to more than zero objects."""
        remaining = sum(self._rows)
        return remaining > 0

    def __str__(self):
        """Render the row sizes separated by spaces, wrapped in angle brackets."""
        counts = " ".join(map(str, self._rows))
        return f"<{counts}>"

    @property
    def rows(self) -> tuple:
        """Snapshot of the current row sizes as a tuple."""
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        """Apply the move ``ply`` (row index, number of objects) in place.

        Asserts that the row holds at least ``num_objects`` objects and that
        the move does not exceed ``k`` when a bound is set.
        """
        row, num_objects = ply
        available = self._rows[row]
        assert available >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] = available - num_objects


## Sample (and silly) strategies

In [4]:
def pure_random(state: Nim) -> Nimply:
    """Pick a random non-empty row, then a random number of objects to take from it.

    NOTE(review): the board's optional upper bound ``k`` is not consulted here.
    """
    sizes = state.rows
    non_empty = [index for index, size in enumerate(sizes) if size > 0]
    chosen = random.choice(non_empty)
    return Nimply(chosen, random.randint(1, sizes[chosen]))


In [5]:
def gabriele(state: Nim) -> Nimply:
    """Empty the first (lowest-index) row that still holds objects.

    Raises ValueError when every row is empty.
    """
    non_empty = ((index, size) for index, size in enumerate(state.rows) if size > 0)
    # Maximising the negated index selects the lowest row; taking its whole
    # size is the largest legal amount for that row.
    row, size = max(non_empty, key=lambda pair: -pair[0])
    return Nimply(row, size)


In [6]:
def adaptive(state: Nim) -> Nimply:
    """A strategy that can adapt its parameters.

    Placeholder: only the parameter dictionary is created; no move is
    selected, so the call evaluates to ``None`` despite the annotation.
    """
    genome = dict(love_small=0.5)
    return None


In [7]:
import numpy as np

# Returns the nim-sum of the rows (bitwise XOR of all row sizes)
def nim_sum(state: Nim) -> int:
    """Return the nim-sum of ``state``: the bitwise XOR of every row size.

    The XOR is computed directly on the integers, so row sizes of any
    magnitude are supported (no fixed 32-bit binary-string round trip).
    """
    result = 0
    for count in state.rows:
        result ^= count
    return int(result)

# Build a dictionary mapping every possible move to the nim-sum it leaves behind
def analize(raw: Nim) -> dict:
    """Evaluate every move available on ``raw``.

    Returns ``{"possible_moves": {ply: nim_sum_after_ply}}``. Each move is
    tried on a deep copy of the board, so ``raw`` itself is left untouched.
    """
    outcomes = {}
    for row_index, size in enumerate(raw.rows):
        for taken in range(1, size + 1):
            move = Nimply(row_index, taken)
            board = deepcopy(raw)
            board.nimming(move)
            outcomes[move] = nim_sum(board)
    return {"possible_moves": outcomes}

# Returns a move chosen with the nim-sum rule for the misère game (whoever takes the last object loses)
def optimal(state: Nim) -> Nimply:
    """Pick a move according to the misère nim-sum strategy.

    A move is considered winning when:

    * it leaves only rows of size 0 or 1 and the number of size-1 rows is
      odd (the opponent is then forced to take the last object), or
    * it leaves at least one row larger than 1 and the resulting nim-sum
      is zero.

    One winning move is returned at random; when none exists, a random
    legal move is returned instead. Raises IndexError when the board has no
    objects left (there is no move to choose from).
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    possible_moves = analysis["possible_moves"]
    rows = state.rows

    spicy_moves = []
    for ply, ns in possible_moves.items():
        # Row sizes as they would be after playing ``ply``.
        after = list(rows)
        after[ply.row] -= ply.num_objects
        if max(after) <= 1:
            # Endgame: only single-object rows remain, parity decides.
            winning = sum(after) % 2 == 1
        else:
            winning = ns == 0
        if winning:
            spicy_moves.append(ply)

    if not spicy_moves:
        # Losing position: every move is equally bad, pick any of them.
        spicy_moves = list(possible_moves)
    return random.choice(spicy_moves)


Human -> allows a human to participate in the game \
Vinzgorithm -> an algorithm I wrote based on my knowledge of the rules of the game; it performs quite well (modestly speaking) \
Play -> plays the game the number of times specified in the call and returns the win rates of the two strategies

In [8]:
def get_number_of_rows(state:Nim):
    """Count the rows of ``state`` whose size is non-zero."""
    return np.count_nonzero(state.rows)

def human(state:Nim) ->Nimply:
    """Ask a human player for a move on standard input until it is legal.

    The board is shown before every attempt. A move is accepted only when
    the row index lies in ``[0, number of rows)`` and the amount taken is
    between 1 and the size of that row. Non-numeric answers are ignored
    and the prompt is repeated.
    """
    while True:
        print(f"{state} nim sum: {nim_sum(state)} and not zero columns: {get_number_of_rows(state)}")
        try:
            print("Write the row")
            r = int(input())
            print("Write the number of elements to take")
            o = int(input())
        except ValueError:
            # Not a number: ask again instead of crashing the game.
            continue

        # Reject negative rows and zero/negative amounts: a zero move would
        # stall the game and a negative one would add objects to the row.
        if 0 <= r < len(state.rows) and 1 <= o <= state.rows[r]:
            return Nimply(r, o)



def vinzgorithm(state: Nim) -> Nimply:
    """Hand-written heuristic strategy.

    While two or more rows are non-empty, the largest row is removed
    entirely. Otherwise the move leaves exactly one object in a row that
    holds more than one; if no such row exists, a random legal move is
    played.
    """
    ns = nim_sum(state)
    rows = state.rows

    if get_number_of_rows(state) >= 2:
        biggest = int(np.argmax(rows))
        return Nimply(biggest, int(rows[biggest]))

    # Leave a single object behind wherever that is possible.
    candidates = [Nimply(index, size - 1) for index, size in enumerate(rows) if size > 1]
    logging.info(f"len: {len(candidates)} nin_sum: {ns}")
    if not candidates:
        candidates = [
            Nimply(index, taken)
            for index, size in enumerate(rows)
            for taken in range(1, size + 1)
        ]
    return random.choice(candidates)

    
def play(strategy1, strategy2, times, orig_nim=None):
    """Play ``times`` games between two strategies and return their win rates.

    ``strategy1`` always moves first. Every game starts from a deep copy of
    ``orig_nim`` (a 5-row board when omitted). The player who takes the
    last object loses. Returns ``(rate_of_strategy1, rate_of_strategy2)``
    as fractions of ``times``.
    """
    if orig_nim is None:
        orig_nim = Nim(5)
    assert callable(strategy1), "strategy1 is not a function"
    assert callable(strategy2), "strategy2 is not a function"

    contenders = (strategy1, strategy2)
    first_player_wins = 0
    for _ in range(times):
        board = deepcopy(orig_nim)
        logging.info(f"init : {board}")
        turn = 0
        while board:
            move = contenders[turn](board)
            logging.info(f"ply: player {turn} plays {move}")
            board.nimming(move)
            logging.info(f"status: {board}")
            turn = 1 - turn
        # The turn has already passed to the player who did NOT empty the board.
        logging.info(f"status: Player {turn} won!")
        first_player_wins += turn == 0

    rate = first_player_wins / times
    return rate, 1 - rate
    

In [9]:
def enum_state(rows:tuple):
    """List every board reachable by lowering the rows of ``rows``.

    Rows are processed left to right: each state found so far is expanded
    with all smaller values (down to 0) of the current row. The starting
    configuration comes first. The number of states is printed before the
    list is returned.
    """
    states = [rows]
    for index in range(len(rows)):
        expanded = []
        for state in states:
            # Count the current row down from size-1 to 0.
            for remaining in range(state[index] - 1, -1, -1):
                lowered = list(state)
                lowered[index] = remaining
                expanded.append(tuple(lowered))
        states.extend(expanded)

    print(len(states))
    return states



## Evolution Strategies
Here each individual has a fitness value, used to compare it with the others: the fraction of matches won against a chosen strategy.
The parameters are the probabilities of using the different strategies; hopefully they converge towards the optimal strategy or towards the vinzgorithm.

The softmax normalizes the parameters so that they sum to 1.


In [10]:
def softmax(array):
    """Return the softmax of ``array``: non-negative weights that sum to 1.

    The maximum is subtracted before exponentiating. This leaves the result
    unchanged mathematically but keeps ``np.exp`` from overflowing on large
    inputs. An empty input yields an empty array.
    """
    values = np.asarray(array, dtype=float)
    if values.size == 0:
        return values
    exp = np.exp(values - np.max(values))
    return exp / np.sum(exp)


class es_individual:
    """Mixed strategy whose parameters are the probabilities of its sub-strategies.

    ``vec`` holds one probability per strategy (softmax-normalised) and
    ``fitness_value`` the best win rate measured so far for this individual.
    """

    def __init__(self, *strategies, compare_strategy=optimal):
        """Draw random probabilities and score them against ``compare_strategy``."""
        self.strategies = strategies
        self.vec = softmax(np.array([random.random() for _ in range(len(strategies))]))
        self.fitness_value = 0
        self.fitness(compare_strategy)

    def strategy(self, state:Nim):
        """Play ``state`` with one sub-strategy drawn according to ``vec``."""
        s = np.random.choice(self.strategies, p=self.vec, replace=False)
        assert callable(s), f"strategy not callable, type: {type(s)}"
        return s(state)

    def fitness(self, strategy=optimal):
        """Play 100 games as second player against ``strategy``.

        ``fitness_value`` is raised to the measured win rate when that is
        higher. Returns True when the stored value improved, else False.
        """
        _, win1 = play(strategy, self.strategy, times=100)
        if win1 > self.fitness_value:
            self.fitness_value = win1
            return True
        return False

    def tweak(self, strategy=optimal, sigma=1):
        """Return a mutated copy scored against ``strategy``.

        Gaussian noise with standard deviation ``sigma`` is added to every
        parameter and the result is re-normalised with softmax.
        """
        new_ind = deepcopy(self)
        noise = np.array([random.gauss(0, sigma) for _ in range(len(self.vec))])
        new_ind.vec = softmax(new_ind.vec + noise)
        # The copy inherited the parent's fitness; reset it so the stored
        # value reflects the new parameters and not the parent's score.
        new_ind.fitness_value = 0
        new_ind.fitness(strategy)
        return new_ind

def evolve_first_improv(epochs, nim_strategy=optimal) -> es_individual:
    """Hill climbing that accepts the first improving mutation.

    For ``epochs`` iterations one mutated copy of the current individual is
    created and kept only when its fitness is strictly higher. The mutation
    strength decreases linearly from 1 to ``1 / epochs``. Every fitness
    value, including the initial one, is measured against ``nim_strategy``.
    """
    individual = es_individual(
        pure_random, vinzgorithm, optimal, gabriele, compare_strategy=nim_strategy
    )
    for i in range(epochs):
        sigma = (epochs - i) / epochs
        candidate = individual.tweak(nim_strategy, sigma)
        if candidate.fitness_value > individual.fitness_value:
            individual = candidate
    return individual

def evolve_steepest(epochs,samples, nim_strategy=optimal) -> es_individual:
    """Hill climbing that tries ``samples`` mutations per epoch and keeps the best.

    The best sample replaces the current individual only when its fitness
    is strictly higher. The mutation strength decreases linearly from 1 to
    ``1 / epochs``. Every fitness value, including the initial one, is
    measured against ``nim_strategy``.
    """
    individual = es_individual(
        pure_random, vinzgorithm, optimal, gabriele, compare_strategy=nim_strategy
    )
    for i in range(epochs):
        sigma = (epochs - i) / epochs
        new_individual = individual.tweak(nim_strategy, sigma)

        # NOTE(review): each further sample is a mutation of the best sample
        # found so far in this epoch, not of the parent - confirm intent.
        for _ in range(samples - 1):
            temp_individual = new_individual.tweak(nim_strategy, sigma)
            if temp_individual.fitness_value > new_individual.fitness_value:
                new_individual = temp_individual

        if new_individual.fitness_value > individual.fitness_value:
            individual = new_individual

    return individual

def evolve_comma_lambda(epochs, samples, nim_strategy=optimal) -> es_individual:
    """Comma-style evolution: the best of ``samples`` mutations always replaces the parent.

    Unlike ``evolve_steepest`` the parent is discarded even when the best
    sample is worse. The mutation strength decreases linearly from 1 to
    ``1 / epochs``. Every fitness value, including the initial one, is
    measured against ``nim_strategy``.
    """
    individual = es_individual(
        pure_random, vinzgorithm, optimal, gabriele, compare_strategy=nim_strategy
    )
    for i in range(epochs):
        sigma = (epochs - i) / epochs
        new_individual = individual.tweak(nim_strategy, sigma)

        # NOTE(review): each further sample is a mutation of the best sample
        # found so far in this epoch, not of the parent - confirm intent.
        for _ in range(samples - 1):
            temp_individual = new_individual.tweak(nim_strategy, sigma)
            if temp_individual.fitness_value > new_individual.fitness_value:
                new_individual = temp_individual

        # Comma selection: the parent never survives.
        individual = new_individual

    return individual

            

# This was an attempt at a very brute-force approach to a Genetic Algorithm
(before figuring out it was not the goal of the lab)

In [11]:
# Number of rows of the board used by the genetic-algorithm code below.
SIZE=5

# Reference starting board; games are played on deep copies of it.
default_state=Nim(SIZE)

# Every board configuration enumerated from the starting rows
# (enum_state also prints how many there are).
possible_states=enum_state(default_state.rows)

class Individual:
    """Look-up-table player: one fixed move for every state in ``possible_states``.

    Attributes:
        genome: maps a tuple of row sizes to the ``Nimply`` played there.
        phenotype: maps a tuple of row sizes to how many times that state
            was met while playing (see ``strategy``).
        fitness: win rate from the last ``recalc_fitness`` call; the
            attribute does not exist until that method has run.
    """

    def __init__(self, strategy=optimal, calc_fitness=False):
        """Build a random move table; optionally score it against ``strategy``."""
        # Per-instance tables. Class-level dictionaries would be shared by
        # every Individual, so all of them would carry the same genome.
        self.genome = {}
        self.phenotype = {}

        for t in possible_states:
            moves = [Nimply(r, o) for r, c in enumerate(t) for o in range(1, c + 1)]
            self.phenotype[t] = 0
            # A state without legal moves gets a placeholder entry.
            self.genome[t] = random.choice(moves) if moves else Nimply(0, 0)

        if calc_fitness:
            self.recalc_fitness(strategy)

    def recalc_fitness(self, strategy=optimal):
        """Play 100 games as second player against ``strategy`` from ``default_state``.

        Stores the win rate in ``fitness``. When it is below 0.5 the visit
        counters gathered during those games are discarded.
        """
        snapshot = deepcopy(self.phenotype)
        self.fitness = play(strategy, self.strategy, 100, default_state)[1]
        if self.fitness < 0.5:
            self.phenotype = snapshot

    def strategy(self, state:Nim):
        """Return the stored move for ``state`` and count the visit."""
        self.phenotype[state.rows] += 1
        return self.genome[state.rows]

    def reset_phenotype(self):
        """Set every visit counter back to zero."""
        for key in self.phenotype:
            self.phenotype[key] = 0
        

def crossover(individual1:Individual, individual2:Individual):
    """Create two children by mixing the move tables of the two parents.

    For every state the gene of the parent that visited the state more
    often is given to both children; on a tie each child receives the gene
    of a different parent, assigned at random. The first child is mutated
    with probability 0.05. Both children have their fitness recomputed
    before being returned.
    """
    first_child, second_child = Individual(), Individual()

    for state in possible_states:
        visits1 = individual1.phenotype[state]
        visits2 = individual2.phenotype[state]
        gene1 = individual1.genome[state]
        gene2 = individual2.genome[state]

        if visits1 > visits2:
            genes = (gene1, gene1)
        elif visits1 < visits2:
            genes = (gene2, gene2)
        elif random.choice([True, False]):
            genes = (gene1, gene2)
        else:
            genes = (gene2, gene1)
        first_child.genome[state], second_child.genome[state] = genes

    if random.random() < 0.05:
        first_child = mutate(first_child)

    for child in (first_child, second_child):
        child.recalc_fitness()

    return first_child, second_child


def mutate(individual : Individual):
    """Randomly re-draw some genes of ``individual`` in place and rescore it.

    Every state that has at least one legal move gets a new random move
    with probability 0.05. The fitness is then recomputed against
    ``optimal`` and the same individual is returned.
    """
    for board in possible_states:
        legal = [
            Nimply(row, taken)
            for row, size in enumerate(board)
            for taken in range(1, size + 1)
        ]
        # The probability draw is only made for states that have legal moves.
        if legal and np.random.choice([True, False], p=[0.05,0.95]):
            individual.genome[board] = random.choice(legal)

    individual.recalc_fitness(optimal)
    return individual


def compare(individual1, individual2):
    """Play one game between two individuals using their move tables directly.

    ``individual1`` moves first on a copy of ``default_state``. The loser
    is mutated and has its visit counters reset. Returns ``(winner, loser)``.
    """
    board = deepcopy(default_state)

    assert type(individual1)==Individual
    assert type(individual2)==Individual
    contenders = (individual1, individual2)

    turn = 0
    while board:
        board.nimming(contenders[turn].genome[board.rows])
        turn = 1 - turn

    # The turn has passed to the player who did not take the last object.
    beaten = mutate(contenders[1 - turn])
    beaten.reset_phenotype()
    return contenders[turn], beaten


def eval(individual1, test):
    """Play one game of ``test`` (moving first) against ``individual1``.

    Returns True when ``individual1`` wins, i.e. when ``test`` is the one
    taking the last object. Note that this name shadows the ``eval`` builtin.
    """
    board = deepcopy(default_state)
    movers = (test, individual1.strategy)

    turn = 0
    while board:
        board.nimming(movers[turn](board))
        turn = 1 - turn
    return turn == 1

def evolve(power_of_two:int):
    """Knock-out tournament over ``2 ** power_of_two`` random individuals.

    In every round neighbouring individuals play each other and only the
    winners go on. The winners are then paired and replaced by their
    crossover children, so the population halves each round. The last
    remaining winner is returned.
    """
    individuals = [Individual() for _ in range(2 ** power_of_two)]

    while True:
        if len(individuals) == 1:
            # Nobody left to play against.
            return individuals[0]

        # Keep only the winner of each match; carrying the loser along as
        # well would leave the population size unchanged forever.
        winners = []
        for i in range(0, len(individuals) - 1, 2):
            winner, _loser = compare(individuals[i], individuals[i + 1])
            winners.append(winner)

        if len(winners) == 1:
            return winners[0]

        individuals = []
        for j in range(0, len(winners) - 1, 2):
            individuals.extend(crossover(winners[j], winners[j + 1]))
        if len(winners) % 2:
            # An unpaired winner moves on unchanged.
            individuals.append(winners[-1])

def evolve_with_mutation(power_of_two:int):
    """Knock-out evolution that lets some mutated losers stay in the population.

    Each round the population is sorted by fitness (printed for inspection)
    and neighbours play each other. Up to ten randomly chosen losers are
    dropped. Winners are paired and replaced by their crossover children
    (an unpaired winner is kept as is), then the remaining losers are added
    back. The function returns as soon as a round has exactly one winner.
    """
    n = 2**power_of_two
    population = [Individual(calc_fitness=True) for _ in range(n)]

    while True:
        population.sort(key=lambda ind: ind.fitness, reverse=True)
        print([ind.fitness for ind in population])

        winners, losers = [], []
        # Stopping at n-1 leaves an unpaired last individual out of the round.
        for first in range(0, n - 1, 2):
            win, lose = compare(population[first], population[first + 1])
            winners.append(win)
            losers.append(lose)

        for _ in range(min(10, len(losers))):
            losers.remove(losers[random.randint(0, len(losers) - 1)])

        if len(winners) == 1:
            return winners[0]

        population = []
        for start in range(0, len(winners), 2):
            pair = winners[start:start + 2]
            if len(pair) == 2:
                population.extend(crossover(pair[0], pair[1]))
            else:
                population.append(pair[0])
        population.extend(losers)

        n = len(population)

def _breed_group(group):
    """Pair the members of ``group`` at random and return the next generation.

    Each pair is replaced by its two crossover children. With an odd number
    of members, the one left without a partner is mutated and kept.
    """
    order = random.sample(range(0, len(group)), len(group))
    offspring = []
    for i in range(0, len(order), 2):
        if i < len(order) - 1:
            offspring.extend(crossover(group[order[i]], group[order[i + 1]]))
        else:
            # Use the shuffled index: order[i] is the member that was left
            # without a partner, whereas position i is just the loop counter.
            offspring.append(mutate(group[order[i]]))
    return offspring


def tournament(adv_strategy, n, max_epochs):
    """Evolve ``n`` individuals for ``max_epochs`` generations.

    Each generation the population is sorted by fitness and split into the
    top 20% and the rest; breeding happens only inside each group. The best
    individual seen so far is tracked and progress is printed.

    NOTE(review): ``adv_strategy`` is currently unused - fitness is always
    measured against the default opponent of ``Individual`` / ``crossover``.

    Returns ``(best, worst)``: the best individual ever observed and the
    lowest-fitness member of the final population.
    """
    individuals = [Individual(calc_fitness=True) for _ in range(n)]
    best = individuals[0]
    for e in range(max_epochs):
        individuals.sort(key=lambda ind: ind.fitness, reverse=True)
        if individuals[0].fitness > best.fitness:
            best = deepcopy(individuals[0])
            print(play(optimal, best.strategy, 100))
        print(best.fitness," at the epoch: ",e )

        top = int(len(individuals) * 0.2)
        individuals = _breed_group(individuals[:top]) + _breed_group(individuals[top:])

    individuals.sort(key=lambda ind: ind.fitness, reverse=True)

    if individuals[0].fitness > best.fitness:
        best = deepcopy(individuals[0])

    worst = individuals[-1]
    print(best.fitness, " ", worst.fitness)
    return best, worst

        

def evolve2(power_of_two:int):
    """Filter-and-breed evolution against the ``optimal`` strategy.

    Each round every second individual (even positions) plays one game
    against ``optimal`` and survives only if it wins. A single survivor is
    returned immediately; with no survivor the first individual of the
    round is returned. Otherwise survivors are paired and replaced by their
    crossover children (an unpaired survivor is kept as is).

    NOTE(review): individuals at odd positions are never evaluated and are
    dropped each round - confirm this is intended.
    """
    population = [Individual() for _ in range(2**power_of_two)]

    while True:
        survivors = [
            candidate for candidate in population[::2] if eval(candidate, optimal)
        ]

        if len(survivors) == 1:
            return survivors[0]
        if not survivors:
            return population[0]

        population = []
        for start in range(0, len(survivors), 2):
            pair = survivors[start:start + 2]
            if len(pair) == 1:
                population.append(pair[0])
            else:
                population.extend(crossover(pair[0], pair[1]))


    

3840


# Core Application
Individuals are compared with the same strategy and with the same conditions

In [12]:
# Set the root logger to INFO, then disable every message of severity INFO
# and below, so the per-move logging done by play() stays silent.
logging.getLogger().setLevel(logging.INFO)
logging.disable(logging.INFO)
# NOTE(review): `strategy` is not used anywhere below in this cell.
strategy = (vinzgorithm, optimal)

# Board shared by all the evaluation matches below (play() works on copies).
nim = Nim(SIZE)

#i=Individual()
# Genetic-algorithm run: 100 individuals, 50 epochs.
last_man_standing, worst=tournament(optimal, 100,50)


### GENETIC DOESN'T WORK WELL ###
# individual=ESIndividual()
# individual.evolve(1000, nim, 1, optimal)

### ES HOPE IT WORKS ###
#individual1=evolve_first_improv(150)
#assert type(individual1) is es_individual
# Best individual of the tournament, playing second against `optimal`.
w0, w1=play(optimal, last_man_standing.strategy,100, nim)
print(f"Player 0 won: {w0*100}% of times Individual1 won: {w1*100}% of times")

# Worst individual of the final tournament population, for comparison.
w0, w1=play(optimal, worst.strategy,100, nim)
print(f"Player 0 won: {w0*100}% of times Individual1 won: {w1*100}% of times")


# Evolution strategy, steepest variant: 150 epochs, 6 samples per epoch.
individual2=evolve_steepest(150,6)
w0, w1=play(optimal, individual2.strategy, 100, nim)
print(f"Player 0 won: {w0*100}% of times Individual2 won: {w1*100}% of times")

# Evolution strategy, comma variant: 150 epochs, 6 samples per epoch.
individual3=evolve_comma_lambda(150,6)
w0, w1=play(optimal, individual3.strategy, 100, nim)
print(f"Player 0 won: {w0*100}% of times Individual3 won: {w1*100}% of times")


KeyboardInterrupt: 