In [2]:
import numpy as np
from collections import defaultdict
import gymnasium as gym

class BlackjackQLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy policy over two actions.

    Q-values are kept in a ``defaultdict`` keyed by state.  A state that has
    never been looked up starts with a zero vector of length 2, where index 0
    is the value of sticking and index 1 the value of hitting.
    """

    def __init__(self, learning_rate=0.1, discount_factor=0.95, epsilon=1.0, epsilon_decay=0.99999, min_epsilon=0.01):
        """Store the hyper-parameters and create an empty Q-table.

        Args:
            learning_rate: Step size applied to each temporal-difference error.
            discount_factor: Weight of the best next-state value in the target.
            epsilon: Initial probability of picking a random action.
            epsilon_decay: Factor epsilon is multiplied by on every
                ``get_action`` call.
            min_epsilon: Floor below which epsilon is never decayed.
        """
        # Exploration schedule.
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon
        # Temporal-difference parameters.
        self.lr = learning_rate
        self.gamma = discount_factor
        # Q-table; rows are created lazily on first lookup of a state.
        self.q_values = defaultdict(lambda: np.zeros(2))

    def get_action(self, state):
        """Pick an action epsilon-greedily, then decay epsilon once.

        With probability ``epsilon`` a uniformly random action from {0, 1} is
        returned; otherwise the index of the largest Q-value for ``state``
        (ties resolve to the lower index).  Epsilon is decayed on every call,
        i.e. once per decision rather than once per episode.
        """
        explore = np.random.random() < self.epsilon
        if explore:
            # The Q-table is deliberately not touched on exploratory moves.
            chosen = np.random.choice([0, 1])
        else:
            chosen = np.argmax(self.q_values[state])

        # Shrink epsilon towards its floor for the next decision.
        self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)
        return chosen

    def update(self, state, action, reward, next_state, done):
        """Apply one Q-learning update for the transition just observed.

        The target is ``reward`` for a terminal transition, otherwise
        ``reward + gamma * max_a Q(next_state, a)``.  ``next_state`` is only
        looked up (and therefore only added to the table) when ``done`` is
        false.
        """
        q_row = self.q_values[state]
        old_estimate = q_row[action]

        if done:
            td_target = reward
        else:
            # Bootstrap from the best value currently known for the successor.
            td_target = reward + self.gamma * np.max(self.q_values[next_state])

        # Move the stored estimate a fraction ``lr`` of the way to the target.
        q_row[action] += self.lr * (td_target - old_estimate)


class BlackjackEnv(gym.Env):
    """Blackjack dealt from a finite multi-deck shoe, with a running card count.

    Cards are drawn without replacement from ``total_decks`` 52-card decks
    (aces are stored as 1, every ten-valued card as 10).  Each card drawn
    adjusts ``running_count``: -1 for an ace or a ten, +1 for 2-6, and no
    change for 7-9.

    Observations are 5-tuples ``(player sum, dealer's first card, usable ace,
    running count, remaining decks)``.

    Calling convention (kept as-is for the callers in this file): ``reset``
    and ``new_round`` return only the observation, and ``step`` returns the
    4-tuple ``(observation, reward, terminated, money)``.

    NOTE(review): both dealer cards are added to ``running_count`` when the
    hand is dealt, although only the first one is exposed in the observation;
    confirm whether counting the hidden card is intended.
    """

    # Bankroll restored by ``__init__`` and ``reset``.
    _STARTING_MONEY = 50

    def __init__(self, render_mode=None, natural=False, sab=False, total_decks=5):
        """Build the spaces and a full shoe.

        Args:
            render_mode: Stored on the instance; no method of this class
                renders anything.
            natural: When true (and ``sab`` is false) a winning player
                natural is rewarded 1.5 instead of 1.0.
            sab: When true, a player natural against a dealer without a
                natural is rewarded exactly 1.0 and ``natural`` is ignored.
            total_decks: Number of 52-card decks that make up the shoe.
        """
        # Action space (0 = stick, 1 = hit)
        self.action_space = gym.spaces.Discrete(2)

        # Each 52-card deck holds 20 cards counted +1 (2-6) and 20 counted -1
        # (aces and tens), so the running count stays within +/-20 per deck.
        max_count = 20 * total_decks
        # Observation space mirrors the 5-tuple produced by ``_get_obs``.
        self.observation_space = gym.spaces.Tuple(
            (
                gym.spaces.Discrete(32),  # player sum
                gym.spaces.Discrete(11),  # dealer's first card
                gym.spaces.Discrete(2),  # usable ace
                gym.spaces.Discrete(2 * max_count + 1, start=-max_count),  # running count
                gym.spaces.Box(  # remaining decks, in half-deck steps
                    low=0.0, high=float(total_decks), shape=(), dtype=np.float64
                ),
            )
        )

        self.natural = natural
        self.sab = sab
        self.render_mode = render_mode

        # Bankroll bookkeeping; the bet is fixed at one unit in this class.
        self.betting_unit = 1
        self.money = self._STARTING_MONEY
        self.current_bet = 1

        one_suite = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10]
        self.original_deck = one_suite * 4 * total_decks
        self._reshuffle()

    def _reshuffle(self):
        """Restore the full shoe and zero the running count."""
        self.deck = self.original_deck.copy()
        self.running_count = 0

    def step(self, action):
        """Apply ``action`` (0 = stick, 1 = hit) to the current round.

        Hitting draws one card for the player: a bust ends the round with
        reward -1.0, otherwise the round continues with reward 0.0.  Sticking
        ends the round: the dealer draws until their sum is at least 17 and
        the reward is +1.0 / 0.0 / -1.0 from comparing scores, adjusted for
        naturals according to ``sab`` and ``natural``.

        Returns:
            ``(observation, reward, terminated, money)`` where ``money`` is
            the bankroll after adding ``reward * current_bet``.
        """
        assert self.action_space.contains(action)

        if action:  # hit
            self.player.append(self.draw_card(self.np_random))
            terminated = self.is_bust(self.player)
            reward = -1.0 if terminated else 0.0
        else:  # stick
            terminated = True
            # Dealer must hit until their sum is at least 17.
            while self.sum_hand(self.dealer) < 17:
                self.dealer.append(self.draw_card(self.np_random))
            reward = self.cmp(self.score(self.player), self.score(self.dealer))
            if self.sab and self.is_natural(self.player) and not self.is_natural(self.dealer):
                reward = 1.0
            elif not self.sab and self.natural and self.is_natural(self.player) and reward == 1.0:
                reward = 1.5

        # Settle the bet (a no-op while the round is still running: reward 0).
        self.money += reward * self.current_bet
        return self._get_obs(), reward, terminated, self.money

    def _get_obs(self):
        """Return ``(player sum, dealer's first card, usable ace, running count, remaining decks)``."""
        return (
            self.sum_hand(self.player),
            self.dealer[0],
            self.usable_ace(self.player),
            self.running_count,
            self.getRemainingDecks(),
        )

    def reset(self, seed=None, options=None):
        """Restore bankroll and shoe, deal a new round, and return its observation.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for compatibility with the Gymnasium ``reset``
                signature; ignored.
        """
        super().reset(seed=seed)
        self.money = self._STARTING_MONEY
        self._reshuffle()
        return self.new_round()

    def new_round(self):
        """Deal two cards to the dealer, then two to the player, from the current shoe."""
        self.dealer = self.draw_hand(self.np_random)
        self.player = self.draw_hand(self.np_random)
        return self._get_obs()

    def draw_card(self, np_random):
        """Remove one uniformly chosen card from the shoe and update the running count.

        If the shoe is empty it is refilled (and the count zeroed) first, so
        repeated ``new_round`` calls cannot fail on an exhausted shoe.
        """
        if not self.deck:
            self._reshuffle()
        card_index = np_random.choice(len(self.deck))
        card = self.deck.pop(card_index)
        if card in (1, 10):
            self.running_count -= 1
        elif 2 <= card <= 6:
            self.running_count += 1
        return card

    def draw_hand(self, np_random):
        """Draw two cards for a hand."""
        return [self.draw_card(np_random), self.draw_card(np_random)]

    def getRemainingDecks(self):
        """Return the decks left in the shoe, rounded to the nearest half deck.

        Uses Python's ``round``, so exact quarter-deck ties round to even.
        """
        return round(len(self.deck) / 52 * 2) / 2

    @staticmethod
    def cmp(a, b):
        """Return 1.0 if ``a > b``, -1.0 if ``a < b``, and 0.0 when equal."""
        return float(a > b) - float(a < b)

    @staticmethod
    def usable_ace(hand):
        """Return whether the hand holds an ace that can count as 11 without busting."""
        return 1 in hand and sum(hand) + 10 <= 21

    @staticmethod
    def sum_hand(hand):
        """Return the hand total, counting one ace as 11 when that does not bust."""
        if BlackjackEnv.usable_ace(hand):
            return sum(hand) + 10
        return sum(hand)

    @staticmethod
    def is_bust(hand):
        """Return whether the hand total exceeds 21."""
        return BlackjackEnv.sum_hand(hand) > 21

    @staticmethod
    def score(hand):
        """Return the hand total, or 0 for a busted hand."""
        return 0 if BlackjackEnv.is_bust(hand) else BlackjackEnv.sum_hand(hand)

    @staticmethod
    def is_natural(hand):
        """Return whether the hand is exactly an ace plus a ten-valued card."""
        return sorted(hand) == [1, 10]


def train_agent(env, agent, num_episodes):
    """Run ``num_episodes`` training episodes of ``agent`` against ``env``.

    Every episode starts from ``env.reset()``.  The agent then acts and
    learns from each transition until ``env.step`` reports the round as
    done.  An episode is cut short, with one extra ``env.reset()``, as soon
    as ``env.money`` is not positive or fewer than one deck remains.
    A progress line is printed every 10000 episodes, starting with episode 0.
    """
    for episode in range(num_episodes):
        observation = env.reset()
        finished = False

        while not finished:
            chosen = agent.get_action(observation)
            successor, payoff, finished, _money = env.step(chosen)
            agent.update(observation, chosen, payoff, successor, finished)
            observation = successor

            # Stop early when the bankroll is gone or the shoe is nearly
            # spent; the environment is reset before leaving the episode.
            out_of_money = env.money <= 0
            if out_of_money or env.getRemainingDecks() < 1:
                env.reset()
                break

        if episode % 10000 == 0:
            print(f"Episode {episode} completed")


def evaluate_agent(env, agent, num_episodes=10000):
    """Play ``num_episodes`` greedy episodes and return the mean total reward per episode.

    Each episode starts from ``env.reset()`` and always takes the action with
    the highest stored Q-value.  The Q-table is only read: a state with no
    stored row falls back to action 0, which is what an argmax over the
    agent's all-zero default row would pick, without inserting that row.

    Args:
        env: Environment whose ``reset()`` returns a state and whose
            ``step(action)`` returns ``(state, reward, done, extra)``.
        agent: Object exposing a ``q_values`` mapping from state to a
            sequence of per-action values.
        num_episodes: Number of episodes to average over.

    Returns:
        The average per-episode reward, or 0.0 when ``num_episodes`` is not
        positive (there is nothing to average).
    """
    if num_episodes <= 0:
        return 0.0

    total_reward = 0
    for _ in range(num_episodes):
        state = env.reset()
        done = False
        while not done:
            # ``.get`` avoids the defaultdict insertion that ``[]`` would
            # trigger, so evaluation never grows the learned table.
            q_row = agent.q_values.get(state)
            action = 0 if q_row is None else int(np.argmax(q_row))
            state, reward, done, _ = env.step(action)
            total_reward += reward
    return total_reward / num_episodes


if __name__ == "__main__":
    # Build the environment (default five-deck shoe) and a fresh agent.
    env = BlackjackEnv()
    agent = BlackjackQLearningAgent()

    # Train the agent
    num_episodes = 100000
    print("Training agent...")
    train_agent(env, agent, num_episodes)

    # Test the agent after training.  The episode count is named once and
    # used for both the call and the report, so the message cannot drift
    # from the number of episodes actually evaluated.
    eval_episodes = 10000
    print("Evaluating agent...")
    avg_reward = evaluate_agent(env, agent, num_episodes=eval_episodes)
    print(f"Average reward over {eval_episodes:,} episodes: {avg_reward:.4f}")


Training agent...
Episode 0 completed
Episode 10000 completed
Episode 20000 completed
Episode 30000 completed
Episode 40000 completed
Episode 50000 completed
Episode 60000 completed
Episode 70000 completed
Episode 80000 completed
Episode 90000 completed
Evaluating agent...
Average reward over 10,000 episodes: -0.0403
