In [1]:
import numpy as np
import sys

STAY = 0
UP = 1
RIGHT = 2
DOWN = 3
LEFT = 4

ACTIONS = [STAY, UP, RIGHT, DOWN, LEFT]
STR_ACTIONS = ['STAY', 'UP', 'RIGHT', 'DOWN', 'LEFT']

Edge_up = [x for x in range(0,6)]
Edge_right = [5,11,17]
Edge_down = [x for x in range(12,18)]
Edge_left = [0,6,12]

Banks = [0,5,12,17]
Police_station = [8]


In [2]:
class BankEnv():
    
    def __init__(self):
        
        self.shape = [3,6]
        self.nS = 18*18
        self.nA = 5
        self.P = {}
        
        self.init_agent_state = Banks[0]
        self.init_police_state = Police_station[0]
        
        self.state_index = np.arange(self.nS).reshape((18,18))
        
        self.init_state = self.state_index[self.init_agent_state][self.init_police_state]
        
        self.MAX_Y = self.shape[0]
        self.MAX_X = self.shape[1]
    
    def get_column_line(self,x):    
        
        index = np.arange(18).reshape(self.shape)
        coordinates = np.argwhere(index == x )
        line = coordinates[0][0]
        col = coordinates[0][1]
        
        return (line, col)
    
    def get_police_next_moves(self,agent_state, police_state):
        
        line_agent, col_agent = self.get_column_line(agent_state)
        line_police, col_police = self.get_column_line(police_state)
        
        if line_agent == line_police:
            
            if agent_state > police_state:
                # UP, DOWN, RIGHT
                Moves = [UP, DOWN, RIGHT]
            else:
                # UP, DOWN, LEFT
                Moves = [UP, DOWN, LEFT]
                
        
        elif col_agent == col_police:
            
            if agent_state < police_state:   
                Moves = [UP, RIGHT, LEFT]
            else:
                Moves = [DOWN, RIGHT, LEFT]
                
        else:
            # If agent above and right
            if agent_state < police_state and col_agent > col_police:
                Moves = [UP, RIGHT]
                
            # If agent above and left
            elif agent_state < police_state and col_agent < col_police:
                Moves = [UP, LEFT]
            
            # If agent down and right
            elif agent_state > police_state and col_agent > col_police:
                Moves = [DOWN, RIGHT]
            
            # If agent down and left
            elif agent_state > police_state and col_agent < col_police:
                Moves = [DOWN, LEFT]
                
        return Moves
    
    def get_next_states(self,state, actions):
        
        next_states = {}

        for action in actions:
        
            if action == UP and state not in Edge_up:
                next_states[UP] = state - self.MAX_X

            if action == RIGHT and state not in Edge_right:
                next_states[RIGHT] = state + 1

            if action == DOWN and state not in Edge_down:
                next_states[DOWN] = state + self.MAX_X

            if action == LEFT and state not in Edge_left:
                next_states[LEFT] = state - 1
                
            if action == STAY:
                next_states[STAY] = state
            
        
        return next_states
    
    def create_transition_matrix(self):
        
        on_bank = lambda x: x in Banks
        on_police = lambda x,y: x == y
        
        grid = np.arange(self.nS).reshape((18,18))
        
        it = np.nditer(grid, flags=['multi_index'])
        
        while not it.finished:
            
            s = it.iterindex
            
            x,y = it.multi_index
            
            self.P[s] = {a : [[]] for a in range(self.nA)}
            
            if on_police(x,y):
                reward = -50

                for action in range(4):
                    self.P[s][action] = [([1.0], [self.init_state], [reward])]
            
            else:
                police_moves = self.get_police_next_moves(x,y)
                
                agent_next_states = self.get_next_states(x, ACTIONS)
                
                police_next_states = self.get_next_states(y,police_moves)
                
                prob = 1/len(police_next_states)

                if on_bank(x):
                    reward = 10
                else:
                    reward = 0
            
            
                for action_a, ns in agent_next_states.items():
                    next_state_list = []
                    prob_list = []
                    reward_list = []

                    for action_p, ns_p in police_next_states.items():
                        
                        ns_index = self.state_index[ns][ns_p]

                        next_state_list.append(ns_index)
                        prob_list.append(prob)
                        reward_list.append(reward)

                    self.P[s][action_a] = [(prob_list, next_state_list, reward_list)]

            it.iternext()
            
    
    

In [3]:
env = BankEnv()

In [4]:
env.create_transition_matrix()

In [5]:
def value_iteration(env, theta=0.0001, discount_factor=0.6):
    """
    Value Iteration Algorithm.
    
    Args:
        env:env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment. 
            env.nA is a number of actions in the environment.
        theta: We stop evaluation once our value function change is less than theta for all states.
        discount_factor: Gamma discount factor.
        
    Returns:
        A tuple (policy, V) of the optimal policy and the optimal value function.
    """
    
    def isListEmpty(inList):
        if isinstance(inList, list): # Is a list
            return all( map(isListEmpty, inList) )
        return False # Not a list
    
    
    def one_step_lookahead(state, V):
        """
        Helper function to calculate the value for all action in a given state.
        
        Args:
            state: The state to consider (int)
            V: The value to use as an estimator, Vector of length env.nS
        
        Returns:
            A vector of length env.nA containing the expected value of each action.
        """
        
        A = np.zeros(env.nA)
        for a in range(env.nA):
            
            if len(env.P[state][a][0]) != 0:

                for prob, next_state, reward in env.P[state][a]:

                    for i in range(len(prob)):
                        A[a] += prob[i] * (reward[i] + discount_factor * V[next_state[i]])

               
        return A
    
    V = np.zeros(env.nS)
    maxIter = 50
    i = 0
    while True:
        # Stopping condition
        delta = 0
        # Update each state...
        for s in range(env.nS):
            
            # Do a one-step lookahead to find the best action
            A = one_step_lookahead(s, V)
            best_action_value = np.max(A)
            # Calculate delta across all states seen so far
            delta = max(delta, np.abs(best_action_value - V[s]))
            # Update the value function. Ref: Sutton book eq. 4.10. 
            V[s] = best_action_value        
        # Check if we can stop 
        
        i +=1
        #print("Delta-Value ", delta)
        if delta < theta:
            break
        if i >=maxIter:
            break
        
    
    # Create a deterministic policy using the optimal value function
    policy = np.zeros([env.nS, env.nA])
    for s in range(env.nS):
        # One step lookahead to find the best action for this state
        A = one_step_lookahead(s, V)
        best_action = np.argmax(A)
        # Always take the best action
        policy[s, best_action] = 1.0
    
    return policy, V

In [6]:
policy, v = value_iteration(env)

In [7]:
pol = np.reshape(np.argmax(policy, axis=1), (18,18))

In [8]:
len(policy)

324

In [9]:
pol[:,13].reshape((3,6))

array([[0, 4, 4, 2, 2, 0],
       [1, 1, 1, 2, 2, 1],
       [1, 4, 4, 2, 2, 0]], dtype=int64)

In [10]:
# Solve the problem, and display the value function (evaluated at the initial state) as a function
# of lambda. Illustrate an optimal policy for different values of lambda - comment on the behaviour.

In [11]:
v[8]

22.61202818579331

In [12]:
values = []

for discount in np.arange(0,1,0.01):
    policy,v = value_iteration(env, theta=0.0001, discount_factor=discount)
    values.append(v[8])

In [13]:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

discounts = np.arange(0,1,0.01)

plt.plot(discounts, values)
#plt.show()


In [14]:
import random
class Simulation:
    
    def __init__(self, policy):
        self.grid = self.initialize_grid()
        self.policy = policy
        self.s = {'A':[0,0], 'P':[1,2]}
    
    
    def update_state(self, action):
                
        if action == UP:
            self.s['A'][0] -= 1
        elif action == DOWN:
            self.s['A'][0] += 1
        elif action == RIGHT:
            self.s['A'][1] += 1
        elif action == LEFT:
            self.s['A'][1] -= 1
        else:
            pass
        
        action = self.get_police_action()
        
        while not self.valid_move(self.s['P'], action):
            action = self.get_police_action()
        
        if action == UP:
            self.s['P'][0] -= 1
        elif action == DOWN:
            self.s['P'][0] += 1
        elif action == RIGHT:
            self.s['P'][1] += 1
        elif action == LEFT:
            self.s['P'][1] -= 1
        else:
            pass

    def initialize_grid(self):
        
        grid = np.zeros((3,6), dtype = object)
        grid[:] = '-'
        grid[0,0] = 'A'
        grid[1,2] = 'P'
        return grid
    def update_grid(self):
        
        ia, ja = self.s['A']
        ib, jb = self.s['P']
        self.grid = np.zeros((3, 6), dtype = object)
        self.grid[:] = '-'
        self.grid[ia, ja] = 'A'
        self.grid[ib, jb] = 'P'
        
    def get_police_action(self):
        
        state_index = np.arange(18).reshape((3,6))

        row_agent, col_agent = self.s['A']
        row_police, col_police = self.s['P']
        
        print("Agent state", self.s['A'])
        print("Police state", self.s['P'])
        
        police_state = state_index[row_police][col_police]
        agent_state = state_index[row_agent][col_agent]
        
        if row_agent == row_police:
            
            if agent_state > police_state:
                # UP, DOWN, RIGHT
                Moves = [UP, DOWN, RIGHT]
            else:
                # UP, DOWN, LEFT
                Moves = [UP, DOWN, LEFT]

        elif col_agent == col_police:
            
            if agent_state < police_state:   
                Moves = [UP, RIGHT, LEFT]
            else:
                Moves = [DOWN, RIGHT, LEFT]
                
        else:
            # If agent above and right
            if agent_state < police_state and col_agent > col_police:
                Moves = [UP, RIGHT]
                
            # If agent above and left
            elif agent_state < police_state and col_agent < col_police:
                Moves = [UP, LEFT]
            
            # If agent down and right
            elif agent_state > police_state and col_agent > col_police:
                Moves = [DOWN, RIGHT]
            
            # If agent down and left
            elif agent_state > police_state and col_agent < col_police:
                Moves = [DOWN, LEFT]
        str_moves = [STR_ACTIONS[move] for move in Moves]
        print(str_moves)
        random_action = random.choice(Moves)
        print(STR_ACTIONS[random_action])
        print('\n')
        return random_action
        
    def valid_move(self,state, action):
        
        if action == UP and state[0] == 0:
            return False
        elif action == DOWN and state[0]:
            return False
        elif a == LEFT and state[1] == 0: # Western Maze Wall
            return False
        elif a == RIGHT and state[1] == 5: # Eastern Maze Wall
            return False
        else:
            return True
    
    def get_action_from_policy(self, state):
        
        index = np.arange(18).reshape((3,6))
        state_index = np.arange(18*18).reshape((18,18))
        
        col_agent, line_agent = state['A']
        col_police, line_police = state['P']
        
        police_index = index[col_police][line_police]
        agent_index = index[col_agent][line_agent]
        
        state_i = state_index[agent_index][police_index]
        
        action = self.policy[state_i]
        
        return action
        
    def run_simulation(self, T):
        
        self.s = {'A': [0,0], 'P': [1,2]}
        print('Start grid:')
        print(self.grid)
        print('\n')
        for i in range(T):
            
            action = self.get_action_from_policy(self.s)
            self.update_state(action)
            self.update_grid()
            print(self.grid)
            print('\n')
    
    

In [15]:
try:
    policy = np.argmax(policy,axis=1)
except:
    pass

In [16]:
a = Simulation(policy)

In [17]:
a.run_simulation(10)

Start grid:
[['A' '-' '-' '-' '-' '-']
 ['-' '-' 'P' '-' '-' '-']
 ['-' '-' '-' '-' '-' '-']]


Agent state [0, 0]
Police state [1, 2]
['UP', 'LEFT']
UP


[['A' '-' 'P' '-' '-' '-']
 ['-' '-' '-' '-' '-' '-']
 ['-' '-' '-' '-' '-' '-']]


Agent state [0, 0]
Police state [0, 2]
['UP', 'DOWN', 'LEFT']
LEFT


[['A' 'P' '-' '-' '-' '-']
 ['-' '-' '-' '-' '-' '-']
 ['-' '-' '-' '-' '-' '-']]


Agent state [1, 0]
Police state [0, 1]
['DOWN', 'LEFT']
LEFT


[['P' '-' '-' '-' '-' '-']
 ['A' '-' '-' '-' '-' '-']
 ['-' '-' '-' '-' '-' '-']]


Agent state [2, 0]
Police state [0, 0]
['DOWN', 'RIGHT', 'LEFT']
LEFT


[['-' '-' '-' '-' '-' 'P']
 ['-' '-' '-' '-' '-' '-']
 ['A' '-' '-' '-' '-' '-']]


Agent state [2, 0]
Police state [0, -1]
['DOWN', 'RIGHT']
DOWN


[['-' '-' '-' '-' '-' '-']
 ['-' '-' '-' '-' '-' 'P']
 ['A' '-' '-' '-' '-' '-']]


Agent state [2, 0]
Police state [1, -1]
['DOWN', 'RIGHT']
DOWN


Agent state [2, 0]
Police state [1, -1]
['DOWN', 'RIGHT']
RIGHT


[['-' '-' '-' '-' '-' '-'