Skip to content

chore: ui server updates #951

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Jul 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rdagent/app/data_science/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def main(
checkout: bool | str | Path = True,
step_n: int | None = None,
loop_n: int | None = None,
all_duration: str | None = None,
competition="bms-molecular-translation",
timeout=None,
replace_timer=True,
exp_gen_cls: str | None = None,
):
Expand Down Expand Up @@ -67,7 +67,7 @@ def main(
if exp_gen_cls is not None:
kaggle_loop.exp_gen = import_class(exp_gen_cls)(kaggle_loop.exp_gen.scen)

asyncio.run(kaggle_loop.run(step_n=step_n, loop_n=loop_n, all_duration=timeout))
asyncio.run(kaggle_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))


if __name__ == "__main__":
Expand Down
9 changes: 8 additions & 1 deletion rdagent/app/qlib_rd_loop/factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import asyncio
from pathlib import Path
from typing import Any

import fire
Expand All @@ -25,7 +26,13 @@ def running(self, prev_out: dict[str, Any]):
return exp


def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
def main(
path: str | None = None,
step_n: int | None = None,
loop_n: int | None = None,
all_duration: str | None = None,
checkout: bool | str | Path = True,
):
"""
Auto R&D Evolving loop for fintech factors.

Expand Down
4 changes: 2 additions & 2 deletions rdagent/components/coder/model_coder/task_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def extract_model_from_docs(docs_dict):


class ModelExperimentLoaderFromDict(ModelTaskLoader):
def load(self, model_dict: dict) -> list:
def load(self, model_dict: dict) -> QlibModelExperiment:
"""Load data from a dict."""
task_l = []
for model_name, model_data in model_dict.items():
Expand All @@ -117,7 +117,7 @@ def load(self, model_dict: dict) -> list:

class ModelExperimentLoaderFromPDFfiles(ModelTaskLoader):
@wait_retry(retry_n=5)
def load(self, file_or_folder_path: str) -> dict:
def load(self, file_or_folder_path: str) -> QlibModelExperiment:
docs_dict = load_and_process_pdfs_by_langchain(file_or_folder_path) # dict{file_path:content}
model_dict = extract_model_from_docs(
docs_dict
Expand Down
24 changes: 20 additions & 4 deletions rdagent/log/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,18 @@ Only **2** Message in one loop
"evo_id": "0",
"content": [ // list of task_name & codes
{
"evo_id": "0",
"target_task_name": "task_1",
"codes": { // one or more codes
"workspace": { // one or more codes
"a.py": "...<python codes>",
"b.py": "...<python codes>",
//...
}
},
{
"evo_id": "0",
"target_task_name": "task_2",
"codes": {
"workspace": {
"a.py": "...<python codes>",
//...
}
Expand Down Expand Up @@ -155,6 +157,7 @@ Only **2** Message in one loop
"evo_id": "0",
"content": [ // list of feedbacks
{
"evo_id": "0",
"final_decision": "True", // True or False
"execution": "...",
"code": "...",
Expand Down Expand Up @@ -230,8 +233,21 @@ Each tag below appears only once per loop.
"reason": "...",
"exception": "...",
"observations": "...", // may not exists
"hypothesis_evaluation": "...", // may not exists
"hypothesis_evaluation": "...", // may not existsc
"new_hypothesis": "...", // may not exists
}
}
```
```

# TODO

## Session

- How to continue.
- show & copy trace_id(name)?
-

## Page

1. remove Medical, add Finance Whole Pipeline
2.
139 changes: 100 additions & 39 deletions rdagent/log/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import random
import signal
import subprocess
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
Expand All @@ -12,46 +11,88 @@
from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS

msgs_for_frontend = defaultdict(list)
from rdagent.log.storage import FileStorage
from rdagent.log.ui.conf import UI_SETTING
from rdagent.log.ui.storage import WebStorage
from rdagent.log.utils import is_valid_session

app = Flask(__name__, static_folder="./docs/_static")
app = Flask(__name__, static_folder=UI_SETTING.static_path)
CORS(app)

rdagent_processes = defaultdict()
server_port = 19899
log_folder_path = Path(UI_SETTING.trace_folder).absolute()


@app.route("/favicon.ico")
def favicon():
return send_from_directory("./docs/_static", "favicon.ico", mimetype="image/vnd.microsoft.icon")
return send_from_directory(app.static_folder, "favicon.ico", mimetype="image/vnd.microsoft.icon")


msgs_for_frontend = defaultdict(list)
pointers = defaultdict(lambda: defaultdict(int)) # pointers[trace_id][user_ip]


def read_trace(log_path: Path, id: str = "") -> None:
fs = FileStorage(log_path)
ws = WebStorage(port=1, path=log_path)
msgs_for_frontend[id] = []
last_timestamp = None
for msg in fs.iter_msg():
data = ws._obj_to_json(obj=msg.content, tag=msg.tag, id=id, timestamp=msg.timestamp.isoformat())
if data:
if isinstance(data, list):
for d in data:
msgs_for_frontend[id].append(d["msg"])
last_timestamp = msg.timestamp
else:
msgs_for_frontend[id].append(data["msg"])
last_timestamp = msg.timestamp

pointers = {id: 0 for id in msgs_for_frontend.keys()}
now = datetime.now(timezone.utc)
if last_timestamp and (now - last_timestamp).total_seconds() > 1800:
msgs_for_frontend[id].append({"tag": "END", "timestamp": now.isoformat(), "content": {}})


# load all traces from the log folder
for p in log_folder_path.glob("*/*/"):
if is_valid_session(p):
read_trace(p, id=str(p))


@app.route("/trace", methods=["POST"])
def update_trace():
global pointers, msgs_for_frontend
data = request.get_json()
trace_id = data.get("id")
return_all = data.get("all")
reset = data.get("reset")
msg_num = random.randint(1, 10)
app.logger.info(data)
log_folder_path = Path(UI_SETTING.trace_folder).absolute()
if not trace_id:
return jsonify({"error": "Trace ID is required"}), 400
trace_id = str(log_folder_path / trace_id)

user_ip = request.remote_addr

if reset:
pointers[trace_id] = 0
pointers[trace_id][user_ip] = 0

end_pointer = pointers[trace_id] + msg_num
start_pointer = pointers[trace_id][user_ip]
end_pointer = start_pointer + msg_num
if end_pointer > len(msgs_for_frontend[trace_id]) or return_all:
end_pointer = len(msgs_for_frontend[trace_id])

print(f"trace_id: {trace_id}, start_pointer: {pointers[trace_id]}, end_pointer: {end_pointer}")
returned_msgs = msgs_for_frontend[trace_id][pointers[trace_id] : end_pointer]
returned_msgs = msgs_for_frontend[trace_id][start_pointer:end_pointer]

pointers[trace_id] = end_pointer
pointers[trace_id][user_ip] = end_pointer
if returned_msgs:
app.logger.info([msg["tag"] for msg in returned_msgs])
return jsonify(returned_msgs), 200


@app.route("/upload", methods=["GET"])
@app.route("/upload", methods=["POST"])
def upload_file():
# 获取请求体中的字段
global rdagent_processes, server_port
Expand All @@ -62,11 +103,18 @@ def upload_file():
all_duration = request.form.get("all_duration")

# scenario = "Data Science Loop"
trace_name = randomname.get_name()
log_folder_path = Path("./RD-Agent_server_trace").absolute()
log_trace_path = (log_folder_path / scenario / trace_name).absolute()
if scenario == "Data Science":
competition = competition[10:] # Eg. MLE-Bench:aerial-cactus-competition
trace_name = f"{competition}-{randomname.get_name()}"
else:
trace_name = randomname.get_name()
trace_files_path = log_folder_path / scenario / "uploads" / trace_name

log_trace_path = (log_folder_path / scenario / trace_name).absolute()
stdout_path = log_folder_path / scenario / f"{trace_name}.stdout"
if not stdout_path.exists():
stdout_path.parent.mkdir(parents=True, exist_ok=True)

# save files
for file in files:
if file:
Expand All @@ -87,33 +135,34 @@ def upload_file():
else: # one file is uploaded
rfp = str(trace_files_path / files[0].filename)
cmds = ["rdagent", "general_model", "--report_file_path", rfp]
if scenario == "Medical Model Implementation":
cmds = ["rdagent", "med_model"]
if scenario == "Data Science Loop":
cmds = ["rdagent", "kaggle", "--competition", competition]
if scenario == "Finance Whole Pipeline":
cmds = ["rdagent", "fin_quant"]
if scenario == "Data Science":
cmds = ["rdagent", "data_science", "--competition", competition]

# time control parameters
if loop_n:
cmds += ["--loop_n", loop_n]
if scenario != "Finance Data Building (Reports)":
if loop_n:
cmds += ["--loop_n", loop_n]
if all_duration:
cmds += ["--all_duration", all_duration]

rdagent_processes[str(log_trace_path)] = subprocess.Popen(
cmds,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
stdout=sys.stdout,
stderr=sys.stderr,
env={
"LOG_TRACE_PATH": str(log_trace_path),
"UI_SERVER_PORT": server_port,
},
)

cmds += ["--all_duration", f"{all_duration}h"]

app.logger.info(f"Started process for {log_trace_path} with parameters: {cmds}")
with stdout_path.open("w") as log_file:
rdagent_processes[str(log_trace_path)] = subprocess.Popen(
cmds,
stdout=log_file,
stderr=log_file,
env={
**os.environ,
"LOG_TRACE_PATH": str(log_trace_path),
"LOG_UI_SERVER_PORT": str(server_port),
},
)
return (
jsonify(
{
"id": str(log_trace_path),
"id": f"{scenario}/{trace_name}",
}
),
200,
Expand All @@ -124,6 +173,7 @@ def upload_file():
def receive_msgs():
try:
data = request.get_json()
# app.logger.info(data["msg"]["tag"])
if not data:
return jsonify({"error": "No JSON data received"}), 400
except Exception as e:
Expand All @@ -140,12 +190,13 @@ def receive_msgs():

@app.route("/control", methods=["POST"])
def control_process():
global rdagent_processes
global rdagent_processes, msgs_for_frontend
data = request.get_json()
app.logger.info(data)
if not data or "id" not in data or "action" not in data:
return jsonify({"error": "Missing 'id' or 'action' in request"}), 400

id = data["id"]
id = str(log_folder_path / data["id"])
action = data["action"]

if id not in rdagent_processes or rdagent_processes[id] is None:
Expand Down Expand Up @@ -175,13 +226,23 @@ def control_process():
else:
return jsonify({"error": "Unknown action"}), 400
except Exception as e:
return jsonify({"error": f"Failed to {action} process"}), 500
return jsonify({"error": f"Failed to {action} process, {e}"}), 500


@app.route("/test", methods=["GET"])
def test():
# return 'Hello, World!'
global msgs_for_frontend, pointers
msgs = {k: [i["tag"] for i in v] for k, v in msgs_for_frontend.items()}
pointers = pointers
return jsonify({"msgs": msgs, "pointers": pointers}), 200


@app.route("/", methods=["GET"])
def index():
# return 'Hello, World!'
return msgs_for_frontend
# return {k: [i["tag"] for i in v] for k, v in msgs_for_frontend.items()}
return send_from_directory(app.static_folder, "index.html")


@app.route("/<path:fn>", methods=["GET"])
Expand Down
Loading