<a href="https://colab.research.google.com/github/nkmin0/2024_RL/blob/main/RL_240309_MountainCar/reinforcement_learning_tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 강화 학습 맛보기

강화 학습 수업에 오신 여러분을 진심으로 환영합니다!

여러분이 앞으로 1년 동안 배우게 될 강화 학습은 인공지능 분야 중에서도 가장 독특하고 유용한 기술입니다.
아마 다른 어떤 수업에서도 경험하지 못할 재미있고 유익한 경험이 될 거에요.

## 강화 학습 내용 훑어보기

강화 학습은 시행착오를 거치며 경험을 통해 적절한 행동 방식을 배워나가는 과정입니다. "당근과 채찍" 전략을 생각하면 쉽습니다. 여러분이 좋은 행동을 하면 당근을, 나쁜 행동을 하면 채찍질을 주는 것이 바로 강화 학습입니다. 이 당근과 채찍을 통틀어 강화 학습에서는 "보상"이라고 부릅니다. 강화 학습의 가장 중요한 요소 중 하나가 바로 이 보상입니다.

![](https://s3-ap-northeast-2.amazonaws.com/opentutorials-user-file/module/4916/12365.jpeg)

강화 학습의 또 다른 중요한 요소는 바로 끊임없이 변화하는 환경의 상태입니다. 어러분이 취하는 행동은 여러분 주변의 환경을 변화시킵니다. 여러분은 이 끊임없이 변화하는 환경 속에서 계속 보상을 최대한 취할 수 있도록 노력해야합니다. 이 과정이 바로 강화 학습의 과정과 많이 닮아있습니다.



## 강화 학습 예제 경험하기

강화 학습이 어떤 문제를 풀 수 있는지 한번 살펴볼까요?

사실 강화 학습은 우리 주변에서 흔히 접할 수 있는 대부분의 문제를 다 풀 수 있는 아주 유용한 기술입니다. 로봇을 조종하는 것부터 시작해서, 자율 주행, 주식 투자, 심지어는 최근에 각광받는 ChatGPT 역시 강화 학습 기술이 사용되었습니다. 우리가 일상 생활에서 상벌의 개념을 생각할 수 있는 거의 모든 문제를 강화 학습으로 풀 수 있습니다.

In [3]:
%%capture
!pip install -U ray[rllib]
!pip install gputil
!pip install ipython==7.10.0
!pip install moviepy
!pip install gymnasium[classic-control]

In [4]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
import numpy as np
import gymnasium as gym

import warnings
warnings.filterwarnings('ignore')

from IPython.display import HTML
from base64 import b64encode
from gymnasium.wrappers import RecordVideo


def view_video(filename):
    video = f'video/{filename}-episode-0.mp4'
    mp4 = open(video,'rb').read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()

    return HTML("""
    <video width=400 controls>
          <source src="%s" type="video/mp4">
    </video>
    """ % data_url)

### 환경 정의하기

오늘 수업 시간에는 강화 학습에서 가장 단순하고 유명한 예제 중 하나인 "Mountain Car" 문제를 직접 풀어볼 것입니다. 이 문제에서 여러분의 목표는 산 정상의 깃발에 도달하는 것입니다.
여러분은 오른쪽 또는 왼쪽으로 가속하는 페달을 밟거나, 페달을 밟지 않는 선택을 할 수 있습니다.
산 정상까지 올라가려면 페달을 적절히 밟아야 합니다. 하지만 산의 경사가 너무 가파르기 때문에, 아무리 밟아도 중력 때문에 올라가지 못할 수도 있습니다.

![](https://www.gymlibrary.dev/_images/mountain_car.gif)

먼저, 이 환경에서 여러분이 취할 수 있는 행동은 다음과 같이 표현할 수 있습니다:

0: 왼쪽으로 가속하는 페달 밟기

1: 페달 밟지 않기

2: 오른쪽으로 가속하는 페달 밟기

In [3]:
env = gym.make('MountainCar-v0')
print('Created env:', env)
 # 가능한 행동의 개수 [0, 1, 2]

Created env: <TimeLimit<OrderEnforcing<PassiveEnvChecker<MountainCarEnv<MountainCar-v0>>>>>
Available actions: Discrete(3)


다음으로, 이 환경에서 받을 수 있는 상태에 대한 정보는 $s_{t} = (x_{t}, v_{t})$입니다.
즉, 여러분이 받는 상태 정보 $s_{t}$는 $x$축 방향에서의 현재 위치 $x_{t}$와 속도 $v_{t}$입니다.

$x$축 방향의 좌표는 $(-1.2, 0.6)$ 사이이고, 속도 $v$는 (-0.07, 0.07) 사이입니다.

In [51]:
state, _ = env.reset()
print('Observation space:', env.observation_space) # 환경의 상태 / x축 빙향의 위치, 속도
print('The starting state is:', state) # [시작좌표, 속도]

Observation space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)
The starting state is: [-0.5427396  0.       ]


  and should_run_async(code)


여러분이 어떤 행동을 선택하면, 다음과 같이 환경의 상태가 변화합니다.

$v_{t+1} = v_{t} + (\mathrm{action} - 1) * F - \cos(3 * x_{t}) * mg$

$x_{t+1} = x_{t} + v_{t+1}$

여기서 $F$는 페달을 밟을 때 가속하는 힘 $F = 0.001$이고, 중력 $mg = 0.0025$입니다.

In [48]:
action = 2 # 0, 1, 2
# 행동을 선택하고 난 후, 다음 상태 / 보상 / 환경 끝남 여부
state, reward, done, _, _ = env.step(action)
print(f"State: {state}") # action에 대한 state의 변화 출력 [다음 x축 좌표, 다음 속도]
print(f"Reward: {reward}") # 다음 보상
print(f"Done: {done}") # 환경이 끝났는지 (True / False)

State: [-0.4295984   0.00747945]
Reward: -1.0
Done: False


  and should_run_async(code)


### 인공지능 정책 정의하기

강화 학습에서 정책은 인공지능의 두뇌입니다. 인공지능이 변화하는 환경 속에서 어떻게 행동할지를 결정하는 지침으로 생각할 수 있습니다. 가장 단순한 형태의 정책으로는 여러분이 취할 수 있는 행동 중에서 아무거나 (샘플링하여) 고르는 것이 있겠습니다.

In [6]:
# 이 코드를 수정해서 강화학습을 어떻게 할지를 정함
class RandomPolicy():
  # 클래스 선언
    def __init__(self, action_space):
      # 행동공간에 대한 정보 저장
        self.action_space = action_space

    # 클래스를 호출할 수 있도록 해주는 함수
    def __call__(self, state):
      # 상태를 받았을 때
      # 랜덤하게 action을 하도록
        return self.action_space.sample()

In [64]:
class TestPolicy():
  def __init__(self, action_space):
    self.action_space = action_space

    # 내가 직접 정의
    self.start_position = -0.54


  # 페달을 밟고 있는데 움직이지 않으면 방향을 전환하는 방법 (선생님 예시)
  def __call__(self, state):
    # state = [x축 방향 위치, x축 방향 속도]
    x, v = state
    next_action = 2

    if -0.002< v and v < 0.002: # 속도가 없다면 (페달을 안밟았다 / 오른쪽 혹은 왼쪽으로 가다가 중력에 의해 올라가지 못한다)
      if x<self.start_position:
        my_action = 0
        next_action = 2
      elif x>self.start_position:
        my_action = 2
        next_action = 0
      else:
        my_action = 1
        next_action = 2



    return next_action

  and should_run_async(code)


In [66]:
class MyPolicy():
  def __init__(self, action_space):
    self.action_space = action_space

    self.start_position = -0.54


  def __call__(self, state):
    x, v = state
    next_action = 2

    if v>0:
      next_action = 2

    elif v<0:
      next_action = 0

    else:
      next_action = 1


    return next_action

  and should_run_async(code)


### 환경과 상호작용하기

정책이 있다면, 여러분은 주어진 상황에서 어떤 행동을 취해야 할지 결정할 수 있습니다. 그럼 이제 본격적으로 환경 속에 들어가서 직접 부딪혀볼까요?

여러분이 어떤 행동을 취하면, 환경은 여러분에게 보상을 줍니다. 이 보상은 여러분이 좋은 행동을 했는지, 나쁜 행동을 했는지 알려주는 값입니다. 그리고 여러분이 취한 행동에 따라 환경은 변화하고, 이 변화한 환경을 여러분은 다시 관찰할 수 있습니다. 그리고 여러분은 또다시 (여러분이 관찰한 결과를 토대로) 어떤 행동을 취할지 결정합니다. 강화 학습은 이 과정을 반복하며 진행됩니다.

In [93]:
# 환경을 생성
env = gym.make("MountainCar-v0", render_mode="rgb_array")
# 비디오 저장을 위해 필요한 코드
env = RecordVideo(env, 'video', name_prefix='random-agent') # 단순히 비디오를 위한 코드 / 강화학습만을 할때에는 굳이?

#정책 불러오기
agent = MyPolicy(env.action_space)
state, _ = env.reset() # 환경 초기화
env.start_video_recorder() # 비디오 촬영 시작

#for t in range(10000):
#    action = agent(state) # 상태 정보를 받아서 활동 결정
#    state, reward, done, _, _ = env.step(action) # 행동에 의해 상태 변화
#    env.render() # 비디오 촬영

i=0
while 1:
  i+=1
  action = agent(state)
  state, reward, done, _, _ = env.step(action)
  env.render()
  if done:
    break


print(i)

env.close()
view_video('random-agent')

  and should_run_async(code)


Moviepy - Building video /content/video/random-agent-episode-0.mp4.
Moviepy - Writing video /content/video/random-agent-episode-0.mp4





Moviepy - Done !
Moviepy - video ready /content/video/random-agent-episode-0.mp4
Moviepy - Building video /content/video/random-agent-episode-0.mp4.
Moviepy - Writing video /content/video/random-agent-episode-0.mp4





Moviepy - Done !
Moviepy - video ready /content/video/random-agent-episode-0.mp4
114


### BONUS: 강화 학습 에이전트 학습하기

In [17]:
%%capture
from ray.rllib.algorithms.ppo import PPOConfig
from ray.tune.logger import pretty_print

algo = (
    PPOConfig()
    .rollouts(num_rollout_workers=1)
    .resources(num_gpus=1)
    .environment(env="MountainCar-v0")
    .build()
)

for i in range(2):
    algo.train()

2024-03-09 05:09:40,984	INFO worker.py:1724 -- Started a local Ray instance.
2024-03-09 05:09:54,768	INFO trainable.py:164 -- Trainable.setup took 17.241 seconds. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.


In [None]:
env = gym.make("MountainCar-v0", render_mode="rgb_array")
env = RecordVideo(env, 'video', name_prefix='ppo-agent')

state, _ = env.reset()
env.start_video_recorder()

for t in range(100):
    action = algo.compute_single_action(state)
    state, reward, done, _, _ = env.step(action)
    env.render()

env.close()
view_video('ppo-agent')

# 강화 학습 환경 직접 만져보기

### 블랙잭

In [21]:
import random


In [208]:
p=[[400, 1000] for _ in range(23)]
P=[[1, 2] for _ in range(23)]

In [204]:
for i in range(len(p)):
  print(i," : ", round(p[i][0],2),"/",round(p[i][1],2))

0  :  1 / 1
1  :  1 / 1
2  :  1 / 1
3  :  1 / 1
4  :  660.38 / 660.38
5  :  1280.04 / 1280.04
6  :  1889.81 / 1889.81
7  :  2460.49 / 2460.49
8  :  3175.92 / 3175.92
9  :  3759.28 / 3759.28
10  :  4518.72 / 4518.72
11  :  4967.15 / 4967.15
12  :  5335.75 / 8181.9
13  :  4862.56 / 7982.26
14  :  4014.27 / 7269.76
15  :  3417.87 / 6739.91
16  :  2857.95 / 6117.02
17  :  1948.15 / 5005.61
18  :  161.56 / 3366.4
19  :  1.22 / 2826.19
20  :  0.84 / 4784.3
21  :  1 / 2483.68
22  :  1 / 1


In [None]:
for i in range(len(P)):
  print(i," : ", p[i][0],"/",p[i][1])

In [209]:
class MyPolicy():
  def __init__(self, action_space):
    self.action_space = action_space
    #self.p = [[1, 1] for _ in range(23)]
  def __call__(self, state):
    cur, D, a = state
    q=random.random()
    if cur>=21:
      return 0
    if q < p[cur][0] / p[cur][1]:
      return 1
    else:
      return 0


In [210]:
# Create the Blackjack environment
env = gym.make('Blackjack-v1')
env = RecordVideo(env, 'video', name_prefix='blackjack')

state, _ = env.reset()
env.start_video_recorder()

#print('Available actions:', env.action_space)
#print('Observation space:', env.observation_space)
#print('The starting state is:', state)

win=0
draw=0
lose=2

bust=1
low=1

In [213]:


for i in range(100000):

  done = False
  reward = 0
  while not done:
      state, _ = env.reset()

      agent = MyPolicy(env.action_space)
      action = agent(state)

      pre_state = state
      pre_my, _, __ = pre_state
      state, reward, done, _, info = env.step(action)

      if action==1:
        p[pre_my][1]+=hit
        if reward!=-1:
          p[pre_my][0]+=hit

      elif action==0 and pre_my<=21:
        if reward!=0:
          p[pre_my][1]+=stop
          if reward==-1:
            p[pre_my][0]+=stop
          elif reward==1 and p[pre_my][0]>1:
            p[pre_my][0]-=stop



  if reward == 1.0:
    win+=1
  elif reward == -1.0:
    lose+=1
    bust+=action
    low=lose-bust
  else:
    draw+=1

  hit = low/(bust+low)
  stop = bust/(bust+low)

print("win : ", round(win/(win+draw+lose) *100,2), "%",sep='')
print("draw : ", round(draw/(win+draw+lose) *100,2), "%",sep='')
print("lose : ", round(lose/(win+draw+lose) *100,2), "%",sep='')
print()
print("bust : ", bust-1)
print("low : ", low-1)




      #env.render()
      #print(info)
      # Print the current state, reward, and whether the episode is done
      #print('State:', state)
      #print('Reward:', reward)
      #print('Done:', done)



# Close the environment
env.close()


win : 37.77%
draw : 6.13%
lose : 56.09%

bust :  52787
low :  115487


### Tic-Tac-Toe

In [None]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces

class TicTacToeEnv(gym.Env):
    def __init__(self):
        self.board = np.zeros((3, 3), dtype=int)
        self.current_player = 1
        self.action_space = spaces.Discrete(9)
        self.observation_space = spaces.Box(low=0, high=1, shape=(3, 3), dtype=int)

    def reset(self):
        self.board = np.zeros((3, 3), dtype=int)
        self.current_player = 1
        return self.board

    def step(self, action):
        row = action // 3
        col = action % 3

        if self.board[row, col] != 0:
            return self.board, -1, False, False, {}

        self.board[row, col] = self.current_player

        if self._check_winner(self.current_player):
            return self.board, 1, True, False, {}

        if np.count_nonzero(self.board) == 9:
            return self.board, 0, True, False, {}

        self.current_player = -self.current_player
        return self.board, 0, False, False, {}

    def render(self):
        print(self.board)

    def _check_winner(self, player):
        for i in range(3):
            if np.all(self.board[i, :] == player) or np.all(self.board[:, i] == player):
                return True
        if np.all(np.diag(self.board) == player) or np.all(np.diag(np.fliplr(self.board)) == player):
            return True
        return False

In [None]:
env = TicTacToeEnv()
state = env.reset()
env.render()

done = False
while not done:
    action = ?????
    state, reward, done, _ = env.step(action)
    env.render()