In [1]:
import time
from design import *
from utils import *
import importlib
import shutil
from openai import OpenAI
from prompts import *
import json
import numpy as np
from gymnasium.envs.robodesign.GPTAnt import GPTAntEnv

In [2]:
class DGA:
    """LLM-backed designer that asks an OpenAI chat model for refined designs.

    Two refinement entry points are provided: ``improve_rewardfunc`` (returns
    the path of a newly written reward-function file) and
    ``improve_morphology`` (returns the path of a newly written XML file plus
    the parameters the model proposed).
    """

    def __init__(self, api_key=None, model="gpt-4-turbo"):
        """Create the OpenAI client.

        Args:
            api_key: OpenAI API key. When omitted, the ``OPENAI_API_KEY``
                environment variable is used.
            model: Chat model name passed to every completion request.

        Raises:
            RuntimeError: if no API key is supplied and ``OPENAI_API_KEY``
                is unset or empty.
        """
        # SECURITY: credentials must never be committed inside a notebook.
        # The key that used to be hardcoded here has been exposed and must be
        # revoked; supply a fresh one through the environment instead.
        if api_key is None:
            api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            raise RuntimeError(
                "OpenAI API key not found: set the OPENAI_API_KEY environment "
                "variable or pass api_key= to DGA()."
            )
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def extract_code(self, text):
        """Return the body of the first ```python fenced block in ``text``.

        Returns:
            The stripped code string, or ``None`` when no fenced block exists.
        """
        match = re.search(r'```python\n(.*?)\n```', text, re.DOTALL)
        return match.group(1).strip() if match else None

    def indent_code(self, code):
        """Indent every non-blank line of ``code`` by four spaces."""
        return "\n".join("    " + line if line.strip() else line for line in code.split("\n"))

    def improve_rewardfunc(self, best_rewardfunc, rewardfunc_list, fitness_list, folder_name):
        """Ask the model for an improved reward function and save it to disk.

        Args:
            best_rewardfunc: Best reward function found so far (inserted into
                the prompt verbatim).
            rewardfunc_list: Previously evaluated reward functions.
            fitness_list: Fitness values paired element-wise with
                ``rewardfunc_list``.
            folder_name: Results directory; the file is written to
                ``<folder_name>/env/GPTAnt_refine.py``.

        Returns:
            Path of the written reward-function file.

        Raises:
            ValueError: if the model reply contains no ```python code block.
        """
        prompt_parts = [prompts.reward_improve_prompts]
        for reward_content, fitness in zip(rewardfunc_list, fitness_list):
            prompt_parts.append(f"reward function:{reward_content} \n" + f"fintess:{fitness}")
        prompt_parts.append(
            f"best reward function:{best_rewardfunc} \n" + f"best fintess:{max(fitness_list)}"
        )
        reward_improve_prompts = "".join(prompt_parts)

        messages = [
            {"role": "system", "content": "You are a reinforcement learning reward function designer"},
            # Send the prompt assembled above. The previous code sent the
            # unrelated `rewardfunc_prompts`, so the fitness history was never
            # shown to the model.
            {"role": "user", "content": reward_improve_prompts + rewardfunc_format}
        ]

        response = self.client.chat.completions.create(
            model=self.model, messages=messages
        )

        print(response)
        reward_code = self.extract_code(response.choices[0].message.content)

        # Fail with a clear message instead of an UnboundLocalError on
        # `file_path` when the reply has no fenced python block.
        if not reward_code:
            raise ValueError("Model reply did not contain a ```python code block.")

        # The generated code is indented one level below the numpy import;
        # presumably the consumer splices it into a class body - TODO confirm.
        full_code = "import numpy as np \n" + self.indent_code(reward_code) + "\n"
        file_name = "GPTAnt_refine.py"

        file_path = os.path.join(folder_name, "env", file_name)
        with open(file_path, "w") as fp:
            fp.write(full_code)

        return file_path

    def improve_morphology(self, best_parameter, parameter_list, fitness_list, folder_name):
        """Ask the model for improved morphology parameters and save the design.

        Args:
            best_parameter: Best parameter vector found so far.
            parameter_list: Previously evaluated parameter vectors.
            fitness_list: Fitness values paired element-wise with
                ``parameter_list``.
            folder_name: Results directory; the file is written to
                ``<folder_name>/assets/GPTAnt_refine.xml``.

        Returns:
            ``(file_path, parameter)``: the path of the written file and the
            value of the ``parameters`` key in the model's JSON reply (an
            empty list when the key is missing).
        """
        prompt_parts = [prompts.morphology_improve_prompts]
        for parameter_content, fitness in zip(parameter_list, fitness_list):
            prompt_parts.append(f"parameter:{parameter_content} \n" + f"fintess:{fitness}")
        prompt_parts.append(
            f"best parameter:{best_parameter} \n" + f"best fintess:{max(fitness_list)}"
        )
        morphology_improve_prompts = "".join(prompt_parts)

        messages = [
            {"role": "system", "content": "You are a helpful mujoco robot designer"},
            {"role": "user", "content": morphology_improve_prompts + morphology_format}
        ]

        # JSON mode so the reply can be parsed directly with json.loads.
        responses = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            response_format={'type': 'json_object'},
        )
        print(responses)
        parameter = json.loads(responses.choices[0].message.content).get('parameters', [])
        print(parameter)

        # ant_design comes from the project's star imports; its result is
        # written out as text below (presumably MuJoCo XML - TODO confirm).
        xml_file = ant_design(parameter)
        filename = "GPTAnt_refine.xml"
        file_path = os.path.join(folder_name, "assets", filename)

        with open(file_path, "w") as fp:
            fp.write(xml_file)

        print(f"Successfully saved {filename}")

        return file_path, parameter


In [2]:
# Directory that receives this run's log; created on demand.
folder_name = "results/div2025-03-15_07-10-42"
os.makedirs(folder_name, exist_ok=True)

# Send INFO-level records to a log file inside that directory.
# filemode='w' truncates any log left over from a previous run.
log_file = os.path.join(folder_name, "parameters_fine.log")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
    filename=log_file,
    filemode='w',
)

In [14]:
# Results directory of the coarse search that is being refined.
folder_name = "results/gpt4turbo"

# (morphology_index, rewardfunc_index) pairs to refine.
coarse_best = [(4, 1), (4, 5), (7, 0)]

# File paths of the candidate designs and reward functions.
morphology_list = [f'results/gpt4turbo/assets/GPTAnt_{i}.xml' for i in range(0, 10)]
rewardfunc_list = [f'results/gpt4turbo/env/GPTrewardfunc_{i}.py' for i in range(0, 6)]

# One row of 10 design parameters per morphology.
# NOTE(review): 9 rows here versus 10 entries in morphology_list - confirm
# which morphology has no parameters/fitness recorded.
parameter_list = np.array(
    [
        [0.25, 0.2, 0.2, 0.2, 0.2, 0.4, 0.4, 0.08, 0.08, 0.08],
        [0.1, 0.25, 0.1, 0.15, 0.2, 0.15, 0.1, 0.15, 0.1, 0.1],
        [0.2, 0.3, 0.1, 0.25, 0.15, 0.25, 0.1, 0.2, 0.1, 0.1],
        [0.15, 0.4, 0.1, 0.3, 0.3, 0.25, 0.1, 0.2, 0.05, 0.15],
        [0.25, 0.4, 0.2, 0.3, 0.35, 0.3, 0.2, 0.25, 0.2, 0.15],
        [0.3, 0.1, 0.25, 0.2, 0.35, 0.25, 0.1, 0.3, 0.2, 0.2],
        [0.2, 0.2, 0.3, 0.3, 0.35, 0.3, 0.25, 0.15, 0.1, 0.1],
        [0.1, 0.5, 0.15, 0.4, 0.4, 0.35, 0.3, 0.3, 0.25, 0.2],
        [0.4, 0.2, 0.35, 0.25, 0.3, 0.3, 0.2, 0.35, 0.3, 0.25],
    ]
)

# Fitness indexed as fitness_matrix[morphology_index][rewardfunc_index].
fitness_matrix = np.array(
    [
        [13.1913, 39.2054, 38.1279, 52.4951, 24.3172, 65.7318],
        [89.7386, 87.452, 90.315, 97.3308, 93.3873, 79.9451],
        [3.0429, 0.7254, 1.4515, 1.2504, 1.7224, 1.224],
        [118.384, 82.6535, 90.5402, 100.974, 22.5402, 55.7487],
        [174.6064, 261.0052, 219.1338, 201.1097, 193.178, 228.1926],
        [100.3417, 103.9535, 88.7779, 118.7093, 118.6595, 131.8388],
        [34.9591, 126.9539, 105.976, 31.1006, 108.086, 79.6313],
        [244.8765, 215.8829, 221.8278, 183.4445, 213.6648, 197.3411],
        [213.7326, 144.6978, 202.905, 162.7769, 139.2229, 137.8305],
    ]
)

# Record the inputs of this refinement run in the log.
logging.info(f'folder_name:{folder_name}')
logging.info(f'coarse_best:{coarse_best}')
logging.info(f'parameter_list:{parameter_list}')
logging.info(f'fitness_matrix:{fitness_matrix}')


In [5]:
# Fine optimisation: for each coarse winner, alternate a reward stage and a
# morphology stage until one of them stops improving the fitness.

# One designer (and API client) is enough for the whole run; it used to be
# rebuilt on every pass of the inner loop.
designer = DGA()

for morphology_index, rewardfunc_index in coarse_best:

    morphology = morphology_list[morphology_index]
    parameter = parameter_list[morphology_index]
    rewardfunc = rewardfunc_list[rewardfunc_index]
    best_fitness = fitness_matrix[morphology_index][rewardfunc_index]

    print("morphology", morphology)
    print("parameter", parameter)
    print("rewardfunc", rewardfunc)
    print("best_fitness", best_fitness)

    while True:
        # --- Stage 1: reward function ---------------------------------
        # LLM reward refinement is currently disabled; re-enable it by
        # replacing the assignment below with:
        # improved_rewardfunc = designer.improve_rewardfunc(rewardfunc, rewardfunc_list, fitness_matrix[morphology_index], folder_name)
        # Until then the current reward function is re-trained unchanged.
        # Binding the name here also fixes the NameError the success branch
        # used to hit on `improved_rewardfunc`.
        improved_rewardfunc = rewardfunc
        shutil.copy(morphology, "GPTAnt.xml")
        shutil.copy(improved_rewardfunc, "GPTrewardfunc.py")
        model_path = Train(morphology_index, rewardfunc_index, folder_name)
        improved_fitness, improved_reward = Eva(model_path)
        # Use the parameters of the morphology actually being trained. The
        # original always read parameter_list[morphology_index], which goes
        # stale once a refined morphology has been accepted.
        improved_material = compute_ant_volume(parameter)
        improved_material_efficiency = improved_fitness/improved_material
        if improved_fitness>best_fitness:
            best_fitness = improved_fitness
            best_morphology = morphology
            best_parameter = parameter
            best_material_efficiency = improved_material_efficiency
            best_rewardfunc = improved_rewardfunc
            best_material = improved_material
            logging.info(f"reward optimization: material cost: {improved_material}  fitness: {improved_fitness} merterial_efficiency: {improved_material_efficiency}")
        else:
            break

        # --- Stage 2: morphology --------------------------------------
        # The model sees every coarse parameter set together with its fitness
        # under the selected reward function (one column of the matrix).
        # NOTE(review): improve_morphology writes to a fixed file name, so
        # each accepted design overwrites the previous refined file.
        improved_morphology, improved_parameter = designer.improve_morphology(parameter, parameter_list, fitness_matrix[:,rewardfunc_index], folder_name)
        shutil.copy(improved_morphology, "GPTAnt.xml")
        shutil.copy(best_rewardfunc, "GPTrewardfunc.py")
        model_path = Train(morphology_index,  rewardfunc_index, folder_name)
        improved_fitness, improved_reward = Eva(model_path)
        improved_material = compute_ant_volume(improved_parameter)
        improved_material_efficiency = improved_fitness/improved_material

        if improved_fitness>best_fitness:
            best_fitness = improved_fitness
            best_morphology = improved_morphology
            best_parameter = improved_parameter
            best_material_efficiency = improved_material_efficiency
            best_material = improved_material
            logging.info(f"morphology optimization: material cost: {improved_material}  fitness: {improved_fitness} merterial_efficiency: {improved_material_efficiency}")

        else:
            break

        # Carry the accepted design into the next round.
        rewardfunc = best_rewardfunc
        morphology = best_morphology
        parameter = best_parameter


morphology results/gpt4turbo/assets/GPTAnt_4.xml
parameter [0.25 0.4  0.2  0.3  0.35 0.3  0.2  0.25 0.2  0.15]
rewardfunc results/gpt4turbo/env/GPTrewardfunc_1.py
best_fitness 261.0052
Using cuda device
Logging to results/gpt4turbo./sac_morphology4_rewardfunc1/SAC_3
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | -354     |
| time/              |          |
|    episodes        | 4        |
|    fps             | 3229     |
|    time_elapsed    | 4        |
|    total_timesteps | 16000    |
| train/             |          |
|    actor_loss      | -12.9    |
|    critic_loss     | 1.32     |
|    ent_coef        | 0.947    |
|    ent_coef_loss   | -0.733   |
|    learning_rate   | 0.0003   |
|    n_updates       | 184      |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | -354     |
| time/            

Process ForkServerProcess-46:
Traceback (most recent call last):
  File "/root/miniconda3/envs/robodesign/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/root/miniconda3/envs/robodesign/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/root/miniconda3/envs/robodesign/lib/python3.8/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py", line 32, in _worker
    cmd, data = remote.recv()
  File "/root/miniconda3/envs/robodesign/lib/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/root/miniconda3/envs/robodesign/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/root/miniconda3/envs/robodesign/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Process ForkServerProcess-48:
Traceback (most recent call last

KeyboardInterrupt: 

# packages in environment at /root/miniconda3:
#
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                        main    defaults
_openmp_mutex             4.5                       1_gnu    defaults
absl-py                   1.4.0                    pypi_0    pypi
aiofiles                  22.1.0                   pypi_0    pypi
aiohappyeyeballs          2.4.4                    pypi_0    pypi
aiohttp                   3.10.11                  pypi_0    pypi
aiosignal                 1.3.1                    pypi_0    pypi
aiosqlite                 0.18.0                   pypi_0    pypi
annotated-types           0.7.0                    pypi_0    pypi
anyio                     3.6.2                    pypi_0    pypi
argon2-cffi               21.3.0                   pypi_0    pypi
argon2-cffi-bindings      21.2.0                   pypi_0    pypi
arrow                     1.2.3                    pypi_0    pypi
asttokens         