In [47]:
import sys
import numpy as np

UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3

In [48]:
'''
Enumerate states for a 3x3 grid ==> 81 states 
(9 choices for pacman location x 9 choices for ghost location)

y    _0_|_1_|_2_
|    _3_|_4_|_5_
v     6 | 7 | 8
    x -->

Each x,y pair represented as an integer number corresponding to the diagram above
'''

states_num = [];

for s in range(81):
    for p in range(9):
        for g in range(9):
            states_num.append( (p, g) )
                    
#for s in range(81):
#    print("state ", s, ": ", states_num[s])

In [49]:
def grid_to_xy(number):
    switch = {
        0: [0,0],
        1: [1,0],
        2: [2,0],
        3: [0,1],
        4: [1,1],
        5: [2,1],
        6: [0,2],
        7: [1,2],
        8: [2,2]
    }
    return switch.get(number, "invalid entry")

def xy_to_grid(x,y):
    switch = {
        0: {0:0, 1:3, 2:6},
        1: {0:1, 1:4, 2:7},
        2: {0:2, 1:5, 2:8}
    }
    x = switch.get(x,"invalid entry")

    if x == "invalid entry":
        return x
    else:
        return x.get(y,"invalid entry")

def return_state(states, p, g):
    return states.index( (p,g) )

def move(x, y, action):
    if action == UP:
        y = max(0, y-1)
    elif action == RIGHT:
        x = min(2, x+1)
    elif action == DOWN:
        y = min(2, y+1)
    elif action == LEFT:
        x = max(0, x-1)
    return xy_to_grid(x, y)

In [50]:
class PacmanEnv:
    '''
    Class to initialize and store information about the Pacman environment for program planning algorithms.

    Properties:
        P[s][a] is a list of is a list of transition tuples (prob, next_state, reward, done)
        num_states = number of states (set to default for 3x3 grid)
        num_actions = number of actions (set to 4)
        pellet_loc = location of pellet (set to 2, i.e. [2,0] by default)

    Methods:
        return_state: Returns state number given location of pacman and the ghost
        move: Moves pacman given current location and action input. Returns grid location number
        calculate_reward: Returns reward for current location of pacman. Used to evaluate R(s,a,s') by 
                        first determining s' through move(s,a), then calculating the reward at s'.
        grid_to_xy: Returns corresponding (x,y) coordinate pair for valid grid location integer input
                    If number out of range, returns 'invalid entry' error message
        xy_to_grid: Returns corresponding grid location # for given (x,y) coordinate pair input
                    If number out of range, returns 'invalid entry' error message
    '''

    def __init__(self, states=states_num, num_states=81, num_actions=4, pellet_loc=2):
        self.states = states
        self.num_states = num_states
        self.num_actions = num_actions
        self.pellet_loc = pellet_loc
        
        P = {s : {a : [] for a in range(num_actions)} for s in range(num_states)}
        
        # parameters must be of the same type, i.e. [x,y] or int value 0-8
        # need to adjust to include reward definition for bumping into walls
        def calculate_reward(pacman_new_loc, ghost_new_loc, ghost_current_loc, pellet_location):
            if pacman_new_loc == ghost_current_loc: # pacman moved to the ghost's location
                return -1000
            elif pacman_new_loc == pellet_location:
                return 1000
            elif pacman_new_loc == ghost_new_loc: # the ghost moved to pacman's new location
                return -1000
            else:
                return 0
        
        for s in range(num_states):
            for pacman_a in range(num_actions):
                done = False # flag to signal game has ended
                temp = P[s][pacman_a]
                pacman_grid_loc = states[s][0] # for the given state, where is pacman
                ghost_grid_loc = states[s][1] # in the given state, where is the ghost
                
                # if pacman performs action a: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT
                [x_p, y_p] = grid_to_xy(pacman_grid_loc)
                next_pacman_loc = move(x_p, y_p, pacman_a) # grid location he will move to
                
                for ghost_a in range(num_actions):
                    # if the ghost performs action a: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT
                    [x_g, y_g] = grid_to_xy(ghost_grid_loc)
                    next_ghost_loc = move(x_g, y_g, ghost_a) # grid location he will move to
                    
                    # resulting next state, simulates pacman and the ghost moving simultaneously
                    next_state = return_state(states, next_pacman_loc, next_ghost_loc) 
                    reward = calculate_reward(next_pacman_loc, next_ghost_loc, ghost_grid_loc, pellet_loc) # calculate the reward at this state

                    if (pacman_grid_loc == pellet_loc or pacman_grid_loc == ghost_grid_loc):
                        done = True

                    temp.append( (0.25, next_state, reward, done) )
        
        self.P = P
        

In [51]:
def value_iteration(env=PacmanEnv(), gamma=0.9, theta=1e-5):
    '''
    Value Iteration Algorithm

    Inputs:
        env: PacmanEnv as defined in class above.
        gamma: Discount rate for future rewards.
        theta: Stopping criterion value. When change in Value function is less than theta for every state, stop.

    Helper Methods:
        calculate_action_values: Calculates the values for all actions for a given state.
                                Returns a vector action_values of length num_actions, where 
                                action_values[a] = expected value of action a.
                                The expected value is calculated according to the Bellman equation:
                                V(s) = P(s'|s,a) * ( R(s,a) + (gamma * V(s')) )
        extract_policy: Returns the optimal policy for a given value function. It is run once at the end of the algorithm
                        after the optimal V (value function) has been calculated.

    Outputs:
        A tuple (policy, V, steps) of the optimal policy, the approximated optimal value function, and the number of steps
        the algorithm took to converge.
    '''
    
    def calculate_action_values(current_state, V):
        action_values = np.zeros(env.num_actions)
        for a in range(env.num_actions):
            for prob, next_state, reward, done in env.P[current_state][a]:
                action_values[a] += prob * (reward + (gamma * V[next_state]))
        return action_values
    
    def extract_policy(V):
        policy = np.zeros([env.num_states, env.num_actions])
        
        for s in range(env.num_states):
            action_values = calculate_action_values(s, V)
            best_action = np.argmax(action_values) # returns index of action that has maximum V
            policy[s, best_action] = 1 # deterministic optimal policy, i.e. always take best_action for given state
        
        return policy
    
    V = np.zeros(env.num_states) # arbitrarily initialize vector V to be all zeros
    converged = False
    steps = 0
    
    # iteratively calculate optimal V
    while not converged:
        print('Value iteration, step ', steps, '...')
        delta = 0
        for s in range(env.num_states):
            action_values = calculate_action_values(s, V)
            max_action_value = np.max(action_values)
            delta = max( delta, np.abs(max_action_value - V[s]) ) # the maximum difference between V'(s) and V(s) for all s
            V[s] = max_action_value        
        
        steps += 1
        
        #print('Delta: ', delta)
        converged = (delta < theta)
        #print(converged)
    
    # extract optimal policy after calculating optimal V
    policy = extract_policy(V)
    
    return policy, V, steps

In [43]:
import math
winReward = 1000
loseReward = -1000

def nextMove(env, currState, action):
    '''
    Returns all possible next states given action, and the reward given next states.
    ARGS:
        currState: Current game state, ranges from 0 to numStates-1.
        action: Action to perform.
    RETURN:
        nextStates: List of possible next states give current state and action.
                    Largly depends on potential ghost movements.
        rewards: List of rewards for each potential next state.
    '''
    nextStates = [];
    # Get currState coordinates
    
    pacman_grid_loc = env.states[currState][0] # for the given state, where is pacman
    ghost_grid_loc = env.states[currState][1] # in the given state, where is the ghost
                
    [x_p, y_p] = grid_to_xy(pacman_grid_loc)
    [x_g, y_g] = grid_to_xy(ghost_grid_loc)
    
    # Get pacman location after performing action
    next_pacman_loc = move(x_p, y_p, action)
    pacmanLocX_next, pacmanLocY_next = grid_to_xy(next_pacman_loc)
    
    # Initialize rewards
    if (next_pacman_loc == env.pellet_loc): 
        rewards = [winReward]*4;
    elif (next_pacman_loc == ghost_grid_loc):
        rewards = [loseReward]*4;
    else:
        rewards = [0]*4;

    # Iterate through possible ghost states
    for ghostAction in range(4):
        next_ghost_loc = move(x_g, y_g, ghostAction)
        ghostLocX_next, ghostLocY_next = grid_to_xy(next_ghost_loc)
        state = return_state(env.states, next_pacman_loc, next_ghost_loc)
        nextStates.append(state);     
        # Evaluate reward - pacman eaten by ghost
        if (next_pacman_loc == next_ghost_loc):
            rewards[ghostAction] = loseReward;

    return nextStates, rewards;

In [44]:
import math
winReward = 1000
loseReward = -1000

###############################################################################
############################  Policy Iteration ################################
###############################################################################
def policy_iteration(env=PacmanEnv(), deltaLim=1e-5, gamma=0.9):
    '''
    Policy iteration algorithm consists of two steps that are repeated in each iteration of the loop:
    policy evaluation (calculate V) and policy improvement (greedy approach to improve current policy).
    
    Helper Methods:
        policyEvaluation
        policyImprovement
    '''
    prob = 0.25
    
    ########################## Policy evaluation ##################################
    def policyEvaluation(policy, v, deltaLim=1e-5, delta=10000):
        cnt = 0;
        while (delta > deltaLim): 
            delta = 0;
            for state in range(env.num_states):
                if (env.P[state][0][0][3] == True): # game has ended
                    v[state] = 0;
                    continue;
                vStateOld = v[state]; 
                # Choose action from policy
                action = policy[state];
                # Potential next states given current state and action
                nextStates, rewards = nextMove(env, state, action); 
                # Compute V(s)
                vNextStates = [prob*(rewards[i]+gamma*v[nextStates[i]]) for i in range(len(nextStates))];
                v[state] = sum(vNextStates);

                delta = max(delta, abs(v[state]-vStateOld));
            print("Delta",delta);
            cnt += 1;
        print("Policy evalulation converged at", cnt);
        return v;


    ########################## Policy Improvement #################################
    def policyImprovement(policy, v):
        policyStable = True;
        for state in range(env.num_states):
            if (env.P[state][0][0][3] == True): # game ended
                continue;
            oldAction = policy[state];
            # Optimal action and V(s,a)
            optAction = 0; 
            optActionVal = None;
            for action in range(4):
                # Potential next states given current state and action
                nextStates, rewards = nextMove(env, state, action);
                # Compute V(s,a)
                vNextStates = [prob*(rewards[i]+gamma*v[nextStates[i]]) for i in range(len(nextStates))];
                actionVal = sum(vNextStates);
                if (optActionVal == None):
                    optActionVal = actionVal;
                if (actionVal > optActionVal): # Choose action with largest V(s,a)
                    optActionVal = actionVal;
                    optAction = action;
            # No convergence
            if (oldAction != optAction):
                policyStable = False;
                policy[state] = optAction;
        return policy, policyStable
    
    # Init
    v = [0]*env.num_states;
    policy = [0]*env.num_states;
    policyStable = False;

    # Run Policy Iteration
    policyIter = 1;
    while (not policyStable):
        v = policyEvaluation(policy, v);
        policy, policyStable = policyImprovement(policy, v);
        # print("Total iterations:", policyIter)
        policyIter += 1;

    return policy, v, policyIter

In [45]:
policy, V, steps = policy_iteration()

Delta 1345.9564041748047
Delta 138.45322265624998
Delta 80.36209533691408
Delta 50.327251831054696
Delta 33.73082389033124
Delta 27.188297693384726
Delta 21.966299296992872
Delta 17.57000472180266
Delta 13.958241635996899
Delta 11.036803885005725
Delta 8.69804945493371
Delta 6.838899352821443
Delta 5.368187924993606
Delta 4.20872259239178
Delta 3.296839179868016
Delta 2.5809058406553618
Delta 2.0195120096609003
Delta 1.5796944540835511
Delta 1.2353503818944773
Delta 0.9658848837300411
Delta 0.7550907005195313
Delta 0.5902370677873137
Delta 0.461337393511144
Delta 0.36056539611337257
Delta 0.28325655650687054
Delta 0.22723088283598258
Delta 0.18228629704020705
Delta 0.1462312642899235
Delta 0.11730760240138238
Delta 0.09410482672890907
Delta 0.0754914077958233
Delta 0.06055961086141792
Delta 0.04858123833491845
Delta 0.038972122246548224
Delta 0.0312636379597393
Delta 0.025079851325187974
Delta 0.020119185502721848
Delta 0.01613971373265599
Delta 0.012947360991546475
Delta 0.01038643923

In [60]:
### Value Iteration Result ###
def convert_to_action(num):
    switch = {
                0: 'UP',
                1: 'RIGHT',
                2: 'DOWN',
                3: 'LEFT'
            }
    return switch.get(num, "invalid entry")

policy, V, steps = value_iteration(gamma=0.5)
env = PacmanEnv()
action_list = []

print('-----------------------------------------')
print('Optimal extracted policy: ')
print(policy)

print('-----------------------------------------')
for s in range(env.num_states):
    print()
    print('Location of Pacman: ', grid_to_xy(env.states[s][0]))
    print('Location of Ghost: ', grid_to_xy(env.states[s][1]))
    print('Best action determined by optimal policy: ', convert_to_action( policy[s].tolist().index(1) ) )
    action_list.append(convert_to_action( policy[s].tolist().index(1) ))

print('-----------------------------------------')    
print('Value Function: ')
print(V)

print('Steps \'til convergence: ', steps)
print('-----------------------------------------')

Value iteration, step  0 ...
Value iteration, step  1 ...
Value iteration, step  2 ...
Value iteration, step  3 ...
Value iteration, step  4 ...
Value iteration, step  5 ...
Value iteration, step  6 ...
Value iteration, step  7 ...
Value iteration, step  8 ...
Value iteration, step  9 ...
Value iteration, step  10 ...
Value iteration, step  11 ...
Value iteration, step  12 ...
Value iteration, step  13 ...
Value iteration, step  14 ...
Value iteration, step  15 ...
Value iteration, step  16 ...
Value iteration, step  17 ...
Value iteration, step  18 ...
Value iteration, step  19 ...
Value iteration, step  20 ...
-----------------------------------------
Optimal extracted policy: 
[[0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 

In [57]:
### Policy iteration result ###
policy2, V2, steps2 = policy_iteration(gamma=0.5)
env = PacmanEnv()
action_list2 = []

print('-----------------------------------------')
print('Optimal extracted policy: ')
print(policy2)

print('-----------------------------------------')
for s in range(env.num_states):
    print()
    print('Location of Pacman: ', grid_to_xy(env.states[s][0]))
    print('Location of Ghost: ', grid_to_xy(env.states[s][1]))
    print('Best action determined by optimal policy: ', convert_to_action( policy2[s] ) )
    action_list2.append(convert_to_action( policy2[s] ))

print('-----------------------------------------')    
print('Value Function: ')
print(V2)

print('Steps \'til convergence: ', steps2)
print('-----------------------------------------')

Delta 1158.9641571044922
Delta 70.37353515625
Delta 22.15576171875
Delta 7.0171356201171875
Delta 2.2110939025878906
Delta 0.7046712562441826
Delta 0.2284294314449653
Delta 0.0753918632199202
Delta 0.025302088840135184
Delta 0.008614999952101243
Delta 0.0029681235470206957
Delta 0.0010321105410753262
Delta 0.0003614107556018098
Delta 0.0001271984433515172
Delta 4.4927232920599636e-05
Delta 1.5906580870250764e-05
Delta 5.640310405397031e-06
Policy evalulation converged at 17
Delta 1434.5930225674501
Delta 593.0232546375245
Delta 83.82783805393524
Delta 9.83109652231062
Delta 2.2626809498884084
Delta 0.7660799726641727
Delta 0.25537548775460905
Delta 0.08474518926699659
Delta 0.02807864839445351
Delta 0.009298309990644071
Delta 0.003078785205561374
Delta 0.0010195006612541135
Delta 0.00033765196742407966
Delta 0.00011185193935148163
Delta 3.7061030845109144e-05
Delta 1.2282651917772114e-05
Delta 4.071596892529783e-06
Policy evalulation converged at 17
Delta 420.77652488751005
Delta 116.0

In [63]:
diff_opt_states = []

for s in range(env.num_states):
    if (action_list[s] != action_list2[s]):
        diff_opt_states.append(s)

for i in range(len(diff_opt_states)):
    if (action_list[i] != action_list2[i]):
        print('---------Different at state ', i, '---------')
        print('Value iteration optimal policy: ',action_list[i])
        print('Policy iteration optimal policy: ',action_list2[i])

---------Different at state  0 ---------
Value iteration optimal policy:  RIGHT
Policy iteration optimal policy:  UP
---------Different at state  2 ---------
Value iteration optimal policy:  RIGHT
Policy iteration optimal policy:  UP
---------Different at state  4 ---------
Value iteration optimal policy:  RIGHT
Policy iteration optimal policy:  UP
---------Different at state  10 ---------
Value iteration optimal policy:  RIGHT
Policy iteration optimal policy:  UP
