In [1]:
"""
21点环境
=======
state: [玩家手牌列表], 庄家明牌
action: {叫牌(0)，停牌(1)}
reward: 胜利 1，失败 -1，平局 0
"""
import numpy as np


class BasePolicy:
    """Abstract base class for policies.

    Subclasses override ``act`` to map an observation to an action.
    """

    def act(self, obs):
        """Return the action for ``obs``; must be overridden by subclasses.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        message = 'Policy.act Not Implemented'
        raise NotImplementedError(message)


class DealerPolicy(BasePolicy):
    """Fixed dealer policy.

    The dealer hits while its point total is below 17 and sticks otherwise.
    """

    def act(self, obs):
        """Return 0 (hit) when ``obs`` is below 17, else 1 (stick).

        Parameters
        ----------
        obs : int
            The dealer's current point total.
        """
        return 0 if obs < 17 else 1

In [2]:
def max_point(traj):
    """Return the best point total of a hand.

    Every ace (stored as 1) is first counted as 11; aces are then demoted to
    1, one at a time, for as long as the total is above 21.

    Parameters
    ----------
    traj : list of int
        Cards in the hand.

    Returns
    -------
    point : int
        The highest total not exceeding 21 if one exists, otherwise the
        total with every ace counted as 1.
    """
    total = 0
    aces = 0
    for card in traj:
        if card != 1:
            total += card
            continue
        aces += 1
        total += 11

    # Demote at most one ace per iteration, stopping once the hand is safe.
    for _ in range(aces):
        if total <= 21:
            break
        total -= 10

    return total

In [7]:
class BlackJack:
    """Simplified blackjack (21) environment with a reset/step interface.

    Every card is drawn independently by ``_get_card`` (no finite deck is
    tracked). An ace is stored as 1 and hands are scored with ``max_point``.
    ``step`` returns ``(observation, reward, done, info)``; the reward is 1
    when the player wins, -1 when the player loses and 0 for a draw or a
    non-terminal hit.
    """

    def __init__(self):
        super().__init__()

        # Action space: 0 = hit (draw a card), 1 = stick (stop drawing).
        self.action_space = (0, 1)

        # Game phase:
        # 0: player's turn; ends when the player sticks or busts
        # 1: dealer's turn
        # 2: episode finished (settled)
        self.state = 0

        # Cards held by the player and by the dealer.
        self.player_trajectory = []
        self.dealer_trajectory = []

        # Policy the dealer follows once the player sticks.
        self.dealer_policy = DealerPolicy()

    def reset(self):
        """Deal two cards to each side and return the first observation."""
        self.player_trajectory = [self._get_card(), self._get_card()]
        self.dealer_trajectory = [self._get_card(), self._get_card()]

        self.state = 0

        return self._get_obs()

    def step(self, action):
        """Apply the player's action and advance the game.

        Parameters
        ----------
        action : int
            0 to hit, 1 to stick.

        Returns
        -------
        observation : tuple
            See ``_get_obs``.
        reward : int
            1 (win), -1 (loss) or 0 (draw / episode still running).
        done : bool
            True when the episode has ended.
        info : dict
            Always empty.

        Raises
        ------
        ValueError
            If ``action`` is neither 0 nor 1.
        AssertionError
            If the episode is not in the player's phase (``state != 0``),
            e.g. when stepping again after ``done`` without ``reset``.
        """
        if action == 0:  # player hits
            self._require_player_turn('只能在 0 状态抽卡')

            self.player_trajectory.append(self._get_card())

            if self._is_blast(self.player_trajectory):
                # A bust ends the episode immediately; record that so further
                # steps are rejected until reset().
                self.state = 2
                return self._get_obs(), -1, True, {}

            return self._get_obs(), 0, False, {}

        elif action == 1:  # player sticks
            # Without this guard a second "stick" on a finished episode would
            # re-run the settlement (and score a busted dealer as the winner).
            self._require_player_turn('只能在 0 状态停牌')

            # Dealer's turn: draw for as long as the dealer policy says "hit".
            self.state = 1
            dealer_point = max_point(self.dealer_trajectory)
            while self.dealer_policy.act(dealer_point) == 0:
                self.dealer_trajectory.append(self._get_card())
                dealer_point = max_point(self.dealer_trajectory)
                # A dealer bust is an immediate win for the player.
                if self._is_blast(self.dealer_trajectory):
                    self.state = 2
                    return self._get_obs(), 1, True, {}

            # Dealer stuck without busting: settle by comparing totals.
            self.state = 2
            player_point = max_point(self.player_trajectory)
            if player_point > dealer_point:
                reward = 1
            elif player_point == dealer_point:
                reward = 0
            else:
                reward = -1
            return self._get_obs(), reward, True, {}

        else:
            raise ValueError('非法动作')

    def _require_player_turn(self, message):
        """Raise AssertionError(message) unless it is the player's turn.

        Raised explicitly (not via ``assert``) so the check survives
        ``python -O`` while keeping the exception type callers already see.
        """
        if self.state != 0:
            raise AssertionError(message)

    def _get_card(self):
        """Draw one card.

        Returns
        -------
        int
            The card value in 1..10.
        """
        # There are 13 ranks (draws 1..13); 10, J, Q and K (draws 10..13)
        # all count as 10.
        card = np.random.randint(1, 14)
        return min(card, 10)

    def _get_obs(self):
        """Build the observation visible to the player.

        Returns
        -------
        cards : list of int
            A copy of the player's hand.
        dealer_card : int
            The dealer's first card.
        """
        return (self.player_trajectory.copy(), self.dealer_trajectory[0])

    def _is_blast(self, traj):
        """Check whether a hand is bust.

        Parameters
        ----------
        traj : list of int
            Cards in the hand.

        Returns
        -------
        blast : bool
            True when the best total of the hand exceeds 21, else False.
        """
        return max_point(traj) > 21

In [21]:
# Play a single episode with a uniformly random policy
env = BlackJack()  # create the environment
observation = env.reset()  # initial observation
print("起始观测 = {}".format(observation))
done = False
while not done:
    print("玩家 = {}, 庄家 = {}".format(env.player_trajectory, env.dealer_trajectory))
    # Pick hit/stick with equal probability
    action = np.random.choice(len(env.action_space))
    print('动作 = {}'.format(action))
    observation, reward, done, _ = env.step(action)
    print("观测 = {}, 奖励 = {}, 是否结束 = {}".format(observation, reward, done))

起始观测 = ([3, 10], 3)
玩家 = [3, 10], 庄家 = [3, 9]
动作 = 0
观测 = ([3, 10, 3], 3), 奖励 = 0, 是否结束 = False
玩家 = [3, 10, 3], 庄家 = [3, 9]
动作 = 1
观测 = ([3, 10, 3], 3), 奖励 = 1, 是否结束 = True


In [35]:
def obtostate(observation):
    """Convert an environment observation into a tabular state index.

    The player's hand is scored with ``max_point`` — the same rule the
    environment uses for bust checks and settlement — so an ace counts as 11
    whenever that does not bust the hand. (A plain ``sum`` would count every
    ace as 1 and, for example, report a natural 21 ``[1, 10]`` as 11.)

    Parameters
    ----------
    observation : tuple
        ``(player_cards, dealer_card)`` as returned by the environment.

    Returns
    -------
    state : tuple of int
        ``(player_total, dealer_card)``.
    """
    return (max_point(observation[0]), observation[1])

In [36]:
def evaluate_monte_carlo(policy, env, episode_num=500000):
    """Estimate the action values of a fixed policy by Monte Carlo sampling.

    Parameters
    ----------
    policy : numpy.ndarray
        Table indexed by the state tuple returned by ``obtostate``; the last
        axis holds the action probabilities for that state.
    env : object
        Environment exposing ``action_space``, ``reset()`` and ``step()``.
    episode_num : int
        Number of episodes to sample.

    Returns
    -------
    q : numpy.ndarray
        Running-mean return for every (state, action), same shape as
        ``policy``.
    """
    # Force float tables: with zeros_like alone an integer policy array would
    # make q integer too and silently truncate the running means.
    q = np.zeros_like(policy, dtype=float)  # action-value estimates
    c = np.zeros_like(policy, dtype=float)  # visit counts
    n_actions = len(env.action_space)
    for _ in range(episode_num):
        observation = env.reset()
        state_actions = []
        done = False
        while not done:
            state = obtostate(observation)
            action = np.random.choice(n_actions, p=policy[state])
            state_actions.append((state, action))
            observation, reward, done, _ = env.step(action)
        # Only the final step carries a non-zero reward, so the return of
        # every visited pair equals the last reward.
        g = reward
        for state, action in state_actions:
            # Incremental mean update
            c[state][action] += 1
            q[state][action] += (g - q[state][action]) / c[state][action]
    return q

In [87]:
# Evaluate a fixed threshold policy
# Axes: 22 player totals (0..21, as produced by obtostate), 11 values of the
# dealer's visible card (0..10, index 0 is never used), 2 actions.
policy = np.zeros((22, 11, 2))
policy[:20, :, 0] = 1  # player total below 20: always hit
policy[20:, :, 1] = 1  # player total of 20 or 21: always stick
q = evaluate_monte_carlo(policy, env)

In [84]:
q = np.array(q)
# State value under the policy: expectation of q over the action axis
v = np.sum(q * policy, axis=-1)

In [85]:
# Play one episode acting greedily on the action values estimated for the
# fixed policy
observation = env.reset()  # initial observation
print("起始观测 = {}".format(observation))
done = False
while not done:
    state = obtostate(observation)
    action = np.argmax(q[state])
    observation, reward, done, _ = env.step(action)
    print("玩家 = {}, 庄家 = {}".format(env.player_trajectory, env.dealer_trajectory))
    print('动作 = {}'.format(action))
    print("观测 = {}, 奖励 = {}, 是否结束 = {}".format(observation, reward, done))

起始观测 = ([9, 7], 9)
玩家 = [9, 7], 庄家 = [9, 5, 10]
动作 = 1
观测 = ([9, 7], 9), 奖励 = 1, 是否结束 = True


In [None]:
def monte_carlo_with_exploring_start(env, episode_num=500000):
    """Unfinished stub; by its name, intended as Monte Carlo with exploring starts.

    NOTE(review): only the tables are allocated. The loop body is a bare
    ``state`` expression, so no episode is generated, ``env`` is never used
    and the function implicitly returns None. Complete or remove before use.
    """
    policy = np.zeros([22, 11, 2])
    # Initial policy: probability 1 on action index 1 in every state
    policy[:, :, 1] = 1
    q = np.zeros_like(policy)
    c = np.zeros_like(policy)
    for _ in range(episode_num):
        # Placeholder: a bare name lookup, nothing is computed here yet
        state

In [136]:
def monte_carlo_soft(env, episode_num=500000, epsilon=0.05):
    """Monte Carlo control with an epsilon-soft policy.

    Starts from a uniform policy, samples episodes with the current policy,
    updates the running-mean action values of the visited pairs and makes
    the policy epsilon-greedy with respect to them.

    Parameters
    ----------
    env : object
        Environment exposing ``action_space``, ``reset()`` and ``step()``.
    episode_num : int
        Number of episodes to sample.
    epsilon : float
        Total probability mass spread uniformly over all actions.

    Returns
    -------
    q : numpy.ndarray
        Action-value estimates, shape (22, 11, 2).
    policy : numpy.ndarray
        The learned epsilon-soft policy, same shape as ``q``.
    """
    n_actions = len(env.action_space)
    policy = np.full([22, 11, 2], 0.5)
    q = np.zeros_like(policy)
    c = np.zeros_like(policy)
    for _ in range(episode_num):
        visited = []
        observation = env.reset()
        done = False
        while not done:
            state = obtostate(observation)
            action = np.random.choice(n_actions, p=policy[state])
            visited.append((state, action))
            observation, reward, done, _ = env.step(action)
        # The return of every visited pair is the final reward
        g = reward
        for state, action in visited:
            c[state][action] += 1
            q[state][action] += (g - q[state][action]) / c[state][action]
            # Epsilon-soft improvement: uniform floor plus the remaining
            # mass on the currently greedy action
            greedy = np.argmax(q[state])
            policy[state] = epsilon / n_actions
            policy[state][greedy] += 1. - epsilon
    return q, policy

In [137]:
# Learn action values and an epsilon-soft policy with the default settings
q, policy = monte_carlo_soft(env)

In [138]:
# Estimate the win rate of the greedy version of the learned policy
episode = 50000
success_time = 0
for _ in range(episode):
    observation = env.reset()  # initial observation
    done = False
    while not done:
        state = obtostate(observation)
        action = np.argmax(policy[state])
        observation, reward, done, _ = env.step(action)
    # Only outright wins count; draws and losses do not
    success_time += int(reward == 1)
print("使用该策略的获胜率 = {}".format(success_time / episode))

使用该策略的获胜率 = 0.4063


In [None]:
class playerAgent:
    def __init__(self, env, polic):
        env = BlackJack()
        policy = np.ones([22, 11, 2]) * 0.5
        