In [18]:
import gym
import random
import numpy as np

env = gym.make('Blackjack-v1')

obs_space = env.observation_space
action_space = env.action_space
print("The observation space: {}".format(obs_space))
print("The action space: {}".format(action_space))

The observation space: Tuple(Discrete(32), Discrete(11), Discrete(2))
The action space: Discrete(2)


In [19]:
win = 0
loss = 0
draw = 0
total = 0

for i in range(100000):
    # reset the environment and see the initial state
    state = env.reset()[0]
    # print("The initial observation is {}".format(obs))

    while True:
        # Sample a random action from the entire action space
        action = env.action_space.sample()

        # Take the action and get the new observation space
        state, reward, terminated, _, _ = env.step(action)
        # print("Take action {} new observation is {} reward {} terminated {} truncated {} info {}".format(random_action, obs, reward, terminated, truncated, info))

        if terminated:
            if reward == 1:
                win += 1
            elif reward == 0:
                draw += 1
            else:
                loss += 1
            total += 1
            break

win_rate = win / total
draw_rate = draw / total
loss_rate = loss / total
print("random move")
print(f"{win_rate=},{draw_rate=},{loss_rate=}")

win_rate=0.28207,draw_rate=0.04042,loss_rate=0.67751


In [23]:
def test_policy(policy):
    win = 0
    loss = 0
    draw = 0
    total = 0

    for i in range(100000):
        # reset the environment and see the initial state
        state = env.reset()[0]
        # print("The initial observation is {}".format(obs))

        while True:
            # Sample a random action from the entire action space
            action = policy(state)

            # Take the action and get the new observation space
            state, reward, terminated, _, _ = env.step(action)
            # print("Take action {} new observation is {} reward {} terminated {} truncated {} info {}".format(random_action, obs, reward, terminated, truncated, info))

            if terminated:
                if reward == 1:
                    win += 1
                elif reward == 0:
                    draw += 1
                else:
                    loss += 1
                total += 1
                break

    win_rate = win / total
    draw_rate = draw / total
    loss_rate = loss / total
    print(f"{win_rate=},{draw_rate=},{loss_rate=}")

In [21]:
test_policy(lambda s: 0 if s[0] >= 17 else 1)

win_rate=0.41197,draw_rate=0.10077,loss_rate=0.48726


In [24]:
def print_blackjack_policy(policy, useable_ace):
    print("useable ace" if useable_ace else "no useable ace")
    for hand in range(21,10,-1):
        for dealer in range(1,11):
            print(policy.get((hand, dealer, useable_ace), ' '), end='')
        print()
    print()

In [25]:
# Monte Carlo ES

policy = {}
Q = {}
returns = {}

for i in range(500000):
    state = env.reset()[0]
    action = random.choice([0,1])
    
    episode = [state,action]
    ret = 0
    while True:
        # Take the action and get the new observation
        state, reward, terminated, truncated, info = env.step(action)
        episode.append(state)
        
        if terminated:
            ret = reward
            break
        
        action = policy.get(state, 0 if state[0] >= 17 else 1)
        episode.append(action)
    
    seen_pairs = set()
    for p in range(len(episode) // 2):
        s,a = episode[2*p], episode[2*p+1]
        if (s,a) in seen_pairs: # continue makes this code first-visit MCES
            continue
        seen_pairs.add((s,a))
        
        G = ret
        rets = returns.get((s,a),[])
        rets.append(G)
        returns[(s,a)] = rets
        
        Q[(s,a)] = sum(returns[(s,a)])/len(returns[(s,a)])
    
    for p in range(len(episode) // 2):
        s = episode[2*p]
        
        action_values = []
        for a in [0,1]:
            action_values.append(Q.get((s,a),0))
        
        policy[s] = [0,1].index(np.argmax(action_values))

MCES_policy = policy
MCES_Q = Q
print_blackjack_policy(policy,True)
print_blackjack_policy(policy,False)


useable ace
0000000000
0000000000
0000000000
0001100011
1111111111
1111111111
1111111111
1111111111
1111111111
1111111111
          

no useable ace
0000000000
0000000000
0000000000
0000000000
1000000000
1000001111
1000001111
1000001111
1000001111
1110101111
1111111111



In [26]:
def epsilon_greedy_policy(state,Q,epsilon,action_space):
    if random.uniform(0,1) < epsilon:
        return random.choice(action_space)
    else:
        q_values = []
        for action in action_space:
            q_values.append(Q[state,action])
        return action_space[np.argmax(q_values)]


In [27]:
def get_greedy_policy(Q,action_space):
    policy = {}
    for hand in range(32):
        for dealer in range(11):
            for useable_ace in range(2):
                state = (hand, dealer, useable_ace)
                q_values = []
                for action in action_space:
                    q_values.append(Q[state, action])
                policy[state] = np.argmax(q_values)

    return policy

In [28]:
# SARSA

action_space = [0,1]
alpha = 0.1
epsilon = 0.1
gamma = 0.1
Q = {((h,d,ua),a): 0 for h in range(32) for d in range(11) for ua in range(2) for a in range(2)}

for i in range(500_000):
    S = env.reset()[0]
    A = epsilon_greedy_policy(S,Q,epsilon,action_space)
    
    while True:
        # Take the action and get the new observation
        S2, R, terminated, truncated, info = env.step(A)
        A2 = epsilon_greedy_policy(S2,Q,epsilon,action_space)
        
        Q[S,A] = Q[S,A] + alpha * (R + gamma * Q[S2,A2] - Q[S,A])
        
        S = S2
        A = A2
        
        if terminated:
            break

SARSA_policy = get_greedy_policy(Q,action_space)
SARSA_Q = Q

print_blackjack_policy(SARSA_policy,True)
print_blackjack_policy(SARSA_policy,False)

    

useable ace
0000000000
1000000000
1110100011
1101111111
1111111111
1111111111
1111111111
1111111111
1111111111
1111111111
0000000000

no useable ace
0000000000
0000000000
0000000000
0000000000
1000000000
1100001111
1001001111
1101101111
1110101011
1111011111
1111111111



In [29]:
# Q Learning

action_space = [0,1]
alpha = 0.1
epsilon = 0.1
gamma = 0.1
# !!!!! GAMMA is small because episodes are short. Shouldn't value the likely bust that is coming in 2 moves.
Q = {((h,d,ua),a): 0 for h in range(32) for d in range(11) for ua in range(2) for a in range(2)}

for i in range(500_000):
    S = env.reset()[0]
    
    while True:
        # get action from epsilon greedy
        A = epsilon_greedy_policy(S,Q,epsilon,action_space)
        
        # Take the action and get the new observation
        S2, R, terminated, truncated, info = env.step(A)
        
        q_values = []
        for action in action_space:
            q_values.append(Q.get((S2,action),0))
        max_Q = np.max(q_values)

        current_Q = Q.get((S,A),0)
        Q[(S,A)] = (1 - alpha) * current_Q + alpha * (R + gamma * max_Q)
        
        S = S2
        
        if terminated:
            break

QLearning_policy = get_greedy_policy(Q,action_space)
QLearning_Q = Q
print_blackjack_policy(QLearning_policy,True)
print_blackjack_policy(QLearning_policy,False)

    

useable ace
0000000000
1000000000
1100110011
1111100111
1111110111
1111111111
1111111111
1101111111
1111111111
1111111111
0000000000

no useable ace
0000000000
0000000000
0000000000
0000000000
1000000010
1000000111
1000001111
1101111111
1111101111
1011111111
1111111111



In [30]:
test_policy(MCES_policy.get)
test_policy(SARSA_policy.get)
test_policy(QLearning_policy.get)


win_rate=0.43242,draw_rate=0.0897,loss_rate=0.47788
win_rate=0.42572,draw_rate=0.09437,loss_rate=0.47991
win_rate=0.42629,draw_rate=0.09154,loss_rate=0.48217
