In [None]:
import os
import pdb
import random
import sys
import time
from collections import deque

import gym
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from gym.envs.toy_text import blackjack
from IPython.display import clear_output
# Index of the CUDA device to use when a GPU is present.
GPU_indx = 0
# Fall back to the CPU when CUDA is not available.
if torch.cuda.is_available():
    device = torch.device(GPU_indx)
else:
    device = torch.device('cpu')

Set up

In [None]:
# Create the Blackjack environment; with new_step_api=True, env.step returns
# (obs, reward, terminated, truncated, info).
env = gym.make('Blackjack-v1', new_step_api=True)
env.reset()
# Gym's Blackjack encodes the two actions as 0 = stick and 1 = hit
# ("bust" is an outcome, not an action).
print(f'Action space: {env.action_space}. \n\n=> (1 = Hit, 0 = Stick)\n')
print('-----------------------------------------------\n')
print(f'Observation space: {env.observation_space} \n\n=> (Player hand sum, Dealer card, Usable ace)')

Action space: Discrete(2). 

=> (1 = Hit, 0 = Bust)

-----------------------------------------------

Observation space: Tuple(Discrete(32), Discrete(11), Discrete(2)) 

=> (Player hand sum, Dealer card, Usable ace)


Random test

In [None]:
def test_agent_random(env, num_episodes=1000):
    total_rewards = 0

    for episode in range(num_episodes):
        state = env.reset()  # 初始化环境，获取初始状态
        state = (state[0], state[1], state[2])  # 只取出需要的部分：player_sum, dealer_show, usable_ace
        done = False
        episode_reward = 0

        while not done:
            # 随机选择动作
            action = random.choice([0, 1])  # 假设动作空间是 [0, 1]

            # 与环境互动，获得下一个状态和奖励
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            next_state = (next_state[0], next_state[1], next_state[2])  # 只取出需要的部分

            # 更新总奖励
            episode_reward += reward

            # 更新状态
            state = next_state

        total_rewards += episode_reward

    average_reward = total_rewards / num_episodes
    return average_reward

# Baseline: average return of the random policy over 1000 episodes.
average_reward_random = test_agent_random(env=env, num_episodes=1000)
print(f"Average reward over 1000 episodes with random policy: {average_reward_random}")


Average reward over 1000 episodes with random policy: -0.374


  if not isinstance(terminated, (bool, np.bool8)):


Q-learning

In [None]:
# Fresh Blackjack environment for tabular Q-learning; with new_step_api=True,
# env.step returns (obs, reward, terminated, truncated, info).
env = gym.make('Blackjack-v1', new_step_api=True)
# Tabular action-value function: one [Q(s, 0), Q(s, 1)] pair per state, where
# action 0 = stick and action 1 = hit (Gym Blackjack convention).
# States cover player sums 4..32, dealer cards 1..10, with and without a usable ace.
Q = {
    (player_sum, dealer_show, usable_ace): [0, 0]
    for player_sum in range(4, 33)
    for dealer_show in range(1, 11)
    for usable_ace in (True, False)
}

# Q-learning hyperparameters
alpha = 0.1      # learning rate
gamma = 0.99     # discount factor
epsilon = 0.1    # exploration rate
episodes = 1000  # number of training episodes

# Action-selection policy (epsilon-greedy)
def epsilon_greedy(state):
    """Choose an action for ``state`` with an epsilon-greedy rule.

    With probability ``epsilon`` (module-level) an action is sampled from
    ``env.action_space``; otherwise the index of the largest entry of
    ``Q[state]`` is returned.
    """
    explore = random.uniform(0, 1) < epsilon
    if explore:
        # Exploration: random action.
        return env.action_space.sample()
    # Exploitation: action with the highest current Q-value.
    return np.argmax(Q[state])

# Q-learning training loop
for episode in range(episodes):
    state = env.reset()  # start a new hand
    state = (state[0], state[1], state[2])  # (player_sum, dealer_show, usable_ace)
    done = False
    while not done:
        # Choose an action with the epsilon-greedy policy.
        action = epsilon_greedy(state)

        # Interact with the environment.
        next_state, reward, terminated, truncated, _ = env.step(action)
        # The episode ends on termination or truncation. Without this
        # assignment `done` stays False and the loop never exits.
        done = terminated or truncated
        next_state = (next_state[0], next_state[1], next_state[2])

        current_q = Q[state][action]
        if done:
            # Terminal transition: there is no future return to bootstrap from.
            # This also means a busted hand is never looked up in the Q table.
            target = reward
        else:
            target = reward + gamma * max(Q[next_state])

        # Q-learning update rule.
        Q[state][action] = current_q + alpha * (target - current_q)

        # Move on to the next state.
        state = next_state

    # Report progress every 1000 episodes.
    if episode % 1000 == 0:
        print(f"Episode {episode}/{episodes}")

print("训练完成！")


  if not isinstance(terminated, (bool, np.bool8)):


In [None]:
def test_agent(env, Q, num_episodes=1000):
    total_rewards = 0

    for episode in range(num_episodes):
        state = env.reset()  # 初始化环境，获取初始状态
        state = (state[0], state[1], state[2])  # 只取出需要的部分：player_sum, dealer_show, usable_ace
        done = False
        episode_reward = 0

        while not done:
            # 使用 Q 表选择动作（利用）
            action = np.argmax(Q[state])

            # 与环境互动，获得下一个状态和奖励
            next_state, reward, done, _= env.step(action)
            next_state = (next_state[0], next_state[1], next_state[2])  # 只取出需要的部分

            # 更新总奖励
            episode_reward += reward

            # 更新状态
            state = next_state

        total_rewards += episode_reward

    average_reward = total_rewards / num_episodes
    return average_reward

# Evaluate the greedy policy read from the learned Q table.
average_reward = test_agent(env=env, Q=Q, num_episodes=1000)
print(f"Average reward over 1000 episodes: {average_reward}")

Deep Q-learning

In [None]:
class DQN(nn.Module):
    """Three-layer fully connected network mapping a state to per-action values.

    Layers: ``state_dim -> 128 -> 128 -> action_dim`` with ReLU after the
    first two linear layers and no activation on the output.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_width = 128
        self.fc1 = nn.Linear(state_dim, hidden_width)
        self.fc2 = nn.Linear(hidden_width, hidden_width)
        self.fc3 = nn.Linear(hidden_width, action_dim)

    def forward(self, x):
        """Return the output of the final linear layer for input ``x``."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)


Average reward over 1000 episodes: -0.15


In [None]:
class ReplayBuffer:
    """Fixed-capacity store of transitions for experience replay.

    Requires ``deque`` from ``collections`` to be imported at the top of the
    notebook.
    """

    def __init__(self, capacity):
        # A bounded deque discards the oldest transition once `capacity` is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Five NumPy arrays ``(state, action, reward, next_state, done)``,
            each with ``batch_size`` as its first dimension.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into per-field tuples.
        state, action, reward, next_state, done = zip(*batch)
        return np.array(state), np.array(action), np.array(reward), np.array(next_state), np.array(done)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

In [None]:
class DQNAgent:
    """DQN agent with a policy network, a target network and a replay buffer.

    Args:
        state_dim: Size of the state vector fed to the networks.
        action_dim: Number of discrete actions.
        lr: Learning rate of the Adam optimizer.
        gamma: Discount factor used in the bootstrapped target.
        epsilon: Initial exploration probability.
        epsilon_min: Lower bound for ``epsilon``.
        epsilon_decay: Multiplicative decay applied to ``epsilon`` after every
            gradient update.
    """

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        # Policy network and a target network that starts as an exact copy of it.
        self.policy_net = DQN(state_dim, action_dim)
        self.target_net = DQN(state_dim, action_dim)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        # Optimizer and loss function.
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

        # Experience replay buffer.
        self.buffer = ReplayBuffer(10000)

    def select_action(self, state):
        """Return an action index for ``state`` using an epsilon-greedy rule."""
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)  # random action (explore)
        else:
            with torch.no_grad():
                state = torch.FloatTensor(state)
                q_values = self.policy_net(state)
                return torch.argmax(q_values).item()  # action with the largest Q-value

    def update(self, batch_size):
        """Run one gradient step on a sampled mini-batch and decay epsilon.

        Does nothing while the buffer holds fewer than ``batch_size``
        transitions.
        """
        if len(self.buffer) < batch_size:
            return

        # Sample a mini-batch from the replay buffer.
        state, action, reward, next_state, done = self.buffer.sample(batch_size)
        state = torch.FloatTensor(state)
        action = torch.LongTensor(action)
        reward = torch.FloatTensor(reward)
        next_state = torch.FloatTensor(next_state)
        done = torch.FloatTensor(done)

        # Q(s, a) for the actions that were taken. squeeze(1) removes only the
        # gathered column, so a batch of size 1 keeps shape (1,) and matches
        # the target (a bare squeeze() would collapse it to a 0-d tensor).
        current_q = self.policy_net(state).gather(1, action.unsqueeze(1)).squeeze(1)

        # Bootstrapped target; (1 - done) zeroes the future term on terminal steps.
        with torch.no_grad():
            next_q = self.target_net(next_state).max(1)[0]
            target_q = reward + (1 - done) * self.gamma * next_q

        # Compute the loss and update the policy network.
        loss = self.loss_fn(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay epsilon, never going below epsilon_min.
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def update_target_net(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

In [None]:
# Blackjack environment for DQN training; env.step is unpacked below as
# (obs, reward, done, info).
env = gym.make('Blackjack-v1')
state_dim = 3   # (player_sum, dealer_show, usable_ace)
action_dim = 2  # 0: stick, 1: hit

agent = DQNAgent(state_dim, action_dim)
batch_size = 64
num_episodes = 1000
update_target_every = 10

for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    done = False

    while not done:
        # Act, observe the transition and store it for replay.
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.buffer.push(state, action, reward, next_state, done)

        # Advance the episode bookkeeping.
        state = next_state
        total_reward += reward

        # One gradient step per environment step (a no-op until the buffer
        # holds at least batch_size transitions).
        agent.update(batch_size)

    # Periodically sync the target network with the policy network.
    if episode % update_target_every == 0:
        agent.update_target_net()

    print(f"Episode: {episode + 1}, Total Reward: {total_reward}, Epsilon: {agent.epsilon}")

In [None]:
def test_agent(env, agent, num_episodes=100):
    total_rewards = 0

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0

        while True:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            state = next_state

            if done:
                break

        total_rewards += episode_reward
        print(f"Test Episode: {episode + 1}, Reward: {episode_reward}")

    average_reward = total_rewards / num_episodes
    print(f"Average Reward over {num_episodes} episodes: {average_reward}")

# Evaluate the trained DQN agent with the default number of episodes.
test_agent(env=env, agent=agent)