### MountainCar Q-learning

In [None]:
import gym

env = gym.make('MountainCarContinuous-v0')  # continuous action space

In [None]:
# Random agent: take uniformly sampled actions for at most 200 steps, then
# print the accumulated reward and the number of steps that were taken.

env.reset()

score = 0

# `step` doubles as the loop counter, so after the loop it holds the number
# of steps actually executed (200 if `done` was never reported).
for step in range(1, 201):
  # A random policy never looks at the state, so only the reward and the
  # done flag (elements 1 and 2 of the step result) are kept.
  reward, done = env.step(env.action_space.sample())[1:3]
  score += reward

  if done:
    break

print(score, step)

In [None]:
# Random agent without a step cap: keep sampling actions until the
# environment reports `done`, then print the total reward and step count.
# NOTE(review): nothing bounds this loop. If the third element returned by
# step() never becomes true (e.g. it does not reflect a time limit), this
# cell will not terminate - confirm against the installed gym version.

env.reset()

score = 0
step = 0
done = False

while not done:
  # Only the reward and the done flag are needed; the observation is unused.
  reward, done = env.step(env.action_space.sample())[1:3]
  score += reward
  step += 1

print(score, step)

In [None]:
# Collect data from "successful" random episodes.
#
# The environment's own reward is ignored here and replaced by a shaped one:
# +1 for every step whose x position is greater than -0.2, otherwise -1.
# Episodes whose shaped score exceeds `required_score` contribute their
# [observation, action] pairs to `training_data`.

import numpy as np

scores = []
training_data = []
accepted_scores = []

required_score = -198

for episode in range(5000):
  # Lightweight progress indicator.
  if episode % 100 == 0:
    print(episode, end=' ')

  env.reset()

  score = 0

  game_memory = []
  previous_obs = None

  # The step counter has its own name so it no longer overwrites the
  # episode index of the outer loop.
  for step_idx in range(200):
    action = env.action_space.sample()
    transition = env.step(action)
    obs, done = transition[0], transition[2]

    # Pair each action with the observation it was taken from. The very
    # first action of an episode is not recorded because the observation
    # returned by reset() is not kept.
    if previous_obs is not None:
      game_memory.append([previous_obs, action])

    previous_obs = obs

    # Shaped reward based only on the x position (first component).
    reward = 1 if obs[0] > -0.2 else -1
    score += reward

    if done:
      break

  scores.append(score)

  if score > required_score:
    accepted_scores.append(score)
    training_data.extend(game_memory)

scores = np.array(scores)

print(scores.mean())
print(scores)
print(accepted_scores)

In [None]:
# Split the recorded [observation, action] pairs into a two-column feature
# matrix and a single-column target matrix, then show their shapes and
# contents.
observations = [pair[0] for pair in training_data]
chosen_actions = [pair[1] for pair in training_data]

train_X = np.array(observations).reshape(-1, 2)
train_Y = np.array(chosen_actions).reshape(-1, 1)

for matrix in (train_X, train_Y):
  print(matrix.shape)

for matrix in (train_X, train_Y):
  print(matrix)

In [None]:
# Initial Q-table setup

state_grid_count = 10
action_grid_count = 6

# q_table[a][b][k] is a nested list indexed by two state-grid indices and
# one action index; every entry starts at a very small value.
q_table = [
  [[0.0001] * action_grid_count for _ in range(state_grid_count)]
  for _ in range(state_grid_count)
]

# Discrete action values: action_grid_count evenly spaced points running
# from the action space's low bound to its high bound.
action_span = env.action_space.high - env.action_space.low

actions = np.arange(action_grid_count).astype(float)
actions *= action_span / (action_grid_count - 1)
actions += env.action_space.low

print(actions)

In [None]:
import random

def obs_to_state(env, obs, grid_count=None):
  """Map a continuous observation to discrete grid indices.

  Each component of ``obs`` is scaled linearly from the ``[low, high]``
  range of ``env.observation_space`` onto ``grid_count`` equal-width bins.

  Args:
    env: Object exposing ``observation_space.low`` and
      ``observation_space.high``.
    obs: Observation array; it is flattened before use.
    grid_count: Number of bins per dimension. When omitted, the
      module-level ``state_grid_count`` is used.

  Returns:
    A list of ints, one per observation component, each within
    ``[0, grid_count - 1]``.
  """
  if grid_count is None:
    grid_count = state_grid_count

  obs = obs.flatten()
  low = env.observation_space.low
  high = env.observation_space.high

  scaled = (obs - low) / (high - low) * grid_count

  # A component equal to `high` scales to exactly grid_count, one past the
  # last valid index, and a component below `low` yields a negative index
  # that would silently wrap around. Clamp both into the outermost bins.
  last_bin = grid_count - 1
  return [min(max(int(x), 0), last_bin) for x in scaled]

def softmax(logits):
  """Return the softmax of ``logits`` as a NumPy array.

  The largest logit is subtracted before exponentiating, so the biggest
  exponent evaluated is exp(0); the normalized result is mathematically
  unchanged by that shift.
  """
  shifted = logits - np.max(logits)
  weights = np.exp(shifted)
  return weights / np.sum(weights)

# Sanity check: draw a random observation from the observation space and
# show it next to the grid indices it maps to.
sample = env.observation_space.sample()
print(sample)

grid = obs_to_state(env, sample)
print(grid)

In [None]:
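# When the random draw is below epsilon, act randomly; otherwise pick the best-known action.
# Early on the agent tries a variety of actions (exploration); later it favors the best action (exploitation).
# Epsilon is set to shrink gradually as training progresses.
# A penalty of 0.05 is subtracted from the reward at every step, to push the agent to keep moving.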

In [None]:
from gymnasium.experimental.wrappers import RecordVideoV0
from IPython.display import Video

# Tabular Q-learning with an epsilon-greedy policy over the discretized
# state grid. Episodes selected by `episode_trigger` (every second one) are
# recorded by the video wrapper.

max_episodes = 5

scores = []
steps = []
select_actions = []

learning_rate = 0.05
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.01

env = gym.make("MountainCarContinuous-v0", render_mode='rgb_array')
env = RecordVideoV0(env, "video", name_prefix="mount-q", disable_logger=True,episode_trigger=lambda x: x % 2 == 0 )

# try/finally guarantees env.close() runs even if training is interrupted
# or raises part-way through.
try:
    for i in range(max_episodes):
        # Decay exploration once per episode, never dropping below epsilon_min.
        epsilon = max(epsilon_min, epsilon * 0.9)

        obs = env.reset()[0]
        state_idx = obs_to_state(env, obs)

        score = 0
        step = 0
        done = False

        while not done:
            # Q-values of every discrete action in the current grid cell.
            q_row = q_table[state_idx[0]][state_idx[1]]

            if random.random() < epsilon:
                # Explore: uniformly random discrete action.
                action_idx = random.randint(0, action_grid_count - 1)
            else:
                # Exploit: softmax is monotonic, so taking the argmax of the
                # raw Q-values selects the same action without the extra pass.
                action_idx = int(np.argmax(q_row))
            action = actions[action_idx]

            # NOTE(review): the episode ends only on the third element of
            # step()'s result. If the installed API reports time-limit
            # truncation separately, episodes here run until that flag is
            # set - confirm this is intended.
            obs, reward, done = env.step([action])[:3]

            # `score` tracks the raw environment reward; learning uses the
            # reward with an extra per-step penalty of 0.05.
            score += reward
            shaped_reward = reward - 0.05
            step += 1

            # Progress indicator for very long episodes.
            if step % 1000 == 0:
                print(step, end=' ')

            select_actions.append(action)
            new_state_idx = obs_to_state(env, obs)

            # Q-learning update. Once the episode is done there is no future
            # return, so the target must not bootstrap from the next state.
            if done:
                future_value = 0.0
            else:
                future_value = gamma * np.amax(q_table[new_state_idx[0]][new_state_idx[1]])
            td_error = shaped_reward + future_value - q_row[action_idx]
            q_row[action_idx] += learning_rate * td_error

            # Reuse the indices just computed instead of re-discretizing the
            # same observation at the top of the next iteration.
            state_idx = new_state_idx

        scores.append(score)
        steps.append(step)

        mean_score = np.mean(scores[-100:])
        print('\n', i, 'mean score: {}, mean step: {}, epsilon: {}'.format(mean_score, np.mean(steps[-100:]), epsilon))

        if mean_score >= 90:
            print('Solved on episode {}!'.format(i))
            break
finally:
    env.close()

In [None]:
# Embed the recording of episode 4 in the notebook output. The path uses the
# "video" folder and "mount-q" prefix given to the recording wrapper
# (presumably the wrapper writes files with this naming scheme - verify).
Video("video/mount-q-episode-4.mp4", embed=True)

In [None]:
# Distribution of the selected actions

import seaborn as sns

sns.displot(select_actions)

# Most selections fall near 0 - per the original note, lower speed means a
# smaller penalty (TODO confirm against the environment's reward definition).

In [None]:
import matplotlib.pyplot as plt

# Plot the per-episode scores collected during training.
plt.plot(scores)
plt.xlabel('episodes')
plt.ylabel('score')

plt.show()
# Observation from the original run: the score trends upward.

In [None]:
# Display the Q-table as the cell output.
q_table