In [1]:
import gym
import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Flatten, Dense, InputLayer, Conv2D, MaxPool2D
from PIL import Image
import matplotlib.pyplot as plt
from collections import deque
import random
from IPython.display import clear_output

In [2]:
class DQN_Agent:
    """Deep Q-Network agent with an online network and a separate target network.

    Observations are converted to single-channel images of shape
    ``INPUT_SHAPE`` before being stored or fed to the networks. Actions are
    chosen epsilon-greedily, with ``self.rng`` acting as epsilon.
    """

    # (height, width, channels) of the preprocessed frame fed to the networks.
    INPUT_SHAPE = (125, 80, 1)

    def __init__(self, n_of_actions):
        """Build both networks and an empty replay memory.

        Args:
            n_of_actions: number of discrete actions; size of the output layer.
        """
        self.rng = 1            # probability of picking a random action
        self.rng_min = 0.1      # lower bound for self.rng
        self.rng_decay = 0.99   # multiplicative decay applied to self.rng
        self.discount = 0.95    # weight of the bootstrapped next-state value

        self.n_of_actions = n_of_actions

        # Counts train() calls since the last copy of weights into q_target.
        self.transfer_ctr = 0

        # Replay memory; the oldest transitions are dropped once it is full.
        self.memory = deque(maxlen=20_000)

        self.q_eval = self.create_model()
        self.q_target = self.create_model()
        self.transfer_weight()

    def create_model(self):
        """Return a freshly compiled convolutional network with one output per action."""
        model = Sequential()
        model.add(Conv2D(16, 8, 4, activation='relu', padding='same', input_shape=self.INPUT_SHAPE))
        model.add(MaxPool2D(pool_size=4, strides=2, padding='same'))
        model.add(Conv2D(32, 4, 2, activation='relu', padding='same'))
        model.add(MaxPool2D(pool_size=2, strides=1, padding='same'))
        model.add(Flatten())
        model.add(Dense(128, activation='relu'))
        model.add(Dense(128, activation='relu'))
        model.add(Dense(self.n_of_actions, activation='linear'))
        # No 'accuracy' metric: the outputs are regressed values, not class
        # labels, so a classification accuracy would be meaningless here.
        model.compile(
            optimizer='adam',
            loss='mean_squared_error',
        )

        return model

    def remember(self, observation, action, reward, done, new_observation):
        """Preprocess both frames and append the transition to the replay memory."""
        observation = self.reshape_image(observation)
        new_observation = self.reshape_image(new_observation)
        self.memory.append([observation, action, reward, done, new_observation])

    def get_action(self, observation):
        """Return an action index: random with probability ``self.rng``, else the argmax of the predicted values."""
        if random.random() < self.rng:
            return random.randint(0, self.n_of_actions - 1)
        return int(np.argmax(self.get_qs(observation)))

    def get_qs(self, observation):
        """Return the online network's prediction for one raw observation (batch of size 1)."""
        batch = np.reshape(self.reshape_image(observation), (1,) + self.INPUT_SHAPE)
        # verbose=0 keeps per-call progress output out of the notebook cell.
        return self.q_eval.predict(batch, verbose=0)

    def reshape_image(self, observation):
        """Resize a raw frame, convert it to 8-bit greyscale and return it with shape ``INPUT_SHAPE``."""
        height, width, _ = self.INPUT_SHAPE
        img = Image.fromarray(observation)
        img = img.resize((width, height))  # PIL expects (width, height)
        img = img.convert("L")
        return np.reshape(np.array(img), self.INPUT_SHAPE)

    def save(self, path='ski'):
        """Save the online network to ``path``.

        The agent has no ``model`` attribute; the trained network is ``q_eval``.
        """
        self.q_eval.save(path)

    def load(self, path='ski'):
        """Load the online network from ``path`` and sync the target network to it.

        Also resets the exploration rate to 1, matching the value the original
        code attempted to set.
        """
        # Imported locally because the notebook's import cell does not bring
        # the ``tensorflow`` name (or ``load_model``) into scope.
        from tensorflow.keras.models import load_model

        self.q_eval = load_model(path)
        self.transfer_weight()
        self.rng = 1

    def train(self):
        """Fit the online network on one random mini-batch from the replay memory.

        Does nothing until at least 5,000 transitions are stored. After every
        11th call the target network is refreshed and ``self.rng`` is decayed
        (never below ``self.rng_min``).
        """
        if len(self.memory) < 5_000:
            return

        mini_batch = random.sample(self.memory, 128)

        current_states = np.array([transition[0] for transition in mini_batch])
        next_states = np.array([transition[4] for transition in mini_batch])
        current_qs = self.q_eval.predict(current_states, verbose=0)
        next_qs = self.q_target.predict(next_states, verbose=0)

        # Start from the current predictions so that only the value of the
        # action actually taken contributes to the loss.
        targets = np.array(current_qs, copy=True)
        for i, (_, action, reward, done, _) in enumerate(mini_batch):
            if done:
                new_q = reward
            else:
                new_q = reward + self.discount * np.max(next_qs[i])
            targets[i, action] = new_q

        self.q_eval.fit(current_states, targets, verbose=0, shuffle=False)

        self.transfer_ctr += 1
        if self.transfer_ctr > 10:
            self.transfer_weight()
            self.transfer_ctr = 0

            # Exploration decays only when the target network is refreshed.
            self.rng = max(self.rng * self.rng_decay, self.rng_min)

    def transfer_weight(self):
        """Copy the online network's weights into the target network."""
        self.q_target.set_weights(self.q_eval.get_weights())

In [3]:
# Number of episodes the training loop below iterates over.
n_of_episodes = 5_000

In [4]:
env = gym.make('Skiing-v0')
# Take the number of actions from the environment instead of hard-coding it,
# so the network's output layer always matches the action space.
agent = DQN_Agent(int(env.action_space.n))

In [None]:
# Optional: restore a previously saved agent. Skip on a first run, since this
# reads the 'ski' model that agent.save() writes.
agent.load()

In [None]:
# Main training loop: play n_of_episodes episodes, storing every transition
# and training the agent both periodically and at the end of each episode.
observation = env.reset()
train_ctr = 0  # steps taken since the last periodic agent.train() call

for i in range(n_of_episodes):
    total_reward = 0
    done = False

    while not done:
        # Uncomment to watch the agent play: env.render()
        action = agent.get_action(observation)
        new_observation, reward, done, info = env.step(action)
        agent.remember(observation, action, reward, done, new_observation)
        observation = new_observation
        total_reward += reward

        # Periodic training, independent of episode boundaries.
        train_ctr += 1
        if train_ctr > 500:
            agent.train()
            train_ctr = 0

    # Episode finished: train once more, report progress, start a new episode.
    agent.train()
    clear_output(wait=True)
    print(f'Episode: {i}')
    print(f'Reward: {total_reward}')
    print(f'Randomness: {agent.rng}')
    observation = env.reset()

env.close()

Episode: 359
Reward: -18406.0
Randomness: 0.9


In [None]:
# Inspect the agent's current exploration rate (probability of a random action).
agent.rng

In [None]:
# Persist the agent via DQN_Agent.save(), which targets the path 'ski'.
agent.save()