In [5]:
import gym
# import matplotlib.pyplot as plt
import numpy as np
from tqdm.notebook import tqdm, trange
# import seaborn as sns
from collections import Counter
import pandas as pd

In [6]:
env = gym.make('Blackjack-v1', natural=True)

## Часть первая, с блекджеком и стратегиями

**Задание 1**. Рассмотрим очень простую стратегию: говорить stand, если у нас на руках комбинация в 19, 20 или 21 очко, во всех остальных случаях говорить hit. 
Используйте методы Монте-Карло, чтобы оценить выигрыш от этой стратегии.

In [7]:
TASK_NUM = 500_000
STAND = 0
HIT = 1
ACTIONS = {0: 'STAND', 1: 'HIT'}

In [8]:
rewards = np.zeros(TASK_NUM)

for task_id in trange(TASK_NUM):
    state = env.reset()
    is_done = False
    card_sum = state[0]
    while not is_done:
        action = HIT if card_sum < 19 else STAND
        state, reward, is_done, _ = env.step(action)
        card_sum = state[0]
    rewards[task_id] = reward

print(f'Средний выигрыш от наивной стратегии: {rewards.mean()}')

  0%|          | 0/500000 [00:00<?, ?it/s]

Средний выигрыш от наивной стратегии: -0.19815


In [None]:
data = pd.DataFrame(sorted(Counter(rewards).items()), columns=['reward', 'count'])
sns.barplot(x='reward', y='count', data=data)

**Задание 2**. Реализуйте метод обучения с подкреплением без модели (можно Q-обучение, но рекомендую попробовать и другие, например Monte Carlo control) для обучения стратегии в блекджеке, используя окружение Blackjack-v0 из OpenAI Gym.

In [4]:
from collections import defaultdict
import itertools

In [5]:
ACTIONS = {0: 'STAND', 1: 'HIT'}

In [6]:
def epsilon_soft_policy(eps, Q, state):
    coin = np.random.choice([0, 1], p=[eps, 1 - eps])
    if coin == 0:
        return np.random.randint(len(Q[state]['actions']))
    else:
        return np.argmax(Q[state]['actions'])

In [7]:
def update_q(episode, gamma, Q, actions):
    G = 0
    for t in range(len(episode))[::-1]:
        state, reward = episode[t]
        G = gamma * G + reward
        Q[state]['actions'][actions[t]] = (reward - Q[state]['actions'][actions[t]]) / (Q[state]['count'][actions[t]] + 1)
        Q[state]['count'][actions[t]] += 1

In [8]:
def run(eps, gamma, action_dim, environment, task_num):
    Q = defaultdict(lambda: {'actions': np.zeros(action_dim), 'count': np.zeros(action_dim)})
    rewards = np.zeros(task_num)
    for task_id in trange(task_num):
        state_prev = environment.reset()
        episode = []
        actions = []
        is_done = False
        while not is_done:
            action = epsilon_soft_policy(eps, Q, state_prev)
            actions.append(action)
            state, reward, is_done, _ = environment.step(action)
            episode.append((state_prev, reward))
            state_prev = state
        rewards[task_id] = reward
        update_q(episode, gamma, Q, actions)
    return Q, rewards

In [9]:
def choose_best_params(action_dim, environment):
    gammas = np.linspace(0.5, 1, 10)
    epsilons = np.linspace(0.01, 0.5, 25)
    best_reward = -np.inf

    for gamma, eps in itertools.product(gammas, epsilons):
        Q, rewards = run(eps, gamma, action_dim=action_dim, environment=environment, task_num=TASK_NUM)
        if rewards.mean() > best_reward:
            best_gamma = gamma
            best_eps = eps
            best_reward = rewards.mean()
        print(f'Средний выигрыш при gamma={gamma}, eps={eps}: {rewards.mean()}')
    return best_eps, best_gamma, best_reward

In [21]:
def inference_q(best_eps, best_gamma, action_dim, environment):
    Q, rewards = run(eps=best_eps, gamma=best_gamma, action_dim=action_dim, environment=environment, task_num=500_000)
    print(f'Средняя награда во время обучения: {rewards.mean()}')
    rewards = np.zeros(TASK_NUM)
    for task_id in trange(TASK_NUM):
        state_prev = environment.reset()
        episode = []
        actions = []
        is_done = False
        while not is_done:
            action = epsilon_soft_policy(best_eps, Q, state_prev)
            actions.append(action)
            state, reward, is_done, _ = environment.step(action)
            state_prev = state
        rewards[task_id] = reward
    print(f'Средняя награда во время инференса: {rewards.mean()}')
    return Q

In [None]:
best_eps, best_gamma, best_result = choose_best_params(action_dim=2, environment=env)
print(f'Лучшие параметры: gamma={best_gamma}, eps={best_eps}, mean_reward={best_reward}')

In [11]:
best_gamma, best_eps = 1, 0.05

In [22]:
# eps = 0.05, gamma = 1
Q = inference_q(best_eps, best_gamma, action_dim=2, environment=env)

  0%|          | 0/500000 [00:00<?, ?it/s]

Средняя награда во время обучения: -0.18185


  0%|          | 0/100000 [00:00<?, ?it/s]

Средняя награда во время инференса: -0.16678


In [None]:
def draw_table(Q, usable_ace=True):
    index = np.unique([current_sum for current_sum, dealer_first_card, ace in Q.keys() if ace == usable_ace])
    index.sort()
    columns = np.unique([dealer_first_card for current_sum, dealer_first_card, ace in Q.keys() if ace == usable_ace])
    columns.sort()
    describe_table = pd.DataFrame(columns=columns, index=index).fillna('nan')
    count_table = pd.DataFrame(columns=columns, index=index).fillna(0)
    for state, value in Q.items():
        current_sum, dealer_first_card, ace = state
        if not ace == usable_ace:
            continue
        action = np.argmax(value['actions'])
        state_count = np.sum(value['count'])
        describe_table.loc[current_sum, dealer_first_card] = ACTIONS[action]
        count_table.loc[current_sum, dealer_first_card] = state_count
    plt.figure(figsize=(10, 7))
    sns.heatmap(data=count_table.astype(int), annot=describe_table, fmt='s')
    plt.title(f'Table for usable_ace={usable_ace}')
    plt.xlabel('Dealers first card')
    plt.ylabel('Current hand sum')

In [None]:
draw_table(Q)

In [None]:
draw_table(Q, False)

# TODO: add gridsearch

In [23]:
from gym.envs.toy_text.blackjack import BlackjackEnv, sum_hand, cmp, usable_ace, is_bust, is_natural, score
from gym import spaces

In [24]:
TASK_NUM = 500_000
ACTIONS = {0: 'STAND', 1: 'HIT', 2: 'DOUBLE'}

In [25]:
class DoubleBlackjackEnv(BlackjackEnv):
    def __init__(self, natural=False, sab=False):
        super().__init__(natural=natural, sab=sab)
        self.base_env = gym.make('Blackjack-v1', natural=True)
        self.action_space = spaces.Discrete(3)
    
    def reset(self):
        return self.base_env.reset()

    def step(self, action):
        assert self.action_space.contains(action)
        if action == 2:
            # double — удвоить ставку
            # при этом больше действий делать нельзя, игроку выдаётся ровно одна дополнительная карта, а выигрыш или проигрыш удваивается
            state, reward, is_done, _ = self.base_env.step(1)
            if is_done:
                return state, reward * 2, is_done, _
            else:
                state, reward, is_done, _ = self.base_env.step(0)
                return state, reward * 2, is_done, _
        else:
            return self.base_env.step(action)

In [26]:
env_double = DoubleBlackjackEnv(natural=True)

In [None]:
# eps = 0.05, gamma = 1
Q = inference_q(best_eps, best_gamma, action_dim=3, environment=env_double)

  0%|          | 0/500000 [00:00<?, ?it/s]

Средняя награда во время обучения: -0.197854


  0%|          | 0/500000 [00:00<?, ?it/s]

Средняя награда во время инференса: -0.18814


In [None]:
draw_table(Q)

In [None]:
draw_table(Q, False)

In [29]:
class HoffmanBlackjackEnv(BlackjackEnv):
    # 1 -> Ace, 10 -> Jack, Queen, King
    DECK = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10] * 4
    def __init__(self, natural=False, sab=False):
        super().__init__(natural=natural, sab=sab)
        self.action_space = spaces.Discrete(3)
        self.deck = self.DECK[:]
        # Stanford Wong's counting scheme
        self.counting_rule = {
            1: -1, 
            2: 0.5, 
            3: 1, 
            4: 1, 
            5: 1.5, 
            6: 1, 
            7: 0.5,
            8: 0,
            9: -0.5,
            10: -1,
        }
        self.deck_score = 0
    
    def reset(self):
        if len(self.deck) <= 15:
            self.deck_score = 0
            self.deck = self.DECK[:]
        self.dealer = self.draw_hand()
        self.player = self.draw_hand()
        return self._get_obs()
    
    def draw_hand(self):
        return [self.draw_card(), self.draw_card()]
    
    def draw_card(self):
        card = np.random.choice(self.deck)
        self.deck.remove(card)
        self.deck_score += self.counting_rule[card]
        return card
    
    def _get_obs(self):
        return (sum_hand(self.player), self.dealer[0], usable_ace(self.player), self.deck_score)
        
    def step(self, action):
        assert self.action_space.contains(action)
        if action == 2:
            # double — удвоить ставку
            # при этом больше действий делать нельзя, игроку выдаётся ровно одна дополнительная карта, а выигрыш или проигрыш удваивается
            done = True
            while sum_hand(self.dealer) < 17:
                self.dealer.append(self.draw_card())
            reward = cmp(score(self.player), score(self.dealer))
            reward *= 2
        elif action == 1:  # hit: add a card to players hand and return
            self.player.append(self.draw_card())
            if is_bust(self.player):
                done = True
                reward = -1.
            else:
                done = False
                reward = 0.
        else:  # stick: play out the dealers hand, and score
            done = True
            while sum_hand(self.dealer) < 17:
                self.dealer.append(self.draw_card())
            reward = cmp(score(self.player), score(self.dealer))
            if self.sab and is_natural(self.player) and not is_natural(self.dealer):
                # Player automatically wins. Rules consistent with S&B
                reward = 1.0
            elif (
                not self.sab
                and self.natural
                and is_natural(self.player)
                and reward == 1.0
            ):
                # Natural gives extra points, but doesn't autowin. Legacy implementation
                reward = 1.5
        return self._get_obs(), reward, done, {}

In [30]:
env_hoffman = HoffmanBlackjackEnv(natural=True)

In [31]:
# eps = 0.05, gamma = 1
Q = inference_q(best_eps, best_gamma, action_dim=3, environment=env_hoffman)

  0%|          | 0/500000 [00:00<?, ?it/s]

Средняя награда во время обучения: -0.110766


  0%|          | 0/100000 [00:00<?, ?it/s]

Средняя награда во время инференса: -0.10225


In [None]:
# eps = 0.05, gamma = 1
Q = run(EPSILON, GAMMA, action_dim=3, environment=env_hoffman)

In [None]:
draw_table(Q)