### Simple agent POMDP 
#### Author: Andrei Gabriel Popescu

In [1]:
import numpy as np
from typing import Dict, Tuple, List
from enum import IntEnum
from pprint import pprint
from itertools import count
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()

In [33]:
class Actions(IntEnum):
    LEFT = 0
    RIGHT = 1


class States(IntEnum):
    A0 = 0
    A1 = 1
    A2 = 2
    A3 = 3
    A4 = 4


class Obs(IntEnum):
    O_2 = 0 # two walls
    O_3 = 1 # three walls

class HomeworkEnv(object):
    def __init__(self, max_num_steps: int = 1, noise: float = 0.15):
        """
        Constructor

        Parameters
        ----------
        max_num_steps
            maximum number of steps allowed in the env
        noise
            observation noise. This means that with 1 - noise
            you hear the tiger behind the correct door.
        """
        self.max_num_steps = max_num_steps
        self.noise = noise
        self.num_steps = None
        self.__state = None
        self.done = True

        # define state mapping
        self.__num_states = 5
        self.__state_mapping = {
            States.A0: "Agent in pos 0",
            States.A1: "Agent in pos 1",
            States.A2: "Agent in pos 2", # Goal state
            States.A3: "Agent in pos 3",
            States.A4: "Agent in pos 4",
        }

        # define action mapping
        self.__num_actions = 2
        self.__action_mapping = {
            Actions.LEFT: "Left",
            Actions.RIGHT: "Right",
        }

        # define observation mapping
        self.__num_obs = 2
        self.__obs_mapping = {
            Obs.O_2: "Two walls",
            Obs.O_3: "Three walls, end!",
        }

        # init transitions & observations probabilities
        # and rewards
        self.__init_transitions()
        self.__init_observations()
        self.__init_rewards()

    def __init_transitions(self):
        # define transition probability for listening action
        #               Tiger: left      Tiger: right
        # Tiger: left    1.0              0.0
        # Tiger: right   0.0              1.0
        # This means that the tiger doesn't change location
        left_action = np.array([[
            [1.0, 0.0, 0.0, 0.0, 0.0],
            [1.0, 0.0, 0.0, 0.0, 0.0],
            [0.0, 1.0, 0.0, 0.0, 0.0],
            [0.0, 0.0, 1.0, 0.0, 0.0],
            [0.0, 0.0, 0.0, 1.0, 0.0]
        ]])

        # define transition probability for left action
        #               Tiger: left      Tiger: right
        # Tiger: left    0.5              0.5
        # Tiger: right   0.5              0.5
        # Resets the tiger location
        right_action =  np.array([[
            [0.0, 1.0, 0.0, 0.0, 0.0],
            [0.0, 0.0, 1.0, 0.0, 0.0],
            [0.0, 0.0, 0.0, 1.0, 0.0],
            [0.0, 0.0, 0.0, 0.0, 1.0],
            [0.0, 0.0, 0.0, 0.0, 1.0]
        ]])
        # define transition probability for the right action
        #               Tiger: left      Tiger: right
        # Tiger: left    0.5              0.5
        # Tiger: right   0.5              0.5
        # Resets the tiger location
        stay = np.array([[
            [1.0, 0.0, 0.0, 0.0, 0.0],
            [0.0, 1.0, 0.0, 0.0, 0.0],
            [0.0, 0.0, 1.0, 0.0, 0.0],
            [0.0, 0.0, 0.0, 1.0, 0.0],
            [0.0, 0.0, 0.0, 0.0, 1.0]
        ]])
        
        self.__T = np.concatenate([left_action, right_action], axis=0)

    def __init_observations(self):
        # define observation probability for the listen action
        #                Obs: Tiger left    Obs: Tiger right
        # Tiger: left    0.85               0.15
        # Tiger: right   0.15               0.85
        obs_left = np.array([[
            [0.0, 1.0],
            [1.0, 0.0],
            [1.0, 0.0],
            [1.0, 0.0],
            [0.0, 1.0]
        ]])

        # define observation probability for the left action
        #                Obs: Tiger left    Obs: Tiger right
        # Tiger: left    0.50               0.50
        # Tiger: right   0.50               0.50
        # Any observation without listening is informative
        obs_right = np.array([[
            [0.0, 1.0],
            [1.0, 0.0],
            [1.0, 0.0],
            [1.0, 0.0],
            [0.0, 1.0]
        ]])
        
        # define observation probability for the right action
        #                Obs: Tiger left    Obs: Tiger right
        # Tiger: left    0.50               0.50
        # Tiger: right   0.50               0.50
        # Any observation without listening is informative
        obs_stay = np.array([[
            [0.0, 1.0],
            [1.0, 0.0],
            [1.0, 0.0],
            [1.0, 0.0],
            [0.0, 1.0]
        ]])

        self.__O = np.concatenate([obs_left, obs_right], axis=0)

    def __init_rewards(self):
        # define rewards for listening
        # Tiger: left      -1
        # Tiger: right     -1
        # You get a -1 reward for listening
        R_left = np.array([[-1, -1, -1, 0, -1]])

        # define rewards for left action
        # Tiger: left      -100
        # Tiger: right      +10
        # If the tiger is left and you choose to go
        # left, then you get -100.
        # If the tiger is right and you choose to go
        # left, you get +10.
        R_right = np.array([[-1, 0, -1, -1, -1]])

        # define rewards for right action
        # Tiger: left       +10
        # Tiger: right      -100
        # If the tiger is left and you choose to go
        # right, then you get +10
        # If the tiger is right and you choose to go
        # right, then you get -100.
        R_stay = np.array([[-1, -1, 0, -1, -1]]) # just in goal state
        
        self.__R = np.concatenate([R_left, R_right], axis=0)

    def reset(self):
        self.done = False
        self.num_steps = 0

        # initialize the state random
        # this puts the tiger behind the left and right
        # door with equal probability
        self.__state = np.random.choice([States.A0, States.A4])

    def step(self, action: Actions) -> Tuple[int, float, bool, Dict[str, int]]:
        """
        Performs an environment step

        Parameters
        ----------
        action
            action to be applied

        Returns
        -------
        Tuple containing the next observation, the reward,
        ending episode flag, other information.
        """
        assert not self.done, "The episode finished. Call reset()!"
        self.num_steps += 1
        self.done = (self.num_steps == self.max_num_steps)

        # get the next observation. this is stochastic
        obs = np.random.choice(
            a=[Obs.O_2, Obs.O_3],
            p=self.O[action][self.__state]
        )

        # get the reward. this is deterministic
        reward = self.R[action][self.__state]

        # get the next transition
        self.__state = np.random.choice(
            a=[States.A0, States.A1, States.A2, States.A3, States.A4],
            p=self.T[action][self.__state]
        )

        # construct info
        info = {"num_steps": self.num_steps}
        return obs, reward, self.done, info

    @property
    def state_mapping(self) -> Dict[States, str]:
        """
        Returns
        -------
        State mapping (for display purpose)
        """
        return self.__state_mapping

    @property
    def action_mapping(self) -> Dict[Actions, str]:
        """
        Returns
        -------
        Action mapping (for display purposes)
        """
        return self.__action_mapping

    @property
    def obs_mapping(self) -> Dict[Obs, str]:
        """
        Returns
        -------
        Observation mapping (for display purposes)
        """
        return self.__obs_mapping

    @property
    def T(self) -> np.ndarray:
        """
        Returns
        -------
        Transition probability matrix.
        Axes: (a, s, s')
        """
        return self.__T

    @property
    def O(self) -> np.ndarray:
        """
        Returns
        -------
        Observation probability matrix.
        Axes: (a, s, o)
        """
        return self.__O

    @property
    def R(self) -> np.ndarray:
        """
        Returns
        -------
        Reward matrix:
        Axes: (a, s)
        """
        return self.__R

    @property
    def states(self) -> List[int]:
        """
        Returns
        -------
        List containing the states
        """
        return list(self.__state_mapping.keys())

    @property
    def actions(self) -> List[int]:
        """
        Returns
        -------
        List containing the actions
        """
        return list(self.__action_mapping.keys())

    @property
    def obs(self) -> List[int]:
        """
        Returns
        -------
        List containing the observations
        """
        return list(self.__obs_mapping.keys())

### Utils

In [34]:
def display(policies: Dict, ncols: int = 5):
    """
    Parameters
    ----------
    policies
        Full dictionary of policies
    ncol
        Number of plots per row(columns)
    """

    num_steps = len(policies)
    nrows = num_steps // ncols + (num_steps % ncols != 0)

    fig, ax = plt.subplots(nrows, ncols, figsize=(20, 10))
    for i in policies:
        # print("Policy index: ", i)

        row, col = i // ncols, i % ncols
        # extract x1, y1 components
        x1 = [b[0] for b in policies[i]["policy"]]
        y1 = policies[i]["scores"]

        # print("Nr policies: ", len(policies))
        g = ax[row][col] if len(policies) > ncols else ax[col]
        # plot scores
        g.stem(x1, y1, 'o--', basefmt="b")
        g.set_title(f"Value Function Step {i}")

        for v in policies[i]["V"]:
            col1 = np.linspace(1, 0, 1000).reshape(-1, 1)
            col2 = 1 - col1
            matrix = np.concatenate([col1, col2], axis=1)

            # compute points then filter
            y2 = np.dot(matrix, v)
            idx = (np.min(y1) <= y2) & (y2 <= np.max(y1))
            g.plot(col2[idx], y2[idx], )


def get_closest_belief(policy: Dict[Tuple, Actions], b) -> Tuple:
    keys = list(np.array(x) for x in policy.keys())
    dist = [np.linalg.norm(b - x) for x in keys]
    return keys[np.argmin(dist)]

### The next part is taken from the POMDP Lab

In [35]:
def update_belief(b: np.ndarray, a: Actions, o: Obs, env: HomeworkEnv) -> np.ndarray:
    """
    Computes the next belief state from the current belief state, applied action
    and received observation
    
    Parameters
    ----------
    b
        Current belief state
    a
        Applied action
    o
        Observation received
    env
        Tiger Environment
    
    Returns
    -------
    Next belief state.
    """
    
    # extract transition probability matrix
    # adn observation probability matrix
    T, O = env.T, env.O
    
    # get states, actions & observations
    states, actions, obs = env.states, env.actions, env.obs
    
    # compute the next belief state
    b_prime = np.zeros_like(b)
    
    ###########################
    # TODO 1 - Your code here #
    ###########################
    
    for s_prime in states:
        sum_over_states = 0
        for s in states:
            sum_over_states += T[a][s][s_prime] * b[s]
        b_prime[s_prime] = O[a][s_prime][o] * sum_over_states


    # normalize
    if b_prime.sum() != 0:
        b_prime /= b_prime.sum()
        
    return b_prime

In [36]:
def check_duplicate(b: np.ndarray, buff: List[np.ndarray], eps: float = 1e-8) -> bool:
    """
    Checks whether the belief is already in the buffer
    
    Parameters
    ----------
    b
        belief to check if it already exists
    buff
        buffer of beliefs to check against
    eps
        distance threshold
    """
    dist = np.array([np.linalg.norm(b - x) for x in buff])
    return any(dist < eps)

In [37]:
def get_next_beliefs(b: np.ndarray, env:HomeworkEnv) -> List[np.ndarray]:
    """
    Generates a list of possible beliefs that can result
    form the current belief passed as argument
    
    Parameters
    ----------
    b
        current belief
    env
        environment
    """
    # get the list of possible actions
    acts, obs = env.actions, env.obs
    buff = []
    
    ######################
    # TODO 2: Your code here
    
    # step 1: go through all the actions
    for a in acts:
        # step 2: go through all the observations
        for o in obs:
            # update current belief according to the
            # current action and observation using 
            # the update_belief function previously implemented
            # b_prime = ...
            b_prime = update_belief(b, a, o, env)
            # add the new belief to the buffer only
            # if it is not a duplicate
            #   ...
            if not check_duplicate(b_prime, buff):
                buff.append(b_prime)
    
    #######################
    
    return buff

In [38]:
def generate_all_beliefs(b_init: np.ndarray, env: HomeworkEnv) -> List[List[np.ndarray]]:
    """
    Generate all the possible belief that can result
    in the maximum steps allowed
    
    Parameters
    ----------
    b_init
        initial belief (we're going to use the [0.5, 0.5] for this lab).
    env
        environment
    
    Returns
    -------
    List of lists of belief, meaning that for each step
    we will have a list of belief.
    E.g.
    [
        [b_init],            ---> initial belief (level 1)
        [b00, b01, b02, ...] ---> those result from the initial belief (level 2)
        [b10, b11, b12, ...] ---> those result form the beliefs from the second level (level 3)
        ....
    ]
    """
    
    # extract the maximum number of steps allowe
    # by the environment
    max_num_steps = env.max_num_steps
    
    # initialize storing buffer by adding the 
    # list containing the initial belief
    buff = [[b_init]]
    
    # for  the maximum steps allowed
    for step in range(1, max_num_steps):
        # buffer for the next level of beliefs
        next_buff = []
        
        # go through all beliefs from the previous level
        # and generate new ones
        for b in buff[step - 1]:
            # generate all the belief that can result for
            # belief b (apply get_next_belief previously implemented)
            tmp_buff = get_next_beliefs(b, env)
            
            # we have to check if the new beliefs
            # don't exist already in the next level buffer
            # so we don't add duplicates
            for b_prime in tmp_buff:
                if not check_duplicate(b_prime, next_buff):
                    next_buff.append(b_prime)
            
        # add the new level of beliefs
        buff.append(next_buff)
    
    return buff

In [39]:
def get_gamma_a_star(a: Actions, env: HomeworkEnv) -> np.ndarray:
    """
    Parameters
    ----------
    a
        current action
    env
        environment
    
    Returns
    -------
    alpha^(a, *) vector. This in an array
    of dimension: # of states and can be extracted
    directly from the rewards matrix
    """
    return env.R[a]

In [40]:
def get_gamma_a_o(a: Actions, o: Obs, V_prime: List[np.array], env: HomeworkEnv, gamma: float=0.9):
    """
    Parameters
    ----------
    a
        action
    o
        observation
    V_prime
        list of alpha vectors from the next step
    env
        environment
    gamma
        discounting factor
    """
    # get transition, observation and reward matrix
    T, O, R = env.T, env.O, env.R
    
    # get posible states, actions and observations
    states, actions, obs = env.states, env.actions, env.obs
    
    # buffer of next gamma_ao vectors
    gamma_a_o = []
    
    # go through all alpha_prime vectors from V_prime
    for alpha_prime in V_prime:
        # define the new alpha_a_o vector
        alpha_a_o = np.zeros((len(states)))
        
        ########################
        # TODO 3: Your code here
        # go throguh all states (s)
        for s in states:
        
            # go through all states (s_prime)
            for s_prime in states:
        
                # perform update alpha_a_o[s] += ...
                alpha_a_o[s] += T[a][s][s_prime] * O[a][s_prime][o] * alpha_prime[s_prime]
        ########################
        
        # append the new alpha_a_o vector to the buffer
        gamma_a_o.append(gamma * alpha_a_o)
    
    return gamma_a_o

In [41]:
def get_gamma_a_b(b: np.ndarray, 
                  V_prime: List[np.ndarray], 
                  env: HomeworkEnv, 
                  gamma: float=0.9) -> Dict[Actions, np.ndarray]:
    """
    Parameters
    ----------
    b
        belief state
    V_prime
        list of alpha vector from the next step
    env
        environment
    gamma
        discounting factor
    """
    # get all posible actions and observations
    A, O = env.actions, env.obs
    
    # define gamma_a_b buffer 
    gamma_a_b = {}
    
    # go through all actions
    for a in A:
        # get the gamma_a_star vectors form the previously implemented function
        gamma_a_star = get_gamma_a_star(a, env)
        
        # define accumulator accumulator
        sum_gamma_a_o = np.zeros_like(gamma_a_star, dtype=np.float)
        
        # go through all the observations
        for o in O:
            # get gamma_a_o from the previously implementd function
            gamma_a_o = get_gamma_a_o(a, o, V_prime, env, gamma)
            
            # need to do a maximization
            best_alpha = None
            best_score = -np.inf
            
            # go through all alphas from gamma_a_o
            for alpha in gamma_a_o:
                # compute the score by dot product between
                # alpha and current belief b
                score = np.dot(alpha, b)
                
                # update the best score and alpha
                if score > best_score:
                    best_score = score
                    best_alpha = alpha
            
            # add best alpha to the summation
            # notice that if V_prime is empty (for the last step)
            # we don't have any best_alpha
            if best_alpha is not None:
                sum_gamma_a_o += best_alpha
        
        # add the reward vector to the accumulator
        sum_gamma_a_o += gamma_a_star
        
        # store mapping between action and accumulator
        gamma_a_b[a] = sum_gamma_a_o

    return gamma_a_b

In [42]:
def get_V(B, V_prime, env: HomeworkEnv, gamma: float=0.9) -> Tuple[List[np.ndarray], Dict, List[float]]:
    """
    Parameters
    ----------
    B
        List of beliefs (per leve)
    env
        environment
    gamma
        discount factor
    
    Returns
    -------
    A tuple containing the  the new V list, best policy for the current level, 
    and the best scores
    """
    # define policy dictionary
    policy = {}
    
    # define V and score buffers
    V, scores = [], []
    
    # go through all beliefs
    ################################################
    # TODO 5: Your code here
    for b in B:
        # get gamma_a_b dictionary form the previous implemented function
        # gamma_a_b = ....
        gamma_a_b = get_gamma_a_b(b, V_prime, env, gamma)
        
        # variables for maximization
        best_a = None
        best_score = -np.inf
        best_gamma_a_b = None
        
        # go through all actions from gamma_a_b
        for a in gamma_a_b:
            # compute score by dot product between
            # the gamma_a_b corresponding to a and the current belief
            # score = ...
            score = np.dot(gamma_a_b[a], b)
            # update score if better and
            # remeber the action and the alpha
            #    ... update best score, best_a, best_gamma_a_b
            if score > best_score:
                best_score = score
                best_gamma_a_b = gamma_a_b[a]
                best_a = a
        
        # remeber the action to be applied
        # for the current belief state 
        policy[tuple(b)] = best_a
        ###############################################
        
        # add best gamma_a_b to the V
        V.append(best_gamma_a_b)
        
        # also remebere the best score
        scores.append(best_score)   
    
    return V, policy, scores 

In [43]:
def point_based_value_backup(env: HomeworkEnv, 
                             gamma: float=0.9):
    """
    Point-based value backup algorithm for POMDP
    Link: http://www.cs.cmu.edu/~ggordon/jpineau-ggordon-thrun.ijcai03.pdf
    
    Parameters
    ----------
    env
        environment
    gamma
        discount factor
    
    Returns
    -------
    Best policies per level. Each element
    corresponds to a level
    """
    
    # define initial belief states
    b_init = np.array([0.5, 0, 0, 0, 0.5])
    
    # generate the list of all possible beliefs per level
    B = generate_all_beliefs(b_init, env)
    
    # need to reverse the list cause we are starting
    # from the last possible acton
    B = reversed(B)
    
    # initail list of best gamma_a_b
    V = []
    
    # buffer of policies and V vectors
    policies = {}
    
    # for each level and each set of beliefs
    for i, bs in enumerate(B):
        # get the V's, policy and the best scores
        V, policy, scores = get_V(bs, V, env)
        
        # store results
        policies[env.max_num_steps - i - 1] = {
            "policy": policy,
            "V": V,
            "scores": scores
        }
    return policies

In [44]:
# define environment
env = HomeworkEnv(max_num_steps=3)

# solve environment
policies = point_based_value_backup(env=env, gamma=0.9)

for step in range(env.max_num_steps):
    print("========== Step %d ======== " % (step, ))
    pprint(policies[step]['policy'])
    print("\n")

{(0.5, 0.0, 0.0, 0.0, 0.5): <Actions.LEFT: 0>}


{(0.0, 0.0, 0.0, 0.0, 1.0): <Actions.LEFT: 0>,
 (0.0, 0.0, 0.0, 1.0, 0.0): <Actions.LEFT: 0>,
 (0.0, 1.0, 0.0, 0.0, 0.0): <Actions.RIGHT: 1>,
 (1.0, 0.0, 0.0, 0.0, 0.0): <Actions.RIGHT: 1>}


{(0.0, 0.0, 0.0, 0.0, 0.0): <Actions.LEFT: 0>,
 (0.0, 0.0, 0.0, 0.0, 1.0): <Actions.LEFT: 0>,
 (0.0, 0.0, 0.0, 1.0, 0.0): <Actions.LEFT: 0>,
 (0.0, 0.0, 1.0, 0.0, 0.0): <Actions.LEFT: 0>,
 (0.0, 1.0, 0.0, 0.0, 0.0): <Actions.RIGHT: 1>,
 (1.0, 0.0, 0.0, 0.0, 0.0): <Actions.LEFT: 0>}




Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  sum_gamma_a_o = np.zeros_like(gamma_a_star, dtype=np.float)


In [46]:
# define environment
# you can change the number of steps
env =HomeworkEnv(max_num_steps=4, noise=0.15)

# solve environment
policies = point_based_value_backup(env=env, gamma=0.9)
print(policies[0])

# do a few experiments
scores = []
num_experiments = 10

for i in range(num_experiments):
    # reset environment
    env.reset()
    
    # initial belief state
    b = np.array([0.5, 0, 0, 0, 0.5])
    
    # score variable & buffer actions
    score = 0
    acts, obs = [], []
    
    for step in count():
        # interact with the environment
        b = get_closest_belief(policies[step]["policy"], b)
        a = policies[step]["policy"][tuple(b)]
        o, r, done, _ = env.step(a)
        
        # update score, acts & obs
        score += r
        acts.append(a)
        obs.append(o)
        
        # break if environment completed
        if done:
            acts = [env.action_mapping[a] for a in acts]
            obs = [env.obs_mapping[o] for o in obs]
            
            print("Episode %d, Score: %.2f" % (i, score))
            print("\t* Actions:", acts)
            print("\t* Obs:", obs)
            print("\n")
            break
        
        # update belief
        b = update_belief(b=b, a=a, o=o, env=env)
    
    # save score
    scores.append(score)
    
# report mean score
print("=================")
print("Avg score: %.2f" % (np.mean(scores), ))

{'policy': {(0.5, 0.0, 0.0, 0.0, 0.5): <Actions.LEFT: 0>}, 'V': [array([-2.629, -2.629, -3.439, -2.439, -1.81 ])], 'scores': [-2.2195]}
Episode 0, Score: -4.00
	* Actions: ['Left', 'Right', 'Right', 'Left']
	* Obs: ['Three walls, end!', 'Two walls', 'Three walls, end!', 'Three walls, end!']


Episode 1, Score: -4.00
	* Actions: ['Left', 'Right', 'Left', 'Left']
	* Obs: ['Three walls, end!', 'Three walls, end!', 'Two walls', 'Three walls, end!']


Episode 2, Score: -4.00
	* Actions: ['Left', 'Right', 'Left', 'Left']
	* Obs: ['Three walls, end!', 'Three walls, end!', 'Two walls', 'Three walls, end!']


Episode 3, Score: -4.00
	* Actions: ['Left', 'Right', 'Left', 'Left']
	* Obs: ['Three walls, end!', 'Three walls, end!', 'Two walls', 'Three walls, end!']


Episode 4, Score: -4.00
	* Actions: ['Left', 'Right', 'Right', 'Left']
	* Obs: ['Three walls, end!', 'Two walls', 'Three walls, end!', 'Three walls, end!']


Episode 5, Score: -4.00
	* Actions: ['Left', 'Right', 'Right', 'Left']
	* Obs

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  sum_gamma_a_o = np.zeros_like(gamma_a_star, dtype=np.float)
