In [None]:
import numpy as np
import math
from random import randrange

def random_argmax(vector):
    """ Argmax that chooses randomly among eligible maximum indices. """
    m = np.amax(vector)
    indices = np.nonzero(vector == m)[0]
    if len(indices) == 0:
        raise IndexError(vector, m, indices)
    return np.random.choice(indices)

class CarRental():    
    def reset(self):
        self.nb_location = 2
        self.car_request = (3, 4) 
        self.car_return = (3, 2)
        self.state = [0, 0]
        self.rent_reward = 10
        self.transfer_cost = 2
        self.reward = 0
        self.location_capacity = 20
        return self.state, self.reward, False, {}
    
    def __init__(self):
        self.reset()
        class ActionSpace:
            def sample(self):
                return randrange(self.n+1)-self.location_capacity
        self.action_space = ActionSpace()
        self.action_space.n = self.location_capacity*2+1
        
    # For with statement
    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.close()
    
    def close(self):
        pass

    def rent_and_return(self, log = False):
        for i in range(self.nb_location):
            nb_rent_request = np.random.poisson(self.car_request[i], 1)[0]
            nb_return = np.random.poisson(self.car_return[i], 1)[0]
            if log: print("Rent, Return:", nb_rent_request, nb_return)
            # rent out cars
            nb_rent = nb_rent_request
            if (nb_rent_request > self.state[i]):
                if log: print("!!! Refusing", nb_rent_request - self.state[i], "rentals on location", i)
                nb_rent = self.state[i]
            if log: print("Renting", nb_rent, "from location ", i, "nb cars:", self.state[i])
            self.state[i] -= nb_rent
            self.reward += self.rent_reward * nb_rent
            if log: print("State:", self.state)
            if log: print("Reward:", self.reward)
            # return cars
            self.state[i] = min(self.location_capacity, self.state[i] + nb_return)
            if log: print("Return", nb_return, "on location", i, "nb cars:", self.state[i])

    def step(self, action):
        self.perform_action(action)
        return tuple(self.state), self.reward, False, {}
    
    def perform_action(self, action):
        state, cost = self.get_action_output(self.state, action)
        self.state = list(state)
        self.reward -= cost
        
    def get_action_output(self, state, action):
        out_state = [0, 0]
        if (action > 0):
            effective_action = min(action, state[0])
        elif (action < 0):
            effective_action = -min(-action, state[1])
        else:
            return tuple(state), 0
        out_state[0] = min(state[0] - effective_action, self.location_capacity)
        out_state[1] = min(state[1] + effective_action, self.location_capacity)
        cost = self.transfer_cost * abs(action)
        if (action >= 1):
            cost -= 2
        if abs(effective_action) != abs(action):
            cost += 10*abs(abs(action)-abs(effective_action))
        return tuple(out_state), cost
    
    def expected_reward(self, state):
        shop1ProbRewardTuples = []
        p_r_1 = 0.0
        for i in range(state[0]+1):
            r = self.rent_reward * i
            l_ambda = self.car_request[0]
            p_r = math.pow(l_ambda, i) * math.exp(-l_ambda) / math.factorial(i)
            p_r_1 += p_r
            shop1ProbRewardTuples.append((r, p_r))
        shop1ProbRewardTuples.append((self.rent_reward*state[0], 1-p_r_1))
        
        shop2ProbRewardTuples = []
        p_r_2 = 0.0
        for i in range(state[1]+1):
            r = self.rent_reward * i
            l_ambda = self.car_request[1]
            p_r = math.pow(l_ambda, i) * math.exp(-l_ambda) / math.factorial(i)
            p_r_2 += p_r
            shop2ProbRewardTuples.append((r, p_r))
        shop2ProbRewardTuples.append((self.rent_reward*state[1], 1-p_r_2))
        
        probs = []
        probRewardTuples = []
        for j, k in np.ndindex((len(shop1ProbRewardTuples), len(shop2ProbRewardTuples))):
            p1, p2 = shop1ProbRewardTuples[j][1], shop2ProbRewardTuples[k][1]
            probs.append(p1*p2)
            r1, r2 = shop1ProbRewardTuples[j][0], shop2ProbRewardTuples[k][0]
            probRewardTuples.append((r1+r2, p1*p2))
        
        sum_probs = sum(probs)
        if abs(sum_probs-1) > 0.0001:
            raise ValueError("Probs should sum up to 1", sum_probs, probs)
        return probRewardTuples

In [None]:
#environment specific functions

def states():
    for s1, s2 in np.ndindex((21, 21)):
        yield (s1, s2)
        
def init_Vs(V_s):
    for state in states():
        if state not in V_s:
            V_s[state] = 0.0
    
def p_next(env, s, a):
    """
    Returns a list of tuples (s', r, P(s', r|s, a), a)
    Containing the probability P(s', r) of reaching state s' with reward r
    when taking action a from state s
    """
    next_state, cost = env.get_action_output(s, a)
    probRewardList = env.expected_reward(next_state)
    return [ (next_state,)+(pr[0]-cost, pr[1], a) for pr in probRewardList ]

In [None]:
env = CarRental()
V_s = {}
gamma = 0.9
state, reward, done, info = env.reset()

def value_of_state(env, V_s, state):
    actions = exhaustive_greedy_policy(env, V_s, state)
    v = 0.0
    for action in actions:
        next_state, _ = env.get_action_output(state, action)
        rewardsDistribution = p_next(env, next_state, action)
        for (next_state, reward, prob, a) in rewardsDistribution:
            if a == action:
                v += prob*(reward+gamma*V_s[next_state])/len(actions)
    return v

def epsilon_greedy_policy(env, V_s, state, epsilon):
    if np.random.rand(1) < epsilon:
        action = self.env.action_space.sample()
        return action
    return greedy_policy(env, V_s, state)

def Q(env, V_s, state):
    values = []
    for action in range(-20, 21):
        next_state, _ = env.get_action_output(state, action)
        rewardsDistribution = p_next(env, next_state, action)
        v = 0.0
        for (next_state, reward, prob, a) in rewardsDistribution:
            if a == action:
                v += prob*(reward+gamma*V_s[next_state])
        values.append(v)
        #values.append(V_s[next_state])
    if len(values) == 0:
        raise RangeError(values)
    return values

def exhaustive_greedy_policy(env, V_s, state):
    values = Q(env, V_s, state)
    return [i-20 for i, v in enumerate(values) if v==max(values)]
    
def greedy_policy(env, V_s, state):
    values = Q(env, V_s, state)
    return random_argmax(values)-20

def fixed_greedy_policy(env, V_s, state):
    values = Q(env, V_s, state)
    return np.argmax(values)-20

def update_V_s(env, V_s):
    fixedV_s = V_s.copy()
    for state in states():
        v = value_of_state(env, fixedV_s, state)
        if math.isnan(v) == False:
            V_s[state] = v

def policy_udate(env, V_s):
    pi = {}
    pi_previous = {}
    hasChange = True
    while hasChange:
        for state in states():
            pi[state] = greedy_policy(env, V_s, state)
        update_V_s(env, V_s)
        hasChange = False
        for v in pi:
            if (v not in pi_previous) or (pi[v] != pi_previous[v]):
                hasChange = True
                if v in pi_previous:
                    print("In state {} changing action {} for {}".format(v, pi_previous[v], pi[v]))
                break
        pi_previous = pi.copy()

init_Vs(V_s)

In [None]:
policy_udate(env, V_s)

In [None]:
policy = {}
for state in states():
    choice = exhaustive_greedy_policy(env, V_s, state)
    print(state, value_of_state(env, V_s, state), choice)
    if len(choice) != 1:
        print("Many choices for state", state)
    policy[state] = choice[0]
    
def memorized_greedy(env, V_s, state):
    return policy[state]

In [None]:
def do_nothing_policy(*_):
    return 0

def random_policy(_, V_s, state):
    if state[0] == state[1]:
        return 0
    return randrange(-state[0], state[1])

def test_policy(env, nb_episodes, nb_steps, policy):
    rewards = []
    for i in range(nb_episodes):
        env.reset()
        for j in range(nb_steps):
            env.rent_and_return()
            action = policy(env, V_s, state)
            next_state, reward, done, info = env.step(action)
        rewards.append(reward)
    return sum(rewards)/len(rewards)

In [None]:
print("Average reward do nothing policy:", test_policy(env, 200, 100, do_nothing_policy))
print("Average reward random policy:    ", test_policy(env, 200, 100, random_policy))
print("Average reward optimal policy:   ", test_policy(env, 200, 100, memorized_greedy))