# CPSC 422 - Assignment 1

## Question 3

The following program computes an agent's **belief state** in a **Partially Observable Markov Decision Process** (POMDP). A POMDP differs from an ordinary MDP in that the agent cannot directly observe its state; it must infer which state it is in from its actions and the observations it receives from its environment. A belief state is therefore a probability distribution over the state space: it assigns to each state the probability that the agent is currently in it.

Below is the world our agent operates in. It can make one move at each timestep, and will stop moving once it reaches an 'end' state.

![Grid World](img/A1Q3.png)

The **transition probabilities** determine which direction the agent actually moves in. For example, if the agent tries to move 'up', it moves 'up' with probability 0.8 and slips 'left' or 'right' with probability 0.1 each. If the agent runs into a wall, it remains in its current state.

![Transition Probabilities](img/Transition-Probabilities.png)
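
The transition rule can be sketched as a small function that returns the distribution over next cells. This is an illustrative sketch, not the code used by the program: the names `next_state_distribution`, `is_open`, `STEP`, and `SLIPS` are hypothetical, cells are assumed to be `(column, row)` pairs with rows increasing upward and a wall at `(2, 2)` (as in the `StateSpace` class), and terminal-state handling is left out.

```python
# Illustrative sketch only; these names are hypothetical and not part of the program.
NUM_COLS, NUM_ROWS = 4, 3
BLOCKED = {(2, 2)}  # the interior wall cell

# Unit steps as (column, row) offsets; rows increase upward.
STEP = {"up": (0, 1), "down": (0, -1), "left": (-1, 0), "right": (1, 0)}
# The two sideways directions the agent can slip into for each intended action.
SLIPS = {"up": ("left", "right"), "down": ("left", "right"),
         "left": ("up", "down"), "right": ("up", "down")}


def is_open(cell):
    """True if the cell lies inside the grid and is not the wall cell."""
    col, row = cell
    return 1 <= col <= NUM_COLS and 1 <= row <= NUM_ROWS and cell not in BLOCKED


def next_state_distribution(cell, action):
    """Return {next cell: probability} for attempting `action` in `cell`."""
    outcomes = {}
    moves = [(action, 0.8)] + [(slip, 0.1) for slip in SLIPS[action]]
    for direction, prob in moves:
        dc, dr = STEP[direction]
        target = (cell[0] + dc, cell[1] + dr)
        if not is_open(target):
            target = cell  # bumping into a wall leaves the agent in place
        outcomes[target] = outcomes.get(target, 0.0) + prob
    return outcomes


print(next_state_distribution((1, 1), "up"))
# {(1, 2): 0.8, (1, 1): 0.1, (2, 1): 0.1}
```

From the bottom-left corner, moving 'up' succeeds with probability 0.8, the slip to the left hits the outer wall and keeps the agent in place, and the slip to the right moves it to `(2, 1)`.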

The **observation probabilities** give the probability of making each observation given the cell the agent is actually in. Here, the three possible observations are 1 wall, 2 walls, or 'end' for a terminal state.

![Observation Probabilities](img/Observation-Probabilities.png)
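
As a rough sketch, the observation model can be written as a function of the observation and the cell. The name `observation_probability` and the `TERMINAL` set are hypothetical; the values (0.9 for the correct wall count, 0.1 for the wrong one, and a certain 'end' in terminal cells) are assumed to match those used by `getObservationProbability` in the `StateSpace` class.

```python
# Illustrative sketch only; names are hypothetical.
TERMINAL = {(4, 2), (4, 3)}


def observation_probability(observation, cell):
    """Return P(observation | cell) for observations 1, 2, or "end"."""
    if observation not in (1, 2, "end"):
        raise ValueError(f"Unknown observation: {observation!r}")
    if cell in TERMINAL:
        return 1.0 if observation == "end" else 0.0
    if observation == "end":
        return 0.0
    # Non-terminal cells in column 3 have one adjacent wall; the others have two.
    true_walls = 1 if cell[0] == 3 else 2
    return 0.9 if observation == true_walls else 0.1
```

For example, `observation_probability(1, (3, 3))` is 0.9, while `observation_probability(1, (1, 1))` is 0.1 because `(1, 1)` has two adjacent walls.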

The program therefore takes a sequence of actions and the observation made after each action, and computes the agent's belief in each state. If a starting state is provided, the agent starts with complete belief (1.0) in that state. Otherwise, it assumes a uniform distribution over all possible (non-terminal) starting states. After each action/observation pair, it computes the next belief state using the following formula:

$$b'(s') = \alpha P(e|s')\sum_sP(s'|s,a)b(s)$$
* $b'(s')$ is the updated belief in state $s'$
* $\alpha$ is a normalizing constant that makes the updated beliefs sum to 1
* $P(e|s')$ is the probability of observing evidence $e$ in state $s'$
* $P(s'|s,a)$ is the probability of arriving in state $s'$ after taking action $a$ from state $s$
* $b(s)$ is the current belief in state $s$
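
As a worked instance of this update, consider the first step of the Piazza test case further down: a uniform prior of $1/9$ over the nine non-terminal cells, action 'up', and observation 1. The calculation assumes the values encoded in the `StateSpace` class (0.8 for the intended direction, 0.1 for each sideways slip, and $P(e=1|s') = 0.9$ for cells in column 3). Only three prior cells can lead to $s' = (3,3)$: $(3,3)$ itself by bumping into the top wall, $(3,2)$ by moving up, and $(2,3)$ by slipping right.

$$\sum_s P((3,3)|s,\text{up})\,b(s) = \tfrac{1}{9}(0.8 + 0.8 + 0.1) = \tfrac{1.7}{9}$$

$$P(e=1|(3,3))\sum_s P((3,3)|s,\text{up})\,b(s) = 0.9 \cdot \tfrac{1.7}{9} = \tfrac{1.53}{9}$$

Repeating this for every cell gives unnormalized values that sum to $3.04/9$, so $\alpha = 9/3.04$ and $b'((3,3)) = 1.53/3.04 \approx 0.5033$. This agrees with the value printed for the top row, third column after the first step of that test case.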


### Imports

In [1]:
import copy
from IPython.display import display  # used by printStateSpace()
from sympy import Matrix, init_printing
init_printing()

### StateSpace Class

The following class stores the agent's belief state over the grid and applies the belief update after each action/observation pair.

In [2]:
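class StateSpace:
    """Belief state over the 4x3 grid, with cells indexed as (column, row) from the bottom-left."""

    numRows = 3
    numCols = 4
    invalidSpaces = {(2,2)}     # the wall cell
    uniformProbability = 1/9    # initial belief shared by the 9 non-terminal cells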

    def __init__(self, actions, observations, startingState=None):
        self.states = [[0, 0, 0, 0],
                       [0, None, 0, 0],
                       [0, 0, 0, 0]]
        self.setInitialBeliefState(startingState)
        self.actions = actions
        self.observations = observations
        assert len(self.actions) == len(self.observations), "Actions and observations must be of same length"
    
    def computeBeliefStates(self, verbose=False):
        if self.startingState is not None:
            print(f"\nStarting state is {self.startingState}")
        else:
            print("\nNo starting state specified - will use a uniform distribution")
        if verbose:
            self.printStateSpace()
        for i in range(len(self.actions)):
            self.updateBeliefStates(self.actions[i], self.observations[i], verbose)
        print(f"\nFinal belief state after actions {str(self.actions)} and observations {str(self.observations)}:")
        self.printStateSpace()
    
    def updateBeliefStates(self, action, observation, verbose=False):
        priorStates = copy.deepcopy(self.states)
        for c in range(1,self.numCols+1):
            for r in range(1,self.numRows+1):
                state = (c,r)
                if not self.stateIsValid(state):
                    continue
                neighbors = [(c,r+1), (c-1,r), (c,r-1), (c+1,r), (c,r)]
                sumOfPriorStates = 0
                for neighbor in neighbors:
                    if self.stateIsValid(neighbor):
                        sumOfPriorStates += (self.getTransitionProbability(state, neighbor, action)*self.getState(neighbor, priorStates))
                self.setState(state, self.getObservationProbability(observation,state)*sumOfPriorStates)
        self.normalizeStates()
        if verbose:
            print(f"\nAgent takes action {action} and observes {str(observation)}:")
            self.printStateSpace()

    def normalizeStates(self):
        total = 0
        for c in range(1,self.numCols+1):
            for r in range(1,self.numRows+1):
                state = (c,r)
                if self.stateIsValid(state):
                    total += self.getState(state, self.states)
        for c in range(1,self.numCols+1):
            for r in range(1,self.numRows+1):
                state = (c,r)
                if self.stateIsValid(state):
                    currentValue = self.getState(state, self.states)
                    if currentValue != 0:
                        self.setState(state, currentValue/total)
                    
    def setInitialBeliefState(self, startingState):
        if startingState is None:
            self.startingState = None
            for c in range(1,self.numCols+1):
                for r in range(1, self.numRows+1):
                    state = (c,r)
                    if self.stateIsValid(state) and not self.stateIsTerminal(state):
                        self.setState(state, self.uniformProbability)
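        elif not self.stateIsValid(startingState):
            # Fail fast: returning here would leave self.startingState unset,
            # and computeBeliefStates() would later raise an AttributeError
            raise ValueError(f"Could not set starting state: {startingState} is invalid")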
        else:
            self.startingState = startingState
            for c in range(1,self.numCols+1):
                for r in range(1,self.numRows+1):
                    state = (c,r)
                    if self.stateIsValid(state):
                        self.setState(state, 0)
            self.setState(startingState, 1)

    def stateIsValid(self, state):
        col = state[0]
        row = state[1]
        return 1 <= col <= self.numCols and 1 <= row <= self.numRows and state not in self.invalidSpaces

    def invalidStateError(self, state):
        print(f"State ({state[0]}, {state[1]}) is invalid!")
        return None
    
    def stateIsTerminal(self, state):
        if not self.stateIsValid(state):
            print("Could not determine if state is terminal:")
            return self.invalidStateError(state)
        return (state[0] == 4 and (state[1] == 2 or state[1] == 3))

    def getState(self, state, states):
        if not self.stateIsValid(state):
            print("Could not get state:")
            return self.invalidStateError(state)
        col = state[0]
        row = state[1]
        return (states[abs(row-self.numRows)][col-1])
    
    def getObservationProbability(self, observation, state):
        if not self.stateIsValid(state):
            print("Could not get observation probability:")
            return self.invalidStateError(state)
        if observation not in (1, 2, "end"):
            # Otherwise the method would fall through and silently return None
            raise ValueError(f"Unknown observation: {observation!r}")
        if self.stateIsTerminal(state):
            return 1 if observation == "end" else 0
        if observation == "end":
            return 0
        # Cells in column 3 have one adjacent wall; all other non-terminal cells have two
        oneWall = state[0] == 3
        if observation == 1:
            return 0.9 if oneWall else 0.1
        return 0.1 if oneWall else 0.9
    
    def getTransitionProbability(self, state, priorState, action):
        if not self.stateIsValid(state):
            print("Could not get transition probability - state error:")
            return self.invalidStateError(state)
        elif not self.stateIsValid(priorState):
            print("Could not get transition probability - priorState error:")
            return self.invalidStateError(priorState)
        elif self.stateIsTerminal(priorState): return 0
        if state == priorState:
            sameStateTransitionTable = {
                ((1,1),"up"): 0.1, ((1,1),"left"): 0.9, ((1,1),"down"): 0.9, ((1,1),"right"): 0.1,  # State (1,1)
                ((2,1),"up"): 0.8, ((2,1),"left"): 0.2, ((2,1),"down"): 0.8, ((2,1),"right"): 0.2,  # State (2,1)
                ((3,1),"up"): 0, ((3,1),"left"): 0.1, ((3,1),"down"): 0.8, ((3,1),"right"): 0.1,    # State (3,1)
                ((4,1),"up"): 0.1, ((4,1),"left"): 0.1, ((4,1),"down"): 0.9, ((4,1),"right"): 0.9,  # State (4,1)
                ((1,2),"up"): 0.2, ((1,2),"left"): 0.8, ((1,2),"down"): 0.2, ((1,2),"right"): 0.8,  # State (1,2)
                ((3,2),"up"): 0.1, ((3,2),"left"): 0.8, ((3,2),"down"): 0.1, ((3,2),"right"): 0,    # State (3,2)
                ((1,3),"up"): 0.9, ((1,3),"left"): 0.9, ((1,3),"down"): 0.1, ((1,3),"right"): 0.1,  # State (1,3)
                ((2,3),"up"): 0.8, ((2,3),"left"): 0.2, ((2,3),"down"): 0.8, ((2,3),"right"): 0.2,  # State (2,3)
                ((3,3),"up"): 0.8, ((3,3),"left"): 0.1, ((3,3),"down"): 0, ((3,3),"right"): 0.1,    # State (3,3)
            }
            return sameStateTransitionTable[state,action]
        elif state[0] == priorState[0] and state[1] == priorState[1]+1:
            # state is above priorState
            if action == "up": return 0.8
            elif action == "left": return 0.1
            elif action == "down": return 0
            elif action == "right": return 0.1
        elif state[0] == priorState[0] and state[1] == priorState[1]-1:
            # state is below priorState
            if action == "up": return 0
            elif action == "left": return 0.1
            elif action == "down": return 0.8
            elif action == "right": return 0.1
        elif state[1] == priorState[1] and state[0] == priorState[0]+1:
            # state is to the right of priorState
            if action == "up": return 0.1
            elif action == "left": return 0
            elif action == "down": return 0.1
            elif action == "right": return 0.8
        elif state[1] == priorState[1] and state[0] == priorState[0]-1:
            # state is to the left of priorState
            if action == "up": return 0.1
            elif action == "left": return 0.8
            elif action == "down": return 0.1
            elif action == "right": return 0
        else:
            return 0    
    
    def setState(self, state, val):
        if not self.stateIsValid(state):
            print("Could not set state:")
            return self.invalidStateError(state)
        col = state[0]
        row = state[1]
        self.states[abs(row-self.numRows)][col-1] = val
    
    def printStateSpace(self):
        display(Matrix(self.states))

### Inputs

Create a `StateSpace` object for each set of actions, observations, and optional starting state, then call `computeBeliefStates()` on it. `computeBeliefStates()` takes an optional `verbose` argument, which prints the belief state after each individual action/observation in addition to the final belief state.

A few examples are below.

#### Piazza Test Case

In [3]:
ss = StateSpace(["up", "right", "up"], [1,2,1])
ss.computeBeliefStates(verbose=True)


No starting state specified - will use a uniform distribution


⎡0.111111111111111  0.111111111111111  0.111111111111111          0        ⎤
⎢                                                                          ⎥
⎢0.111111111111111        None         0.111111111111111          0        ⎥
⎢                                                                          ⎥
⎣0.111111111111111  0.111111111111111  0.111111111111111  0.111111111111111⎦


Agent takes action up and observes 1:


⎡0.0592105263157895   0.0328947368421053  0.503289473684211           0.0        ⎤
⎢                                                                                ⎥
⎢0.0328947368421053          None         0.266447368421053           0.0        ⎥
⎢                                                                                ⎥
⎣0.00657894736842105  0.0328947368421053  0.0592105263157895  0.00657894736842105⎦


Agent takes action right and observes 2:


⎡0.0486298726360479  0.284832111153995  0.0605943651099962         0.0       ⎤
⎢                                                                            ⎥
⎢0.173678116557314         None         0.0329988421458896         0.0       ⎥
⎢                                                                            ⎥
⎣0.0208413739868777  0.062524121960633  0.0345426476263991  0.281358548822848⎦


Agent takes action up and observes 1:


⎡0.0998549256836285   0.112902489986223   0.439821531218351         0.0        ⎤
⎢                                                                              ⎥
⎢0.0243067911203569          None         0.131634412722744         0.0        ⎥
⎢                                                                              ⎥
⎣0.00394164180330112  0.0262684878511665  0.146333451947554  0.0149362676666758⎦


Final belief state after actions ['up', 'right', 'up'] and observations [1, 2, 1]:


⎡0.0998549256836285   0.112902489986223   0.439821531218351         0.0        ⎤
⎢                                                                              ⎥
⎢0.0243067911203569          None         0.131634412722744         0.0        ⎥
⎢                                                                              ⎥
⎣0.00394164180330112  0.0262684878511665  0.146333451947554  0.0149362676666758⎦

#### Other Test Cases

In [4]:
ss1 = StateSpace(["up","up","up"], [2,2,2])
ss2 = StateSpace(["up","up","up"], [1,1,1])
ss3 = StateSpace(["right","right","up"], [1,1,"end"], (2,3))
ss4 = StateSpace(["up","right","right","right"], [2,2,1,1], (1,1))
stateSpaces = [ss1, ss2, ss3, ss4]

In [5]:
def runTests(stateSpaces, verbose=False):
    for ss in stateSpaces:
        ss.computeBeliefStates(verbose)

In [6]:
runTests(stateSpaces)


No starting state specified - will use a uniform distribution

Final belief state after actions ['up', 'up', 'up'] and observations [2, 2, 2]:


⎡0.568370409228761   0.225729132204172  0.00313373243531962           0.0         ⎤
⎢                                                                                 ⎥
⎢0.0359526443858046        None         0.000260973127400388          0.0         ⎥
⎢                                                                                 ⎥
⎣0.0201639169571179  0.143620351911547  0.00200793722319308   0.000760902526683695⎦


No starting state specified - will use a uniform distribution

Final belief state after actions ['up', 'up', 'up'] and observations [1, 1, 1]:


⎡0.00197953609603324   0.0135387010942808    0.964447334255283            0.0        ⎤
⎢                                                                                    ⎥
⎢0.000118475975822438         None           0.0183362729009477           0.0        ⎥
⎢                                                                                    ⎥
⎣7.89839838816256e-5   0.00064879701045621  0.000761631273144247  9.02674101504292e-5⎦


Starting state is (2, 3)

Final belief state after actions ['right', 'right', 'up'] and observations [1, 1, 'end']:


⎡0.0  0.0   0.0  0.55⎤
⎢                    ⎥
⎢0.0  None  0.0  0.45⎥
⎢                    ⎥
⎣0.0  0.0   0.0  0.0 ⎦


Starting state is (1, 1)

Final belief state after actions ['up', 'right', 'right', 'right'] and observations [2, 2, 1, 1]:


⎡0.0162579957356077  0.0189765458422175  0.124733475479744         0.0       ⎤
⎢                                                                            ⎥
⎢ 0.11841684434968          None         0.174626865671642         0.0       ⎥
⎢                                                                            ⎥
⎣0.0162846481876333  0.020682302771855   0.353091684434968  0.156929637526652⎦
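
Because each update is normalized, the entries of any printed belief state should sum to 1. The snippet below is a minimal sketch of that sanity check: `belief_total` is a hypothetical helper that is not part of the `StateSpace` class, and the grid is copied by hand from the third test case above (starting state `(2, 3)`).

```python
import math


def belief_total(states):
    """Sum the beliefs in a row-major grid, skipping the None wall cell."""
    return sum(value for row in states for value in row if value is not None)


# Final belief state printed for the test case that starts in (2, 3).
final_states = [[0.0, 0.0, 0.0, 0.55],
                [0.0, None, 0.0, 0.45],
                [0.0, 0.0, 0.0, 0.0]]

# isclose() tolerates floating-point rounding in the sum.
assert math.isclose(belief_total(final_states), 1.0)
```

The same helper could be applied to `ss.states` after calling `computeBeliefStates()`, since the class stores beliefs in the same nested-list layout.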