In [1]:
import numpy as np
from itertools import product

##############################################
# 1. GridWorld MDP and Some Utilities
##############################################

class GridWorld:
    """
    Simple GridWorld with:
      - 5x5 states
      - 4 possible actions (up, down, left, right)
      - slip probability
    """
    def __init__(self, size=5, p_slip=0.2):
        self.size = size
        self.n_states = size * size
        self.n_actions = 4  # up, right, down, left
        self.actions = [0, 1, 2, 3]
        self.p_slip = p_slip
        # build transitions:
        #   p_transition[s_from, s_to, a] = probability of going from s_from to s_to via action a
        self.p_transition = self._build_transition()
    
    def _build_transition(self):
        """
        Returns a 3D array [n_states, n_states, n_actions], with transition probabilities.
        """
        T = np.zeros((self.n_states, self.n_states, self.n_actions))
        for s in range(self.n_states):
            r, c = divmod(s, self.size)
            for a in range(self.n_actions):
                # nominal next-state (assuming no slip)
                nr, nc = r, c
                if a == 0 and r > 0:         # up
                    nr = r - 1
                elif a == 1 and c < self.size - 1:  # right
                    nc = c + 1
                elif a == 2 and r < self.size - 1:  # down
                    nr = r + 1
                elif a == 3 and c > 0:             # left
                    nc = c - 1
                s_next = nr * self.size + nc
                
                # if we do not slip, go to s_next
                # else we slip, choose random among all actions with prob p_slip
                for slip_a in range(self.n_actions):
                    # nominal next-state for slip_a:
                    slip_n_r, slip_n_c = r, c
                    if slip_a == 0 and r > 0: slip_n_r = r - 1
                    if slip_a == 1 and c < self.size - 1: slip_n_c = c + 1
                    if slip_a == 2 and r < self.size - 1: slip_n_r = r + 1
                    if slip_a == 3 and c > 0: slip_n_c = c - 1
                    slip_s_next = slip_n_r * self.size + slip_n_c
                    
                    if slip_a == a:
                        # correct action => no slip => prob = 1.0 - p_slip
                        T[s, slip_s_next, a] += (1.0 - self.p_slip)
                    else:
                        # slip => prob = p_slip / (n_actions - 1)
                        T[s, slip_s_next, a] += (self.p_slip / (self.n_actions - 1))
        
        return T
    
    def state_features(self):
        """
        One-hot features: each of the n_states has a d=n_states dimensional
        feature-vector that is all zeros except 1 at the index of the state.
        """
        eye = np.eye(self.n_states)
        return eye

##############################################
# 2. Expert Trajectories (via a known reward)
##############################################

def value_iteration(p_transition, reward, discount=0.9, eps=1e-5):
    """
    Standard value-iteration for a given MDP with transitions p_transition,
    per-state reward, and discount. Returns the value function.
    """
    n_states, _, n_actions = p_transition.shape
    v = np.zeros(n_states)
    while True:
        v_old = v.copy()
        for s in range(n_states):
            q_sa = []
            for a in range(n_actions):
                val = 0.0
                for s_next in range(n_states):
                    val += p_transition[s, s_next, a] * (reward[s] + discount * v_old[s_next])
                q_sa.append(val)
            v[s] = max(q_sa)
        delta = np.max(np.abs(v - v_old))
        if delta < eps:
            break
    return v

def stochastic_policy_from_value(world, value, weighting=lambda x: x**50):
    """
    Convert a value function into a *stochastic* policy (used for demonstration).
    weighting(...) emphasizes near-optimal actions (down-weighting suboptimal).
    """
    n_states, _, n_actions = world.p_transition.shape
    policy = np.zeros((n_states, n_actions))
    
    for s in range(n_states):
        # compute q-values
        q_sa = []
        for a in range(n_actions):
            val = 0.0
            for s_next in range(n_states):
                val += world.p_transition[s, s_next, a] * (reward[s] + 0.9 * value[s_next])
            q_sa.append(val)
        # weighting
        q_sa = np.array(q_sa)
        w = weighting(q_sa - np.max(q_sa))  # shift so max is 0 => stable
        w = np.clip(w, 1e-8, np.inf)        # avoid 0
        w /= w.sum()                        # normalize
        policy[s] = w
    return policy

def generate_trajectories(num_trajectories, world, policy_exec, start_states, terminal):
    """
    Generate a set of trajectories by following a (stochastic) policy
    from the given start states until hitting a terminal state.
    """
    rng = np.random.default_rng(123)
    for _ in range(num_trajectories):
        s = rng.choice(start_states)
        traj_states = [s]
        traj_actions = []
        
        while s not in terminal:
            # pick action from policy
            a = policy_exec(s, rng)
            traj_actions.append(a)
            
            # sample next state
            p = world.p_transition[s, :, a]
            s_next = rng.choice(world.n_states, p=p)
            
            s = s_next
            traj_states.append(s)
        
        yield Trajectory(traj_states, traj_actions)

class Trajectory:
    """
    Simple wrapper to store state and action sequences.
    """
    def __init__(self, states, actions):
        self._states = states
        self._actions = actions
    
    def states(self):
        return self._states
    
    def actions(self):
        return self._actions
    
    def transitions(self):
        """
        Return (s,a,s_next) for each step, ignoring final if any.
        """
        return list(zip(self._states[:-1], self._actions, self._states[1:]))

def stochastic_policy_adapter(policy):
    """
    Turns an NxA policy array into a function that picks an action
    from a distribution policy[s].
    """
    def fn(s, rng):
        return rng.choice(len(policy[s]), p=policy[s])
    return fn

##############################################
# 3. Some IRL Helper Functions
##############################################

def feature_expectation_from_trajectories(features, trajectories):
    """
    Compute the empirical average of features from the provided trajectories.
    """
    n_states, n_features = features.shape
    fe = np.zeros(n_features)
    for traj in trajectories:
        for s in traj.states():
            fe += features[s]
    return fe / len(trajectories)

def initial_probabilities_from_trajectories(n_states, trajectories):
    """
    Probability p(s0) that s0 is the start-state of a trajectory.
    """
    p = np.zeros(n_states)
    for traj in trajectories:
        first_state = traj.transitions()[0][0]
        p[first_state] += 1.0
    return p / len(trajectories)

def compute_expected_svf(p_transition, p_initial, terminal, reward, eps=1e-5):
    """
    Compute the (approximate) expected state visitation frequency:
      1) backward pass => local action probabilities
      2) forward pass => sum over time
    """
    n_states, _, n_actions = p_transition.shape
    nonterminal = set(range(n_states)) - set(terminal)
    
    # backward pass
    zs = np.zeros(n_states)
    zs[list(terminal)] = 1.0
    
    # for extra safety, iterate 2*N
    for _ in range(2 * n_states):
        za = np.zeros((n_states, n_actions))
        for s_from, a in product(range(n_states), range(n_actions)):
            for s_to in range(n_states):
                za[s_from, a] += (p_transition[s_from, s_to, a] *
                                  np.exp(reward[s_from]) *
                                  zs[s_to])
        zs = za.sum(axis=1)
    p_action = za / zs[:, None]
    
    # forward pass
    d = np.zeros((n_states, 2 * n_states))  # d[s,t]
    d[:, 0] = p_initial
    
    for t in range(1, 2 * n_states):
        for s_to in range(n_states):
            for s_from, a in product(nonterminal, range(n_actions)):
                d[s_to, t] += (d[s_from, t-1] *
                               p_action[s_from, a] *
                               p_transition[s_from, s_to, a])
    return d.sum(axis=1)

##############################################
# 4. The MaxEnt IRL Routine
##############################################

def maxent_irl(p_transition, features, terminal, trajectories, optimizer, init, eps=1e-4):
    """
    - p_transition: shape = [n_states, n_states, n_actions]
    - features:     shape = [n_states, n_features]
    - terminal:     list of terminal states
    - trajectories: expert demonstration
    - optimizer:    e.g. an ExpSga or simple gradient approach
    - init:         function returning initial w
    """
    n_states, _, n_actions = p_transition.shape
    _, n_features = features.shape
    
    # 1) empirical feature expectation
    e_features = feature_expectation_from_trajectories(features, trajectories)
    
    # 2) probability of initial states
    p_initial = initial_probabilities_from_trajectories(n_states, trajectories)
    
    # 3) optimize
    w = init(n_features)
    delta = np.inf
    optimizer.reset(w)
    
    while delta > eps:
        w_old = w.copy()
        # compute state reward
        r_s = features.dot(w)
        
        # expected svf
        e_svf = compute_expected_svf(p_transition, p_initial, terminal, r_s)
        
        grad = e_features - features.T.dot(e_svf)
        optimizer.step(grad)
        delta = np.max(np.abs(w_old - w))
    
    # final reward
    return features.dot(w)

##############################################
# 5. Simple Optimizers
##############################################

class Constant:
    """
    Initialization: all weights = constant c
    """
    def __init__(self, c):
        self.c = c
    def __call__(self, dim):
        return np.full(dim, self.c)

class ExpSga:
    """
    Exponentiated Stochastic Gradient Ascent (for IRL).
    Optionally combine with a decaying learning rate.
    """
    def __init__(self, lr):
        self.lr_func = lr
    def reset(self, w_init):
        self.t = 0
        self.w = w_init
    def step(self, grad):
        lr = self.lr_func(self.t)
        self.t += 1
        # w <- w * exp(lr * grad)
        update = lr * grad
        self.w *= np.exp(update)
    @property
    def params(self):
        return self.w

def linear_decay(lr0=0.1, decay=1e-3):
    """
    Example learning-rate schedule: lr = lr0 / (1 + decay*t)
    """
    def f(t):
        return lr0 / (1.0 + decay * t)
    return f

##############################################
# 6. Putting It All Together (main)
##############################################

if __name__ == "__main__":
    # -- create the GridWorld MDP
    world = GridWorld(size=5, p_slip=0.2)
    
    # set up some reward
    reward = np.zeros(world.n_states)
    reward[-1] = 1.0   # top-right corner
    reward[8]  = 0.65  # just for variety
    terminal   = [24]  # top-right corner is terminal
    
    # build an expert policy (for demonstration only)
    discount   = 0.9
    value = value_iteration(world.p_transition, reward, discount)
    expert_pol = stochastic_policy_from_value(world, value)
    policy_exec = stochastic_policy_adapter(expert_pol)
    
    # generate trajectories
    n_trajectories = 200
    start = [0]   # bottom-left
    tjs = list(generate_trajectories(n_trajectories, world, policy_exec, start, terminal))
    
    # gather features
    features = world.state_features()
    
    # pick init strategy
    init_strategy = Constant(1.0)
    
    # pick optimizer
    optim = ExpSga(lr=linear_decay(lr0=0.2))
    
    # run max-ent IRL
    recovered_reward = maxent_irl(world.p_transition,
                                  features,
                                  terminal,
                                  tjs,
                                  optim,
                                  init_strategy)
    
    print("Original reward (partial):", reward[:10])
    print("Recovered reward (partial):", recovered_reward[:10])
    print("Done.")


Original reward (partial): [0.   0.   0.   0.   0.   0.   0.   0.   0.65 0.  ]
Recovered reward (partial): [nan nan nan nan nan nan nan nan nan nan]
Done.


  self.w *= np.exp(update)
