# Run FunSearch on Capacitated Vehicle Routing Problem

**Introduction**

Our project is to apply [FunSearch](https://deepmind.google/discover/blog/funsearch-making-new-discoveries-in-mathematical-sciences-using-large-language-models/) on [Capacitated Vehicle Routing Problem](https://developers.google.com/optimization/routing/cvrp)(CVRP) to find better heuristic algorithm.

Instead of relying on human intuitions or manual trial-and-errors to find a better heuristic, **FunSearch**  automate the design process by treating it as a search problem.

FunSearch refers to a heuristic function as a program. FunSearch first asks an LLM to create a set 
of programs, each of which will be incorporated into the general heuristic code templatve. Each program will be evaluated on some test instancms 
and the resulting performance will be recorded. Then, the main loop of FunSearch starts, where n 
each iteration, FunSearch picks a promising pronce) from the setof 
programs created before. This program will be sent to an LLM for further modifications so thatthe 
performance can be improved. Then, the modified program is evaroblem), and it will replace the original program f its 
performance improves. This process will repeat for many iterations until a novel program is otained 
or some pre-defined max # of iterations is

This project is divided into 5 steps:
1. Implement 'LLM' interface.
2. Implement a 'SandBox' interface.
3. Prepare a 'specification'.
4. Prepare a dataset.
5. Start FunSearch.

**Caution**

This document references https://github.com/RayZhhh/funsearchFunSearch.

## Preparation: download the project file from github. And update system path.

In [None]:
!git clone https://github.com/RayZhhh/funsearch.git

import sys

sys.path.append('/content/funsearch/')

## 1. Implement LLM interface
Set the API's IP address according to your API provider (See line 65 in the following code).
```python
conn = http.client.HTTPSConnection("api.chatanywhere.com.cn")
```
You should prepare a 'key' for the LLM API. And fill them in the header (See line 76-80 in the following code).
```python
headers = {
    'Authorization': 'Bearer [put your key here, the key may start with "sk-..."]',
    'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
    'Content-Type': 'application/json'
}
```
**Caution**
The LLM interface implementation references https://github.com/RayZhhh/funsearch

In [None]:
import time
import json
import multiprocessing
from typing import Collection, Any
import http.client
from implementation import sampler


def _trim_preface_of_body(sample: str) -> str:
    """Trim the redundant descriptions/symbols/'def' declaration before the function body.
    Please see my comments in sampler.LLM (in sampler.py).
    Since the LLM used in this file is not a pure code completion LLM, this trim function is required.

    -Example sample (function & description generated by LLM):
    -------------------------------------
    This is the optimized function ...
    def priority_v2(...) -> ...:
        return ...
    This function aims to ...
    -------------------------------------
    -This function removes the description above the function's signature, and the function's signature.
    -The indent of the code is preserved.
    -Return of this function:
    -------------------------------------
        return ...
    This function aims to ...
    -------------------------------------
    """
    lines = sample.splitlines()
    func_body_lineno = 0
    find_def_declaration = False
    for lineno, line in enumerate(lines):
        # find the first 'def' statement in the given code
        if line[:3] == 'def':
            func_body_lineno = lineno
            find_def_declaration = True
            break
    if find_def_declaration:
        code = ''
        for line in lines[func_body_lineno + 1:]:
            code += line + '\n'
        return code
    return sample


class LLMAPI(sampler.LLM):
    """Language model that predicts continuation of provided source code.
    """

    def __init__(self, samples_per_prompt: int, trim=True):
        super().__init__(samples_per_prompt)
        additional_prompt = ('You are an expert in optimization of CVRP, generate a specific improvement strategy based on the current state of the solution.'
                             'Complete a different and more complex Python function. '
                             'Be creative and you can insert multiple if-else and for-loop in the code logic.'
                             'Only output the Python code, no descriptions.')
        self._additional_prompt = additional_prompt
        self._trim = trim

    def draw_samples(self, prompt: str) -> Collection[str]:
        """Returns multiple predicted continuations of `prompt`."""
        return [self._draw_sample(prompt) for _ in range(self._samples_per_prompt)]

    def _draw_sample(self, content: str) -> str:
        prompt = '\n'.join([content, self._additional_prompt])
        while True:
            try:
                conn = http.client.HTTPSConnection("api.chatanywhere.com.cn")
                payload = json.dumps({
                    "max_tokens": 512,
                    "model": "gpt-3.5-turbo",
                    "messages": [
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ]
                })
                headers = {
                    'Authorization': 'Bearer sk-5szlvRHdRzZvQfum20165bFa0364427bA0B08cAf8765D52e',
                    'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
                    'Content-Type': 'application/json'
                }
                conn.request("POST", "/v1/chat/completions", payload, headers)
                res = conn.getresponse()
                data = res.read().decode("utf-8")
                data = json.loads(data)
                response = data['choices'][0]['message']['content']
                # trim function
                if self._trim:
                    response = _trim_preface_of_body(response)
                return response
            except Exception:
                time.sleep(2)
                continue

## 2. Implement a 'SandBox' interface
**Caution**:
The Sandbox interface implementation references https://github.com/RayZhhh/funsearch

In [None]:
from implementation import evaluator
from implementation import evaluator_accelerate


class Sandbox(evaluator.Sandbox):
    """Sandbox for executing generated code. Implemented by RZ.

    RZ: Sandbox returns the 'score' of the program and:
    1) avoids the generated code to be harmful (accessing the internet, take up too much RAM).
    2) stops the execution of the code in time (avoid endless loop).
    """

    def __init__(self, verbose=False, numba_accelerate=True):
        """
        Args:
            verbose         : Print evaluate information.
            numba_accelerate: Use numba to accelerate the evaluation. It should be noted that not all numpy functions
                              support numba acceleration, such as np.piecewise().
        """
        self._verbose = verbose
        self._numba_accelerate = numba_accelerate

    def run(
            self,
            program: str,
            function_to_run: str,  # RZ: refers to the name of the function to run (e.g., 'evaluate')
            function_to_evolve: str,  # RZ: accelerate the code by decorating @numba.jit() on function_to_evolve.
            inputs: Any,  # refers to the dataset
            test_input: str,  # refers to the current instance
            timeout_seconds: int,
            **kwargs  # RZ: add this
    ) -> tuple[Any, bool]:
        """Returns `function_to_run(test_input)` and whether execution succeeded.

        RZ: If the generated code (generated by LLM) is executed successfully,
        the output of this function is the score of a given program.
        RZ: PLEASE NOTE THAT this SandBox is only designed for bin-packing problem.
        """
        dataset = inputs[test_input]
        try:
            result_queue = multiprocessing.Queue()
            process = multiprocessing.Process(
                target=self._compile_and_run_function,
                args=(program, function_to_run, function_to_evolve, dataset, self._numba_accelerate, result_queue)
            )
            process.start()
            process.join(timeout=timeout_seconds)
            if process.is_alive():
                # if the process is not finished in time, we consider the program illegal
                process.terminate()
                process.join()
                results = None, False
            else:
                if not result_queue.empty():
                    results = result_queue.get_nowait()
                else:
                    results = None, False

            return results
        except:
            return None, False

    def _compile_and_run_function(self, program, function_to_run, function_to_evolve, dataset, numba_accelerate,
                                  result_queue):
        try:
            # optimize the code (decorate function_to_run with @numba.jit())
            if numba_accelerate:
                program = evaluator_accelerate.add_numba_decorator(
                    program=program,
                    function_to_evolve=function_to_evolve
                )
            # compile the program, and maps the global func/var/class name to its address
            all_globals_namespace = {}
            # execute the program, map func/var/class to global namespace
            exec(program, all_globals_namespace)
            # get the pointer of 'function_to_run'
            function_to_run = all_globals_namespace[function_to_run]
            # return the execution results
            results = function_to_run(dataset)
            # the results must be int or float
            if not isinstance(results, (int, float)):
                result_queue.put((None, False))
                return
            result_queue.put((results, True))
        except Exception:
            # if raise any exception, we assume the execution failed
            result_queue.put((None, False))

## 3. Prepare a 'specification'
This code implements solution to the classic CVRP, which involves:
1. Initializes all vehicles at the depot (node 0)
2. For each vehicle:
   - Selects next node using demand-aware priority scores
   - Updates route until capacity is reached
   - Returns to depot to finalize route
4. Repeats until all customer demands served
5. Evaluates total travel cost across all routes

This code includes 3 components:
1. `vehicle_routing`
    - Constructs routes sequentially using a greedy insertion stratege
    - Dynamically adjusts available nodes using
        - Capacity checks: Verifies if next node's demand can be fulfilled
	- Exclusion logic: Prevents node revisits via masking
    - Automatically dispatches new vehicles when capacity is exhausted
2. `priority`
    - Implements a weighted distance metric that considers:
	- Base travel cost (distance from current position)
	- Demand-adjusted scoring (demand used as penalty/reward)
    - Favors nodes where:
	- Distance is short (minimizes travel time)
	- Demand fits remaining capacity (maximizes utilization)
3. `evaluate`
    - Computes average performance across mus negative cost convention for compatibility with maximmizations all routesroutes

In [None]:
specification = r'''
import numpy as np
import math

def vehicle_routing(vehicle_capacity: int, node_requirements: np.ndarray, 
           distance_matrix: np.ndarray) -> tuple[list, float]:
    """
    Solves the Capacitated Vehicle Routing Problem (CVRP) using a heuristic approach.
    
    Args:
        vehicle_capacity: Maximum load capacity of each vehicle
        node_requirements: Array of demand values for each node (index 0 is depot)
        distance_matrix: Square matrix of inter-node travel distances
    
    Returns:
        tuple: (list of routes, total travel distance)
        - routes: List of routes where each sublist represents a vehicle's path
        - total_distance: Sum of all route distances including depot returns
    
    Algorithm:
        1. Initializes vehicles at depot (node 0)
        2. Sequentially builds routes using demand-aware priority scoring
        3. Manages vehicle reloading when capacity is exhausted
        4. Ensures no repeated node visits through matrix manipulation
    """
    
    # Routing state initialization
    current_route = [] # Active vehicle's path
    completed_routes = [] # Finished vehicle paths
    total_distance = 0 # Cumulative distance across all routes
    current_leg_distance = 0 # Distance for current vehicle's route
    current_load = 0 # Current vehicle's carried load
    current_location = 0 # Always starts at depot (node 0)
    visited_count = 0  # Tracks fulfilled demand points
    
    # Matrix preparation
    working_matrix = distance_matrix.copy()
    depot_return_costs = working_matrix[:, 0].copy()
    np.fill_diagonal(working_matrix, 1e10) # Block self-loops
    working_matrix[:, 0] = 1e10  # Prevent depot returns
    
    def select_next_node(scores: np.ndarray, excluded_nodes: list) -> int:
        """
        Internal node selection logic with exclusion management
        
        Args:
            scores: Priority scores for node selection
            excluded_nodes: Nodes to exclude from selection
            
        Returns:
            int: Index of selected node
            
        Raises:
            ValueError: When no valid nodes remain
        """
        # Flatten and deduplicate exclusion list
        excluded = {n for group in excluded_nodes for n in group}
        valid_mask = np.ones_like(scores, dtype=bool)
        valid_mask[list(excluded)] = False
        
        if not np.any(valid_mask):
            raise ValueError("No available nodes for selection")
        
        # Get highest valid score
        valid_scores = scores[valid_mask]
        max_idx = np.argmax(valid_scores)
        return np.where(valid_mask)[0][max_idx]

    # Main routing loop
    while visited_count < len(node_requirements) - 1:
        # Calculate selection priorities
        available_capacity = vehicle_capacity - current_load
        priority_scores = priority(
            current_location,
            working_matrix.copy(),
            available_capacity,
            node_requirements
        )
        
        # Generate exclusion list
        exclusion_list = completed_routes + [[current_location] + current_route]
        
        try:
            chosen_node = select_next_node(priority_scores, exclusion_list)
        except ValueError:
            break  # No more nodes available
        
        # Validate selection
        demand = node_requirements[chosen_node]
        valid_selection = (
            current_load + demand <= vehicle_capacity 
            and chosen_node != 0
        )
        
        if valid_selection:
            # Update current route
            current_leg_distance += working_matrix[current_location, chosen_node]
            current_load += demand
            current_route.append(chosen_node)
            visited_count += 1
            
            # Block future selections of this node
            working_matrix[:, chosen_node] = 1e10
            current_location = chosen_node
        else:
            # Finalize current vehicle's route
            current_leg_distance += depot_return_costs[current_location]
            total_distance += current_leg_distance
            completed_routes.append(current_route)
            
            # Reset for new vehicle
            current_location = 0
            current_leg_distance = 0
            current_load = 0
            current_route = []
    
    # Handle final vehicle return
    if current_location != 0:
        current_leg_distance += depot_return_costs[current_location]
        total_distance += current_leg_distance
        completed_routes.append(current_route)
    
    return completed_routes, total_distance

@funsearch.run
def evaluate(test_instances: dict) -> float:
    """
    Evaluates routing performance across multiple problem instances.
    
    Args:
        test_instances: Dictionary of CVRP instances with:
            - 'capacity': Vehicle capacity
            - 'demand': Node requirements array
            - 'edge_weight': Distance matrix
            
    Returns:
        float: Negative average cost (for maximization objectives)
    """
    costs = []
    for instance in test_instances.values():
        _, route_cost = vehicle_routing(
            instance['capacity'],
            instance['demand'],
            instance['edge_weight']
        )
        costs.append(route_cost)
    return -np.mean(costs)

@funsearch.evolve
def priority(current_node: int, distance_data: np.ndarray,
       remaining_capacity: int, node_demands: np.ndarray) -> np.ndarray:
    """
    Generates node selection scores balancing distance and demand.
    
    Args:
        current_node: Current vehicle position
        distance_data: Modified distance matrix
        remaining_capacity: Available vehicle capacity
        node_demands: Demand values for all nodes
        
    Returns:
        np.ndarray: Priority scores (higher = better)
    """
    scores = distance_data[current_node].copy()
    
    for idx, demand in enumerate(node_demands):
        adjustment = math.sqrt(demand)
        if remaining_capacity >= demand:
            scores[idx] -= adjustment  # Favor closer high-demand nodes
        else:
            # Penalize nodes exceeding capacity
            scores[idx] += adjustment if scores[idx] > 0 else -adjustment
    
    return -scores  # Convert to negative for minimization
'''

## 4. Prepare a dataset
Our project use [**CVRPLib**](http://vrp.atd-lab.inf.puc-rio.br/index.php/en/ ) setB for testing.

CVRPLib is the go-to benchmark dataset for CVRP, offering standardized instances with known optimal solutions that enable reliable algorithm comparisons. While invaluable for research and education, it focuses mainly on small-to-medium scale, static problems with homogeneous fleets, lacking real-world complexities like dynamic conditions or large-scale scenarios. Though limited for modern logistics applications, it remains essential for validating fundamental routing algorithms.

In [1]:
import vrplib
import os

dataset_path = "cvrplib/setB"
cvrp_dataset = {}
cvrp_dataset['B'] = {}

for file in os.listdir(dataset_path):
    if file.endswith(".vrp"):
        instances = vrplib.read_instance(os.path.join(dataset_path, file))
        cvrp_dataset['B'][file[:-4]] = instances

FileNotFoundError: [WinError 3] 系统找不到指定的路径。: 'cvrplib/setE'

## 5. Start FunSearch
Please note that in jupyter notebook the following code will fail. This is because juypter does not support multiprocessing. Colab backend supports multiprocessing.

In [None]:
from implementation import funsearch
from implementation import config

# It should be noted that the if __name__ == '__main__' is required.
# Because the inner code uses multiprocess evaluation.
if __name__ == '__main__':
    class_config = config.ClassConfig(llm_class=LLMAPI, sandbox_class=Sandbox)
    config = config.Config(samples_per_prompt=4, evaluate_timeout_seconds=30)
    global_max_sample_num = 100  # if it is set to None, funsearch will execute an endless loop
    funsearch.main(
        specification=specification,
        inputs=cvrp_dataset,
        config=config,
        max_sample_nums=global_max_sample_num,
        class_config=class_config,
        log_dir='../logs/funsearch_llm_api'
    )