# In [None]:
import random  
import gym  
import numpy as np  
from collections import deque  
from keras.models import Sequential  
from keras.layers import Dense  
from keras.optimizers import Adam
from statistics import mean  
    
# Gym environment the agent is trained on
ENV_NAME = "CartPole-v1"

# Q-learning / optimizer settings
GAMMA = 0.95           # discount applied to the best predicted next-state Q-value
LEARNING_RATE = 0.001  # learning rate handed to the Adam optimizer

# Experience replay settings
MEMORY_SIZE = 10 ** 6  # maximum number of transitions kept in the replay deque
BATCH_SIZE = 20        # transitions sampled from memory for each replay pass

# Exploration schedule
EXPLORATION_MAX = 1.0      # exploration rate a new solver starts with
EXPLORATION_MIN = 0.01     # floor the exploration rate is clamped to
EXPLORATION_DECAY = 0.995  # factor the exploration rate is multiplied by per replay

# Solution criteria
# Number of most-recent episode scores that must be recorded before a solve counts
CONSECUTIVE_EPISODES_TO_SOLVE = 100
# Mean of those scores required for a solve
MEAN_SCORE_TO_SOLVE = 200
  
# Deep Q Network Class
class DQNSolver:
    """Deep Q-Network agent with epsilon-greedy action selection and replay.

    The solver owns a small fully connected Keras model that maps a state to
    one Q-value per action, a bounded memory of past transitions, and an
    exploration rate that is decayed after every replay pass.
    """

    def __init__(self, observation_space, action_space):
        """Create the replay memory and compile the Q-value network.

        Args:
            observation_space: Number of values in one observation; used as
                the input width of the network.
            action_space: Number of discrete actions; used as the output
                width of the network and as the range for random actions.
        """
        self.exploration_rate = EXPLORATION_MAX

        # Initialize the action space
        self.action_space = action_space

        # Create a deque container for the neural net's training memory (experience replay)
        self.memory = deque(maxlen=MEMORY_SIZE)

        # The Neural Network Model
        self.model = Sequential()
        self.model.add(Dense(24, input_shape=(observation_space,), activation="relu"))
        self.model.add(Dense(24, activation="relu"))
        self.model.add(Dense(self.action_space, activation="linear"))
        # The learning rate is passed positionally on purpose: the keyword was
        # renamed from `lr` to `learning_rate` across Keras releases, while the
        # first positional argument of Adam is the learning rate in both.
        self.model.compile(loss="mse", optimizer=Adam(LEARNING_RATE))

    # Adds state, action, reward, next_state, and terminal to the DQN's memory
    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    # Determines whether the agent Explores or Exploits, then chooses an action
    def act(self, state):
        """Pick an action for ``state`` using the current exploration rate.

        Args:
            state: Observation in the form the model's ``predict`` accepts
                (callers in this file pass a ``(1, observation_space)`` array).

        Returns:
            int: A random action index with probability ``exploration_rate``,
            otherwise the index of the largest predicted Q-value.
        """
        # Explore? (Exploration throttled by exploration_rate)
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.action_space)

        # If not Exploring, then Exploit. verbose=0 keeps Keras from printing a
        # progress bar for every single-state prediction.
        q_values = self.model.predict(state, verbose=0)

        # Cast so both branches return a plain Python int rather than np.int64
        return int(np.argmax(q_values[0]))

    # Train the model using replay memory
    def experience_replay(self):
        """Train on a random batch of remembered transitions, then decay exploration.

        Does nothing until the memory holds at least ``BATCH_SIZE`` entries.
        """
        # Not enough experience collected yet
        if len(self.memory) < BATCH_SIZE:
            return

        # Take a random sampling from memory of size BATCH_SIZE
        batch = random.sample(self.memory, BATCH_SIZE)

        # The model is refit after every sample, so targets for later samples
        # in the batch are computed with the already-updated weights.
        for state, action, reward, state_next, terminal in batch:

            # A terminal transition has no future value: the target is the reward alone
            q_update = reward

            if not terminal:
                # Reward plus the discounted best Q-value predicted for the next state
                next_q_values = self.model.predict(state_next, verbose=0)
                q_update = reward + GAMMA * np.amax(next_q_values[0])

            # Current predictions for this state; only the taken action's entry is replaced
            q_values = self.model.predict(state, verbose=0)
            q_values[0][action] = q_update

            # Train the model based on the state and the adjusted q_values
            self.model.fit(state, q_values, verbose=0)

        # Adjust the exploration rate based on EXPLORATION_DECAY, never below EXPLORATION_MIN
        self.exploration_rate *= EXPLORATION_DECAY
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)
  
# Helpers that hide the difference between gym's old and new reset/step return shapes
def _reset_env(env):
    """Reset ``env`` and return only the initial observation.

    Older gym releases return the observation directly; gym 0.26+ returns an
    ``(observation, info)`` tuple. Both shapes are accepted.
    """
    result = env.reset()
    return result[0] if isinstance(result, tuple) else result


def _step_env(env, action):
    """Apply ``action`` to ``env`` and return ``(observation, reward, done)``.

    Older gym releases return a 4-tuple with a single ``done`` flag; gym 0.26+
    returns a 5-tuple with separate ``terminated`` and ``truncated`` flags,
    which are merged here into one end-of-episode flag.
    """
    result = env.step(action)
    if len(result) == 5:
        observation, reward, terminated, truncated, _info = result
        return observation, reward, terminated or truncated
    observation, reward, done, _info = result
    return observation, reward, done


# Function used to run the DQN in the cartpole environment
def cartpole():
    """Train a DQNSolver on ENV_NAME until the solve criteria are met.

    An episode's score is the number of steps it lasted. Training stops once
    CONSECUTIVE_EPISODES_TO_SOLVE scores have been recorded and their mean is
    at least MEAN_SCORE_TO_SOLVE. Progress is printed after every episode.
    The environment is closed when the function exits, including on errors
    or a keyboard interrupt.
    """
    # Initialize the environment
    env = gym.make(ENV_NAME)

    try:
        # Size the DQN from the environment's observation and action spaces
        observation_space = env.observation_space.shape[0]
        action_space = env.action_space.n
        dqn_solver = DQNSolver(observation_space, action_space)

        # Only the most recent scores are kept; older ones fall off the deque
        scores = deque(maxlen=CONSECUTIVE_EPISODES_TO_SOLVE)

        episode_counter = 0  # total number of episodes started

        # Main Loop: one iteration per episode
        while True:
            episode_counter += 1

            # Reset the environment and shape the observation as a batch of one
            state = np.reshape(_reset_env(env), [1, observation_space])

            step_counter = 0  # number of steps survived in the current episode

            # Episode Loop
            while True:
                step_counter += 1

                # Render the environment (separate window)
                env.render()

                # Choose and apply the next action
                action = dqn_solver.act(state)
                state_next, reward, terminal = _step_env(env, action)

                # Flip the sign of the reward on the step that ended the episode
                reward = reward if not terminal else -reward

                state_next = np.reshape(state_next, [1, observation_space])

                # Remember the data related to the step just taken
                dqn_solver.remember(state, action, reward, state_next, terminal)
                state = state_next

                if terminal:
                    break

                # Train on replayed experience after every non-terminal step
                dqn_solver.experience_replay()

            print("Episode: " + str(episode_counter))
            print("Exploration Rate: " + str(dqn_solver.exploration_rate) + ", Episode Score: " + str(step_counter))

            # Record this episode's score and report the running mean
            scores.append(step_counter)
            mean_score = mean(scores)
            print("Mean score: " + str(mean_score) + "\n")

            # Check to see if the agent solved the cartpole problem according to our criteria constants
            if mean_score >= MEAN_SCORE_TO_SOLVE and len(scores) >= CONSECUTIVE_EPISODES_TO_SOLVE:
                print("CONGRATULATIONS!")
                print("Solved after " + str(episode_counter) + " episodes.")
                break
    finally:
        # Always release the environment (and its render window), even on failure
        env.close()
            
# Start the DQN agent working through the cartpole problem, but only when this
# file is executed directly (script or notebook cell) rather than imported.
if __name__ == "__main__":
    cartpole()