In [101]:
!git clone https://github.com/RayZhhh/funsearch.git

import sys
sys.path.append('/content/funsearch/')

Cloning into 'funsearch'...
remote: Enumerating objects: 184, done.[K
remote: Counting objects: 100% (184/184), done.[K
remote: Compressing objects: 100% (135/135), done.[K
remote: Total 184 (delta 74), reused 154 (delta 44), pack-reused 0[K
Receiving objects: 100% (184/184), 234.75 KiB | 2.32 MiB/s, done.
Resolving deltas: 100% (74/74), done.


## 1. Implement LLM interface

In [102]:
import json
import multiprocessing
from typing import Collection, Any
import http.client
from implementation import sampler


class LLMAPI(sampler.LLM):
    """Language model that predicts continuation of provided source code.
    """

    def __init__(self, samples_per_prompt: int):
        super().__init__(samples_per_prompt)
        additional_prompt = ('Complete a different and more complex Python function. '
                             'Be creative and you can insert multiple if-else and for-loop in the code logic.'
                             'Only output the Python code, no descriptions.')
        self._additional_prompt = additional_prompt

    def draw_samples(self, prompt: str) -> Collection[str]:
        """Returns multiple predicted continuations of `prompt`."""
        return [self._draw_sample(prompt) for _ in range(self._samples_per_prompt)]

    def _draw_sample(self, content: str) -> str:
        prompt = '\n'.join([content, self._additional_prompt])
        while True:
            try:
                conn = http.client.HTTPSConnection("api.chatanywhere.com.cn")
                payload = json.dumps({
                    "max_tokens": 512,
                    "model": "gpt-3.5-turbo",
                    "messages": [
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ]
                })
                headers = {
                    'Authorization': 'Bearer sk-U9F2AYYlmDcGNFZEBVfKiDyW5r4BvD9JDTzjXpbzIGNu3Faq',
                    'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
                    'Content-Type': 'application/json'
                }
                conn.request("POST", "/v1/chat/completions", payload, headers)
                res = conn.getresponse()
                data = res.read().decode("utf-8")
                data = json.loads(data)
                response = data['choices'][0]['message']['content']
                return response
            except Exception:
                continue

## 2. Implement a 'SandBox' interface

In [103]:
from implementation import evaluator
from implementation import evaluator_accelerate
from implementation import code_manipulation
from implementation import funsearch


class Sandbox(evaluator.Sandbox):
    """Sandbox for executing generated code. Implemented by RZ.

    RZ: Sandbox returns the 'score' of the program and:
    1) avoids the generated code to be harmful (accessing the internet, take up too much RAM).
    2) stops the execution of the code in time (avoid endless loop).
    """

    def __init__(self, verbose=False, numba_accelerate=True):
        """
        Args:
            verbose         : Print evaluate information.
            numba_accelerate: Use numba to accelerate the evaluation. It should be noted that not all numpy functions
                              support numba acceleration, such as np.piecewise().
        """
        self._verbose = verbose
        self._numba_accelerate = numba_accelerate

    def run(
            self,
            program: str,
            function_to_run: str,  # RZ: refers to the name of the function to run (e.g., 'evaluate')
            function_to_evolve: str,  # RZ: accelerate the code by decorating @numba.jit() on function_to_evolve.
            inputs: Any,  # refers to the dataset
            test_input: str,  # refers to the current instance
            timeout_seconds: int,
            **kwargs  # RZ: add this
    ) -> tuple[Any, bool]:
        """Returns `function_to_run(test_input)` and whether execution succeeded.

        RZ: If the generated code (generated by LLM) is executed successfully,
        the output of this function is the score of a given program.
        RZ: PLEASE NOTE THAT this SandBox is only designed for bin-packing problem.
        """
        dataset = inputs[test_input]
        result_queue = multiprocessing.Queue()
        process = multiprocessing.Process(
            target=self._compile_and_run_function,
            args=(program, function_to_run, function_to_evolve, dataset, self._numba_accelerate, result_queue)
        )
        process.start()
        process.join(timeout=timeout_seconds)
        if process.is_alive():
            # if the process is not finished in time, we consider the program illegal
            process.terminate()
            process.join()
            results = None, False
        else:
            if result_queue.qsize() != 0:
                results = result_queue.get_nowait()
            else:
                results = None, False

        if self._verbose:
            print(f'================= Evaluated Program =================')
            program_: code_manipulation.Program = code_manipulation.text_to_program(text=program)
            func_to_evolve_: str = kwargs.get('func_to_evolve', 'priority')
            function_: code_manipulation.Function = program_.get_function(func_to_evolve_)
            function_: str = str(function_).strip('\n')
            print(f'{function_}')
            print(f'-----------------------------------------------------')
            print(f'Score: {str(results)}')
            print(f'=====================================================')
            print(f'\n\n')

        return results

    def _compile_and_run_function(self, program, function_to_run, function_to_evolve, dataset, numba_accelerate,
                                  result_queue):
        try:
            # optimize the code (decorate function_to_run with @numba.jit())
            if numba_accelerate:
                program = evaluator_accelerate.add_numba_decorator(
                    program=program,
                    function_to_evolve=function_to_evolve
                )
            # compile the program, and maps the global func/var/class name to its address
            all_globals_namespace = {}
            # execute the program, map func/var/class to global namespace
            exec(program, all_globals_namespace)
            # get the pointer of 'function_to_run'
            function_to_run = all_globals_namespace[function_to_run]
            # return the execution results
            results = function_to_run(dataset)
            # the results must be int or float
            if not isinstance(results, (int, float)):
                result_queue.put((None, False))
                return
            result_queue.put((results, True))
        except Exception:
            # if raise any exception, we assume the execution failed
            result_queue.put((None, False))

## 3. Prepare a 'specification'

In [139]:
specification = r'''
import numpy as np

def cvrp(capacity, demand, edge_weight):
  routes = []
  route = []
  dis = 0
  dis_sum = 0
  load = 0
  current_city = 0
  counter = 0
  depot_dis = edge_weight[:, 0].copy()
  edge_weight[:, 0] = math.pow(10, 4)
  for i in range(len(edge_weight)):
    edge_weight[i, i] = math.pow(10, 4)
  while counter < len(demand) - 1:
    priorities = priority(current_city, edge_weight)
    next_city = np.argmax(priorities)
    if load + demand[next_city] <= capacity:
      dis += edge_weight[current_city, next_city]
      load += demand[next_city]
      route.append(next_city)
      counter += 1
      edge_weight[:, next_city] = math.pow(10,4)
      current_city = next_city
    else:
      dis += depot_dis[current_city]
      dis_sum += dis
      routes.append(route)
      current_city = 0
      dis = 0
      load = 0
      route = []
  if current_city != 0:
    dis += depot_dis[current_city]
    dis_sum += dis
    routes.append(route)
  return routes, dis_sum

@funsearch.run
def evaluate(instances):
  costs = []
  for name in instances:
    instance = instances[name]
    capacity = instance['capacity']
    demand = instance['demand']
    edge_weight = instance['edge_weight']
    routes, cost = cvrp(capacity, demand, edge_weight)
    costs.append(cost)
  return -np.mean(costs)

@funsearch.evolve
def priority(current_city, edge_weight):
  return -edge_weight[current_city]
'''

## 4. Prepare a dataset

In [140]:
import vrplib

instance = vrplib.read_instance("/content/drive/MyDrive/Artificial Intelligence/A-n32-k5.vrp")
solution = vrplib.read_solution("/content/drive/MyDrive/Artificial Intelligence/A-n32-k5.sol")

datasets = {}
datasets['setA'] = {'A-n32-k5': instance}

cvrp_setA = {'setA': datasets['setA']}

## 5. Start FunSearch
Please note that in jupyter notebook the following code will fail. This is because juypter does not support multiprocessing.

In [141]:
from implementation import funsearch
from implementation import config

# It should be noted that the if __name__ == '__main__' is required.
# Because the inner code uses multiprocess evaluation.
if __name__ == '__main__':
    class_config = config.ClassConfig(llm_class=LLMAPI, sandbox_class=Sandbox)
    config = config.Config(samples_per_prompt=4)
    global_max_sample_num = 10  # if it is set to None, funsearch will execute an endless loop
    funsearch.main(
        specification=specification,
        inputs=cvrp_setA,
        config=config,
        max_sample_nums=global_max_sample_num,
        class_config=class_config,
        log_dir='../logs/funsearch_llm_api'
    )

def priority(current_city, edge_weight):
  return -edge_weight[current_city]
------------------------------------------------------
Score        : None
Sample time  : None
Evaluate time: 0.38422060012817383
Sample orders: None




ValueError: zero-size array to reduction operation maximum which has no identity