<a href="https://colab.research.google.com/github/sugiyama404/ReinforcementLearningForGymOrAtari/blob/main/DQN/DQNForCarRacing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Reinstall gym (with the box2d extra) and install tensorflow_addons; all
# command output is discarded via the /dev/null redirects.
# NOTE(review): versions are unpinned, yet later cells rely on
# `wrappers.Monitor`, the 'CarRacing-v0' environment id and a 4-tuple return
# from `env.step` -- consider pinning a gym release that still provides them.
!pip uninstall gym -y  > /dev/null 2>&1 # gym 0.17.3 was broken 2021/11/08
!pip install gym gym[box2d] tensorflow_addons > /dev/null 2>&1

In [2]:
import gym
from gym import wrappers

import numpy as np
import pandas as pd
import time
from datetime import datetime
import random
import copy

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, ReLU, Input, Lambda, Conv2D, Flatten, Activation
from tensorflow.keras.losses import Huber
from sklearn.preprocessing import StandardScaler
import tensorflow_addons as tfa
from tensorflow_addons.optimizers import RectifiedAdam

from dataclasses import dataclass

from tensorflow.keras.utils import Progbar

import math
from time import sleep



# Install Xvfb and pyvirtualdisplay, then start a virtual display.
# Presumably needed so that `env.render()` works on a headless (Colab) host --
# TODO confirm. Installer output is discarded via the /dev/null redirects.
!apt update  > /dev/null 2>&1
!apt install xvfb  > /dev/null 2>&1
!pip install pyvirtualdisplay  > /dev/null 2>&1
# Imported here rather than with the other imports because the package is
# only installed by the line above.
from pyvirtualdisplay import Display
d = Display()
d.start()

<pyvirtualdisplay.display.Display at 0x7f75c7be4390>

In [3]:
class ReplayMemory:
    """Fixed-capacity ring buffer of (state, action, reward, next_state, done) transitions.

    Transitions are written into pre-allocated NumPy arrays; once ``max_size``
    entries have been stored, the oldest entry is overwritten.

    Args:
        max_size: Maximum number of transitions kept in the buffer.
        batch_size: Number of transitions returned by ``random_sampling``.
        state_shape: Shape of a single observation. Defaults to ``(96, 96, 3)``,
            the shape previously hard-coded in this class.
    """

    def __init__(self, max_size=500, batch_size=32, state_shape=(96, 96, 3)):
        self.cntr = 0   # index of the slot the next transition is written to
        self.size = 0   # number of valid transitions currently stored
        self.max_size = max_size
        self.batch_size = batch_size
        self.state_shape = tuple(state_shape)

        buffer_shape = (self.max_size, *self.state_shape)
        self.states_memory = np.zeros(buffer_shape, dtype=np.float32)
        self.next_states_memory = np.zeros(buffer_shape, dtype=np.float32)
        self.acts_memory = np.zeros(self.max_size, dtype=np.uint8)
        self.rewards_memory = np.zeros(self.max_size, dtype=np.float32)
        self.done_memory = np.zeros(self.max_size, dtype=np.uint8)

    def store_transition(self, state, act, reward, next_state, done):
        """Write one transition at the cursor, overwriting the oldest entry when full."""
        slot = self.cntr
        self.states_memory[slot] = state
        self.next_states_memory[slot] = next_state
        self.acts_memory[slot] = act
        self.rewards_memory[slot] = reward
        self.done_memory[slot] = done
        # Advance the cursor with wrap-around; size saturates at max_size.
        self.cntr = (slot + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def random_sampling(self):
        """Return ``batch_size`` distinct stored transitions chosen uniformly at random.

        Returns:
            dict with keys ``'state'``, ``'next_state'``, ``'act'``, ``'reward'``
            and ``'done'``; each value is an array whose first axis has length
            ``batch_size``.

        Raises:
            ValueError: If fewer than ``batch_size`` transitions are stored
                (sampling is done without replacement).
        """
        if self.size < self.batch_size:
            raise ValueError(
                "cannot sample %d transitions from a buffer holding only %d"
                % (self.batch_size, self.size)
            )

        mb_index = np.random.choice(self.size, self.batch_size, replace=False)
        return {
            'state': self.states_memory[mb_index],
            'next_state': self.next_states_memory[mb_index],
            'act': self.acts_memory[mb_index],
            'reward': self.rewards_memory[mb_index],
            'done': self.done_memory[mb_index],
        }

In [4]:
class Brain:
    """Builds and compiles the convolutional Q-network used by the agent.

    The network is two ReLU convolution layers, a flatten, a 256-unit hidden
    dense layer and a linear output layer with one unit per action. It is
    compiled with Huber loss and the RectifiedAdam optimizer and stored on
    ``self.model``.

    Args:
        obs_shape: Shape of a single observation fed to the first conv layer.
            Defaults to ``(96, 96, 3)``, the value previously hard-coded here.
        nb_actions: Number of discrete actions (width of the output layer).
            Defaults to ``5``, the value previously hard-coded here.
    """

    def __init__(self, obs_shape=(96, 96, 3), nb_actions=5):
        opt = RectifiedAdam(learning_rate=0.001, epsilon=0.001)
        loss = Huber()

        model = Sequential()
        model.add(Conv2D(16, kernel_size=(16, 16), strides=(2, 2), activation='relu', input_shape=obs_shape))
        model.add(Conv2D(32, kernel_size=(8, 8), strides=(2, 2), activation='relu'))
        model.add(Flatten())
        # The hidden layer needs a non-linearity: without one, this layer and
        # the linear output layer compose into a single linear map.
        model.add(Dense(256, activation='relu'))
        # Linear output: one unbounded Q-value estimate per action.
        model.add(Dense(nb_actions))
        model.compile(loss = loss, optimizer = opt)
        model.summary()
        self.model = model

In [5]:
class Agent(Brain, ReplayMemory):
    """Epsilon-greedy DQN agent combining the Q-network (Brain) with a replay buffer.

    Args:
        max_size: Capacity of the replay buffer.
        batch_size: Mini-batch size used for each training step.
    """

    def __init__(self, max_size=500, batch_size=32):
        self.gamma = 0.95         # discount factor for future rewards
        self.epsilon = 1.0        # current exploration probability
        self.epsilon_min = 0.01   # lower bound for epsilon
        self.r = 0.995            # multiplicative epsilon decay per training step
        # Both bases are initialised explicitly because their constructors
        # take different arguments; ReplayMemory.__init__ sets self.batch_size.
        Brain.__init__(self)
        ReplayMemory.__init__(self, max_size, batch_size)

    def update_replay_memory(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        self.store_transition(state, action, reward, next_state, done)

    def act(self, state):
        """Choose an action index for ``state`` using an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest predicted Q-value.
        """
        if np.random.rand() <= self.epsilon:
            # The action count is read from the network's output width instead
            # of being hard-coded.
            return np.random.choice(self.model.output_shape[-1])
        batch_of_one = np.array([state])  # add the leading batch axis
        # verbose=0 keeps Keras from printing a progress bar on every step.
        act_values = self.model.predict(batch_of_one, verbose=0)
        return np.argmax(act_values[0])

    def replay(self):
        """Run one training step on a random mini-batch, then decay epsilon.

        Does nothing until the buffer holds at least ``batch_size`` transitions.
        """
        if self.size < self.batch_size:
            return

        m_batch = self.random_sampling()
        states = m_batch['state']
        next_states = m_batch['next_state']
        actions = m_batch['act']
        rewards = m_batch['reward']
        done = m_batch['done']

        # Bootstrapped target: r + gamma * max_a' Q(s', a'), with the bootstrap
        # term zeroed for terminal transitions. `done` is stored as uint8, so
        # it is cast to float before the arithmetic.
        not_done = 1.0 - done.astype(np.float32)
        next_q = self.model.predict(next_states, verbose=0)
        target = rewards + not_done * self.gamma * np.amax(next_q, axis=1)

        # Start from the current predictions so that only the Q-value of the
        # action actually taken contributes to the loss.
        target_full = self.model.predict(states, verbose=0)
        target_full[np.arange(len(actions)), actions] = target
        self.model.train_on_batch(states, target_full)

        # Decay exploration, clamping so epsilon never falls below epsilon_min.
        # The guard leaves an epsilon that was set below the floor untouched.
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.r)

    def load(self, name):
        """Load network weights from ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to ``name``."""
        self.model.save_weights(name)

In [6]:
class Main:
    """Training loop that runs an agent in an environment for a number of episodes.

    Args:
        env: Environment exposing ``reset``, ``step``, ``render`` and ``close``;
            ``step`` must return ``(next_state, reward, done, info)``.
        agent: Object exposing ``act``, ``update_replay_memory`` and ``replay``.
        episodes_times: Number of episodes to run.
    """

    # Discrete action index -> continuous command passed to ``env.step``.
    # Columns appear to be [steering, gas, brake], judging by the row labels.
    _ACTIONS = np.array([[ 0, 0, 0],  # [0]: straight
                         [ 0, 1, 0],  # [1]: acceleration
                         [ 0, 0, 1],  # [2]: decelerate
                         [ 1, 0, 0],  # [3]: Turn right
                         [-1, 0, 0]]) # [4]: Turn left

    # Number of episodes summarised by one progress bar.
    _EPISODES_PER_BAR = 10

    def __init__(self, env, agent, episodes_times = 1000):
        self.env = env
        self.agent = agent
        self.episodes_times = episodes_times

    def play_game(self):
        """Run ``episodes_times`` episodes, training the agent after every step.

        A new progress bar is started every 10 episodes and shows the mean
        score of the episodes in that group. The environment is always closed
        on exit, including when training raises or is interrupted.
        """
        metrics_names = ['score']
        try:
            for episode in range(self.episodes_times):

                if episode % self._EPISODES_PER_BAR == 0:
                    # The last group may be shorter than a full bar. The old
                    # string-slicing arithmetic raised ValueError for
                    # episodes_times < 10.
                    remaining = self.episodes_times - episode
                    pb_i = Progbar(min(self._EPISODES_PER_BAR, remaining),
                                   stateful_metrics=metrics_names)
                    group_scores = []

                state = self.env.reset()
                done = False
                score = 0

                while not done:
                    self.env.render()
                    action = self.agent.act(state)
                    tmp_action = self._action_clipping(action)
                    next_state, reward, done, info = self.env.step(tmp_action)
                    score += reward

                    # The discrete index (not the continuous command) is what
                    # gets stored, so it can index the Q-value output later.
                    self.agent.update_replay_memory(state, action, reward, next_state, done)
                    self.agent.replay()

                    state = next_state

                group_scores.append(score)
                values = [('score', np.mean(group_scores))]
                pb_i.add(1, values=values)
        finally:
            self.env.close()

    def _action_clipping(self, val):
        """Map a discrete action index (0-4) to its 3-element control vector."""
        # Return a copy so callers cannot mutate the shared lookup table.
        return self._ACTIONS[val].copy()

In [7]:
# Run configuration: number of episodes, mini-batch size and replay-buffer capacity.
episodes_times = 300
batch_size = 32
max_size = 500

# Constructing the agent builds the Q-network (and prints its summary)
# and allocates the replay buffer.
agent = Agent(max_size, batch_size)
# NOTE(review): 40 presumably corresponds to gym's ERROR log level, which
# would silence warnings -- confirm against the installed gym version.
gym.logger.set_level(40)
env = gym.make('CarRacing-v0')
env.unwrapped.verbose = 0
# Wrap the environment in a Monitor writing to the current directory; the
# video_callable returns True for every 25th episode index (0, 25, 50, ...).
# NOTE(review): force=True presumably overwrites earlier monitor output -- confirm.
env = wrappers.Monitor(env, './', force=True, video_callable=(lambda ep: ep % 25 == 0))
main = Main(env, agent, episodes_times)
main.play_game()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 41, 41, 16)        12304     
                                                                 
 conv2d_1 (Conv2D)           (None, 17, 17, 32)        32800     
                                                                 
 flatten (Flatten)           (None, 9248)              0         
                                                                 
 dense (Dense)               (None, 256)               2367744   
                                                                 
 dense_1 (Dense)             (None, 5)                 1285      
                                                                 
Total params: 2,414,133
Trainable params: 2,414,133
Non-trainable params: 0
_________________________________________________________________
