Goal: measure how strongly certain features (e.g. POWER, distance to loops) correlate with the value $P(\pi | O(R))$, where $O(R)$ indicates that the policy was optimized for a random reward function $R$. For simplicity, we assume that each policy was either optimized for some $R$ or sampled uniformly at random from the set of all policies, with a 50% chance of each scenario. We also assume that the rewards are generated i.i.d. via $R(s, a, s') \sim N(0, 1)$.

In [3]:
import mdptoolbox as mdpt, numpy as np
import mdptoolbox.example

In [2]:
### Generate a bunch of MDPs with different parameters, sparsity
### (size constants shared by the generation helpers and generate_tests)

NUM_MDPs = 100 # default number of MDPs built by generate_tests
NUM_STATES = 10 # number of states passed to the transition/reward generators in generate_tests
NUM_ACTIONS = 4 # number of actions passed to the transition/reward generators in generate_tests

def get_transition_matrix(num_states, num_actions, generator = np.random.dirichlet):
    """
    Build a random transition tensor for an MDP.

    Each (action, state) pair receives its own next-state row, obtained by calling
    `generator` with an all-ones vector of length `num_states`. With the default
    generator (np.random.dirichlet) every row is a probability distribution.

    Args:
        num_states: number of states in the MDP
        num_actions: number of actions in the MDP
        generator: callable taking a length-`num_states` vector of ones and returning
            a length-`num_states` row

    Returns:
        transitions: (num_actions, num_states, num_states) array, where
        transitions[a, s, s'] is the probability of transitioning from state s to
        state s' given action a
    """
    transitions = np.zeros((num_actions, num_states, num_states)) # (A, S, S) shape
    # np.ndindex visits (action, state) pairs in row-major order: every state of
    # action 0 first, then every state of action 1, and so on.
    for action, state in np.ndindex(num_actions, num_states):
        transitions[action, state] = generator(np.ones(num_states))
    return transitions

def get_reward_matrix(num_states, num_actions, sparsity = 0.0, generator = np.random.normal):
    """
    Build a random reward tensor with a fixed number of zeroed entries.

    int(sparsity * num_actions * num_states ** 2) entries are set to 0 and every
    remaining entry is drawn by a separate call to `generator()`. The flat list is
    then shuffled in place with np.random.shuffle before being reshaped.
    [Fix 2/27/24: sparsity should be deterministic, while sparse rewards should be in random order]

    Args:
        num_states: number of states in the MDP
        num_actions: number of actions in the MDP
        sparsity: fraction of entries forced to 0
        generator: zero-argument callable producing one reward value per call

    Returns:
        (num_actions, num_states, num_states) array of rewards
    """
    total_entries = num_actions * num_states ** 2
    num_sparse_rewards = int(sparsity * num_actions * num_states ** 2)
    # Clamp so that out-of-range sparsity values still yield exactly total_entries items.
    zero_count = min(max(num_sparse_rewards, 0), total_entries)
    entries = [0] * zero_count
    entries.extend(generator() for _ in range(total_entries - zero_count))
    rewards = np.array(entries)
    np.random.shuffle(rewards) # randomize where the zeros land
    return rewards.reshape((num_actions, num_states, num_states))

DISCOUNT = 0.9 # passed as the discount argument to the MDP solver in generate_tests
EPSILON = 0.01 # default epsilon applied to ValueIteration solvers in generate_tests; roughly indicates the "skill level" of the agent
MAX_ITER = 1000 # default max_iter passed to the MDP solver in generate_tests

In [6]:
def generate_tests(num_mdps = NUM_MDPs, sparsity_levels = None, mdp_generator = mdpt.mdp.ValueIteration, P_generator = None, **kwargs):
    """
    Generate a bunch of MDPs with different sparsity levels, and return the sparsity levels and the MDPs

    Each MDP gets a fresh transition matrix and a fresh reward matrix of size
    NUM_STATES x NUM_ACTIONS, and is built (but not solved) with discount DISCOUNT.

    Args:
        num_mdps: number of MDPs to build
        sparsity_levels: a list of sparsity levels to generate MDPs with (one per MDP);
            defaults to num_mdps evenly spaced levels i / num_mdps
        mdp_generator: solver class/callable invoked as
            mdp_generator(P, R, DISCOUNT, max_iter=...)
        P_generator: optional callable (num_states, num_actions) -> transition matrix;
            get_transition_matrix is used when None
        **kwargs: optional 'max_iter' and 'epsilon' overrides (each falls back to
            MAX_ITER / EPSILON independently); other keys are ignored
    Returns:
        sparsity_levels: the sparsity levels used to generate the MDPs, in the same order as the MDPs
        MDPS: an array of MDPs
    """
    # Look each option up on its own, so passing only one of them is honoured
    # instead of both being silently reset to the defaults.
    max_iter = kwargs.get('max_iter', MAX_ITER)
    epsilon = kwargs.get('epsilon', EPSILON)

    sparsity_levels = sparsity_levels if sparsity_levels is not None else np.arange(num_mdps) / num_mdps
    sparsity_copy = sparsity_levels.copy() # defensive copy: the caller's sequence is not shuffled
    np.random.shuffle(sparsity_copy)

    make_P = get_transition_matrix if P_generator is None else P_generator

    solver_options = {'max_iter': max_iter}
    if mdp_generator is mdpt.mdp.ValueIteration:
        # epsilon is handed to the constructor rather than assigned afterwards:
        # pymdptoolbox's ValueIteration computes its stopping threshold from epsilon
        # during __init__, so a later attribute assignment would not affect run().
        # Other solvers are not assumed to accept an epsilon argument.
        solver_options['epsilon'] = epsilon

    MDPS = np.array([
        mdp_generator(
            make_P(NUM_STATES, NUM_ACTIONS),
            get_reward_matrix(NUM_STATES, NUM_ACTIONS, sparsity_copy[i]),
            DISCOUNT, **solver_options)
        for i in range(num_mdps)
    ])
    return sparsity_copy, MDPS

In [None]:
### Generate a bunch of MDPs, solve some of them, generate random policy for others
MDPS = generate_tests(sparsity_levels = np.zeros(NUM_MDPs))[1]
# Integer division: range() and array indexing reject floats in Python 3.
num_solved = NUM_MDPs // 2 # 50% RR, 50% random
for mdp in MDPS[:num_solved]:
    mdp.run() # policy optimized for this MDP's random reward
for mdp in MDPS[num_solved:]:
    # Uniformly random policy; slicing also covers the last MDP when NUM_MDPs is odd.
    mdp.policy = np.random.randint(NUM_ACTIONS, size = NUM_STATES)
policies = np.array([mdp.policy for mdp in MDPS])

