In [1]:
import numpy as np
import pandas as pd
import random

import warnings
warnings.filterwarnings("ignore")

In [2]:
path = '.../Datasets/'

<h2 style="color:green; font-family:Calibri"><b>Genetic Algorithms</b></h2>

<h3 style="color:purple; font-family:Calibri">3.2 Example: Maximizing a Simple Quadratic Function</h3>

In [3]:
class GAExample:

    def __init__(self, N=4, r_c=0.85, r_m=0.1, G=20):
        self.N = N  # population size
        self.r_c = r_c  # crossover rate
        self.r_m = r_m  # mutation rate
        self.G = G  # number of generations
        self.solutions = None

    def generate_candidate_solutions(self):
        self.solutions = np.array(random.sample(range(-10, 11), self.N))
        return self.solutions

    def fitness(self, x):
        return -(x - 2)**2 + 4

    def evaluation(self):
        return np.array([self.fitness(x) for x in self.solutions])
    
    def selection(self):
        return np.array([num/np.sum(self.evaluation()) for num in self.evaluation()])

    def crossover(self, parent1, parent2):
        alpha1 = random.random()
        # perform single-point crossover
        child1 = alpha1*parent1 + (1-alpha1)*parent2
        alpha2 = random.random()
        while alpha1 == alpha2:
            alpha2 = random.random()
        child2 = alpha2*parent1 + (1-alpha2)*parent2
        return child1, child2
        
    def mutation(self, child):
        new_child = random.random() + child
        return new_child

    def generate_optimal_solutions(self):
        
        for n in range(1, self.G + 1):
            results = self.evaluation()
            probs = self.selection()
            chance_for_crossover = random.random()

            if chance_for_crossover < self.r_c:
                top_2_highest_indices = np.argsort(probs)[-2:]
                top_individuals = self.solutions[top_2_highest_indices]
                child1, child2 = self.crossover(top_individuals[0], top_individuals[1])

                chance_for_mutation = random.random()
                if chance_for_mutation < self.r_m:
                    if random.sample(range(0, 2), 1) == 0:
                        child1 = self.mutation(child1)
                    else:
                        child2 = self.mutation(child2)

                top_2_min_indices = np.argsort(results)[:2]
                self.solutions[top_2_min_indices] = child1, child2

        final_results = self.evaluation()
        optimal_index = np.argsort(final_results)[-1:]
        optimal_solution = self.solutions[optimal_index]

        return self.solutions, optimal_solution

In [4]:
random.seed(501)
ga_example = GAExample(G=3, r_m=0)
candidate_sols = ga_example.generate_candidate_solutions()
print(f"Candidate solutions: {candidate_sols}\n")
evolutionary_sols, optimal_sol = ga_example.generate_optimal_solutions()
print(f"Evolutionary solution: {evolutionary_sols}")
print(f"Optimal solution: {optimal_sol}")

Candidate solutions: [10 -1  1 -3]

Evolutionary solution: [1 2 1 3]
Optimal solution: [2]


<h3 style="color:purple; font-family:Calibri">GA Application -  Vehicle Routing Problem</h3>

URL: https://terpconnect.umd.edu/~bgolden/vrp_data.htm</br>
File: Benchmark Problems for the Consistent VRP > Data Set 1 (ConVRP_small_problems.zip)

In [5]:
def read_vrp(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    for i, line in enumerate(lines):
        line = line.strip()
        if line.startswith('CAPACITY'):
            capacity_parts = line.split()
            capacity = capacity_parts[1]
        elif line.startswith('DISTANCE'):
            distance_parts = line.split()
            distance = distance_parts[1]
        elif line == 'NODE_COORD_SECTION':
            node_cord_start_line = i + 1
        elif line == 'DEMAND_SECTION':
            demand_start_line = i + 1
        elif line == 'SVC_TIME_SECTION':
            svc_time_start_line = i + 1
        elif line == 'DEPOT_SECTION':
            depot_start_line = i + 1
            break

    node_id = []
    x_coord = []
    y_coord = []
    for l in range(node_cord_start_line, demand_start_line-1):
        cleaned_line = lines[l].strip()
        parts = cleaned_line.split()
        node_id.append(parts[0])
        x_coord.append(parts[1])
        y_coord.append(parts[2])

    node_coord_df = pd.DataFrame({
        'node_id': node_id,
        'x_coord': x_coord,
        'y_coord': y_coord
    })

    node_coord_df['node_id'] = node_coord_df['node_id'].astype(int)
    node_coord_df['x_coord'] = node_coord_df['x_coord'].astype(float)
    node_coord_df['y_coord'] = node_coord_df['y_coord'].astype(float)
    
    node_id = []
    day1 = []
    day2 = []
    day3 = []
    for l in range(demand_start_line, svc_time_start_line-1):
        cleaned_line = lines[l].strip()
        parts = cleaned_line.split()
        node_id.append(parts[0])
        day1.append(parts[1])
        day2.append(parts[2])
        day3.append(parts[3])
    demand_df = pd.DataFrame({
        'node_id': node_id,
        'day1': day1,
        'day2': day2,
        'day3': day3
    })

    demand_df['node_id'] = demand_df['node_id'].astype(int)
    demand_df['day1'] = demand_df['day1'].astype(int)
    demand_df['day2'] = demand_df['day2'].astype(int)
    demand_df['day3'] = demand_df['day3'].astype(int)

    cleaned_depot = lines[depot_start_line].strip()
    depot_loc = cleaned_depot.split()
    depot_x = depot_loc[0]
    depot_y = depot_loc[1]
    depot_df = pd.DataFrame({'node_id': [0],
                             'depot_x': [depot_x],
                             'depot_y': [depot_y]})

    depot_df['depot_x'] = depot_df['depot_x'].astype(float)
    depot_df['depot_y'] = depot_df['depot_y'].astype(float)
    
    return node_coord_df, demand_df, depot_df, capacity, distance

In [6]:
node_coord_df, demand_df, depot_df, capacity, distance = read_vrp(path+'ConVRP_small_problems/convrp_10_test_1.vrp')
print(f"Capacity: {capacity}")
print(f"Distance: {distance}")
node_coord_df.head()

Capacity: 15
Distance: 35


Unnamed: 0,node_id,x_coord,y_coord
0,1,8.18,9.781
1,2,0.864,3.187
2,3,3.66,2.82
3,4,0.866,2.47
4,5,3.045,8.571


In [7]:
demand_df.head()

Unnamed: 0,node_id,day1,day2,day3
0,1,3,3,1
1,2,-1,1,-1
2,3,1,-1,2
3,4,1,3,-1
4,5,3,3,-1


In [8]:
depot_df

Unnamed: 0,node_id,depot_x,depot_y
0,0,0.0,0.0


In [17]:
class GA_VRP:

    def __init__(self, node_coord_df, demand_df, depot_df, day, capacity=15, distance=35, N=5, r_c=0.85, r_m=0.1, G=20):
        self.node_coord = np.array(node_coord_df)
        self.demand = np.array(demand_df)
        self.depot = np.array(depot_df)
        self.all_nodes_coord = np.vstack([self.depot, self.node_coord])
        self.day = day
        self.capacity = capacity
        self.distance = distance
        self.N = N  # population size
        self.r_c = r_c  # crossover rate
        self.r_m = r_m  # mutation rate
        self.G = G  # number of generations
        self.routes = None
        
    def create_route(self):
        random.seed(101)
        svc_indices = []
        for i in range(len(self.demand)):
            if self.demand[i, self.day] != -1:  # no services requested
                svc_indices.append(i)
        node_ids = self.demand[:, 0]
        route = np.random.permutation(node_ids[svc_indices])
        return route.tolist()
        
    def generate_candidate_routes(self):
        self.routes = [self.create_route() for _ in range(self.N)]
        for n in range(self.N):
            self.routes[n] = np.insert(self.routes[n], 0, 0)
            self.routes[n] = np.append(self.routes[n], 0)
            self.routes[n] = self.routes[n].tolist()
        return self.routes

    def calculate_euclidean_dist(self, node1_x, node1_y, node2_x, node2_y):
        return np.sqrt((node1_x - node2_x)**2 + (node1_y - node2_y)**2)
        
    def euclidean_dist_matrix(self):
        dist_matrix = np.zeros((self.all_nodes_coord.shape[0], self.all_nodes_coord.shape[0]))
        for i in range(self.all_nodes_coord.shape[0]):
            for j in range(self.all_nodes_coord.shape[0]):
                dist_matrix[i, j] = self.calculate_euclidean_dist(self.all_nodes_coord[i, 1], self.all_nodes_coord[i, 2], self.all_nodes_coord[j, 1], self.all_nodes_coord[j, 2])
        return dist_matrix

    def calculate_total_distance(self, route):
        total_distance = 0
        dist_matrix = self.euclidean_dist_matrix()
        for i in range(len(route)-1):
            current_node = route[i]
            next_node = route[i+1]
            total_distance += dist_matrix[current_node, next_node]
        return total_distance

    def calculate_capacity(self, route):
        total_capacity = 0
        for i in range(1, len(route)-1): 
            current_node = route[i]
            for n in range(len(self.demand)):
                if self.demand[n, 0]==current_node:
                    svc_unit = self.demand[n, self.day]
            total_capacity = total_capacity + svc_unit
        return total_capacity

    def is_distance_valid(self, route):
        total_distance = self.calculate_total_distance(route)
        if total_distance > self.distance:
            return False
        else:
            return True

    def is_capacity_valid(self, route):
        total_capacity = self.calculate_capacity(route)
        if total_capacity > self.capacity:
            return False
        else:
            True

    def repair_route_distance(self, route):
        indices = []
        new_total_dist = 0
        for i in range(0, len(route)-1):
            new_total_capacity = self.calculate_total_distance(route[:i+1])
            if new_total_capacity <= self.distance:
                new_total_capacity = 0
                continue
            else:
                indices.append(i)
                new_total_capacity = 0   
        sub_routes = []
        start_idx = 0
        
        for idx in indices:
            sub_route = route[start_idx:idx]
            sub_routes.append(sub_route)
            start_idx = idx
        sub_route = route[start_idx:]
        sub_routes.append(sub_route)

        if len(sub_routes) == 1:
            sub_routes[0] = route[:-2]
            sub_routes.append(route[-2:])
            
        for i, sub_route in enumerate(sub_routes):
            if sub_route[0] != 0:
                sub_route.insert(0, 0)
                sub_routes[i] = sub_route
            if sub_route[-1] != 0:
                sub_route.append(0)
                sub_routes[i] = sub_route

        return sub_routes
            
    def repair_route_capacity(self, route):
        new_total_capacity = 0
        indices = []
        for i in range(1, len(route)-1):
            new_total_capacity = self.calculate_capacity(route[:i+1])
            if new_total_capacity <= self.distance:
                continue
            else:
                indices.append(i)
                new_total_capacity = 0
            
        sub_routes = []
        start_idx = 0
        for idx in indices:
            sub_route = route[start_idx:idx]
            sub_routes.append(sub_route)
            start_idx = idx
        sub_route = route[start_idx:]
        sub_routes.append(sub_route)
        for i, sub_route in enumerate(sub_routes):
            sub_route = sub_route.tolist()
            if sub_route[0] != 0:
                sub_route.insert(0, 0)
                sub_routes[i] = sub_route
            if sub_route[-1] != 0:
                sub_route.append(0)
                sub_routes[i] = sub_route
        return sub_routes

    def adjust_route(self, route, svc_node_ids):
        tracking_tbl = {node: 0 for node in svc_node_ids}
        for sub_route in route:
            for i in range(len(sub_route)):
                if sub_route[i] != 0:
                    tracking_tbl[sub_route[i]] += 1          

        # Check if any nodes are duplicate
        duplicate_nodes = []
        for node in svc_node_ids:
            if tracking_tbl[node] > 1:
                duplicate_nodes.append(node)

        # Check if any nodes are missing
        missing_nodes = []
        for node in svc_node_ids:
            if tracking_tbl[node] < 1:
                missing_nodes.append(node)

        if len(duplicate_nodes) != 0:
            for sub_route in route:
                for node in sub_route:
                    if node in duplicate_nodes:
                        sub_route.remove(node)
                        duplicate_nodes.remove(node)
                    if len(duplicate_nodes) == 0:
                        break
                break
                
        if len(missing_nodes) != 0:
            for sub_route in route:
                for node in missing_nodes:
                    if node not in sub_route:
                        sub_route.append(node)
                        missing_nodes.remove(node)
                    if len(missing_nodes) == 0:
                        break
                break

        for i, sub_route in enumerate(route):
            sub_route = [x for x in sub_route if x != 0]
            route[i] = sub_route
            if sub_route[0] != 0:
                sub_route.insert(0, 0)
            if sub_route[-1] != 0:
                sub_route.append(0)
        return route
            
    def selection(self, route):
        # Normalize fitness scores (Euclidean distance)
       return 1 / (1 + self.calculate_total_distance(route))  # plus 1 to avoid dividing by 0

    def crossover(self, route1, route2, svc_node_ids, case):
        if case == 1:
            sub_route1 = [route1[0]] + route2[1:]
            sub_route2 = [route2[0]] + route1[1:]
            sub_route1 = self.adjust_route(sub_route1, svc_node_ids)
            sub_route2 = self.adjust_route(sub_route2, svc_node_ids)
        elif case == 2:
            split_idx = len(route1)
            sub_route1 = [route1[0]] + route2[split_idx:]
            sub_route2 = route2[:split_idx] + route1[1:]
            sub_route1 = self.adjust_route(sub_route1, svc_node_ids)
            sub_route2 = self.adjust_route(sub_route2, svc_node_ids)
        elif case == 3:
            split_idx = len(route2)
            sub_route1 = route1[:split_idx] + route2[1:]
            sub_route2 = route2[0] + route1[split_idx:]
            sub_route1 = self.adjust_route(sub_route1, svc_node_ids)
            sub_route2 = self.adjust_route(sub_route2, svc_node_ids)
        else:
            split_idx = random.sample(range(len(parent1)), 1)
            sub_route1 = route1[:split_idx] + route2[split_idx:]
            sub_route2 = route2[:split_idx] + route1[split_idx:]
            sub_route1 = self.adjust_route(sub_route1, svc_node_ids)
            sub_route2 = self.adjust_route(sub_route2, svc_node_ids)
        return sub_route1, sub_route2
        
    def mutation(self, route):
        # swap mutation
        i, j = random.sample(range(1, len(route) - 1), 2)  # Avoid depot at index 0 and end
        route[i], route[j] = route[j], route[i]
        return route

    def generate_optimal_route(self):
        
        svc_node_ids = self.create_route()
        self.routes = self.generate_candidate_routes()
        for g in range(1, self.G + 1):

            probs = []
            violated_route_idx = []
            for i, route in enumerate(candidate_routes):
                if self.is_distance_valid(route) == False:
                    route = self.repair_route_distance(route)
                    prob = 0
                    for sub_route in route:
                        prob += self.selection(sub_route)
                    probs.append(prob)
                    self.routes[i] = route
                    violated_route_idx.append(i)
                elif self.is_capacity_valid(route) == False:
                    route = self.repair_route_capacity(route)
                    prob = 0
                    for sub_route in route:
                        prob += self.selection(sub_route)
                    probs.append(prob)
                    self.routes[i] = route
                    violated_route_idx.append(i)
                else:
                    prob = self.selection(route)
                    probs.append(prob)

            chance_for_crossover = random.random()
            if chance_for_crossover < self.r_c:
                top_routes = []
                top_2_highest_indices = np.argsort(probs)[-2:].tolist()
                for i in top_2_highest_indices:
                    top_routes.append(self.routes[i])
                
                if top_2_highest_indices[0] in violated_route_idx:
                    if top_2_highest_indices[1] in violated_route_idx:
                        sub_route1, sub_route2 = self.crossover(top_routes[0], top_routes[1], svc_node_ids, case=1)
                    else:
                        sub_route1, sub_route2 = self.crossover(top_routes[0], top_routes[1], svc_node_ids, case=2)
                elif top_2_highest_indices[0] not in violated_route_idx:
                    if top_2_highest_indices[1] in violated_route_idx:
                        sub_route1, sub_route2 = self.crossover(top_routes[0], top_routes[1], svc_node_ids, case=3)
                    else:
                        sub_route1, sub_route2 = self.crossover(top_routes[0], top_routes[1], svc_node_ids, case=4)

                chance_for_mutation = random.random()
                if chance_for_mutation < self.r_m:
                    if random.sample(range(0, 2), 1) == 0:
                        for i, r in enumerate(sub_route1):
                            if len(r) > 3:
                                sub_route1[i] = self.mutation(r)
                                break
                    else:
                        for i, r in enumerate(sub_route2):
                            if len(r) > 3:
                                sub_route2[i] = self.mutation(r)
                                break                            

            dist_list = []
            for i, route in enumerate(self.routes):
                route_dist = 0
                if i in violated_route_idx:
                    for sub_route in route:
                        route_dist += self.calculate_total_distance(sub_route)
                dist_list.append(route_dist)
            
            sorted_dist_list = sorted(dist_list, reverse=True)
            highest_value = sorted_dist_list[0]
            second_highest_value = sorted_dist_list[1]

            indices_of_highest = [i for i, x in enumerate(dist_list) if x == highest_value]
            indices_of_second_highest = [i for i, x in enumerate(dist_list) if x == second_highest_value]
            self.routes[indices_of_highest[0]] = sub_route1
            self.routes[indices_of_second_highest[0]] = sub_route2
        
        final_dist = []
        for i, route in enumerate(self.routes):
            route_dist = 0
            if any(isinstance(item, list) for item in route) == True:
                for sub_route in route:
                    route_dist += self.calculate_total_distance(sub_route)
            else:
                print(f"route: {route}")
                route_dist = self.calculate_total_distance(route)
            final_dist.append(route_dist)
            

        min_dist = min(final_dist)
        optimal_index = [i for i in range(len(final_dist)) if final_dist[i]==min_dist]
        optimal_route = self.routes[optimal_index[0]]

        return self.routes, optimal_route, min_dist

In [18]:
random.seed(909)

for day in range(1, 4):
    print(f"\nDay #{day}")
    vrp = GA_VRP(node_coord_df, demand_df, depot_df, day=day)
    candidate_routes = vrp.generate_candidate_routes()
    print(f"Candidate routes: {candidate_routes}")
    evolutionary_routes, optimal_route, distance = vrp.generate_optimal_route()
    print(f"Evolutionary routes: {evolutionary_routes}")
    print(f"Optimal route: {optimal_route}")
    print(f"Distance by optimal route: {distance}")


Day #1
Candidate routes: [[0, 1, 5, 4, 8, 10, 3, 0], [0, 5, 10, 3, 1, 4, 8, 0], [0, 1, 3, 8, 5, 4, 10, 0], [0, 1, 8, 3, 5, 10, 4, 0], [0, 1, 5, 4, 3, 8, 10, 0]]
Evolutionary routes: [[[0, 1, 5, 4, 8, 10, 0], [0, 3, 0]], [[0, 5, 3, 1, 8, 0], [0, 10, 0], [0, 4, 0]], [[0, 1, 3, 8, 5, 4, 0], [0, 10, 0]], [[0, 1, 3, 5, 10, 0], [0, 4, 0], [0, 8, 0]], [[0, 1, 5, 4, 3, 8, 0], [0, 10, 0]]]
Optimal route: [[0, 1, 5, 4, 3, 8, 0], [0, 10, 0]]
Distance by optimal route: 42.87143023531864

Day #2
Candidate routes: [[0, 8, 6, 2, 4, 9, 1, 5, 7, 0], [0, 5, 6, 9, 1, 7, 2, 4, 8, 0], [0, 4, 1, 2, 5, 6, 8, 9, 7, 0], [0, 9, 5, 8, 7, 4, 6, 1, 2, 0], [0, 9, 6, 1, 4, 7, 2, 8, 5, 0]]
Evolutionary routes: [[[0, 1, 5, 6, 9, 0], [0, 7, 0], [0, 2, 0], [0, 4, 0], [0, 8, 0]], [[0, 5, 6, 1, 0, 4], [0, 7, 0], [0, 2, 0], [0, 4, 0], [0, 8, 0]], [[0, 1, 5, 6, 0, 9], [0, 8, 0], [0, 9, 0], [0, 7, 0]], [[0, 5, 6, 1, 4, 0], [0, 8, 0], [0, 9, 0], [0, 7, 0]], [[0, 9, 6, 1, 4, 7, 2, 8, 0], [0, 5, 0]]]
Optimal route: [[0, 9, 6, 