# Lab 3: Partially Observable Markov Decision Process (POMDP)

In the previous lab, we studied the MDP model:

- $\mathcal S$ - state space
- $\mathcal A$ - possible actions
- $T$ - transition function
- $R$ - reward function

POMDP extends MDP with:

- set of possible observations,
- observation function or distribution O(o|s',a),
- and initial belief distribution over the states.

The goal is to maximize reward in a partially observable environment.

- Agent cannot fully observe current state but instead has belief in form of probability distribution over the entire state space.
- For each action and new state agent receives an observation.
- Agent updates its belief based on the previous belief, action and observation of the new state.


Example:

- Maze: Agent only see few tiles around him. (Maze is foggy)
- Can you think of other examples?



## Task 1

Let's use the maze again but differently:

- Direction of agent is now part of the state. Agent in the maze is represented by characters `<` `>` `^` `v`  based on its direction, instead of `@`
- Agent actions (turn left `l`, turn right `r`) can change its direction.
- Only movement forward `m` is allowed.

Implement function `apply` in this new setup.

In [22]:
from copy import deepcopy
from typing import List, Tuple, Dict, Set


Action = str  # Type alias

DIRECTIONS = ["^", ">", "v", "<"]
DIRECTIONS_DELTAS = {"^": (-1, 0), ">": (0, 1), "v": (1, 0), "<": (0, -1)}


class State:

    def __init__(self, maze_rows: List[str]):
        self._maze_rows = [list(row) for row in maze_rows]

        # Make sure the maze has a rectangular shape, and has a proper boundary.
        assert all(len(row) == self.num_cols() for row in self._maze_rows)
        assert set(self._maze_rows[0]) == {"#"} and set(self._maze_rows[-1]) == {"#"}
        assert all(row[0] == "#" and row[-1] == "#" for row in self._maze_rows)

    def actions(self) -> List[Action]:
        """
        :return: list of actions available at the current state.
        """
        return [
            "m",  # Move forward.
            "l",  # Turn left.
            "r",  # Turn right.
        ]

    def _are_valid_move_coordinates(self, i, j):
        return i >= 0 and i < self.num_rows() and j >= 0 and j < self.num_cols() and self._maze_rows[i][j] in [" ", "G"]

    def apply(self, action: Action) -> None:
        """
        Apply action in the current state.
        """
        i, j, direction = self.current_position()

        if action == "m":
            dd = DIRECTIONS_DELTAS[direction]
            new_row = i + dd[0]
            new_col = j + dd[1]

            if self._are_valid_move_coordinates(new_row, new_col):
                self._maze_rows[new_row][new_col] = direction
                self._maze_rows[i][j] = " "

        if action in ["l", "r"]:
            shift = 1 if action == "r" else -1
            dir_indx = DIRECTIONS.index(direction)

            self._maze_rows[i][j] = DIRECTIONS[(dir_indx + shift) % 4]

    def copy(self) -> "State":
        return State(deepcopy(self._maze_rows))

    # -- Maze specific methods ---------------------------------------------------

    def num_rows(self) -> int:
        return len(self._maze_rows)

    def num_cols(self) -> int:
        return len(self._maze_rows[0])

    def current_position(self) -> Tuple[int, int, str]:
        # Return agent coordinates and orientation.
        for i in range(self.num_rows()):
            for j in range(self.num_cols()):
                if self._maze_rows[i][j] in ["^", "<", "v", ">"]:
                    return i, j, self._maze_rows[i][j]
        raise RuntimeError(
            "Invalid maze: current position not found in: " + "\n".join(self._maze_rows)
        )
    
    def num_golds(self) -> int:
        golds = 0
        for i in range(self.num_rows()):
            for j in range(self.num_cols()):
                if self._maze_rows[i][j] == "G":
                    golds += 1
        return golds
    
    def has_any_gold(self) -> bool:
        return self.num_golds() > 0

    # -- Helper methods ----------------------------------------------------------

    def __str__(self) -> str:
        return "\n".join(["".join(row) for row in self._maze_rows])

    def __eq__(self, other) -> bool:
        return self._maze_rows == other._maze_rows

    def __hash__(self) -> int:
        return hash(str(self))

    def __repr__(self) -> str:
        return str(self)


We provide a number of test cases your implementation must pass:

In [23]:
def test_apply_turn() -> None:
    test_maze = [
        "#####",
        "# ^ #",
        "#####"]

    maze = State(test_maze)
    maze.apply("l")
    result_maze = [
        "#####",
        "# < #",
        "#####" ]
    assert maze == State(result_maze)

    maze = State(test_maze)
    maze.apply("r")
    result_maze = [
        "#####",
        "# > #",
        "#####" ]
    assert maze == State(result_maze)

    maze = State(test_maze)
    maze.apply("l")
    maze.apply("l")
    result_maze = [
        "#####",
        "# v #",
        "#####" ]
    assert maze == State(result_maze)

    maze = State(test_maze)
    maze.apply("l")
    maze.apply("r")
    result_maze = [
        "#####",
        "# ^ #",
        "#####" ]
    assert maze == State(result_maze)
    
def test_apply_move() -> None:
    test_maze = [
        "#####",
        "# ^ #",
        "#####"]

    maze = State(test_maze)
    maze.apply("m")
    result_maze = [
        "#####",
        "# ^ #",
        "#####" ]
    assert maze == State(result_maze)

    test_maze = [
        "#####",
        "# > #",
        "#####"]
    maze = State(test_maze)
    maze.apply("m")
    result_maze = [
        "#####",
        "#  >#",
        "#####" ]
    assert maze == State(result_maze)
    
    
def test_apply_move_gold() -> None:
    test_maze = [
        "#####",
        "# >G#",
        "#####"]
    maze = State(test_maze)
    assert maze.num_golds() == 1
    maze.apply("m")
    result_maze = [
        "#####",
        "#  >#",
        "#####" ]
    assert maze == State(result_maze)
    assert maze.num_golds() == 0

# Pass tests
test_apply_turn()
test_apply_move()
test_apply_move_gold()

# Models


In Lab2, we had a transition model in the form of a `SxSxA` matrix and a reward model in the form of a `SxA` matrix. Keeping all transitions and rewards in the memory can cause memory issues for large problems. In the case of POMDPs, we need to have yet another matrix for all the observations (`SxA` matrix in general case).

A different approach we will now use is to procedurally generate transitions, rewards and observation only when needed, based on known rules. In this way, we don't need to keep the whole matrix in memory all the time.

### Solver library
- To solve instances of POMDP, we will use `pomdp_py` library.

- The library uses `pomdp_py.State`, `pomdp_py.Observation`, `pomdp_py.Action` classes to represent the corresponding concepts. These create an interface we should work with in order to use the library -- therefore we will wrap our implementation inside of these classes.

- Additionally, we need to define `TransitionModel`, `RewardModel` and `ObservationModel` classes to model corresponding distributions. 

- We need function `get_all_states` to create uniform prior belief over all states.

In [24]:
# Install library with pomdp solver (takes a while) ...
!pip install pomdp-py



We need to implement interface for the pomdp library:

In [25]:
import pomdp_py

In [26]:
# This is boilerplate code that wraps our State and Action
# for the use with the external library.

class PomdpAction(pomdp_py.Action):
    def __init__(self, value: Action) -> None: 
      self.value = value
    
    def __hash__(self) -> int:         
      return hash(self.value)
    
    def __eq__(self, other) -> bool:    
      return self.value == other.value
    
    def __str__(self) -> str:          
      return self.value
    
    def __repr__(self) -> str:         
      return str(self)
      

class PomdpState(pomdp_py.State):
    def __init__(self, maze_rows: List[str]) -> None:
        super().__init__()
        self.state = State(maze_rows)
        self._maze_rows = self.state._maze_rows
    
    def apply(self, action: PomdpAction) -> None: 
      self.state.apply(action.value)
    
    def actions(self) -> List[PomdpAction]:              
      return [PomdpAction("m"), PomdpAction("l"), PomdpAction("r")]
    
    def current_position(self) -> Tuple[int, int, str]: 
      return self.state.current_position()
    
    def num_golds(self) -> int:     
      return self.state.num_golds()
    
    def has_any_gold(self) -> bool: 
      return self.state.has_any_gold()
    
    def copy(self) -> "PomdpState": 
      return PomdpState(deepcopy(self.state._maze_rows))
    
    def __hash__(self) -> int:             
      return hash(self.state)
    
    def __eq__(self, other: "PomdpState") -> bool:
      return self.state == other.state
    
    def __str__(self) -> str:
      return str(self.state)
    
    def __repr__(self) -> str:
      return repr(self.state)


class PomdpObservation(pomdp_py.Observation):
    def __init__(self, value) -> None:  
      self.value = value

    def __hash__(self) -> int:         
      return hash(self.value)
    
    def __eq__(self, other: "PomdpState") :    
      return self.value == other.value
      
    def __str__(self) -> str:          
      return self.value

    def __repr__(self) -> str:         
      return str(self)

    
class PolicyModel(pomdp_py.RandomRollout):
    def sample(self, state, **kwargs) -> Action:
        return self.get_all_actions().random()

    def get_all_actions(self, **kwargs) -> List[Action]:
        return [PomdpAction("m"), PomdpAction("l"), PomdpAction("r")]

**From now on, we will use these wrapped classes**: `PomdpAction`, `PomdpState`, `PomdpObservation`, `PolicyModel`

In [27]:
# Slightly modified function get_all_transitions() from Lab 2.

def get_all_states(init_state: PomdpState) -> List[PomdpState]:
    states = [init_state]
    opened = [0]

    while len(opened) > 0:
        from_idx = opened.pop()
        state = states[from_idx]

        for action in state.actions():
            next_state = state.copy()
            next_state.apply(action)
            if next_state in states:
                # State was visited.
                to_idx = states.index(next_state)
            else:
                # Not visited yet, add to open.
                states.append(next_state)
                to_idx = len(states) - 1
                opened.append(to_idx)
    return states


# Task 2

Implement probability(**next_state**, **state**, **action**) method of transition model.

In our deterministic case probability is always 1 (if **action** from **state** leads to **next_state**) or 0 (if not).

In [28]:
class TransitionModel(pomdp_py.TransitionModel):
    """
    Models the distribution T(s, a, s') = Pr(s'|s,a).
    In our deterministic case Pr(s'|s,a) is always 1 
    (if s,a goes to s') or 0 (if not).
    """

    def sample(
          self, 
          state: PomdpState, 
          action: PomdpAction
    ) -> PomdpState:
        """
        Given state and action, return next state.
        """
        next_state = state.copy()
        next_state.apply(action)
        return next_state

    def probability(
        self,
        next_state: PomdpState,
        state: PomdpState,
        action: PomdpAction
    ) -> float:
        """
        Returns the probability of Pr(s'|s,a)
        """

        if self.sample(state, action) == next_state:
            return 1
        else:
            return 0

        #raise NotImplemented()
    
    def get_all_states(self) -> List[PomdpState]:
        return self.states

In [29]:
# Example. Try to predict the output values before running!

tm = TransitionModel()

state1 = PomdpState([
    "######",
    "# ^ G#",
    "######"])

state2 = PomdpState([
    "######",
    "# > G#",
    "######"])

print(tm.probability(state2, state1, PomdpAction("l")))
print(tm.probability(state2, state1, PomdpAction("r")))
print(tm.sample(state1, PomdpAction("l")))


0
1
######
# < G#
######


# Task 3

Implement sample(***state***, ***action***) method of reward model.

If ***action*** leads to ***state*** with gold, reward is 1. In all other cases, reward is -0.1
Hint: use gold attribute from Maze.

In [30]:
class RewardModel(pomdp_py.RewardModel):
    """
    Models rewards.
    In our case a reaward is 1 (if a leads to gold state) 
    or -0.1 (if not).
    """

    def sample(
        self, 
        state: PomdpState, 
        action: PomdpAction, 
        next_state=None
    ) -> float:
        """
        Given state and action, return reward. 
        Next state is ignored in our Maze env.
        """
        next_state = state.copy()
        next_state.apply(action)

        return -0.1 if next_state.num_golds() == state.num_golds() else 1
        #raise NotImplemented()


In [31]:
# Example:

rm = RewardModel()
maze = PomdpState([
    "#####",
    "# ^G#",
    "#####"])

action = PomdpAction("r")
print(maze)
print(f'Action {action} Reward {rm.sample(maze, action, None)}')
maze.apply(action)
print(maze)
print('\n')

action = PomdpAction("m")
print(maze)
print(f'Action {action} Reward {rm.sample(maze, action, None)}')
maze.apply(action)
print(maze)


#####
# ^G#
#####
Action r Reward -0.1
#####
# >G#
#####


#####
# >G#
#####
Action m Reward 1
#####
#  >#
#####


# Task 4


Implement function observe(**state**).

- Function should output string of 4 character based on building blocks of maze, i.e.  one of `'#', ' ', 'G'`
- Example output: `" # #"`
- Function output depends on the state and uses the direction of the agent. (Select characters clockwise, starting with the character in front of the agent.)


Examples. What are the observations of the agent?

```
##1##
#4^2#
##3##
```



```
#####
# ^G#
#####
```


```
#####
# >G#
#####
```



In [32]:
class ObservationModel(pomdp_py.ObservationModel):
    '''
    Models distribution O(s',a,o) = Pr(o|s',a)
    In our case, observation does not depend on action -  Pr(o|s') 
    (All actions leads to some observation).
    
    Our Observation model is deterministic: Pr(o|s') is always 1 
    (if observation of s' matches o) or 0 (if not).
    '''

    def probability(
        self, 
        observation: PomdpObservation, 
        next_state: PomdpState, 
        action: PomdpAction=None
    ) -> float:
        '''
        Returns the probability Pr(o|s',a).
        '''
        next_state_observation = self.sample(next_state)
        if next_state_observation == observation:
            return 1.0
        else:
            return 0.0

    def observe(self, state: PomdpState) -> str:
        '''
        Agent looks around clockwise and returns string of 4 characters.
        '''
        # TODO: implement.
        i, j, direction = state.current_position()
        init_dir_indx = DIRECTIONS.index(direction)

        o = ""
        for k in range(4):
            dir_indx = (init_dir_indx + k) % 4
            delta = DIRECTIONS_DELTAS[DIRECTIONS[dir_indx]]
            o += str(state._maze_rows[i + delta[0]][j + delta[1]])

        return o
        #raise NotImplementedError()
        
        
    def sample(
        self, 
        next_state: PomdpState, 
        action: PomdpAction=None
        ) -> PomdpObservation:
        return PomdpObservation(self.observe(next_state))


In [33]:
def test_observation():
    om = ObservationModel()
    maze = PomdpState([
        "#####",
        "# ^G#",
        "#####"])
    assert str(om.sample(maze)) == '#G# '

    maze = PomdpState([
        "#####",
        "# >G#",
        "#####"])
    assert str(om.sample(maze)) == 'G# #'

    maze = PomdpState([
        "#####",
        "# vG#",
        "#####"])
    assert str(om.sample(maze)) == '# #G'

    maze = PomdpState([
        "#####",
        "# <G#",
        "#####"])
    assert str(om.sample(maze)) == ' #G#'

    assert om.probability(om.sample(maze), maze) == 1.

test_observation()

In [34]:
# Example
om = ObservationModel()
maze = State([
    "####",
    "#^ #",
    "####"])
print(om.sample(maze))

# ##


## Example - Solve POMDPs using a solver

- We will reuse models we defined before.
- Agent is initialized with a belief distributed uniformly over all the states.

In [35]:
def solve(
    init_true_state: PomdpState, 
    steps: int = 15, 
    planning_time: float = 0.5, 
    max_depth: int = 10
) -> None:
    states = get_all_states(init_true_state)
    uniform_belief = {s: 1.0 / len(states) for s in states}
    init_belief = pomdp_py.Histogram(uniform_belief)

    # 1. Select Agent, Environment

    # Initialize agent, environment
    agent = pomdp_py.Agent(
        init_belief, 
        PolicyModel(), 
        TransitionModel(), 
        ObservationModel(), 
        RewardModel()
    )
    env = pomdp_py.Environment(
        init_true_state, TransitionModel(), RewardModel()
    )

    # Initialize planners from library
    planner = pomdp_py.POUCT(
        max_depth=max_depth,
        discount_factor=0.95,
        planning_time=planning_time,
        exploration_const=110,
        rollout_policy=agent.policy_model,
    )

    for i in range(steps):
        # 2. Agent plans an action at.
        print("\n####################################")
        print(f"STEP {i}")
        print("####################################\n")
        action = planner.plan(agent)
        print(f"True state: \n{env.state}\n")
        max_belief = agent.cur_belief.argmax()
        max_belief_prob = agent.cur_belief[max_belief]
        print(f"Top Belief (Pr.= {round(max_belief_prob, 4)}) \n{max_belief}")
        print(f"Action: {str(action)}")

        # 3. Environment state transitions 
        # s_t -> s_t+1 according to its transition model.
        reward = env.state_transition(action, execute=True)

        # 4. Agent receives an observation ot and reward 
        # rt from the environment.
        observation = agent.observation_model.sample(env.state, action)
        print(f'Observation: "{observation}"')
        print(f"Reward: {round(reward, 4)}")

        # 5.Agent updates history and belief
        agent.update_history(action, observation)
        planner.update(agent, action, observation)
        new_belief = pomdp_py.update_histogram_belief(
            agent.cur_belief,
            action,
            observation,
            agent.observation_model,
            agent.transition_model,
        )
        agent.set_belief(new_belief)

In [36]:
maze = PomdpState([
    "######",
    "# #  #",
    "# # G#",
    "#^#  #",
    "#   G#",
    "######"])

solve(maze, steps=20, planning_time=1.0, max_depth=20)


####################################
STEP 0
####################################

True state: 
######
# #  #
# # G#
#^#  #
#   G#
######

Top Belief (Pr.= 0.0052) 
######
# #  #
# # G#
#^#  #
#   G#
######
Action: m
Observation: " # #"
Reward: -0.1

####################################
STEP 1
####################################

True state: 
######
# #  #
#^# G#
# #  #
#   G#
######

Top Belief (Pr.= 0.0417) 
######
# #  #
# # G#
#^#  #
#   G#
######
Action: m
Observation: "## #"
Reward: -0.1

####################################
STEP 2
####################################

True state: 
######
#^#  #
# # G#
# #  #
#   G#
######

Top Belief (Pr.= 0.25) 
######
#^#  #
# #  #
# #  #
#    #
######
Action: m
Observation: "## #"
Reward: -0.1

####################################
STEP 3
####################################

True state: 
######
#^#  #
# # G#
# #  #
#   G#
######

Top Belief (Pr.= 0.25) 
######
#^#  #
# #  #
# #  #
#    #
######
Action: r
Observation: "# ##"
Reward: -0.1

###

In [37]:
# Start in the middle.
# It takes some time for agent get reliable belief.

maze = PomdpState([
    "######",
    "#    #",
    "# ^  #",
    "#   G#",
    "######"])
solve(maze, steps=15, planning_time=0.5, max_depth=20)


####################################
STEP 0
####################################

True state: 
######
#    #
# ^  #
#   G#
######

Top Belief (Pr.= 0.0109) 
######
#    #
# ^  #
#   G#
######
Action: m
Observation: "#   "
Reward: -0.1

####################################
STEP 1
####################################

True state: 
######
# ^  #
#    #
#   G#
######

Top Belief (Pr.= 0.1) 
######
# ^  #
#    #
#   G#
######
Action: r
Observation: "   #"
Reward: -0.1

####################################
STEP 2
####################################

True state: 
######
# >  #
#    #
#   G#
######

Top Belief (Pr.= 0.1) 
######
#    #
#    #
# < G#
######
Action: m
Observation: "   #"
Reward: -0.1

####################################
STEP 3
####################################

True state: 
######
#  > #
#    #
#   G#
######

Top Belief (Pr.= 0.3333) 
######
#  > #
#    #
#   G#
######
Action: r
Observation: "  # "
Reward: -0.1

####################################
STEP 4
#################

In [38]:
# Start near wall.
# Agent quickly gets reliable belief.

maze = PomdpState([
    "######",
    "#^   #",
    "#    #",
    "#   G#",
    "######"])
solve(maze, steps=15, planning_time=0.5, max_depth=20)


####################################
STEP 0
####################################

True state: 
######
#^   #
#    #
#   G#
######

Top Belief (Pr.= 0.0109) 
######
#^   #
#    #
#   G#
######
Action: l
Observation: "##  "
Reward: -0.1

####################################
STEP 1
####################################

True state: 
######
#<   #
#    #
#   G#
######

Top Belief (Pr.= 0.1429) 
######
#<   #
#    #
#   G#
######
Action: l
Observation: " ## "
Reward: -0.1

####################################
STEP 2
####################################

True state: 
######
#v   #
#    #
#   G#
######

Top Belief (Pr.= 0.1429) 
######
#v   #
#    #
#   G#
######
Action: l
Observation: "  ##"
Reward: -0.1

####################################
STEP 3
####################################

True state: 
######
#>   #
#    #
#   G#
######

Top Belief (Pr.= 0.1429) 
######
#>   #
#    #
#   G#
######
Action: m
Observation: "   #"
Reward: -0.1

####################################
STEP 4
###########