# Monte Carlo Prediction - Blackjack

* **Description:** Perform first-visit and every-visit MC prediction for the Blackjack example
* **Reference:** Reinforcement Learning, An Introduction, Second Edition by Sutton, Barto
* **Section:** Section 5.1, Example 5.1, Pg. 93

# Import required libraries

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import math

from tqdm import tqdm
from collections import defaultdict

# Define classes and functions

## Class: Agent - Blackjack

In [None]:
class BJ_Agent(object):
    '''
    Defines the agent class for the Blackjack example
    Arguments:
        policy: Policy to use for the agent class. Only the string
                'stick_20_21_policy' is implemented
        actions: List of actions that the agent can take. actions[0] is used
                 as the hit action and actions[1] as the stick action
    '''
    
    def __init__(self, policy, actions):
        self.policy = policy # Initial policy
        self.actions = actions # List of actions
        
    def step(self, state):
        '''
        Execute one step of the agent based on the current state
        Arguments:
            state: Dictionary containing current player sum, dealer shown card and ace type.
                   Only the 'player_sum' entry is read
        Returns:
            The action selected by the policy (an element of self.actions)
        Raises:
            ValueError: If self.policy is not a supported policy
        '''
        # The isinstance check keeps non-string policies (e.g. arrays) away from
        # the string comparison below
        if isinstance(self.policy, str) and self.policy == 'stick_20_21_policy':
            # Stick when player sum = 20 or 21, hit otherwise
            if state['player_sum'] in (20, 21):
                return self.actions[1] # stick
            return self.actions[0] # hit
        
        # Fail with a clear message instead of an UnboundLocalError on 'action'
        raise ValueError(f"Unsupported policy: {self.policy!r}")

## Class: Environment - Blackjack

In [None]:
class BJ_Environment(object):
    '''
    Defines the environment class for the Blackjack example
    Arguments:
        dealer_hid_card: Label of the dealer's hidden card
        dealer_shn_card: Label of the dealer's shown card
        player_init_cards: Labels of the cards initially dealt to the player
        cards_list: List of card labels from which new cards are drawn (with replacement)
        card_value_dict: Dictionary mapping each card label to its point value.
                         A card valued 11 is treated as an ace that may count as 1
        dealer_thresh: Sum at which the dealer sticks (dealer hits while below it)
    '''
    
    def __init__(self, dealer_hid_card, dealer_shn_card, player_init_cards, cards_list,\
                 card_value_dict, dealer_thresh):
        self.dealer_hid_card = dealer_hid_card
        self.dealer_shn_card = dealer_shn_card
        self.dealer_cards = np.append(self.dealer_hid_card, self.dealer_shn_card)
        self.dealer_thresh = dealer_thresh
        self.player_init_cards = player_init_cards
        self.cards_list = cards_list
        self.card_value_dict = card_value_dict
        self.player_cards = player_init_cards
        self.player_stuck = False # Becomes True once the player has chosen to stick
        self.player_sum, self.player_usable_ace = self._hand_value(self.player_cards)
        self.dealer_sum, _ = self._hand_value(self.dealer_cards)
        self.game_over = 0
        self.update_states()
        
    def _hand_value(self, cards):
        '''
        Compute the best value of a hand
        Arguments:
            cards: Iterable of card labels (keys of self.card_value_dict)
        Returns:
            total: Hand value, with aces demoted from 11 to 1 as needed to avoid a bust
            usable_ace: True if at least one ace is still counted as 11
        '''
        card_values = [self.card_value_dict[card] for card in cards]
        total = sum(card_values)
        soft_aces = card_values.count(11) # Aces currently counted as 11
        while (total > 21) and (soft_aces > 0):
            total -= 10 # Count one ace as 1 instead of 11
            soft_aces -= 1
        return total, (soft_aces > 0)
        
    def respond(self, action): # Respond to a particular action
        '''
        Arguments:
            action: 'player_hit' or 'player_stick'. Any other value leaves the hands unchanged
        Returns:
            state_int: Dictionary with the full internal state (all cards, status, reward)
            state_vis: Dictionary with the state visible to the agent
        '''
        if self.game_over: # A finished game is not modified by further actions
            return self.state_int, self.state_vis
        
        if (action == 'player_hit'): # Player Hit action
            player_next_card = np.random.choice(self.cards_list)
            self.player_cards = np.append(self.player_cards, player_next_card)
            self.player_sum, self.player_usable_ace = self._hand_value(self.player_cards)
        elif (action == 'player_stick'): # Player stick action
            self.player_stuck = True
            # Dealer hits until his sum becomes equal to or greater than self.dealer_thresh
            while (self.dealer_sum < self.dealer_thresh):
                dealer_next_card = np.random.choice(self.cards_list) # Next card for dealer
                self.dealer_cards = np.append(self.dealer_cards, dealer_next_card)
                self.dealer_sum, _ = self._hand_value(self.dealer_cards)
        self.update_states()
        return self.state_int, self.state_vis
    
    def update_states(self):
        '''
        Refresh game status and rebuild the internal and agent-visible state dictionaries
        '''
        self.game_status_upd()
        self.state_int = {'dealer_hid_card': self.dealer_hid_card,
                          'dealer_shn_card': self.dealer_shn_card,
                          'dealer_cards': self.dealer_cards,
                          'player_init_cards': self.player_init_cards,
                          'player_cards': self.player_cards,
                          'game_over_flag': self.game_over,
                          'game_status': self.game_status,
                          'reward': self.reward
                         }
        self.state_vis = {'player_sum': self.player_sum,
                          'dealer_shn_card': self.dealer_shn_card,
                          'ace_type': 'usable' if self.player_usable_ace else 'not_usable',
                          'reward': self.reward
                         }
        
    def game_status_upd(self):
        '''
        Set self.game_over, self.game_status and self.reward from the current sums
        '''
        if (self.player_sum > 21):
            self.game_over = 1
            self.game_status = 'Player_Bust_Player_Lose'
            self.reward = -1
        elif (self.dealer_sum > 21):
            self.game_over = 1
            self.game_status = 'Dealer_Bust_Player_Win'
            self.reward = 1
        elif (self.player_sum == 21):
            self.game_over = 1
            if (self.dealer_sum == 21):
                self.game_status = 'Draw'
                self.reward = 0
            else:
                self.game_status = 'Player_Win'
                self.reward = 1
        elif (self.dealer_sum == 21):
            self.game_over = 1
            self.game_status = 'Player_Lose'
            self.reward = -1
        elif (self.player_stuck):
            # Player has stuck and the dealer has finished drawing: the game must end
            # here, otherwise it would stay 'Ongoing' forever. Higher sum wins.
            self.game_over = 1
            if (self.player_sum > self.dealer_sum):
                self.game_status = 'Player_Win'
                self.reward = 1
            elif (self.player_sum < self.dealer_sum):
                self.game_status = 'Player_Lose'
                self.reward = -1
            else:
                self.game_status = 'Draw'
                self.reward = 0
        else:
            self.game_over = 0
            self.game_status = 'Ongoing'
            self.reward = 0

## Function: iter_pol_eval

In [None]:
def _plot_svf_heatmap(svf, gw_size, title, num_rows, num_cols, plot_ind):
    '''
    Draw one state value function as an annotated gw_size x gw_size heatmap
    in subplot number plot_ind of the current figure
    '''
    ax = plt.subplot(num_rows, num_cols, plot_ind)
    ax.set_title(title, fontsize = 15)
    svf_table_df = pd.DataFrame(svf.reshape(gw_size, gw_size))
    sns.heatmap(svf_table_df, annot = True, cbar = False, square = True,
                cmap = 'Greys', vmin = 0, fmt = "0.1f", linewidths = 1,
                linecolor = 'black', xticklabels = False, yticklabels = False, ax = ax)


def iter_pol_eval(svf_init, actions_list, gw_envir, theta, plot_iter_ind, num_cols, plot = False):
    '''
    Run the iterative policy evaluation algorithm - Non in-place method
    Arguments:
        svf_init: Initial state-value function
        actions_list: List of actions that agent can take.
        gw_envir: Environment instance. Must provide set_state(s) and respond(action),
                  where respond returns (next_state, reward)
        theta: Accuracy threshold at which to stop iteration (Pg. 97 of RL_Sutton).
               Compared against the sum of squared changes of the value function
        plot_iter_ind: Iteration indices at which value functions will be plotted
        num_cols: Number of columns to use for plotting
        plot: boolean. If True, plot value functions
    Returns:
        ind: Number of iterations executed
        svf_next: Final state value function
    '''

    # Only create the figure when plotting is requested; otherwise an empty
    # titled figure would be produced for every call
    if (plot):
        num_plots = len(plot_iter_ind) + 2 # Plot initial and final value functions also
        num_rows = math.ceil(num_plots / num_cols) # Number of rows to use for plotting
        fig = plt.figure(figsize = ((num_cols * 3), (num_rows * 3.2)))
        fig.suptitle('State Value Functions at different iterations', fontsize = 30)
        sns.set(font_scale = 1.15)
    
    num_states = len(svf_init)
    gw_size = int(np.sqrt(num_states))
    plot_ind = 1
    ind = 1 # Initialize iteration index
    svf_curr = svf_init # v_k: current state value function
    while True: # Run iterative policy evaluation till convergence
        svf_next = np.zeros(num_states) # v_(k+1): Placeholder for next state value function.
        # Loop through all states (leave out terminal states)
        for s in range(1, (num_states - 1)):
            for act in actions_list: # Execute all actions for each state
                gw_envir.set_state(s)
                s_pr, r = gw_envir.respond(act) # Get next state and reward from environment
                # Note: Only one possible next state, reward for each s,a pair
                # So, p(s',r|s,a) = 1
                svf_next[s] += r + svf_curr[s_pr] # Update next state value function
            # For equiprobable random policy pi(a|s) = 1/(num_actions)
            svf_next[s] = svf_next[s] / len(actions_list)
        
        # Plot value function
        if (plot):
            if (ind == 1):
                _plot_svf_heatmap(svf_curr, gw_size, f"Initial_Value_Function",
                                  num_rows, num_cols, plot_ind)
                plot_ind += 1
            if (ind in plot_iter_ind):
                _plot_svf_heatmap(svf_next, gw_size, f"Iteration: {ind}",
                                  num_rows, num_cols, plot_ind)
                plot_ind += 1
            
        # Compute delta (sum of squared differences between successive value functions)
        svf_delta = svf_next - svf_curr
        delta = np.dot(svf_delta, svf_delta)
        if (delta < theta):
            break
        svf_curr = svf_next
        ind += 1
    
    if (plot):
        _plot_svf_heatmap(svf_next, gw_size, f"Final_Value_Function",
                          num_rows, num_cols, plot_ind)
        
    return ind, svf_next

## Function: gen_episode

In [None]:
def gen_episode(envir, agent, init_state, term_states, verb = False):
    
    '''
    Roll out a single episode of agent-environment interaction
    Arguments:
        envir: Environment instance; must expose set_state(), a 'state' attribute and respond()
        agent: Agent instance; must expose step(state)
        init_state: State in which the episode starts
        term_states: Collection of states that end the episode
        verb: Boolean. If True, print a message when a terminal state is reached
        
    Returns:
        states_list: States visited in the episode (initial and terminal states included)
        actions_list: Action chosen by the agent at each time step
        rewards_list: Reward received after each action
    '''
    
    envir.set_state(init_state) # Put the environment in the starting state
    states_list = [envir.state] # Trajectory begins with the initial state
    actions_list, rewards_list = [], []
    
    reached_terminal = False
    while not reached_terminal:
        current_state = envir.state # State at time t
        action = agent.step(current_state) # Agent's choice at time t
        next_state, reward = envir.respond(action) # Environment moves to the state at t+1
        
        states_list.append(next_state)
        actions_list.append(action)
        rewards_list.append(reward)
        
        # The episode ends as soon as a terminal state is entered
        reached_terminal = next_state in term_states
    
    if (verb):
        print("Terminal state reached.")
            
    return states_list, actions_list, rewards_list

# Initialize RL system

In [None]:
# Agent configuration: policy name and the action labels, ordered [hit, stick]
policy_init = 'stick_20_21_policy'
actions_list = ['player_hit', 'player_stick']

# Card labels mapped to point values: ace = 11, number cards = face value, J/Q/K = 10
card_value_dict = {'A': 11}
card_value_dict.update({str(pips): pips for pips in range(2, 11)})
card_value_dict.update({face: 10 for face in ('J', 'Q', 'K')})
cards_list = list(card_value_dict)

# Deal the opening cards (drawn independently from cards_list)
dealer_hid_card = np.random.choice(cards_list) # Dealer's hidden card
dealer_shn_card = np.random.choice(cards_list) # Dealer's shown card
dealer_thresh = 17 # Threshold at which dealer sticks
player_init_cards = np.random.choice(cards_list, size = 2)

# Build the agent and the environment
bj_agent = BJ_Agent(policy_init, actions_list)
bj_envir = BJ_Environment(dealer_hid_card, dealer_shn_card, player_init_cards, cards_list,
                          card_value_dict, dealer_thresh)

# Play one game, printing the internal state before and after every action
game_over = bj_envir.state_int['game_over_flag']
while game_over == 0:
    print(bj_envir.state_int)
    action = bj_agent.step(bj_envir.state_vis)
    print('', action, sep = '\n')
    bj_envir.respond(action)
    print('', bj_envir.state_int, '', sep = '\n')
    game_over = bj_envir.state_int['game_over_flag']

# Final state of the game, preceded by two blank lines
print('', '', bj_envir.state_int, sep = '\n')

# Run Iterative Policy Evaluation

In [None]:
# NOTE(review): gw_size and gw_envir are not defined anywhere in the visible part of
# this notebook - this cell looks carried over from a gridworld notebook; confirm
# before running.
svf_init = np.zeros(gw_size * gw_size) # v_0: all-zero initial value function
theta = 1e-5 # Stop once the change measure falls below this threshold
num_cols = 4 # Subplot columns
plot_iter_ind = [1, 2, 3, 4, 50, 100] # Iterations whose value function is plotted

# Evaluate the policy with plotting enabled and report the iteration count
ind, svf_final = iter_pol_eval(svf_init, actions_list, gw_envir, theta, plot_iter_ind,
                               num_cols, plot = True)
print(f"Policy evaluation converged in {ind} steps")

# Run Monte-Carlo Prediction

In [None]:
def mc_prediction(envir, agent, non_term_states, term_states, num_ep, visit_type, gamma = None):
    
    '''
    Function to run MC prediction and estimate the state value function of the agent's policy
    Arguments:
        envir: Instance of environment to use for generating episodes
        agent: Instance of agent to use for generating episodes
        non_term_states: List of non-terminal states. Each episode starts from one of
                         these, chosen uniformly at random
        term_states: List of states that are considered terminal states
        num_ep: Number of episodes to run
        visit_type: 'first' for first-visit MC prediction, 'every' for every-visit MC prediction
        gamma: Discount factor for return calculation. If None, the module-level
               variable 'gamma' is used when defined, otherwise 1
        
    Returns:
        vpi: Dictionary mapping each state to its estimated value
        count: Dictionary mapping each state to the number of returns averaged into vpi
        
    Raises:
        ValueError: If visit_type is neither 'first' nor 'every'
    '''
    
    # Reject unknown visit types up front; otherwise no state would ever be updated
    if visit_type not in ('first', 'every'):
        raise ValueError(f"visit_type must be 'first' or 'every', got {visit_type!r}")
    first_visit_only = (visit_type == 'first')
    
    # Keep working for callers that define the discount factor as a global variable
    if gamma is None:
        gamma = globals().get('gamma', 1)
    
    # Initialize value function and count for all states to zero
    vpi = defaultdict(int) # State value function for given policy 'pi'
    count = defaultdict(int) # Number of returns averaged for each state
    for state in non_term_states:
        vpi[state] = 0
        count[state] = 0
    
    # Loop through 'num_ep' episodes
    for ep in tqdm(range(num_ep)):
        init_state = np.random.choice(non_term_states)
        ep_states, ep_actions, ep_rewards = gen_episode(envir, agent, init_state, term_states)
        num_timesteps = len(ep_rewards) # Number of timesteps in current episode
        G = 0 # Initialize return to 0
        
        # Loop backwards through the timesteps so the return can be accumulated incrementally
        for ind in range((num_timesteps - 1), -1, -1): 
            G = gamma * G + ep_rewards[ind] # Update return of current timestep
            curr_state = ep_states[ind] # Retrieve state of current timestep
            # First-visit: skip this timestep if the state occurred earlier in the episode
            if first_visit_only and (curr_state in ep_states[0:ind]):
                continue
            count[curr_state] += 1
            # Incremental mean of the returns observed for this state
            vpi[curr_state] += (G - vpi[curr_state]) / count[curr_state]
        
    return vpi, count

In [None]:
# Settings for the MC prediction run
# NOTE(review): gw_envir and gw_agent are not defined anywhere in the visible part of
# this notebook - confirm where they come from before running.
term_states = [0, 15] # Terminal states
non_term_states = list(range(1, 15)) # Non-terminal states 1..14
gamma = 1 # Discount factor for return calculation
num_ep = 10000 # Episodes to simulate
visit_type = 'first'

# Estimate the value function with MC prediction
vpi, count = mc_prediction(gw_envir, gw_agent, non_term_states, term_states, num_ep, visit_type)

# Show the estimated values on one line, one decimal place each
for value in vpi.values():
    print("%0.1f" % value, end = ' ')

# Import required libraries

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from collections import defaultdict
from functools import partial
%matplotlib inline

import gym
#plt.style.use('ggplot')

# Define environment

In [2]:
# Create the environment registered in gym under the id 'Blackjack-v0'
env = gym.make('Blackjack-v0')

## Print Environment variables

In [3]:
# Inspect the environment: action space, observation space, the value returned by
# env.seed(), the random generator and the 'natural' attribute.
# NOTE(review): env.seed() is a call, not a read - presumably it also reseeds the
# environment; confirm against the gym version in use.
print("Env.action_space:", env.action_space)
print("Env.observation_space:", env.observation_space)
print("Seed:", env.seed())
print("np_random:", env.np_random)
print("Natural:", env.natural)

Env.action_space: Discrete(2)
Env.observation_space: Tuple(Discrete(32), Discrete(11), Discrete(2))
Seed: [9256296792507305051]
np_random: RandomState(MT19937)
Natural: False


## Reset Environment

In [4]:
# Start a new game and show the dealt cards plus the three observation fields.
# The observation is indexed here as (player score, dealer visible card, usable ace).
# NOTE(review): assumes env.reset() returns the observation itself rather than an
# (observation, info) pair - confirm against the gym version in use.
observation = env.reset()
print("Initial_Cards_With_Dealer:", env.dealer)
print("Initial_Cards_With_Player:", env.player)
print("Initial_Player_Score:", observation[0])
print("Dealer_Visible_Card:", observation[1])
print("Usable_Ace:", observation[2])

Initial_Cards_With_Dealer: [4, 10]
Initial_Cards_With_Player: [1, 4]
Initial_Player_Score: 15
Dealer_Visible_Card: 4
Usable_Ace: True


# Define policy

In [5]:
def sample_policy(observation):
    '''
    Threshold policy: stick once the player's score reaches 20, otherwise hit
    
    Arguments:
        observation: 3-element sequence unpacked as (player_score, dealer_score, usable_ace).
                     Only player_score influences the decision
    
    Returns:
        0 (stick) if player_score >= 20, else 1 (hit)
    '''
    player_score, _dealer_score, _usable_ace = observation
    return 0 if player_score >= 20 else 1

# Function: gen_episode

In [20]:
def gen_episode(policy, env):
    '''
    Play one full episode in the given environment while following the given policy
    
    Arguments:
        policy: Callable mapping an observation to an action
        env: Environment exposing reset() and step(action); step must return a
             4-tuple (observation, reward, done, extra)
    
    Returns:
        states: Observations at which an action was chosen (final observation excluded)
        actions: Action chosen at each step
        rewards: Reward received after each action
    '''
    
    states, actions, rewards = [], [], []
    observation = env.reset() # Start a fresh episode
    
    done = False
    while not done:
        action = policy(observation) # Choose the action for the current observation
        states.append(observation)
        actions.append(action)
        
        # Advance the environment; the 4th returned value is not used
        observation, reward, done, _ = env.step(action)
        rewards.append(reward)
                
    return states, actions, rewards

In [9]:
# Generate one episode on env with sample_policy, keeping the per-step states, actions and rewards
states, actions, rewards = gen_episode(sample_policy, env)

(23, 8, False) -1.0 True


In [11]:
# Show the player's and dealer's cards held by the environment, then the 'done' flag.
# NOTE(review): 'done' is not assigned at top level anywhere in the visible notebook
# (it is only a local inside gen_episode) - this cell presumably relied on a variable
# left over from an earlier interactive run; confirm.
print(env.player)
print(env.dealer)
print(done)

[6, 10, 7]
[8, 3]
True
