## Exercise 4.7 - Jack's Car Rental Problem


In [None]:
import numpy as np
from scipy.stats import poisson


def calculate_request_reward(cars_available, cars_requested):
    if cars_requested > cars_available:
        return -1
    return cars_requested * 10  # $10 per requested car


def compute_movement(n_cars_1, n_cars_2, n_moved, n_max=20):
    # assumes that moved cars are effectively available in source location
    return min(n_cars_1 - n_moved, n_max), min(n_cars_2 - n_moved, n_max)


# computes triples
def compute_possible_next_cars(
    current_cars,
    max_requested,
    max_returned,
    lambda_requested,
    lambda_returned,
    n_max=20,
):
    dist_req = poisson(mu=lambda_requested)
    dist_ret = poisson(mu=lambda_returned)

    # array that stores different outcomes, depending on requested and returned ammounts
    next_cars = np.ones((max_requested, max_returned)) * current_cars
    probs = np.zeros((max_requested, max_returned))
    rewards = np.zeros((max_requested, max_returned))

    for requested in range(max_requested):
        for returned in range(max_returned):
            next_cars[requested, returned] = np.clip(
                next_cars[requested, returned] - requested + returned, 0, n_max
            )
            probs[requested, returned] = dist_req.pmf(requested) * dist_ret.pmf(
                returned
            )
            rewards[requested, returned] = calculate_request_reward(
                current_cars, requested
            )
    return np.stack([probs.flatten(), next_cars.flatten(), rewards.flatten()], axis=-1)

Lets compute a single step in the environment dynamics. Let's say we can have a maximum of 10 cars in each location


In [None]:
# suppose we have 5 cars at location 1, 3 cars at location 2
n_cars_1 = 5
n_cars_2 = 3
# policy tells us to move 2 cars from location_1 to location_2
n_moved = 2
n_cars_1, n_cars_2 = compute_movement(n_cars_1, n_cars_2, n_moved, n_max=10)

# possible variations in reward and states
next_1_values = compute_possible_next_cars(
    n_cars_1, 10, 10, lambda_requested=3, lambda_returned=3
)
next_2_values = compute_possible_next_cars(
    n_cars_2, 10, 10, lambda_requested=4, lambda_returned=2
)

sample = next_1_values[23, :]
print(
    f"Number of cars in location 1 for next day (sample)\n{'-'*20}\nProbability: {sample[0]} | N_cars: {sample[1]} | Reward: {sample[2]}"
)

The next state, along with the probability and reward, is determined by combining next values in location 1 and location 2


In [None]:
import itertools


class TabularValueFunction:
    def __init__(self, nrows: int, ncols: int):
        self.v = np.zeros((nrows + 1, ncols + 1))
        self.states = list(
            itertools.product(np.arange(nrows + 1), np.arange(ncols + 1))
        )
        self.state_space = (nrows + 1, ncols + 1)

    def update_value(self, row, col, new_value):
        self.v[row, col] = new_value

    def get_value(self, row, col):
        return self.v[row, col]

    def get_states(self):
        return self.states
    
    def get_value_table(self):
        return self.v


class RowPolicy:
    def __init__(self, state_space: tuple[int, int]):
        self.state_space = state_space
        self.policy_values = np.zeros(state_space, dtype=int)

    def sample_action(self, row, col):
        return self.policy_values[row, col]
    
    def update_action(self, row, col, new_action):
        self.policy_values[row,col] = new_action

    def get_policy_table(self):
        return self.policy_values

In [None]:
from scipy.stats import poisson


class JacksCarProblem:
    # poisson distribution for request and return values
    lambda_request = (4, 3)
    lambda_return = (3, 2)
    # limit of cars in any location
    max_cars = 20
    # max car movements
    max_mvnt = 5

    def __init__(self):
        # request probability by location for upto 10 cars [CARS_REQUESTED, LOCATION]
        self.request_probs = np.stack(
            [
                poisson(mu=self.lambda_request[0]).pmf(np.arange(10)),
                poisson(mu=self.lambda_request[1]).pmf(np.arange(10)),
            ],
            axis=-1
        )
        # return probability by location for upto 10 cars [CARS_RETURNED, LOCATION]
        self.return_probs = np.stack(
            [
                poisson(mu=self.lambda_return[0]).pmf(np.arange(10)),
                poisson(mu=self.lambda_return[1]).pmf(np.arange(10)),
            ],
            axis=-1
        )

    def calculate_location_reward(self, cars_count:int, cars_requested:int):
        # in case the requested carse in any location exceed the available ones
        if cars_requested > cars_count:
            return -10
        # +$10 every requested car and -$3 every car moved 
        reward = cars_requested*10
        return reward


    def __compute_location_step(self,cars:int, cars_displaced:int, location_id: int):
        next_cars = np.zeros((10,10))
        probs = np.zeros((10,10))
        rewards = np.zeros((10,10))
        
        for cars_requested in range(10):
            for cars_returned in range(10):
                # compute probability of this number of cars requested and returned
                prob_requested = self.request_probs[cars_requested,location_id]
                prob_returned = self.return_probs[cars_returned, location_id]
                probs[cars_requested, cars_returned] = prob_requested*prob_returned

                # compute location reward based on available cars after displacement
                reward = self.calculate_location_reward(cars-cars_displaced, cars_requested)
                rewards[cars_requested, cars_returned] = reward

                # update car number. Clip to range [0,20]
                new_cars = min(max(cars - cars_displaced - cars_requested + cars_returned,0),20)
                next_cars[cars_requested, cars_returned] = new_cars
        return np.stack(
            [
                probs.flatten(), next_cars.flatten(), rewards.flatten()
            ],
            axis=-1
        )
    
    def __combine_locations(self, location_1_triple: np.ndarray, location_2_triple: np.ndarray):
        # combine values for all probable combinations of requested and returned cars in each location
        probs = []
        states = []
        rewards = []
        for location_1_result in location_1_triple:
            for location_2_result in location_2_triple:
                # compute this alternative probability
                result_prob = location_1_result[0] * location_2_result[0]
                new_state = (int(location_1_result[1]),int(location_2_result[1]))
                # punish when any location has more requests than cars
                if location_1_result[2] < 0  or location_2_result[2] < 0:
                    result_reward = -10
                else:
                    result_reward = location_1_result[2] + location_2_result[2]
                probs.append(result_prob)
                states.append(new_state)
                rewards.append(result_reward)
        return {
            "probs":np.array(probs),
            "states":np.array(states),
            "rewards":np.array(rewards)
        }

    
    def compute_step(self,cars_count:tuple[int], cars_moved: int):
        assert (cars_count[0]-cars_moved) >= 0 and (cars_count[1] + cars_moved) >= 0
        
        # compute step for each location
        step_location_1 = self.__compute_location_step(cars_count[0],cars_moved,0)
        step_location_2 = self.__compute_location_step(cars_count[1],-cars_moved,1)

        # combine results for each location
        combinations = self.__combine_locations(step_location_1, step_location_2)

        # this should be included in rewards ($2 per car moved)
        movement_cost = abs(cars_moved)*2
        combinations["rewards"] = combinations["rewards"] - movement_cost

        return combinations
        
        


In [None]:
import numpy as np
from tqdm.notebook import tqdm

def policy_evaluation(
    value_function: TabularValueFunction, policy: RowPolicy, env:JacksCarProblem, discount_rate: float = 0.9, theta: float = 0.001
):
    assert value_function.state_space == policy.state_space
    print("Running policy evaluation ...")
    while True:
        delta = 0
        for state1, state2 in value_function.get_states():
            old_value = value_function.get_value(state1, state2)
            action = policy.sample_action(state1, state2)
            combinations = env.compute_step((state1, state2),action)
            v_sum = 0
            for i in range(len(combinations["states"])):
                new_state = combinations["states"][i]
                prob = combinations["probs"][i]
                reward = combinations["rewards"][i]
                v_sum += prob*(reward + discount_rate*value_function.get_value(new_state[0],new_state[1]))
            delta = max(delta, abs(old_value-v_sum))
            value_function.update_value(state1, state2, v_sum)
        print(f'Delta: {delta:.4f}' , end='\r')
        if delta < theta:
            break
    print("-> Finished policy evaluation! <-")
    return value_function

def policy_improvement(policy: RowPolicy, value_function: TabularValueFunction, env: JacksCarProblem, discount_rate: float = 0.9):
    policy_stable = True
    print("Running policy improvement ...")
    for state1, state2 in tqdm(value_function.get_states()):
        old_action = policy.sample_action(state1, state2)
        # the number of cars moved from one location to another is restricted to available cars in each location
        action_range = [max(-state2, -5), min(state1,5)]
        best_action = old_action
        best_value = 0
        for action in range(action_range[0], action_range[1]+1):
            combinations = env.compute_step((state1, state2),action)
            action_value = 0
            for i in range(len(combinations["states"])):
                new_state = combinations["states"][i]
                prob = combinations["probs"][i]
                reward = combinations["rewards"][i]
                action_value += prob*(reward + discount_rate*value_function.get_value(new_state[0],new_state[1]))
            if action_value > best_value:
                best_action = action
        policy.update_action(state1, state2, best_action)
        if old_action != best_action and value_function.get_value(state1, state2) != best_value:
            policy_stable = False
    return policy_stable, policy

def policy_iteration(
        policy: RowPolicy, 
        value_function: TabularValueFunction, 
        env: JacksCarProblem, 
        discount_rate: float = 0.9,
        max_iterations: int = 1):
    optimal_policy = False
    step = 1
    policies = []
    value_functions = []
    while not optimal_policy:
        print(f"Step {step}")
        # evaluate policy
        value_function = policy_evaluation(value_function, policy, env, discount_rate=discount_rate)

        # improve policy
        optimal_policy, policy = policy_improvement(policy, value_function, env, discount_rate=discount_rate)

        # store new policy and value function
        policies.append(policy)
        value_functions.append(value_function)

        # max iterations
        if step == max_iterations:
            print("Reached max iterations!")
            break
        step += 1
        print("\n")
    return policies, value_functions

            

In [None]:
env = JacksCarProblem()
initial_values = TabularValueFunction(20, 20)
initial_policy = RowPolicy((21, 21))

optimal_policy, optimal_values = policy_iteration(initial_policy, initial_values, env, discount_rate = 0.9, max_iterations=2)


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

policy_id = 1


fig, axs = plt.subplots(1,2,figsize=(16,6))
policy_1 = optimal_policy[policy_id].get_policy_table()
sns.heatmap(policy_1,linewidths=0.5, linecolor='black',ax=axs[0])
axs[0].set_title(f"$\pi_{policy_id+1}$")

policy_1 = optimal_values[policy_id].get_value_table()
sns.heatmap(policy_1,linewidths=0.5, linecolor='black', cmap='viridis', ax=axs[1])
axs[1].set_title("$V_{\pi}$")

plt.show()

## Exercise 4.9 - Value Iteration Gambler's Problem

In [None]:
import numpy as np


class RowValueFunction():
    def __init__(self, n_states: int):
        self.v = np.random.random(n_states+1)
        self.states = np.arange(n_states+1)
        self.v[0] = 0
        self.v[-1] = 0

    def update_value(self, state, new_value):
        self.v[state] = new_value

    def get_value(self, state):
        return self.v[state]

    def get_states(self):
        return self.states[1:-1]
    
    def get_value_table(self):
        return self.v


class RowPolicy:
    def __init__(self, n_states: int):
        self.policy_values = np.zeros(n_states+1, dtype=int)

    def get_available_actions(self, state):
        return np.arange(1,min(state,100-state)+1)
    
    def update_action(self, state, new_action):
        self.policy_values[state] = new_action

    def get_policy_values(self):
        return self.policy_values

In [None]:
class GamblersProblem:
    def __init__(self, p_head:float=0.4):
        self.p_head= p_head
    
    def compute_step(self, state: int, action:int):
        # if state is 0, then we are in a terminal state
        if state == 0:
            return {
                "probs":[1],
                "states":[state],
                "rewards":[0]
            }
        # if state is 100, then we are in a terminal state
        elif state == 100:
            return {
                "probs":[1],
                "states":[state],
                "rewards":[0]
            }
        # otherwise, we are in a non-terminal state
        else:
            # compute probability of heads and tails
            probs = [self.p_head, 1-self.p_head]
            # compute possible states and rewards
            states = [min(state+action,100), max(0,state-action)]
            rewards = [1 if states[0] == 100 else 0,0]
            return {
                "probs":probs,
                "states":states,
                "rewards":rewards
            }
        


In [None]:
def value_iteration(env: GamblersProblem,value_function: RowValueFunction, policy: RowPolicy, theta: float = 0.001):
    # initialize delta
    values = []
    while True:
        delta = 0
        # Update value function
        for state in value_function.get_states():
            old_value = value_function.get_value(state)
            possible_actions = policy.get_available_actions(state)
            max_val = 0
            max_action = None
            for action in possible_actions:
                step = env.compute_step(state, action)
                new_val = sum([prob*(step["rewards"][i]+value_function.get_value(step["states"][i])) for i, prob in enumerate(step["probs"])])
                if new_val > max_val:
                    max_action = action
                    max_val = new_val
            value_function.update_value(state, max_val)
            delta = max(delta, abs(old_value-max_val))
        for state in value_function.get_states():
            possible_actions = policy.get_available_actions(state)
            max_val = 0
            max_action = None
            for action in possible_actions:
                step = env.compute_step(state, action)
                new_val = sum([prob*(step["rewards"][i]+value_function.get_value(step["states"][i])) for i, prob in enumerate(step["probs"])])
                if new_val > max_val:
                    max_action = action
                    max_val = new_val
            policy.update_action(state, max_action)
        print(f'Delta: {delta:.4f}',end='\r')
        values.append(value_function.get_value_table().copy())
        if delta < theta:
            break
    return values, policy
            

In [None]:
env = GamblersProblem(0.4)
init_value_function = RowValueFunction(100)
init_policy = RowPolicy(100)

state_value_sweeps, new_policy = value_iteration(env, init_value_function, init_policy,1e-10)

In [None]:
import matplotlib.pyplot as plt

plottable_sweeps = [0,1,2,9,35-1]

fig, axs = plt.subplots(1,2,figsize=(12,4))

states = init_value_function.get_states()
policy_values = new_policy.get_policy_values()[1:-1]

# plot optimal policy
axs[0].bar(states, policy_values)
axs[0].set_xlabel("Capital (state)")
axs[0].set_ylabel("Stake (action)")
# plot optimal value function

for sweep in plottable_sweeps:
    values = state_value_sweeps[sweep][1:-1]
    axs[1].plot(states, values,label=sweep)
axs[1].legend()
axs[1].set_xlabel("Capital (state)")
axs[1].set_ylabel("Value")
plt.show()