We first import the libraries required for the project.

In [25]:
import random
import gym
from gym import spaces
import numpy as np
from stable_baselines3 import PPO
import optuna
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3 import DQN
from gym.wrappers import monitoring
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
import matplotlib.pyplot as plt
from stable_baselines3.common.monitor import Monitor

Now we create the environment in which the agent will be trained. Note that the agent receives a positive reward (+1) only when it wins; a draw gives a reward of 0 and a loss gives a negative reward (-1).

In [None]:

def cmp(a, b):
    """Three-way comparison: 1 if ``a > b``, -1 if ``a < b``, otherwise 0."""
    # Booleans are integers in Python, so the difference is exactly 1, 0 or -1.
    return (a > b) - (a < b)

class SimpleBlackjackEnv(gym.Env):
    """Simplified single-player blackjack environment (legacy ``gym`` API).

    Actions: ``0`` = stick, ``1`` = hit.

    Observation: 23 integers -- the player's cards in slots 0-10, the
    dealer's cards in slots 11-21 (unused slots are 0) and a final 0/1 flag
    telling whether the player holds a usable ace. The dealer's whole hand is
    part of the observation, not just a single card.

    Reward: +1 for a win, 0 for a draw or a non-terminal hit, -1 for a loss
    or a bust.

    NOTE(review): ``reset`` returns only the observation and ``step`` returns
    a 4-tuple, i.e. the pre-gymnasium interface. The error captured in this
    notebook's output suggests the installed stable_baselines3 expects
    gymnasium spaces; migrating would change these signatures -- confirm
    before upgrading.
    """

    metadata = {'render.modes': ['human']}

    # One suit: the ace counts as 1 and the three face cards count as 10.
    SUIT = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10)
    # Number of observation slots reserved for each hand.
    MAX_HAND_SLOTS = 11
    # A new deck is shuffled at reset when fewer cards than this remain.
    MIN_CARDS_BEFORE_DEAL = 15

    def __init__(self):
        super().__init__()
        self.deck = self._new_deck()
        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(low=0, high=11, shape=(23,), dtype=int)

    def _new_deck(self):
        """Return a freshly shuffled 52-card deck (four copies of ``SUIT``)."""
        deck = list(self.SUIT) * 4
        random.shuffle(deck)
        return deck

    def draw_card(self):
        """Remove and return the top card of the deck.

        ``reset`` only guarantees ``MIN_CARDS_BEFORE_DEAL`` cards, so a long
        game could exhaust the deck; in that case a new deck is shuffled
        instead of raising ``IndexError``. The new deck is a full one, so it
        may repeat cards that are still in the current hands.
        """
        if not self.deck:
            self.deck = self._new_deck()
        return self.deck.pop()

    def draw_hand(self):
        """Deal a two-card hand."""
        return [self.draw_card(), self.draw_card()]

    def usable_ace(self, hand):
        """Return True if the hand has an ace that can count as 11 without busting."""
        return 1 in hand and sum(hand) + 10 <= 21

    def sum_hand(self, hand):
        """Return the hand total, counting one ace as 11 when that is usable."""
        if self.usable_ace(hand):
            return sum(hand) + 10
        return sum(hand)

    def is_bust(self, hand):
        """Return True if the hand total exceeds 21."""
        return self.sum_hand(hand) > 21

    def score(self, hand):
        """Return the hand total, or 0 for a busted hand."""
        return 0 if self.is_bust(hand) else self.sum_hand(hand)

    def reset(self):
        """Deal new hands (reshuffling first if the deck is low) and return the observation."""
        if len(self.deck) < self.MIN_CARDS_BEFORE_DEAL:
            self.deck = self._new_deck()
        self.dealer = self.draw_hand()
        self.player = self.draw_hand()
        return self._get_observation()

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        Hitting draws one card for the player and ends the game with reward
        -1.0 on a bust. Sticking lets the dealer draw until reaching at least
        17 and ends the game with the comparison of both scores as reward.
        """
        assert self.action_space.contains(action)
        if action == 1:  # hit
            self.player.append(self.draw_card())
            if self.is_bust(self.player):
                done = True
                reward = -1.0
            else:
                done = False
                reward = 0.0
        else:  # stick
            done = True
            while self.sum_hand(self.dealer) < 17:
                self.dealer.append(self.draw_card())
            # Cast so that both branches return a float reward.
            reward = float(cmp(self.score(self.player), self.score(self.dealer)))
        return self._get_observation(), reward, done, {}

    def _pad_hand(self, hand):
        """Return ``hand`` as exactly ``MAX_HAND_SLOTS`` values, zero-padded.

        Cards beyond the available slots are dropped so the observation
        always keeps its declared length of 23.
        """
        padded = list(hand[:self.MAX_HAND_SLOTS])
        padded.extend([0] * (self.MAX_HAND_SLOTS - len(padded)))
        return padded

    def _get_observation(self):
        """Build the 23-element observation: player slots, dealer slots, usable-ace flag."""
        usable_ace_obs = [1] if self.usable_ace(self.player) else [0]
        return np.array(
            self._pad_hand(self.player) + self._pad_hand(self.dealer) + usable_ace_obs
        )

    def render(self, mode='human'):
        """Return (not print) a text description of both hands."""
        if mode != 'human':
            raise NotImplementedError()
        return f"Player hand: {self.player}, Dealer hand: {self.dealer}"

    def close(self):
        """Nothing to release."""
        pass

# Smoke-test the environment: deal a hand, hit once, then stick.
env = SimpleBlackjackEnv()
obs = env.reset()
print(env.render())
obs, reward, done, _ = env.step(1)
print(env.render())
if not done:
    # Only stick when the hit did not already end the game; stepping a
    # finished episode would let the dealer play and overwrite the bust reward.
    obs, reward, done, _ = env.step(0)
    print(env.render())

For the first model we will use a stable_baselines3 algorithm (PPO), which only needs to be instantiated and trained. Since the performance with the default hyperparameters is poor, we also use Optuna to search for better hyperparameters in order to increase the agent's win rate. Note that the number of trials has been set to 5 for computational and running-time reasons (the code cell below currently passes `n_trials=1`); with a higher number of trials Optuna will likely find better hyperparameters.

In [26]:
def evaluate_agent(model, env, num_games=1000, deterministic=False):
    """Estimate the agent's win rate by playing complete games.

    Args:
        model: Agent exposing ``predict(obs, deterministic=...)`` that
            returns an ``(action, state)`` pair.
        env: Environment exposing ``reset()`` returning an observation and
            ``step(action)`` returning ``(obs, reward, done, info)``.
        num_games: Number of games to play.
        deterministic: Forwarded to ``model.predict``. The default ``False``
            keeps the previous behaviour of sampling actions.

    Returns:
        The fraction of games whose final reward equals 1.0, or 0.0 when
        ``num_games`` is not positive.
    """
    if num_games <= 0:
        # Nothing to play; avoid dividing by zero below.
        return 0.0

    wins = 0
    for _ in range(num_games):
        obs = env.reset()
        done = False
        while not done:
            action, _state = model.predict(obs, deterministic=deterministic)
            obs, reward, done, _info = env.step(action)
        # Only the reward of the terminal step decides the game.
        if reward == 1.0:
            wins += 1
    return wins / num_games

def objective(trial):
    """Optuna objective: train PPO with sampled hyperparameters and score it.

    Samples ``learning_rate``, ``gamma``, ``n_steps`` and ``ent_coef`` from
    the trial, trains a PPO agent for 100000 timesteps on a fresh
    environment and evaluates it with ``evaluate_agent``.

    Args:
        trial: The Optuna trial used to sample hyperparameters.

    Returns:
        The negated win rate, so that minimizing the objective maximizes
        the win rate.
    """
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-2, log=True)
    gamma = trial.suggest_float("gamma", 0.9, 0.9999)
    n_steps = trial.suggest_int("n_steps", 16, 2048, log=True)
    ent_coef = trial.suggest_float("ent_coef", 1e-8, 1e-1, log=True)

    env = DummyVecEnv([lambda: SimpleBlackjackEnv()])
    try:
        model = PPO(
            "MlpPolicy",
            env,
            learning_rate=learning_rate,
            gamma=gamma,
            n_steps=n_steps,
            ent_coef=ent_coef,
            verbose=0,
        )
        model.learn(total_timesteps=100000)
        win_rate = evaluate_agent(model, env)
    finally:
        # Each trial builds its own environment; release it even if training fails.
        env.close()

    return -win_rate

# Run the hyperparameter search. The study is created with Optuna's default
# direction, and `objective` returns the negated win rate.
# NOTE(review): the accompanying text mentions 5 trials, but a single trial
# is run here -- confirm which is intended.
study = optuna.create_study()
study.optimize(objective, n_trials=1)

# Hyperparameters of the best trial (these describe the trial, not a trained model).
best_params = study.best_params
print(best_params)

# Train a fresh PPO agent on a new environment with those hyperparameters.
env = DummyVecEnv([SimpleBlackjackEnv])
model = PPO(
    policy="MlpPolicy",
    env=env,
    verbose=1,
    tensorboard_log='./PPO_logs',
    **best_params,
)
model.learn(total_timesteps=50000)






[I 2023-10-29 19:56:57,977] A new study created in memory with name: no-name-1316754c-3b42-4677-b81e-71dd79f6bfa2
We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=1222 and n_envs=1)
[I 2023-10-29 19:57:23,588] Trial 0 finished with value: -0.436 and parameters: {'learning_rate': 0.0015176242402698446, 'gamma': 0.9442402649180488, 'n_steps': 1222, 'ent_coef': 0.0982024564895372}. Best is trial 0 with value: -0.436.


{'learning_rate': 0.0015176242402698446, 'gamma': 0.9442402649180488, 'n_steps': 1222, 'ent_coef': 0.0982024564895372}
Using cpu device


AssertionError: The algorithm only supports (<class 'gymnasium.spaces.box.Box'>, <class 'gymnasium.spaces.discrete.Discrete'>, <class 'gymnasium.spaces.multi_discrete.MultiDiscrete'>, <class 'gymnasium.spaces.multi_binary.MultiBinary'>) as action spaces but Discrete(2) was provided

In [None]:
def test_agent_verbose(model, num_games=10000, vec_env=None):
    """Play ``num_games`` games with ``model`` and print every decision.

    Args:
        model: Agent exposing ``predict(obs)`` returning ``(action, state)``.
        num_games: Number of games to play.
        vec_env: Optional single-environment vectorized env to play in.
            When omitted, the module-level ``env`` is used, as before.

    Returns:
        The number of games that ended with a positive reward.
    """
    game_env = env if vec_env is None else vec_env
    action_mapping = {0: "Stick", 1: "Hit"}

    def split_hands(obs_row):
        # Slots 0-10 are read as the player's cards and 11-21 as the
        # dealer's; the non-zero count gives the length of each hand.
        cards = np.asarray(obs_row)
        player_slots, dealer_slots = cards[:11], cards[11:22]
        return (
            player_slots[:np.count_nonzero(player_slots.astype(int))],
            dealer_slots[:np.count_nonzero(dealer_slots.astype(int))],
        )

    won_games = 0
    for _ in range(num_games):
        obs = game_env.reset()
        done = False
        print("\nStarting a new game...")

        while not done:
            player_hand, dealer_hand = split_hands(obs[0])

            print(f"Player's hand: {player_hand}")
            print(f"Dealer's visible card: {dealer_hand[0]}")

            action, _ = model.predict(obs)
            # .item() extracts the scalar without relying on the deprecated
            # implicit conversion of a 1-element array.
            print(f"Agent's action: {action_mapping[int(np.asarray(action).item())]}")

            obs, rewards, dones, infos = game_env.step(action)
            reward, done = rewards[0], dones[0]

        # A stable_baselines3 vectorized env resets itself when an episode
        # ends, so `obs` already belongs to the next game. The last
        # observation of the finished game is reported in the info dict.
        final_obs = infos[0].get("terminal_observation") if infos else None
        if final_obs is not None:
            player_hand, dealer_hand = split_hands(final_obs)

        print(f"Player's final hand: {player_hand}")
        print(f"Dealer's final hand: {dealer_hand}")

        if reward > 0:
            print("Result: Won!")
            won_games += 1
        elif reward < 0:
            print("Result: Lost!")
        else:
            print("Result: Draw!")
        print('-'*40)
    print(f"Agent won {won_games} out of {num_games} games.")
    return won_games  # Return the number of won games

def simulate_games(model, num_simulations=10, games_per_simulation=(100, 500, 1000, 5000, 10000)):
    """Measure the average win rate for several batch sizes.

    For every entry of ``games_per_simulation`` the agent plays that many
    games ``num_simulations`` times through ``test_agent_verbose``; the wins
    are pooled into one average win rate per entry.

    Args:
        model: Agent passed on to ``test_agent_verbose``.
        num_simulations: Number of repetitions per batch size.
        games_per_simulation: Iterable of batch sizes (games per repetition).
            The default is an immutable tuple so it cannot be altered
            between calls.

    Returns:
        A list with one average win rate (between 0 and 1) per batch size.
    """
    win_rates = []
    for num_games in games_per_simulation:
        total_wins = 0
        for _ in range(num_simulations):
            won_games = test_agent_verbose(model, num_games)
            # Tolerate a tester that returns nothing instead of a win count.
            if won_games is not None:
                total_wins += won_games
        average_win_rate = total_wins / (num_games * num_simulations)
        win_rates.append(average_win_rate)
        print(f"Average winning rate for {num_games} games: {average_win_rate * 100}%")
    return win_rates
# Run the simulation sweep with the current `model`, using the default
# number of repetitions and batch sizes.
simulate_games(model)

We can see that the agent won at most 45% of the games, which is well below the expected 49%, the world's average winning rate. For this environment, however, we must keep in mind that the player cannot split. Splitting would not change things much, though: it simply means playing two hands at the same time, so in most cases the winning rate would stay the same.

CAN'T RUN THE CODE BELOW DUE TO HIGH COMPUTATIONAL POWER REQUIRED


In [None]:
from stable_baselines3 import DQN
from gym.wrappers import monitoring
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.vec_env import DummyVecEnv
import matplotlib.pyplot as plt

# Wrap a single blackjack environment in the vectorized interface.
env = DummyVecEnv([SimpleBlackjackEnv])

# Build the DQN agent with a 50000-transition replay buffer.
model = DQN(policy="MlpPolicy", env=env, buffer_size=50000, verbose=1)

# Train for 100000 environment steps.
model.learn(total_timesteps=100000)

# Persist the trained agent under the name "dqn_blackjack".
model.save("dqn_blackjack")

In [None]:
def test_agent_dqn(model, num_games=10000, vec_env=None):
    """Play ``num_games`` games with ``model`` acting greedily and print every decision.

    Args:
        model: Agent exposing ``predict(obs, deterministic=True)`` returning
            ``(action, state)``.
        num_games: Number of games to play.
        vec_env: Optional single-environment vectorized env to play in.
            When omitted, the module-level ``env`` is used, as before.

    Returns:
        The number of games that ended with a positive reward.
    """
    game_env = env if vec_env is None else vec_env
    action_mapping = {0: "Stick", 1: "Hit"}

    def split_hands(obs_row):
        # Slots 0-10 are read as the player's cards and 11-21 as the
        # dealer's; the non-zero count gives the length of each hand.
        cards = np.asarray(obs_row)
        player_slots, dealer_slots = cards[:11], cards[11:22]
        return (
            player_slots[:np.count_nonzero(player_slots.astype(int))],
            dealer_slots[:np.count_nonzero(dealer_slots.astype(int))],
        )

    won_games = 0
    for _ in range(num_games):
        obs = game_env.reset()
        done = False
        print("\nStarting a new game...")

        while not done:
            player_hand, dealer_hand = split_hands(obs[0])

            print(f"Player's hand: {player_hand}")
            print(f"Dealer's visible card: {dealer_hand[0]}")

            action, _ = model.predict(obs, deterministic=True)
            # .item() extracts the scalar without relying on the deprecated
            # implicit conversion of a 1-element array.
            print(f"Agent's action: {action_mapping[int(np.asarray(action).item())]}")

            obs, rewards, dones, infos = game_env.step(action)
            reward, done = rewards[0], dones[0]

        # A stable_baselines3 vectorized env resets itself when an episode
        # ends, so `obs` already belongs to the next game. The last
        # observation of the finished game is reported in the info dict.
        final_obs = infos[0].get("terminal_observation") if infos else None
        if final_obs is not None:
            player_hand, dealer_hand = split_hands(final_obs)

        print(f"Player's final hand: {player_hand}")
        print(f"Dealer's final hand: {dealer_hand}")

        if reward > 0:
            print("Result: Won!")
            won_games += 1
        elif reward < 0:
            print("Result: Lost!")
        else:
            print("Result: Draw!")
        print('-'*40)
    print(f"Agent won {won_games} out of {num_games} games.")
    return won_games  # Return the number of won games

# Test the DQN model: plays the default number of games, printing every
# move, and returns the number of games won.
test_agent_dqn(model)

In [None]:
from stable_baselines3 import DQN
from gym.wrappers import monitoring
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.vec_env import DummyVecEnv
import matplotlib.pyplot as plt
# Fresh single-environment vectorized wrapper used for the evaluation below.
env = DummyVecEnv([SimpleBlackjackEnv])

# Load a previously saved DQN agent from the results folder.
model = DQN.load("./RisultatiAlessio/dqn_blackjack")
def test_agent_dqn(model, num_games=10000, vec_env=None):
    """Play ``num_games`` games with ``model`` acting greedily and print every decision.

    Args:
        model: Agent exposing ``predict(obs, deterministic=True)`` returning
            ``(action, state)``.
        num_games: Number of games to play.
        vec_env: Optional single-environment vectorized env to play in.
            When omitted, the module-level ``env`` is used, as before.

    Returns:
        The number of games that ended with a positive reward.
    """
    game_env = env if vec_env is None else vec_env
    action_mapping = {0: "Stick", 1: "Hit"}

    def split_hands(obs_row):
        # Slots 0-10 are read as the player's cards and 11-21 as the
        # dealer's; the non-zero count gives the length of each hand.
        cards = np.asarray(obs_row)
        player_slots, dealer_slots = cards[:11], cards[11:22]
        return (
            player_slots[:np.count_nonzero(player_slots.astype(int))],
            dealer_slots[:np.count_nonzero(dealer_slots.astype(int))],
        )

    won_games = 0
    for _ in range(num_games):
        obs = game_env.reset()
        done = False
        print("\nStarting a new game...")

        while not done:
            player_hand, dealer_hand = split_hands(obs[0])

            print(f"Player's hand: {player_hand}")
            print(f"Dealer's visible card: {dealer_hand[0]}")

            action, _ = model.predict(obs, deterministic=True)
            # .item() extracts the scalar without relying on the deprecated
            # implicit conversion of a 1-element array.
            print(f"Agent's action: {action_mapping[int(np.asarray(action).item())]}")

            obs, rewards, dones, infos = game_env.step(action)
            reward, done = rewards[0], dones[0]

        # A stable_baselines3 vectorized env resets itself when an episode
        # ends, so `obs` already belongs to the next game. The last
        # observation of the finished game is reported in the info dict.
        final_obs = infos[0].get("terminal_observation") if infos else None
        if final_obs is not None:
            player_hand, dealer_hand = split_hands(final_obs)

        print(f"Player's final hand: {player_hand}")
        print(f"Dealer's final hand: {dealer_hand}")

        if reward > 0:
            print("Result: Won!")
            won_games += 1
        elif reward < 0:
            print("Result: Lost!")
        else:
            print("Result: Draw!")
        print('-'*40)
    print(f"Agent won {won_games} out of {num_games} games.")
    return won_games  # Return the number of won games

# Test the loaded DQN model: plays the default number of games, printing
# every move, and returns the number of games won.
test_agent_dqn(model)

In [None]:
# Load a previously saved PPO agent from the results folder. No new
# environment is created in this cell, so the functions below keep using
# the existing global `env`.
model = PPO.load("./RisultatiAlessio/ppo_blackjack")
def test_agent_verbose(model, num_games=10000, vec_env=None):
    """Play ``num_games`` games with ``model`` and print every decision.

    Args:
        model: Agent exposing ``predict(obs)`` returning ``(action, state)``.
        num_games: Number of games to play.
        vec_env: Optional single-environment vectorized env to play in.
            When omitted, the module-level ``env`` is used, as before.

    Returns:
        The number of games that ended with a positive reward.
    """
    game_env = env if vec_env is None else vec_env
    action_mapping = {0: "Stick", 1: "Hit"}

    def split_hands(obs_row):
        # Slots 0-10 are read as the player's cards and 11-21 as the
        # dealer's; the non-zero count gives the length of each hand.
        cards = np.asarray(obs_row)
        player_slots, dealer_slots = cards[:11], cards[11:22]
        return (
            player_slots[:np.count_nonzero(player_slots.astype(int))],
            dealer_slots[:np.count_nonzero(dealer_slots.astype(int))],
        )

    won_games = 0
    for _ in range(num_games):
        obs = game_env.reset()
        done = False
        print("\nStarting a new game...")

        while not done:
            player_hand, dealer_hand = split_hands(obs[0])

            print(f"Player's hand: {player_hand}")
            print(f"Dealer's visible card: {dealer_hand[0]}")

            action, _ = model.predict(obs)
            # .item() extracts the scalar without relying on the deprecated
            # implicit conversion of a 1-element array.
            print(f"Agent's action: {action_mapping[int(np.asarray(action).item())]}")

            obs, rewards, dones, infos = game_env.step(action)
            reward, done = rewards[0], dones[0]

        # A stable_baselines3 vectorized env resets itself when an episode
        # ends, so `obs` already belongs to the next game. The last
        # observation of the finished game is reported in the info dict.
        final_obs = infos[0].get("terminal_observation") if infos else None
        if final_obs is not None:
            player_hand, dealer_hand = split_hands(final_obs)

        print(f"Player's final hand: {player_hand}")
        print(f"Dealer's final hand: {dealer_hand}")

        if reward > 0:
            print("Result: Won!")
            won_games += 1
        elif reward < 0:
            print("Result: Lost!")
        else:
            print("Result: Draw!")
        print('-'*40)
    print(f"Agent won {won_games} out of {num_games} games.")
    return won_games  # Return the number of won games

def simulate_games(model, num_simulations=10, games_per_simulation=(100, 500, 1000, 5000, 10000)):
    """Measure the average win rate for several batch sizes.

    For every entry of ``games_per_simulation`` the agent plays that many
    games ``num_simulations`` times through ``test_agent_verbose``; the wins
    are pooled into one average win rate per entry.

    Args:
        model: Agent passed on to ``test_agent_verbose``.
        num_simulations: Number of repetitions per batch size.
        games_per_simulation: Iterable of batch sizes (games per repetition).
            The default is an immutable tuple so it cannot be altered
            between calls.

    Returns:
        A list with one average win rate (between 0 and 1) per batch size.
    """
    win_rates = []
    for num_games in games_per_simulation:
        total_wins = 0
        for _ in range(num_simulations):
            won_games = test_agent_verbose(model, num_games)
            # Tolerate a tester that returns nothing instead of a win count.
            if won_games is not None:
                total_wins += won_games
        average_win_rate = total_wins / (num_games * num_simulations)
        win_rates.append(average_win_rate)
        print(f"Average winning rate for {num_games} games: {average_win_rate * 100}%")
    return win_rates
# Run the simulation sweep with the loaded PPO `model`, using the default
# number of repetitions and batch sizes.
simulate_games(model)