<a href="https://colab.research.google.com/github/saeedmoazami/Spherical_Mobile_Robot/blob/master/Libra_PyBullet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
pip install pybullet

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

In [0]:
!ls

In [0]:
%cd /content/gdrive/My Drive/PyBullet/Libra_PyBullet_01.SLDASM.zip (Unzipped Files)/

In [0]:
import numpy as np
import pybullet as p
import time
import pybullet_data
from matplotlib import pyplot as plt

class Spherical_Robot:
    
    def __init__(self, target_position, sphere_position = (np.random.rand(1,2)).tolist()[0]+[0]):
        
        physicsClient = p.connect(p.DIRECT)#or p.DIRECT for non-graphical version or GUI
        
        p.setAdditionalSearchPath(pybullet_data.getDataPath()) #optionally
            
        self.initialized = False
        
        self.TORQUE = 1
        
        self.FORCE  = 1
        

        
        self.pendulum_joint_ID = 0
        
        self.slider_joint_id   = 1
        
        self.target_x = target_position[0]
        
        self.target_y = target_position[1]
        

        
        self.distance = self.distance_func(sphere_position[0], sphere_position[1])
        
        self.SphereId, _ = self.reset(sphere_position)
        

        

    def reset(self, sphere_position = (0.05 * np.random.rand(1,2)).tolist()[0]+[0]):
        
        self.done = False
        
        # Control Signals
        self.theta_dot = 0
        
        self.delta = 0
        
        if self.initialized:
            p.resetSimulation()

        p.setGravity(0,0,-10)
        planeId = p.loadURDF("plane.urdf")
        sphereStartPos = sphere_position
        sphereStartOrien = (0.05 * np.random.rand(1,3)).tolist()[0]
        SphereStartOrientation = p.getQuaternionFromEuler(sphereStartOrien)
        SphereId = p.loadURDF("./Libra_PyBullet_URDF_MESHES/Libra_PyBullet_01.SLDASM.urdf",sphereStartPos, SphereStartOrientation)

        p.changeVisualShape(SphereId, -1, rgbaColor = (0.3, 0.3, 1, 0.4))
        p.changeVisualShape(SphereId, 0, rgbaColor = (1, 0, 0, 1))
        p.changeVisualShape(SphereId, 1, rgbaColor = (0, 1, 0, 1))

        self.initialized = True
        
        spherePos, sphereOrn = p.getBasePositionAndOrientation(SphereId)

        
        p.resetDebugVisualizerCamera( cameraDistance=1.5, cameraYaw=30,
                                     cameraPitch=-30, cameraTargetPosition=spherePos)
        
        state = list(spherePos[0:2] + sphereOrn) + [self.target_x, self.target_y, self.theta_dot, self.delta]
#         print(np.reshape(state, (1,10)))
        
        
#         state = np.reshape((np.array([self.target_x, self.target_y] - np.array(spherePos[:2]))), (1,2))
        
#         print(state)
#         print(self.theta_dot)
#         print(self.delta)
        
#         state = np.concatenate((state[0], [self.theta_dot], [self.delta]))
        
        return SphereId, state

    def distance_func(self, sphere_x, sphere_y):
        distance = np.sqrt((sphere_x - self.target_x)**2 + (sphere_y - self.target_y)**2)
        return distance
        
    def reward_function(self, sphere_x, sphere_y):
        
        new_distance = self.distance_func(sphere_x, sphere_y)        
        self.reward = (self.distance - new_distance) * 1000
        self.distance = new_distance
        return self.reward

    def step(self, action, Discrete = True):
      
        th_acc_inc  =  0.01
        del_dot_inc = 0.001

        if not Discrete: # Continuous
            theta_dot = action[0]
            delta     = action[1]


            
        else:

          if action == 0:
                theta_acc = 0
                delta_dot = 0
                
          elif action == 1:
              theta_acc = 0
              delta_dot = del_dot_inc
              
          elif action == 2:
              theta_acc = 0
              delta_dot = - del_dot_inc



          elif action == 3:                
              theta_acc = th_acc_inc
              delta_dot = 0
              
          elif action == 4:                
              theta_acc = th_acc_inc
              delta_dot = del_dot_inc
              
          elif action == 5:                
              theta_acc = th_acc_inc
              delta_dot = -del_dot_inc
          


          elif action == 6:                
              theta_acc = -th_acc_inc
              delta_dot = 0
              
          elif action == 7:                
              theta_acc = -th_acc_inc
              delta_dot = del_dot_inc
              
          elif action == 8:                
              theta_acc = -th_acc_inc
              delta_dot = -del_dot_inc
              
          else:
            
            print("INVALID ACTION")
            
        self.theta_dot += theta_acc
        self.delta += delta_dot
                
                
        p.setJointMotorControl2(self.SphereId, self.pendulum_joint_ID,
                                controlMode=p.VELOCITY_CONTROL, targetVelocity = self.theta_dot, force = self.TORQUE)
        p.setJointMotorControl2(self.SphereId, self.slider_joint_id,
                                controlMode=p.POSITION_CONTROL, targetPosition= self.delta, force = self.FORCE)

        p.stepSimulation()
        # time.sleep(1./240.)
        spherePos, sphereOrn = p.getBasePositionAndOrientation(self.SphereId)

        p.resetDebugVisualizerCamera( cameraDistance=1.5, cameraYaw=30,
                                     cameraPitch=-30, cameraTargetPosition=spherePos)
        
        
        if np.abs(self.delta) > 0.2:
            self.done = True
            reward = -1
            
        else:
            reward = self.reward_function(spherePos[0], spherePos[1])
            
        

        next_state = list(spherePos[0:2] + sphereOrn) + [self.target_x, self.target_y, self.theta_dot, self.delta]
            
#         next_state = np.reshape((np.array([self.target_x, self.target_y] - np.array(spherePos[:2]))), (1,2))
#         next_state = np.concatenate((next_state[0], [self.theta_dot], [self.delta]))
        
        return next_state, reward, self.done, None 
            
        
        

In [0]:
# import os
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"] = ""
# os.environ["CUDA_VISIBLE_DEVICES"] = "1"
import time
import random
# import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import os # for creating directories

In [0]:
# env = gym.make('CartPole-v0') # initialise environment


# state_size = env.observation_space.shape[0]
state_size = 10

# action_size = env.action_space.n
action_size = 9

batch_size = 32

n_episodes = 10000 # n games we want agent to play (default 1001)
total_time_step = 10000

target_position = [5,1]

# Libra = Spherical_Robot(target_position)

In [0]:
output_dir = 'model_output/'

if not os.path.exists(output_dir):
    os.makedirs(output_dir)

env = Spherical_Robot(target_position)

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=200000) # double-ended queue; acts like list, but elements can be added/removed from either end
        self.gamma = 0.95 # decay or discount rate: enables agent to take into account future actions in addition to the immediate ones, but discounted at this rate
        self.epsilon = 1.0 # exploration rate: how much to act randomly; more initially than later due to epsilon decay
        self.epsilon_decay = 0.9995 # decrease number of random explorations as the agent's performance (hopefully) improves over time
        self.epsilon_min = 0.01 # minimum amount of random exploration permitted
        self.learning_rate = 0.001 # rate at which NN adjusts models parameters via SGD to reduce cost 
        self.alpha = 0.01
        self.model = self._build_model() # private method 
        # self.load(output_dir + "weights.hdf5")
    
    def _build_model(self):
        # neural net to approximate Q-value function:
        print("Building Model ------------------------------------------------------------")
        model = Sequential()
        model.add(Dense(64, input_dim=self.state_size, activation='relu')) # 1st hidden layer; states as input
        model.add(Dense(64, activation='relu')) # 2nd hidden layer
        model.add(Dense(self.action_size, activation='linear')) # 2 actions, so 2 output neurons: 0 and 1 (L/R)
        model.compile(loss='mse',
                      optimizer=Adam(lr=self.learning_rate))
        model.summary()
        return model
    
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done)) # list of previous experiences, enabling re-training later

    def act(self, state):
        if np.random.rand() <= self.epsilon: # if acting randomly, take random action
            return random.randrange(self.action_size)
        act_values = self.model.predict(state) # if not acting randomly, predict reward value based on current state
        return np.argmax(act_values[0]) # pick the action that will give the highest reward (i.e., go left or right?)

    def replay(self, batch_size): # method that trains NN with experiences sampled from memory
        minibatch = random.sample(self.memory, batch_size) # sample a minibatch from memory
        
        for state, action, reward, next_state, done in minibatch: # extract data for each minibatch sample
            target = reward # if done (boolean whether game ended or not, i.e., whether final state or not), then target = reward
            if not done: # if not done, then predict future discounted reward
                target = (reward + self.gamma * np.amax(self.model.predict(next_state)[0]))

            Q_values = self.model.predict(state) # approximately map current state to future discounted reward


            Q_values[0][action] = (Q_values[0][action]) * (1-self.alpha) + (self.alpha * target)


            self.model.fit(state, Q_values, epochs=1, verbose=0) # single epoch of training with x=state, y=target_f; fit decreases loss btwn target_f and y_hat
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        print("Model Loaded ----------------------------------------------------------------------------\n")
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)

agent = DQNAgent(state_size, action_size) # initialise agent

done = False
for e in range(n_episodes): # iterate over new episodes of the game
    episode_reward = 0
    # print("Episode:  ", e)
    _, state = env.reset() # reset state at start of each new episode of the game
    
    
    state = np.reshape(state, [1, state_size])
    
    time_step = 0
    for time_step in range(total_time_step):  # time represents a frame of the game; goal is to keep pole upright as long as possible up to range, e.g., 500 or 5000 timesteps
#         env.render()
        action = agent.act(state) # action is either 0 or 1 (move cart left or right); decide on one or other here
        next_state, reward, done, _ = env.step(action) # agent interacts with env, gets feedback; 4 state data points, e.g., pole angle, cart position  
        episode_reward += reward
        # print(time_step)     

        # reward = reward if not done else -10 # reward +1 for each additional frame with pole upright        
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state, action, reward, next_state, done) # remember the previous timestep's state, actions, reward, etc.        
        state = next_state # set "current state" for upcoming iteration to the current next state        
        
        if len(agent.memory) > batch_size and time_step % 100 == 0:

        # print("We are here")
          agent.replay(batch_size) # train the agent by replaying the experiences of the episode
        
        
        if done: # episode ends if agent drops pole or we reach timestep 5000
            # print("episode: {}/{}, score: {}, e: {:.2}" # print the episode's score and agent's epsilon
            #       .format(e, n_episodes, time_step, agent.epsilon))
            break # exit loop
    if e % 20 ==0:
      print("Episode: ", e, "           Final time_step : " , time_step, "      Final Distance :", env.distance)
      print("Average reward = ", episode_reward/time_step,"\n")

    if e % 50 == 0:
        agent.save(output_dir + "weights.hdf5")
        print("Model saved at episode: ", e)
        

# saved agents can be loaded with agent.load("./path/filename.hdf5")

In [0]:
p.disconnect()