Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB10

Use reinforcement learning to devise a tic-tac-toe player.

### Deadlines:

* Submission: [Dies Natalis Solis Invicti](https://en.wikipedia.org/wiki/Sol_Invictus)
* Reviews: [Befana](https://en.wikipedia.org/wiki/Befana)

Notes:

* Reviews will be assigned  on Monday, December 4
* You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit)

In [1633]:
from itertools import combinations
from random import choice, choices
import numpy as np
import pickle
import time

## Tic-Tac-Toe game
The game is implemented as a sum of 15 game, so the goal is pick three number from 1 to 9 that sum to 15. If the numbers are displayed in a 3x3 grid as below the goal is to pick three numbers that are in a straight line (horizontal, vertical or diagonal), as tic tac toe game.

| **2** | **7** | **6** |
|-------|-------|-------|
| **9** | **5** | **1** |
| **4** | **3** | **8** |




In [1634]:
SEQUENCE = [2, 7, 6, 9, 5, 1, 4, 3, 8]

class TicTacToe():
    def  __init__(self, board=None, x=None, o=None):
        self.board = frozenset(board) if type(board) == set else frozenset(SEQUENCE) 
        self.x = frozenset(x) if x else frozenset()
        self.o = frozenset(o) if o else frozenset()
    
    def __str__(self):
        return str(self.board) + " " + str(self.x) + " " + str(self.o)
    
    def __key(self):
        return (self.board, self.x, self.o)

    def __hash__(self):
        return hash(self.__key())

    def __eq__(self, other):
        if isinstance(other, TicTacToe):
            return self.board == other.board and self.x == other.x and self.o == other.o
        return NotImplemented
    
    """
    Show the board in a human readable format
    """
    def show (self):
        for i, move in enumerate(SEQUENCE):
            print(" ", end="")
            if move in self.x:
                print("X", end="")
            elif move in self.o:
                print("O", end="")
            else:
                print(".", end="")
            if i % 3 == 2:
                print()
        print()

    """
    Change the board using a possible moves for a player
    """
    def move(self, player, pos):
        if pos in self.board:  
            if player == 0:
                self.x = self.x.union({pos})
            else:
                self.o = self.o.union({pos})
            self.board = self.board.difference({pos})
            return True
        else:
            return False
    
    """
    Check if a player has won or if the game is a draw
    """
    def check_win(self):
        if TicTacToe.check(self.x):
            if TicTacToe.check(self.o):
                raise ValueError("Both players have won")
            return 0
        elif TicTacToe.check(self.o):
            return 1
        elif len(self.board) == 0:
            return -1
        else:
            return None
        
    def copy(self):
        return TicTacToe(set(self.board), set(self.x), set(self.o))

    """
    Check all the possible triplets of moves to see if a player has won
    """
    @staticmethod
    def check(moves):
        if any([sum(triple) == 15 for triple in combinations(moves, 3)]):
            return True 
        else:
            return False
    
    """
    Flip the board along the diagonal
    """
    def flip_function (x):
        if x > 6:
            return x - 6
        elif x > 3:
            return x
        else:
            return x + 6
        
    """
    Rotate the board 90 degrees clockwise
    """
    def rotate_function (x):
        return (10 - 2*x) % 10 if x % 2 == 0 else (5 - 2*x) % 10
    
    """
    Transform the board using a function above
    """
    def transform(self, function):
        return TicTacToe(set([function(i) for i in self.board]), set([function(i) for i in self.x]), set([function(i) for i in self.o]))

    """
    Get the inverse transformations of a list of transformations
    """
    def get_inverse_transformations(transformations):
        inverse_transformations = []
        for transformation in transformations:
            if transformation == TicTacToe.flip_function:
                inverse_transformations.append(TicTacToe.flip_function)
            else:
                for _ in range(3):
                    inverse_transformations.append(TicTacToe.rotate_function)
        inverse_transformations.reverse()
        return inverse_transformations

    """
    Get the transformation of the board to another board
    """
    def get_transformation(self, other):
        equilvalent_game = other
        transformation = []
        for _ in range(2):
            for _ in range(4):
                if self == equilvalent_game:
                    return transformation
                equilvalent_game = equilvalent_game.transform(TicTacToe.rotate_function)
                transformation.append(TicTacToe.rotate_function)
            equilvalent_game = other.transform(TicTacToe.flip_function)
            transformation = [TicTacToe.flip_function]
            
        return []
    
    """
    Check if two boards are equivalent by checking if one is a transformation of the other
    """
    def equivalent(self, other):
        return self.get_transformation(other) != []   
    
    """
    Get an equivalent board and the transformations to get to it from a list of possible equivalent boards
    """
    def get_equivalent(self, possible_equivalents):
        for equivalent in possible_equivalents:
            transformations = equivalent.get_transformation(self) 
            if transformations != []:
                return equivalent, transformations
        return self , []
        

## Environment

The environment gives reward, next state and if the game is finished given the current state and the action. The state is represented as a TicTacToe object, the action is represented as a number from 1 to 9, the reward is a number different for each situation (win, lose, draw, invalid move). The next state is a TicTacToe object, the game is finished if the game is won, lost, draw or if the action is invalid.

The environment implements dome strategies to play the game as the oppoent of the agent. The strategies are:
* random: pick a random action
* win move: if there is a move that wins the game pick it
* win loss move: if there is a move that wins the game pick it, otherwise if there is a move that does not let win the agent pick it
* me: let the human play

The states are saved in a file, so they can be reused in the next run of the program. All the states are not equivalent to each other, so they are minimized using the symmetries of the game.

In [1635]:
class Environment():
    INVALID_MOVE_REWARD = -1
    MOVE_REWARD = 0.02
    WIN_REWARD = 1
    LOSE_REWARD = -1
    DRAW_REWARD = 0

    def __init__(self, player = 0, strategy=None) -> None:
        self.states = Environment.get_states()
        self.player = player
        self.strategy = strategy if strategy else Environment.random_strategy
    
    """
    Get all the possible states of the game
    """
    def get_states():
        try:
            with open('states.npy', 'rb') as f:
                states = [np.load(f, allow_pickle=True) for _ in range(len(SEQUENCE) + 1)]
        except FileNotFoundError:
            states = Environment.generate_states()
            with open('states.npy', 'wb') as f:
                for state in states:
                    np.save(f, state)

        return states
    
    """
    Generate all the possible states of the game
    """
    def generate_states():
        states = []

        for depth in range(len(SEQUENCE) + 1):
            boards = [set(e) for e in combinations(SEQUENCE, depth)]
            good_games = []

            for board in boards:
                x_and_o = set(SEQUENCE) - board
                o = [set(e) for e in combinations(x_and_o, len(x_and_o) // 2)]
                x = [x_and_o - x_moves for x_moves in o]
                
                for x_moves, o_moves in zip(x, o):
                    game = TicTacToe(board, x_moves, o_moves)
                    equivalent = [game.equivalent(good) for good in good_games]

                    if not any(equivalent):
                        try:
                            game.check_win()
                            good_games.append(game)
                        except ValueError:
                            pass
                        
            states.append(good_games)

        states.reverse()

        return states
    
    def random_strategy(self, actions):
        return choice(list(actions))
    
    def win_move_strategy(self, actions):
        for action in actions:
            game = self.current_state.copy()
            game.move(1 - self.player, action)
            if game.check_win() == 1 - self.player:
                return action
        return Environment.random_strategy(self, actions)
    
    def win_loss_move_strategy(self, actions):
        for action in actions:
            game = self.current_state.copy()
            game.move(1 - self.player, action)
            if game.check_win() == 1 - self.player:
                return action
        for agent_action in actions:
            agent_game = game.copy()
            agent_game.move(self.player, agent_action)
            if agent_game.check_win() == self.player:
                return agent_action
        return Environment.random_strategy(self, actions)
    
    def me_strategy(self, actions):
        print("Current state:")
        show_state = self.transform_inv_state(self.current_state)
        show_state.show()
        time.sleep(0.2)
        index = int(input("Enter move: "))
        action = SEQUENCE[index - 1]
        for transformation in self.transformations:
            action = transformation(action)
        return action
    
    """
    Reset the environment to initial state
    """
    def reset(self):
        self.transformations = []
        self.current_state = choice(self.states[0])
        if self.player == 1:
            action = self.strategy(self, self.current_state.board)
            self.current_state.move(1 - self.player, action)

            self.update_transformations()

        return self.current_state, False
    
    """
    Transform the equivalent state back to the original state
    """
    def transform_inv_state(self, state):
        for transformation in TicTacToe.get_inverse_transformations(self.transformations):
            state = state.transform(transformation)
        return state
    
    """
    Transform the equivalent action back to the original action
    """
    def transform_inv_action(self, action):
        for transformation in TicTacToe.get_inverse_transformations(self.transformations):
            action = transformation(action)
        return action
    
    """
    Update the current state to an equivalent state that is know from the environment
    """
    def update_transformations(self):
        possible_equivalents = self.states[len(SEQUENCE) - len(self.current_state.board)]
        self.current_state, transformation = self.current_state.get_equivalent(possible_equivalents)
        self.transformations += transformation 
    
    """
    Make a move in the environment
    """
    def step(self, action):
        
        if not self.current_state.move(self.player, action):
            return self.current_state, Environment.INVALID_MOVE_REWARD, True

        self.update_transformations()

        win = self.current_state.check_win()

        if win == self.player:
            return self.current_state, Environment.MOVE_REWARD + Environment.WIN_REWARD, True
        elif win == -1:
            return self.current_state, Environment.MOVE_REWARD + Environment.DRAW_REWARD, True
        
        env_action = self.strategy(self, self.current_state.board)
        self.current_state.move(1 - self.player, env_action)

        self.update_transformations()

        win = self.current_state.check_win()

        if win == 1 - self.player:
            return self.current_state, Environment.MOVE_REWARD + Environment.LOSE_REWARD, True
        elif win == -1:
            return self.current_state, Environment.MOVE_REWARD + Environment.DRAW_REWARD, True
        
        return self.current_state, Environment.MOVE_REWARD, False

## Agent

The agent is a Q-learning agent that use a Monte Carlo aproach, so from each game it updates the Q value function with the rewards of the environment. The Q values are an estimation of thw expected reward of each action in each state. The Q values are updated using the formula:

`Q(s, a) = Q(s, a) + (reward - Q(s, a)) / N(s, a)`

where `N(s, a)` is the number of times the agent has visited the state `s` and has taken the action `a`. The reward is the reward of the environment.

The agent has a policy that is epsilon greedy, so it picks a random action with probability `epsilon/number of moves` and the best action with probability `epsilon/number of moves + (1 - epsilon)`, where the best action is the action with the highest Q value. The epsilon is decreased at each game, so the agent starts with a random policy and then it starts to exploit the Q values.

The Q values is saved in a file, so it can be reused in the next run of the program. ALso the number of games played is saved in a file.

In [1636]:
class Agent():
    """
    Good value for greedy_exp:
    - 0.1 or 0.2 for promote exploration
    - 0.5 for balance exploration and exploitation
    - 1 for promote exploitation
    - 10 for optimal policy
    """
    def __init__(self, greedy_exp=0.5) -> None:
        self.q_values = Agent.get_q_values()
        self.episodes = Agent.get_episodes()
        self.greedy_exp = greedy_exp

    """
    Define the policy of the agent
    """
    def e_greedy_policy(self, state):
        probability = [1/(len(SEQUENCE)*self.episodes**self.greedy_exp) + (1 - 1/(self.episodes**self.greedy_exp)) if action == np.argmax(self.q_values[state][0]) else 1/(len(SEQUENCE)*self.episodes**self.greedy_exp) for action in range(len(SEQUENCE))]
        action = choices(SEQUENCE, probability, k=1).pop()

        return action

    def optimal_policy(self, state):
        return SEQUENCE[np.argmax(self.q_values[state][0])]

    """
    Get the q values from a file or generate them
    """
    def get_q_values():
        try:
            with open('q_values.pkl', 'rb') as fp:
                q_values = pickle.load(fp)
        except FileNotFoundError:
            q_values = Agent.generate_q_values()

        return q_values

    """
    Generate the q values
    """
    def generate_q_values():
        q_values = {state: np.zeros((2, len(SEQUENCE))) for state in np.concatenate(Environment.get_states())}

        return q_values
    """
    Get the number of episodes from a file or generate them
    """
    def get_episodes():
        try:
            with open('episodes.pkl', 'rb') as fp:
                episodes = pickle.load(fp)
        except FileNotFoundError:
            episodes = 1

        return episodes
    """
    Update the q values using the policy improvment algorithm
    """
    def policy_improvment(self, episode_states, episode_actions, episode_rewards):
        for index, (state, action) in enumerate(zip(episode_states, episode_actions)):
            index_action = SEQUENCE.index(action)
            # print("State: ", state)
            # state.show()
            # print("Action: ", action)

            cumulative_reward = sum(episode_rewards[index:])

            # print("Cumulative reward: ", cumulative_reward)
            # print("Q value: ", self.q_values[state][0])
            # print("Q value count: ", self.q_values[state][1])

            self.q_values[state][1][index_action] += 1
            self.q_values[state][0][index_action] += (cumulative_reward - self.q_values[state][0][index_action]) / self.q_values[state][1][index_action]
            
            # print("Q value update: ", self.q_values[state][0])
            # print("Q value count update: ", self.q_values[state][1])
        self.episodes += 1
        
    """
    Save the q values and the number of episodes to files
    """
    def save(self):
        with open('q_values.pkl', 'wb') as fp:
            pickle.dump(self.q_values, fp)
        with open('episodes.pkl', 'wb') as fp:
            pickle.dump(self.episodes, fp)
        

        

## RL algorithm

The reinforcement learning algorithm is implemented as a function that run an episode of the game. The function takes as input the agent and the strategy of the opponent. The function returns three list of rewards, states and actions. 
Each episode is runned until the game is finished. At each step the agent picks an action using the policy, then the environment gives the reward, the next state and if the game is finished. At the end of the episode the Q values are updated using the rewards, states and actions.

If you want to train from zero delete the files `q_values.pkl` and `episodes.pkl` and run the code

In [1637]:
def episode(agent, player, env_strategy=Environment.random_strategy, verbose=True): 
    env = Environment(player, env_strategy)
    state, end = env.reset()

    if verbose:
        print("Agent play as player ", player)

    states = [state.copy()]
    actions = []
    rewards = []

    while not end:
        if verbose:
            value = agent.q_values[state]
            show_state = env.transform_inv_state(state)
            show_state.show()
            print("Moves:\t\t\t", end="")
            for i in SEQUENCE:
                print("{}   \t".format(env.transform_inv_action(i)), end="")
            print("\nExpected reward:\t", end="")
            for j in value[0]:
                print("{:.2f}\t".format(j), end="")
            print()
                
        action = agent.optimal_policy(state)
        if verbose:
            print("Agent action: ", env.transform_inv_action(action))
            print()
        state, reward, end = env.step(action)


        states.append(state.copy())
        actions.append(action)
        rewards.append(reward)

    if verbose:
        print("Final state: ")
        show_state = env.transform_inv_state(state)
        show_state.show()

    return states, actions, rewards


In [1641]:

iterations = 1000
agent = Agent(0.2)
win = 0
loss = 0

print(f"Exploaration rate: {1/(agent.episodes**agent.greedy_exp):.4f}") 

for i in range(iterations):
    
    states, actions, rewards = episode(agent, i%2, env_strategy=Environment.win_loss_move_strategy, verbose=False)
    print(f"Game: {i} Reward: {sum(rewards):.2f}", end="\r") 
    if sum(rewards) > 0.5:
        win += 1
    elif sum(rewards) < -0.5:
        loss += 1

    agent.policy_improvment(states, actions, rewards)

agent.save()

print("Game won: ", win, " "*50)  
print("Game lost: ", loss)
             



Exploaration rate: 0.0000
Game won:  709                                                   
Game lost:  34


## Play against the agent
Set the variable `play_against_agent` to `True` to play against the agent and set the variable `agent_start` to `True` if the agent starts the game.
Your possible action are number from 1 to 9 where each number correspond to a cell in the grid as below:

| **1** | **2** | **3** |
|-------|-------|-------|
| **4** | **5** | **6** |
| **7** | **8** | **9** |

In [1639]:
play_against_agent = False
agent_start = False

if play_against_agent:
    agent = Agent(100)
    states, actions, rewards = episode(agent, 0 if agent_start else 1, verbose=True, env_strategy=Environment.me_strategy)
    if sum(rewards) < -0.5:
        print("You win!!!")
    elif sum(rewards) > 0.5:
        print("You lose :(")
    else:
        print("Draw") 
