In [1]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import copy
#Jupyter notebook 에서 pop up window
%matplotlib tk 

In [12]:
# Open AI Gym을 이용한 Custom Environment 생성 예
class GridEnv(gym.Env):
    
    def __init__(self):
        # 초기 환경 구성 시 필요한 파라미터 설정
        self.map_size = (3,3)
        self.agent_pos = [0,0]
        self.obstacle = [[1,1],[1,2]]
        self.goal = [2,2]
        # Temporal Difference 
        self.V = np.zeros([3,3])
        self.Q = np.zeros([4,3,3])
        self.gamma = 0.9
        self.alpha = 0.1
        
       
        print(f"Value function \n {self.V}\n")
        print(f"Action Value function \n {self.Q}\n")
        print(f"Initial Policy : Random")
        # Open AI gym 환경 정보 설정
        self.action_space = gym.spaces.Discrete(4)
        self.obs_space = gym.spaces.Discrete(3)
        
        #plt figtext 위치
        self.text_pos_x = 0.8
        self.text_pos_y = 0.9

    def step(self, action):
        self.prev_state = copy.deepcopy(self.agent_pos)
        if action == 0: #Left
            self.agent_pos[0] += -1
        elif action == 1: #Right
            self.agent_pos[0] += +1
        elif action == 2: #Up
            self.agent_pos[1] += -1
        elif action == 3: #Down
            self.agent_pos[1] += +1
        else:
            raise Exception("Action is not defined")
        self.next_state = copy.deepcopy(self.agent_pos)
        
        self.render_text(self.obs(),self.get_reward())
            
        return self.obs(), self.get_reward(), self._is_done(), self.next_state, self.prev_state
    
    def obs(self):
        
        if self.agent_pos in self.obstacle:
            return 0
        
        elif self.agent_pos == self.goal:    
            return 1
        elif self.agent_pos[0] < 0 \
        or self.agent_pos[1] < 0 \
        or self.agent_pos[0] > self.map_size[0]-1 \
        or self.agent_pos[1] > self.map_size[1]-1:
            return 3
        else:    
            return 2
    
    def _is_done(self):
        # 맵 밖으로 나갔을 시 
        if self.agent_pos[0] < 0 \
        or self.agent_pos[1] < 0 \
        or self.agent_pos[0] > self.map_size[0]-1 \
        or self.agent_pos[1] > self.map_size[1]-1:
            return True
        # 도착 지점 도착시
        elif self.agent_pos == self.goal:
            return True
        
        else:
            return False
            
    def reset(self):
        '''환경 초기화'''
        
        #맵 사이즈 설정
        self.world = np.zeros(self.map_size)        
        
        #에이전트의 초기위치 및 장애물 위치 설정
        self.world[self.obstacle[0][0],self.obstacle[0][1]] = 2
        self.world[self.obstacle[1][0],self.obstacle[1][1]] = 2
        
        
        self.agent_pos = [0,0]

        return self.obs()
    
    def policy(self, state = None): 
        #e-greedy policy
        self.policy_table = np.argmax(self.Q, axis=0)
        return self.policy_table[state[0],state[1]]
        
    def render(self,episode,step):
        # 시각화
        plt.ion()
        plt.title("Grid World")
        plt.figtext(self.text_pos_x,self.text_pos_y, f"Episode = {episode}")
        plt.figtext(self.text_pos_x,self.text_pos_y-0.1, "Step : {}".format(step))
        
        self.world[self.agent_pos[0], self.agent_pos[1]] = -1
        self.world[self.goal[0],self.goal[1]] = 3
        plt.matshow(self.world,fignum=0)
        plt.draw()
        plt.pause(0.001) #
        plt.clf()
        self.world[self.agent_pos[0], self.agent_pos[1]] = 0
        
        self.world[self.obstacle[0][0],self.obstacle[0][1]] = 2
        self.world[self.obstacle[1][0],self.obstacle[1][1]] = 2
        
    def render_text(self,obs,reward):
        # 시뮬레이션 정보 출력
        plt.figtext(self.text_pos_x,self.text_pos_y-0.3, f"Reward : {reward}")
        plt.figtext(self.text_pos_x-0.05,self.text_pos_y-0.4, f"State Value Function : \n{np.around(self.V,4)}")
        plt.figtext(self.text_pos_x+0.1,self.text_pos_y-0.6, f"Action Value Function : \n{np.around(self.Q,4)}")
        if obs == 2:
            plt.figtext(self.text_pos_x,self.text_pos_y-0.2, "")
        elif obs == 1:
            plt.figtext(self.text_pos_x,self.text_pos_y-0.2, "GOAL IN")
        elif obs == 0:
            plt.figtext(self.text_pos_x,self.text_pos_y-0.2, "Obstacle")
        elif obs == 3:
            plt.figtext(self.text_pos_x,self.text_pos_y-0.2, "Out of map")
            
            
    def get_reward(self):
        # 리워드 설정
        if self.agent_pos in self.obstacle:
            self.reward = -1
            return self.reward
        
        elif self.agent_pos == self.goal:
            self.reward = 1
            return self.reward
        
        # 맵 밖으로 나갔을 시 
        elif self.agent_pos[0] < 0 \
        or self.agent_pos[1] < 0 \
        or self.agent_pos[0] > self.map_size[0]-1 \
        or self.agent_pos[1] > self.map_size[1]-1:
            self.reward = -1
            return self.reward
        
        else:
            self.reward = 0
            return self.reward
        
    def value_function_update(self,action,next_action,state,next_state):
        #State value function
        try:
            self.V[state[0]][state[1]] = \
            self.V[state[0]][state[1]] + self.alpha * (self.reward + self.gamma*self.V[next_state[0]][next_state[1]]- self.V[state[0]][state[1]])
        except IndexError: # Agent가 밖으로 나갔을 시 Value function update 예외 처리
            self.V[state[0]][state[1]] = \
        self.V[state[0]][state[1]] + self.alpha * (self.reward + self.gamma*self.V[state[0]][state[1]]- self.V[state[0]][state[1]])
        #Action value function
        try:
            self.Q[action,state[0],state[1]] = \
            self.Q[action,state[0],state[1]] + self.gamma*(self.reward + self.gamma * self.Q[next_action,next_state[0],next_state[1]]-self.Q[action,state[0],state[1]])
        except IndexError:
            self.Q[action,state[0],state[1]] = \
            self.Q[action,state[0],state[1]] + self.gamma*(self.reward + self.gamma * self.Q[action,state[0],state[1]]-self.Q[action,state[0],state[1]])
        
    def close(self):
        # Clear env
        pass

In [13]:
env = GridEnv()

Value function 
 [[0. 0. 0.]
 [0. 0. 0.]
 [0. 0. 0.]]

Action Value function 
 [[[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]]

Initial Policy : Random


In [None]:
for episode in range(100): # 에피소드 수 설정
    #환경 생성
    obs = env.reset()
    for step in range(100): # step 상한선 설정
        
        env.render(episode, step)
        #Sampling Action
        action = env.policy(env.agent_pos)
        
        #Take action -> Observe Reward, Observe Next State
        obs, reward, is_done , next_state, state = env.step(action)
     
        #Choose next action from next state
        next_action = env.policy(next_state)
    
        #Value function update
        env.value_function_update(action,next_action,state,next_state)
        
        
            
        if is_done:                
            break;
env.close()
#plt.close()

In [5]:
env.Q

array([[[-3.11035971, -3.48402186, -0.99      ],
        [-1.25334463, -2.8946638 , -1.39239   ],
        [-0.87329418, -1.65100348,  0.        ]],

       [[-0.99171261, -0.67816047, -0.9       ],
        [ 0.45823016, -0.70041256,  0.999     ],
        [-2.46429   , -3.1425039 ,  0.        ]],

       [[-1.84445188, -1.91301383, -1.30809973],
        [-2.07122366,  0.40176471, -2.84874244],
        [-0.99      , -0.8019    ,  0.        ]],

       [[-1.45672031, -0.88226715, -2.46429   ],
        [-1.6307603 , -1.97204374, -0.9       ],
        [-2.47916206,  0.999     ,  0.        ]]])

In [10]:
np.argmax(env.Q, axis=0)

array([[1, 1, 1],
       [1, 2, 1],
       [0, 3, 0]])