In [None]:
# Experiment configuration

MAX_CARS = 20
"""Maximum number of cars that may be present at any location per day."""

MAX_MOVES = 5

# first location
REQ_ALPHA = 3
"""Poisson parameter describing the rate of car requests at location 1."""


RET_ALPHA = 3

# second location
REQ_BETA = 4
RET_BETA = 2

# discount factor
GAMMA = 0.9

# rewards
RENTAL_CREDIT = 10
MOVE_COST = -2

In [2]:
import numpy as np
from scipy.stats import poisson

# outer product of four vectors
T = np.einsum(
    "i,j,k,l->ijkl",
    poisson.pmf(inclusive_range(MAX_CARS), REQ_ALPHA),
    poisson.pmf(inclusive_range(MAX_CARS), RET_ALPHA),
    poisson.pmf(inclusive_range(MAX_CARS), REQ_BETA),
    poisson.pmf(inclusive_range(MAX_CARS), RET_BETA),
)

T.shape

(21, 21, 21, 21)

#### Expected Return

The formula for expected return $G_t$ is given as follows (Bellman eq.):

$$Q(s, a) = \sum_{s', r} p(s', r | s, a)[r + \gamma V(s')]$$

In the context of Jack's Car Rental, we know that the state is the number of cars at each location at the end of the day, and the actions are the net numbers of cars moved between the two locations overnight.

Sequentially, it looks like this:

1. Initial state $s_{n-1}$ = $(i, j)$: count of cars at each location the night prior to day $n$.

2. Select action $a \in \mathcal{A}(s)$ denoting how many cars to move before day $n$.

   - Move $|a|$ cars at a cost of $2 per car. A negative value of $a$ means moving cars from the 1st location to the 2nd location. A positive value means moving cars from the 2nd location to the 1st location. Zero means moving no cars.

3. Move $a$ cars at a cost of $2 per car.
   - Negative means 1st location to 2nd location.
     - Clip $a$ such that $a=\max(a, -i)$.
   - Positive means 2nd location to 1st location.
     - Clip $a$ such that $a=\min(a, j)$.

4. Start of day $n$, state is $(i, j)$.
5. Sample $A_{\text{out}} \sim \text{Pois}(\alpha_{\text{request}})$ cars requested from location $\alpha$.

6. Sample $B_{\text{out}} \sim \text{Pois}(\beta_{\text{request}})$ cars requested from location $\beta$.

7. Sample $A_{\text{in}} \sim \text{Pois}(\alpha_{\text{return}})$ cars returned at location $\alpha$.

8. Sample $B_{\text{in}} \sim \text{Pois}(\beta_{\text{return}})$ cars returned at location $\beta$.

In [None]:
def actions(state: tuple[int, int]) -> list[int]:
    validate_state(state)
    n_cars_alpha, n_cars_beta = state
    lower_bound = max(-MAX_MOVES, -n_cars_alpha)
    upper_bound = min(+MAX_MOVES, +n_cars_beta)
    return np.arange(lower_bound, upper_bound + 1)


def validate_state(state: tuple[int, int]):
    n_cars_alpha, n_cars_beta = state
    if not (0 <= n_cars_alpha <= MAX_CARS):
        raise ValueError(f"invalid alpha state: {n_cars_alpha}")
    if not (0 <= n_cars_beta <= MAX_CARS):
        raise ValueError(f"invalid beta state: {n_cars_beta}")


def validate_state_action(state: tuple[int, int], action: int):
    # actions also validates state!
    if action not in actions(state):
        raise ValueError(f"invalid action: {action}")

In [None]:
def expected_return(state: tuple[int, int], action: int) -> float:
    validate_state_action(state, action)

    

#### Policy Iteration

In [None]:
# initialize value function and policy to arbitrary values
V = np.zeros((MAX_CARS + 1, MAX_CARS + 1), dtype=float)
pi = np.zeros((MAX_CARS + 1, MAX_CARS + 1), dtype=int)

In [None]:
import functools


def iter_states():
    """Return an iterator over all (i, j) state values.

    Each pair represents the number of cars at the 1st and 2nd locations,
    respectively. As such, i and j both fall in the range [0, MAX_CARS].

    Returns:
        _type_: _description_
    """
    return functools.product(range(MAX_CARS + 1), range(MAX_CARS + 1))


def policy_evaluation(theta: float = 1e-3, max_iter: int = 25):
    for _ in range(max_iter):
        delta = 0
        for state in iter_states():
            v = V[state]
            V[state] = expected_return(state, pi[state])
            delta = max(delta, abs(v - V[state]))
        if delta < theta:
            return


def policy_improvement():
    policy_stable = True
    for state in iter_states():
        old_action = pi[state]
        # pi[state] = argmax(expected_return(state, action) for action in actions)
        if old_action != pi[state]:
            policy_stable = False
    return policy_stable


def policy_iteration():
    while True:
        policy_evaluation()
        policy_stable = policy_improvement()
        if policy_stable:
            break