In [1]:
%tensorflow_version 1.x
!pip install stable-baselines[mpi]==2.10.0

TensorFlow 1.x selected.
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting stable-baselines[mpi]==2.10.0
  Downloading stable_baselines-2.10.0-py3-none-any.whl (248 kB)
[K     |████████████████████████████████| 248 kB 5.3 MB/s 
Installing collected packages: stable-baselines
  Attempting uninstall: stable-baselines
    Found existing installation: stable-baselines 2.2.1
    Uninstalling stable-baselines-2.2.1:
      Successfully uninstalled stable-baselines-2.2.1
Successfully installed stable-baselines-2.10.2


# Constants

In [2]:
BASE_SPEED = 100 # Km/h
PRECISION = 1 #Km

# Utils - Location

In [3]:
import random
import numpy as np
from geopy.distance import geodesic

# comment the line below to generate different random numbers
random.seed(1)


def generate_random_coordinates():
    point = {
        'x': random.uniform(-180.0, 180.0),
        'y': random.uniform(-90.0, 90.0)
    }
    return point


def calculate_distance(point1, point2):
    p1_as_arr = np.array((point1['x'], point1['y']))
    p2_as_arr = np.array((point2['x'], point2['y']))
    return np.linalg.norm(p1_as_arr - p2_as_arr)


def calculate_geodesic_distance(point1, point2):
    return geodesic((point1['y'],point1['x']), (point2['y'],point2['x'])).kilometers


def calculate_geodesic_movement(point, direction, distance):
    new_point = geodesic(kilometers=distance).destination((point['y'], point['x']), bearing=direction)
    return { 'x': new_point[1], 'y': new_point[0] }


def check_limits(pos_x, pos_y, max_value, min_value=0.0):
    return min_value <= pos_x <= max_value and min_value <= pos_y <= max_value


def update_position(position_object, new_position):
    if not isinstance(position_object, dict):
        raise TypeError

    for coordinate in position_object.keys():
        position_object[coordinate] = new_position[coordinate]

    return position_object


# Controllers - UAV

In [4]:
class UAVController:
    def __init__(self, uav_id, position):
        if not isinstance(position, dict):
            raise TypeError(
                "position must be a dict { 'x': float, 'y': float }")

        self._id = uav_id
        self._position = position

    def get_id(self):
        return self._id

    def get_position(self):
        return self._position
    
    def set_position(self, position):
        self._position = position

# Controllers - Host

In [5]:
class HostController:
    def __init__(self, host_id, position):
        if not isinstance(position, dict):
            raise TypeError(
                "position must be a dict { 'x': float, 'y': float }")

        self._id = host_id
        self._position = position

    def get_id(self):
        return self._id

    def get_position(self):
        return self._position

    def set_position(self, position):
        self._position = position

# Controllers - Simulation

In [6]:
class SimulationController:
    def __init__(self, host_quantity=2, uav_quantity=1, **kw_args):
        self._hosts = []
        self._uavs = []
        self._center_of_mass = None

        if 'hosts' in kw_args:
            self._hosts = kw_args['hosts']
        else:
            for host_index in range(host_quantity):
                position = generate_random_coordinates()
                id = kw_args['hosts_names'][host_index] \
                    if 'hosts_names' in kw_args \
                    else "host_{}".format(host_index)
                new_host = HostController(id, position)
                self._hosts.append(new_host)

        if 'uavs' in kw_args:
            self._uavs = kw_args['uavs']
        else:
            for uav_index in range(uav_quantity):
                position = generate_random_coordinates()
                id = "uav_{}".format(uav_index)
                new_uav = UAVController(id, position)
                self._uavs.append(new_uav)

        self.calculate_center_of_mass()

    def get_hosts(self):
        return self._hosts

    def get_uavs(self):
        return self._uavs

    def get_center_of_mass(self):
        self.calculate_center_of_mass()
        return self._center_of_mass

    def calculate_center_of_mass(self):
        x_sum = 0.0
        y_sum = 0.0
        hosts_quantity = len(self._hosts)

        for host in self._hosts:
            position = host.get_position()
            x_sum += position['x']
            y_sum += position['y']

        self._center_of_mass = {
            'x': x_sum / hosts_quantity,
            'y': y_sum / hosts_quantity
        }

# Environment

In [13]:
import random
import numpy as np
from gym import Env
from gym.spaces import Discrete, Box

# Possible actions:
#     0: Stay stopped
#     1: Move up
#     2: Move right
#     3: Move down
#     4: Move left
ACTIONS = {
    0: 'stay_stopped',
    1: 'move_up',
    2: 'move_right',
    3: 'move_down',
    4: 'move_left',
}

class UAVPlacerEnv(Env):
    def __init__(self):
        self.action_space = Discrete(5)
        boundaries = np.array(
            [
                180.0,
                90.0,
                180.0,
                90.0
            ],
            dtype=np.float32,
        )
        self.observation_space = Box(low=-boundaries, high=boundaries, dtype=np.float32)
        self.speed = BASE_SPEED

        self._start_simulation()
        uav_position = self.simulation.get_uavs()[0].get_position()
        center_of_mass = self.simulation.get_center_of_mass()

        self.state = np.array(
            [
                uav_position['x'],
                uav_position['y'],
                center_of_mass['x'],
                center_of_mass['y'],
            ],
            dtype=np.float32
        )


    def _start_simulation(self) -> None:
        random_coordinates = generate_random_coordinates()
        host1 = HostController("host_1", random_coordinates)
        host2 = HostController("host_2", {'x': random_coordinates['x']+0.08, 'y': random_coordinates['y']+0.08})

        uav_position = {
            'x': random.uniform(host1.get_position()['x'],host2.get_position()['x']),
            'y': random.uniform(host1.get_position()['y'],host2.get_position()['y'])
        }

        uav = UAVController("uav_1", uav_position)
        self.simulation = SimulationController(hosts=[host1,host2], uavs=[uav])


    def _calculate_uav_distance_to_center_of_mass(self) -> float:
        uav = self.simulation.get_uavs()[0]
        center_of_mass = self.simulation.get_center_of_mass()

        return calculate_geodesic_distance(uav.get_position(), center_of_mass)


    def _calculate_reward(self, current_distance, previous_distance):
        if current_distance < previous_distance:
            return 1
        elif current_distance >= previous_distance:
            return -1


    def _move_uav(self, action):
        uav = self.simulation.get_uavs()[0]
        current_position = uav.get_position()
        if ACTIONS.get(action) == 'move_up':
            direction = 0
        elif ACTIONS.get(action) == 'move_right':
            direction = 90
        elif ACTIONS.get(action) == 'move_down':
            direction = 180
        elif ACTIONS.get(action) == 'move_left':
            direction = -90

        if ACTIONS.get(action) != 'stay_stopped':
            new_position = calculate_geodesic_movement(current_position, direction, BASE_SPEED/60.0)
            uav.set_position(new_position)

    def step(self, action):
        previous_distance = self._calculate_uav_distance_to_center_of_mass()

        # Move UAV
        self._move_uav(action)

        # Calculate distance to center of mass
        current_distance = self._calculate_uav_distance_to_center_of_mass()

        # Calculate reward
        reward = self._calculate_reward(current_distance, previous_distance)

        # Check if is done
        if current_distance <= PRECISION:
            done = True
        else:
            done = False

        uav = self.simulation.get_uavs()[0]
        uav_position = uav.get_position()
        center_of_mass = self.simulation.get_center_of_mass()
        self.state = np.array(
            [
                uav_position['x'],
                uav_position['y'],
                center_of_mass['x'],
                center_of_mass['y'],
            ],
            dtype=np.float32
        )
        info = {
            'current_distance': current_distance,
        }

        return self.state, reward, done, info


    def render(self):
        pass


    def reset(self):
        self._start_simulation()
        uav_position = self.simulation.get_uavs()[0].get_position()
        center_of_mass = self.simulation.get_center_of_mass()

        self.state = np.array(
            [
                uav_position['x'],
                uav_position['y'],
                center_of_mass['x'],
                center_of_mass['y'],
            ],
            dtype=np.float32
        )

        return self.state

In [14]:
from stable_baselines.common.env_checker import check_env
env = UAVPlacerEnv()
# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)

## Test environment

In [16]:
obs = env.reset()

print(env.observation_space)
print(env.action_space)

n_steps = 20
for step in range(n_steps):
  print("Step {}".format(step + 1))
  action = env.action_space.sample()
  print("Action {}".format(ACTIONS.get(action)))

  obs, reward, done, info = env.step(action)
  print('obs=', obs, 'reward=', reward, 'done=', done, 'info=', info)
  if done:
    print("Goal reached!", "reward=", reward)
    break

Box(-180.0, 180.0, (4,), float32)
Discrete(5)
Step 1
Action move_right
obs= [ 96.41297  -89.562935  94.46083  -89.58091 ] reward= -1 done= False info= {'current_distance': 2.5854604726508623}
Step 2
Action move_up
obs= [ 96.41297 -89.54801  94.46083 -89.58091] reward= -1 done= False info= {'current_distance': 4.030679481491554}
Step 3
Action move_up
obs= [ 96.41297 -89.53309  94.46083 -89.58091] reward= -1 done= False info= {'current_distance': 5.600330682198745}
Step 4
Action move_left
obs= [ 94.58249 -89.53285  94.46083 -89.58091] reward= 1 done= False info= {'current_distance': 5.369015674199385}
Step 5
Action move_up
obs= [ 94.58249 -89.51793  94.46083 -89.58091] reward= -1 done= False info= {'current_distance': 7.035464376945541}
Step 6
Action move_right
obs= [ 96.355446 -89.5177    94.46083  -89.58091 ] reward= -1 done= False info= {'current_distance': 7.253057972372776}
Step 7
Action move_right
obs= [ 98.12755 -89.51746  94.46083 -89.58091] reward= -1 done= False info= {'current

# Trying it with stable-baselines

In [22]:
from stable_baselines import DQN
from stable_baselines.common.evaluation import evaluate_policy
from stable_baselines.common.cmd_util import make_vec_env

# Instantiate the env
env = UAVPlacerEnv()
# wrap it
env = make_vec_env(lambda: env, n_envs=1)

In [38]:
# Train the agent
dqn_model = DQN("MlpPolicy", 
                env, 
                learning_rate=4e-3, 
                buffer_size=10000, 
                exploration_fraction=0.2, 
                exploration_final_eps=0.07, 
                train_freq=16, 
                batch_size=128, 
                double_q=True, 
                prioritized_replay=True, 
                prioritized_replay_alpha=0.6, 
                prioritized_replay_beta0=0.4, 
                prioritized_replay_beta_iters=None, 
                prioritized_replay_eps=1e-06, 
                verbose=1, 
                tensorboard_log=None, 
                seed=1).learn(int(1e6), log_interval=100)

In [39]:
# Test the trained agent
obs = env.reset()
n_steps = 1000
total_reward = 0
for step in range(n_steps):
  action, _ = dqn_model.predict(obs, deterministic=True)
  obs, reward, done, info = env.step(action)
  total_reward += reward
  if step % 100 == 0:
    print("Step {}".format(step + 1))
    print("Action: ", action)
    print('obs=', obs, 'reward=', reward, 'done=', done, 'info=', info)
  if done:
    # Note that the VecEnv resets automatically
    # when a done signal is encountered
    print("Goal reached!", "reward=", total_reward)
    break

Step 1
Action:  [4]
obs= [[177.2902    64.816986 177.35562   64.830376]] reward= [-1.] done= [False] info= [{'current_distance': 3.4464721895103203}]
Step 101
Action:  [4]
obs= [[173.78133   64.816574 177.35562   64.830376]] reward= [-1.] done= [False] info= [{'current_distance': 169.71620513085142}]
Step 201
Action:  [4]
obs= [[170.27249   64.816154 177.35562   64.830376]] reward= [-1.] done= [False] info= [{'current_distance': 336.1853771020537}]
Step 301
Action:  [4]
obs= [[166.7637    64.81574  177.35562   64.830376]] reward= [-1.] done= [False] info= [{'current_distance': 502.39829497780033}]
Step 401
Action:  [4]
obs= [[163.25497   64.81533  177.35562   64.830376]] reward= [-1.] done= [False] info= [{'current_distance': 668.2251668242375}]
Step 501
Action:  [4]
obs= [[159.7463    64.81492  177.35562   64.830376]] reward= [-1.] done= [False] info= [{'current_distance': 833.5372667266917}]
Step 601
Action:  [4]
obs= [[156.23769   64.8145   177.35562   64.830376]] reward= [-1.] done

In [37]:
mean_reward, std_reward = evaluate_policy(dqn_model, dqn_model.get_env(), deterministic=True, n_eval_episodes=1)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

KeyboardInterrupt: ignored