In [None]:
import pandas as pd
import numpy as np
import folium
import requests
from math import radians
from tqdm.notebook import tqdm
import json
from sklearn.preprocessing import StandardScaler

<h1>Particle Swarm optimization application</h3>

<h2>Reading dataset</h2>
For the purpose of the experiment, the results are only shown for a particular cluster.

In [None]:
df = pd.read_csv('./Olist_data/clusters_new_features.csv', index_col= 0)
df.head(5)

In [None]:
c1 = df[df.CLUSTER_hdbscan == 86]

<h2>Plotting cluster on a map</h2>

In [None]:
mapf = folium.Map(
    location= [c1.geolocation_lat.mean(),c1.geolocation_lng.mean()],
    zoom_start = 11.5,
    tiles= 'OpenStreetMap',
    height= 650
)

circles= c1.apply(
    lambda row: folium.CircleMarker(
        location= [row.geolocation_lat, row.geolocation_lng],
        radius= 1,
        popup= ""+str(row.geolocation_lat)+", "+str(row.geolocation_lng)+"\n"+str(row.CLUSTER_hdbscan),
        color= 'darkcyan',
        fill= True,
        fill_color= 'darkcyan',
        fill_opacity=0.5
    ).add_to(mapf),
    axis= 1
)

rect= folium.Rectangle(
    bounds= [(c1.geolocation_lat.min(),c1.geolocation_lng.min()),(c1.geolocation_lat.max(), c1.geolocation_lng.max())]
).add_to(mapf)

cluster= folium.CircleMarker(
    location= [c1.centroid_lat.values[0],c1.centroid_lng.values[0]],
    radius= 5,
    color= 'tomato',
    fill= True,
    fill_color= 'tomato',
    fill_opacity=0.5
).add_to(mapf)


mapf

<h2>Feature engineering functions</h2>
Functions to calculate Haversine distance and retrieve road distance and road duration using Google Maps API

In [None]:
maps_api_key = 'ENTER YOUR API KEY'
endpoint = 'https://maps.googleapis.com/maps/api/directions/json?'

def get_distance_duration(origin, destination, mode = 'driving'):
    origin_str = f'{origin[0]},{origin[1]}'
    destination_str = f'{destination[0]},{destination[1]}'
    request_url = f'{endpoint}origin={origin_str}&destination={destination_str}&mode={mode}&key={maps_api_key}'
    response = requests.get(request_url)
    data = response.json()
    
    if data['status'] == 'OK':
        road_distance_meters = data['routes'][0]['legs'][0]['distance']['value']
        road_duration_seconds = data['routes'][0]['legs'][0]['duration']['value']

        return road_distance_meters/1000, road_duration_seconds/3600
        
    else:
        print('Error: Unable to calculate road distance and duration.')
        return None

def haversine(latlon1, latlon2):
    lat1, lon1 = latlon1
    lat2, lon2 = latlon2
    R = 6371000  #radius of Earth in meters
    phi_1 = radians(lat1)
    phi_2 = radians(lat2)

    delta_phi = radians(lat2 - lat1)
    delta_lambda = radians(lon2 - lon1)

    a = (np.sin(delta_phi / 2) ** 2 +
         np.cos(phi_1) * np.cos(phi_2) * np.sin(delta_lambda / 2) ** 2)

    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

    meters = R * c  
    return meters/1000 #output distance in km

<h2>Particle Swarm optimizer class</h2>
Functions wrapped in a class to apply the optimizer to custom dataset.

In [None]:
class ParticleSwarmOptimizer():
    PARAM_BOUNDS = (0, 10)

    def __init__(self, lat_bounds, long_bounds, max_iter= 10, num_particles= 10, inertia= 0.5, cognitive_coef= 1.5, social_coef= 1.5):
        self.LAT_BOUNDS = lat_bounds #(c1.geolocation_lat.min(), c1.geolocation_lat.max())
        self.LON_BOUNDS = long_bounds #(c1.geolocation_lng.min(), c1.geolocation_lng.max())
        self.MAX_ITERATIONS = max_iter
        self.NUM_PARTICLES = num_particles
        self.W = inertia  # Inertia weight
        self.C1 = cognitive_coef  # Cognitive coefficient
        self.C2 = social_coef  # Social coefficient
        self.global_path = []
        self.score_path = []
    
    def initialize_particles(self):
        particle_positions = np.random.uniform([self.LAT_BOUNDS[0], self.LON_BOUNDS[0]] + [ParticleSwarmOptimizer.PARAM_BOUNDS[0]] * 3,
                                               [self.LAT_BOUNDS[1], self.LON_BOUNDS[1]] + [ParticleSwarmOptimizer.PARAM_BOUNDS[1]] * 3,
                                               (self.NUM_PARTICLES, 5))
        particle_velocities = np.random.uniform(-1, 1, particle_positions.shape)
        return particle_positions, particle_velocities

    def update_velocity(self,velocity, personal_best, global_best, position):
        r1, r2 = np.random.rand(), np.random.rand()
        new_velocity = self.W * velocity + self.C1 * r1 * (personal_best - position) + self.C2 * r2 * (global_best - position)
        return new_velocity
    
    def update_position(self, position, velocity):
        new_position = position + velocity
        new_position[0] = np.clip(new_position[0], self.LAT_BOUNDS[0], self.LAT_BOUNDS[1])  # Latitude
        new_position[1] = np.clip(new_position[1], self.LON_BOUNDS[0], self.LON_BOUNDS[1])  # Longitude
        # new_position[2:5] = np.clip(new_position[2:5], PARAM_BOUNDS[0], PARAM_BOUNDS[1])  # Alpha, Beta, Gamma
        return new_position
    
    def update_features(self, particle, customer_locations):
        dist = []
        dur = []
        hav = []
        origin = (particle[0],particle[1])
    
        for loc in customer_locations:
            hav.append(haversine(origin, tuple(loc)))
            data = get_distance_duration(origin, tuple(loc))
            
            if data:
                dist.append(data[0])
                dur.append(data[1])
    
        return pd.Series(hav), pd.Series(dist), pd.Series(dur)
    
    def cost_function(self, position, haversine_distances, road_distances, road_durations, lambda_reg=0.01, epsilon=1e-6):
        latitude, longitude, alpha, beta, gamma = position
    
        cost = (1 / len(haversine_distances)) * sum(
            alpha * haversine_distances +
            beta * road_distances +
            gamma * road_durations
        ) + lambda_reg * (alpha**2 + beta**2 + gamma**2)
    
        log_penalty = -np.sum(np.log(np.array([alpha, beta, gamma]) + epsilon))
        cost = cost + log_penalty
    
        return cost

    def fit_train(self, X, locs):
        scaler = StandardScaler()
        vals = scaler.fit_transform(np.c_[X.geo_distance, X.road_distance, X.road_duration])
        initial_haversine_distances, initial_road_distances, initial_road_durations = vals[:,0], vals[:,1], vals[:,2]
        
        # Initialize particles and their velocities
        self.particles, self.velocities = self.initialize_particles()
        self.personal_best = self.particles.copy()
        
        
        # Initialize the cost for each particle (using the initial distances and durations)
        self.personal_best_scores = np.array([self.cost_function(
            p, 
            initial_haversine_distances, 
            initial_road_distances, 
            initial_road_durations
        ) for p in self.personal_best])
        self.global_best = self.personal_best[np.argmin(self.personal_best_scores)]
        self.global_best_score = min(self.personal_best_scores)
        self.global_path.append(self.global_best)
        self.score_path.append(self.global_best_score)
        
        for its in tqdm(range(self.MAX_ITERATIONS)):
            print(f'Iteration no: {its}')
            for i in tqdm(range(self.NUM_PARTICLES)):
                self.velocities[i] = self.update_velocity(self.velocities[i], self.personal_best[i], self.global_best, self.particles[i])
                self.particles[i] = self.update_position(self.particles[i], self.velocities[i])
        
                # Update the distances and durations for the current particle position
                updated_haversine_distances, updated_road_distances, updated_road_durations = self.update_features(self.particles[i], locs)
                vals = scaler.transform(np.c_[updated_haversine_distances, updated_road_distances, updated_road_durations])
                updated_haversine_distances, updated_road_distances, updated_road_durations = vals[:,0], vals[:,1], vals[:,2]
                
                # Calculate the current cost with updated distances and durations
                current_cost = self.cost_function(self.particles[i], updated_haversine_distances, updated_road_distances, updated_road_durations)
        
                if current_cost < self.personal_best_scores[i]:
                    self.personal_best[i] = self.particles[i]
                    self.personal_best_scores[i] = current_cost
        
                    if current_cost < self.global_best_score:
                        self.global_best = self.particles[i]
                        self.global_best_score = current_cost
                        self.global_path.append(self.global_best)
                        self.score_path.append(self.global_best_score)


<h2>Cost and performance vs multiple iterations</h2>

In [None]:
params = np.arange(2,25,2)
best_loc = []
best_score = []
path = []
for iter in tqdm(params):
    pso = ParticleSwarmOptimizer(
        lat_bounds= (c1.geolocation_lat.min(), c1.geolocation_lat.max()), 
        long_bounds= (c1.geolocation_lng.min(), c1.geolocation_lng.max()),
        max_iter= iter,
        num_particles= 5,
    )
    pso.fit_train(X= c1, locs= np.c_[np.array(c1.geolocation_lat), np.array(c1.geolocation_lng)])
    best_loc.append(pso.global_best) 
    best_score.append(pso.global_best_score) 
    path.append((pso.global_path, pso.score_path))

In [None]:
coords = [[positions[0],positions[1]] for positions in path[10][0]]
coords.insert(0, [c1.centroid_lat.values[0],c1.centroid_lng.values[0]])

<h2>Plotting new warehouse location on map</h2>

In [None]:
mapf = folium.Map(
    location= [c1.geolocation_lat.mean(),c1.geolocation_lng.mean()],
    zoom_start = 11.5,
    tiles= 'OpenStreetMap',
    height= 650
)

circles= c1.apply(
    lambda row: folium.CircleMarker(
        location= [row.geolocation_lat, row.geolocation_lng],
        radius= 1,
        popup= ""+str(row.geolocation_lat)+", "+str(row.geolocation_lng)+"\n"+str(row.CLUSTER_hdbscan),
        color= 'darkcyan',
        fill= True,
        fill_color= 'darkcyan',
        fill_opacity=0.5
    ).add_to(mapf),
    axis= 1
)

rect= folium.Rectangle(
    bounds= [(c1.geolocation_lat.min(),c1.geolocation_lng.min()),(c1.geolocation_lat.max(), c1.geolocation_lng.max())]
).add_to(mapf)

cluster1= folium.CircleMarker(
    location= [c1.centroid_lat.values[0],c1.centroid_lng.values[0]],
    radius= 5,
    color= 'tomato',
    fill= True,
    fill_color= 'tomato',
    fill_opacity=0.5
).add_to(mapf)

cluster2= folium.CircleMarker(
    location= [-16.70603564, -49.28418465],
    radius= 5,
    color= 'magenta',
    fill= True,
    fill_color= 'magenta',
    fill_opacity=0.5
).add_to(mapf)

mapf

<h2>Plotting path to new warehouse location on map</h2>

In [None]:
mapf = folium.Map(
    location= [c1.geolocation_lat.mean(),c1.geolocation_lng.mean()],
    zoom_start = 11.5,
    tiles= 'OpenStreetMap',
    height= 650
)

circles= c1.apply(
    lambda row: folium.CircleMarker(
        location= [row.geolocation_lat, row.geolocation_lng],
        radius= 1,
        popup= ""+str(row.geolocation_lat)+", "+str(row.geolocation_lng)+"\n"+str(row.CLUSTER_hdbscan),
        color= 'darkcyan',
        fill= True,
        fill_color= 'darkcyan',
        fill_opacity=0.5
    ).add_to(mapf),
    axis= 1
)

rect= folium.Rectangle(
    bounds= [(c1.geolocation_lat.min(),c1.geolocation_lng.min()),(c1.geolocation_lat.max(), c1.geolocation_lng.max())]
).add_to(mapf)

cluster1= folium.CircleMarker(
    location= [c1.centroid_lat.values[0],c1.centroid_lng.values[0]],
    radius= 5,
    color= 'tomato',
    fill= True,
    fill_color= 'tomato',
    fill_opacity=0.5
).add_to(mapf)

cluster2= folium.CircleMarker(
    location= [-16.706036, -49.284185],
    radius= 5,
    color= 'magenta',
    fill= True,
    fill_color= 'magenta',
    fill_opacity=0.5
).add_to(mapf)

poltLines = folium.PolyLine(
    locations= coords,
    color= 'darkolivegreen'
).add_to(mapf)

poltPoints = [folium.CircleMarker(
    location= coord,
    radius= 3,
    color= 'tomato',
    fill= True,
    fill_color= 'tomato',
    fill_opacity=0.5
).add_to(mapf) for coord in coords]

mapf