# Genetic Algorithm for Vehicle Routing Optimization

## Overview
This notebook implements a custom genetic algorithm for solving large-scale Vehicle Routing Problems (VRP). The algorithm combines classical genetic operators with local search optimization to efficiently route crews across 800+ job sites in Seattle.

## Key Features
- **Hybrid Approach**: Combines genetic algorithms with local search (2-opt, 3-opt)
- **Performance Optimization**: Critical functions implemented in Rust for speed
- **Scalable Architecture**: Handles large problem instances with parallel processing
- **Real-world Application**: Designed for tree trimming crew routing optimization on real Seattle dataset

## 1. Environment Setup and Dependencies

In [22]:
import numpy as np
from typing import List, Sequence, Union, Tuple
import random
import pandas as pd
import folium
import logging

In [2]:
df = pd.read_csv('coordinates.csv')
coords = tuple(zip(df['0'], df['1']))
matrix = np.load('distance_matrix.npy')

## 2. Problem Definition and Fitness Function

In [3]:
def calculate_fitness(route: Sequence[int], distance_matrix: np.ndarray) -> float:
    total_distance: float = 0.0
    num_points_in_route: int = len(route)

    for i in range(num_points_in_route):
        from_point = route[i]
        # Next point, wraps around to the start for the last leg
        # route is just a random sequence of ints generated by the initial population, and mutated later
        to_point = route[(i + 1) % num_points_in_route]

        distance = distance_matrix[from_point][to_point]

        # Handle cases where a route segment is impossible (if your matrix uses inf)
        if distance == float('inf'):
            return float('inf') # This route is infinitely bad (invalid)
        total_distance += distance

    # For minimization problems, often fitness is inversely related to cost (distance)
    # or you can simply use the distance itself and aim to minimize it.
    # If your GA framework maximizes fitness, you might use 1.0 / total_distance.
    # For now, let's just return the total distance (lower is better).
    return total_distance

In [4]:
def visualize_route(route_indices: Sequence[int], coords: tuple[tuple[float, float]]) -> folium.map:
    route_to_plot_on_map = []
    
    for index in route_indices:
        route_to_plot_on_map.append(coords[index])

    # Determine a reasonable map center
    if route_to_plot_on_map:
        # Calculate the average latitude and longitude for the center
        avg_lat = sum(p[0] for p in route_to_plot_on_map) / len(route_to_plot_on_map)
        avg_lon = sum(p[1] for p in route_to_plot_on_map) / len(route_to_plot_on_map)
        map_center = [avg_lat, avg_lon]
        # Adjust zoom based on the spread of points (this is a simple heuristic)
        # For a more robust solution, you might calculate bounds and use map.fit_bounds()
        zoom_level = 6 
    elif coords: # If no route but coords exist, center on the first coord
        map_center = coords[0]
        zoom_level = 10
    else: # Fallback if everything is empty (though caught by earlier checks)
        map_center = [0, 0] 
        zoom_level = 2


    my_map = folium.Map(location=map_center, zoom_start=zoom_level)
    folium.PolyLine(locations=route_to_plot_on_map, color="blue").add_to(my_map)
    
    for point_coord in route_to_plot_on_map:
        folium.Marker(location=point_coord).add_to(my_map)
    return my_map

In [5]:
def create_initial_population(num_points: int, population_size: int=100) -> List[List[int]]:
    """
    Creates an initial population of random routes (permutations of point indices).

    Args:
        population_size: The number of individuals (routes) in the population.
        num_points: The total number of unique points to visit.

    Returns:
        A list of routes, where each route is a list of integers (point indices).
    """
    population: List[List[int]] = []
    points: List[int] = list(range(num_points)) # List of point indices [0, 1, ..., num_points-1]
    
    if num_points in (0, 1):
        return []

    for _ in range(population_size):
        # Create a random permutation of the points
        route: List[int] = random.sample(points, len(points)) 
        population.append(route)
    return population

In [119]:
def select_elites_and_parents(
    matrix: np.ndarray, 
    population: List[Sequence[int]], 
    elite_count: int = 5, 
    parent_count: int = 50,
    pool: Pool = None
) -> Tuple[List[Sequence[int]], List[Sequence[int]], Sequence[int], float]:
    
    """
    Calculates fitness for a population, selects elites and parents.

    Args:
        matrix: The distance matrix.
        population: A list of routes (each route is a sequence of point indices).
        elite_count: The number of best individuals to select as elites.
        parent_count: The number of individuals to select as parents (after elites).

    Returns:
        A tuple containing:
            - elites (List[Sequence[int]]): List of elite routes.
            - parents (List[Sequence[int]]): List of parent routes.
            - best_route (Sequence[int]): The best route found in this population.
            - best_route_score (float): The fitness score of the best route.
    """
    if not population:
        print("Population is empty. Cannot select elites or parents.")
        return [], [], [], float('inf') # Return empty and a very bad score

    # Step 1: Calculate fitness for each route and store as (fitness, route) tuples
    # This preserves all individuals, even if they have the same fitness score
    tasks_for_pool = [(route, matrix) for route in population]
    
    population_with_fitness: List[Tuple[float, Sequence[int]]] = []
    created_pool_locally = False
    if pool is None:
        try:
            # Determine number of processes. os.cpu_count() can be a good default.
            num_processes = os.cpu_count() if os.cpu_count() else 2 
            pool = Pool(processes=num_processes)
            created_pool_locally = True
            #logger.debug(f"Created a local multiprocessing Pool with {num_processes} processes.")
        except Exception as e:
            #logger.error(f"Failed to create multiprocessing Pool: {e}. Falling back to serial processing.")
            # Fallback to serial processing if pool creation fails
            population_with_fitness = []
            for route in population:
                fitness = calculate_fitness(route, matrix)
                population_with_fitness.append((fitness, route))


    if pool: # If pool exists (either passed or created locally)
        try:
            # results will be a list of (fitness, route) tuples
            population_with_fitness = pool.map(calculate_fitness_wrapper, tasks_for_pool)
            #logger.debug("Fitness calculation completed using multiprocessing Pool.")
        except Exception as e:
            #logger.error(f"Error during pool.map execution: {e}. Falling back to serial processing.")
            # Fallback to serial processing if pool.map fails
            population_with_fitness = []
            for route in population:
                fitness = calculate_fitness(route, matrix)
                population_with_fitness.append((fitness, route))
        finally:
            if created_pool_locally and pool:
                pool.close()
                pool.join()
                #logger.debug("Closed local multiprocessing Pool.")
    else: # Fallback if pool was not created (already handled above, but for completeness)
        population_with_fitness = []
        for route in population:
            fitness = calculate_fitness(route, matrix)
            population_with_fitness.append((fitness, route))

    # Step 2: Sort the population by fitness (ascending, lower is better for TSP)
    # The first element of each tuple (fitness) is used for sorting.
    sorted_population = sorted(population_with_fitness, key=lambda item: item[0])
    
    if not sorted_population: # Should not happen if population was not empty
        print("Sorted population is empty. This indicates an issue.")
        return [], [], [], float('inf')

    # Step 3: Extract the single best route and its score
    best_route_score: float = sorted_population[0][0]
    best_route: Sequence[int] = sorted_population[0][1]
    
    # Step 4: Extract the routes for the elites
    # We take the routes (second element of the tuple) from the sorted population.
    elites: List[Sequence[int]] = []
    for i in range(min(elite_count, len(sorted_population))): # Ensure we don't exceed population size
        elites.append(sorted_population[i][1]) # Get the route part of the tuple

    # Step 5: Extract the routes for the parents
    # These come after the elites.
    parents: List[Sequence[int]] = []
    # Start index for parents is after the elites
    start_parent_index = min(elite_count, len(sorted_population))
    # End index for parents
    end_parent_index = min(start_parent_index + parent_count, len(sorted_population))
    
    for i in range(start_parent_index, end_parent_index):
        parents.append(sorted_population[i][1]) # Get the route part of the tuple
        
    shuffled_parents = parents.copy()
    random.shuffle(shuffled_parents)
    # Assuming logger is defined if you uncomment this
    # logger.info(f"Selected {len(elites)} elites. Best elite score: {sorted_population[0][0] if elites else 'N/A'}")
    # logger.info(f"Selected {len(parents)} parents. First parent score: {sorted_population[start_parent_index][0] if parents else 'N/A'}")
    # print(f'parents: {parents}') # Your original print

    return elites, shuffled_parents, best_route, best_route_score

In [105]:
def mutate(route, mutation_rate=.05):
    # start with simple swap mutation
    route = route.copy()
    if random.random() < mutation_rate:
        i, j = random.sample(range(len(route)), 2)
        route[i], route[j] = route[j], route[i]
        
    return route

In [71]:
def order_crossover(parent1: Sequence[int], parent2: Sequence[int], children_per_parent_pair: int=2) -> List[int]:
    """
    Performs Order Crossover (OX1) on two parent routes to produce one child.

    Args:
        parent1: The first parent route (a list or tuple of integers).
        parent2: The second parent route (a list or tuple of integers).

    Returns:
        A child route (a list of integers).
    """
    size = len(parent1)
    if size == 0:
        return []
    if size != len(parent2):
        raise ValueError("Parents must be of the same length for crossover.")

    child: List[int | None] = [None] * size # Initialize child with None placeholders

    # 1. Randomly select two crossover points (indices)
    # Ensure start_index is less than end_index for slicing
    point1, point2 = sorted(random.sample(range(size), 2))
    
    # Handle the case where point2 might be the last index, making the slice empty if not +1
    # Or ensure the segment has at least one element.
    # For simplicity, let's ensure the segment is from point1 up to (but not including) point2
    # If point1 and point2 are the same after sorting (shouldn't happen with random.sample(..., 2) if size >=2)
    # or if you want to ensure a segment of at least 1, you might need more logic.
    # For this example, point1 to point2 defines the segment start and end (exclusive of point2 for range).
    # Let's adjust to make it inclusive for the segment from parent1.
    # The segment from parent1 will be parent1[point1:point2+1]
    # For easier understanding, let's use start and end for the slice from parent1
    
    start_index = point1
    end_index = point2 # This will be the exclusive end for slicing parent1's segment

    # 2. Copy the segment from parent1 to the child
    child_segment_from_parent1 = list(parent1[start_index : end_index + 1])
    
    # Place this segment into the child
    for i in range(len(child_segment_from_parent1)):
        child[start_index + i] = child_segment_from_parent1[i]

    # 3. Fill the remaining positions from parent2
    parent2_idx = 0 # Current index in parent2
    child_idx = 0   # Current index in child

    # Elements already in the child from parent1's segment
    elements_in_child_segment = set(child_segment_from_parent1)

    # Fill remaining slots in child
    while None in child: # Continue until all None placeholders are filled
        # Find the next available slot in the child
        # Start filling from (end_index + 1) % size, this allows it to wrap around
        current_child_fill_idx = (end_index + 1 + child_idx) % size
        
        if child[current_child_fill_idx] is None: # If this slot in child is empty
            # Find the next element in parent2 that's not already in the child's segment
            # Start looking in parent2 from where we left off, or from (end_index + 1) % size
            # to maintain order relative to the copied segment.
            # Let's iterate through parent2 starting from a point that maintains relative order.
            # A common way is to start checking parent2 from the element after its copy of parent1's segment end.
            
            # Simpler approach for filling: Iterate through parent2 and place if not already in child.
            # The fill order should maintain parent2's relative order for the remaining items.
            
            # We need to iterate through parent2 starting from a position that makes sense.
            # Let's try filling child slots sequentially after the copied segment,
            # taking items from parent2 in order, skipping those already present.
            
            # Item from parent2 to consider (start after parent2's version of the segment end, wrap around)
            # This ensures we maintain the order of parent2's elements relative to the crossover points.
            item_from_parent2 = parent2[(end_index + 1 + parent2_idx) % size]

            if item_from_parent2 not in elements_in_child_segment:
                if child[current_child_fill_idx] is None: # Check again if the slot is still available
                    child[current_child_fill_idx] = item_from_parent2
                    child_idx += 1 # Move to consider the next empty slot in child
            
            parent2_idx += 1 # Always move to the next item in parent2

            # Safety break if parent2_idx gets too large (shouldn't happen if logic is correct)
            if parent2_idx > 2 * size: 
                # print("Error in crossover: parent2_idx too large. Parents:", parent1, parent2, "Child:", child)
                # This might indicate an issue if not all Nones are filled.
                # For robustness, fill remaining Nones if any (though this indicates a flaw)
                remaining_elements = [p for p in parent1 if p not in child] # or parent2
                current_none_idx = 0
                for i in range(size):
                    if child[i] is None and current_none_idx < len(remaining_elements):
                        child[i] = remaining_elements[current_none_idx]
                        current_none_idx += 1
                break


    # Cast child back to List[int] from List[int | None]
    # At this point, all Nones should be filled.
    final_child: List[int] = [c for c in child if c is not None]
    if len(final_child) != size:
        # This indicates an error in the filling logic
        # print(f"Error: Child length {len(final_child)} does not match parent length {size}. Child: {final_child}")
        # Fallback or raise error
        # For simplicity, if error, could return a copy of parent1
        return list(parent1) 

    return final_child

In [90]:
def have_sex_and_have_mutated_children(shuffled_parents):
    children = []
    paired_parents = list(zip(shuffled_parents[::2], shuffled_parents[1::2]))
    
    for parent1, parent2 in paired_parents:
        child1, child2 = (order_crossover(parent1, parent2), order_crossover(parent2, parent1))
        children.append(mutate(child1))
        children.append(mutate(child2))
        
    return children

In [None]:
def find_route(matrix, coords, num_generations=500):
    num_points = len(coords)
    population = create_initial_population(num_points)
    best_route = []
    best_route_score = float('inf')

    for i in range(num_generations):
        elites, parents, route, route_score = select_elites_and_parents(matrix, population)
        if best_route_score > route_score:
            best_route_score = route_score
            best_route = route
        children = have_sex_and_have_mutated_children(parents)
        population = elites + children

        if i%500 == 0:
            print(f"Distance of best route: {best_route_score} meters.")
            print(f"{i/500}% done")

    return best_route_score, best_route

In [None]:
from multiprocessing import Pool, cpu_count
import os

def calculate_fitness_wrapper(args_tuple):
    route, dist_matrix = args_tuple
    return calculate_fitness(route, dist_matrix), route

In [120]:
%%time
best_route_score, best_route = find_route(matrix, coords)
route_map = visualize_route(best_route, coords)
route_map

Process SpawnPoolWorker-14:
Traceback (most recent call last):
Process SpawnPoolWorker-13:
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.13/3.13.3/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.3/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.3/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/Cellar/python@3.13/3.13.3/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/queues.py", line 387, in get
    return _ForkingPickler.loads(res)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get attribute 'calculate_fi

KeyboardInterrupt: 

In [130]:
import numpy as np
from typing import List, Sequence, Union, Tuple
import random
import pandas as pd
import folium
import logging
from multiprocessing import Pool, cpu_count
from ga_helpers import calculate_fitness
import os
from functools import partial # Useful for pool.map with functions taking multiple args

# --- Logger Setup (run once at the start of your notebook/script) ---
# It's better to configure this once.
# If you run this cell multiple times, it might add multiple handlers.
logger = logging.getLogger("GA_Logger") # Give your logger a specific name
if not logger.handlers: # Check if handlers are already added
    logger.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(levelname)-8s - %(name)s - %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)

# --- Type Aliases ---
Numeric = Union[int, float]
DistanceMatrix = np.ndarray
Route = Sequence[int]

# --- Wrapper for multiprocessing (if needed, or use partial) ---
# This version of the wrapper is if calculate_fitness is part of a class or needs other fixed args.
# If calculate_fitness is top-level and only needs route & matrix, functools.partial is cleaner.
# For this example, assuming calculate_fitness is top-level as defined above.

# --- GA Core Functions (Assume these are defined as in your notebook) ---
def create_initial_population(num_points: int, population_size: int=100) -> List[List[int]]:
    population: List[List[int]] = []
    points: List[int] = list(range(num_points))
    if num_points == 0: return []
    if num_points == 1:
        for _ in range(population_size): population.append([0])
        return population
    for _ in range(population_size):
        route: List[int] = random.sample(points, len(points))
        population.append(route)
    return population

def mutate(route: Sequence[int], mutation_probability: float = 0.1) -> List[int]:
    route_list = list(route) # Work on a copy
    if random.random() < mutation_probability:
        if len(route_list) >= 2:
            i, j = random.sample(range(len(route_list)), 2)
            route_list[i], route_list[j] = route_list[j], route_list[i]
    return route_list

def order_crossover(parent1: Sequence[int], parent2: Sequence[int]) -> List[int]:
    size = len(parent1)
    if size == 0: return []
    if size != len(parent2): raise ValueError("Parents must be of the same length.")
    child: List[int | None] = [None] * size
    point1, point2 = sorted(random.sample(range(size), 2))
    start_index, end_index = point1, point2 
    
    child_segment_from_parent1 = list(parent1[start_index : end_index + 1])
    for i in range(len(child_segment_from_parent1)):
        child[start_index + i] = child_segment_from_parent1[i]
    
    elements_in_child_segment = set(child_segment_from_parent1)
    parent2_ptr = (end_index + 1) % size
    child_fill_ptr = (end_index + 1) % size
    
    filled_count = len(child_segment_from_parent1)
    while filled_count < size:
        item_from_parent2 = parent2[parent2_ptr]
        if item_from_parent2 not in elements_in_child_segment:
            child[child_fill_ptr] = item_from_parent2
            child_fill_ptr = (child_fill_ptr + 1) % size
            filled_count +=1
        parent2_ptr = (parent2_ptr + 1) % size
        if parent2_ptr == (end_index + 1) % size and filled_count < size: # Safety break / error condition
            # This indicates an issue, possibly non-unique elements in parents or logic error
            logger.error(f"Crossover error: Could not fill child. P1:{parent1}, P2:{parent2}, Child:{child}")
            # Fallback: return a copy of a parent to ensure valid permutation
            return list(parent1) 
            
    final_child: List[int] = [c for c in child if c is not None] # Should all be non-None
    if len(final_child) != size: # Should not happen if logic above is perfect
        logger.error(f"Crossover produced child of wrong length. P1:{parent1}, Child:{final_child}")
        return list(parent1)
    return final_child


def have_sex_and_have_mutated_children(
    parents_list: List[Sequence[int]], 
    num_offspring_needed: int, 
    mutation_probability: float = 0.1,
    crossover_rate: float = 0.7
):
    children = []
    num_available_parents = len(parents_list)
    if num_available_parents == 0: return []
    if num_available_parents == 1: # Asexual reproduction if only one parent
        while len(children) < num_offspring_needed:
            children.append(mutate(parents_list[0], mutation_probability))
        return children[:num_offspring_needed]

    idx = 0
    while len(children) < num_offspring_needed:
        p1_idx = idx % num_available_parents
        p2_idx = (idx + 1 + random.randint(0, num_available_parents-2)) % num_available_parents # Try to ensure different second parent
        if p1_idx == p2_idx: # Ensure different parents if possible
             p2_idx = (p1_idx + 1) % num_available_parents
        
        parent1 = parents_list[p1_idx]
        parent2 = parents_list[p2_idx]
        
        if random.random() < crossover_rate:
            child = order_crossover(parent1, parent2)
        else: # Asexual reproduction (clone one parent)
            child = list(random.choice([parent1, parent2])) 
            
        children.append(mutate(child, mutation_probability))
        idx += 1
    return children[:num_offspring_needed]


def select_elites_and_parents_mp(
    matrix: DistanceMatrix, 
    population: List[Route], 
    elite_count: int, 
    parent_selection_count: int, # How many parents to select for breeding pool
    pool: Pool 
) -> Tuple[List[Route], List[Route], Route, float]:
    """
    Calculates fitness for a population using a provided multiprocessing pool, 
    selects elites and parents.
    """
    if not population:
        logger.warning("Population is empty. Cannot select elites or parents.")
        return [], [], [], float('inf')

    # Use functools.partial to create a function that only takes 'route'
    # as 'matrix' will be fixed for all calls in this batch.
    fitness_evaluator_with_matrix = partial(calculate_fitness, distance_matrix=matrix)
    
    try:
        # pool.map takes a function that accepts one argument.
        # fitness_evaluator_with_matrix now fits this.
        fitness_scores_list = pool.map(fitness_evaluator_with_matrix, population)
        population_with_fitness = list(zip(fitness_scores_list, population))
        logger.debug("Fitness calculation completed using multiprocessing Pool.")
    except Exception as e:
        logger.error(f"Error during pool.map execution: {e}. Falling back to serial processing.")
        population_with_fitness = []
        for route in population:
            fitness = calculate_fitness(route, matrix)
            population_with_fitness.append((fitness, route))

    if not population_with_fitness:
        logger.error("Population with fitness is empty. Cannot proceed.")
        return [], [], [], float('inf')

    sorted_population = sorted(population_with_fitness, key=lambda item: item[0])
    
    best_route_score: float = sorted_population[0][0]
    best_route: Route = sorted_population[0][1]
    
    elites: List[Route] = [ind[1] for ind in sorted_population[:min(elite_count, len(sorted_population))]]
    
    # Select parents from the portion after elites
    # Ensure parent_selection_count doesn't exceed available non-elite individuals
    start_parent_idx = len(elites)
    num_candidates_for_parents = len(sorted_population) - start_parent_idx
    actual_parent_count = min(parent_selection_count, num_candidates_for_parents)
    
    parents: List[Route] = [ind[1] for ind in sorted_population[start_parent_idx : start_parent_idx + actual_parent_count]]
        
    shuffled_parents = parents.copy() 
    random.shuffle(shuffled_parents) 

    return elites, shuffled_parents, best_route, best_route_score


def find_route_mp(matrix, coords, population_size: int = 100, elite_count: int = 5, 
                  num_generations=1000, mutation_prob: float = 0.1, 
                  parent_selection_count: int = 50):
    num_points = len(coords)
    if num_points == 0:
        logger.error("No coordinates provided. Cannot run GA.")
        return float('inf'), []
        
    population = create_initial_population(num_points, population_size)
    if not population:
        logger.error("Failed to create initial population.")
        return float('inf'), []
        
    best_overall_route = list(population[0]) # Initialize with a valid route
    best_overall_score = calculate_fitness(best_overall_route, matrix) # Calculate its score

    # Create the Pool once for all generations for efficiency
    # Use context manager to ensure pool is closed properly
    num_processes = os.cpu_count() if os.cpu_count() else 2
    logger.info(f"Creating Multiprocessing Pool with {num_processes} processes for GA.")
    
    with Pool(processes=num_processes) as pool:
        for generation in range(num_generations):
            elites, parents, current_gen_best_route, current_gen_best_score = \
                select_elites_and_parents_mp(matrix, population, elite_count, parent_selection_count, pool=pool)
            
            if current_gen_best_score < best_overall_score:
                best_overall_score = current_gen_best_score
                best_overall_route = list(current_gen_best_route) # Ensure it's a list
                logger.debug(f"Gen {generation+1}/{num_generations}: New best score: {best_overall_score:.2f}")

            num_offspring_needed = population_size - len(elites)
            if not parents and num_offspring_needed > 0: # Handle case where no parents might be selected if pop is too small
                logger.warning(f"No parents selected in generation {generation+1}, filling with mutated elites or random if elites are also few.")
                # Fallback: create offspring from mutated elites or new random individuals
                filler_source = elites if elites else [random.sample(range(num_points), num_points)] # Use a random route if no elites
                children = [mutate(list(random.choice(filler_source)), mutation_prob) for _ in range(num_offspring_needed)]
            elif num_offspring_needed > 0 :
                 children = have_sex_and_have_mutated_children(parents, num_offspring_needed, mutation_prob)
            else:
                 children = []
            
            population = elites + children
            
            if (generation+1) % (num_generations//100) == 0: # Log progress
                 logger.info(f"--- Generation {generation + 1} complete. Best score so far: {best_overall_score:.2f} ---")

    logger.info(f"Finished. Best score: {best_overall_score:.2f}")
    return best_overall_score, best_overall_route

# --- Main execution block (IMPORTANT for multiprocessing) ---
if __name__ == '__main__':
    # This ensures that the code below only runs when the script is executed directly,
    # not when it's imported by child processes spawned by multiprocessing.Pool.
    
    # Configure the main logger for the script
    # This basicConfig should ideally be called only once in the entire application.
    # If GA_Logger was already configured above, this might reconfigure or add handlers.
    # For a script, it's fine here.
    #logging.basicConfig(level=logging.INFO, 
                        #format='%(asctime)s - %(levelname)-8s - %(name)s - %(message)s')
    # Use the logger instance defined at the top of the script
    logger.info("Script execution started under if __name__ == '__main__'.")

    # Load data
    try:
        df = pd.read_csv('coordinates.csv') # Ensure this file is in the same directory or provide full path
        coords_list = list(zip(df['0'], df['1'])) 
        matrix_loaded = np.load('distance_matrix.npy') # Ensure this file is present
        logger.info(f"Data loaded: {len(coords_list)} coordinates, matrix shape {matrix_loaded.shape}")
    except FileNotFoundError as e:
        logger.error(f"Error loading data: {e}. Please ensure 'coordinates.csv' and 'distance_matrix.npy' are present.")
        exit() # Exit if data can't be loaded
    except Exception as e:
        logger.error(f"An unexpected error occurred during data loading: {e}", exc_info=True)
        exit()

    # GA Parameters
    POPULATION_SIZE = 100  
    ELITE_COUNT = 10 # Increased slightly      
    PARENT_SELECTION_COUNT = 50 
    NUM_GENERATIONS = 500 
    MUTATION_PROBABILITY = 0.05 # Example: 5% mutation chance per child

    logger.info(f"Starting GA with Population: {POPULATION_SIZE}, Generations: {NUM_GENERATIONS}, Mutation: {MUTATION_PROBABILITY}")

    best_score, best_rt = find_route_mp(
        matrix_loaded, 
        coords_list, 
        population_size=POPULATION_SIZE,
        elite_count=ELITE_COUNT,
        num_generations=NUM_GENERATIONS,
        mutation_prob=MUTATION_PROBABILITY,
        parent_selection_count=PARENT_SELECTION_COUNT
    )

    logger.info(f"Multiprocessing GA finished.")
    logger.info(f"Best route score found: {best_score:.2f} meters")
    # logger.info(f"Best route (indices): {best_rt}")

    # Visualize (optional, assuming visualize_route is defined and folium imported)
    if best_rt and coords_list:
          # from IPython.display import display # Needed if running in a script and want to auto-open
        route_map_viz = visualize_route(best_rt, tuple(coords_list))
        if route_map_viz:
            map_file = "best_route_map_mp.html"
            route_map_viz.save(map_file)
            logger.info(f"Best route map saved to {map_file}")
            import webbrowser # To auto-open
            webbrowser.open(map_file) 
    

2025-05-16 16:42:00,082 - INFO     - GA_Logger - Script execution started under if __name__ == '__main__'.
2025-05-16 16:42:00,082 - INFO     - GA_Logger - Script execution started under if __name__ == '__main__'.
2025-05-16 16:42:00,088 - INFO     - GA_Logger - Data loaded: 666 coordinates, matrix shape (666, 666)
2025-05-16 16:42:00,088 - INFO     - GA_Logger - Data loaded: 666 coordinates, matrix shape (666, 666)
2025-05-16 16:42:00,088 - INFO     - GA_Logger - Starting GA with Population: 100, Generations: 500, Mutation: 0.05
2025-05-16 16:42:00,088 - INFO     - GA_Logger - Starting GA with Population: 100, Generations: 500, Mutation: 0.05
2025-05-16 16:42:00,105 - INFO     - GA_Logger - Creating Multiprocessing Pool with 12 processes for GA.
2025-05-16 16:42:00,105 - INFO     - GA_Logger - Creating Multiprocessing Pool with 12 processes for GA.
2025-05-16 16:42:00,763 - INFO     - GA_Logger - --- Generation 5 complete. Best score so far: 4113259.70 ---
2025-05-16 16:42:00,763 - IN

In [130]:
import numpy as np
from typing import List, Sequence, Union, Tuple
import random
import pandas as pd
import folium
import logging
from multiprocessing import Pool, cpu_count
from ga_helpers import calculate_fitness
import os
from functools import partial # Useful for pool.map with functions taking multiple args

# --- Logger Setup (run once at the start of your notebook/script) ---
# It's better to configure this once.
# If you run this cell multiple times, it might add multiple handlers.
logger = logging.getLogger("GA_Logger") # Give your logger a specific name
if not logger.handlers: # Check if handlers are already added
    logger.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(levelname)-8s - %(name)s - %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)

# --- Type Aliases ---
Numeric = Union[int, float]
DistanceMatrix = np.ndarray
Route = Sequence[int]

# --- Wrapper for multiprocessing (if needed, or use partial) ---
# This version of the wrapper is if calculate_fitness is part of a class or needs other fixed args.
# If calculate_fitness is top-level and only needs route & matrix, functools.partial is cleaner.
# For this example, assuming calculate_fitness is top-level as defined above.

# --- GA Core Functions (Assume these are defined as in your notebook) ---
def create_initial_population(num_points: int, population_size: int=100) -> List[List[int]]:
    population: List[List[int]] = []
    points: List[int] = list(range(num_points))
    if num_points == 0: return []
    if num_points == 1:
        for _ in range(population_size): population.append([0])
        return population
    for _ in range(population_size):
        route: List[int] = random.sample(points, len(points))
        population.append(route)
    return population

def mutate(route: Sequence[int], mutation_probability: float = 0.1) -> List[int]:
    route_list = list(route) # Work on a copy
    if random.random() < mutation_probability:
        if len(route_list) >= 2:
            i, j = random.sample(range(len(route_list)), 2)
            route_list[i], route_list[j] = route_list[j], route_list[i]
    return route_list

def order_crossover(parent1: Sequence[int], parent2: Sequence[int]) -> List[int]:
    size = len(parent1)
    if size == 0: return []
    if size != len(parent2): raise ValueError("Parents must be of the same length.")
    child: List[int | None] = [None] * size
    point1, point2 = sorted(random.sample(range(size), 2))
    start_index, end_index = point1, point2 
    
    child_segment_from_parent1 = list(parent1[start_index : end_index + 1])
    for i in range(len(child_segment_from_parent1)):
        child[start_index + i] = child_segment_from_parent1[i]
    
    elements_in_child_segment = set(child_segment_from_parent1)
    parent2_ptr = (end_index + 1) % size
    child_fill_ptr = (end_index + 1) % size
    
    filled_count = len(child_segment_from_parent1)
    while filled_count < size:
        item_from_parent2 = parent2[parent2_ptr]
        if item_from_parent2 not in elements_in_child_segment:
            child[child_fill_ptr] = item_from_parent2
            child_fill_ptr = (child_fill_ptr + 1) % size
            filled_count +=1
        parent2_ptr = (parent2_ptr + 1) % size
        if parent2_ptr == (end_index + 1) % size and filled_count < size: # Safety break / error condition
            # This indicates an issue, possibly non-unique elements in parents or logic error
            logger.error(f"Crossover error: Could not fill child. P1:{parent1}, P2:{parent2}, Child:{child}")
            # Fallback: return a copy of a parent to ensure valid permutation
            return list(parent1) 
            
    final_child: List[int] = [c for c in child if c is not None] # Should all be non-None
    if len(final_child) != size: # Should not happen if logic above is perfect
        logger.error(f"Crossover produced child of wrong length. P1:{parent1}, Child:{final_child}")
        return list(parent1)
    return final_child


def have_sex_and_have_mutated_children(
    parents_list: List[Sequence[int]], 
    num_offspring_needed: int, 
    mutation_probability: float = 0.1,
    crossover_rate: float = 0.7
):
    children = []
    num_available_parents = len(parents_list)
    if num_available_parents == 0: return []
    if num_available_parents == 1: # Asexual reproduction if only one parent
        while len(children) < num_offspring_needed:
            children.append(mutate(parents_list[0], mutation_probability))
        return children[:num_offspring_needed]

    idx = 0
    while len(children) < num_offspring_needed:
        p1_idx = idx % num_available_parents
        p2_idx = (idx + 1 + random.randint(0, num_available_parents-2)) % num_available_parents # Try to ensure different second parent
        if p1_idx == p2_idx: # Ensure different parents if possible
             p2_idx = (p1_idx + 1) % num_available_parents
        
        parent1 = parents_list[p1_idx]
        parent2 = parents_list[p2_idx]
        
        if random.random() < crossover_rate:
            child = order_crossover(parent1, parent2)
        else: # Asexual reproduction (clone one parent)
            child = list(random.choice([parent1, parent2])) 
            
        children.append(mutate(child, mutation_probability))
        idx += 1
    return children[:num_offspring_needed]


def select_elites_and_parents_mp(
    matrix: DistanceMatrix, 
    population: List[Route], 
    elite_count: int, 
    parent_selection_count: int, # How many parents to select for breeding pool
    pool: Pool 
) -> Tuple[List[Route], List[Route], Route, float]:
    """
    Calculates fitness for a population using a provided multiprocessing pool, 
    selects elites and parents.
    """
    if not population:
        logger.warning("Population is empty. Cannot select elites or parents.")
        return [], [], [], float('inf')

    # Use functools.partial to create a function that only takes 'route'
    # as 'matrix' will be fixed for all calls in this batch.
    fitness_evaluator_with_matrix = partial(calculate_fitness, distance_matrix=matrix)
    
    try:
        # pool.map takes a function that accepts one argument.
        # fitness_evaluator_with_matrix now fits this.
        fitness_scores_list = pool.map(fitness_evaluator_with_matrix, population)
        population_with_fitness = list(zip(fitness_scores_list, population))
        logger.debug("Fitness calculation completed using multiprocessing Pool.")
    except Exception as e:
        logger.error(f"Error during pool.map execution: {e}. Falling back to serial processing.")
        population_with_fitness = []
        for route in population:
            fitness = calculate_fitness(route, matrix)
            population_with_fitness.append((fitness, route))

    if not population_with_fitness:
        logger.error("Population with fitness is empty. Cannot proceed.")
        return [], [], [], float('inf')

    sorted_population = sorted(population_with_fitness, key=lambda item: item[0])
    
    best_route_score: float = sorted_population[0][0]
    best_route: Route = sorted_population[0][1]
    
    elites: List[Route] = [ind[1] for ind in sorted_population[:min(elite_count, len(sorted_population))]]
    
    # Select parents from the portion after elites
    # Ensure parent_selection_count doesn't exceed available non-elite individuals
    start_parent_idx = len(elites)
    num_candidates_for_parents = len(sorted_population) - start_parent_idx
    actual_parent_count = min(parent_selection_count, num_candidates_for_parents)
    
    parents: List[Route] = [ind[1] for ind in sorted_population[start_parent_idx : start_parent_idx + actual_parent_count]]
        
    shuffled_parents = parents.copy() 
    random.shuffle(shuffled_parents) 

    return elites, shuffled_parents, best_route, best_route_score


def find_route_mp(matrix, coords, population_size: int = 100, elite_count: int = 5, 
                  num_generations=1000, mutation_prob: float = 0.1, 
                  parent_selection_count: int = 50):
    num_points = len(coords)
    if num_points == 0:
        logger.error("No coordinates provided. Cannot run GA.")
        return float('inf'), []
        
    population = create_initial_population(num_points, population_size)
    if not population:
        logger.error("Failed to create initial population.")
        return float('inf'), []
        
    best_overall_route = list(population[0]) # Initialize with a valid route
    best_overall_score = calculate_fitness(best_overall_route, matrix) # Calculate its score

    # Create the Pool once for all generations for efficiency
    # Use context manager to ensure pool is closed properly
    num_processes = os.cpu_count() if os.cpu_count() else 2
    logger.info(f"Creating Multiprocessing Pool with {num_processes} processes for GA.")
    
    with Pool(processes=num_processes) as pool:
        for generation in range(num_generations):
            elites, parents, current_gen_best_route, current_gen_best_score = \
                select_elites_and_parents_mp(matrix, population, elite_count, parent_selection_count, pool=pool)
            
            if current_gen_best_score < best_overall_score:
                best_overall_score = current_gen_best_score
                best_overall_route = list(current_gen_best_route) # Ensure it's a list
                logger.debug(f"Gen {generation+1}/{num_generations}: New best score: {best_overall_score:.2f}")

            num_offspring_needed = population_size - len(elites)
            if not parents and num_offspring_needed > 0: # Handle case where no parents might be selected if pop is too small
                logger.warning(f"No parents selected in generation {generation+1}, filling with mutated elites or random if elites are also few.")
                # Fallback: create offspring from mutated elites or new random individuals
                filler_source = elites if elites else [random.sample(range(num_points), num_points)] # Use a random route if no elites
                children = [mutate(list(random.choice(filler_source)), mutation_prob) for _ in range(num_offspring_needed)]
            elif num_offspring_needed > 0 :
                 children = have_sex_and_have_mutated_children(parents, num_offspring_needed, mutation_prob)
            else:
                 children = []
            
            population = elites + children
            
            if (generation+1) % (num_generations//100) == 0: # Log progress
                 logger.info(f"--- Generation {generation + 1} complete. Best score so far: {best_overall_score:.2f} ---")

    logger.info(f"Finished. Best score: {best_overall_score:.2f}")
    return best_overall_score, best_overall_route

# --- Main execution block (IMPORTANT for multiprocessing) ---
if __name__ == '__main__':
    # This ensures that the code below only runs when the script is executed directly,
    # not when it's imported by child processes spawned by multiprocessing.Pool.
    
    # Configure the main logger for the script
    # This basicConfig should ideally be called only once in the entire application.
    # If GA_Logger was already configured above, this might reconfigure or add handlers.
    # For a script, it's fine here.
    #logging.basicConfig(level=logging.INFO, 
                        #format='%(asctime)s - %(levelname)-8s - %(name)s - %(message)s')
    # Use the logger instance defined at the top of the script
    logger.info("Script execution started under if __name__ == '__main__'.")

    # Load data
    try:
        df = pd.read_csv('coordinates.csv') # Ensure this file is in the same directory or provide full path
        coords_list = list(zip(df['0'], df['1'])) 
        matrix_loaded = np.load('distance_matrix.npy') # Ensure this file is present
        logger.info(f"Data loaded: {len(coords_list)} coordinates, matrix shape {matrix_loaded.shape}")
    except FileNotFoundError as e:
        logger.error(f"Error loading data: {e}. Please ensure 'coordinates.csv' and 'distance_matrix.npy' are present.")
        exit() # Exit if data can't be loaded
    except Exception as e:
        logger.error(f"An unexpected error occurred during data loading: {e}", exc_info=True)
        exit()

    # GA Parameters
    POPULATION_SIZE = 100  
    ELITE_COUNT = 10 # Increased slightly      
    PARENT_SELECTION_COUNT = 50 
    NUM_GENERATIONS = 500 
    MUTATION_PROBABILITY = 0.05 # Example: 5% mutation chance per child

    logger.info(f"Starting GA with Population: {POPULATION_SIZE}, Generations: {NUM_GENERATIONS}, Mutation: {MUTATION_PROBABILITY}")

    best_score, best_rt = find_route_mp(
        matrix_loaded, 
        coords_list, 
        population_size=POPULATION_SIZE,
        elite_count=ELITE_COUNT,
        num_generations=NUM_GENERATIONS,
        mutation_prob=MUTATION_PROBABILITY,
        parent_selection_count=PARENT_SELECTION_COUNT
    )

    logger.info(f"Multiprocessing GA finished.")
    logger.info(f"Best route score found: {best_score:.2f} meters")
    # logger.info(f"Best route (indices): {best_rt}")

    # Visualize (optional, assuming visualize_route is defined and folium imported)
    if best_rt and coords_list:
          # from IPython.display import display # Needed if running in a script and want to auto-open
        route_map_viz = visualize_route(best_rt, tuple(coords_list))
        if route_map_viz:
            map_file = "best_route_map_mp.html"
            route_map_viz.save(map_file)
            logger.info(f"Best route map saved to {map_file}")
            import webbrowser # To auto-open
            webbrowser.open(map_file) 
    

2025-05-16 16:42:00,082 - INFO     - GA_Logger - Script execution started under if __name__ == '__main__'.
2025-05-16 16:42:00,082 - INFO     - GA_Logger - Script execution started under if __name__ == '__main__'.
2025-05-16 16:42:00,088 - INFO     - GA_Logger - Data loaded: 666 coordinates, matrix shape (666, 666)
2025-05-16 16:42:00,088 - INFO     - GA_Logger - Data loaded: 666 coordinates, matrix shape (666, 666)
2025-05-16 16:42:00,088 - INFO     - GA_Logger - Starting GA with Population: 100, Generations: 500, Mutation: 0.05
2025-05-16 16:42:00,088 - INFO     - GA_Logger - Starting GA with Population: 100, Generations: 500, Mutation: 0.05
2025-05-16 16:42:00,105 - INFO     - GA_Logger - Creating Multiprocessing Pool with 12 processes for GA.
2025-05-16 16:42:00,105 - INFO     - GA_Logger - Creating Multiprocessing Pool with 12 processes for GA.
2025-05-16 16:42:00,763 - INFO     - GA_Logger - --- Generation 5 complete. Best score so far: 4113259.70 ---
2025-05-16 16:42:00,763 - IN

In [None]:
coords