Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB10

Use reinforcement learning to devise a tic-tac-toe player.

### Deadlines:

* Submission: [Dies Natalis Solis Invicti](https://en.wikipedia.org/wiki/Sol_Invictus)
* Reviews: [Befana](https://en.wikipedia.org/wiki/Befana)

Notes:

* Reviews will be assigned on Monday, December 4
* You need to commit in order to be selected as a reviewer (i.e., better to commit an empty work than not to commit)

In [404]:
from itertools import combinations
from collections import namedtuple, defaultdict
import random as random
from copy import deepcopy

from tqdm.auto import tqdm
import numpy as np

In [405]:
# Board state: `x` and `o` hold the cells claimed by each of the two players.
State = namedtuple('State', ['x', 'o'])

In [406]:
# 3x3 magic square in row-major order: every row, column and diagonal sums to
# 15, so three cells form a line exactly when their numbers sum to 15.
MAGIC = [2, 7, 6, 9, 5, 1, 4, 3, 8]

In [407]:
def print_board(pos):
    """Print the 3x3 board, one text row per line, followed by a blank line.

    Cells listed in ``pos.x`` are drawn as 'X', cells in ``pos.o`` as 'O' and
    every other cell as '.'.
    """
    for row_start in (0, 3, 6):
        line = ''
        # MAGIC is stored row by row, so each slice of three is one board row.
        for cell in MAGIC[row_start:row_start + 3]:
            if cell in pos.x:
                line += 'X'
            elif cell in pos.o:
                line += 'O'
            else:
                line += '.'
        print(line)
    print()

In [408]:
def win(elements):
    """Return True if any three of *elements* add up to 15, False otherwise."""
    for triple in combinations(elements, 3):
        if sum(triple) == 15:
            return True
    return False

def state_value(pos: State):
    """Score a position from the first player's point of view.

    Returns 1 when the ``x`` cells contain a winning triple, -1 when the ``o``
    cells do, and 0 in every other case.
    """
    if win(pos.x):
        return 1
    if win(pos.o):
        return -1
    return 0
    
    

In [409]:
def random_game():
    """Play one game with uniformly random moves and return its trajectory.

    The trajectory is a list of ``State`` snapshots, one per move, starting
    with the first X move. The game stops as soon as the side that just moved
    holds a winning triple or no free cell is left.
    """
    history = []
    board = State(set(), set())
    free_cells = set(range(1, 10))
    while free_cells:
        # X moves first, then O; both sides follow the same procedure.
        for marks in (board.x, board.o):
            move = random.choice(list(free_cells))
            marks.add(move)
            # Snapshot the board, because the sets keep being mutated.
            history.append(deepcopy(board))
            free_cells.remove(move)
            if win(marks) or not free_cells:
                return history
    return history

In [410]:
# Disabled experiment, kept as an inert string literal so it does not run.
# When enabled it plays 500_000 random games and, for every state visited in a
# game, moves that state's stored value toward the game's final reward by a
# fixed step (`epsilon` is used as the step size here).
'''
value_dictionary = defaultdict(float)
hit_state = defaultdict(int)
epsilon = 0.001

for steps in tqdm(range(500_000)):
    trajectory = random_game()
    final_reward = state_value(trajectory[-1])
    for state in trajectory:
        hashable_state = (frozenset(state.x), frozenset(state.o))
        hit_state[hashable_state] += 1
        value_dictionary[hashable_state] = value_dictionary[
            hashable_state
        ] + epsilon * (final_reward - value_dictionary[hashable_state])
'''

'\nvalue_dictionary = defaultdict(float)\nhit_state = defaultdict(int)\nepsilon = 0.001\n\nfor steps in tqdm(range(500_000)):\n    trajectory = random_game()\n    final_reward = state_value(trajectory[-1])\n    for state in trajectory:\n        hashable_state = (frozenset(state.x), frozenset(state.o))\n        hit_state[hashable_state] += 1\n        value_dictionary[hashable_state] = value_dictionary[\n            hashable_state\n        ] + epsilon * (final_reward - value_dictionary[hashable_state])\n'

In [411]:
#sorted(value_dictionary.items(), key=lambda e: e[1], reverse=True)[:10]

In [412]:
# size = side length of the playing grid
# learning_rate = how strongly each update moves a Q-value toward its new target
# discount_factor = how much the agent takes future rewards into account during learning
# exploration_prob = probability that the agent plays a random action instead of the action that maximizes the Q-value
class QAgent:
    """Tabular Q-learning agent that learns tic-tac-toe through self-play.

    A single table is shared by both sides: ``Q[(state, action)]`` estimates
    how good ``action`` is for the player who moves in ``state``. States are
    ``State(x, o)`` tuples holding sorted tuples of cell numbers, and actions
    are cell numbers in 1..9. Pairs missing from the table are worth 0.

    NOTE(review): ``play`` picks the starting side at random, so a state with
    as many X marks as O marks does not record whose turn it is; both cases
    share the same table entries. Confirm whether this aliasing is acceptable.
    """

    def __init__(self, size=3, learning_rate=0.1, discount_factor=0.9, exploration_prob=0.1):
        """Create an untrained agent.

        Args:
            size: Stored on the instance; no method of this class reads it.
            learning_rate: Step size of each Q-value update.
            discount_factor: Weight given to the value of the following state.
            exploration_prob: Probability of playing a random legal move
                instead of the greedy one.
        """
        self.size = size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_prob = exploration_prob

        # Start empty: the former seed entry used action 0, which is not a
        # legal cell, and forced a special case for the empty board.
        self.Q = {}

    def find_state(self, my_dictionary, state_to_find):
        """Return True if *my_dictionary* has any ``(state, action)`` key for the state."""
        return any(key[0] == state_to_find for key in my_dictionary)

    def find_max(self, my_dictionary, state_to_find):
        """Return the recorded action with the highest value for the state.

        Only actions present in *my_dictionary* are considered; ties go to the
        entry inserted first. Returns -1 when the state has no entry at all.
        """
        candidates = [key for key in my_dictionary if key[0] == state_to_find]
        if not candidates:
            return -1
        return max(candidates, key=my_dictionary.__getitem__)[1]

    def choose_action(self, state, available):
        """Pick a move for *state* with an epsilon-greedy policy.

        With probability ``exploration_prob`` a random legal move is returned.
        Otherwise the legal move with the highest Q-value is returned, where
        untried moves count as 0 and ties are broken at random. The result is
        always an element of *available*.
        """
        actions = list(available)
        if random.uniform(0, 1) < self.exploration_prob:
            return random.choice(actions)

        # Rank every legal move, not just the ones already in the table, so
        # an untried move (implicit 0) can beat a move known to be bad.
        q_of = {a: self.Q.get((state, a), 0) for a in actions}
        best_q = max(q_of.values())
        return random.choice([a for a in actions if q_of[a] == best_q])

    def update_q_value(self, state, action, next_state, reward, available):
        """Apply one Q-learning update to ``Q[(state, action)]``.

        Args:
            state: State in which *action* was played.
            action: Cell that was played.
            next_state: State reached after the move; the opponent moves there.
            reward: Immediate reward for the player who moved.
            available: Legal moves in *next_state*.
        """
        current_q = self.Q.get((state, action), 0)
        best_next_q = max(
            (self.Q.get((next_state, next_action), 0) for next_action in available),
            default=0,
        )
        # The table is shared by both sides and the opponent moves next, so
        # the opponent's best value counts against the player who just moved.
        target = reward - self.discount_factor * best_next_q
        self.Q[state, action] = (1 - self.learning_rate) * current_q + self.learning_rate * target

    def take_action(self, state: "State", action, player, available):
        """Play *action* for the side to move and report the outcome.

        Args:
            state: Current ``State``.
            action: Cell to claim.
            player: One-element list holding "x" or "o"; it is switched in
                place to the other side after a legal move.
            available: Set of free cells; the played cell is removed in place.

        Returns:
            ``(next_state, reward, done)``. An already-taken cell returns the
            unchanged state with reward -1 and ``done`` False. A winning move
            gives reward 1; a draw or an unfinished game gives reward 0.
        """
        if action in state.x or action in state.o:
            return state, -1, False

        available.remove(action)

        # State holds immutable tuples, so a new State is built directly.
        if player[0] == "x":
            next_state = State(tuple(sorted(state.x + (action,))), state.o)
            player[0] = "o"
        else:
            next_state = State(state.x, tuple(sorted(state.o + (action,))))
            player[0] = "x"

        if win(next_state.x) or win(next_state.o):
            return next_state, 1, True
        if not available:
            return next_state, 0, True
        return next_state, 0, False

    def play(self, episodes=1000):
        """Train the agent by playing *episodes* games against itself."""
        for _ in tqdm(range(episodes)):
            state = State(tuple(), tuple())
            available = set(range(1, 9 + 1))
            # The starting side is drawn at random for every episode.
            player = [random.choice(("x", "o"))]
            done = False

            while not done:
                action = self.choose_action(state, available)
                # take_action removes the played cell from `available`, so the
                # update below sees exactly the moves legal in next_state.
                next_state, reward, done = self.take_action(state, action, player, available)
                self.update_q_value(state, action, next_state, reward, available)
                state = next_state
            

In [413]:
# Train an agent with the default hyper-parameters through self-play.
# NOTE(review): the progress bar recorded below this cell shows 1000
# iterations, so the saved output appears to predate this episode count.
q_agent = QAgent()
q_agent.play(episodes=100_000)

100%|██████████| 1000/1000 [00:00<00:00, 1865.67it/s]
