In [39]:
# 강화 학습으로 올바른 위치에서 택시 승객을 태우고 내리는 문제

In [40]:
import gym
import numpy as np
env = gym.make("Taxi-v3")
env.render()

+---------+
|[35mR[0m: |[43m [0m: :[34;1mG[0m|
| : | : : |
| : : : : |
| | : | : |
|Y| : |B: |
+---------+



In [41]:
# 환경 초기화
env.reset()
env.render()
# : 6 status: south(0), north(1), ea 각 칸에서의 경우의 수 가지st(2), west(3), pickup(4), dropoff(5)
print("Action Space : {}".format(env.action_space))
# 전체 경우의 수
print("State Space : {}".format(env.observation_space))

+---------+
|[35mR[0m: | : :[34;1mG[0m|
| : | : : |
| : : : : |
| | : |[43m [0m: |
|Y| : |B: |
+---------+

Action Space : Discrete(6)
State Space : Discrete(500)


In [42]:
# (taxi row, taxi column, passenger index, destination index)
# (택시의 행, 택시의 열, 승객위치, 목적지)
# R(0), G(1), Y(2), B(3)
state = env.encode(3, 1, 2, 3)
print("State:", state)
env.s = state
env.render()
# 벽에 부딪히면 -1 패널티 부여
# 승객이 없을 때는 노란색, 승객이 있을 때는 녹색
# R,G,Y,B : 가능한 픽업 및 목적지 위치
# 파란색 : 승객 픽업 장소
# 보라색 : 현재 목적지


State: 331
+---------+
|[35mR[0m: | : :[34;1mG[0m|
| : | : : |
| : : : : |
| | : |[43m [0m: |
|Y| : |B: |
+---------+



In [43]:
# env.P 초기 보상 테이블
env.P[328]
# -1, -10 이동할 때마다 승객승

{0: [(1.0, 428, -1, False)],
 1: [(1.0, 228, -1, False)],
 2: [(1.0, 348, -1, False)],
 3: [(1.0, 328, -1, False)],
 4: [(1.0, 328, -10, False)],
 5: [(1.0, 328, -10, False)]}

In [44]:
#무작위 방식으로 처리
# 초기 상태 설정
env.s = 328
epochs = 0
penalties, reward = 0, 0
# 애니메이션 관련 정보를 저장하는 리스트
frames = []
done = False
while not done:
  action = env.action_space.sample()
  state, reward, done, info = env.step(action)
  # -10 1 보상이 이면 패널티 부여
  if reward == -10:
    penalties += 1
  # 애니메이션을 위하여 정보 기록
  frames.append({
  'frame': env.render(mode='ansi'),
  'state': state,
  'action': action,
  'reward': reward
  }
  )
  epochs += 1
  print("Timesteps taken: {}".format(epochs))
print("Penalties incurred: {}".format(penalties))

Timesteps taken: 1
Timesteps taken: 2
Timesteps taken: 3
Timesteps taken: 4
Timesteps taken: 5
Timesteps taken: 6
Timesteps taken: 7
Timesteps taken: 8
Timesteps taken: 9
Timesteps taken: 10
Timesteps taken: 11
Timesteps taken: 12
Timesteps taken: 13
Timesteps taken: 14
Timesteps taken: 15
Timesteps taken: 16
Timesteps taken: 17
Timesteps taken: 18
Timesteps taken: 19
Timesteps taken: 20
Timesteps taken: 21
Timesteps taken: 22
Timesteps taken: 23
Timesteps taken: 24
Timesteps taken: 25
Timesteps taken: 26
Timesteps taken: 27
Timesteps taken: 28
Timesteps taken: 29
Timesteps taken: 30
Timesteps taken: 31
Timesteps taken: 32
Timesteps taken: 33
Timesteps taken: 34
Timesteps taken: 35
Timesteps taken: 36
Timesteps taken: 37
Timesteps taken: 38
Timesteps taken: 39
Timesteps taken: 40
Timesteps taken: 41
Timesteps taken: 42
Timesteps taken: 43
Timesteps taken: 44
Timesteps taken: 45
Timesteps taken: 46
Timesteps taken: 47
Timesteps taken: 48
Timesteps taken: 49
Timesteps taken: 50
Timesteps

In [45]:
from IPython.display import clear_output
from time import sleep
def print_frames(frames):
  for i, frame in enumerate(frames):
    clear_output(wait=True)
    print(frame['frame'])
    print(f"Timestep: {i + 1}")
    print(f"State: {frame['state']}")
    print(f"Action: {frame['action']}")
    print(f"Reward: {frame['reward']}")
    sleep(.1)
    print_frames(frames)


In [46]:
#강화학습으로 구현

In [47]:

import numpy as np
#q 테이블 초기화
q_table = np.zeros([env.observation_space.n, env.action_space.n])

In [48]:
%%time
import random
from IPython.display import clear_output
# 하이퍼 파라미터
alpha = 0.1 # 학습률
gamma = 0.6 # 할인율
epsilon = 0.1
# 그래프 출력을 위한 리스트
all_epochs = []
all_penalties = []

CPU times: user 9 µs, sys: 1 µs, total: 10 µs
Wall time: 13.6 µs


In [49]:
for i in range(1, 10001):
  state = env.reset()
  epochs, penalties, reward = 0, 0, 0
  done = False
  while not done:
    if random.uniform(0, 1) < epsilon:
      # 0.1 랜덤값이 미만이면 새로운 경로 탐색
      action = env.action_space.sample()
    else: #그렇지 않으면 기존에 학습된 경로 선택
      # q 학습된 테이블의 값 중 최대값
      action = np.argmax(q_table[state])
    next_state, reward, done, info = env.step(action)
    old_value = q_table[state, action]
    next_max = np.max(q_table[next_state])
    new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
    q_table[state, action] = new_value
    if reward == -10:
      penalties += 1
    state = next_state
    epochs += 1
  if i % 100 == 0:
    clear_output(wait=True)
    print(f"Episode: {i}")
print("Training finished.\n")

Episode: 10000
Training finished.



In [50]:
q_table[328]

array([-2.33175226, -2.27325804, -2.34177893, -2.32737901, -5.78199002,
       -6.74757654])

In [51]:
# Q-learning 종료 후의 성능 측정
total_epochs, total_penalties = 0, 0
episodes = 100
# 애니메이션을 위한 프레임 리스트
frames = []
for _ in range(episodes):
  state = env.reset()
  epochs, penalties, reward = 0, 0, 0
  done = False
  while not done:
    action = np.argmax(q_table[state])
    state, reward, done, info = env.step(action)
    if reward == -10:
      penalties += 1
    # 애니메이션을 위한 정보 기록
    frames.append({
      'frame': env.render(mode='ansi'),
      'state': state,
      'action': action,
      'reward': reward})
    epochs += 1
  total_penalties += penalties
  total_epochs += epochs
print(f"Results after {episodes} episodes:")
print(f" : {total_epochs / episodes} 에피소드당 평균 시간 간격")
print(f" : {total_penalties / episodes} 에피소드당 평균 패널티")


Results after 100 episodes:
 : 29.94 에피소드당 평균 시간 간격
 : 0.0 에피소드당 평균 패널티


In [52]:
print_frames(frames)

+---------+
|R: | : :[34;1mG[0m|
| : | : : |
| : : : : |
| | : | : |
|[43mY[0m| : |[35mB[0m: |
+---------+
  (South)

Timestep: 1
State: 407
Action: 0
Reward: -1


KeyboardInterrupt: ignored