In [None]:
import os
import random
from typing import *
from collections import deque

import numpy as np
from tqdm.auto import tqdm, trange

import torch
import torch.nn as nn
import torch.optim as optim

from game.api import BlackjackWrapper
from game.models.model import GameState

In [None]:
# Run on the first CUDA GPU when PyTorch can see one, otherwise on the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
device

In [None]:
class BlackjackPolicyModel(nn.Module):
    """
    Policy network that maps a flattened game state to two values in [0, 1]:
    1. Bet percentage
    2. Probability of taking a card (hit)

    A shared fully connected trunk feeds two separate heads, each of which ends
    in a sigmoid.

    Args:
        in_features: Length of the flattened state vector fed to the first layer.
    """
    def __init__(self, in_features: int):
        super().__init__()
        # common layers shared by both outputs
        self.init_layers = nn.Sequential(
            nn.Linear(in_features, 64),
            nn.LeakyReLU(),
            nn.Linear(64, 128),
            nn.LeakyReLU(),
            nn.Linear(128, 256),
            nn.LeakyReLU(),
            nn.Linear(256, 128),
            nn.LeakyReLU(),
            nn.Linear(128, 64),
            nn.LeakyReLU(),
            nn.Linear(64, 32),
            nn.LeakyReLU(),
        )
        # layers for bet percentage output
        self.bet_layers = nn.Sequential(
            nn.Linear(32, 8),
            nn.LeakyReLU(),
            nn.Linear(8, 1),
            nn.Sigmoid(),
        )
        # layers for card action output
        self.card_layers = nn.Sequential(
            nn.Linear(32, 16),
            nn.LeakyReLU(),
            nn.Linear(16, 8),
            nn.LeakyReLU(),
            nn.Linear(8, 4),
            nn.LeakyReLU(),
            nn.Linear(4, 1),
            nn.Sigmoid(),
        )

    def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Run the shared trunk and both heads.

        Args:
            x: Float tensor whose last dimension is ``in_features``.

        Returns:
            ``(bet_percent, card_prob)``; each keeps the leading dimensions of
            ``x`` and has a last dimension of 1.
        """
        features = self.init_layers(x)
        return self.bet_layers(features), self.card_layers(features)

    def _as_batch(self, normalized_state) -> torch.Tensor:
        """Turn one flattened state into a float32 batch of size 1 on the model's device."""
        # Follow the device of the model's own weights instead of a notebook-level
        # global, so the input always matches the weights wherever they live.
        model_device = next(self.parameters()).device
        state = torch.as_tensor(normalized_state, dtype=torch.float32, device=model_device)
        return state.unsqueeze(0)

    def get_bet_percent(self, normalized_state) -> torch.Tensor:
        """
        Evaluate the bet head for a single state.

        Args:
            normalized_state: Flattened state as a NumPy array (or any array-like
                accepted by ``torch.as_tensor``).

        Returns:
            CPU tensor with the bet percentage; shape ``(1, 1)`` for a 1-D state.
            Gradients are not detached, so the result can be used in a loss.
        """
        bet_percent, _ = self.forward(self._as_batch(normalized_state))
        return bet_percent.cpu()

    def get_card_action(self, normalized_state) -> torch.Tensor:
        """
        Evaluate the card head for a single state.

        Args:
            normalized_state: Flattened state as a NumPy array (or any array-like
                accepted by ``torch.as_tensor``).

        Returns:
            CPU tensor with the probability of taking a card; shape ``(1, 1)``
            for a 1-D state. Gradients are not detached.
        """
        _, card_prob = self.forward(self._as_batch(normalized_state))
        return card_prob.cpu()

In [None]:
def reinforce(
    game_wrapper: BlackjackWrapper,
    policy_model: BlackjackPolicyModel,
    optimizer: optim.Optimizer,
    num_eps: int,
    gamma: float,
    max_steps: int = 1000,
    log_every: int = 10,
):
    """
    Train ``policy_model`` with REINFORCE (Monte-Carlo policy gradient).

    Every episode consists of one bet step followed by card steps until the
    environment reports termination or ``max_steps`` steps have been played.
    After the episode, the discounted return of each step weights that step's
    loss term and a single optimizer step is taken.

    Loss terms:
      * card steps use ``-log pi(a_t | s_t) * G_t`` where ``a_t`` is the action
        that was actually sampled (hit with probability p, stand with 1 - p);
      * the bet step keeps the surrogate ``-bet_percent * G_0`` because the bet
        head is a deterministic output with no sampling distribution.

    Args:
        game_wrapper: Environment; must provide ``reset()``, ``get_state()``,
            ``bet_step()`` and ``card_step()``.
        policy_model: Policy providing ``get_bet_percent()`` and
            ``get_card_action()``.
        optimizer: Optimizer over the policy's parameters.
        num_eps: Number of training episodes.
        gamma: Discount factor applied to future rewards.
        max_steps: Upper bound on steps per episode (the bet step included).
        log_every: Log the mean score every this many episodes; values <= 0
            disable logging.

    Returns:
        Undiscounted total reward of each episode, in play order.
    """
    print("Starting RL training process...")
    eps_scores: List[float] = []
    prob_floor = 1e-8  # keeps log() finite if a sigmoid output saturates

    for i_eps in trange(num_eps):
        saved_outputs = []
        rewards: List[float] = []
        game_wrapper = game_wrapper.reset()
        state = game_wrapper.get_state()

        for i_step in range(max_steps):
            if i_step == 0:
                bet_percent = policy_model.get_bet_percent(state.flatten())
                saved_outputs.append(bet_percent)
                outcome = game_wrapper.bet_step(bet_percent)
            else:
                card_prob = policy_model.get_card_action(state.flatten())
                take_card = card_prob.item() > random.random()
                # Score the action that was actually taken: p for "hit", 1 - p for
                # "stand". Using p unconditionally would raise the hit probability
                # even after a rewarded "stand".
                chosen_prob = card_prob if take_card else 1.0 - card_prob
                saved_outputs.append(torch.log(chosen_prob.clamp_min(prob_floor)))
                outcome = game_wrapper.card_step(take_card=take_card)
            state = outcome.new_state
            rewards.append(outcome.reward)
            if outcome.terminated:
                break

        n_steps = len(rewards)
        eps_scores.append(sum(rewards))

        # Discounted returns G_t = r_t + gamma * G_{t+1}, accumulated back to front.
        discounted = []
        running_return = 0.0
        for reward in reversed(rewards):
            running_return = gamma * running_return + reward
            discounted.append(running_return)
        discounted.reverse()

        # Explicit float dtype: integer rewards would otherwise yield an integer
        # tensor on which mean()/std() are not defined.
        returns = torch.tensor(discounted, dtype=torch.float32)
        if n_steps > 1:
            # normalize returns
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        # With a single step the sample std is NaN, which would turn the loss and
        # then every weight into NaN; the raw return is used in that case.

        if saved_outputs:
            model_loss = torch.cat(
                [-output * step_return for output, step_return in zip(saved_outputs, returns)]
            ).sum()

            optimizer.zero_grad()
            model_loss.backward()
            optimizer.step()

        if log_every > 0 and i_eps % log_every == 0:
            tqdm.write(f"Episode {i_eps}\t\tRunning Average Score: {round(np.mean(eps_scores).item(), 3)}")

    return eps_scores

In [None]:
# Seed every RNG the notebook itself draws from so reruns are repeatable
# (assumes the game draws from one of these generators -- TODO confirm).
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

gamma = 0.9  # discount factor handed to reinforce()
learning_rate = 1e-3  # step size for the Adam optimizer
num_eps = 10000  # number of training episodes
# Log about 100 times per run. Clamp to 1 so that runs shorter than 100 episodes
# do not produce an interval of 0 (the training loop computes i_eps % log_every).
log_eps = max(1, num_eps // 100)

In [None]:
# Environment, policy network and optimizer shared by the training and evaluation cells.
game_wrapper = BlackjackWrapper()
# Move the weights to the selected device: inputs and weights must live on the same
# device, and leaving the model on the CPU breaks the notebook whenever `device` is CUDA.
model = BlackjackPolicyModel(in_features=GameState.get_state_size()).to(device)
# Built after the move so the optimizer tracks the parameters in their final location.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# The trained model is written to <cwd>/../models/<model_name>.
model_name = "blackjack_policy_model"
proj_path = os.path.join(os.getcwd(), os.pardir)
model_path = os.path.join(proj_path, "models", model_name)

In [None]:
# Create the output directory up front: a missing folder is then discovered before
# the long training run instead of making torch.save() fail after it.
os.makedirs(os.path.dirname(model_path) or ".", exist_ok=True)

scores = reinforce(
    game_wrapper=game_wrapper,
    policy_model=model,
    optimizer=optimizer,
    num_eps=num_eps,
    gamma=gamma,
    log_every=log_eps,
)
# NOTE(review): this pickles the whole module object, so loading it requires the
# BlackjackPolicyModel class to be importable under the same name. Saving
# model.state_dict() is the usual alternative -- left unchanged so existing
# loaders keep working.
torch.save(model, model_path)

In [None]:
def evaluate_agent(
    game_wrapper: BlackjackWrapper,
    policy_model: BlackjackPolicyModel,
    num_eps: int,
    max_steps: int = 1000,
) -> Tuple[float, float]:
    """
    Play ``num_eps`` episodes with the current policy and summarize the rewards.

    Each episode is one bet step followed by card steps (hit is sampled with the
    probability returned by the policy) until the environment reports
    termination or ``max_steps`` steps have been played. No training happens here.

    Args:
        game_wrapper: Environment; must provide ``reset()``, ``get_state()``,
            ``bet_step()`` and ``card_step()``.
        policy_model: Policy providing ``get_bet_percent()`` and
            ``get_card_action()``.
        num_eps: Number of evaluation episodes.
        max_steps: Upper bound on steps per episode (the bet step included).

    Returns:
        ``(mean_reward, std_reward)`` of the per-episode total reward, where the
        standard deviation is NumPy's default (population) estimate.
    """
    episode_rewards: List[float] = []

    # Evaluation never backpropagates, so skip autograd bookkeeping; the
    # environment then also receives plain tensors that do not track gradients.
    with torch.no_grad():
        for _ in range(num_eps):
            game_wrapper = game_wrapper.reset()
            state = game_wrapper.get_state()
            episode_total = 0.0
            for i_step in range(max_steps):
                if i_step == 0:
                    bet_percent = policy_model.get_bet_percent(state.flatten())
                    outcome = game_wrapper.bet_step(bet_percent)
                else:
                    card_prob = policy_model.get_card_action(state.flatten())
                    outcome = game_wrapper.card_step(take_card=card_prob.item() > random.random())
                state = outcome.new_state
                episode_total += outcome.reward
                if outcome.terminated:
                    break
            episode_rewards.append(episode_total)

    mean_reward = np.mean(episode_rewards).item()
    std_reward = np.std(episode_rewards).item()
    return mean_reward, std_reward

In [None]:
# Score the policy over 1000 episodes; the cell displays the (mean, std) reward tuple.
evaluate_agent(game_wrapper=game_wrapper, policy_model=model, num_eps=1000)