pip install gym==0.17.3

In [1]:
import random
import gym
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from tensorflow import convert_to_tensor
from IPython.display import clear_output

# Training configuration: module-level constants read by DQN.train().
EPOCHS = 1000  # maximum number of training episodes to run before giving up
THRESHOLD = 50  # training counts as solved once the mean score of the last 10 episodes reaches this value

|State|Action|Reward|Done|
|---|---|---|---|
|$(x, v, \theta, \omega)$| {0, 1} | {0, 1} | {0, 1} |
|$x$: position | 0: push to the left | 0: game ended | 0: game continues |
|$v$: velocity | 1: push to the right | 1: game continues | 1: game ended |
|$\theta$: angle |  | | |
|$\omega$: angular velocity | | | |

In this game, the goal is to keep the pole upright for as long as possible. The agent receives a reward of +1 for every step taken, so an episode's score is simply the number of steps the pole stays up.

In [2]:
class DQN():
    """Deep Q-Network agent with experience replay for a Gym environment.

    The agent stores (state, action, reward, next_state, done) transitions in a
    bounded replay memory, acts epsilon-greedily, and after every episode fits
    a small dense network to one-step Q-learning targets computed from a random
    minibatch of that memory.
    """

    def __init__(self, env_string, batch_size=64):
        """Create the environment, the replay memory and the Q-network.

        Args:
            env_string: Environment id passed to ``gym.make``.
            batch_size: Maximum number of transitions sampled per replay step.
        """
        # Bounded replay buffer: once full, the oldest transitions are dropped.
        self.memory = deque(maxlen=100000)
        self.env = gym.make(env_string)  # create the simulation environment
        # Size of one observation vector and number of discrete actions.
        self.input_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.batch_size = batch_size
        self.gamma = 1.0  # discount factor applied to the bootstrapped next-state value
        # Exploration rate: probability of taking a random action. It starts at
        # 1.0 and is multiplied by epsilon_decay after every environment step
        # (see train) until it reaches epsilon_min.
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995

        alpha = 0.01  # learning rate
        alpha_decay = 0.01  # learning-rate decay handed to the optimizer

        # Q-network: maps one state to one estimated return per action.
        self.model = Sequential()
        self.model.add(Dense(24, input_dim=self.input_size, activation='tanh'))
        self.model.add(Dense(48, activation='tanh'))
        self.model.add(Dense(self.action_size, activation='linear'))
        # NOTE(review): the `decay` keyword is only accepted by some Keras
        # optimizer versions -- confirm against the installed Keras release.
        self.model.compile(loss='mse', optimizer=Adam(learning_rate=alpha, decay=alpha_decay))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition (S, A, R, S', done) to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state, epsilon):
        """Pick an action epsilon-greedily.

        Args:
            state: Preprocessed state of shape (1, input_size).
            epsilon: Probability of sampling a random action.

        Returns:
            A random action from the environment's action space with
            probability ``epsilon``, otherwise the index of the action with the
            highest predicted Q-value.
        """
        if np.random.random() <= epsilon:
            return self.env.action_space.sample()
        return int(np.argmax(self.model.predict(state, verbose=0)[0]))

    def preprocess_state(self, state):
        """Reshape a raw observation into a (1, input_size) batch of one."""
        # Use the size read from the environment rather than a hard-coded 4 so
        # the agent also works with other observation dimensions.
        return np.reshape(state, [1, self.input_size])

    def replay(self, batch_size):
        """Fit the Q-network on a random minibatch drawn from the replay memory.

        Sampling past transitions at random breaks the correlation between
        consecutive steps, which helps against oscillating or diverging
        network weights.

        Args:
            batch_size: Maximum number of transitions to sample; fewer are used
                when the memory holds fewer. Nothing happens if the memory is
                empty.
        """
        sample_size = min(len(self.memory), batch_size)
        if sample_size == 0:
            return  # nothing to learn from yet

        minibatch = random.sample(self.memory, sample_size)
        states = np.vstack([transition[0] for transition in minibatch])
        actions = np.array([transition[1] for transition in minibatch], dtype=int)
        rewards = np.array([transition[2] for transition in minibatch], dtype=float)
        next_states = np.vstack([transition[3] for transition in minibatch])
        dones = np.array([transition[4] for transition in minibatch], dtype=bool)

        # Two batched forward passes instead of two predict() calls per sample.
        targets = np.array(self.model.predict(states, verbose=0))
        next_best = np.max(self.model.predict(next_states, verbose=0), axis=1)

        # Only the Q-value of the action actually taken is replaced: by the
        # reward alone for terminal transitions, otherwise by the one-step
        # bootstrapped return.
        targets[np.arange(sample_size), actions] = np.where(
            dones, rewards, rewards + self.gamma * next_best
        )

        self.model.fit(states, targets, batch_size=sample_size, verbose=0)

    def train(self, render=True):
        """Run training episodes until the task is solved or EPOCHS is exhausted.

        An episode's score is the number of steps taken before ``done``. After
        every episode the mean of the last 10 scores is recorded; training
        stops early once that mean reaches THRESHOLD.

        Args:
            render: When True (the default), render the environment on every
                step. Pass False for faster or headless training.

        Returns:
            List with the running mean score (last 10 episodes), one entry per
            completed episode.
        """
        scores = deque(maxlen=10)  # scores of the 10 most recent episodes only
        avg_scores = []

        for e in range(1, EPOCHS + 1):  # episodes are numbered from 1
            state = self.preprocess_state(self.env.reset())
            done = False
            i = 0
            while not done:  # the episode ends when the environment reports done
                if render:
                    self.env.render()
                action = self.choose_action(state, self.epsilon)
                next_state, reward, done, _ = self.env.step(action)
                next_state = self.preprocess_state(next_state)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                # Decay exploration after every step, never below epsilon_min.
                self.epsilon = max(self.epsilon_min, self.epsilon_decay * self.epsilon)
                i += 1

            scores.append(i)
            mean_score = round(np.mean(scores), 1)
            avg_scores.append(mean_score)
            if mean_score >= THRESHOLD:  # solved: recent mean score reached the target
                print('Solved after {} trials ✔'.format(e))
                return avg_scores
            elif e % 10 == 0:  # progress report every 10 episodes
                print('[Episode {}] - Mean survival time over last 10 episodes was {} ticks.'.format(e, mean_score))

            self.replay(self.batch_size)

        # Report EPOCHS rather than the loop variable, which is undefined when
        # EPOCHS is 0.
        print('Did not solve after {} episodes 😞'.format(EPOCHS))
        return avg_scores

In [3]:
# Build a DQN agent for the CartPole-v1 environment and train it.
# `scores` receives the list returned by train(): the running mean score of
# the last 10 episodes, one entry per completed episode.
env_string = 'CartPole-v1'
agent = DQN(env_string)
scores = agent.train()













KeyboardInterrupt: 

In [None]:
# Learning curve: running mean score from training, one point per episode.
ax = plt.gca()
ax.plot(scores)
ax.set_xlabel('episodes')
ax.set_ylabel('score')
plt.show()