# RVI Implementation for Infinite-Horizon, Undiscounted, Average-Reward MDP

This implementation is for finding an optimal policy for an ambulance dispatching problem on a grid-world. An infinite-horizon, undiscounted, average reward MDP model is defined. An optimal policy and relative state values have been achieved at the end of the implementation.

## Setup

### Libraries & Helper Class

In [None]:
from collections import defaultdict # this is a useful data structure for storing the transition probabilities and rewards for each state

import numpy as np
np.random.seed(30)

import math
import random

import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')
# show gridworld_visual picture which in the path /content/drive/MyDrive/Average Reward MDP Policy Optimization/gridworld_visual.png
# from IPython.display import Image
# Image(filename='/content/drive/MyDrive/Average Reward MDP Policy Optimization/gridworld_visual.png', width=300)

In [None]:
from typing import Tuple

class GridHelper:
    def __init__(self, n: int):
        self.n: int = n

    def coord_to_index(self, x: int, y: int) -> int:
        return x * self.n + y

    def index_to_coord(self, index: int) -> Tuple[int, int]:
        return divmod(index, self.n)

    def manhattan_distance(self, a: int, b: int) -> int:
        ax, ay = self.index_to_coord(a)
        bx, by = self.index_to_coord(b)
        return abs(ax - bx) + abs(ay - by)

### States and Helpers

In [None]:
n = 5 # Size of the grid

first_ambulance_index = 0
last_ambulance_index = n**2 - 1

states = [(a1, a2) for a1 in range(0, n**2 - 1) for a2 in range(0, n**2 - 1) if a1 != a2]
states.append((0, 0))

In [None]:
print("Total number of states:", len(states))

In [None]:
# we need to map states to indices for easier access
state2idx = {s: k for k, s in enumerate(states)}

In [None]:
state2idx

In [None]:
grid_helper = GridHelper(n)

### Parameters

#### Service Time and Arrival Rate

In [None]:
# define an np array to hold the expected service time for each patient location and ambulance spot
expected_service_time = np.zeros((n**2, 2))  # in hours
for j in range(0, 2):
    for i in range(n**2):
        if j == 0:
            man_distance = grid_helper.manhattan_distance(i, 0)
        else:
            man_distance = grid_helper.manhattan_distance(i, n**2 - 1)
        expected_service_time[i, j] = np.random.normal(man_distance*4,1,1)/60 #here we are just generating numbers by using normal dist.

# edge cases. We won't use these pairs in the implementation
expected_service_time[0,0] = 0
expected_service_time[0,1] = 0
expected_service_time[n**2-1,0] = 0
expected_service_time[n**2-1,1] = 0

In [None]:
expected_service_time*60 # in minutes

In [None]:
# probability of the service less than 10 minutes
coverage_reward_prob_high = np.empty_like(expected_service_time)
for i in range(expected_service_time.shape[0]):
    for j in range(expected_service_time.shape[1]):
        if expected_service_time[i, j] > 0:
            coverage_reward_prob_high[i, j] = 1 - math.exp(-(1/expected_service_time[i, j]) * 1/6)
        else:
            coverage_reward_prob_high[i, j] = 0  # or np.nan if you prefer
coverage_reward_prob_high

In [None]:
coverage_reward_prob_low = coverage_reward_prob_high/100 #highly low reward for low priority patients

In [None]:
# define an array with size 3 to keep the rewards for each ambulance, cell, and priority level
# coverage_reward[patient_index, ambulance_index, priority_level]
coverage_reward = np.zeros((n**2, 2, 2)) 

coverage_reward[:, 0, 0] = coverage_reward_prob_low[:, 0]  # all patients, first ambulance, low priority
coverage_reward[:, 0, 1] = coverage_reward_prob_high[:, 0] # all patients, first ambulance, high priority

coverage_reward[:, 1, 0] = coverage_reward_prob_low[:, 1] # all patients, second ambulance, low priority
coverage_reward[:, 1, 1] = coverage_reward_prob_high[:, 1] # all patients, second ambulance, high priority

In [None]:
arr_rate = 5         # arrivals per hour

#### Probability of a Request from the Cells

In [None]:
P = np.zeros(n**2) # arrival-location pmf
for i in range(n**2):
    # The first and last indices are the hospitals, so we set their probabilities to 0.
    # This means that the ambulance will never be dispatched to these locations.
    if i == 0 or i == n**2 - 1:
        P[i] = 0.0
    else:
        P[i] = np.random.uniform(0, 1)
P /= np.sum(P)
# round to 2 decimal points
P = np.round(P, 4)

In [None]:
P

In [None]:
print(np.sum(P))  # should be 1.0

In [None]:
# heatmap representation of the arrival probabilities
plt.figure(figsize=(6, 4))
sns.heatmap(P.reshape(n, n), annot=True, cmap='Blues', square=True, linewidths=0.5)  
plt.title('Arrival Probabilities')
plt.xticks(ticks=np.arange(n) + 0.5, labels=np.arange(n))
plt.yticks(ticks=np.arange(n) + 0.5, labels=np.arange(n))
plt.grid(False)
plt.show()

#### Conditional Probability Values for High and Low Priority Patients

In [None]:
def assign_probabilities(size: int, low_range=(0.35, 0.60),  high_range=(0.50, 0.75)):
    
    probs = np.zeros((size, size), dtype=float)
    max_idx = size - 1

    for r in range(size):
        for c in range(size):
            # Skip the two corner cells – they stay at 0.
            if (r == 0 and c == 0) or (r == max_idx and c == max_idx):
                continue
            
            d_ul = r + c                          # distance to (0, 0)
            d_lr = (max_idx - r) + (max_idx - c)  # distance to (max_idx, max_idx)

            if d_ul < d_lr:                       # closer to upper-left
                probs[r, c] = random.uniform(*high_range)
            else:                                 # closer to lower-right
                probs[r, c] = random.uniform(*low_range)
    return probs


In [None]:
P_low = assign_probabilities(n)  # deterministic run
P_high = np.zeros(n**2)
P_high = 1 - P_low
P_high[0, 0] = 0.0  # hospital at (0, 0) has no probability
P_high[-1, -1] = 0.0  # hospital at (n

In [None]:
# show P_low and P_high heatmaps side by side
fig, axes = plt.subplots(1, 2, figsize=(9, 4))
sns.heatmap(P_low.reshape(n, n),  annot=True, fmt=".2f", ax=axes[0], cmap="YlGnBu")
axes[0].set_title("P_low")  
sns.heatmap(P_high.reshape(n, n), annot=True, fmt=".2f", ax=axes[1], cmap="YlGnBu")
axes[1].set_title("P_high")
plt.tight_layout()
plt.show()

In [None]:
# combine P_low and P_high into two-dimensional array
P_pri_combined = np.zeros((2, n**2))
# assign P_low to first row and P_high to second row
P_pri_combined[0, :] = P_low.flatten()
P_pri_combined[1, :] = P_high.flatten()

### Actions

In [None]:
def actions(s):
    idle_providers = []
    if s[0] == 0:  idle_providers.append(0)   # provider at cell 0
    if s[1] == 0:  idle_providers.append(1)   # here 1 indicated the second ambulance which is at cell n**2 - 1
    return idle_providers if idle_providers else [None]   # None = null dispatch

In [None]:
def actions_alt(s):
    idle_providers = []
    #if s[0] == 0:  idle_providers.append(0)   # provider at cell 0
    if s[1] == 0:  idle_providers.append(1)   # here 1 indicated the second ambulance which is at cell n**2 - 1
    return idle_providers if idle_providers else [None]   # None = null dispatch

### Event Rates

In [None]:
# probability of the service less than 10 minutes
inversed_expected_service_time = np.empty_like(expected_service_time)
for i in range(expected_service_time.shape[0]):
    for j in range(expected_service_time.shape[1]):
        if expected_service_time[i, j] > 0:
            inversed_expected_service_time[i, j] = 1/expected_service_time[i, j]
        else:
            inversed_expected_service_time[i, j] = 0  # or np.nan 
inversed_expected_service_time

In [None]:
mu_max_0 = np.max(inversed_expected_service_time[:,0])
mu_max_1 = np.max(inversed_expected_service_time[:,1])

print("Maximum service time for ambulance 0:", mu_max_0)
print("Maximum service time for ambulance 1:", mu_max_1)

In [None]:
# this is needed for the normalization procedure. Here normalization is a process of transforming a continuous 
# Markov Decision Process (MDP) into a discrete one by scaling transition probabilities.
gamma = arr_rate + (mu_max_0 + mu_max_1)
gamma

## Transitions

In [None]:
def transitions(s, a):
    
    # input:  (state s, action a)
    # output: (next_state, prob, reward) !not one, all possible transitions starting from state s with an action a
    
    grid_size = n
    state_info = defaultdict(lambda: {"reward": 0.0, "prob": 0.0})

    # --- 1. Service completions ------------------------------------------
    for p_idx, provider_cell in enumerate((0, 1)): # to understand these, you need to remember the state representation which is a
        # tuple of two indices representing the ambulances' positions or 0 if the ambulance is idle. Also we have two ambulances which are
        # represented by indices 0 and 1.
        job = s[p_idx]
        if job != 0:
            rate   = inversed_expected_service_time[job, provider_cell]
            prob   = rate / gamma
            s_next = list(s)                # copy current state
            s_next[p_idx] = 0               # provider becomes idle
            state_info[tuple(s_next)]["reward"] += 0.0
            state_info[tuple(s_next)]["prob"]   += prob

    # --- 2. Customer arrival ---------------------------------------------
    prob_arr = arr_rate / gamma
    for i in range(grid_size**2):
        for j in range(2): # this is for two priorities (low(0) and high(1))
            if P[i] == 0:               # skip provider cells
                continue
            if a is None:               # both providers busy ⇒ null dispatch
                # here, you may want to count total number of request that are not served
                s_next = s             # state unchanged
                state_info[s_next]["reward"] += 0.0
                state_info[s_next]["prob"]   += prob_arr * P[i] * P_pri_combined[j, i]
            else:         # dispatch chosen idle provider
                p_idx = 0 if a == 0 else 1 # you can also use 'a' directly as an index since it is either 0 or 1
                s_next = list(s)
                s_next[p_idx] = i
                if s_next[0] == s_next[1]:  # the states where both ambulances are at the same location are not allowed such as (1,1) or (5,5).(0,0) is allowed tho.
                    s_next_no = s
                    state_info[s_next_no]["reward"] += 0.0
                    state_info[s_next_no]["prob"]   += prob_arr * P[i] * P_pri_combined[j, i]
                else:
                    state_info[tuple(s_next)]["reward"] += coverage_reward[i, a, j]
                    state_info[tuple(s_next)]["prob"]   += prob_arr * P[i] * P_pri_combined[j, i]
                             
    # --- 3. Dummy self-loop ----------------------------------------------
    total_prob = sum(s["prob"] for s in state_info.values())
 
    residual_prob = 1.0 - total_prob
    
    state_info[s]["reward"] += 0.0
    state_info[s]["prob"] += residual_prob

    # convert to list of tuples
    return [(ns, info["prob"], info["reward"]) for ns, info in state_info.items()]

## Relative Value Iteration

In [None]:
def rvi(eps=1e-12, max_iter=10000):
    V = np.zeros(len(states))
    # best action value for each state
    
    g = 0.0                             # bias estimate
    #ref = 0                            # choose arbitrary reference state. ! this was changed
 
    for it in range(max_iter):
        V_new = np.empty_like(V) # this will create a new array with the same shape and dtype as V but its contents are uninitialized
        g_num = 0.0

        for k, s in enumerate(states):
            best = -np.inf
            for a in actions(s):
                q = 0.0
                for ns, p, r in transitions(s, a): # this was already defined above. It's quite critical to understand how the transitions work
                    q += p * (r + V[state2idx[ns]])
                best = max(best, q)
            V_new[k] = best
            g_num  += best

        g_new = g_num / len(states)      # average over states
        #V_new -= V_new[ref]             # anchor
        V_new -= min(V_new)              # here we are choosing the state with minimum value as the reference one.
        
        if it % 100 == 0:
            print(f"Iteration {it}, g = {g:.4f}, max |V_new - V| = {np.max(np.abs(V_new - V)):.10f}")
            
        if np.max(np.abs(V_new - V)) < eps:
            return g_new, V_new

        V, g = V_new, g_new

    raise RuntimeError("RVI failed to converge")

In [None]:
final_average_reward, V = rvi()

In [None]:
#V

In [None]:
print(final_average_reward)

## Policy Generation

In a scenario with only two ambulances, the state (0,0) is of particular interest, as it represents the case where both ambulances are idle. If only one ambulance is available when a patient request arrives, that ambulance must be dispatched immediately—regardless of the request's priority level—since no other resources are free to respond.

In [None]:
# I need to find the best actions for the state (0, 0) which is the initial state where both ambulances are idle
# this is needed for each patient location and priority level

def best_actions():
    s = (0,0)
    best_actions = {}
    for i in range(n**2):
        if P[i] == 0:
            continue
        else:
            for pri in range(2):
                best_action = None
                best_value = -np.inf
                for a in actions(s):
                    s_next = s
                    s_next = list(s_next)
                    s_next[a] = i 
                    state_value = V[state2idx[tuple(s_next)]] # state value of the next state
                    immediate_reward = coverage_reward[i, a, pri] # immediate reward
                    value = state_value + immediate_reward
                    #print(f"State: {s}, Action: {a}, Next State: {s_next}, Priority: {pri}, Immediate: {immediate_reward}, StateValue: {state_value}, Total Value: {value}")
                    if value > best_value:
                        best_value = value
                        best_action = a
                best_actions[(i, pri)] = best_action
    return best_actions

In [None]:
all_best_actions = best_actions()

In [None]:
# for priority level 0 patients, show the best actions for each patient location on the grid 
best_action_grid_pri_0 = np.full((n, n), -1)
for i in range(n**2):
    if P[i] == 0:
        continue
    else:
        action = all_best_actions[(i, 0)]
        best_action_grid_pri_0[grid_helper.index_to_coord(i)] = action

# for priority level 1 patients, show the best actions for each patient location on the grid 
best_action_grid_pri_1 = np.full((n, n), -1)
for i in range(n**2):
    if P[i] == 0:
        continue
    else:
        action = all_best_actions[(i, 1)]
        best_action_grid_pri_1[grid_helper.index_to_coord(i)] = action

In [None]:
# visualize best_action_grid_pri_0 and best_action_grid_pri_1 on a heatmap and plot them side by side
fig, axes = plt.subplots(1, 2, figsize=(6, 3))
sns.heatmap(best_action_grid_pri_0, annot=True, fmt=".0f", ax=axes[0], cmap="YlGnBu", cbar=False, square=True)
axes[0].set_title("Best Actions for Priority 0 Patients")  
sns.heatmap(best_action_grid_pri_1, annot=True, fmt=".0f", ax=axes[1], cmap="YlGnBu", cbar=False, square=True)
axes[1].set_title("Best Actions for Priority 1 Patients")
plt.tight_layout()
plt.show()  