# Five Crowns Deep-Q-Learning

### Import Libraries

In [1]:
#Reference: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque, defaultdict

from copy import deepcopy
from five_crowns import Game
from greedy import GreedyPlayer
from scoring import score_hand
from state import State

### Define the Network

In [2]:
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim)
        )

    def forward(self, x):
        return self.fc(x)

### Define the Memory Buffer

In [3]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def size(self):
        return len(self.buffer)

In [4]:
def payoff(game, current_player):
    """
    Calculates the reward for the player at this state

    Returns:
        int: The reward for the current player in the state
    """
    if not game._go_out:
        return 0
    # print(self.game.get_player_hand(self.curr_player_id))
    hand_score = score_hand(
        game.get_player_hand(current_player), game
    )
    return 1 if hand_score == 0 else -1

def successor(self, game, action, num_players):
    """
    Returns the successor state given the action from the current state

    Args:
        - action: The action to take from the current state

    Returns:
        - State: The new state after taking the action
    """
    # Make a copy for the successor state
    new_state = deepcopy(game)

    # Get the next player
    next_player_id = (game.get_active_player() + 1) % num_players
    new_state._active_player = next_player_id

    # Get the actions
    first_action = action[0]
    second_action = action[1]

    # Execute the first action
    added_card = None
    if first_action == "deck":
        added_card = new_state.get_deck().draw()
        new_state.get_player_hand(game.get_active_player()).append(added_card)
    elif first_action == "discard":
        added_card = new_state.get_discard_pile().pop()
        new_state.get_player_hand(game.get_active_player()).append(added_card)

    # Execute the second action
    if second_action == None:
        new_state.get_player_hand(self.curr_player_id).remove(added_card)
        new_state.get_discard_pile().append(added_card)
    else:
        new_state.get_player_hand(
            self.curr_player_id).remove(second_action)
        new_state.get_discard_pile().append(second_action)

    # Check the hand score and update game ending conditions
    # print(new_state.get_player_hand(self.curr_player_id))
    hand_score = score_hand(
        new_state.get_player_hand(self.curr_player_id), new_state
    )
    if hand_score == 0:
        new_state._go_out = True
        if new_state._remaining_players == new_state.num_players():
            new_state._go_out_player = self.curr_player_id
        new_state._remaining_players -= 1
        if new_state._remaining_players == 0:
            new_state._game_over = True

    return State(new_state)

def get_actions(self):
      """
      Returns all possible actions from the current state
      """
      if self.is_root:
          actions = [("root", c) for c in self.curr_player_hand if c != self.root_card]

      else:
          # initialize actions list
          actions = []

          if self.discard_pile_card:
              #actions.append(("discard", self.discard_pile_card))
              for card in self.curr_player_hand:
                  actions.append(("discard", card))
          if game.get_deck():
              actions.append(("deck", None))
              for card in self.curr_player_hand:
                  actions.append(("deck", card))

      return actions

In [5]:
import torch
import numpy as np
from deck import Card

def card_to_idx(suit, rank):
  suit_dict = {
      'Clubs': 0,
      'Diamonds': 1,
      'Hearts': 2,
      'Spades': 3,
      'Stars': 4,
      'J': 5
  }

  return 11 * suit_dict[suit] + (rank if suit != "J" else 0) - 3

def idx_to_card(idx):
    suit_dict = {
        0: 'Clubs',
        1: 'Diamonds',
        2: 'Hearts',
        3: 'Spades',
        4: 'Stars',
        5: 'J'
    }

    suit = suit_dict[idx // 11]
    if suit == "J":
        rank = 50
    else:
        rank = (idx % 11) + 3

    return Card(rank, suit)

def encode_state(num_players, full_deck, player_deck, discard_card, gone_out_status):
    num_players = num_players

    deck = set(full_deck)

    encoded_deck = np.zeros(len(deck))
    for idx, card in enumerate(player_deck):
        card_idx = card_to_idx(card.suit(), card.rank())
        encoded_deck[card_idx] += 1

    # Encode discard card as (rank, suit)
    discard_card_encoded = np.zeros(len(deck))
    if discard_card is not None:
        discard_idx = card_to_idx(discard_card.suit(), discard_card.rank())
        discard_card_encoded[discard_idx] = 1

    # Gone out status
    gone_out_status_encoded = int(gone_out_status)

    return np.concatenate([
        encoded_deck,
        discard_card_encoded,
        [gone_out_status_encoded]
    ])

def inference(game, hand, discard_card, policy_net):
    encoded_state = encode_state(game.num_players(), game.get_full_deck()._cards, hand, discard_card, game._go_out)

    with torch.no_grad():
        output = policy_net(torch.Tensor(encoded_state).to(device)).to("cpu").numpy()

        sorted_list = [(output[i], idx_to_card(i)) for i in range(len(output))]
        sorted_list.sort(key=lambda x: x[0], reverse=True)

        for _,card in sorted_list:
            if card in hand and card != discard_card:
                return card

In [22]:
from player import Player
from scoring import get_best_discard
from constants import GET_DISCARD, DRAW_CARD
import copy

class DQNPlayer(Player):
    """
    DQN player always takes action that minimize score for turn
    """
    def __init__(self, player_id, policy_net):
        super().__init__(player_id)
        self.prev_discard = None
        self.epsilon = epsilon
        self.epsilon_decay = 0.998
        self.min_epsilon = 0.05
        self.prev_action = None
        self.policy_net = policy_net
        self.policy_net.eval()

    def draw_phase(self, game):
        # Get best score if we take discard
        new_card = game.get_discard_pile()[-1]
        temp_hand = self.hand + [new_card]
        _, discard_score = get_best_discard(temp_hand,game,excluded_discard=new_card)

        # Get best expected score if we draw random
        remaining_deck = copy.deepcopy(game.get_full_deck().get_cards())
        draw_scores = []
        for card in game.get_discard_pile() + self.hand:
            remaining_deck.remove(card)
        for card in remaining_deck:
            temp_hand = self.hand + [card]
            _, draw_score = get_best_discard(temp_hand, game)
            draw_scores.append(draw_score)
        expected_draw_score = sum(draw_scores)/len(draw_scores)

        # Take action with better expected score
        if discard_score < expected_draw_score:
            self.prev_discard = game.get_discard_pile()[-1]
            return GET_DISCARD
        self.prev_discard = None
        return DRAW_CARD

    def discard_phase(self, game):
        if random.random() < self.epsilon:
            action = random.choice([card for card in self.hand if card != self.prev_discard])
        else:
            with torch.no_grad():
                action = inference(game, self.hand, self.prev_discard, self.policy_net)

        self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

        idx = card_to_idx(action.suit(), action.rank())
        self.prev_action = idx
        return action


In [23]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu")
agents = 4
state_dim = 113
action_dim = 56
epoch=3
lr = 1e-4
tau = 0.005
print('using device:', device)

using device: cuda


In [25]:
policy_net = DQN(state_dim, action_dim).to(device)
#policy_net.load_state_dict(torch.load(f'five_crowns_dqn_{epoch}.pth'))

In [27]:
target_net = DQN(state_dim, action_dim).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

dqn_player = DQNPlayer(0, policy_net)
players = [dqn_player] + [GreedyPlayer(i) for i in range(1, agents)]

env = Game(players,epoch=epoch)
env.initialize_game()

state_encoded = encode_state(agents, env.get_full_deck()._cards, env._players[0].hand, env._discard_pile[-1], 0)

optimizer = optim.Adam(policy_net.parameters(), lr=lr, amsgrad=True)
buffer = ReplayBuffer(10000)
batch_size = 128
gamma = 0.99
target_update_freq = 100
loss = None

### Training Loop

In [36]:
payout_array=[]
for episode in range(10000):
    players = [dqn_player] + [GreedyPlayer(i) for i in range(1, agents)]
    env = Game(players)
    env.initialize_game()
    state = encode_state(agents, env.get_full_deck()._cards, env._players[0].hand, env._discard_pile[-1], 0)
    done = False
    while not done:
        for _ in range(agents):
          env.play_round()
          if env.is_game_over():
            reward = payoff(env, 0)
            payout_array.append(reward)
            done = True
          else:
            reward = 0
            done = False

        next_state = encode_state(agents, env.get_full_deck()._cards, env._players[0].hand, env._discard_pile[-1], 0)

        buffer.add((state, env._players[0].prev_action, reward, next_state, done))
        state = next_state

        if buffer.size() >= batch_size:
            batch = buffer.sample(batch_size)
            states, actions, rewards, next_states, dones = zip(*batch)

            states = torch.tensor(np.array(states), dtype=torch.float32).to(device)
            actions = torch.tensor(actions, dtype=torch.long).to(device)
            rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
            next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(device)
            dones = torch.tensor(dones, dtype=torch.float32).to(device)

            # print(states.shape, actions.shape)
            q_values = policy_net(states)
            # print(q_values.shape)
            # print(actions.shape)
            q_values = q_values.gather(1, actions.unsqueeze(1)).squeeze()
            next_q_values = target_net(next_states).max(1)[0]
            target = rewards + (gamma * next_q_values * (1 - dones))

            loss = nn.SmoothL1Loss()(q_values, target)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
            optimizer.step()
    if episode % 1000 == 0:
        print(f"Episode {episode + 1}, Loss: {loss}, Win Rate: {(np.array(payout_array)==1).mean()}, Epsilon: {dqn_player.epsilon}")
        payout_array=[]
    if episode % target_update_freq == 0:
        '''
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*tau+target_net_state_dict[key]*(1-tau)
        target_net.load_state_dict(target_net_state_dict)
        '''
        target_net.load_state_dict(target_net.state_dict())
print("Ending loss: ", loss.item())

Episode 1, Loss: 0.12566494941711426, Win Rate: 0.0, Epsilon: 0.05
Episode 1001, Loss: 0.14582619071006775, Win Rate: 0.24465648854961833, Epsilon: 0.05
Episode 2001, Loss: 0.15195342898368835, Win Rate: 0.26347760060744113, Epsilon: 0.05
Episode 3001, Loss: 0.13204559683799744, Win Rate: 0.2552142586272279, Epsilon: 0.05
Episode 4001, Loss: 0.12778624892234802, Win Rate: 0.2519201228878648, Epsilon: 0.05
Episode 5001, Loss: 0.1049223467707634, Win Rate: 0.2569496619083396, Epsilon: 0.05
Episode 6001, Loss: 0.13794571161270142, Win Rate: 0.25696830851470026, Epsilon: 0.05
Episode 7001, Loss: 0.1321375072002411, Win Rate: 0.2684899845916795, Epsilon: 0.05
Episode 8001, Loss: 0.13473954796791077, Win Rate: 0.27864484202512374, Epsilon: 0.05
Episode 9001, Loss: 0.14906473457813263, Win Rate: 0.2816358024691358, Epsilon: 0.05
Ending loss:  0.12944823503494263


In [38]:
torch.save(policy_net.state_dict(), f'five_crowns_dqn_{epoch}.pth')

In [50]:
def simulate_one_game():
    """
    Simulate one game with given parameters
    Return player 1 score
    """
    players = [dqn_player] + [GreedyPlayer(i) for i in range(1, agents)]
    dqn_player.hand=[]
    game = Game(players=players, epoch=epoch)
    game.initialize_game()

    while not game.is_game_over():
        game.play_round()

    score = score_hand(players[0].hand, game)

    return score

In [None]:
iters = 10000
scores = [simulate_one_game() for _ in range(iters)]
print(f"Win Rate: {sum([1 for i in scores if i == 0])/iters}")
print(f"Average Score: {sum(scores)/iters}")