In [1]:
from PIL import Image

def display_frames_as_gif(frames, filename='Pong_DDQN.gif', out_dir='./result', duration=40):
    """Save a sequence of frames as an endlessly looping animated GIF.

    Args:
        frames: Iterable of arrays that ``Image.fromarray(..., mode='RGB')``
            accepts (assumed H x W x 3 uint8, e.g. the result of
            ``env.render(mode='rgb_array')`` -- confirm for other sources).
        filename: Name of the GIF file, relative to ``out_dir``.
        out_dir: Directory the GIF is written to; created when missing.
        duration: Display time of each frame in milliseconds.

    Raises:
        ValueError: If ``frames`` contains no frame.
    """
    import os  # local import so this cell does not depend on a later import cell

    frames = list(frames)
    if not frames:
        raise ValueError('display_frames_as_gif() needs at least one frame')

    path = os.path.join(out_dir, filename)
    # The target directory is not guaranteed to exist on a fresh checkout.
    os.makedirs(os.path.dirname(path) or '.', exist_ok=True)

    frs = [Image.fromarray(f, mode='RGB') for f in frames]
    # The first image is the GIF's base frame; the rest are appended after it.
    frs[0].save(path, save_all=True, append_images=frs[1:], optimize=False, duration=duration, loop=0)

In [2]:
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import pfrl
import cv2
import matplotlib.pyplot as plt

In [3]:
class WrapFrame(gym.ObservationWrapper):
    """Observation wrapper producing 84x84 single-channel uint8 frames."""

    def __init__(self, env):
        super().__init__(env)
        self.width = 84
        self.height = 84
        # Channel-last layout: (height, width, 1).
        target_shape = (self.height, self.width, 1)
        self.observation_space = spaces.Box(low=0, high=255, shape=target_shape, dtype=np.uint8)

    def observation(self, frame):
        """Convert ``frame`` from RGB to grayscale, resize it and add a trailing channel axis."""
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        # cv2.resize expects the target size as (width, height).
        resized = cv2.resize(gray, (self.width, self.height), interpolation=cv2.INTER_AREA)
        return np.expand_dims(resized, axis=-1)

class WrapPyTorch(gym.ObservationWrapper):
    """Observation wrapper that moves the last axis of each observation to the front."""

    def __init__(self, env=None):
        super().__init__(env)
        inner_space = self.observation_space
        inner_shape = inner_space.shape
        # A single scalar bound, read from the first element, is used for the whole Box.
        lower_bound = inner_space.low[0, 0, 0]
        upper_bound = inner_space.high[0, 0, 0]
        # Axis order (0, 1, 2) becomes (2, 0, 1).
        reordered_shape = [inner_shape[2], inner_shape[0], inner_shape[1]]
        self.observation_space = spaces.Box(
            lower_bound,
            upper_bound,
            reordered_shape,
            dtype=inner_space.dtype,
        )

    def observation(self, observation):
        """Return ``observation`` with its axes reordered to (2, 0, 1)."""
        return observation.transpose(2, 0, 1)

In [4]:
# Build the raw Pong environment and report its spaces plus one sampled action.
env = gym.make('Pong-v0')
print('observation space:', env.observation_space)
print('action space:', env.action_space)
print(env.action_space.sample())

# Stack the preprocessing wrappers: WrapFrame innermost, WrapPyTorch on top.
env = WrapPyTorch(WrapFrame(env))
print('observation space:', env.observation_space)
# Cell output: shape of the first preprocessed observation.
env.reset().shape

observation space: Box(0, 255, (210, 160, 3), uint8)
action space: Discrete(6)
0
observation space: Box(0, 255, (1, 84, 84), uint8)


(1, 84, 84)

## Random policy baseline

In [35]:
# Play one episode with uniformly random actions and record every rendered frame.
done = False
frames = []
obs = env.reset()

while not done:
    # Sample from the environment's own action space instead of assuming 6 actions.
    action = env.action_space.sample()
    obs, r, done, info = env.step(action)
    frames.append(env.render(mode='rgb_array'))
    print(f'action: {action}')
    print(f'reward: {r}')

display_frames_as_gif(frames, 'Pong_Random.gif')

: 0.0
action: 5
reward: 0.0
action: 0
reward: 0.0
action: 1
reward: 0.0
action: 3
reward: 0.0
action: 3
reward: 0.0
action: 0
reward: 0.0
action: 3
reward: 0.0
action: 4
reward: 0.0
action: 5
reward: 0.0
action: 3
reward: 0.0
action: 3
reward: 0.0
action: 1
reward: 0.0
action: 2
reward: 0.0
action: 5
reward: 0.0
action: 4
reward: 0.0
action: 4
reward: 0.0
action: 5
reward: 0.0
action: 1
reward: 0.0
action: 5
reward: 0.0
action: 5
reward: 0.0
action: 4
reward: 0.0
action: 5
reward: 0.0
action: 0
reward: 0.0
action: 5
reward: 0.0
action: 5
reward: 0.0
action: 0
reward: 0.0
action: 3
reward: 0.0
action: 0
reward: 0.0
action: 1
reward: 0.0
action: 2
reward: 0.0
action: 2
reward: 0.0
action: 5
reward: 0.0
action: 5
reward: 0.0
action: 5
reward: 0.0
action: 3
reward: 0.0
action: 0
reward: 0.0
action: 2
reward: 0.0
action: 3
reward: 0.0
action: 2
reward: 0.0
action: 1
reward: 0.0
action: 5
reward: 0.0
action: 5
reward: 0.0
action: 3
reward: 0.0
action: 5
reward: 0.0
action: 1
reward: 0.0
acti

In [20]:
# Close the environment after the random rollout.
# NOTE(review): later cells call env.reset()/env.step() on this same `env`;
# confirm it is still usable after close() when the notebook runs top to bottom.
env.close()

## DDQN with PFRL

In [5]:
class QFunction(nn.Module):
    """Convolutional Q-network mapping image observations to per-action Q-values.

    Three unpadded convolutions (8x8 stride 4, 4x4 stride 2, 3x3 stride 1)
    feed a flattened feature vector into a two-layer fully connected head.

    Args:
        width: Width of the input image in pixels.
        height: Height of the input image in pixels.
        n_actions: Number of discrete actions (size of the output layer).
        in_channels: Number of channels of the input image (default 1).

    Raises:
        ValueError: If the image is too small for the convolution stack.
    """

    def __init__(self, width, height, n_actions, in_channels=1):
        super().__init__()

        def conv2d_size_out(size, kernel_size=5, stride=2):
            """Output length along one axis of an unpadded, undilated convolution."""
            return (size - (kernel_size - 1) - 1) // stride + 1

        def conv_stack_size_out(size):
            """Chain the size formula through the three conv layers defined below."""
            size = conv2d_size_out(size, 8, 4)
            size = conv2d_size_out(size, 4, 2)
            return conv2d_size_out(size, 3, 1)

        # Width and height are tracked separately so non-square inputs work.
        convW = conv_stack_size_out(width)
        convH = conv_stack_size_out(height)
        if convW <= 0 or convH <= 0:
            raise ValueError(
                f'input of size {width}x{height} is too small for the convolution stack'
            )

        # 64 is the channel count of the last convolution.
        linear_input_size = convW * convH * 64

        # Image tensor with `in_channels` channels as input.
        self.selectTrackFeatures = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten()
        )

        self.fc1 = nn.Sequential(
            nn.Linear(linear_input_size, 256),
            nn.ReLU(),
            nn.Linear(256, n_actions)
        )

    def forward(self, state):
        """Compute Q-values for a batch of observations.

        Args:
            state: Float tensor laid out as (batch, channels, height, width),
                the layout ``nn.Conv2d`` expects.

        Returns:
            ``pfrl.action_value.DiscreteActionValue`` wrapping the
            (batch, n_actions) Q-value tensor.
        """
        x = self.selectTrackFeatures(state)
        x = self.fc1(x)
        return pfrl.action_value.DiscreteActionValue(x)

# After WrapPyTorch the observation shape is channel-first: (channels, height, width).
obs_size = env.observation_space.shape
print(obs_size)
n_actions = env.action_space.n
# Pass the sizes by keyword so width and height cannot be swapped.
q_func = QFunction(width=obs_size[2], height=obs_size[1], n_actions=n_actions)

(1, 84, 84)


In [6]:
# Smoke-test the network on a random batch of four 84x84 single-channel images.
img = np.random.randint(256, size=(4, 84, 84, 1))
print(img.shape)
# Cast to float32 and move the channel axis forward: (N, H, W, C) -> (N, C, H, W).
tn = torch.from_numpy(img.astype(np.float32)).permute(0, 3, 1, 2)
print(tn.shape)
# A single-action network is enough to check that the shapes line up.
q = QFunction(84, 84, 1)
q(tn)


(4, 84, 84, 1)
torch.Size([4, 1, 84, 84])


DiscreteActionValue greedy_actions:[0 0 0 0] q_values:[[2.4438639]
 [2.1189778]
 [2.1173034]
 [3.1938765]]

In [7]:
# Adam over the Q-network's parameters; eps is set to 1e-2 (PyTorch's default is 1e-8),
# every other hyperparameter keeps its default.
optimizer = torch.optim.Adam(q_func.parameters(), eps=1e-2)

In [None]:
# Set the discount factor that discounts future rewards.
gamma = 0.9

# Use epsilon-greedy for exploration
explorer = pfrl.explorers.ConstantEpsilonGreedy(
    epsilon=0.3, random_action_func=env.action_space.sample)

# DQN uses Experience Replay.
# Specify a replay buffer and its capacity.
replay_buffer = pfrl.replay_buffers.ReplayBuffer(capacity=10 ** 6)


# The wrapped environment emits uint8 frames, while PyTorch layers use
# float32 by default, so cast observations in the feature extractor phi.
# Pixel values are only cast, not rescaled: they stay in [0, 255].
def phi(x):
    return x.astype(np.float32, copy=False)


# Device id: GPU 0 when CUDA is available, otherwise -1 (CPU only), so the
# notebook also runs on machines without a GPU.
gpu = 0 if torch.cuda.is_available() else -1

# Now create an agent that will interact with the environment.
agent = pfrl.agents.DoubleDQN(
    q_func,
    optimizer,
    replay_buffer,
    gamma,
    explorer,
    replay_start_size=500,
    update_interval=1,
    target_update_interval=100,
    phi=phi,
    gpu=gpu,
)

In [None]:
# Train for a fixed number of episodes, each capped at max_episode_len steps.
n_episodes = 200
max_episode_len = 300
for i in range(1, n_episodes + 1):
    obs = env.reset()
    R = 0  # undiscounted sum of rewards in this episode
    t = 0  # steps taken in this episode
    done = reset = False
    # Run until the environment terminates or the step cap forces a reset.
    while not (done or reset):
        action = agent.act(obs)
        obs, reward, done, _ = env.step(action)
        R += reward
        t += 1
        reset = t == max_episode_len
        agent.observe(obs, reward, done, reset)
    # Progress report every 10 episodes, agent statistics every 50.
    if i % 10 == 0:
        print('episode:', i, 'R:', R)
    if i % 50 == 0:
        print('statistics:', agent.get_statistics())
print('Finished.')

In [42]:
# Evaluate the trained agent on one capped episode and save the rollout as a GIF.
eval_max_steps = 300  # same step cap as the training episodes

done = False
reset = False  # defined here so the cell does not rely on state left by the training loop
frames = []
obs = env.reset()
total_r = 0
t = 0

with agent.eval_mode():
    # Stop at episode end or at the step cap; never step a finished episode.
    while not (done or reset):
        action = agent.act(obs)
        obs, r, done, info = env.step(action)
        total_r += r
        t += 1
        reset = t == eval_max_steps
        agent.observe(obs, r, done, reset)
        frames.append(env.render(mode='rgb_array'))
print(total_r)
display_frames_as_gif(frames)

-3.0


In [12]:
# Close the environment now that training and evaluation are finished.
env.close()