In [2]:
import numpy as np

# Q2

In [3]:
x=8
y=6
z=4
state_num_reward = ((z+1)%3)+1
state_num_penalty = 0
r_reward = 10
r_penalty = -100
r_transition = -1
p = 0.25+0.5*x/10.0
gamma = 0.2+0.5*y/10.0

print('state_num_reward =',state_num_reward)
print('state_num_penalty =',state_num_penalty)
print('p =',p)
print('gamma =',gamma)

state_num_reward = 3
state_num_penalty = 0
p = 0.65
gamma = 0.5


In [356]:
# for simplicity, s11 is renamed to be s0, all other states numbers are the same as the assignment

class Gridmap_mdp:
    def __init__(self,grid_map,state_num_reward,state_num_penalty,r_reward,r_penalty,r_transition,p,gamma):
        #initialize all of the attributes
        self.state_num_reward = state_num_reward
        self.state_num_penalty = state_num_penalty
        self.r_reward = r_reward
        self.r_penalty = r_penalty
        self.r_transition = r_transition
        self.p = p
        self.gamma = gamma
        self.grid_map = grid_map
        
        self.count = np.zeros([4,11])
        self.policy = np.ones([4,11])*0.25
        self.action_value = np.zeros([4,11])
        self.value = np.zeros([11])
        self.locations_dict = self.create_location_dict(self.grid_map)
        self.transition_mat = self.init_transition_mat()
        
    #action: 0=up 1=down 2=left 3=right
    def init_transition_mat(self):
        #create the transition matrix
        transition_mat = []
        for action in ['u','d','l','r']:
            transition_mat.append(self.init_transition_mat_direction(action))
        return transition_mat

    def init_transition_mat_direction(self,desired_action):
        #create a transition matrix for only for the desired_action
        action_list = ['u','d','l','r']
        transition_mat = np.zeros([11,11])
        for from_ in range(11):
            for action in action_list:
                update_from, update_to = self.increment_where(from_,action)
                if action == desired_action:
                    transition_mat[update_to,update_from] += self.p
                else:
                    transition_mat[update_to,update_from] += (1-self.p)/3.0
        return transition_mat
        
    def create_location_dict(self,grid_map):
        #create a dictionary to map state to coordinates
        locations_dict = {}
        for i in range(grid_map.shape[0]):
            for j in range(grid_map.shape[1]):
                # store as x and y
                locations_dict[grid_map[i][j]] = [j,i]
        return locations_dict
        
    def increment_where(self,from_,direction):
        #find where should the probability be increamented, used to help create transition matrix
        [x,y] = self.locations_dict[from_]
        if direction == 'r':
            #indended direction: right
            if x+1 >= 4 or self.grid_map[y,x+1] == -1:
                return [from_,from_]
            else:
                return [from_,self.grid_map[y,x+1]]
        if direction == 'l':
            #indended direction: left
            if x-1 < 0 or self.grid_map[y,x-1] == -1:
                return [from_,from_]
            else:
                return [from_,self.grid_map[y,x-1]]
        if direction == 'u':
            #indended direction: up
            if y-1 < 0 or self.grid_map[y-1,x] == -1:
                return [from_,from_]
            else:
                return [from_,self.grid_map[y-1,x]]
        if direction == 'd':
            #indended direction: down
            if y+1 >= 4 or self.grid_map[y+1,x] == -1:
                return [from_,from_]
            else:
                return [from_,self.grid_map[y+1,x]]
    
    def get_reward(self,from_,to_):
        #calculate the reward gained from moving from a state to another
        if from_ == state_num_reward:
            return self.r_reward
        if from_ == state_num_penalty:
            return self.r_penalty
        if to_ == state_num_penalty:
            return self.r_penalty
        elif to_ == state_num_reward:
            return self.r_reward
        else:
            return self.r_transition
        
    def get_possible_state_from(self,state_from,action):
        #find what states are possible from a state and action
        possible_states = []
        #0=up 1=down 2=left 3=right
        transition_mat = self.transition_mat[action]
        for i in range(transition_mat.shape[1]):
            if transition_mat[i,state_from] > 0.0:
                state_to = i
                p_to_that_state = transition_mat[i,state_from]
                possible_states.append(np.array([state_to,p_to_that_state]))
        return possible_states

    def sample_mc_episode(self,epsilon=0.0):
        #draw a sample episode from the environment
        self.alpha = 0.5
        initial_state = np.random.choice(9, 1)[0]+2
        state = initial_state
        
        states = []
        actions = []
        rewards = []
        
        return_ = 0
        
        while(state != state_num_penalty and state != self.state_num_reward):
            explore = np.random.choice(2,1,p=[1-epsilon,epsilon])[0]
            action = np.random.choice(4,1,p=self.policy[:,state])[0]
            
            if explore == 1:
                action = np.random.choice(4,1)[0]
            
            next_state = np.random.choice(11,1,p=self.transition_mat[action][:,state])[0]
            reward = self.get_reward(state,next_state)
                            
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            
            return_ += reward
            
            state = next_state
            
        return states,actions,rewards
        
    def calculate_return_from_episode(self,rewards):
        #calculate the return from the generated episode
        return_ = 0
        for i,reward in enumerate(rewards):
            return_ += self.gamma**i * reward
        return return_
            
    def update_mc_episode(self,states,actions,rewards):
        #update the action value of the agent wrt to an episode
        for state,action in zip(states,actions):
            self.count[action,state] += 1
            return_ = self.calculate_return_from_episode(rewards)
            self.action_value[action,state] = ((self.count[action,state]-1)*self.action_value[action,state] + return_)/(self.count[action,state]*1.0)
        
    def mc_policy_eval(self,n,epsilon=0.0):
        #evaluate the action value function of a policy
        for i in range(n):
            if i % 1000 == 0:
                print('performing %sth episode'%(i))
            states,actions,rewards = self.sample_mc_episode(epsilon)
            self.update_mc_episode(states,actions,rewards)
        
    def policy_improve(self):
        #improve the policy from the action value function
        self.policy = np.zeros_like(self.policy)
        for state in range(11):
            v_up = Gridmap.action_value[0,state]
            v_down = Gridmap.action_value[1,state]
            v_left = Gridmap.action_value[2,state]
            v_right = Gridmap.action_value[3,state]
            v_max = np.argmax(np.array([v_up,v_down,v_left,v_right]),0)
            self.policy[v_max,state] = 1
            
    def tell_policy(self):
        #print the policy
        action_word = {0:'up',1:'down',2:'left',3:'right'}
        for state in range(11):
            if state == self.state_num_reward or state == self.state_num_penalty:
                continue
            v_up = Gridmap.policy[0,state]
            v_down = Gridmap.policy[1,state]
            v_left = Gridmap.policy[2,state]
            v_right = Gridmap.policy[3,state]
            v_max = np.argmax(np.array([v_up,v_down,v_left,v_right]),0)
            print('on state =',state,', do action =',action_word[v_max])
            
    def mc_learn(self,n,episode_per_step,epsilon=0.0):
        #learn the gridmap with epsilon greedy montecarlo
        for i in range(n):
            self.action_value = np.zeros([4,11])
            print('===============================')
            print('>',i,'th iter')
            print('===============================')
            self.mc_policy_eval(episode_per_step,epsilon)
            self.policy_improve()

In [357]:
grid_map = np.array([[1,2,3,4],
                    [5,6,-1,7],
                    [-1,8,9,10],
                    [-1,-1,0,-1]])

# Solving Bellman

In [358]:
Gridmap = Gridmap_mdp(grid_map,state_num_reward,state_num_penalty,r_reward,r_penalty,r_transition,p,gamma)

In [359]:
class Bellman_solver:
    def __init__(self,gridmap):
        self.gridmap = gridmap
        #probability transition matrix, depends on the policy
        self.P = np.zeros((11,11))
        self.r = np.zeros(11)
        self.how_to_go = {12:3,
                         15:1,
                         23:3,
                         21:2,
                         26:1,
                         43:2,
                         47:1,
                         51:0,
                         56:3,
                         62:0,
                         65:2,
                         68:1,
                         74:0,
                         710:1,
                         86:0,
                         89:3,
                         98:2,
                         910:3,
                         90:1,
                         109:2,
                         107:0}
        
    def update_P_from_policy(self):
        for from_state in range(0,11):
            if from_state == self.gridmap.state_num_reward or from_state == self.gridmap.state_num_penalty:
                self.P[from_state,from_state] = 1
                continue
            for to_state in range(0,11):
                p = 0
                for action in range(0,4):
                    p_action = self.gridmap.policy[action,from_state]
                    p_move_from_action = self.gridmap.transition_mat[action][to_state,from_state]
                    p += p_action*p_move_from_action
                    #print("from %s to %s doing action %s, the p_action = %s, and the p_move_from_action = %s"%(from_state,to_state,action,p_action,p_move_from_action))
                #print(">>> p = %s"%(p))
                self.P[to_state,from_state] = p
                       
    def update_r_from_policy(self):
        for from_state in range(0,11):
            r = 0
            if from_state == self.gridmap.state_num_reward or from_state == self.gridmap.state_num_penalty:
                self.r[from_state] = 0
                continue
            for action in range(0,4):
                for to_state in range(0,11):
                    p_policy = self.gridmap.policy[action,from_state]
                    reward = self.gridmap.get_reward(from_state,to_state)
                    p = self.gridmap.transition_mat[action][to_state,from_state]
                    r += p_policy*p*reward
            self.r[from_state] = r
            
    def evaluate_value(self):
        self.gridmap.value = np.matmul(np.linalg.inv(np.eye(11)-self.gridmap.gamma*self.P),self.r)
    
    def greedy(self):
        self.gridmap.policy = np.zeros((4,11))
        for from_state in range(0,11):
            if from_state == self.gridmap.state_num_reward or from_state == self.gridmap.state_num_penalty:
                #print("state %s is not a normal state"%(from_state))
                self.gridmap.policy[:,from_state] = 0
                continue
            to_states = []
            max_value = -9999
            best_action = -1
            for action in range(0,4):
                possible_states = self.gridmap.get_possible_state_from(from_state,action)
                for possible_state in possible_states:
                    if (not (possible_state[0] in to_states)) and (possible_state[0] != from_state):
                        to_states.append(possible_state[0])
            for to_state in to_states:
                #print(to_state)
                to_state = int(to_state)
                if self.gridmap.value[int(to_state)] > max_value:
                    max_value = self.gridmap.value[to_state]
                    best_action = self.how_to_go_to(from_state,to_state)
            self.gridmap.policy[best_action,from_state] = 1
            #print("best action for state %s is %s, and is set to %s"%(from_state,best_action,self.gridmap.policy[best_action,from_state]))
            #print(self.gridmap.policy)
        
    def how_to_go_to(self,from_state,to_state):
        s = str(from_state)+str(to_state)
        s = int(s)
        if not s in self.how_to_go:
            print("moving from %s to %s is impossible"%(from_state,to_state))
            return -1
        else:
            return self.how_to_go[s]
    
    def step_one_iter(self):
        #policy eval
        self.update_P_from_policy()
        self.update_r_from_policy()
        self.evaluate_value()
        #policy improve
        self.greedy()

In [360]:
bellman = Bellman_solver(Gridmap)

In [361]:
#see initial policy
bellman.gridmap.policy

array([[0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25],
       [0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25]])

In [374]:
bellman.step_one_iter()
#upgrade the policy once
bellman.gridmap.policy

array([[0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 1.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 1., 0.],
       [0., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0.]])

In [375]:
bellman.gridmap.value

array([-45.58849622,  -1.31531223,   5.07968155,   7.12522205,
         5.88219852,  -1.40976582,  -2.88971302,  -2.93556299,
        -5.95453799, -70.13614802,  -5.95756582])

In [376]:
bellman.gridmap.tell_policy()

on state = 1 , do action = right
on state = 2 , do action = right
on state = 4 , do action = left
on state = 5 , do action = up
on state = 6 , do action = up
on state = 7 , do action = up
on state = 8 , do action = up
on state = 9 , do action = left
on state = 10 , do action = up


# Dynamic Programming