## Actor Critic
---

Policy Gradient 계열 알고리즘인 Actor-Critic 실습 자료입니다.


import 및 환경 설정

In [33]:
# from google.colab import drive
# drive.mount('/content/drive')

Mounted at /content/drive


In [21]:
# %pip install swig
# %pip install gym[all]
# %pip install gymnasium
# %pip install gymnasium[box2d]

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import gym
import numpy as np
import torch.distributions as distributions
from collections import deque
import time

# For visualization
from gym.wrappers.monitoring import video_recorder
from IPython.display import HTML
from IPython import display
import glob
import base64, io

import numpy as np
import gymnasium as gym
import cv2
import matplotlib.animation as animation
from IPython.display import clear_output
import random

In [22]:
# Create the discrete-action CarRacing environment and print its spaces.
env = gym.make('CarRacing-v2', continuous=False)
print("Observation space: ", env.observation_space)
print("Action space: ", env.action_space)

Observation space:  Box(0, 255, (96, 96, 3), uint8)
Action space:  Discrete(5)


In [23]:
# Every frame contains a black area at the bottom, so that area is cropped away.
# Color information is not directly relevant to car racing, so the frame is
# converted to grayscale for computational efficiency.
def preprocess(img):
    """Crop a raw RGB frame and convert it to a scaled grayscale image.

    Args:
        img: RGB frame as returned by the environment (the notebook's
            CarRacing-v2 observation space is 96 x 96 x 3, uint8).

    Returns:
        The cropped frame converted to grayscale and divided by 255.0.
    """
    # CarRacing-v2-specific cropping: keep rows 0..83 and columns 6..89.
    cropped = img[:84, 6:90]
    gray = cv2.cvtColor(cropped, cv2.COLOR_RGB2GRAY)
    return gray / 255.0

In [24]:
class ImageEnv(gym.Wrapper):
    """Environment wrapper adding frame skipping and frame stacking.

    Each observation returned by this wrapper is a stack of the most recent
    `stack_frames` preprocessed frames (see `preprocess`), and each call to
    `step` repeats the chosen action for up to `skip_frames` environment steps.

    NOTE(review): `observation_space` is not overridden here, so it still
    describes the wrapped environment's raw frames rather than the stacked
    observations this wrapper returns.

    Args:
        env: The environment to wrap.
        skip_frames: Number of environment steps taken per `step` call.
        stack_frames: Number of frames stacked into one observation.
        initial_no_op: Number of steps with action 0 executed after a reset.
        **kwargs: Forwarded to the base wrapper constructor.
    """

    def __init__(
        self,
        env,
        skip_frames=4,
        stack_frames=4,
        initial_no_op=50,
        **kwargs
    ):
        super().__init__(env, **kwargs)
        self.initial_no_op = initial_no_op
        self.skip_frames = skip_frames
        self.stack_frames = stack_frames

    def reset(self, *, seed=None, options=None):
        """Reset the wrapped environment and build the initial frame stack.

        Args:
            seed: Optional seed forwarded to the wrapped environment's reset.
            options: Optional reset options forwarded to the wrapped environment.

        Returns:
            A tuple `(stacked_state, info)` where `stacked_state` holds
            `stack_frames` copies of the first preprocessed frame.
        """
        # Forward seed/options so callers can use the standard reset API.
        s, info = self.env.reset(seed=seed, options=options)

        # Do nothing (action 0) for the next `self.initial_no_op` steps.
        for _ in range(self.initial_no_op):
            s, _, _, _, info = self.env.step(0)

        # Convert the frame to its cropped grayscale form.
        s = preprocess(s)

        # The initial observation is simply the frame `s` repeated along a new
        # leading axis, e.g. [4, 84, 84] for the defaults used in this notebook.
        self.stacked_state = np.tile(s, (self.stack_frames, 1, 1))
        return self.stacked_state, info

    def step(self, action):
        """Repeat `action` for up to `skip_frames` steps and update the stack.

        Args:
            action: Action passed unchanged to the wrapped environment.

        Returns:
            A tuple `(stacked_state, reward, terminated, truncated, info)` where
            `reward` is the sum of the rewards of the repeated steps and the
            remaining values come from the last executed step.
        """
        # terminated: the episode reached a terminal state of the task.
        # truncated: the episode was cut off from outside (e.g. a time limit).
        reward = 0
        for _ in range(self.skip_frames):
            s, r, terminated, truncated, info = self.env.step(action)
            reward += r
            if terminated or truncated:
                # Stop repeating the action once the episode is over.
                break

        # Convert the last frame to its cropped grayscale form.
        s = preprocess(s)

        # Drop the oldest frame and append the newest one at the end.
        self.stacked_state = np.concatenate((self.stacked_state[1:], s[np.newaxis]), axis=0)

        return self.stacked_state, reward, terminated, truncated, info

In [25]:
# Re-create the environment and wrap it so observations are stacked,
# preprocessed frames instead of raw RGB images.
env = gym.make('CarRacing-v2', continuous=False)
env = ImageEnv(env)

# Sanity check: the cell output shows the (stacked_state, info) tuple from reset.
env.reset()

(array([[[0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098],
         [0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098],
         [0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098],
         ...,
         [0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098],
         [0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098],
         [0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098]],
 
        [[0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098],
         [0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098],
         [0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.62745098],
         ...,
         [0.62745098, 0.62745098, 0.62745098, ..., 0.62745098,
          0.62745098, 0.

In [26]:
class Actor(nn.Module):
    """Convolutional policy network mapping stacked frames to action probabilities.

    Args:
        state_dim: Observation shape. `state_dim[0]` is the number of input
            channels; when height and width are given as `state_dim[1]` and
            `state_dim[2]`, the size of the first linear layer is derived
            from them. Otherwise 84 x 84 inputs are assumed.
        action_dim: Number of discrete actions.
    """

    def __init__(self, state_dim, action_dim):
        super(Actor, self).__init__()
        self.conv1 = nn.Conv2d(state_dim[0], 16, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=4, stride=2)
        self.fc1 = nn.Linear(self._flat_features(state_dim), 128)
        self.fc2 = nn.Linear(128, action_dim)

    @staticmethod
    def _flat_features(state_dim):
        """Return the flattened size of conv2's output for `state_dim`."""
        if len(state_dim) < 3:
            # No spatial size given: keep the original 84 x 84 assumption.
            return 32 * 9 * 9

        def conv_out(size, kernel, stride):
            # Output length of an unpadded, undilated convolution.
            return (size - kernel) // stride + 1

        height = conv_out(conv_out(state_dim[1], 8, 4), 4, 2)
        width = conv_out(conv_out(state_dim[2], 8, 4), 4, 2)
        return 32 * height * width

    def forward(self, x):
        """Return action probabilities of shape (batch, action_dim)."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        # Flatten everything except the batch dimension.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        # Softmax over the action dimension so each row sums to 1.
        x = F.softmax(self.fc2(x), dim=-1)
        return x

class Critic(nn.Module):
    """Convolutional value network mapping stacked frames to a scalar value.

    Args:
        state_dim: Observation shape. `state_dim[0]` is the number of input
            channels; when height and width are given as `state_dim[1]` and
            `state_dim[2]`, the size of the first linear layer is derived
            from them. Otherwise 84 x 84 inputs are assumed.
    """

    def __init__(self, state_dim):
        super(Critic, self).__init__()
        self.conv1 = nn.Conv2d(state_dim[0], 16, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=4, stride=2)
        self.fc1 = nn.Linear(self._flat_features(state_dim), 128)
        self.fc2 = nn.Linear(128, 1)

    @staticmethod
    def _flat_features(state_dim):
        """Return the flattened size of conv2's output for `state_dim`."""
        if len(state_dim) < 3:
            # No spatial size given: keep the original 84 x 84 assumption.
            return 32 * 9 * 9

        def conv_out(size, kernel, stride):
            # Output length of an unpadded, undilated convolution.
            return (size - kernel) // stride + 1

        height = conv_out(conv_out(state_dim[1], 8, 4), 4, 2)
        width = conv_out(conv_out(state_dim[2], 8, 4), 4, 2)
        return 32 * height * width

    def forward(self, x):
        """Return state-value estimates of shape (batch, 1)."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        # Flatten everything except the batch dimension.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [27]:
class ReplayBuffer:
    """Fixed-capacity store of transitions with uniform random sampling.

    Once `capacity` transitions are stored, adding a new one discards the
    oldest.
    """

    def __init__(self, capacity):
        # A bounded deque evicts the oldest entry automatically when full.
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw `batch_size` distinct transitions uniformly at random.

        Returns:
            A tuple `(states, actions, rewards, next_states, dones)`. The
            states and next states are stacked into arrays along a new
            leading axis; the other three fields are plain tuples.
        """
        transitions = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*transitions)
        return np.stack(states), actions, rewards, np.stack(next_states), dones

    def size(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

In [28]:
# Linear exploration schedule: epsilon goes from `epsilon_start` down to
# `epsilon_end` over `epsilon_decay` episodes and stays there afterwards.
epsilon_start = 1.0
epsilon_end = 0.1
epsilon_decay = 500


def get_epsilon(episode):
    """Return the exploration rate for the given episode index.

    The rate decreases linearly with `episode` and never drops below
    `epsilon_end`.
    """
    progress = episode / epsilon_decay
    decayed = epsilon_start - (epsilon_start - epsilon_end) * progress
    return max(epsilon_end, decayed)

In [29]:
# Shape of one stacked observation and the number of discrete actions.
state_dim = (4, 84, 84)
action_dim = env.action_space.n

# Policy and value networks, each with its own Adam optimizer.
actor = Actor(state_dim, action_dim)
critic = Critic(state_dim)
actor_optimizer = optim.Adam(actor.parameters(), lr=0.001)
critic_optimizer = optim.Adam(critic.parameters(), lr=0.001)

# Transition storage and the minibatch size used for each update.
replay_buffer = ReplayBuffer(capacity=10000)
batch_size = 64

In [30]:
def train(env, actor, critic, actor_optimizer, critic_optimizer, gamma, replay_buffer, batch_size, episode):
    """Run one episode, updating actor and critic from replayed minibatches.

    At every environment step an action is chosen (uniformly at random with
    probability epsilon, otherwise sampled from the actor's distribution),
    the transition is stored in `replay_buffer`, and, once the buffer holds
    at least `batch_size` transitions, one actor update and one critic update
    are performed on a randomly sampled minibatch.

    Args:
        env: Environment whose `reset` returns `(state, info)` and whose
            `step` returns `(next_state, reward, terminated, truncated, info)`.
        actor: Network returning action probabilities for a batch of states.
        critic: Network returning a value estimate for a batch of states.
        actor_optimizer: Optimizer for the actor's parameters.
        critic_optimizer: Optimizer for the critic's parameters.
        gamma: Discount factor used in the TD target.
        replay_buffer: Buffer providing `add`, `sample` and `size`.
        batch_size: Number of transitions per update.
        episode: Episode index, used to look up the exploration rate.

    Returns:
        The sum of rewards collected during the episode.
    """
    actor.train()
    critic.train()
    state, info = env.reset()
    episode_reward = 0
    terminated = truncated = False

    num_actions = env.action_space.n
    # Epsilon depends only on the episode index, so compute it once.
    epsilon = get_epsilon(episode)

    while not (terminated or truncated):
        if np.random.rand() < epsilon:
            action = np.random.choice(num_actions)
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            # Action selection needs no gradients.
            with torch.no_grad():
                probs = actor(state_tensor).squeeze(0).cpu().numpy().astype(np.float64)
            # float32 softmax output may miss summing to 1 by more than
            # np.random.choice tolerates; renormalise in float64.
            probs /= probs.sum()
            action = np.random.choice(num_actions, p=probs)

        next_state, reward, terminated, truncated, info = env.step(action)
        # Store only true termination as `done`: a truncated episode (e.g. a
        # time limit) has not reached a terminal state, so the TD target must
        # still bootstrap from the next state's value.
        replay_buffer.add(state, action, reward, next_state, terminated)

        episode_reward += reward

        if replay_buffer.size() >= batch_size:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

            state_batch = torch.FloatTensor(states)
            next_state_batch = torch.FloatTensor(next_states)
            action_batch = torch.LongTensor(actions).view(-1, 1)
            reward_batch = torch.FloatTensor(rewards).view(-1, 1)
            done_batch = torch.FloatTensor(dones).view(-1, 1)

            # Probability the current policy assigns to each stored action.
            selected_probs = actor(state_batch).gather(1, action_batch)
            # Clamp before the log so a zero probability cannot produce -inf/NaN.
            log_action_probs = torch.log(selected_probs.clamp_min(1e-8))

            values = critic(state_batch).view(-1, 1)
            # The TD target is treated as a constant for both losses.
            with torch.no_grad():
                next_values = critic(next_state_batch).view(-1, 1)
                td_target = reward_batch + (1 - done_batch) * gamma * next_values
            advantage = td_target - values.detach()

            actor_loss = -torch.mean(advantage * log_action_probs)
            actor_optimizer.zero_grad()
            actor_loss.backward()
            actor_optimizer.step()

            critic_loss = F.smooth_l1_loss(values, td_target)
            critic_optimizer.zero_grad()
            critic_loss.backward()
            critic_optimizer.step()

        state = next_state

    return episode_reward


In [32]:
from tqdm import tqdm

In [33]:
# Training configuration and bookkeeping.
MAX_EPISODES = 5000
gamma = 0.99
interval = 10  # window length for the moving average and the logging period
scores = []
scores_window = deque(maxlen=interval)
maxscore = -10000  # best moving-average score seen so far

start = time.time()

for episode in tqdm(range(1, MAX_EPISODES + 1)):
    episode_reward = train(env, actor, critic, actor_optimizer, critic_optimizer, gamma, replay_buffer, batch_size, episode)
    scores.append(episode_reward)
    scores_window.append(episode_reward)

    # Moving average over at most the last `interval` episodes.
    avg_score = np.mean(scores_window)
    if avg_score > maxscore:
        # New best average: save both networks' weights.
        maxscore = avg_score
        print("Max Score Ever: ", avg_score)
        torch.save(actor.state_dict(), 'actor.pt')
        torch.save(critic.state_dict(), 'critic.pt')

    if episode % interval == 0:
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, avg_score))

    # Stop early once the moving average reaches the target score.
    if avg_score >= 400.0:
        print("END!!")
        print("It takes {} seconds".format(time.time() - start))
        # Only the actor's weights are written to this final checkpoint.
        torch.save(actor.state_dict(), 'checkpoint.pth')
        break

  0%|          | 1/5000 [00:10<14:55:56, 10.75s/it]

Max Score Ever:  -48.27102803738362


  0%|          | 2/5000 [00:22<15:41:57, 11.31s/it]

Max Score Ever:  -38.302180685358614


  0%|          | 3/5000 [00:34<15:51:01, 11.42s/it]

Max Score Ever:  -36.22243281121819


  0%|          | 3/5000 [00:35<16:31:10, 11.90s/it]


KeyboardInterrupt: 