### Preliminaries

In [None]:
# Imports

#import gymnasium as gym
import importlib
import environments
import agents
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import numpy as np
import sys
import time
import copy
import joblib

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Reload the local `environments` module so that edits made to its source file
# are picked up without restarting the kernel.
# NOTE(review): `agents` is imported above as well but is not reloaded here —
# confirm whether it should be.
importlib.reload(environments)

<module 'environment' from '/Users/paulsalquebre/Documents/SkyjoRL/environment.py'>

In [2]:
# Useful functions

def inverse_permutation(perm):
    """Return the inverse of a permutation as a plain Python list.

    For every position ``k`` of ``perm``, the returned list ``inverse``
    satisfies ``inverse[perm[k]] == k``.

    Args:
        perm: Indexable sequence (list, tuple, 1-D array) holding a
            permutation of ``range(len(perm))``.

    Returns:
        A list of ``len(perm)`` Python ints.
    """
    size = len(perm)
    inverse = [0] * size
    for position in range(size):
        # The element stored at `position` tells us which slot maps back to it.
        inverse[perm[position]] = position
    return inverse

def get_all_q_values(deck_values, visible_card, q_values_dict):
    """Look up the Q-values of every column of a deck and arrange them in a grid.

    Each column is sorted in ascending order, and the sorted values followed by
    ``visible_card`` form the lookup key into ``q_values_dict``. The entries
    retrieved for the card slots are then permuted back so that they line up
    with the original (unsorted) row order of the column.

    Args:
        deck_values: 2-D integer array of shape ``(n_rows, n_col)``.
        visible_card: Value appended to each sorted column to build the key.
        q_values_dict: Mapping from ``(sorted column values..., visible_card)``
            tuples of ints to a sequence of ``n_rows + 1`` Q-values. Entry 0 is
            kept in place; entries ``1..n_rows`` refer to the sorted card slots.

    Returns:
        Float array of shape ``(n_rows + 1, n_col)``; column ``j`` holds the
        Q-values of deck column ``j``, with rows ``1..n_rows`` in the original
        row order of the deck.

    Raises:
        KeyError: If a column/visible-card combination is missing from
            ``q_values_dict``.
    """
    n_rows, n_col = deck_values.shape

    sorted_indices = np.argsort(deck_values, axis=0)
    sorted_deck = np.take_along_axis(deck_values, sorted_indices, axis=0)

    # One row per action (entry 0 + one per card slot), one column per deck
    # column. The previous (n_col, 4) allocation only worked for 3x4 decks.
    q_values_grid = np.zeros((n_rows + 1, n_col))

    for j in range(n_col):
        # The argsort of a permutation is its inverse: it maps each original
        # row to the position that row occupies in the sorted column.
        inverse_indices = np.argsort(sorted_indices[:, j])
        key = tuple(map(int, np.append(sorted_deck[:, j], visible_card)))
        # Work on a float copy so the stored table is never modified.
        q_values = np.array(q_values_dict[key], dtype=float)
        q_values[1:] = q_values[1:][inverse_indices]
        q_values_grid[:, j] = q_values

    return q_values_grid

In [3]:
# A function to print the current board state

def print_board(state):
    """Print a textual rendering of the game described by ``state``.

    A header is printed first: the final scores when the game is over,
    otherwise the visible card (draw phase) or the card just drawn (placement
    phase). The decks of all players are then printed side by side, with
    face-down cards shown as ``**``.

    Args:
        state: Dict with the keys ``"player_turn"``, ``"player_deck_i_values"``,
            ``"player_deck_i_mask"``, ``"visible_card_value"`` and ``"done"``.
    """
    turn = state["player_turn"]
    all_values = state["player_deck_i_values"]
    all_masks = state["player_deck_i_mask"]
    top_card = state["visible_card_value"]
    game_over = state["done"]
    player_count = len(all_values)

    # Header: end-of-game summary, or the card relevant to the current phase.
    if game_over:
        print("The game has ended!")
        for idx in range(player_count):
            print(f"Player {idx} has a score of {np.sum(all_values[idx])}")
    elif turn[1] == 0:
        print(f"Visible card: {top_card}\n")
    else:
        print(f"Card drawn: {top_card}\n")

    # Build one block of text lines per player.
    blocks = []
    for idx in range(player_count):
        values = all_values[idx]
        mask = all_masks[idx]

        block = [f"----- PLAYER {idx} -----"]
        for r in range(values.shape[0]):
            cells = [
                f"  {values[r, c]:02}" if mask[r, c] == 1 else "  **"
                for c in range(values.shape[1])
            ]
            block.append("|" + "".join(cells) + "  |")
        block.extend(["--------------------", ""])
        blocks.append(block)

    # Pad shorter blocks so that every block has the same number of lines.
    height = max(len(block) for block in blocks)
    for block in blocks:
        missing = height - len(block)
        block.extend([" " * len(block[0])] * missing)

    # Print the blocks next to each other, line by line.
    for line_no in range(height):
        print("    ".join(block[line_no] for block in blocks))

In [4]:
# A baseline agent for evaluation

class BaselineAgent:
    """Simple rule-based agent used as a baseline for evaluation.

    Actions are 4-element lists ``[draw_choice, keep_choice, row, col]``:

    * draw phase (``player_turn[1] == 0``): ``[1, -1, -1, -1]`` takes the
      visible card, ``[0, -1, -1, -1]`` draws from the pile;
    * placement phase: ``[-1, 1, row, col]`` keeps the drawn card and puts it
      at ``(row, col)``, ``[-1, 0, row, col]`` discards it and flips the
      face-down card at ``(row, col)``.

    Args:
        player_id: Index of the player this agent controls.
        visible_card_threshold: The visible card is taken when its value is
            lower than or equal to this threshold.
        discard_card_threshold: During a forced flip, the drawn card is
            discarded when its value is greater than or equal to this threshold.
        flip_card_proba: Probability of forcing the action onto a random
            face-down card during the placement phase.
    """

    def __init__(self, player_id, visible_card_threshold=3, discard_card_threshold=6, flip_card_proba=0.2):
        self.player_id = player_id
        self.visible_card_threshold = visible_card_threshold
        self.discard_card_threshold = discard_card_threshold
        self.flip_card_proba = flip_card_proba

        # Last action returned by get_action (None until the first call).
        self.latest_action = None

    def get_action(self, observation: dict):
        """From the observation, returns the action chosen by the agent.

        Args:
            observation: Dict with the keys ``"deck_values"``, ``"deck_mask"``
                (truthy where a card is face-up), ``"visible_card"`` and
                ``"player_turn"`` (``(player_id, phase)``).

        Returns:
            The chosen action as a list of four ints (see the class docstring).

        Raises:
            ValueError: If it is not this agent's turn, or if the deck holds no
                card at all during the placement phase.
        """
        deck_values = np.asarray(observation["deck_values"])
        # Cast to bool: indexing with a 0/1 integer mask would be interpreted as
        # integer (fancy) indexing instead of a selection of the face-up cards.
        deck_mask = np.asarray(observation["deck_mask"]).astype(bool)
        visible_card = observation["visible_card"]
        player_turn = observation["player_turn"]

        if player_turn[0] != self.player_id:  # Refuse to play out of turn
            raise ValueError(f"It's not my turn to play! I am player {self.player_id}, whereas player {player_turn[0]} needs to play.")

        if player_turn[1] == 0:
            # Draw phase: take the visible card only when it is low enough.
            if visible_card <= self.visible_card_threshold:
                action = [1, -1, -1, -1]
            else:
                action = [0, -1, -1, -1]
        else:
            action = self._choose_placement(deck_values, deck_mask, visible_card)

        # Remember a copy so that later mutation by the caller does not alter it.
        self.latest_action = list(action)
        return action

    def _choose_placement(self, deck_values, deck_mask, visible_card):
        """Pick the placement-phase action for the drawn card ``visible_card``."""
        facedown = np.argwhere(~deck_mask)  # (row, col) of the face-down cards
        faceup = np.argwhere(deck_mask)     # (row, col) of the face-up cards
        if len(facedown) == 0 and len(faceup) == 0:
            raise ValueError("Cannot choose a position: the deck holds no card.")

        # Pre-select a random face-down card, when one exists.
        flip_target = None
        if len(facedown) > 0:
            row, col = facedown[np.random.randint(0, len(facedown))]
            flip_target = (int(row), int(col))

        force_flip = np.random.rand() < self.flip_card_proba
        if force_flip and flip_target is not None:
            # Forced move on the random face-down card: discard the drawn card
            # when it is high, otherwise keep it in place of that card.
            keep = 0 if visible_card >= self.discard_card_threshold else 1
            return [-1, keep, flip_target[0], flip_target[1]]

        if len(faceup) > 0:
            # Only face-up cards are considered: `faceup` and `faceup_values`
            # are both in row-major order, so their indices line up.
            faceup_values = deck_values[deck_mask]
            best = int(np.argmax(faceup_values))
            # Replace the highest face-up card when the drawn card improves on
            # it, or when no face-down card is left to flip (discarding would
            # require flipping one).
            if faceup_values[best] > visible_card or flip_target is None:
                return [-1, 1, int(faceup[best][0]), int(faceup[best][1])]

        # Otherwise discard the drawn card and flip the random face-down card.
        return [-1, 0, flip_target[0], flip_target[1]]

    def get_latest_action(self):
        """Return the last action produced by ``get_action`` (None before any call)."""
        return self.latest_action

In [5]:
# Define our master agent

class MasterAgent:
    """Greedy agent that plays from a table of per-column Q-values.

    Args:
        column_q_values: Lookup table handed to ``get_all_q_values``; it maps a
            sorted column plus the visible card to the Q-values of that column.
    """

    # Value written over face-down cards before the Q-value lookup.
    HIDDEN_CARD_VALUE = -3

    def __init__(self, column_q_values):
        self.column_q_values = column_q_values

    def select_action(self, obs):
        """Choose the next action for the observed deck.

        For every column the Q-values of its actions are looked up (action 0:
        do not use the card; action ``k >= 1``: put the card in row ``k - 1``).
        Columns are ranked by the gap between their best and second-best
        Q-value, and the best action of the highest-ranked column is taken.
        A column whose best action is 0 but which has no face-down card left is
        skipped entirely.

        Args:
            obs: Dict with the keys ``"deck_values"`` and ``"deck_mask"``
                (2-D arrays of identical shape), ``"visible_card"`` and
                ``"player_turn"`` (``(player_id, phase)``).

        Returns:
            A ``torch`` tensor ``[draw_choice, keep_choice, row, col]``:
            ``[0|1, -1, -1, -1]`` during the draw phase (1 = take the visible
            card), ``[-1, 0|1, row, col]`` during the placement phase
            (1 = keep the card at ``(row, col)``, 0 = discard it and flip the
            first face-down card of column ``col``).

        Raises:
            ValueError: If no column offers a valid action.
        """
        deck_values = obs["deck_values"].copy()
        deck_mask = obs["deck_mask"].copy().astype(bool)
        visible_card = obs["visible_card"]
        turn_phase = obs["player_turn"][1]  # 0: draw phase, otherwise placement phase

        _, n_col = deck_values.shape

        # Face-down cards are unknown to the agent: overwrite them with a sentinel.
        deck_values[~deck_mask] = self.HIDDEN_CARD_VALUE

        q_values = get_all_q_values(deck_values, visible_card, self.column_q_values)

        # Gain of a column = best Q-value minus second-best Q-value. Columns
        # whose best Q-value is -inf get a gain of -inf so they are ranked last.
        gains = np.zeros(n_col)
        for c in range(n_col):
            q_values_col = q_values[:, c].copy()
            id_q_max = np.argmax(q_values_col)
            q_max = q_values_col[id_q_max]
            if q_max == -float("inf"):
                gains[c] = -float("inf")
            else:
                q_values_col[id_q_max] = -float("inf")
                gains[c] = q_max - np.max(q_values_col)

        # Walk through the columns from the highest gain to the lowest and keep
        # the first one whose best action can actually be played.
        for c in np.argsort(gains)[::-1]:
            a = np.argmax(q_values[:, c])
            # Action 0 needs a face-down card in the column; other actions are always allowed.
            if a != 0 or not deck_mask[:, c].all():
                col_action, action_id = int(c), int(a)
                break
        else:
            raise ValueError("No valid actions found.")

        if turn_phase == 0:
            if action_id == 0:
                # Best action does not use the visible card: draw from the pile instead.
                action = [0, -1, -1, -1]
            else:
                # Best action uses the visible card: take it.
                action = [1, -1, -1, -1]
        elif action_id == 0:
            # Discard the drawn card and flip the first face-down card of the column.
            row = int(np.where(~deck_mask[:, col_action])[0][0])
            action = [-1, 0, row, col_action]
        else:
            # Keep the drawn card and put it in the selected row of the column.
            action = [-1, 1, action_id - 1, col_action]

        return torch.tensor(action)

In [None]:
# Initialization: load the precomputed column Q-values and build the master agent.

# NOTE(review): this single-player environment is rebound by the 2-player game
# cell below before being used by any visible code — confirm it is still needed.
env = environments.SkyjoEnv(n_players=1)
# joblib.load unpickles the file, which can execute arbitrary code: only load
# Q-value files that come from a trusted source.
q_values_dict = joblib.load("q_values_column_max5rounds.joblib")
#q_values_dict = joblib.load("q_values_column_max10rounds.joblib")
print(f"Q-values dict loaded: {len(q_values_dict)} entries")
# `agent` is the MasterAgent instance used by the game and evaluation cells below.
agent = MasterAgent(q_values_dict)

Q-values dict loaded: 12240 entries


In [None]:
# A 2-player game, with the baseline agent
#
# Player 0 is the master agent (`agent`), player 1 is the notebook-local
# BaselineAgent. Every turn is made of two environment steps: first the draw
# decision, then the placement decision. The board is printed before each turn
# and the loop pauses 2 seconds after each turn so the game can be followed.

env = environments.SkyjoEnv(n_players=2)
observation = env.reset()
baseline_agent = BaselineAgent(player_id=1)

done = False
c = 0  # Round counter

while not done:
    c += 1

    # ----- Player 0: master agent -----
    print(f"Round {c}, player 0 needs to play")
    print_board(env.get_state())
    action = agent.select_action(observation)
    if action[0] == 0:
        print("The player draws from the pile")
    else:
        print("The player draws the visible card")
    observation, _, _ = env.step(action)

    # Single quotes inside the f-string keep the cell valid before Python 3.12.
    print(f"The player has drawn the card: {observation['visible_card']}")
    action = agent.select_action(observation)
    if action[1] == 1:
        print(f"The player use the drawn card and put it in the position {action[2]},{action[3]}")
    else:
        print(f"The player discards the drawn card and uncover the card in the position {action[2]},{action[3]}")
    observation, reward, done = env.step(action)
    print("\n##########################################\n")
    time.sleep(2)

    if done:
        break

    # ----- Player 1: baseline agent (its list actions are wrapped in tensors) -----
    print(f"Round {c}, player 1 needs to play")
    print_board(env.get_state())
    action = baseline_agent.get_action(observation)
    action = torch.tensor(action)
    if action[0] == 0:
        print("The player draws from the pile")
    else:
        print("The player draws the visible card")
    observation, _, _ = env.step(action)

    action = baseline_agent.get_action(observation)
    action = torch.tensor(action)
    print(f"The player has drawn the card: {observation['visible_card']}")
    if action[1] == 1:
        print(f"The player use the drawn card and put it in the position {action[2]},{action[3]}")
    else:
        print(f"The player discards the drawn card and uncover the card in the position {action[2]},{action[3]}")
    observation, reward, done = env.step(action)
    print("\n##########################################\n")
    time.sleep(2)

# Final board, including the scores
print_board(env.get_state())

Round 1, player 0 needs to play
Visible card: -1

----- PLAYER 0 -----    ----- PLAYER 1 -----
|  11  11  **  **  |    |  -1  12  **  **  |
|  **  **  **  **  |    |  **  **  **  **  |
|  **  **  **  **  |    |  **  **  **  **  |
--------------------    --------------------
    
The player draws the visible card
The player has drawn the card: -1
The player use the drawn card and put it in the position 0,1

##########################################

Round 1, player 1 needs to play
Visible card: 11

----- PLAYER 0 -----    ----- PLAYER 1 -----
|  11  -1  **  **  |    |  -1  12  **  **  |
|  **  **  **  **  |    |  **  **  **  **  |
|  **  **  **  **  |    |  **  **  **  **  |
--------------------    --------------------
    
The player draws from the pile
The player has drawn the card: 8
The player use the drawn card and put it in the position 0,1

##########################################

Round 2, player 0 needs to play
Visible card: 12

----- PLAYER 0 -----    ----- PLAYER 1 -----
|

In [None]:
# Evaluation functions

# Play a game with two agents
def play_two_players_game(agent1, agent2):
    """Play one full 2-player game and return the final score of each player.

    Every turn consists of two environment steps (draw decision, then placement
    decision). ``agent1`` plays first.

    Args:
        agent1: Agent for player 0; must expose ``select_action(observation)``
            whose result is passed to ``env.step`` unchanged.
        agent2: Agent for player 1; must expose ``get_action(observation)``
            whose result is wrapped in a ``torch`` tensor before being passed
            to ``env.step``.

    Returns:
        ``(score1, score2)``: the sum of all cards of player 0 and of player 1,
        as computed by ``env.get_sum_cards`` with every card counted.
    """
    env = environments.SkyjoEnv(n_players=2)
    observation = env.reset()

    done = False
    while not done:
        # Player 0: draw decision, then placement decision.
        observation, _, _ = env.step(agent1.select_action(observation))
        observation, _, done = env.step(agent1.select_action(observation))
        if done:
            break

        # Player 1: same two steps, with list actions converted to tensors.
        observation, _, _ = env.step(torch.tensor(agent2.get_action(observation)))
        observation, _, done = env.step(torch.tensor(agent2.get_action(observation)))

    decks = env.get_state()['player_deck_i_values']
    # Count every card: the all-True mask follows the actual deck shape instead
    # of assuming a 3x4 deck.
    score1 = env.get_sum_cards(decks[0], np.ones_like(decks[0], dtype=bool))
    score2 = env.get_sum_cards(decks[1], np.ones_like(decks[1], dtype=bool))

    return score1, score2


# Play a huge number of games to determine the best agent
def evaluate_agents(agent1, agent2, num_games=10000):
    """Play ``num_games`` games between two agents and summarize the results.

    Args:
        agent1: Agent playing as player 0 (see ``play_two_players_game``).
        agent2: Agent playing as player 1 (see ``play_two_players_game``).
        num_games: Number of games to play; must be at least 1.

    Returns:
        ``(scores1, scores2, win_ratio)`` where ``scores1`` and ``scores2`` are
        float arrays of length ``num_games`` holding the final score of each
        agent per game, and ``win_ratio`` is the fraction of games in which
        ``agent1`` finished with a strictly lower score than ``agent2`` (ties
        are not counted as wins).

    Raises:
        ValueError: If ``num_games`` is lower than 1.
    """
    if num_games < 1:
        # Fail early with a clear message rather than dividing by zero below.
        raise ValueError(f"num_games must be at least 1, got {num_games}.")

    scores1 = np.zeros(num_games)
    scores2 = np.zeros(num_games)
    for i in range(num_games):
        scores1[i], scores2[i] = play_two_players_game(agent1, agent2)

    # The lowest score wins the game.
    win_ratio = int(np.count_nonzero(scores1 < scores2)) / num_games
    return scores1, scores2, win_ratio

In [9]:
# Evaluation of the Master Agent
#
# Plays 100000 games between the master agent (player 0) and a baseline agent
# (player 1), then reports the win ratio and the mean +/- standard deviation of
# the final scores. This cell is expensive to re-run.

# `agent` is the MasterAgent built in the initialization cell.
master_agent = agent
# NOTE(review): this uses `agents.BaselineAgent` from the imported module, not
# the BaselineAgent class defined earlier in this notebook — confirm that both
# implementations are equivalent, or that this is the intended one.
baseline_agent = agents.BaselineAgent(player_id=1)

scores_master, scores_baseline, win_ratio = evaluate_agents(master_agent, baseline_agent, num_games=100000)

# A game counts as won by the master agent when its score is strictly lower.
print(f"The master agent has won {100*win_ratio:.0f}% of the games.")
print(f"Master agent final score: {scores_master.mean():.01f} +/- {scores_master.std():.01f}")
print(f"Baseline agent final score: {scores_baseline.mean():.01f} +/- {scores_baseline.std():.01f}")

The master agent has won 54% of the games.
Master agent final score: 21.2 +/- 9.2
Baseline agent final score: 23.3 +/- 10.6
