# MDP

In [None]:
import random
from collections import defaultdict
def vector_add(a, b):
    """Add two vectors element by element.

    Scalars are added directly; when both operands are iterable the
    function recurses into each pair of elements and returns a list.
    If either operand is falsy (``None``, ``0``, an empty sequence),
    the result is simply ``a or b``.
    """
    if not a or not b:
        return a or b
    both_iterable = hasattr(a, '__iter__') and hasattr(b, '__iter__')
    if not both_iterable:
        return a + b
    # Vectors of different lengths cannot be added component-wise.
    assert len(a) == len(b)
    return [vector_add(x, y) for x, y in zip(a, b)]

def isnumber(x):
    """Return True when x exposes ``__int__`` (ints, floats, bools, ...)."""
    return hasattr(x, '__int__')


def print_table(table, header=None, sep='', numfmt='{}'):
    """Print a list of lists as a table, so that columns line up nicely.

    Args:
        table: Sequence of rows. It is never modified.
        header: Optional row printed before the data rows.
        sep: Separator placed between columns.
        numfmt: Format applied to every numeric cell, e.g. '{:.2f}'.
            (If you want different formats in different columns,
            don't use print_table.)

    Numeric cells are right-justified and everything else left-justified,
    decided per column from the first data row (or from the header when
    there are no data rows). ``None`` cells are rendered as "###".
    An empty table without a header prints nothing.
    """
    # Work on copies so the caller's table is left untouched.
    rows = [list(row) for row in table]
    header_row = list(header) if header else None
    if not rows and header_row is None:
        return

    first_row = rows[0] if rows else header_row
    justs = ['rjust' if isnumber(x) else 'ljust' for x in first_row]

    if header_row is not None:
        rows.insert(0, header_row)

    cells = [[str(numfmt.format(x) if isnumber(x)
                  else "###" if x is None
                  else x) for x in row]
             for row in rows]
    # Column width is the widest rendered cell in that column.
    sizes = [max(len(cell) for cell in column) for column in zip(*cells)]

    for row in cells:
        print(sep.join(getattr(cell, just)(size)
                       for (just, size, cell) in zip(justs, sizes, row)))

In [None]:
class GridMDP:
    """
    A Markov Decision Process on a two-dimensional grid.

    States are (x, y) tuples: x is the column and y is the row counted
    upward from the bottom row of the grid as it is written.

    Attributes:
        grid (list of lists): Reward grid stored bottom row first; None indicates obstacles.
        states (set): All (x, y) cells whose grid entry is not None.
        reward (dict): Mapping from state to its reward.
        terminals: Terminal states, stored exactly as passed in.
        init (tuple): Initial state.
        gamma (float): Discount factor (expected 0 < gamma <= 1; not validated).
        rows (int): Number of rows in the grid.
        cols (int): Number of columns in the grid (length of the first row).
        orientations (list): Directions as unit vectors in counter-clockwise
            order: (east, north, west, south).
        turns (tuple): Index offsets into orientations for (left, right): (+1, -1).
        transitions (dict): Precomputed state -> action -> [(probability, next_state)].
    """

    def __init__(self, grid, terminals, init=(2, 0), gamma=0.99):
        """
        Build the state set, rewards and transition model from a reward grid.

        Args:
            grid (list of lists): Reward grid written top row first.
            terminals: Collection of terminal states.
            init (tuple): Initial state.
            gamma (float): Discount factor.

        Raises:
            ValueError: If init or any terminal is not a state of the grid.
        """
        # Reverse grid for bottom-to-top indexing
        self.grid = grid[::-1]
        self.rows = len(grid)
        self.cols = len(grid[0])

        # Extract states and rewards
        self.states = set()
        self.reward = {}
        for y in range(self.rows):
            for x in range(self.cols):
                cell = self.grid[y][x]
                if cell is not None:
                    self.states.add((x, y))
                    self.reward[(x, y)] = cell

        # Validate input with a single readable message per error.
        if init not in self.states:
            raise ValueError("Invalid initial state: {}".format(init))
        invalid_terminals = [t for t in terminals if t not in self.states]
        if invalid_terminals:
            raise ValueError("Invalid terminal states: {}".format(invalid_terminals))

        self.terminals = terminals
        self.init = init
        self.gamma = gamma
        self.orientations = [(1, 0), (0, 1), (-1, 0), (0, -1)]
        self.turns = (+1, -1)
        # Precompute transition probabilities for efficiency
        self.transitions = {s: self._calculate_T(s) for s in self.states}

    def _calculate_T(self, s):
        """
        Calculate transition probabilities for all actions from a state.

        The intended move succeeds with probability 0.8; with probability
        0.1 each the agent slips to the right or to the left of it.

        Args:
            s (tuple): Current state.

        Returns:
            dict: Mapping from action to list of (probability, next_state) pairs,
                ordered as (intended, right slip, left slip).
        """
        left, right = self.turns
        transitions = {}
        for action in self.orientations:
            transitions[action] = [
                (0.8, self._go(s, action)),
                (0.1, self._go(s, self._turn_direction(action, right))),
                (0.1, self._go(s, self._turn_direction(action, left))),
            ]
        return transitions

    def _turn_direction(self, direction, turn):
        """
        Turn the given direction by the specified amount.

        Args:
            direction (tuple): Current direction; must be one of orientations.
            turn (int): Index offset into orientations
                (+1 turns left / counter-clockwise, -1 turns right / clockwise).

        Returns:
            tuple: New direction.
        """
        index = self.orientations.index(direction)
        # Modulo wraps around so that turning past south comes back to east.
        return self.orientations[(index + turn) % len(self.orientations)]

    def _go(self, state, direction):
        """
        Move one step in the given direction, handling boundaries.

        Args:
            state (tuple): Current state.
            direction (tuple): Direction to move.

        Returns:
            tuple: New state, or the unchanged state when the move would
                leave the grid or hit an obstacle.
        """
        new_state = tuple(vector_add(state, direction))
        return new_state if new_state in self.states else state

    def R(self, state):
        """
        Get the reward for a state.

        Args:
            state (tuple): State.

        Returns:
            float: Reward.
        """
        return self.reward[state]

    def T(self, state, action):
        """
        Get the transition probabilities for a state and action.

        Args:
            state (tuple): State.
            action (tuple): Action; a falsy action (e.g. None) means "no move".

        Returns:
            list: List of (probability, next_state) pairs. For a falsy
                action this is the single pair (0.0, state).
        """
        return self.transitions[state][action] if action else [(0.0, state)]

    def actions(self, state):
        """
        Get the available actions in a state.

        Args:
            state (tuple): State.

        Returns:
            list: [None] for a terminal state, otherwise all orientations.
        """
        if state in self.terminals:
            return [None]
        return self.orientations

    def to_grid(self, mapping):
        """
        Convert a mapping from (x, y) to values into a grid representation.

        Args:
            mapping (dict): Mapping from (x, y) to values.

        Returns:
            list of lists: Grid with the top row first; cells missing from
                the mapping are None.
        """
        bottom_up = [[mapping.get((x, y), None) for x in range(self.cols)]
                     for y in range(self.rows)]
        return list(reversed(bottom_up))

    def to_arrows(self, policy):
        """
        Convert a policy (mapping from state to action) into a grid showing corresponding arrow directions.

        Args:
            policy (dict): Mapping from state to action.

        Returns:
            list of lists: Grid representation with arrows.
        """
        chars = {(1, 0): " > ", (0, 1): ' ∧ ', (-1, 0): ' < ', (0, -1): ' ∨ ', None: ' O '}
        return self.to_grid({s: chars[a] for (s, a) in policy.items()})


# MDP Initialization

In [None]:
# Reward grid written top row first; None marks a cell that is not a state.
# GridMDP reverses the rows, so in (x, y) coordinates y counts upward from
# the bottom row shown here.
grid = [
    [None, +1.0, None, None, None],
    [None, -.01, -.01, -1.0, None],
    [None, -.01, -.01, None, None],
    [None, -.01, -.01, None, None],
    [None, None, -.01, None, None]
]
# Terminal states as (x, y): (1, 4) is the +1.0 cell, (3, 3) the -1.0 cell.
terminals = [(1, 4), (3, 3)]
# Uses the GridMDP defaults init=(2, 0) and gamma=0.99.
maze = GridMDP(grid, terminals)

In [None]:
# Fixed policy to evaluate: maps each state (x, y) to a direction vector,
# with None for the two terminal states.
pi = {(1,2):(0,1), (2, 1): (0,1), (1, 1): (0,1), (2, 0): (0,1), (1, 4): None, (2, 3): (-1,0), (3, 3): None, (2, 2): (0,1), (1, 3): (0,1)}
print(maze.states)
# Render the policy as arrows; grid cells without a policy entry come back
# as None from to_arrows and are printed as "###" by print_table.
print_table(maze.to_arrows(pi))

{(1, 2), (2, 1), (1, 1), (2, 0), (1, 4), (2, 3), (3, 3), (2, 2), (1, 3)}
### O #########
### ∧  <  O ###
### ∧  ∧ ######
### ∧  ∧ ######
###### ∧ ######


In [None]:
def run_single_trial(agent_program, mdp):
    """Run one trial of agent_program in mdp, from mdp.init until the agent stops.

    At every step the agent is called with the percept
    (current_state, reward of current_state). The trial ends as soon as
    the agent returns None as its action.

    Args:
        agent_program (callable): Maps a (state, reward) percept to an action.
        mdp: Environment providing ``init``, ``R(state)`` and
            ``T(state, action)`` returning (probability, next_state) pairs.

    Returns:
        None
    """
    import random

    def take_single_action(mdp, s, a):
        """Sample the next state according to the probabilities in mdp.T(s, a)."""
        outcomes = mdp.T(s, a)
        weights = [probability for probability, _ in outcomes]
        next_states = [next_state for _, next_state in outcomes]
        # Weighted draw: a uniform choice would ignore the transition model.
        return random.choices(next_states, weights=weights)[0]

    current_state = mdp.init
    while True:
        current_reward = mdp.R(current_state)
        percept = (current_state, current_reward)
        next_action = agent_program(percept)
        if next_action is None:
            break
        current_state = take_single_action(mdp, current_state, next_action)
    return

# Direct Evaluation / Monte Carlo Estimation

In [None]:
class Passive_MC_Agent:
    """
    Passive (non-learning) agent that uses direct utility estimation
    (Monte Carlo returns) on a given MDP and policy.

    Attributes:
        pi (dict): Mapping from states to actions (policy).
        mdp: The MDP instance; ``terminals`` is read on every step and
            ``gamma`` (if present) when estimating.
        V (dict): Utility estimates for each state seen so far.
        s (tuple, optional): Current state.
        a (tuple, optional): Last action taken.
        s_history (list): States visited in the current trial.
        r_history (list): Rewards received in the current trial.
    """

    def __init__(self, pi, mdp):
        self.pi = pi
        self.mdp = mdp
        self.V = {}
        self.reset_history()

    def __call__(self, percept):
        """
        Acts according to the policy and updates history.

        Args:
            percept (tuple): (state, reward) pair.

        Returns:
            tuple: The action chosen according to the policy,
                or None in a terminal state.
        """
        s, r = percept
        self.s_history.append(s)
        self.r_history.append(r)

        if s in self.mdp.terminals:
            self.s = self.a = None
        else:
            self.s, self.a = s, self.pi[s]
        return self.a

    def reset_history(self):
        """
        Resets the agent's internal history.
        """
        self.s = None
        self.a = None
        self.s_history = []
        self.r_history = []

    def estimate_V(self):
        """
        Folds the finished trial into the utility estimates and clears the history.

        Each visit to a state contributes the discounted reward-to-go from
        that visit; visits to the same state within the trial are averaged.
        A state that already has an estimate is moved halfway toward the
        new trial's value.

        Returns:
            dict: The updated utility estimates (the ``V`` attribute).

        Raises:
            AssertionError: If the MDP is not in a terminal state.
        """
        assert self.a is None, "MDP is not in a terminal state"
        assert len(self.s_history) == len(self.r_history)

        # An MDP without a gamma attribute is treated as undiscounted.
        gamma = getattr(self.mdp, 'gamma', 1.0)

        # One backward pass yields every discounted reward-to-go in O(n).
        returns = [0.0] * len(self.r_history)
        reward_to_go = 0.0
        for i in range(len(self.r_history) - 1, -1, -1):
            reward_to_go = self.r_history[i] + gamma * reward_to_go
            returns[i] = reward_to_go

        # Group the returns by state, keeping first-visit order.
        samples = {}
        for state, value in zip(self.s_history, returns):
            samples.setdefault(state, []).append(value)

        # Update existing utilities or add new ones
        for state, values in samples.items():
            value = sum(values) / len(values)
            if state in self.V:
                self.V[state] = (self.V[state] + value) / 2
            else:
                self.V[state] = value

        self.reset_history()
        return self.V

In [None]:
# Evaluate the fixed policy pi over 200 trials; estimate_V is called after
# each trial to fold that trial's history into agent.V.
agent = Passive_MC_Agent(pi, maze)
for i in range(200):
    run_single_trial(agent,maze)
    agent.estimate_V()

# One "state:value" line for every state the agent has an estimate for.
print('\n'.join([str(k)+':'+str(v) for k, v in agent.V.items()]))

(2, 0):0.8311924226758116
(2, 1):0.8541879940330394
(1, 1):0.7818058646983982
(1, 2):0.9157404999469299
(2, 2):0.9126969606255535
(2, 3):0.9122347235814126
(1, 3):0.9590284764993509
(1, 4):1.0


# Policy Evaluation Agent

In [None]:
def policy_evaluation(mdp, pi, V, k=20):
    """Return an updated utility mapping V from each state in the MDP to its
    utility, using an approximation (modified policy iteration).

    Performs k sweeps of the update
    V[s] = R(s) + gamma * sum(p * V[s']) over the outcomes of pi[s].

    Args:
        mdp: Provides ``states``, ``gamma``, ``R(state)`` and ``T(state, action)``.
        pi (dict): Policy mapping every state in mdp.states to an action.
        V (dict): Current utility estimates. It is updated in place, so
            values written earlier in a sweep are used later in the same
            sweep. Successor states missing from V count as utility 0.0.
        k (int): Number of sweeps over the state set.

    Returns:
        dict: The same V object, after the updates.
    """
    for _ in range(k):
        for s in mdp.states:
            expected_next = sum(p * V.get(s1, 0.0) for p, s1 in mdp.T(s, pi[s]))
            V[s] = mdp.R(s) + mdp.gamma * expected_next
    return V



class Passive_PE_Agent:
    """
    Passive (non-learning) agent based on adaptive dynamic programming.

    The agent follows a fixed policy, estimates the transition model from
    observed outcome frequencies, and re-runs policy evaluation on that
    learnt model after every percept.
    """

    class ModelMDP(GridMDP):
        """GridMDP variant whose T is answered from an editable,
        learnt transition table P instead of the grid dynamics."""

        def __init__(self, grid, terminals, init_state, gamma):
            super().__init__(grid, terminals, init_state, gamma)

            def make_table():
                # Missing keys at any depth create another nested table.
                return defaultdict(make_table)

            self.P = make_table()

        def T(self, s, a):
            """Return the learnt outcomes of taking a in s as a list of
            (probability, next_state) pairs."""
            pairs = []
            for outcome, probability in self.P[(s, a)].items():
                pairs.append((probability, outcome))
            return pairs

    def __init__(self, pi, grid, terminals, init, gamma):
        self.pi = pi
        self.mdp = Passive_PE_Agent.ModelMDP(grid, terminals, init, gamma)
        self.V = {}
        self.visited = set()  # states whose reward has been observed
        self.Nsa = defaultdict(int)  # visits of (state, action)
        self.Ns1_sa = defaultdict(int)  # counts of (outcome, state, action)
        self.s = None
        self.a = None

    def __call__(self, percept):
        """Update model and utilities from (state, reward); return the policy's action."""
        new_state, new_reward = percept
        model = self.mdp
        prev_state, prev_action = self.s, self.a

        # The reward of a state only becomes known on its first visit.
        if new_state not in self.visited:
            self.V[new_state] = model.reward[new_state] = new_reward
            self.visited.add(new_state)

        if prev_state is not None:
            pair = (prev_state, prev_action)
            self.Nsa[pair] += 1
            self.Ns1_sa[(new_state, prev_state, prev_action)] += 1
            # Refresh the probability of every outcome seen for this pair.
            for (outcome, state, action), count in self.Ns1_sa.items():
                if (state, action) == pair and count != 0:
                    model.P[pair][outcome] = count / self.Nsa[pair]

        self.V = policy_evaluation(model, self.pi, self.V)

        if new_state in model.terminals:
            self.s = self.a = None
        else:
            self.s, self.a = new_state, self.pi[new_state]
        return self.a


In [None]:
# The agent builds its own model MDP from grid and terminals (initial state
# (2, 0), gamma 0.99); the 300 trials themselves are sampled from maze.
agent = Passive_PE_Agent(pi,grid, terminals, (2, 0), 0.99)
for i in range(300):
    run_single_trial(agent, maze)

# One "state:value" line for every state in the agent's utility table.
print('\n'.join([str(k)+':'+str(v) for k, v in agent.V.items()]))

(2, 0):0.6616057847250056
(1, 2):0.7784911122836556
(2, 1):0.7103487481949818
(1, 1):0.7206762966373008
(1, 4):1.0
(2, 3):0.7761287028938784
(3, 3):-1.0
(2, 2):0.7515515169859974
(1, 3):0.8562314565764992


# Temporal Difference Agent

In [None]:
class Passive_TD_Agent:
    """
    Passive agent that evaluates a fixed policy with temporal-difference
    (TD) updates of its utility estimates.

    Attributes:
        pi (dict): Policy, mapping states to actions.
        V (dict): Utility estimate per state, initialised to 0.
        Ns (dict): Number of TD updates applied to each state.
        s (object, optional): Previous state, None at the start of a trial.
        a (object, optional): Action chosen in the previous state.
        r (float, optional): Reward received in the previous state.
        gamma (float): Discount factor taken from the MDP.
        terminals: Terminal states taken from the MDP.
        alpha (function): Learning rate as a function of the update count.
    """

    def __init__(self, pi, mdp, alpha=None):
        self.pi = pi
        self.V = dict.fromkeys(mdp.states, 0.)
        self.Ns = dict.fromkeys(mdp.states, 0)
        self.s = self.a = self.r = None
        self.gamma = mdp.gamma
        self.terminals = mdp.terminals
        # Fall back to a 1/n schedule when no learning rate is supplied.
        self.alpha = alpha if alpha else (lambda n: 1 / n)

    def __call__(self, percept):
        """
        Apply one TD update for the previous state and pick the next action.

        Args:
            percept (tuple): (state, reward) pair.

        Returns:
            The policy's action for the state, or None in a terminal state.
        """
        state, reward = percept
        utilities, counts = self.V, self.Ns

        # A state that has never been updated starts from its own reward.
        if not counts[state]:
            utilities[state] = reward

        previous = self.s
        if previous is not None:
            counts[previous] += 1
            step = self.alpha(counts[previous])
            target = self.r + self.gamma * utilities[state]
            utilities[previous] += step * (target - utilities[previous])

        # Remember this step, or clear the memory once the trial is over.
        if state in self.terminals:
            self.s = self.a = self.r = None
            return self.a
        self.s, self.a, self.r = state, self.pi[state], reward
        return self.a

In [None]:
# Learning rate 60/(59+n): equals 1 for the first update of a state and
# decays as the update count n grows.
agent = Passive_TD_Agent(pi, maze, alpha=lambda n: 60./(59+n))
for i in range(200):
    run_single_trial(agent,maze)
# One "state:value" line per state; V was initialised for every MDP state.
print('\n'.join([str(k)+':'+str(v) for k, v in agent.V.items()]))

(1, 2):0.7701631849562328
(2, 1):0.6942453221137471
(1, 1):0.7140717953025868
(2, 0):0.654312831576855
(1, 4):1.0
(2, 3):0.7429620808410596
(3, 3):0.0
(2, 2):0.718734749664611
(1, 3):0.8059206649199855


# Q-Learning Agent

In [None]:
class QLearningAgent:
    """
    Active agent that learns action values Q(s, a) with tabular Q-learning.

    Attributes:
        gamma (float): Discount factor taken from the MDP.
        terminals: Terminal states taken from the MDP.
        all_act: Actions available in non-terminal states (mdp.orientations).
        Q (defaultdict): Action-value estimates keyed by (state, action), default 0.
        Nsa (defaultdict): Update counts keyed by (state, action).
        s, a, r: Previous state, action and reward; None at the start of a trial.
        alpha (function): Learning rate as a function of the update count.
    """

    def __init__(self, mdp, alpha=None):

        self.gamma = mdp.gamma
        self.terminals = mdp.terminals
        self.all_act = mdp.orientations
        self.Q = defaultdict(float)
        self.Nsa = defaultdict(float)
        self.s = None
        self.a = None
        self.r = None

        self.alpha = alpha or (lambda n: 1 / n)  # Default alpha if not provided

    def f(self, u):
        """Exploration function: currently the identity, i.e. purely greedy."""
        return u

    def actions_in_state(self, state):
        """Return actions possible in given state.
        Useful for max and argmax."""
        if state in self.terminals:
            return [None]
        else:
            return self.all_act

    def __call__(self, percept):
        """
        Apply one Q-learning update and choose the next action.

        Args:
            percept (tuple): (state, reward) pair for the state just entered.

        Returns:
            The greedy action for the new state, or None when the new
            state is terminal (which ends the trial).
        """
        s1, r1 = percept
        Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r
        alpha, gamma = self.alpha, self.gamma
        actions_in_state = self.actions_in_state
        reached_terminal = s1 in self.terminals

        # The value of a terminal state is its own reward; record it first
        # so the update of the previous state can bootstrap from it.
        if reached_terminal:
            Q[s1, None] = r1
        if s is not None:
            Nsa[s, a] += 1
            best_next = max(Q[s1, a1] for a1 in actions_in_state(s1))
            Q[s, a] += alpha(Nsa[s, a]) * (r + gamma * best_next - Q[s, a])
        if reached_terminal:
            # Forget the finished trial so the next one starts clean.
            self.s = self.a = self.r = None
        else:
            self.s, self.r = s1, r1
            self.a = max(actions_in_state(s1), key=lambda a1: self.f(Q[s1, a1]))
        return self.a

In [None]:
# Train the Q-learning agent for 200 trials with a decaying learning rate.
q_agent = QLearningAgent(maze, alpha=lambda n: 60./(59+n))
for i in range(200):
    run_single_trial(q_agent,maze)
# Report the Q-learning agent's own results (not the TD agent's V):
# the utility implied for each state is its best learnt action value.
q_utilities = {}
for (s, a), q in q_agent.Q.items():
    q_utilities[s] = max(q, q_utilities.get(s, q))
print('\n'.join([str(k)+':'+str(v) for k, v in q_utilities.items()]))

(1, 2):0.7701631849562328
(2, 1):0.6942453221137471
(1, 1):0.7140717953025868
(2, 0):0.654312831576855
(1, 4):1.0
(2, 3):0.7429620808410596
(3, 3):0.0
(2, 2):0.718734749664611
(1, 3):0.8059206649199855
