In [1]:
# import subprocess
# import os

# result = subprocess.run('bash -c "source /etc/network_turbo && env | grep proxy"', shell=True, capture_output=True, text=True)
# output = result.stdout
# for line in output.splitlines():
#     if '=' in line:
#         var, value = line.split('=', 1)
#         os.environ[var] = value

In [None]:
import os
import sys

# Put the directory two levels above the current working directory at the
# front of the import path (presumably the project root, so that project
# packages can be imported from this notebook -- confirm the notebook location).
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir, os.pardir))
if sys.path.count(project_root) == 0:
    sys.path.insert(0, project_root)
    print(f"已添加 {project_root} 到Python路径")

In [None]:
import os
import json
import random
from typing import Optional, Dict, Any, List, Callable, Union
from pydantic import BaseModel, Field, model_validator
from textwrap import dedent
from litellm.types.llms.openai import ChatCompletionUserMessage

from moatless_qa.benchmark.utils import get_moatless_instance
from moatless_qa.completion.model import StructuredOutput, Completion
from moatless_qa.completion.completion import CompletionModel, CompletionResponse

from moatless_qa.repository.repository import Repository
from moatless_qa.benchmark.swebench import create_repository
from moatless_qa.index import CodeIndex
from moatless_qa.file_context import FileContext
from moatless_qa.selector import BestFirstSelector, Selector, SoftmaxSelector, LLMSelector
from moatless_qa.selector.feedback_selector import FeedbackSelector
from moatless_qa.feedback import FeedbackGenerator
from moatless_qa.feedback.feedback_agent import FeedbackAgent
from moatless_qa.value_function.base import ValueFunction

from moatless_qa.actions.action import Action
from moatless_qa.actions import FindClass, FindFunction, FindCodeSnippet, SemanticSearch, ViewCode, Finish, FindCalledObject
from moatless_qa.agent.code_qa_agent import CodeQAAgent
from moatless_qa.agent.code_qa_prompts import *
from moatless_qa.agent.agent import ActionAgent
from moatless_qa.code_qa.search_tree import CodeQASearchTree
from moatless_qa.completion.completion import (
    LLMResponseFormat,
    CompletionModel,
)
from moatless_qa.schema import MessageHistoryType
from moatless_qa.agent.settings import AgentSettings
from moatless_qa.node import Node, ActionStep, FeedbackData
from moatless_qa.expander import Expander
from moatless_qa.value_function.model import Reward
from moatless_qa.exceptions import RuntimeError, RejectError

In [None]:
import os
from dotenv import load_dotenv

# Read the .env file first; the assignments below come afterwards, so these two
# values always win over whatever .env (or the shell) provided.
load_dotenv()
os.environ.update(
    {
        "REPO_DIR": "./tmp/repos",
        "MOATLESS_INDEX_DIR": "./tmp/index_store",
    }
)

# Completion model shared by the components built later in this notebook.
completion_model = CompletionModel(model="deepseek/deepseek-chat", temperature=0.7)
# Alternative models, configured through CUSTOM_LLM_API_BASE / CUSTOM_LLM_API_KEY:
# completion_model = CompletionModel(model="openai/deepseek-ai/DeepSeek-V3", model_base_url=os.getenv("CUSTOM_LLM_API_BASE"), model_api_key=os.getenv("CUSTOM_LLM_API_KEY"), temperature=0.7)
# completion_model = CompletionModel(model="openai/moonshot-v1-32k", model_base_url=os.getenv("CUSTOM_LLM_API_BASE"), model_api_key=os.getenv("CUSTOM_LLM_API_KEY"), temperature=0.7)

# Benchmark instance explored in this session.
instance_id = "sphinx-doc__sphinx-8551"
# The returned instance comes from a locally downloaded SWE-bench copy whose
# attributes were slightly modified.
instance = get_moatless_instance(split='verified', instance_id=instance_id)

# Repository, code index and file context for that instance.
repository = create_repository(instance)
code_index = CodeIndex.from_index_name(instance["instance_id"], file_repo=repository)
file_context = FileContext(repo=repository)

In [None]:
# Show the instance's problem statement and its golden patch, separated by a
# 200-character dashed rule.
print(f"Problem Statement :\n{instance['problem_statement']}")
print('-' * 200)
print(f"Golden Patch:\n{instance['golden_patch']}")

In [None]:

# Presumably loads a previously saved trajectory JSON for inspection; the bare
# call means any return value is shown as the cell output.
# NOTE(review): `get_json` is not defined or explicitly imported in any visible
# cell -- unless the star import from `code_qa_prompts` provides it, this raises
# NameError on a fresh kernel. The absolute path also names a different instance
# (django__django-10914) and a fixed date instead of using `instance_id` --
# confirm whether this cell is still needed.
get_json('/tmp/trajectory/django__django-10914/2025-02-26_trajectory.json')

In [5]:
from datetime import datetime

# Per-instance output locations, stamped with today's date (YYYY-MM-DD).
current_date = f"{datetime.now():%Y-%m-%d}"
instance_path = f'./tmp/trajectory/{instance_id}/'
# The trajectory file lives directly inside the instance folder.
persist_path = f'{instance_path}{current_date}_trajectory.json'
experience_path = "./tmp/experience/{}/{}_experience.json".format(instance_id, current_date)

In [None]:
# Print the model before and after the change so the new response format is
# visible in the cell output.
print(completion_model)
# In-place attribute change: every later use of `completion_model` sees TOOLS.
completion_model.response_format = LLMResponseFormat.TOOLS
print(completion_model)

In [None]:
# Create a second model via clone(), passing along the current response format.
# NOTE(review): in the visible cells `reflection_model` is only referenced from
# commented-out code -- confirm it is still needed.
reflection_model = completion_model.clone(
    response_format=completion_model.response_format
)
# Bare name: displays the cloned model as the cell output.
reflection_model

In [10]:
# def generate_summary(repo, instance, model, example):
#     messages = []
#     messages.append({"role": "user", "content": summarize_prompt.format(example=example, repo=repo, description=instance['problem_statement'])})
#     output = model._litellm_base_completion(
#                     messages=messages
#                 )
#     return output

In [16]:
# summary = generate_summary('astropy', instance, reflection_model, example1)

In [17]:
# summary = summary.choices[0].message.content

In [20]:
# messages = []
# messages.insert(0, {"role": "system", "content": planner_prompt})
# messages.append({"role": "user", "content": f"Issue:\n{instance['problem_statement']}"})
# output = reflection_model._litellm_base_completion(
#                 messages=messages, response_format={"type": "json_object"}
#             )

In [None]:
# plans = json.loads(output.choices[0].message.content)
# for i, j in plans.items():
#     print(i)
#     print(j)
#     print('\n')

In [8]:
# Value function built on the shared completion model.
# NOTE(review): the identical assignment is repeated in the cell that creates
# the search tree, so this cell looks redundant -- confirm before removing.
value_function = ValueFunction(completion_model=completion_model)

In [9]:
# Keyword arguments shared by every action that needs the code index.
search_kwargs = dict(
    completion_model=completion_model,
    code_index=code_index,
    repository=repository,
)

# Actions handed to the agent: five index-backed search actions, then ViewCode
# (no code index) and Finish.
actions = [
    *(
        action_cls(**search_kwargs)
        for action_cls in (
            FindClass,
            FindFunction,
            FindCodeSnippet,
            FindCalledObject,
            SemanticSearch,
        )
    ),
    ViewCode(completion_model=completion_model, repository=repository),
    Finish(),
    # Reject()
]

# Assemble the system prompt piece by piece: role text, one format-specific
# section (none if the format is neither REACT nor TOOLS), then the workflow,
# guideline, notes and response-format sections.
prompt_parts = [QA_AGENT_ROLE]
if completion_model.response_format == LLMResponseFormat.REACT:
    prompt_parts.append(REACT_CORE_OPERATION_RULES)
elif completion_model.response_format == LLMResponseFormat.TOOLS:
    prompt_parts.append(REACT_GUIDELINES)
workflow_prompt = generate_workflow_prompt(actions)
prompt_parts.extend(
    [workflow_prompt, generate_guideline_prompt(), ADDITIONAL_NOTES, RESPOENSE_FORMAT]
)
system_prompt = "".join(prompt_parts)

In [10]:
# system_prompt += experience_prompt.format(experience)

In [None]:
# Print the assembled system prompt for a manual read-through.
print(system_prompt)

In [None]:
# Build the QA agent from the hand-assembled system prompt, the action list and
# the shared completion model.
agent = CodeQAAgent(system_prompt=system_prompt, actions=actions, completion=completion_model)
# # I think the initialization below is the right one: it uses the internal prompt rather than a hand-built system_prompt. Testing, however, was done with the initialization above together with SIMPLE_CODE_PROMPT.
# agent = CodingAgent.create(repository=repository, completion_model=completion_model)
# agent.actions = actions    # otherwise its internal actions have neither a code index nor a repository

In [11]:
# Feedback agent that reuses the agent's completion model; instance_dir is the
# per-instance trajectory folder (`instance_path`).
feedback_generator = FeedbackAgent(
                completion_model=agent.completion, instance_dir=instance_path
            )

In [12]:
# Value function for this tree, backed by the shared completion model.
value_function = ValueFunction(completion_model=completion_model)

# All search-tree arguments gathered in one mapping so the limits are easy to
# scan; the question is an ad-hoc one, with the instance's problem statement
# left as a commented-out alternative.
search_tree_options = dict(
    # message=instance["problem_statement"],
    message="What is the render function of the texts?",
    agent=agent,
    file_context=file_context,
    value_function=value_function,
    feedback_generator=feedback_generator,
    max_iterations=100,
    max_expansions=3,
    max_depth=25,
    persist_path=persist_path,
)
search_tree = CodeQASearchTree.create(**search_tree_options)

In [None]:
# Print the system prompt as the agent itself generates it.
print(agent.generate_system_prompt())

## First Rollout

In [None]:
# One rollout driven by hand through the tree's private methods.
# NOTE(review): the method names suggest the select / expand / simulate /
# backpropagate phases of a tree search -- confirm against CodeQASearchTree.
# Step 1: call _select starting from the root; the bare name displays the result.
node = search_tree._select(search_tree.root)
node

In [None]:
# Step 2: pass the selected node to _expand and display what it returns.
new_node = search_tree._expand(node)
new_node

In [None]:
# Step 3: call _simulate on the new node; the second argument is passed as None.
search_tree._simulate(new_node, None)

In [24]:
# Step 4: call _backpropagate on the same node.
search_tree._backpropagate(new_node)

In [None]:
# Inspect the tree's UCT display for the new node.
search_tree.display_uct(new_node)

In [19]:
# search_tree.persist()

## Second Rollout

In [None]:
# Check the tree's finished status before starting another rollout (the result
# is only displayed, not acted on).
search_tree.is_finished()

In [19]:
# Same manual sequence as before: _select from the root ...
second_node = search_tree._select(search_tree.root)

In [None]:
# ... then _expand the selected node.
second_new_node = search_tree._expand(second_node)

In [None]:
# Display what the agent's message generator produces for the new node.
agent.message_generator.generate(second_new_node)

In [None]:
# _simulate with None as the second argument.
search_tree._simulate(second_new_node, None)

In [None]:
search_tree._backpropagate(second_new_node)

In [None]:
# Inspect the tree's UCT display for the new node.
search_tree.display_uct(second_new_node)

## Third Rollout

In [None]:
# Display the tree's finished status before the next rollout.
search_tree.is_finished()

In [29]:
# Manual rollout: _select from the root, _expand, _simulate, _backpropagate.
third_node = search_tree._select(search_tree.root)

In [None]:
third_new_node = search_tree._expand(third_node)

In [None]:
search_tree._simulate(third_new_node, None)

In [43]:
search_tree._backpropagate(third_new_node)

In [None]:
# Print the properties attached to the new node's observation.
print(third_new_node.observation.properties)

In [45]:
# Persist the tree (created with `persist_path`; presumably written there).
search_tree.persist()

## Fourth Rollout

In [None]:
# Display the tree's finished status before the next rollout.
search_tree.is_finished()

In [47]:
# Manual rollout: _select from the root, _expand, _simulate, _backpropagate.
# NOTE(review): `forth_*` is a misspelling of "fourth"; left unchanged because
# the names are shared across cells.
forth_node = search_tree._select(search_tree.root)

In [None]:
forth_new_node = search_tree._expand(forth_node)

In [None]:
search_tree._simulate(forth_new_node, None)

In [51]:
search_tree._backpropagate(forth_new_node)

In [None]:
# Inspect the tree's UCT display for the new node.
search_tree.display_uct(forth_new_node)

In [87]:
# traj = get_trajectory(search_tree)

In [88]:
# out = extract_experience('astropy', instance['problem_statement'], traj, reflection_model, failed=True)

In [49]:
# get_save_experience(search_tree, reflection_model)

In [53]:
# Persist the tree (created with `persist_path`; presumably written there).
search_tree.persist()

In [None]:
# get_trajectory(search_tree.persist_path)

## Fifth Rollout

In [None]:
# Display the tree's finished status before the next rollout.
search_tree.is_finished()

In [55]:
# Manual rollout: _select from the root, _expand, _simulate, _backpropagate.
fifth_node = search_tree._select(search_tree.root)

In [None]:
fifth_new_node = search_tree._expand(fifth_node)

In [None]:
search_tree._simulate(fifth_new_node, None)

In [58]:
search_tree._backpropagate(fifth_new_node)

In [None]:
# Inspect the tree's UCT display for the new node.
search_tree.display_uct(fifth_new_node)

In [60]:
# NOTE(review): `get_trajectory` is not defined or explicitly imported in any
# visible cell -- on a fresh kernel this raises NameError unless the star import
# provides it; confirm where it comes from.
traj = get_trajectory(search_tree)

In [None]:
# print(fifth_new_node.observation.properties['diff'])

In [61]:
# Persist the tree (created with `persist_path`; presumably written there).
search_tree.persist()

## Sixth Rollout

In [None]:
# Display the tree's finished status before the next rollout.
search_tree.is_finished()

In [63]:
# Manual rollout: _select from the root, _expand, _simulate, _backpropagate.
sixth_node = search_tree._select(search_tree.root)

In [None]:
sixth_new_node = search_tree._expand(sixth_node)

In [None]:
search_tree._simulate(sixth_new_node, None)

In [66]:
search_tree._backpropagate(sixth_new_node)

In [None]:
# Inspect the tree's UCT display for the new node.
search_tree.display_uct(sixth_new_node)

In [68]:
# Rebinds `traj`, replacing the value assigned in the previous rollout.
# NOTE(review): `get_trajectory` is not defined or explicitly imported in any
# visible cell -- confirm where it comes from.
traj = get_trajectory(search_tree)

In [69]:
# Persist the tree (created with `persist_path`; presumably written there).
search_tree.persist()

## Seventh Rollout

In [None]:
# Display the tree's finished status before the next rollout.
search_tree.is_finished()

In [71]:
# Manual rollout: _select from the root, _expand, _simulate, _backpropagate.
seventh_node = search_tree._select(search_tree.root)

In [None]:
seventh_new_node = search_tree._expand(seventh_node)

In [None]:
search_tree._simulate(seventh_new_node, None)

In [None]:
search_tree._backpropagate(seventh_new_node)

In [None]:
# Print the 'diff' entry of the observation's properties.
# NOTE(review): raises KeyError if the observation has no 'diff' property --
# confirm that this action type always sets it.
print(seventh_new_node.observation.properties['diff'])

In [None]:
# Inspect the tree's UCT display for the new node.
search_tree.display_uct(seventh_new_node)

In [None]:
# NOTE(review): exact duplicate of the print two cells above.
print(seventh_new_node.observation.properties['diff'])

In [78]:
# Persist the tree (created with `persist_path`; presumably written there).
search_tree.persist()

## Eighth Rollout

In [None]:
# Display the tree's finished status before the next rollout.
search_tree.is_finished()

In [80]:
# Manual rollout: _select from the root, _expand, _simulate, _backpropagate.
# NOTE(review): `eigth_*` is a misspelling of "eighth"; left unchanged because
# the names are shared across cells.
eigth_node = search_tree._select(search_tree.root)

In [None]:
eigth_new_node = search_tree._expand(eigth_node)

In [None]:
search_tree._simulate(eigth_new_node, None)

In [None]:
# Print the 'diff' entry of the observation's properties (KeyError if absent).
print(eigth_new_node.observation.properties['diff'])

In [83]:
search_tree._backpropagate(eigth_new_node)

## Ninth Rollout

In [84]:
# Manual rollout: _select from the root, _expand, _simulate, _backpropagate.
ninth_node = search_tree._select(search_tree.root)

In [None]:
ninth_new_node = search_tree._expand(ninth_node)

In [None]:
# _simulate with None as the second argument.
search_tree._simulate(ninth_new_node, None)

In [87]:
search_tree._backpropagate(ninth_new_node)

In [88]:
# Persist the tree (created with `persist_path`; presumably written there).
search_tree.persist()

## Tenth Rollout

In [89]:
# Manual rollout: _select from the root, _expand, _simulate, _backpropagate.
tenth_node = search_tree._select(search_tree.root)

In [None]:
tenth_new_node = search_tree._expand(tenth_node)

In [None]:
# _simulate with None as the second argument.
search_tree._simulate(tenth_new_node, None)

In [92]:
search_tree._backpropagate(tenth_new_node)

## Eleventh Rollout

In [93]:
# Start of a manual rollout: _select from the root, then _expand that node.
eleventh_node = search_tree._select(search_tree.root)

In [None]:
eleventh_new_node = search_tree._expand(eleventh_node)

In [None]:
# Pass None as the second argument, matching the `_simulate(<node>, None)` form
# used by the earlier rollouts; the one-argument call that stood here would
# raise TypeError if that parameter has no default.
search_tree._simulate(eleventh_new_node, None)

In [None]:
# Finish the rollout: _backpropagate from the new node ...
search_tree._backpropagate(eleventh_new_node)

In [None]:
# ... and inspect the tree's UCT display for it.
search_tree.display_uct(eleventh_new_node)

## Twelfth Rollout

In [98]:
# Start of a manual rollout: _select from the root, then _expand that node.
twelfth_node = search_tree._select(search_tree.root)

In [None]:
twelfth_new_node = search_tree._expand(twelfth_node)

In [None]:
# Pass None as the second argument, matching the `_simulate(<node>, None)` form
# used by the earlier rollouts; the one-argument call that stood here would
# raise TypeError if that parameter has no default.
search_tree._simulate(twelfth_new_node, None)

In [None]:
# Finish the rollout: _backpropagate from the new node ...
search_tree._backpropagate(twelfth_new_node)

In [102]:
# ... and persist the tree (created with `persist_path`; presumably written there).
search_tree.persist()