# Import Python modules

In [1]:
from __future__ import division, print_function
from dy.DiscrFinState_DiscrInfinTime_DiscValue import Prob
from itertools import product
from numpy import allclose, amax, argmax, array, maximum, zeros
from pandas import DataFrame

In [2]:
from HelpyFuncs.Print import print_live_in_ipython
print_live_in_ipython()

# Set CONSTANTS

In [3]:
NB_STATES = 13  
NB_CONTROLS = 3

PURCHASE_PROBS = \
    array(
        [[ 50,  45,  40,  35,  30,  25, 20, 15, 10,  7,  4,  2,  0],
         [110,  92,  80,  69,  59,  50, 42, 35, 29, 24, 20, 17, 15],
         [220, 180, 160, 140, 120, 100, 80, 70, 60, 50, 40, 30, 20]]).T / 1e3

DISCOUNT_FACTOR = .99

ZERO_STATE_TRANSITION_MATRIX = zeros([NB_STATES, NB_STATES])

ZERO_VALUE_VECTOR = zeros([NB_STATES, 1])

ZERO_STATE_CONTROL_VALUE_MATRIX = zeros([NB_STATES, NB_CONTROLS])

# _E-Right-Way_ Set-Ups

In [4]:
def state_transition_prob_matrix(policy):
    p = ZERO_STATE_TRANSITION_MATRIX.copy()
    for state in range(NB_STATES):
        control = policy[state]
        purchase_prob = PURCHASE_PROBS[state, control]
        p[state, 0] = purchase_prob
        p[state, min(state + 1, 12)] = 1. - purchase_prob
    return p

def expected_value_per_stage(state, control):
    return PURCHASE_PROBS[state, control] * [8., 7., 5.][control] - (control > 0) * .5

def expected_values_per_stage(policy):
    return array([[expected_value_per_stage(state, policy[state])]
                  for state in range(NB_STATES)])

def bellman_op(terminal_expected_values=ZERO_VALUE_VECTOR, return_policy=False):
    expected_value_matrix = ZERO_STATE_CONTROL_VALUE_MATRIX.copy()
    for control in range(NB_CONTROLS):
        policy = NB_STATES * [control]
        expected_value_matrix[:, control] = \
            (expected_values_per_stage(policy) + \
                DISCOUNT_FACTOR * state_transition_prob_matrix(policy).dot(terminal_expected_values))[:, 0]
    if return_policy:
        return argmax(expected_value_matrix, axis=1)
    else:    
        return amax(expected_value_matrix, axis=1, keepdims=True)

ERightWay = \
    Prob(
        nb_states=NB_STATES,
        state_transition_prob_matrix_func=state_transition_prob_matrix,
        expected_values_per_stage_func=expected_values_per_stage,
        bellman_op=bellman_op)

# Value Iteration

In [5]:
value_iteration_results = ERightWay.value_iteration()

print('\nOptimal Solution Determined by Value Iteration:')
value_iteration_results

Running Value Iteration #1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, done!

Optimal Solution Determined by Value Iteration:


Unnamed: 0,control,expected_value
0,2,3.856485
1,2,3.129462
2,2,2.515718
3,2,1.929857
4,2,1.403996
5,2,0.970908
6,0,0.661192
7,0,0.437885
8,0,0.267259
9,0,0.152107


# Policy Iteration

In [6]:
policy_iteration_results = ERightWay.policy_iteration()

print('\nOptimal Solution Determined by Policy Iteration:')
policy_iteration_results

Running Policy Iteration #1, 2, 3, done!

Optimal Solution Determined by Policy Iteration:


Unnamed: 0,control,expected_value
0,2,4.09584
1,2,3.331096
2,2,2.678998
3,2,2.054816
4,2,1.492157
5,2,1.025056
6,0,0.684998
7,0,0.45266
8,0,0.275691
9,0,0.156493


# _APPENDIX: Note on Numerical Inaccuracies by Matrix Inversion_

_Note that the Expected Values produced by the Value Iteration procedure are accurate, while those produced by the Policy Iteration procedure, which involves inverting very sparse matrices, are not._

# _APPENDIX: Code for Discrete-Finite-State, Discrete-Infinite-Time, Discounted-Value Dynamic Programming_

In [7]:
from __future__ import print_function
from numpy import allclose, array, identity, zeros
from numpy.linalg import inv
from pandas import DataFrame


class Prob:
    def __init__(
            self,
            nb_states=1,
            state_transition_prob_matrix_func=lambda policy: array([[1.]]),
            expected_values_per_stage_func=lambda policy: array([[0.]]),
            discount_factor=.999,
            bellman_op=lambda terminal_expected_value: array([[0.]])):
        self.nb_states = nb_states
        self.state_transition_prob_matrix_func = state_transition_prob_matrix_func
        self.expected_values_per_stage_func = expected_values_per_stage_func
        self.discount_factor = discount_factor
        self.bellman_op = bellman_op

    def expected_values(self, policy):
        return inv(identity(self.nb_states) -
                   self.discount_factor * self.state_transition_prob_matrix_func(policy))\
            .dot(self.expected_values_per_stage_func(policy))

    def value_iteration(self, init_values=None, rtol=1e-5, atol=1e-8):
        if init_values:
            curr_values = init_values
        else:
            curr_values = zeros((self.nb_states, 1))
        prev_values = None
        i = 0
        print('Running Value Iteration #', end='')
        while (prev_values is None) or (not allclose(curr_values, prev_values, rtol=rtol, atol=atol)):
            i += 1
            print(i, end=', ')
            prev_values = curr_values.copy()
            curr_values = self.bellman_op(terminal_expected_values=prev_values, return_policy=False)
        print('done!')
        d = DataFrame(dict(control=tuple(self.bellman_op(terminal_expected_values=curr_values, return_policy=True))))
        d['expected_value'] = curr_values.flatten()
        return d

    def policy_iteration(self, init_policy=None):
        if init_policy:
            curr_policy = init_policy
        else:
            curr_policy = self.nb_states * (0,)
        prev_policy = None
        identity_matrix = identity(self.nb_states)
        cached_matrix_inverses = {}
        i = 0
        print('Running Policy Iteration #', end='')
        while (prev_policy is None) or (curr_policy != prev_policy):
            i += 1
            print(i, end=', ')
            prev_policy = curr_policy
            if prev_policy in cached_matrix_inverses:
                matrix_inverse = cached_matrix_inverses[prev_policy]
            else:
                matrix_inverse = \
                    inv(identity_matrix - self.discount_factor * self.state_transition_prob_matrix_func(prev_policy))
                cached_matrix_inverses[prev_policy] = matrix_inverse
            expected_values = matrix_inverse.dot(self.expected_values_per_stage_func(prev_policy))
            curr_policy = tuple(self.bellman_op(terminal_expected_values=expected_values, return_policy=True))
        print('done!')
        d = DataFrame(dict(control=curr_policy))
        d['expected_value'] = expected_values.flatten()
        return d

# _APPENDIX: Brute-Force Verification of Optimal Solution_

In [8]:
optimal_expected_values = zeros([NB_STATES, 1])

print('Brute-Force Search Progress: ', end='')
nb_policies = NB_CONTROLS ** NB_STATES
i = 0
for policy in product(*(NB_STATES * [range(NB_CONTROLS)])):
    i += 1
    if not(i % 30000):
        print('%.1f%%' % (100 * i / nb_policies), end=', ')
    expected_values = ERightWay.expected_values(policy)
    optimal_expected_values = maximum(optimal_expected_values, expected_values)
    if allclose(expected_values, optimal_expected_values):
        optimal_policy = policy
print('done!\n')

print('Optimal Solutions Determined by Brute-Force Search:')
d = DataFrame(dict(control=optimal_policy))
d['expected_value'] = optimal_expected_values.flatten()
d

Brute-Force Search Progress: 1.9%, 3.8%, 5.6%, 7.5%, 9.4%, 11.3%, 13.2%, 15.1%, 16.9%, 18.8%, 20.7%, 22.6%, 24.5%, 26.3%, 28.2%, 30.1%, 32.0%, 33.9%, 35.8%, 37.6%, 39.5%, 41.4%, 43.3%, 45.2%, 47.0%, 48.9%, 50.8%, 52.7%, 54.6%, 56.5%, 58.3%, 60.2%, 62.1%, 64.0%, 65.9%, 67.7%, 69.6%, 71.5%, 73.4%, 75.3%, 77.1%, 79.0%, 80.9%, 82.8%, 84.7%, 86.6%, 88.4%, 90.3%, 92.2%, 94.1%, 96.0%, 97.8%, 99.7%, done!

Optimal Solutions Determined by Brute-Force Search:


Unnamed: 0,control,expected_value
0,2,4.09584
1,2,3.331096
2,2,2.678998
3,2,2.054816
4,2,1.492157
5,2,1.025056
6,0,0.684998
7,0,0.45266
8,0,0.275691
9,0,0.156493
