## DQN Agent for Mountain Car Game Environment

In [1]:
import gym
import pygame
import time
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import os
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import random
import warnings
import sys

pygame 2.5.2 (SDL 2.28.3, Python 3.10.6)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Ignore all warnings: install a global "ignore" filter so no Python warning is
# shown from here on. NOTE(review): this also hides deprecation warnings from
# the imported libraries, which may matter when upgrading them.
warnings.filterwarnings('ignore')

## Hard-Coded Strategy (Momentum Heuristic, No Learning)

In [3]:
# Frame rate written into the environment metadata below.
desired_fps = 250
# Episode budget for the hand-written baseline: 10 episodes of at most 200 steps.
hcs_episodes = 10
hcs_max_steps = 200

# Create the MountainCar environment and wrap it in a Monitor that writes to
# "hcs_recording" and records a video for every 4th episode.
env = gym.make('MountainCar-v0')
env = gym.wrappers.Monitor(env, "hcs_recording", force=True, video_callable = lambda episode_id: episode_id % 4 == 0)
env.metadata['render_fps'] = desired_fps

try:
    for e in range(hcs_episodes):
        observation = env.reset()
        # Every episode starts by pushing forward (action 2).
        action = 2

        for t in range(hcs_max_steps):
            env.render()

            observation, reward, done, other_info = env.step(action)
            velocity = observation[1]

            # Momentum heuristic: keep pushing in the current direction while the
            # car is still moving that way, otherwise switch to the opposite push.
            if action == 2:
                action = 2 if velocity > 0.0 else 0
            elif action == 0:
                action = 0 if velocity < 0.0 else 2

            if done:  # Game Over
                # The reported score is the number of steps left in the budget.
                print("Game Episode :{}/{}, High Score:{},Exploration Rate:{:.2}".format(e, hcs_episodes, hcs_max_steps - 1 - t, 1.0))
                break
finally:
    # Close the environment even if the loop is interrupted, so the Monitor
    # wrapper gets the chance to flush whatever it has recorded.
    env.close()
print("Game Over!")

Game Episode :0/10, High Score:75,Exploration Rate:1.0
Game Episode :1/10, High Score:86,Exploration Rate:1.0
Game Episode :2/10, High Score:84,Exploration Rate:1.0
Game Episode :3/10, High Score:86,Exploration Rate:1.0
Game Episode :4/10, High Score:77,Exploration Rate:1.0
Game Episode :5/10, High Score:76,Exploration Rate:1.0
Game Episode :6/10, High Score:79,Exploration Rate:1.0
Game Episode :7/10, High Score:78,Exploration Rate:1.0
Game Episode :8/10, High Score:87,Exploration Rate:1.0
Game Episode :9/10, High Score:78,Exploration Rate:1.0
Game Over!


## RL Based Strategy

#### Designing AI agent

In [4]:
class Agent:
    """Deep Q-learning agent with an epsilon-greedy policy and a replay memory.

    The Q-function is approximated by a small fully connected network that maps
    a state (shape ``[1, state_size]``) to one value per action.
    """

    def __init__(self,state_size,action_size):
        """Set up hyper-parameters, the replay memory and the Q-network.

        Args:
            state_size: number of features in one state vector.
            action_size: number of discrete actions (size of the network output).
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # replay memory; oldest entries drop out first
        self.gamma = 0.95  # discount factor
        self.epsilon = 1.0  # exploration rate: probability of acting randomly
        self.epsilon_decay = 0.995  # multiplicative decay applied once per train() call
        self.epsilon_min = 0.01  # decay stops once epsilon is at or below this value
        self.learning_rate = 0.001
        self.model = self._create_model()

    def _create_model(self):
        """Build and compile the network that approximates the Q-value function."""
        model = Sequential()
        model.add(Dense(24,input_dim=self.state_size,activation='relu'))  # 1st hidden layer
        model.add(Dense(24,activation='relu'))  # 2nd hidden layer
        model.add(Dense(self.action_size,activation='linear'))  # one Q-value per action
        model.compile(loss='mse',optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self,state,action,reward,next_state,done):
        """Store one transition ``(state, action, reward, next_state, done)`` in memory."""
        self.memory.append((state,action,reward,next_state,done))

    def act(self,state):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned
        (exploration); otherwise the action with the highest predicted Q-value
        (exploitation).

        Returns:
            int: index of the chosen action, in ``range(action_size)``.
        """
        if np.random.rand()<=self.epsilon:
            return random.randrange(self.action_size)
        # verbose=0 keeps Keras from printing a progress bar for every prediction.
        act_values = self.model.predict(state,verbose=0)
        return int(np.argmax(act_values[0]))

    def train(self,batch_size=32):
        """Fit the network on ``batch_size`` transitions sampled from memory.

        Each sampled transition is fitted individually for a single epoch. The
        regression target for the taken action is ``reward`` for terminal
        transitions and ``reward + gamma * max_a Q(next_state, a)`` otherwise;
        the values of the other actions are left at the network's own
        prediction. After the pass, ``epsilon`` is decayed once while it is
        still above ``epsilon_min``.

        If the memory holds fewer than ``batch_size`` transitions the call is a
        no-op (nothing is fitted and ``epsilon`` is left unchanged).
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory,batch_size)
        for state,action,reward,next_state,done in minibatch:
            if not done:
                target = reward + self.gamma*np.amax(self.model.predict(next_state,verbose=0)[0])
            else:
                target = reward
            target_f = self.model.predict(state,verbose=0)
            target_f[0][action] = target
            # verbose=0 already silences Keras; no stdout redirection is needed
            # (the previous per-sample redirect opened a file and never closed it).
            self.model.fit(state,target_f,epochs=1,verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self,name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self,name):
        """Save the current network weights to the file ``name``."""
        self.model.save_weights(name)

#### Training the DQN Agent (Deep Q-Learner)

In [6]:
# Number of training episodes the training loop runs.
n_episodes = 500
# Path prefix for saved weight checkpoints; it is concatenated directly with the
# file name, hence the trailing slash.
output_dir = "mountain_car_weights/"

In [7]:
# Problem dimensions: a state is a (position, velocity) pair and the agent
# picks one of three discrete actions.
state_size = 2
action_size = 3
# Number of remembered transitions replayed per training call.
batch_size = 32

# Build the agent from the named dimensions above rather than repeating the
# literals, so the two can never drift apart.
agent = Agent(state_size=state_size,action_size=action_size)
done = False

In [None]:
# Start from a fresh agent and a fresh environment, so this cell does not
# depend on the already closed, Monitor-wrapped `env` left behind by an
# earlier cell.
agent = Agent(state_size, action_size)
env = gym.make('MountainCar-v0')
env = gym.wrappers.Monitor(env, "rls_recording", force=True, video_callable = lambda episode: episode%99 == 0)

# Make sure the checkpoint directory exists before the first save.
os.makedirs(output_dir, exist_ok=True)

try:
    for e in range(n_episodes):
        state = env.reset()
        state = np.reshape(state,[1,state_size])

        # The counter is called `step` (not `time`) so the imported `time`
        # module is not overwritten.
        for step in range(200):
            env.render()
            action = agent.act(state)  # index of one of the `action_size` actions
            prev_vel = state[0][1]

            next_state,reward,done,other_info = env.step(action)
            new_vel = next_state[1]

            # Shaped reward that replaces the one returned by the environment:
            #   episode ended before the last step -> +40
            #   episode ended on the last step     -> -10
            #   otherwise +1 if the speed (|velocity|) increased, else -1
            if done:
                reward = 40 if step < 199 else -10
            elif np.abs(new_vel) > np.abs(prev_vel):
                reward = 1
            else:
                reward = -1

            next_state = np.reshape(next_state,[1,state_size])
            agent.remember(state,action,reward,next_state,done)
            state = next_state

            if done:
                print("Game Episode :{}/{}, High Score:{},Exploration Rate:{:.2}".format(e,n_episodes,199-step,agent.epsilon))
                break

        # One replay pass per episode, once enough transitions are stored.
        if len(agent.memory)>batch_size:
            agent.train(batch_size)

        # Checkpoint the weights every 50 episodes.
        if e%50==0:
            agent.save(os.path.join(output_dir,"weights_"+'{:04d}'.format(e)+".hdf5"))
finally:
    # Close the environment even if training is interrupted, so the Monitor
    # wrapper gets the chance to flush whatever it has recorded.
    env.close()

Game Episode :0/1000, High Score:0,Exploration Rate:1.0
Game Episode :1/1000, High Score:0,Exploration Rate:0.99
Game Episode :2/1000, High Score:0,Exploration Rate:0.99
Game Episode :3/1000, High Score:0,Exploration Rate:0.99
Game Episode :4/1000, High Score:0,Exploration Rate:0.98
Game Episode :5/1000, High Score:0,Exploration Rate:0.98
Game Episode :6/1000, High Score:0,Exploration Rate:0.97
Game Episode :7/1000, High Score:0,Exploration Rate:0.97
Game Episode :8/1000, High Score:0,Exploration Rate:0.96
Game Episode :9/1000, High Score:0,Exploration Rate:0.96
Game Episode :10/1000, High Score:0,Exploration Rate:0.95
Game Episode :11/1000, High Score:0,Exploration Rate:0.95
Game Episode :12/1000, High Score:0,Exploration Rate:0.94
Game Episode :13/1000, High Score:0,Exploration Rate:0.94
Game Episode :14/1000, High Score:0,Exploration Rate:0.93
Game Episode :15/1000, High Score:0,Exploration Rate:0.93
Game Episode :16/1000, High Score:0,Exploration Rate:0.92
Game Episode :17/1000, Hi