#Control of drone in 1-D using Reinforcement Learning

### Import Libraries or Framework selection

In [None]:
# Stable Baselines only supports tensorflow 1.x for now
%tensorflow_version 1.x
!pip install stable-baselines[mpi]==2.10.0
!pip install pybullet

### Drone Envioronment

In [8]:
import numpy as np
import random
import gym
from gym import spaces
import pybullet as p


class Drone1DEnv(gym.Env):
    metadata = {'render.modes':['human']}


    def __init__(self, drone):
        super(Drone1DEnv, self).__init__()

        self.observation_space = spaces.Box(low=-1, high=1,
                                            shape=(2,), 
                                            dtype=np.float32)

        self.action_space = spaces.Box(low = -1, high = 1,
                                       shape = (1,), 
                                       dtype=np.float32)

        self.drone = drone
        self.z_des = 8.0      # m
        self.z_dot_des = 0.0  # m/s
        self.mass = 1.5       # kg
        self.gravity = 9.81   # m/s^2
        self.obs_max = 5.0


    # current state--> current position, current velocity in z-direction
    def state(self):
        drone_pos, _ = p.getBasePositionAndOrientation(self.drone)
        drone_vel, _ = p.getBaseVelocity(self.drone)

        # discretization and error representation of state
        state = self.abs_to_error_state(drone_pos[2], drone_vel[2])
        return state


    # reward
    def reward(self, action):
        e_z, e_z_dot = self.state()
        reward = 0.0
        if self.done():
            if (abs(e_z)>=1.5 or abs(e_z_dot)>=1):
                reward = -5.0
                print('...outbound conditions...', e_z, e_z_dot)
            else:
                reward = 100.0
                print('----desired point achieved----')
        else:            
            reward = -(10*abs(e_z) + 0.5*abs(e_z_dot) + 0.3*abs(action))

        return float(reward)
    
    
    # whether goal is achieved or not.
    def done(self):
        e_z, e_z_dot = self.state()
        '''
        done=1; episodes terminates when:
          1. if e_z >= 1.5: drone is 1.5*5 m or more apart from desired pos.
          2. if e_z_dot >= 1: drone velocity is >=5 m/s {pysical constraints}.
          3. if e_z<=0.01/5.0 and e_z_dot<=0.01/5: desired condition is achieved.
        '''
        if (abs(e_z)>=1.5 or abs(e_z_dot)>=1) or\
            (abs(e_z)<=0.01/self.obs_max\
             and abs(e_z_dot)<=0.01/self.obs_max):
            return True
        return False


    #info
    def info(self):
        return {}


    # step
    def step(self, action):
        # action must be a float.
        action_ = (action+1)*self.mass*self.gravity
        p.applyExternalForce(objectUniqueId=self.drone, linkIndex=-1,
                         forceObj=[0, 0 ,action_], posObj=[0,0,0], 
                         flags=p.LINK_FRAME)
        p.stepSimulation()
        state = self.state()
        reward = self.reward(action)
        done = self.done()
        info = self.info()
        return state, reward, done, info


    # reset the environment
    def reset(self):
        # initializing quadcopter with random z_position and z_velocity
        droneStartPos, droneStartOrn, droneStartLinVel, droneStartAngVel\
             = self.random_state_generator()
        p.resetBasePositionAndOrientation(self.drone, droneStartPos,
                                          droneStartOrn)
        p.resetBaseVelocity(self.drone, droneStartLinVel, 
                            droneStartAngVel)
        # print("\n[--------z_des: %f,    z_init: %f,     v_init: %f ---------]\n\n"
        #       %(self.z_des, droneStartPos[2], droneStartLinVel[2]))

        # return state
        state  = self.abs_to_error_state(droneStartPos[2], droneStartLinVel[2])
        return state


    def random_state_generator(self):
        # initialize drone's position between 3 and 13 m.
        z_init = random.uniform(3.0,13.0) 
        # initialized with velocity in between -1 and 1 m/s.
        z_dot_init = random.uniform(-1,1)
        
        StartPos = [0,0,z_init] 
        StartOrn = p.getQuaternionFromEuler([0,0,0])
        StartLinVel = [0,0,z_dot_init]
        StartAngVel = [0,0,0]
        return StartPos, StartOrn, StartLinVel, StartAngVel


    def abs_to_error_state(self, z, z_dot):
        e_z = (z - self.z_des) / self.obs_max
        e_z_dot = (z_dot - self.z_dot_des) / self.obs_max

        return np.array([e_z, e_z_dot])


    def render(self, mode='human'):
        pass

### Integration with pybullet simulation

In [6]:
import pybullet as p
import pybullet_data
import tensorflow as tf


def init_simulation(render = False):

    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print('optimized...')

    if render:
        physicsClient = p.connect(p.GUI)
    else:
        physicsClient = p.connect(p.DIRECT)
        
    p.setAdditionalSearchPath(pybullet_data.getDataPath())
    p.setGravity(0,0,-9.81)
    p.setTimeStep(0.01)

    '------------------------------------'
    # drone
    drone = p.loadURDF('/content/drive/MyDrive/drone_URDF/drone.urdf')

    # marker at desired point
    sphereVisualId = p.createVisualShape(shapeType=p.GEOM_SPHERE,
                                        radius = 0.05,
                                        rgbaColor= [1, 0, 0, 1])
    marker = p.createMultiBody(baseMass=0.0, baseCollisionShapeIndex=-1,
                    baseVisualShapeIndex=sphereVisualId, basePosition=[0, 0, 8.0],
                    useMaximalCoordinates=False)
    '-------------------------------------'

    return drone

def end_simulation():
    p.disconnect()

### Ensure our Custom Enviornment matches OpenAI gym interface

In [9]:
# ensure custom environment matches gym env interface 
from stable_baselines.common.env_checker import check_env

drone = init_simulation()
env = Drone1DEnv(drone)
print('obs_space',env.observation_space)
print('action_spc: ', env.action_space)

for i in range(10):
    print(env.action_space.sample())

check_env(env, warn=True)
end_simulation()

optimized...
obs_space Box(-1.0, 1.0, (2,), float32)
action_spc:  Box(-1.0, 1.0, (1,), float32)
[0.1728102]
[-0.74861497]
[-0.9111984]
[0.41781712]
[0.40568385]
[0.84027106]
[0.2439733]
[0.98645836]
[-0.85671103]
[-0.2155102]


### Training of DDPG model using Stable Baselines

In [None]:
# train a DDPG algorithm based model using Stable Baselines library from open AI.
from stable_baselines.ddpg.policies import MlpPolicy
from stable_baselines.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise
from stable_baselines import DDPG

drone = init_simulation()
env = Drone1DEnv(drone)
param_noise = None
action_noise = OrnsteinUhlenbeckActionNoise(mean=np.zeros(1), sigma=float(0.5) * np.ones(1))

model = DDPG(MlpPolicy, env, verbose=1, param_noise=param_noise,
             action_noise=action_noise)

for i in range(1, 51):
  save_path = "/content/drive/MyDrive/drone1D/drone" + str(i) + ".zip"
  if i==1:
    model.learn(total_timesteps=10000)
    # at every 10000 time steps, we will save our model
    model.save(save_path)
  else:
    del model
    model = DDPG.load(prev_path, env)
    model.learn(total_timesteps=10000)
    model.save(save_path)
  prev_path = save_path

print('done')

end_simulation()

#### Checking which model is working the best with manually chosen games

In [None]:
# from pybulletSim import init_simulation, end_simulation
# from drone1D_env import Drone1DEnv
from stable_baselines import DDPG
import time
import matplotlib.pyplot as plt
import pandas as pd

drone = init_simulation(render = False)
env = Drone1DEnv(drone)


def run_agent(model):
    obs = env.reset()
    print('initial state: ', obs)
    t = 0
    for i in range(500):
        action = model.predict(obs)
        obs, reward, done, _ = env.step(action[0])
        # time.sleep(0.01)
        t += 0.01
        if done:
            if reward>90.0:
                print('***********',t)
            else:
                print('***********', 5)     # evaluation metric
                t = 5
            return t
    return t

t1 = []
game = []
for i in range(1,51):
    filepath = '/content/drive/MyDrive/drone1D/drone'+ str(i) + '.zip'
    model = DDPG.load(filepath)
    t = run_agent(model=model)
    t1.append(t)
    game.append(i)


d = {'game': game, 'time': t1}
df = pd.DataFrame(d).to_csv('/content/drive/MyDrive/drone1D/game6.csv')


end_simulation()

### Run our Trained Agent on random games

In [None]:
# run our best trained agent on random games
from stable_baselines import DDPG

drone = init_simulation(render = False)
env = Drone1DEnv(drone)
model = DDPG.load('/content/drive/MyDrive/drone1D/ddpg_drone2.4D.zip')    # path to the best model


obs = env.reset()
print('initial state: ', obs)

for _ in range(10000):
    action = model.predict(obs)
    obs, rew, done, info = env.step(action[0])

    if done:
        env.reset()

end_simulation()