In [2]:
# Sutton & Barto, "Reinforcement Learning", Chapter 5: Blackjack solved with Monte Carlo Exploring Starts.
import random
import numpy as np
from scipy.stats import beta
import matplotlib.pyplot as plt

# Card values of one deck: 13 values repeated 4 times (52 cards). An ace is
# stored as 1; sum_hand() decides whether it may count as 11.
DECK_OF_CARDS = [1,2,3,4,5,6,7,8,9,10,10,10,10] * 4 #J, Q, K count as 10.
# Dealer.action() hits while the dealer's total is below this value and sticks
# at or above it. (The spelling of the name is kept because other cells use it.)
DEALDER_STICK_THRESHOLD = 17 # same as Sutton's and Vega's rule: https://www.youtube.com/watch?v=SWdPf21v5Ak

def sum_hand(hand):
    """Score a blackjack hand.

    Args:
        hand: iterable of card values, where an ace is represented by 1.

    Returns:
        A tuple ``(total, usable_ace)``. ``usable_ace`` is 1 when the hand
        holds an ace that can be counted as 11 without exceeding 21 (the
        total then includes the extra 10); otherwise it is 0 and ``total``
        counts every ace as 1.
    """
    hard_total = sum(hand)
    soft_total = hard_total + 10
    # An ace is promoted from 1 to 11 only when the hand stays at 21 or below.
    if 1 in hand and soft_total <= 21:
        return (soft_total, 1)
    return (hard_total, 0)
    
def busted(hand):
    """Return True when the hand's total, as scored by sum_hand(), exceeds 21."""
    total = sum_hand(hand)[0]
    return total > 21
    
def convert_cards_to_state(my_cards, dealer_card):
    """Encode the player's hand and the dealer's card as a 0-based state tuple.

    Args:
        my_cards: the player's card values (ace = 1).
        dealer_card: value of the dealer card the player can see.

    Returns:
        ``(player_total - 1, dealer_card - 1, usable_ace)``; the first two
        entries are shifted by one so the tuple can index an ndarray directly.
    """
    total, has_usable_ace = sum_hand(my_cards)
    total_index = total - 1
    dealer_index = dealer_card - 1
    return (total_index, dealer_index, has_usable_ace)

In [3]:
class BasePlayer(object):
    """Common interface for anything that can decide between hit and stick."""

    def action(self, cards, rival_card):
        """Decide the next move; subclasses must override this.

        Args:
            cards: this player's current card values.
            rival_card: a card of the opponent that this player can see.

        Returns:
            1 to hit, 0 to stick.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("action() is not implemented.")

class Dealer(BasePlayer):
    """Dealer with a fixed rule: hit below the stick threshold, otherwise stick."""

    def __init__(self):
        self.stick_threshold = DEALDER_STICK_THRESHOLD

    def action(self, cards, rival_card):
        """Return 1 (hit) while the hand total is below the threshold, else 0 (stick).

        ``rival_card`` is accepted for interface compatibility and is ignored.
        """
        total = sum_hand(cards)[0]
        if total < self.stick_threshold:
            return 1
        # Stick on the threshold (17) or greater, same as Sutton's dealer.
        return 0

In [4]:
class Policy(object):
    """Tabular action-value estimates and greedy policy for the blackjack player.

    A state is a 0-based triple ``(i, j, k)``: ``i`` is the player's total
    minus one (0..20), ``j`` is the dealer's card minus one (0..9) and ``k`` is
    1 when the player holds a usable ace. Actions are 0 (stick) and 1 (hit).

    Attributes (all ndarrays indexed by state first):
        stats: int64, shape (21, 10, 2, 2, 3); ``stats[i, j, k, a, r + 1]``
            counts how often action ``a`` in state ``(i, j, k)`` ended with
            final reward ``r`` in {-1, 0, 1}.
        q: float64, shape (21, 10, 2, 2); average final reward q(s, a).
        pi: float64, shape (21, 10, 2, 2); pi(a|s), one-hot over the action.
    """

    def __init__(self):
        # Counts start at zero; update() only divides after incrementing one.
        self.stats = np.zeros((21, 10, 2, 2, 3), dtype='int64')
        self.q = np.zeros((21, 10, 2, 2), dtype='float64')
        self.pi = np.zeros((21, 10, 2, 2), dtype='float64')
        # pi[..., 1] is the probability of hit, pi[..., 0] of stick.
        # Totals 1..11 (i = 0..10): always hit; update() never touches these.
        self.pi[0:11, :, :, :] = [0., 1.]
        # Sutton's initial policy: hit on 12..19 (i = 11..18) ...
        self.pi[11:19, :, :, :] = [0., 1.]
        # ... and stick on 20 and 21 (i = 19, 20). The row for 21 is set
        # explicitly instead of relying on argmax over an all-zero row.
        self.pi[19:, :, :, :] = [1., 0.]

    def update(self, state_action_seq, final_reward):
        """Fold one finished episode into ``stats``/``q`` and improve ``pi`` greedily.

        Args:
            state_action_seq: iterable of ``(i, j, k, a)`` tuples visited in
                the episode (may be empty).
            final_reward: the episode's reward for the player, in {-1, 0, 1}.
        """
        reward_slot = int(final_reward) + 1
        for (i, j, k, a) in state_action_seq:
            # Totals up to 11 always hit and a total of 21 always sticks, so
            # neither the estimates nor the policy are learned for them.
            if i <= 10 or i == 20:
                continue
            self.stats[i, j, k, a, reward_slot] += 1
            wins = self.stats[i, j, k, a, 2]
            losses = self.stats[i, j, k, a, 0]
            samples = np.sum(self.stats[i, j, k, a, :])
            # q is the average reward so far: net wins over number of samples.
            self.q[i, j, k, a] = (wins - losses) * 1.0 / samples
            # Greedy improvement for every visited state (not just the last
            # one). Ties go to hit, matching the argsort-based rule this
            # replaces.
            if self.q[i, j, k, 1] >= self.q[i, j, k, 0]:
                self.pi[i, j, k, :] = [0., 1.]
            else:
                self.pi[i, j, k, :] = [1., 0.]

    def action(self, state):
        """Return the greedy action (0 = stick, 1 = hit) for ``state = (i, j, k)``."""
        # tuple(...) indexing works on every Python version, unlike pi[*state].
        return int(np.argmax(self.pi[tuple(state)]))

    def imshow(self, ax, matrix, title):
        """Draw ``matrix`` (entries in [0, 1]) as a grayscale image on ``ax``.

        The tick labels assume 10 columns (dealer card 1..10) and 9 rows
        (player total 20 down to 12).
        """
        ax.set_title(title)
        ax.imshow(matrix, cmap='gray', vmin=0., vmax=1.)

        ax.set_xlabel('Dealer Card')
        ax.set_xticks(np.arange(10))
        # Plain str labels: astype('S2') would render as bytes on Python 3.
        ax.set_xticklabels([str(card) for card in range(1, 11)])

        ax.set_ylabel('Player Card')
        ax.set_yticks(np.arange(9))
        ax.set_yticklabels([str(total) for total in range(20, 11, -1)])

        ax.spines['right'].set_color('none')
        ax.spines['top'].set_color('none')

    def visualize(self):
        """Plot the hard decision grids and, when available, the Bayesian ones.

        The left column shows the greedy decision (white = hit, black = stick)
        for totals 12..20, without and with a usable ace. The right column is
        drawn only when the instance provides ``bayesian_decision(usable_ace)``;
        this class does not define that method itself.
        """
        fig = plt.figure(figsize=(12, 9), dpi=80)
        bayesian = getattr(self, 'bayesian_decision', None)

        panels = (
            (0, 'No usable Ace', 221, 222),
            (1, 'with usable Ace', 223, 224),
        )
        for usable_ace, label, hard_pos, soft_pos in panels:
            ax = fig.add_subplot(hard_pos)
            # Rows 11..19 are totals 12..20, matching the 9 tick labels used
            # by imshow() for both panels.
            hit = self.pi[11:20, :, usable_ace, 1]
            stick = self.pi[11:20, :, usable_ace, 0]
            decision = (hit > stick).astype(int)
            self.imshow(ax, np.flipud(decision), 'Decision ({})'.format(label))

            if bayesian is not None:
                ax = fig.add_subplot(soft_pos)
                self.imshow(ax, bayesian(usable_ace),
                            'Bayesian Decision ({})'.format(label))

        plt.show()


In [5]:
class PolicyPlayer(BasePlayer):
    """Player that follows a Policy and records the state-action pairs it visits."""

    def __init__(self, policy):
        self.policy = policy
        # (i, j, k, a) tuples collected since the last reset_states().
        self.state_action_pairs = []

    def action(self, cards, rival_card):
        """Ask the policy for a move, remember the (state, action) pair, return the move."""
        state = convert_cards_to_state(cards, rival_card)  # 0-based state
        chosen = self.policy.action(state)
        total_idx, dealer_idx, ace_flag = state
        self.state_action_pairs.append((total_idx, dealer_idx, ace_flag, chosen))
        return chosen

    def update_policy(self, reward):
        """Feed the recorded episode and its reward to the policy.

        This combines the policy evaluation and improvement steps.
        """
        self.policy.update(self.state_action_pairs, reward)

    def reset_states(self):
        """Forget the recorded state-action pairs."""
        self.state_action_pairs = []

In [7]:
# A simple debugger (to avoid clutter in main code.)
class Dbg:
    """Minimal logger for game events; level 0 is silent, any other level prints.

    All output goes through single-argument ``print(...)`` calls, which parse
    and behave identically on Python 2 and Python 3.
    """

    def __init__(self, level):
        # level can be 0 or 1. (0 means silent.)
        self.level = level

    def _emit(self, label, value):
        # One space between label and value reproduces the separator that the
        # statement form `print label, value` inserted.
        print('{} {}'.format(label, value))

    def print_hands(self, game):
        """Print ``game.dealer_cards`` and ``game.player_cards``."""
        if self.level == 0:
            return
        self._emit('dealer cards: ', game.dealer_cards)
        self._emit('player cards: ', game.player_cards)

    def on_dealer_action(self, action):
        """Print the dealer's action."""
        if self.level == 0:
            return
        self._emit('dealer action: ', action)

    def on_player_action(self, action):
        """Print the player's action."""
        if self.level == 0:
            return
        self._emit('player action: ', action)

    def print_bust_status(self, dealer, player):
        """Print who busted; ``dealer`` and ``player`` are booleans. Prints nothing if nobody did."""
        if self.level == 0:
            return
        if player and dealer:
            print('both busted')
        elif player:
            print('player busted')
        elif dealer:
            print('dealer busted')

    def print_sum_of_hands(self, dealer_sum, player_sum):
        """Print both hand totals on one line."""
        if self.level == 0:
            return
        print('dealer sum: {}, player sum: {}'.format(dealer_sum, player_sum))
        

In [8]:
class Game(object):
    """
    Shoe game as in https://www.youtube.com/watch?v=SWdPf21v5Ak

    Every card is sampled with replacement from ``self.cards`` (Sutton's
    infinite-deck assumption), so drawing never depletes the shoe.
    """
    def __init__(self, dealer, player, debug_level):
        self.dealer = dealer
        self.player = player
        self.dealer_cards = []
        self.player_cards = []
        self.cards = DECK_OF_CARDS[:]
        # Not read by this class (cards are sampled, not dealt in order); kept
        # so code that inspects the attribute keeps working.
        self.card_index = 0
        self.dbg = Dbg(debug_level)

    def reset(self):
        """Clear both hands and the player's recorded state-action pairs."""
        self.player.reset_states()
        self.dealer_cards = []
        self.player_cards = []

    def _draw(self, count=1):
        # Sample `count` card values with replacement; plain ints keep the
        # debug output and the stored hands free of numpy scalar types.
        return [int(card) for card in np.random.choice(self.cards, size=count)]

    def on_player_action(self, action):
        """Apply the player's action: draw one card on a hit (1), nothing on a stick (0)."""
        if action == 1:
            self.player_cards.extend(self._draw())
        self.dbg.on_player_action(action)

    def on_dealer_action(self, action):
        """Apply the dealer's action: draw one card on a hit (1), nothing on a stick (0)."""
        if action == 1:
            self.dealer_cards.extend(self._draw())
        self.dbg.on_dealer_action(action)

    def check_bust(self):
        """Return ``(dealer_busted, player_busted)`` and log the status."""
        dealer_busted = busted(self.dealer_cards)
        player_busted = busted(self.player_cards)
        self.dbg.print_bust_status(dealer_busted, player_busted)
        return (dealer_busted, player_busted)

    def compare_hands(self):
        """Return the player's reward from comparing totals: one of {-1, 0, 1}."""
        d_sum, _ = sum_hand(self.dealer_cards)
        p_sum, _ = sum_hand(self.player_cards)
        return int(np.sign(p_sum - d_sum))

    def _player_turn(self, forced_first_action):
        # The player acts until sticking or busting. The dealer's first card is
        # the one shown to the player.
        dealer_showing = self.dealer_cards[0]
        forced = forced_first_action
        while not busted(self.player_cards):
            if forced is None:
                action = self.player.action(self.player_cards, dealer_showing)
            else:
                # Exploring start: the first action is imposed instead of being
                # chosen by the player, so record the pair on the player's
                # behalf (when it keeps such a record) for it to be learned.
                action, forced = forced, None
                pairs = getattr(self.player, 'state_action_pairs', None)
                if pairs is not None:
                    state = convert_cards_to_state(self.player_cards, dealer_showing)
                    pairs.append(tuple(state) + (action,))
            self.on_player_action(action)
            if action == 0:
                break

    def _dealer_turn(self):
        # The dealer acts until sticking or busting.
        rival_card = self.player_cards[0]
        while not busted(self.dealer_cards):
            action = self.dealer.action(self.dealer_cards, rival_card)
            self.on_dealer_action(action)
            if action == 0:
                break

    def play(self, dealer_init_cards=None, player_init_cards=None, a=None):
        """
        Play one episode and return the player's reward.

        dealer_init_cards, player_init_cards: List[Int] of size 2 giving the
            starting hands; when None, two random cards are drawn for that hand.
        a: player's first action, 1 for hit; 0 for stick. When None, the
            player chooses its first action itself.

        Returns -1 if the player busts, 1 if only the dealer busts, otherwise
        the result of compare_hands() (-1, 0 or 1).

        Raises ValueError if either starting hand is empty.
        """
        self.reset()
        if dealer_init_cards is None:
            self.dealer_cards = self._draw(2)
        else:
            self.dealer_cards = list(dealer_init_cards)
        if player_init_cards is None:
            self.player_cards = self._draw(2)
        else:
            self.player_cards = list(player_init_cards)
        if not self.dealer_cards or not self.player_cards:
            raise ValueError('both hands need at least one starting card')
        self.dbg.print_hands(self)

        self._player_turn(a)
        # A busted player has already lost, so the dealer does not need to draw.
        if not busted(self.player_cards):
            self._dealer_turn()

        dealer_busted, player_busted = self.check_bust()
        self.dbg.print_sum_of_hands(sum_hand(self.dealer_cards)[0],
                                    sum_hand(self.player_cards)[0])
        if player_busted:
            return -1
        if dealer_busted:
            return 1
        return self.compare_hands()
        

In [None]:
# Quick sanity check of the game driver: one verbose game with a fresh policy.
# NOTE(review): this calls Game.play() without arguments, so it only runs when
# play()'s parameters are optional.
dealer = Dealer()
policy = Policy()
player = PolicyPlayer(policy)
game = Game(dealer, player, 1)
r = game.play()
# Single-argument print(...) behaves the same on Python 2 and 3; the two spaces
# reproduce the output of the statement form `print 'result: ', r`.
print('result:  {}'.format(r))


In [None]:
from IPython import display

class Learner:
    """Trains a PolicyPlayer against a Dealer and plots the learning progress."""

    def __init__(self):
        self.player = PolicyPlayer(Policy())

    def _plot_progress(self, epoch, delta_q, progress_bar_iters):
        # Top panel: largest change of q(s, stick) between reports (no usable ace).
        plt.subplot(211)
        plt.plot(epoch, delta_q, 'b-')
        plt.xlim(0, progress_bar_iters)
        plt.ylim(0, 1.1)

        # Bottom-left panel: current greedy decision for totals 12..20 without
        # a usable ace (1 = hit, 0 = stick), highest total on the top row.
        plt.subplot(223)
        pi = self.player.policy.pi
        decision = (pi[11:20, :, 0, 1] > pi[11:20, :, 0, 0]).astype(int)
        plt.imshow(np.flipud(decision), cmap='gray', vmin=0., vmax=1.)
        plt.xlabel('Dealer Card')
        # Plain str labels: astype('S2') would render as bytes on Python 3.
        plt.xticks(np.arange(10), [str(card) for card in range(1, 11)])
        plt.ylabel('My Card')
        plt.yticks(np.arange(9), [str(total) for total in range(20, 11, -1)])
        ax = plt.gca()
        ax.spines['right'].set_color('none')
        ax.spines['top'].set_color('none')
        plt.colorbar()

        display.display(plt.gcf())

    def play(self, num_of_training, debug_level):
        """Play ``num_of_training`` games, updating the policy after each one.

        Args:
            num_of_training: total number of games; they are played in
                mini-batches of 5, so a remainder below 5 is not played.
            debug_level: passed to every Game; 0 is silent.
        """
        dealer = Dealer()
        mini_batch = 5  # Number of games played with each Game instance.
        progress_bar_iters = 100  # Number of times we "report progress", e.g. refreshing dynamic plots.
        # Floor division keeps these integers on Python 3 as well.
        num_batches = int(num_of_training) // mini_batch
        # At least 1, so short runs (fewer than 100 batches) do not end in a
        # modulo-by-zero below.
        progress_bar_step_size = max(1, num_batches // progress_bar_iters)
        old_q = np.zeros((9, 10))
        epoch = []
        delta_q = []
        plt.figure(figsize=(12, 6))

        try:
            batch_range = xrange(num_batches)  # Python 2: lazy, no big list
        except NameError:
            batch_range = range(num_batches)   # Python 3: already lazy

        for batch_idx in batch_range:
            if batch_idx > 0 and progress_bar_iters > 0 and batch_idx % progress_bar_step_size == 0:
                plt.clf()
                # Keep the output of the final report on screen.
                if batch_idx < num_batches - 1:
                    display.clear_output(wait=True)
                epoch.append(len(epoch))
                new_q = self.player.policy.q[11:20, :, 0, 0]
                delta_q.append(np.max(np.abs(new_q - old_q)))
                # new_q is a view into the live q table, so snapshot it.
                old_q = np.copy(new_q)
                self._plot_progress(epoch, delta_q, progress_bar_iters)

            if debug_level > 0:
                print('\n\n === Game {} === \n\n'.format(batch_idx + 1))
            game = Game(dealer, self.player, debug_level)

            # Play `mini_batch` games with the same Game instance; cards are
            # sampled with replacement, so no shuffling is involved.
            for _ in range(mini_batch):
                r = game.play()
                self.player.update_policy(r)
                self.player.reset_states()
                

In [None]:
# Train on 5,000,000 games; debug level 0 keeps the per-game logging silent.
learner = Learner()
learner.play(num_of_training=5000000, debug_level=0)


In [None]:
# Plot the hit/stick decision grids of the policy held by the trained learner.
learner.player.policy.visualize()


In [None]:
# Debug: inspect a single state. Indices are 0-based, so (16, 6, 1) is a player
# total of 17 against a dealer card of 7 with a usable ace.
i, j, k = 16, 6, 1
policy = learner.player.policy
# Single-argument print(...) behaves the same on Python 2 and 3.
print(policy.stats[i, j, k, :, :])
print(policy.q[i, j, k, :])
print(policy.pi[i, j, k])
# The Policy class defined earlier in this notebook has no plot_beta method,
# so only call it when the object actually provides one.
if hasattr(policy, 'plot_beta'):
    policy.plot_beta((i, j, k))
