In [1]:
# !conda env list

In [2]:
# !pip install 'gym[all]'
# !pip show gym 

In [3]:
# !pip install torch torchvision torchaudio

In [4]:
# matplotlib backend 모드를 ipython 에서 display 하기 위해 설치
#!pip install ipympl

In [5]:
import sys
import os
import gym
import torch
from torch import nn

import warnings
warnings.filterwarnings('ignore')

# Report the PyTorch build and whether the Apple-silicon (MPS) backend is usable.
print(f"Apple M1 백앤드를 사용하려면 PyTorch '1.12.1' 이상이 필요.")
print(f"torch.__version__: {torch.__version__}")
print(f"torch.backends.mps.is_built(): {torch.backends.mps.is_built()}")
print(f"torch.backends.mps.is_available(): {torch.backends.mps.is_available()}")


def _release_tuple(version, width=3):
    """Turn a dotted version string into a tuple of ints for numeric comparison.

    Only the leading ASCII digits of each dot-separated field are used, so a
    suffix such as ``rc1`` is ignored. Missing or non-numeric fields count as 0.

    Args:
        version: Version string such as ``'0.26.2'``.
        width: Number of leading fields to keep (the result is zero-padded).

    Returns:
        A tuple of ``width`` ints, e.g. ``(0, 26, 2)``.
    """
    numbers = []
    for field in version.split('.')[:width]:
        digits = ''
        for ch in field:
            if not ('0' <= ch <= '9'):
                break
            digits += ch
        numbers.append(int(digits) if digits else 0)
    numbers.extend([0] * (width - len(numbers)))
    return tuple(numbers)


# Check the gym version.
# The comparison must be numeric: comparing the raw strings is lexicographic,
# so '0.9.0' would be accepted and '0.100.0' rejected.
MIN_GYM_VERSION = (0, 26, 0)
print(f'gym ver: {gym.version.VERSION}')
if _release_tuple(gym.version.VERSION) < MIN_GYM_VERSION:
    sys.exit('gym 버전이 0.26 이상 필요.')

# Directory that receives the exported model; created on first run.
trained_path = './trained-model'

if not os.path.isdir(trained_path):
    os.mkdir(trained_path)
    print(f'디렉토리 생성됨. {trained_path}')

Apple M1 백앤드를 사용하려면 PyTorch '1.12.1' 이상이 필요.
torch.__version__: 1.13.0
torch.backends.mps.is_built(): True
torch.backends.mps.is_available(): True
gym ver: 0.26.2


In [6]:
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from IPython import display

# matplotlib backend 를 jupyter notebook ipython 모드로 지정. 안하면 display plot 시 깜빡임
# %matplotlib ipympl

# matplotlib ipython plot 느린 속도로 별도 창에서 plot
%matplotlib tk

# from collections import namedtuple, deque
from collections import namedtuple, deque
from itertools import count
import torch.optim as optim
import torch.nn.functional as F

# Create the CartPole-v1 environment that the agent is trained on.
env = gym.make('CartPole-v1')

# Use the CPU: Apple M1 GPU acceleration is buggy in PyTorch 1.13.0.
device = torch.device("cpu")
# device = torch.device("mps:0" if torch.backends.mps.is_available() else "cpu")

In [7]:
# One environment step as stored in the replay buffer.
Transition = namedtuple(
    'Transition',
    ('state', 'action', 'next_state', 'reward'),
)


class ReplayMemory:
    """Bounded FIFO buffer of ``Transition`` records with uniform sampling.

    Once ``capacity`` records are stored, appending a new one drops the oldest.
    """

    def __init__(self, capacity):
        # deque with maxlen evicts from the left automatically when full.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition built from the positional fields in ``args``."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        population = self.memory
        return random.sample(population, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        stored = len(self.memory)
        return stored

In [8]:
class DQN(nn.Module):
    """Fully connected network with two 128-unit hidden layers.

    Maps an observation vector of size ``n_observations`` to one output value
    per action (``n_actions`` outputs).
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()
        # Layers are created in input-to-output order; the attribute names are
        # also the keys that appear in the module's state_dict.
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    def forward(self, x):
        """Apply ReLU after each of the two hidden layers; the output layer is linear."""
        hidden = F.relu(self.layer1(x))
        hidden = F.relu(self.layer2(hidden))
        return self.layer3(hidden)

In [9]:
# Hyperparameters for training.
BATCH_SIZE = 64    # number of transitions sampled from replay memory per batch
GAMMA      = 0.99  # discount factor
EPS_START  = 0.9   # epsilon value at the start
EPS_END    = 0.01  # epsilon value at the end
EPS_DECAY  = 1000  # epsilon decay constant
TAU        = 0.005 # soft target update rate
LR         = 0.0004  # learning rate
MEM_SIZE   = 10000 # ReplayMemory maxlen
EPISODES   = 400   # number of episodes to run

# Number of actions, read from the environment's action space.
n_actions = env.action_space.n
# Reset once to obtain an observation; its length is used as the network input size.
state, _ = env.reset()
n_observations = len(state)
print(f'n_observations: {n_observations}')

# Two networks with the same architecture; the target network starts out as a
# copy of the policy network's weights.
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

# Only the policy network's parameters are given to the optimizer.
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(MEM_SIZE)

# Incremented on every select_action call; used there to decay epsilon.
steps_done = 0

def select_action(state):
    """Pick an action for ``state`` using an epsilon-greedy rule.

    Epsilon decays exponentially from ``EPS_START`` towards ``EPS_END`` as the
    global ``steps_done`` counter grows; the counter is incremented on every
    call.

    Args:
        state: Input passed to ``policy_net``; the greedy branch takes the max
            over dimension 1 of the network output.

    Returns:
        A ``(1, 1)`` tensor holding the chosen action index. The random branch
        builds it as ``torch.long`` on ``device``.
    """
    global steps_done
    roll = random.random()
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    # Explore: draw a random action from the environment's action space.
    if roll <= eps_threshold:
        random_action = env.action_space.sample()
        return torch.tensor([[random_action]], device=device, dtype=torch.long)

    # Exploit: index of the largest network output, without tracking gradients.
    with torch.no_grad():
        _, best_index = policy_net(state).max(1)
        return best_index.view(1, 1)

# Length (in steps) of each finished episode, appended by the training loop.
episode_durations = []


def plot_durations():
    """Redraw figure 1 with the per-episode durations recorded so far.

    The hyperparameters are shown in the legend box (the legend handle is
    hidden so only the text is visible).
    """
    legend_lines = (
        f'batch_size: {BATCH_SIZE:,}',
        f'gamma: {GAMMA}',
        f'eps_start: {EPS_START}',
        f'eps_end: {EPS_END}',
        f'eps_decay: {EPS_DECAY:,}',
        f'tau: {TAU}',
        f'lr: {LR}',
        f'replay_size: {MEM_SIZE:,}',
        f'episodes: {EPISODES:,}',
    )
    hyperparam_text = '\n'.join(legend_lines)

    # Reuse the same figure on every call and wipe the previous drawing.
    plt.figure(1)
    plt.clf()

    durations = torch.tensor(episode_durations, dtype=torch.float).numpy()

    plt.title(f'Training...\n(pytorch backend:{device})')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations, label=hyperparam_text)
    plt.grid(linestyle=':', color='0.7', linewidth=1)
    # Show the hyperparameters as legend text.
    plt.legend(handlelength=0, handletextpad=0, loc='upper left')
    # Short pause so the window gets a chance to refresh.
    plt.pause(0.0001)

n_observations: 4


In [10]:
def optimize_model():
    """Run one optimization step on ``policy_net`` from a replay minibatch.

    Does nothing until ``memory`` holds at least ``BATCH_SIZE`` transitions.
    Otherwise it samples a batch, computes Q(s_t, a) with ``policy_net``,
    builds the target ``reward + GAMMA * max_a' Q_target(s_{t+1}, a')`` with
    ``target_net`` (0 for transitions whose ``next_state`` is ``None``), and
    applies one Huber-loss gradient step through ``optimizer``.

    Returns:
        None.
    """
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose a list of Transitions into one Transition of per-field tuples.
    batch = Transition(*zip(*transitions))

    # True where the sampled transition has a next state (i.e. is not final).
    non_final_mask = torch.tensor(
        [s is not None for s in batch.next_state], device=device, dtype=torch.bool)
    non_final_next_states = [s for s in batch.next_state if s is not None]
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a): pick the network output of the action actually taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}); entries for final states stay at zero.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)

    # torch.cat raises on an empty list, so skip the target-network pass when
    # every sampled transition is final.
    if non_final_next_states:
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(
                torch.cat(non_final_next_states)).max(1)[0]

    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()

    # In-place gradient clipping: clamp every gradient element to [-100, 100].
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

In [11]:
# Main training loop: run EPISODES episodes, storing every step in replay
# memory and optimizing the policy network after each step.
for i_episode in range(EPISODES):

    # Initialize the environment and get its state
    state, _ = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        # Only a terminated episode stores None as the next state; a truncated
        # one keeps the observation it was cut off at.
        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        memory.push(state, action, next_state, reward)
        state = next_state

        optimize_model()

        # Soft update of the target network: θ′ ← τ θ + (1 − τ) θ′.
        # The blended weights are only ever loaded during every third episode,
        # so they are only computed then; building them on other steps would
        # be discarded work.
        # NOTE(review): confirm the every-third-episode gating is intentional.
        if i_episode % 3 == 0:
            target_net_state_dict = target_net.state_dict()
            policy_net_state_dict = policy_net.state_dict()

            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key] * TAU + target_net_state_dict[key]*(1-TAU)

            target_net.load_state_dict(target_net_state_dict)

        if done:
            # t is zero-based, so the episode lasted t + 1 steps.
            episode_durations.append(t + 1)
            plot_durations()
            break

plt.show()

In [14]:
# Save the trained target network as a TorchScript file.
# The destination is built from trained_path (the directory created during
# setup) so the created directory and the save location cannot drift apart.
torch_script = torch.jit.script(target_net)
torch_script.save(os.path.join(trained_path, 'target_net_final_3.pt'))

In [13]:
## saved_model = torch.jit.load('./trained-model/target_net.pt')
# saved_model.eval()
# print(saved_model)

2022-12-12 20:15:40.613 python[45947:7497586] +[CATransaction synchronize] called within transaction
