In [None]:
import gym
import numpy as np
from gym import spaces
import numpy as np 
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam 
import random

# Task 1 : 
Add a third action to the set of possible actions, in which the agent does nothing (neither pushing nor pulling).

- Creation of the CustomCartPole class, which inherits from the gym environment
- Method overriding to modify the environment

In [None]:
class CustomCartPole(gym.Env):
    """CartPole-v1 extended with a third, "do nothing" action.

    Actions:
        0 -- push the cart to the left (forwarded to the wrapped environment)
        1 -- push the cart to the right (forwarded to the wrapped environment)
        2 -- apply no force; the simulation still advances by one time step

    The result of the most recent transition is cached in ``last_obs``,
    ``last_reward``, ``last_term``, ``last_trun`` and ``last_info``.
    """

    NOOP_ACTION = 2

    def __init__(self):
        super().__init__()
        self.env = gym.make('CartPole-v1')

        self.action_space = spaces.Discrete(3)  # left, right, no-op
        self.observation_space = self.env.observation_space
        self.reward_range = self.env.reward_range
        self.metadata = self.env.metadata

        # No observation exists until reset() has been called.
        self.last_obs = None
        self.last_reward = 0.0
        self.last_term = False
        self.last_trun = False
        self.last_info = {}

    def step(self, action):
        """Advance the environment by one time step.

        Args:
            action: 0 (push left), 1 (push right) or 2 (no force).

        Returns:
            The ``(observation, reward, terminated, truncated, info)`` tuple
            produced by the wrapped environment.
        """
        if action == self.NOOP_ACTION:
            # The wrapped CartPole only knows "push left" and "push right",
            # each applying +/- force_mag. Temporarily zeroing force_mag turns
            # either push into a zero-force step, so the physics, the reward
            # and the termination/truncation checks all still run.
            cartpole = self.env.unwrapped
            saved_force_mag = cartpole.force_mag
            cartpole.force_mag = 0.0
            try:
                transition = self.env.step(0)
            finally:
                # Always restore the real force, even if the step raised.
                cartpole.force_mag = saved_force_mag
        else:
            # Use the default action space (0 = move left, 1 = move right).
            transition = self.env.step(action)

        (self.last_obs, self.last_reward, self.last_term,
         self.last_trun, self.last_info) = transition
        return transition

    def reset(self, *, seed=1, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Seed forwarded to the wrapped environment. The default of 1
                keeps the historical behaviour of this class, which means
                every episode starts from the same initial state; pass
                ``seed=None`` to continue the existing random stream instead.
            options: Optional reset options forwarded to the wrapped
                environment.
        """
        obs, info = self.env.reset(seed=seed, options=options)

        # Refresh the cache so it never holds data from a previous episode.
        self.last_obs = obs
        self.last_reward = 0.0
        self.last_term = False
        self.last_trun = False
        self.last_info = info
        return obs, info

    def close(self):
        """Release the resources held by the wrapped environment."""
        self.env.close()

# Task 2:
Find a deep RL solution for your new environment.

- I chose to use Deep Q-learning (instead of using a Q-table, we use a neural network that takes a state and approximates the Q-values for each action based on that state).

In [None]:
class DQLAgent():
    """Deep Q-learning agent with experience replay and epsilon-greedy exploration.

    A small fully connected network maps a state to one Q-value per action.
    """

    def __init__(self, env):
        """Read the problem dimensions from ``env`` and build the Q-network.

        Args:
            env: Environment exposing ``observation_space.shape`` and a
                discrete ``action_space`` with attribute ``n``.
        """
        # Network dimensions, used by build_model().
        self.state_size = env.observation_space.shape[0]  # input layer width
        self.action_size = env.action_space.n  # output layer width

        # Hyperparameters used by replay().
        self.gamma = 0.95  # discount factor
        self.learning_rate = 0.001

        # Hyperparameters used by act() / adaptiveEGreedy().
        self.epsilon = 1  # initial exploration rate
        self.epsilon_decay = 0.99
        self.epsilon_min = 0.01

        # Bounded replay buffer: once full, the oldest transitions are dropped.
        self.memory = deque(maxlen=10_000)

        self.model = self.build_model()

    def build_model(self):
        """Build and compile the Q-network (two hidden layers of 24 ReLU units)."""
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))  # first hidden layer
        model.add(Dense(24, activation='relu'))  # second hidden layer
        model.add(Dense(self.action_size, activation='linear'))  # one Q-value per action
        # `learning_rate` is the current argument name; `lr` is deprecated.
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, termination):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, termination))

    def act(self, state):
        """Pick an action with an epsilon-greedy policy.

        Args:
            state: Batch containing a single state, shaped ``(1, state_size)``.

        Returns:
            The chosen action index as an ``int``.
        """
        # Explore: sample uniformly from the agent's own action count rather
        # than relying on a module-level `env` variable being defined.
        if random.random() < self.epsilon:
            return random.randrange(self.action_size)

        # Exploit: take the action with the highest predicted Q-value.
        q_values = self.model.predict(state, verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self, batch_size):
        """Train the Q-network on a random minibatch drawn from the buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. The whole minibatch is evaluated with two ``predict``
        calls and fitted with one ``fit`` call (a single gradient step).

        Args:
            batch_size: Number of transitions to sample.
        """
        if len(self.memory) < batch_size:
            return  # not enough experience collected yet

        minibatch = random.sample(self.memory, batch_size)

        # Stack the per-transition fields into batch arrays; vstack accepts
        # states stored either as (1, state_size) or as (state_size,).
        states = np.vstack([transition[0] for transition in minibatch])
        actions = np.array([transition[1] for transition in minibatch], dtype=int)
        rewards = np.array([transition[2] for transition in minibatch], dtype=float)
        next_states = np.vstack([transition[3] for transition in minibatch])
        terminated = np.array([transition[4] for transition in minibatch], dtype=bool)

        # target = R(s, a) + gamma * max_a' Q(s', a'); a terminal transition
        # has no next state, so its target is the reward alone.
        next_q = self.model.predict(next_states, verbose=0)
        bootstrap = np.where(terminated, 0.0, self.gamma * next_q.max(axis=1))

        # Start from the current predictions so that only the Q-value of the
        # action actually taken contributes to the loss.
        train_targets = self.model.predict(states, verbose=0)
        train_targets[np.arange(batch_size), actions] = rewards + bootstrap

        self.model.fit(states, train_targets, batch_size=batch_size,
                       epochs=1, verbose=0)

    def adaptiveEGreedy(self):
        """Decay the exploration rate, never letting it drop below ``epsilon_min``."""
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

if __name__ == "__main__":

    # Initialize the custom environment and the agent.
    env = CustomCartPole()
    agent = DQLAgent(env)

    batch_size = 15  # transitions sampled per training step
    episodes = 450
    max_steps = 500  # hard cap on the number of steps per episode

    for e in range(episodes):

        # Start a new episode; reset() returns (observation, info).
        state, info = env.reset()
        state = np.reshape(state, [1, agent.state_size])

        total_reward = 0  # sum of the rewards collected in this episode

        for _ in range(max_steps):

            # act
            action = agent.act(state)

            # step
            next_state, reward, termination, truncation, info = env.step(action)
            next_state = np.reshape(next_state, [1, agent.state_size])

            # Track the episode return for reporting only.
            total_reward += reward

            # Store the per-step reward: the Q-learning target is built from
            # the immediate reward, not from the running episode total. Only
            # `termination` is stored, so time-limit truncation still
            # bootstraps from the next state.
            agent.remember(state, action, reward, next_state, termination)

            # update state
            state = next_state

            # replay
            agent.replay(batch_size)

            # adjust epsilon
            agent.adaptiveEGreedy()

            # Stop on failure or when the environment reports its time limit.
            if termination or truncation:
                break

        print(f'Episode: {e}, Reward: {total_reward}')