<a href="https://colab.research.google.com/github/naguzmans/opportunistic-atm/blob/main/pso_model_3D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h2> 3D Particle Swarm Optimization </h2>

In [1]:
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
import numpy as np
from scipy import interpolate
from tqdm import tqdm
import random
import warnings
warnings.filterwarnings('ignore')
%config InlineBackend.figure_format = 'retina'

class Obstacle:
  """Obstacle described by a centre point (x, y, z) and a radius r."""

  def __init__(self, x, y, z, r):
    # Centre coordinates
    self.x, self.y, self.z = x, y, z
    # Radius around the centre
    self.r = r

class Particle:
  """One candidate path of the swarm.

  A path is defined by `handles + 2` control points (the fixed origin and
  destination plus `handles` free points). `Cost` fits a cubic spline through
  the control points, samples it at 100 parameter values and scores it.

  Attributes set by this class:
    x, y, z          -- control point coordinates
    vx, vy, vz       -- velocity per control point (initially zero)
    xs, ys, zs       -- sampled spline, ordered from origin to destination
    dxs, dys, dzs    -- spline derivatives at the same samples
    L                -- sum of the derivative magnitudes over the samples
    cost             -- L * (1 + beta * violation)
    best_*           -- snapshot of the best state seen by this particle
  """

  def __init__(self, origin, destination, handles, obstacles=None):
    """Create a random path from `origin` to `destination`.

    origin, destination -- (x, y, z) end points
    handles             -- number of free control points between the ends
    obstacles           -- optional iterable of objects with x, y, z and r
                           attributes; when None, `Cost` falls back to the
                           module-level `obstacles` name
    """
    self.origin = origin
    self.destination = destination
    self.handles = handles
    self.obstacles = obstacles

    # Control points of the path
    self.x, self.y, self.z = self.CreatePath(self.origin, self.destination)

    # Define velocity
    self.vx = np.zeros(handles+2)
    self.vy = np.zeros(handles+2)
    self.vz = np.zeros(handles+2)

    # Calculate Cost
    self.Cost()

    # Define Best
    self.Best(self.x, self.y, self.z, self.xs, self.ys, self.zs, self.cost)

  def Cost(self):
    """Fit the spline through the control points and update `self.cost`."""
    # CubicSpline needs a strictly increasing parameter. The same ascending
    # range is used for the knots and for the samples, so that sample 0 is
    # always the origin, even when destination[0] < origin[0].
    lo = min(self.origin[0], self.destination[0])
    hi = max(self.origin[0], self.destination[0])
    if hi == lo:
      # Both ends share the same x: any increasing parameter will do
      hi = lo + 1.0

    ts = np.linspace(lo, hi, self.handles+2)
    spline_x = interpolate.CubicSpline(ts, self.x)
    spline_y = interpolate.CubicSpline(ts, self.y)
    spline_z = interpolate.CubicSpline(ts, self.z)

    ts2 = np.linspace(lo, hi, 100)
    self.xs = spline_x(ts2)
    self.ys = spline_y(ts2)
    self.zs = spline_z(ts2)

    self.dxs = spline_x.derivative(nu=1)(ts2)
    self.dys = spline_y.derivative(nu=1)(ts2)
    self.dzs = spline_z.derivative(nu=1)(ts2)
    self.L = np.sum(np.sqrt(self.dxs**2 + self.dys**2 + self.dzs**2))

    # Obstacles given at construction win; otherwise use the module-level list
    active_obstacles = self.obstacles if self.obstacles is not None else obstacles

    violation = 0
    for obstacle in active_obstacles:
      distance_to_obstacle = np.sqrt((self.xs-obstacle.x)**2+(self.ys-obstacle.y)**2+(self.zs-obstacle.z)**2)
      # Penetration depth relative to the radius, zero outside the obstacle
      violation += np.mean(np.maximum(1-distance_to_obstacle/obstacle.r, 0))

    # Z out-of-bounds violation
    if np.any(self.zs < 0):
      violation = 1e10

    # Calculate final cost
    beta = 100
    self.cost = self.L*(1+beta*violation)

  def Best(self, x, y, z, xs, ys, zs, cost):
    """Store a snapshot of the given state as this particle's personal best.

    The arrays are copied: storing the caller's arrays directly would let a
    later in-place position update silently overwrite the personal best.
    """
    self.best_x = np.array(x, copy=True)
    self.best_y = np.array(y, copy=True)
    self.best_z = np.array(z, copy=True)

    self.best_xs = np.array(xs, copy=True)
    self.best_ys = np.array(ys, copy=True)
    self.best_zs = np.array(zs, copy=True)

    self.best_cost = cost

  def CreatePath(self, start, end):
    """Return random control points (x, y, z) from `start` to `end`.

    x and y are drawn uniformly between the end point coordinates with the
    first and last entries pinned to `start` and `end`; z is a straight
    ramp between the two end heights.
    """
    x = np.random.uniform(start[0], end[0], self.handles+2)
    x[0] = start[0]
    x[-1] = end[0]

    y = np.random.uniform(start[1], end[1], self.handles+2)
    y[0] = start[1]
    y[-1] = end[1]

    z = np.linspace(start[2], end[2], self.handles+2)

    return x,y,z

class GlobalBest:
  """Record of the best path seen by the whole swarm."""

  def __init__(self, x, y, z, xs, ys, zs, cost):
    # Control points of the best path
    self.x, self.y, self.z = x, y, z
    # Sampled coordinates of the best path
    self.xs, self.ys, self.zs = xs, ys, zs
    # Cost of the best path
    self.cost = cost

# Plotting 3d-Spheres
def ms(x, y, z, radius, resolution=10):
    """Return the (X, Y, Z) mesh of a sphere centred at (x, y, z).

    The mesh has `2*resolution` samples around the full turn and
    `resolution` samples from pole to pole.
    """
    azimuth, polar = np.mgrid[0:2*np.pi:resolution*2j, 0:np.pi:resolution*1j]
    sin_polar = np.sin(polar)
    mesh_x = radius * np.cos(azimuth)*sin_polar + x
    mesh_y = radius * np.sin(azimuth)*sin_polar + y
    mesh_z = radius * np.cos(polar) + z
    return (mesh_x, mesh_y, mesh_z)

# Main
def PSO(origin, destination, geometry, no_particles, handles, iterations, min_limit, max_limit):
  """Run particle swarm optimisation for a path from `origin` to `destination`.

  Parameters:
    origin, destination: (x, y, z) end points handed to every Particle.
    geometry: kept for interface compatibility; this function does not read
      it (Particle.Cost looks the obstacles up on its own).
    no_particles: swarm size.
    handles: number of free control points per path (end points excluded).
    iterations: number of swarm updates.
    min_limit, max_limit: (x, y, z) canvas bounds; only used to derive the
      per-axis velocity limit.

  Returns:
    (xs, ys, zs, cost) of the best path found: its sampled coordinates and
    its cost. The final cost is also printed.
  """
  # Create Particle Swarm
  particles = [Particle(origin, destination, handles) for _ in range(no_particles)]

  # Initialize. Start from +inf rather than a finite sentinel so that the
  # best initial particle is always adopted, whatever the scale of the costs.
  global_best = GlobalBest(0, 0, 0, 0, 0, 0, np.inf)
  for particle in particles:
    if particle.cost < global_best.cost:
      _adopt_as_global_best(global_best, particle)

  # Particle Swarm Optimization parameters
  w = 1        # inertia weight, damped after every iteration
  wdamp = .98
  c1 = 1.5     # pull towards the particle's own best
  c2 = 1.5     # pull towards the swarm's best
  alpha = .1   # velocity limit as a fraction of the canvas extent

  vel_max_x = alpha*(max_limit[0] - min_limit[0])
  vel_max_y = alpha*(max_limit[1] - min_limit[1])
  vel_max_z = alpha*(max_limit[2] - min_limit[2])

  for _ in tqdm(range(iterations)):

    for particle in particles:

      # Same axis order (x, y, z) as before, so the random draws line up
      particle.x, particle.vx = _advance_axis(
          particle.x, particle.vx, particle.best_x, global_best.x, w, c1, c2, vel_max_x)
      particle.y, particle.vy = _advance_axis(
          particle.y, particle.vy, particle.best_y, global_best.y, w, c1, c2, vel_max_y)
      particle.z, particle.vz = _advance_axis(
          particle.z, particle.vz, particle.best_z, global_best.z, w, c1, c2, vel_max_z)

      # Calculate cost
      particle.Cost()
      if particle.cost < particle.best_cost:
        particle.Best(particle.x.copy(), particle.y.copy(), particle.z.copy(),
                      particle.xs.copy(), particle.ys.copy(), particle.zs.copy(),
                      particle.cost)

      if particle.cost < global_best.cost:
        _adopt_as_global_best(global_best, particle)

    w = w * wdamp

  print(f'\nFinal Global Best Cost: {global_best.cost}')
  return global_best.xs, global_best.ys, global_best.zs, global_best.cost


def _adopt_as_global_best(global_best, particle):
  """Copy the particle's control points, sampled path and cost into `global_best`."""
  global_best.x = particle.x.copy()
  global_best.y = particle.y.copy()
  global_best.z = particle.z.copy()

  global_best.xs = particle.xs.copy()
  global_best.ys = particle.ys.copy()
  global_best.zs = particle.zs.copy()

  global_best.cost = particle.cost


def _advance_axis(position, velocity, personal_best, swarm_best, w, c1, c2, vel_max):
  """Return the updated (position, velocity) arrays for one coordinate axis."""
  n = len(position)
  velocity = w*velocity + \
             c1*np.random.rand(n)*(personal_best - position) + \
             c2*np.random.rand(n)*(swarm_best - position)

  # Keep every component within [-vel_max, vel_max]
  velocity = np.clip(velocity, -vel_max, vel_max)

  # A new position array is returned on purpose: updating in place would
  # also change a personal best that still refers to the same array.
  return position + velocity, velocity


In [2]:
# Create model obstacles
bounds_x = (-10, 10)
bounds_y = (-10, 10)
bounds_z = (0, 3)

obstacles = [Obstacle(2, 2, 0, 1.5),
             Obstacle(-2, -2, 0, 1.5),
             Obstacle(2, -2, 0, 1.0),
             Obstacle(-2, 2, 0, 1.0)]

origins = [(-5, -5, .5), (5, -5, .5)]
destinations = [(5, 5, .5), (-5, 5, .5)]

# Optimise one corridor per origin/destination pair
data = []
vectors = []
colors = ['red', 'blue', 'green']
for i, (origin, destination) in enumerate(zip(origins, destinations)):
  x, y, z, cost = PSO(origin=origin, destination=destination, geometry=obstacles, no_particles=10,
                      handles=3, iterations=100,
                      min_limit=(bounds_x[0], bounds_y[0], bounds_z[0]),
                      max_limit=(bounds_x[1], bounds_y[1], bounds_z[1]))

  # Append best line; the palette wraps around so that more corridors than
  # colours cannot raise an IndexError
  data.append(go.Scatter3d(x=x, y=y, z=z, mode='lines', name=f'Corridor {i}',
                           line=dict(width=5, color=colors[i % len(colors)])))

  vectors.append([x, y, z])

# Append 3d-spheres
for obstacle in obstacles:
  (x_pns_surface, y_pns_surface, z_pns_surface) = ms(obstacle.x, obstacle.y, obstacle.z, obstacle.r)
  data.append(go.Surface(x=x_pns_surface, y=y_pns_surface, z=z_pns_surface, showscale=False, opacity=1))

# Dedicated marker trace for the animation. The frames below overwrite this
# trace only, so corridors and spheres stay untouched while it moves along
# the last optimised corridor (x, y, z still hold that corridor here).
drone_trace_index = len(data)
data.append(go.Scatter3d(x=[x[0]], y=[y[0]], z=[z[0]], name='Drone', mode="markers",
                         marker=dict(color="black", size=5)))

layout = go.Layout(scene=dict(
  aspectmode='manual',
  aspectratio=dict(x=1, y=(bounds_y[1]-bounds_y[0])/(bounds_x[1]-bounds_x[0]), z=(bounds_z[1]-bounds_z[0])/(bounds_x[1]-bounds_x[0])),
  bgcolor='rgba(0,0,0,0)',
  xaxis = dict(range=[bounds_x[0], bounds_x[1]], showgrid=True, gridcolor='rgba(0, 0, 0, .1)', backgroundcolor="rgba(0, 0, 0, 0)", showticklabels=True),
  yaxis = dict(range=[bounds_y[0], bounds_y[1]], showgrid=True, gridcolor='rgba(0, 0, 0, .1)', backgroundcolor="rgba(0, 0, 0, 0)", showticklabels=True),
  zaxis = dict(range=[bounds_z[0], bounds_z[1]], showgrid=True, gridcolor='rgba(0, 0, 0, .1)', backgroundcolor="rgba(0, 0, 0, 0)", showticklabels=True)),
  updatemenus=[dict(type="buttons", buttons=[dict(label="Play", method="animate",args=[None])])]
  )

# One frame per sample of the corridor, each moving the drone marker
frames=[go.Frame(
        data=[go.Scatter3d(
            x=[x[k]],
            y=[y[k]],
            z=[z[k]],
            name='Drone',
            mode="markers",
            marker=dict(color="black", size=5))], traces = [drone_trace_index])

        for k in range(len(x))]

# The frames must be handed to the figure, otherwise "Play" has nothing to animate
fig = go.Figure(data=data, layout=layout, frames=frames)
fig.show()

100%|██████████| 100/100 [00:01<00:00, 72.37it/s]



Final Global Best Cost: 163.45309418598728


100%|██████████| 100/100 [00:01<00:00, 75.52it/s]



Final Global Best Cost: 167.7386129685412


In [144]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.interpolate import interp1d

class Drone:
  """Drone that follows a 2-D track at a given speed."""

  def __init__(self, speed):
    self.speed = speed

  def get_track(self, x, y):
    """Return the drone's position over time along the track (x, y).

    x, y -- waypoint coordinates of the track (equal-length sequences)

    The track length (rounded to one decimal) divided by the speed gives the
    travel time; positions are reported on a 0.1 time-unit grid, spread
    evenly along the track. Sampling is done by distance travelled, so the
    track may double back or stay constant in x.

    Returns a DataFrame with columns 'time' and 'pos', where 'pos' is an
    (x, y) tuple rounded to two decimals.

    Raises ValueError for a non-positive speed or an empty track.
    """
    self.x = x
    self.y = y

    if self.speed <= 0:
      raise ValueError(f'speed must be positive, got {self.speed}')

    xs = np.asarray(x, dtype=float)
    ys = np.asarray(y, dtype=float)
    if xs.size == 0:
      raise ValueError('track must contain at least one point')

    # Distance travelled from the start when reaching each waypoint
    segment_lengths = np.hypot(np.diff(xs), np.diff(ys))
    travelled = np.concatenate(([0.0], np.cumsum(segment_lengths)))
    length = round(travelled[-1], 1)

    time = length/self.speed
    time_vector = np.arange(0, round(time,2)*10 + 1)/10

    # One position per time step, evenly spaced along the track
    sample_at = np.linspace(0, travelled[-1], len(time_vector))
    xnew = np.interp(sample_at, travelled, xs)
    yinterp = np.interp(sample_at, travelled, ys)

    df = pd.DataFrame({'time':time_vector, 'x':xnew, 'y':yinterp})
    df.x = df.x.round(2)
    df.y = df.y.round(2)
    df['pos'] = df[['x','y']].apply(tuple, axis=1)

    return df.drop(columns=['x', 'y'])

  def update_speed(self, speed):
    """Change the speed and return the track recomputed for it.

    Uses the waypoints stored by the last `get_track` call, so `get_track`
    must have been called at least once before.
    """
    self.speed = speed
    return self.get_track(self.x, self.y)

class CDR():
  """Collects the tracks of several drones in one table keyed by time.

  `master` starts as an empty list and becomes a DataFrame with a 'time'
  column plus one 'drone_<n>' column per added drone.
  """

  def __init__(self):
    self.master = []
    self.drone_index = 0

  def add_drone(self, drone, track):
    """Compute `drone`'s track over `track` = (x, y) and add it to `master`."""
    column_name = f'drone_{self.drone_index}'

    if len(self.master) == 0:
      # First track: it becomes the table as it is
      table = drone.get_track(track[0], track[1])
      table.columns = ['time', column_name]
    else:
      # Later tracks: align on time, then name the column that was appended
      existing = self.master.set_index('time')
      incoming = drone.get_track(track[0], track[1]).set_index('time')
      table = pd.concat([existing, incoming], axis=1).reset_index()
      table.columns = [*table.columns[:-1], column_name]

    self.master = table
    self.drone_index += 1

# Demo track: 100 waypoints on the straight line from (0, 0) to (3, 4)
x = np.linspace(0,3,100)
y = np.linspace(0,4,100)

# Fly the same track with two drones at different speeds
cdr = CDR()
for speed in (4, 5):
  cdr.add_drone(drone=Drone(speed=speed), track=(x,y))
cdr.master


Unnamed: 0,time,drone_0,drone_1
0,0.0,"(0.0, 0.0)","(0.0, 0.0)"
1,0.1,"(0.23, 0.31)","(0.3, 0.4)"
2,0.2,"(0.46, 0.62)","(0.6, 0.8)"
3,0.3,"(0.69, 0.92)","(0.9, 1.2)"
4,0.4,"(0.92, 1.23)","(1.2, 1.6)"
5,0.5,"(1.15, 1.54)","(1.5, 2.0)"
6,0.6,"(1.38, 1.85)","(1.8, 2.4)"
7,0.7,"(1.62, 2.15)","(2.1, 2.8)"
8,0.8,"(1.85, 2.46)","(2.4, 3.2)"
9,0.9,"(2.08, 2.77)","(2.7, 3.6)"
