In [1]:
import gym
# Build the 'MsPacman-v0' environment and reset it; `state` holds whatever
# reset() returns and is inspected in the following cells.
env = gym.make('MsPacman-v0')
state = env.reset()
# Draw the current frame. As the last expression of the cell, the value
# returned by render() is echoed as the cell output.
env.render()



True

In [2]:
import numpy as np
# Show the dimensions of the raw observation returned by env.reset().
print(state.shape)

(210, 160, 3)


In [3]:
# Keep every second row from index 1 up to (not including) 176 and every
# second column; the channel axis is taken whole.
state_1 = state[1:176:2, ::2, :]
print(state_1.shape)

(88, 80, 3)


In [4]:
# Average over the last (channel) axis so each pixel becomes one gray value.
state_1 = np.mean(state_1, axis=2)
print(state_1.shape)

(88, 80)


In [5]:
# Restore an explicit single-channel axis: (88, 80) -> (88, 80, 1).
state_1 = np.reshape(state_1, (88, 80, 1))
print(state_1.shape)

(88, 80, 1)


In [6]:
# Prepend a length-1 axis so the frame looks like a batch of one.
state_1 = np.expand_dims(state_1, axis=0)
print(state_1.shape)

(1, 88, 80, 1)


In [7]:
# Show the environment's action space (its `.n` is later used as action_size).
print(env.action_space)

Discrete(9)


In [8]:
# Import the tensorflow.keras pieces needed to approximate the Q-function with a deep network, and deque for building the replay buffer

import random
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, Flatten
from tensorflow.keras.optimizers import Adam

In [9]:
# Shape of one preprocessed frame as produced by process_state.
state_shape = (88, 80, 1)

# Number of discrete actions exposed by the environment.
action_size = env.action_space.n

# Gray level obtained by averaging the RGB triple (210, 164, 74);
# process_state sets pixels with exactly this value to 0.
color = np.mean([210, 164, 74])

In [10]:
def process_state(state, background_color=None):
    """Turn a raw RGB frame into the normalised grayscale network input.

    Steps: keep every second row in [1, 176) and every second column,
    average the channel axis, zero out pixels equal to the background gray
    level, rescale with ``(x - 128) / 128`` and reshape to ``(88, 80, 1)``.

    Parameters
    ----------
    state : numpy.ndarray
        Frame indexed as (row, column, channel). After downsampling it must
        contain exactly 88 x 80 pixels (e.g. a (210, 160, 3) frame),
        otherwise the final reshape raises ``ValueError``.
    background_color : scalar or sequence of numbers, optional
        Gray level to blank out. A sequence (such as an RGB triple) is
        averaged first. When omitted, the notebook-level ``color`` value is
        used, which keeps existing ``process_state(frame)`` calls unchanged.

    Returns
    -------
    numpy.ndarray
        Array of shape (88, 80, 1). For 8-bit input the values lie in
        [-1, 127/128]. The input array is not modified.
    """
    # Fall back to the notebook-level constant so old call sites keep working.
    target_gray = color if background_color is None else np.mean(background_color)

    # Downsize: every second row of the cropped band, every second column.
    img = state[1:176:2, ::2]

    # (88, 80, 3) -> (88, 80); mean() allocates a new array, so the caller's
    # frame is left untouched by the in-place masking below.
    img = img.mean(axis=2)

    # Exact comparison is intentional: both sides are plain means of the same
    # integer triple, so matching pixels compare equal.
    img[img == target_gray] = 0

    # Rescale to roughly -1 ~ 1.
    img = (img - 128) / 128

    return img.reshape(88, 80, 1)

In [11]:
class DQN:
    """Deep Q-Network agent with experience replay and a target network.

    Two identically structured Keras models are kept: ``main_network`` is
    trained, while ``target_network`` supplies the bootstrap values and is
    refreshed from the main network via ``update_target_network``.
    """

    def __init__(self, state_shape, action_size):
        """Create both networks and start them from identical weights.

        Parameters
        ----------
        state_shape : tuple
            Shape of a single preprocessed state, passed as ``input_shape``
            to the first convolution.
        action_size : int
            Number of discrete actions; size of the network output.
        """
        self.state_shape = state_shape
        self.action_size = action_size
        # Bounded FIFO of (state, action, reward, next_state, done) tuples.
        self.replay_buffer = deque(maxlen=5000)
        # Discount factor applied to the bootstrapped next-state value.
        self.gamma = 0.9
        # Read by the training loop to decide when to sync the target network.
        self.update_timesteps = 1000
        # Epsilon decays linearly from eps_max to eps_min over eps_steps steps.
        self.eps_min = 0.1
        self.eps_max = 0.8
        self.eps_steps = 2000000
        self.main_network = self.q_network()
        self.target_network = self.q_network()
        # Copy the main weights so both networks start identical.
        self.update_target_network()

    def q_network(self):
        """Build and compile the convolutional Q-value model.

        Returns
        -------
        A compiled ``Sequential`` model mapping a batch of states to one
        linear output per action, trained with MSE loss and Adam.
        """
        model = Sequential()
        model.add(Conv2D(32, 8, strides=4, padding='same', activation='relu', input_shape=self.state_shape))
        model.add(Conv2D(64, 4, strides=2, padding='same', activation='relu'))
        model.add(Conv2D(64, 3, padding='same', activation='relu'))
        model.add(Flatten())
        model.add(Dense(512, activation='relu'))
        model.add(Dense(128, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))

        model.compile(loss='mse', optimizer=Adam())
        return model

    def store_transitions(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        self.replay_buffer.append((state, action, reward, next_state, done))

    def epsilon_greedy(self, state, step):
        """Pick an action with an epsilon-greedy policy.

        Parameters
        ----------
        state : numpy.ndarray
            A single state (no batch axis).
        step : int
            Step count driving the linear epsilon decay; epsilon never drops
            below ``eps_min``.

        Returns
        -------
        int
            Index of the chosen action.
        """
        epsilon = max(self.eps_min, self.eps_max - (self.eps_max - self.eps_min) * step / self.eps_steps)

        if random.uniform(0, 1) < epsilon:
            return np.random.randint(self.action_size)

        # Add a batch axis; verbose=0 keeps per-call progress bars out of the
        # notebook output.
        Q_values = self.main_network.predict(state[np.newaxis, :], verbose=0)
        return np.argmax(Q_values[0])

    def train(self, batch_size):
        """Run one gradient step on a batch sampled from the replay buffer.

        Indices are drawn uniformly with replacement. The regression target
        equals the main network's current prediction, except for the entry of
        the action actually taken, which is replaced by
        ``reward + (1 - done) * gamma * max(target_network(next_state))``.

        Parameters
        ----------
        batch_size : int
            Number of transitions to sample. The buffer must be non-empty.
        """
        indices = np.random.randint(len(self.replay_buffer), size=batch_size)
        batch = [self.replay_buffer[index] for index in indices]
        states, actions, rewards, next_states, dones = [
            np.array([experience[field_index] for experience in batch])
            for field_index in range(5)
        ]

        next_Q_values = self.target_network.predict(next_states, verbose=0)
        max_next_Q_values = np.max(next_Q_values, axis=1)
        # (1 - dones) removes the bootstrap term for terminal transitions.
        target_Q_values = rewards + (1 - dones) * self.gamma * max_next_Q_values

        y_Q_values = self.main_network.predict(states, verbose=0)
        # Overwrite only the Q-value of the action taken in each sample.
        y_Q_values[np.arange(len(actions)), actions] = target_Q_values

        self.main_network.train_on_batch(states, y_Q_values)

    def update_target_network(self):
        """Copy the main network's weights into the target network."""
        # set_weights must be *called*; assigning to it would replace the
        # method with a list and leave the target weights untouched.
        self.target_network.set_weights(self.main_network.get_weights())

In [12]:
# --- Training schedule ---
num_episodes = 500       # number of episodes to run
num_timesteps = 20000    # upper bound on steps within one episode
training_interval = 4    # training is attempted only on global steps divisible by this
batch_size = 8           # transitions sampled per training call

# --- Agent and bookkeeping ---
dqn = DQN(state_shape, action_size)
return_monitor = []      # [episode, return] rows appended when an episode ends
time_step = 0            # global step counter shared across episodes
done = False

In [13]:
# Main training loop: play episodes, store transitions, periodically train
# the main network and periodically sync the target network.
for episode in range(num_episodes):
    episode_return = 0
    state = process_state(env.reset())

    for t in range(num_timesteps):
        env.render()
        time_step += 1

        # Refresh the target network every `update_timesteps` global steps.
        if time_step % dqn.update_timesteps == 0:
            dqn.update_target_network()

        # Drive the epsilon decay with the global step counter so exploration
        # keeps decreasing across episodes instead of restarting at eps_max
        # whenever a new episode begins.
        action = dqn.epsilon_greedy(state, time_step)
        next_state, reward, done, _ = env.step(action)
        next_state = process_state(next_state)

        dqn.store_transitions(state, action, reward, next_state, done)
        state = next_state

        episode_return += reward

        # Check termination on every step, before any training-interval
        # gating, so a finished environment is never stepped again.
        if done:
            print('Episode :', episode, '', 'Return :', episode_return)
            return_monitor.append([episode, episode_return])
            break

        # Train only every `training_interval` global steps, once the buffer
        # holds more transitions than one batch.
        if time_step % training_interval == 0 and len(dqn.replay_buffer) > batch_size:
            dqn.train(batch_size)
    # Note: an episode cut off by `num_timesteps` without `done` is not logged.

Episode : 0  Return : 290.0


KeyboardInterrupt: 

In [None]:
import pandas as pd
# Tabulate the logged [episode, return] pairs and preview the first rows.
df = pd.DataFrame(
    data=return_monitor,
    columns=['episode', 'return'],
)
df.head(n=5)

In [None]:
# Show (rows, columns) of the returns table, i.e. how many episodes were logged.
print(df.shape)

In [None]:
import matplotlib.pyplot as plt
# Line plot of the per-episode return, using the explicit figure/axes interface.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(df['episode'], df['return'])
ax.set_xlabel('episode')
ax.set_ylabel('return')
ax.set_title('Return_Monitor')
plt.show()