In [1]:
import numpy as np
import random
random.seed(42)
import gym
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras import optimizers

Using TensorFlow backend.


## Train Pendulum-v0 with class DQNAgent

In [None]:
"""
class DQNAgent():
    def __init__(self):
        
        ##############################
        self.epsilon_min = 0.001
        self.epsilon_decay = 0.995
        ##############################
        
        self.memory = deque(maxlen=5000)
        self.nS = 20
        self.nA = 10
        self.state_size = 3
        self.learning_rate = 0.001
        self.epsilon = 1.0
        self.discount = 0.9
        self.model = self._build_model()
        
        #action
        self.action_space=np.linspace(-2.0, 2.0, self.nA)
        
        #observation
        self.cos_space=np.linspace(-1.0, 1.0, self.nS)
        self.sin_space=np.linspace(-1.0, 1.0, self.nS)
        self.theta_dt_space=np.linspace(-8.0, 8.0, self.nS)
        
    def _build_model(self):
        # Neural Net for Deep-Q learning Model
        model = Sequential()
        model.add(Dense(20, input_dim=self.state_size, activation='relu'))
        model.add(Dense(20, activation='relu'))
        model.add(Dense(self.nA, activation='linear'))
        model.compile(loss='mse', optimizer=optimizers.RMSprop(lr=self.learning_rate))
        return model
    
    def remember(self, observation, action, reward, next_observation, done):
        observation = self.discretize_obs(observation)
        observation = np.reshape(observation, [1, 3])
        next_observation = self.discretize_obs(next_observation)
        next_observation = np.reshape(next_observation, [1, 3])
        
        self.memory.append((observation, action, reward, next_observation, done))
        
    def discretize_obs(self, observation):
        cos=observation[0]
        sin=observation[1]
        theta_dt=observation[2]
        
        diff_cos=abs(cos - self.cos_space)
        idx_cos=np.argmin(diff_cos)
        diff_sin=abs(sin - self.sin_space)
        idx_sin=np.argmin(diff_sin)
        diff_theta_dt=abs(theta_dt - self.theta_dt_space)
        idx_theta_dt=np.argmin(diff_theta_dt)
        
        observation = [self.cos_space[idx_cos], self.sin_space[idx_sin], self.theta_dt_space[idx_theta_dt]]
        observation = np.reshape(observation, [1, self.state_size])
        return observation
    
    def discretize_action(self, action):
        diff_action=abs(action - self.action_space)
        idx_action=np.argmin(diff_action)
        
        action = self.action_space[idx_action]
        return action
        
    def act(self, observation):
        observation = self.discretize_obs(observation)
        
        if np.random.rand() <= self.epsilon:
            action_idx = random.randrange(self.nA)
        else:
            q_values = self.model.predict(observation)
            action_idx = np.argmax(q_values[0])
        
        action = self.action_space[action_idx]
        return action_idx, [action]
    
    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for observation, action_idx, reward, next_observation, done in minibatch:
            q_target = reward
            if not done:
                q_target = reward + self.discount * np.max(self.model.predict(next_observation)[0])
            target_f = self.model.predict(observation)
            target_f[0][action_idx] = q_target
            self.model.fit(observation, target_f, epochs=1, verbose=0)
        if (self.epsilon > self.epsilon_min) and (len(self.memory) >= 3000):
            self.epsilon *= self.epsilon_decay
            
            
            
            
if __name__ == "__main__":
    env = gym.make('Pendulum-v0')
    agent = DQNAgent()
    episodes=3000
    for e in range(episodes):
        observation = env.reset()
        total_reward=0
        for t in range(200):
            #env.render()
            action_idx, action = agent.act(observation)
            next_observation, reward, done, _ = env.step(action)
            agent.remember(observation, action_idx, reward, next_observation, done)
            observation = next_observation
            total_reward+=reward
            if done:
                print("episode: {}/{}, average_reward: {}".format(e, episodes, total_reward/(1+t)))
                break
        # train the agent with the experience of the episode
        agent.replay(32)
"""

In [None]:
"""
if __name__ == "__main__":
    env = gym.make('Pendulum-v0')
    agent = DQNAgent()
    episodes=3000
    for e in range(episodes):
        observation = env.reset()
        total_reward=0
        for t in range(200):
            #env.render()
            action_idx, action = agent.act(observation)
            next_observation, reward, done, _ = env.step(action)
            agent.remember(observation, action_idx, reward, next_observation, done)
            observation = next_observation
            total_reward+=reward
            if done:
                print("episode: {}/{}, average_reward: {}".format(e, episodes, total_reward/(1+t)))
                break
        # train the agent with the experience of the episode
        agent.replay(32)
"""

Save the weights of the trained model

In [1]:
#agent.model.save_weights('my_model_w.h5')

## Test Pendulum-v0 with class TestAgent

In [2]:
class TestAgent:
    """Evaluation-only agent that acts from a pre-trained Q-network.

    Each raw observation (three components, named cos / sin / theta_dt in this
    class) is snapped onto a fixed grid of ``nS`` values per component before it
    is passed to the network.  The network has ``nA`` outputs; the index of the
    largest output selects one of ``nA`` evenly spaced actions in [-2, 2].

    Parameters
    ----------
    weights_path : str, optional
        File passed to ``model.load_weights``.  Defaults to ``'my_model_w.h5'``.
    epsilon : float, optional
        Probability of picking a uniformly random action instead of the greedy
        one.  Defaults to ``0.0`` (purely greedy); with ``0.0`` no random number
        is drawn at all, so evaluation is deterministic given the weights.
    """

    def __init__(self, weights_path='my_model_w.h5', epsilon=0.0):
        self.nS = 20
        self.nA = 10
        self.state_size = 3
        self.weights_path = weights_path
        self.epsilon = epsilon

        # action space
        self.action_space = np.linspace(-2.0, 2.0, self.nA)

        # obs space: one grid per observation component
        self.cos_space = np.linspace(-1.0, 1.0, self.nS)
        self.sin_space = np.linspace(-1.0, 1.0, self.nS)
        self.theta_dt_space = np.linspace(-8.0, 8.0, self.nS)

        # Built last: _build_model reads state_size, nA and weights_path.
        self.model = self._build_model()

    def _build_model(self):
        """Build the Dense(20)-Dense(20)-Dense(nA) network and load its weights.

        Returns
        -------
        The compiled Keras ``Sequential`` model with weights read from
        ``self.weights_path``.
        """
        model = Sequential()
        model.add(Dense(20, input_dim=self.state_size, activation='relu'))
        model.add(Dense(20, activation='relu'))
        model.add(Dense(self.nA, activation='linear'))
        model.compile(loss='mse', optimizer=optimizers.RMSprop(lr=0.001))
        model.load_weights(self.weights_path)
        return model

    @staticmethod
    def _snap(value, grid):
        """Return the element of ``grid`` closest to ``value`` (first on ties)."""
        return grid[np.argmin(np.abs(value - grid))]

    def discretize_obs(self, observation):
        """Snap each observation component to its nearest grid value.

        Parameters
        ----------
        observation : indexable with at least 3 numeric entries
            Read as ``(cos, sin, theta_dt)``.

        Returns
        -------
        numpy.ndarray of shape ``(1, state_size)`` holding the snapped values.
        Values outside a grid's range snap to the nearest end point.
        """
        snapped = [
            self._snap(observation[0], self.cos_space),
            self._snap(observation[1], self.sin_space),
            self._snap(observation[2], self.theta_dt_space),
        ]
        return np.reshape(snapped, [1, self.state_size])

    def act(self, observation):
        """Choose an action for ``observation``.

        Returns
        -------
        (action_idx, [action]) : the index into ``action_space`` and the
        corresponding action value wrapped in a one-element list, which is the
        form handed to ``env.step`` by the caller.
        """
        observation = self.discretize_obs(observation)

        # Strict comparison and the epsilon > 0 guard ensure that epsilon == 0
        # can never take the random branch, even if the draw is exactly 0.0.
        if self.epsilon > 0 and np.random.rand() < self.epsilon:
            action_idx = random.randrange(self.nA)
        else:
            q_values = self.model.predict(observation)
            action_idx = np.argmax(q_values[0])

        action = self.action_space[action_idx]
        return action_idx, [action]

    def remember(self, observation, action, reward, next_observation, done):
        """No-op: the evaluation agent keeps no replay memory.

        Kept so the evaluation loop can use the same calls as a training agent.
        """
        return None

    def reward(self, observation, action, reward):
        """No-op placeholder; the evaluation agent does not reshape rewards."""
        pass

In [None]:
if __name__ == "__main__":
    # Roll out the pre-trained TestAgent for a fixed number of episodes with
    # rendering on, printing the mean per-step reward of each finished episode.
    env = gym.make('Pendulum-v0')
    try:
        agent = TestAgent()
        episodes=100
        for e in range(episodes):
            observation = env.reset()
            total_reward=0
            # Step cap per episode; the loop also stops as soon as env reports done.
            for t in range(200):
                env.render()
                action_idx, action = agent.act(observation)
                next_observation, reward, done, _ = env.step(action)
                agent.remember(observation, action_idx, reward, next_observation, done)
                observation = next_observation
                total_reward+=reward
                if done:
                    print("episode: {}/{}, average_reward: {}".format(e, episodes, total_reward/(1+t)))
                    # Do not keep stepping an environment that has signalled done.
                    break
    finally:
        # Always release the environment (and its render window), even on error
        # or keyboard interrupt.
        env.close()

episode: 0/3000, average_reward: -1.7068161107244313
episode: 1/3000, average_reward: -4.5756133232007326
episode: 2/3000, average_reward: -1.2839261924396488
episode: 3/3000, average_reward: -3.7926754963085823
episode: 4/3000, average_reward: -1.9076334487040842
episode: 5/3000, average_reward: -4.323204020180501
episode: 6/3000, average_reward: -0.017111194909851198
episode: 7/3000, average_reward: -1.8848021854873236
episode: 8/3000, average_reward: -1.9419585013504521
episode: 9/3000, average_reward: -3.7150795129862355
episode: 10/3000, average_reward: -0.607937451314952
episode: 11/3000, average_reward: -4.576593728588722
episode: 12/3000, average_reward: -4.546716226457789
episode: 13/3000, average_reward: -0.017763053311767234
episode: 14/3000, average_reward: -4.372702988849354
episode: 15/3000, average_reward: -5.03159522845906
episode: 16/3000, average_reward: -0.016222116056472318
episode: 17/3000, average_reward: -1.2609177272553214
episode: 18/3000, average_reward: -0.64