In [1]:
# DQN(Deep Q-Network)
import gym
import numpy as np
import torch
from torch import nn
from torch.autograd import Variable
from torch import optim
from torch.nn import functional as F

# 환경 생성
env = gym.make('FrozenLake-v1')

In [2]:
# 신경망 구성
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(16, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 16)
        self.fc4 = nn.Linear(16, 4)
    
    def forward(self, x):
        x = Variable(x)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

# 인스턴스 생성
model  = Net().cuda()

In [3]:
model

Net(
  (fc1): Linear(in_features=16, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=16, bias=True)
  (fc4): Linear(in_features=16, out_features=4, bias=True)
)

In [4]:
# one-hot 백테를 텐서로 변환
def onehot2tensor(state):
    tmp = np.zeros(16)
    tmp[state] = 1
    vector = np.array(tmp, dtype='float32')
    tensor = torch.from_numpy(vector).float()
    return tensor

# 모형에 입력
def applymodel(tensor):
    output_tensor = model(tensor)
    output_tensor = output_tensor.cpu()
    output_array = output_tensor.data.numpy()
    return output_tensor, output_array

In [6]:
# 총 보상
total_reward = 0.0

# 손실함수 생성
criterion = nn.MSELoss()

# 최적화함수 생성
optimizer = optim.Adam(model.parameters(), lr=0.01)

# 게임 시작
for i_episode in range(100):
    # 초기화
    observation = env.reset()

    # 현재 게임의 보상
    episode_reward = 0.0

    # 오차 누적 계산
    total_loss = 0.0
    for t in range(100):
        # 1턴 실행 후의 위치를 현재 위치로 설정
        current_state = observation

        # 경사를 초기화
        optimizer.zero_grad()

        # onehot 벡터를 텐서로 변환
        current_tensor = onehot2tensor(current_state)
        current_tensor = current_tensor.cuda()

        # 모형에 입력
        current_output_tensor, current_output_array = applymodel(current_tensor)

        # 행동 선택하기
        if np.random.rand() < 0.1:
            # 무작위로 선택
            action = env.action_space.sample()
        else:
            # Q 값이 최대가 되도록 선택
            action = np.argmax(current_output_array)
        # 1턴 실행
        observation, reward, done, info = env.step(action)

        # onehot 벡터를 텐서로 변환
        observation_tensor = onehot2tensor(observation)
        observation_tensor = observation_tensor.cuda()

        # 모형에 입력
        observation_output_tensor, observation_output_array = applymodel(observation_tensor)

        # Q값 업데이트
        q = reward + 0.99 * np.max(observation_output_array)
        q_array = np.copy(current_output_array)
        q_array[action] = q
        q_variable = Variable(torch.Tensor(q_array))
        
        # 오차 계산
        loss = criterion(current_output_tensor, q_variable)
        
        # 역전파 계산
        loss.backward()
        
        # 가중치 업데이트
        optimizer.step()
        
        # 오차 누적 계산
        total_loss += loss.data
        
        # 종료
        if done:
            # 현재 게임 보상 누적 계산
            episode_reward += reward
    # 총 보상 누적 계산
    total_reward += episode_reward

    # 누적 오차 및 보상을 10게임마다 출력
    if (i_episode+1) % 10 == 0:
        print(i_episode+1, total_loss, total_reward)


10 tensor(0.0001) 1.0
20 tensor(0.0001) 2.0
30 tensor(4.4370e-06) 2.0
40 tensor(1.6745e-08) 2.0
50 tensor(1.3263e-12) 2.0
60 tensor(1.4956e-19) 2.0
70 tensor(5.1721e-31) 2.0
80 tensor(0.0004) 2.0
90 tensor(5.6631e-05) 2.0
100 tensor(2.0408e-05) 3.0


In [7]:
# 총 보상을 출력
print(total_reward)

# 게임당 평균 보상을 출력
print(total_reward/100)

3.0
0.03


In [11]:
# 총 보상
total_reward = 0.0

frames = []
# 게임 시작
for i_episode in range(1000):

    # 초기화
    observation = env.reset()

    # 현재 게임 보상
    episode_reward = 0.0

    for t in range(10):
        # 1턴 실행 후의 위치를 현재 위치로 삼음
        current_state = observation

        # one-hot 벡터를 텐서로 변환
        current_tensor = onehot2tensor(current_state)
        current_tensor = current_tensor.cuda()

        # 모형에 입력
        current_output_tensor, current_output_array = applymodel(current_tensor)

        # Q값이 최대가 되는 행동을 선택
        action = np.argmax(current_output_array)

        # 1턴 실행
        observation, reward, done, info = env.step(action)

        # 애니메이션을 위하여 정보 기록
        frames.append({
            'frame': env.render(mode='ansi'),
            'state': observation,
            'action': action,
            'reward': reward
            }
        )

        # 종료
        if done:

            # 현재 게임 보상을 누적 계산
            episode_reward += reward

    # 총 보상을 누적 계산
    total_reward += episode_reward

In [12]:
# 총 보상을 출력
print(total_reward)

# 게임당 평균 보상을 출력
print(total_reward/1000)

24.0
0.024


In [13]:
from IPython.display import clear_output
from time import sleep

def print_frames(frames):
    for i, frame in enumerate(frames):
        clear_output(wait=True)
        print(frame['frame'])
        print(f"Timestep: {i + 1}")
        print(f"State: {frame['state']}")
        print(f"Action: {frame['action']}")
        print(f"Reward: {frame['reward']}")
        sleep(.1)
        
print_frames(frames)

  (Down)
SFFF
F[41mH[0mFH
FFFH
HFFG

Timestep: 243
State: 5
Action: 1
Reward: 0


KeyboardInterrupt: 