# LLM Agent for Software Engineering

In [None]:
import locale
locale.getpreferredencoding = lambda: "UTF-8" # this is needed to get rid of weird colab locale error
# if you are still running into issues, please restart the runtime to initialize a new environment

In [None]:
# https://github.com/evalplus/evalplus
!pip install evalplus==0.2.0

In [None]:
!pip install litellm

In [None]:
!pip install tenacity

In [None]:
!wget https://raw.githubusercontent.com/uiuc-cs598lmz-s25/hw5/main/buggy_humaneval.jsonl

In [None]:
import json

def grab_buggy_dataset():
    """Load the buggy HumanEval tasks from ``buggy_humaneval.jsonl``.

    The file is read from the current working directory and is expected to
    hold one JSON document per line. Blank lines are skipped so that a stray
    empty line does not abort loading with a ``JSONDecodeError``.

    Returns:
        A list with one decoded JSON value per non-blank line, in file order.
    """
    path = "buggy_humaneval.jsonl"
    # Read as UTF-8 explicitly instead of relying on the platform locale.
    with open(path, "r", encoding="utf-8") as f:
        inference_dataset = [json.loads(line) for line in f if line.strip()]
    print("Number of tasks: {}".format(len(inference_dataset)))
    return inference_dataset

buggy_humaneval = grab_buggy_dataset()
# feel free to play around with the dataset for a bit

In [None]:
### Basic Tool Class ###
# Here we define a basic tool class that can be extended to create custom tools.
from abc import ABC, ABCMeta, abstractmethod
from dataclasses import dataclass
from litellm import ChatCompletionToolParam, ChatCompletionToolParamFunctionChunk
from typing import Any

@dataclass(kw_only=True, frozen=True)
class ToolResult:
    """Represents the result of a tool execution."""
    output: str | None = None  # This is the response that will be provided to the LLM.
    data: dict[str, Any] | None = None  # Additional data that might be returned, not shown to the LLM.

    def __repr__(self):
        return self.output


class ToolError(ToolResult):
    """Result returned when a tool fails.

    This is a result object, not an exception: tools return it instead of
    raising. Its ``repr()`` is the ``output`` text prefixed with ``Error: ``.
    """

    def __repr__(self):
        return "Error: {}".format(self.output)


class Tool(ABC):
    """Base class for tools; instances are callable and delegate to ``call``."""

    @abstractmethod
    def call(self, *args, **kwargs) -> ToolResult:
        """Run the tool. Concrete subclasses provide the implementation."""

    def __call__(self, *args, **kwargs) -> ToolResult:
        """Forward ``tool(...)`` to ``tool.call(...)`` with the same arguments."""
        result = self.call(*args, **kwargs)
        return result


In [None]:
### Wrapper for LLM ###
# we use the litellm framework for easier model access
import litellm
import os
from litellm import completion, ModelResponse
from typing import Literal

# Placeholder credential: replace the value with your own Gemini API key before running.
# NOTE(review): this assignment overwrites any GEMINI_API_KEY already present in the
# environment, and a real key pasted here should not be committed or shared.
os.environ["GEMINI_API_KEY"] = "YOUR_API_KEY_HERE"

class LLM(ABC):
    """Abstract interface for a chat LLM that can be given tools.

    Derives from ``ABC`` so that it is an abstract *base class*; deriving from
    ``ABCMeta`` would instead declare a metaclass, which cannot serve as the
    type of an LLM instance.
    """

    @abstractmethod
    def completion(self, messages: list, tools: list) -> ModelResponse:
        """Get the completion from the LLM"""
        ...

def _completion_with_retries(*args, **kwargs):
    """
    Invoke a completion function under a tenacity retry policy.

    Keyword arguments consumed here (all others are forwarded untouched):
        retry_strategy: "exponential_backoff_retry" or "constant_retry" (default).
        original_function: callable to invoke; defaults to ``completion``.

    Returns whatever the invoked function returns. The last exception is
    re-raised as-is (``reraise=True``) when the retryer gives up.
    """
    try:
        import tenacity
    except Exception as e:
        raise Exception(
            f"tenacity import failed please run `pip install tenacity`. Error{e}"
        )

    # Force both retry-count options to 0 in the forwarded call; presumably this keeps
    # the callee from retrying on its own while tenacity drives the retries - confirm.
    kwargs.update(max_retries=0, num_retries=0)
    strategy = kwargs.pop("retry_strategy", "constant_retry")
    target = kwargs.pop("original_function", completion)

    # NOTE(review): no `stop=` is passed for either strategy, and the exponential
    # strategy has no `retry=` filter; with tenacity's defaults this looks like
    # unbounded retrying - confirm that this is intended.
    if strategy == "exponential_backoff_retry":
        policy = {
            "wait": tenacity.wait_exponential(multiplier=1, min=20, max=120),
        }
    else:
        policy = {
            "wait": tenacity.wait_fixed(20),
            "retry": tenacity.retry_if_exception_type(
                (litellm.ContentPolicyViolationError, litellm.Timeout, litellm.RateLimitError, )
            ),
        }
    retryer = tenacity.Retrying(reraise=True, **policy)
    return retryer(target, *args, **kwargs)

class GeminiLLM():
    """Wrapper that sends chat completions for one model through ``_completion_with_retries``."""

    def __init__(self, model_name: str, temperature: float = 1.0, max_tokens: int = 512):
        """Store the model name and sampling settings, and build the safety settings."""
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        # similar to the vulnerability detection assignment, we relax gemini's blocking:
        # every category below gets the BLOCK_NONE threshold
        harm_categories = (
            "HARM_CATEGORY_CIVIC_INTEGRITY",
            "HARM_CATEGORY_HARASSMENT",
            "HARM_CATEGORY_HATE_SPEECH",
            "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "HARM_CATEGORY_DANGEROUS_CONTENT",
        )
        self.safety_settings = [
            {"category": category, "threshold": "BLOCK_NONE"}
            for category in harm_categories
        ]

    def completion(self, messages: list, tools: list = None) -> ModelResponse:
        """Request a completion for ``messages``, optionally exposing ``tools`` to the model."""
        request = {
            "model": self.model_name,
            "messages": messages,
            "safety_settings": self.safety_settings,
            "tools": tools,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
        }
        return _completion_with_retries(**request)

In [None]:
### Example Usage ###
# Smoke test: ask one trivial question with a very small token budget (max_tokens=10)
# and print the text of the first choice.
gemini = GeminiLLM(model_name="gemini/gemini-2.0-flash", temperature=1.0, max_tokens=10)
model_response = gemini.completion(
    messages=[
        {
            "role": "user",
            "content": "What is 1 + 2?"
        }
    ],
)
print(model_response.choices[0].message.content)

## Localization

In [None]:
class PrintBuggyLocalizationTool(Tool):
    """A tool that prints the buggy line in a code snippet."""
    tool_description = """A tool that prints the buggy line in a given code snippet."""
    tool_param_dict = {
        "name": "print_buggy_line",
        "description": tool_description,
        "input_schema": {
            "type": "object",
            "properties": {
                "line_number": {
                    "description": "The line number of the buggy line in the code snippet (1-indexed).",
                    "type": "integer",
                },
            },
            "required": ["line_number"],
        },
    }

    def __init__(self, ):
        """Build the litellm tool description from ``tool_param_dict``."""
        self.tool_param = ChatCompletionToolParam(
            type="function",
            function=ChatCompletionToolParamFunctionChunk(
                name=self.tool_param_dict["name"],
                description=self.tool_param_dict["description"],
                parameters=self.tool_param_dict["input_schema"],
            )
        )

    def call(self, line_number: int, buggy_code: str) -> ToolResult:
        """Return the line of ``buggy_code`` at the 1-indexed ``line_number``.

        The line number comes from the LLM and is therefore validated first.

        Args:
            line_number: 1-indexed line number chosen by the LLM.
            buggy_code: The code snippet the line number refers to.

        Returns:
            A ``ToolResult`` whose ``output`` is the selected line, or a
            ``ToolError`` describing why ``line_number`` is not valid.
        """
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(line_number, bool) or not isinstance(line_number, int):
            return ToolError(output=f"line_number must be an integer, got {line_number!r}.")

        lines = buggy_code.splitlines()
        if not 1 <= line_number <= len(lines):
            return ToolError(
                output=(
                    f"line_number {line_number} is out of range: "
                    f"the code snippet has {len(lines)} line(s) and line numbers are 1-indexed."
                )
            )

        buggy_line = lines[line_number - 1]
        return ToolResult(
            output=buggy_line,
            data={"line_number": line_number, "buggy_line": buggy_line},
        )



In [None]:
# In this assignment we implement a simple localization agent to locate the buggy line and report the localization accuracy

class Agent(metaclass=ABCMeta):
    """Base class for agents; subclasses are expected to set ``self.tools``."""

    def get_tool(self, tool_name: str) -> Tool:
        """Return the first tool in ``self.tools`` whose function name equals ``tool_name``.

        Raises:
            ValueError: If no tool has that name.
        """
        candidates = (
            candidate
            for candidate in self.tools
            if candidate.tool_param["function"]["name"] == tool_name
        )
        match = next(candidates, None)
        if match is None:
            raise ValueError(f"Tool {tool_name} not found.")
        return match



class LocalizationAgent(Agent):
    """Agent that asks the LLM to point at the buggy line of a code snippet."""

    def __init__(self, buggy_code: str, llm: LLM):
        self.buggy_code = buggy_code
        self.tools = [PrintBuggyLocalizationTool(), ]
        self.llm = llm

    def build_user_message(self) -> str:
        """Build the initial user message for the LLM.

        The code is shown with 1-indexed line numbers so the model can refer
        to a line unambiguously.
        """
        tool_name = self.tools[0].tool_param["function"]["name"]
        numbered_code = "\n".join(
            f"{number}: {line}"
            for number, line in enumerate(self.buggy_code.splitlines(), start=1)
        )
        user_message = (
            "The following Python code contains a bug. Each line is prefixed with its "
            "1-indexed line number followed by a colon; the prefix is not part of the code.\n\n"
            f"```\n{numbered_code}\n```\n\n"
            "Identify the single line that contains the bug and report it by calling the "
            f"`{tool_name}` tool with that line number. Do not answer in plain text."
        )
        return user_message

    def run(self,) -> int | None:
        """Query the LLM once and return the predicted buggy line.

        Returns:
            The 0-indexed line number taken from the first valid tool call
            (the tool itself receives the 1-indexed value), or ``None`` when
            the response holds no valid tool call. The 0-indexed convention
            follows the assignment hint; presumably it matches the dataset's
            ``buggy_line`` field - confirm.
        """
        messages = [
            {
                "role": "user",
                "content": self.build_user_message(),
            },
        ]
        tools = [tool.tool_param for tool in self.tools]
        response = self.llm.completion(messages, tools)

        tool_calls = response.choices[0].message.tool_calls
        for tool_call in tool_calls or []:
            try:
                tool = self.get_tool(tool_call.function.name)
                arguments = json.loads(tool_call.function.arguments)
                line_number = arguments["line_number"]
            except (ValueError, KeyError, TypeError):
                # Unknown tool, malformed JSON, or missing argument: try the next call.
                continue

            # Let the tool validate the model-provided line number.
            result = tool(line_number=line_number, buggy_code=self.buggy_code)
            if isinstance(result, ToolError):
                continue
            return line_number - 1

        return None
        


In [None]:
### Test the Localization Agent on a single task ###
# Rebind the module-level `gemini` with a 512-token budget, run the agent on the first
# task only, and print the prediction next to the dataset's ground-truth line.
gemini = GeminiLLM(model_name="gemini/gemini-2.0-flash", temperature=1.0, max_tokens=512)
localization_agent = LocalizationAgent(
    buggy_code=buggy_humaneval[0]['buggy_code'],
    llm=gemini
)
pred_line_number = localization_agent.run()
print("pred line number = ", pred_line_number)
print("gt line number = ", buggy_humaneval[0]['buggy_line'])

In [None]:
from tqdm import tqdm

def gemini_fault_localization(model, bug_dataset, workdir) -> tuple[float, float]:
    """Run the localization agent on every bug and score its predictions.

    One JSON line per bug (task id, ground-truth line, predicted line) is
    written to ``<workdir>/buggy_line_predictions.jsonl``; the file is
    truncated at the start of each run.

    Args:
        model: LLM wrapper handed to ``LocalizationAgent``.
        bug_dataset: Iterable of dicts with ``task_id``, ``buggy_code`` and
            ``buggy_line`` keys.
        workdir: Directory for the predictions file (created if missing).

    Returns:
        ``(accuracy, complete_rate)``: the fraction of bugs whose prediction
        equals the ground truth, and the fraction for which the agent
        produced any prediction. Both are ``0.0`` for an empty dataset.
    """
    all_ids = []
    complete_ids = []
    pass_ids = []

    os.makedirs(workdir, exist_ok=True)
    predictions_path = os.path.join(workdir, "buggy_line_predictions.jsonl")
    with open(predictions_path, "w") as f:
        for bug in tqdm(bug_dataset):
            localization_agent = LocalizationAgent(
                buggy_code=bug['buggy_code'],
                llm=model
            )
            gt_line_number = bug['buggy_line']
            pred_line_number = localization_agent.run()
            f.write(json.dumps({
                "task_id": bug['task_id'],
                "gt_line_number": gt_line_number,
                "pred_line_number": pred_line_number,
            }) + "\n")
            # Keep finished predictions on disk even if a later task fails.
            f.flush()

            # Compare against None explicitly: a predicted line number of 0 is valid.
            if pred_line_number is not None:
                complete_ids.append(bug['task_id'])
                if pred_line_number == gt_line_number:
                    pass_ids.append(bug['task_id'])
            all_ids.append(bug['task_id'])

    if not all_ids:
        return 0.0, 0.0
    accuracy = len(pass_ids) / len(all_ids)
    complete_rate = len(complete_ids) / len(all_ids)
    return accuracy, complete_rate

# accuracy of gemini
# Evaluate on the whole dataset; predictions are written under ./gemini_localization/.
gemini_acc_score, gemini_complete_rate = gemini_fault_localization(gemini, buggy_humaneval, "gemini_localization")
print(f"{gemini_acc_score = }, {gemini_complete_rate = }")

## Repair

In [None]:
class TextEditorTool(Tool):
    """A tool that edits a code snippet.

    The LLM chooses one of four commands (``replace_line``, ``insert_line``,
    ``delete_line``, ``str_replace``). The ``code`` argument is not part of
    the tool schema: the agent passes in the current code itself.
    """
    tool_description = (
        "Edit the current version of the code and get the updated code back. "
        "Select the operation with `command`:\n"
        "- `replace_line`: replace line `line_number` with `new_str`.\n"
        "- `insert_line`: insert `new_str` as a new line before line `line_number` "
        "(use the number of lines plus one to append at the end).\n"
        "- `delete_line`: delete line `line_number`.\n"
        "- `str_replace`: replace `old_str` with `new_str`; `old_str` must match the code "
        "exactly and appear exactly once.\n"
        "Line numbers are 1-indexed. `new_str` must contain the complete new text, "
        "including its leading indentation."
    )
    tool_param_dict = {
        "name": "edit_code",
        "description": tool_description,
        "input_schema": {
            "type": "object",
            "properties": {
                "command": {
                    "description": "The edit operation to perform.",
                    "type": "string",
                    "enum": ["replace_line", "insert_line", "delete_line", "str_replace"],
                },
                "line_number": {
                    "description": "1-indexed line number; required for `replace_line`, `insert_line` and `delete_line`.",
                    "type": "integer",
                },
                "old_str": {
                    "description": "Exact text to replace; required for `str_replace`.",
                    "type": "string",
                },
                "new_str": {
                    "description": "New text; required for `replace_line`, `insert_line` and `str_replace`.",
                    "type": "string",
                },
            },
            "required": ["command"],
        },
    }

    def __init__(self):
        """Build the litellm tool description from ``tool_param_dict``."""
        self.tool_param = ChatCompletionToolParam(
            type="function",
            function=ChatCompletionToolParamFunctionChunk(
                name=self.tool_param_dict["name"],
                description=self.tool_param_dict["description"],
                parameters=self.tool_param_dict["input_schema"],
            )
        )

    def run_command(self, code, command: str, line_number: int = None, old_str: str = None, new_str: str = None) -> str:
        """Run the specified command on the code, and return the edited code.

        Args:
            code: The code to edit.
            command: ``replace_line``, ``insert_line``, ``delete_line`` or ``str_replace``.
            line_number: 1-indexed target line for the line-based commands.
            old_str: Text to replace (``str_replace`` only).
            new_str: Replacement or inserted text.

        Returns:
            The edited code. Line-based commands join lines with ``\\n`` and
            keep a trailing newline if ``code`` had one.

        Raises:
            ValueError: On an unknown command, a missing or invalid argument,
                an out-of-range line number, or an ``old_str`` that does not
                occur exactly once.
        """
        if command == "str_replace":
            if not old_str or new_str is None:
                raise ValueError("`str_replace` requires a non-empty `old_str` and a `new_str`.")
            occurrences = code.count(old_str)
            if occurrences != 1:
                raise ValueError(
                    f"`old_str` must appear exactly once in the code, but it appears {occurrences} time(s)."
                )
            return code.replace(old_str, new_str)

        if command not in ("replace_line", "insert_line", "delete_line"):
            raise ValueError(
                f"Unknown command {command!r}; expected one of "
                "`replace_line`, `insert_line`, `delete_line`, `str_replace`."
            )
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(line_number, bool) or not isinstance(line_number, int):
            raise ValueError(f"`{command}` requires an integer `line_number`.")
        if command != "delete_line" and new_str is None:
            raise ValueError(f"`{command}` requires `new_str`.")

        lines = code.splitlines()
        # Inserting may target one past the last line, which appends.
        last_valid = len(lines) + 1 if command == "insert_line" else len(lines)
        if not 1 <= line_number <= last_valid:
            raise ValueError(
                f"`line_number` {line_number} is out of range (valid values: 1 to {last_valid})."
            )

        index = line_number - 1
        if command == "replace_line":
            lines[index] = new_str
        elif command == "insert_line":
            lines.insert(index, new_str)
        else:
            del lines[index]

        new_code = "\n".join(lines)
        if code.endswith("\n"):
            new_code += "\n"
        return new_code

    def call(self, code, command: str, line_number: int = None, old_str: str = None, new_str: str = None) -> ToolResult:
        """Apply the edit and report the outcome.

        Returns:
            A ``ToolResult`` whose ``data["new_code"]`` holds the edited
            code, or a ``ToolError`` carrying the failure message.
        """
        try:
            new_code = self.run_command(code, command, line_number, old_str, new_str)
            return ToolResult(output=f"The code has been successfully edited. Below is the new code:\n```python\n{new_code}\n```",
                              data={"new_code": new_code})
        except Exception as e:
            return ToolError(output=f"Error: {str(e)}")



In [None]:
class SimpleRepairAgent(Agent):
    """Single-turn repair agent: one LLM call, then apply the returned edits."""

    def __init__(self, buggy_code: str, llm: LLM):
        self.buggy_code = buggy_code
        self.tools = [TextEditorTool(), ]
        # Store the LLM instance that was passed in, not the LLM class.
        self.llm = llm

    def build_user_message(self) -> str:
        """Build the initial user message for the LLM.

        The code is shown with 1-indexed line numbers; the tool names are read
        from ``self.tools`` so the prompt always matches the tool definitions.
        """
        tool_names = ", ".join(
            f"`{tool.tool_param['function']['name']}`" for tool in self.tools
        )
        numbered_code = "\n".join(
            f"{number}: {line}"
            for number, line in enumerate(self.buggy_code.splitlines(), start=1)
        )
        return (
            "The following Python code contains a bug. Each line is prefixed with its "
            "1-indexed line number followed by a colon; the prefix is not part of the code.\n\n"
            f"```\n{numbered_code}\n```\n\n"
            f"Fix the bug by calling the available tool(s): {tool_names}. "
            "Make the smallest edit that fixes the bug and do not answer in plain text."
        )

    def run(self) -> str:
        """Ask the LLM once for edits and return the resulting code.

        Tool calls are applied in order, each to the output of the previous
        one. Calls that are malformed or that the tool rejects are skipped,
        so the original code is returned when nothing could be applied.
        """
        messages = [
            {
                "role": "user",
                "content": self.build_user_message(),
            },
        ]
        tools = [tool.tool_param for tool in self.tools]
        response = self.llm.completion(messages, tools)

        fixed_code = self.buggy_code
        for tool_call in response.choices[0].message.tool_calls or []:
            try:
                tool = self.get_tool(tool_call.function.name)
                arguments = json.loads(tool_call.function.arguments)
                result = tool(code=fixed_code, **arguments)
            except (ValueError, TypeError):
                # Unknown tool, malformed JSON, or arguments the tool does not accept.
                continue
            if isinstance(result, ToolError) or not result.data:
                continue
            fixed_code = result.data.get("new_code", fixed_code)
        return fixed_code

In [None]:
import difflib
def get_diff(old_code: str, new_code: str) -> str:
    """Return a unified diff from ``old_code`` to ``new_code`` as one string.

    The two sides are labelled ``old`` and ``new``; the result is the empty
    string when the snippets have identical lines.
    """
    before = old_code.splitlines()
    after = new_code.splitlines()
    hunks = difflib.unified_diff(
        before,
        after,
        fromfile="old",
        tofile="new",
        lineterm="",
    )
    return "\n".join(hunks)

In [None]:
#### Test the Repair Agent on a single task ###
# Run the single-turn repair agent on the first task and show both the repaired
# code and its unified diff against the buggy version.
gemini = GeminiLLM(model_name="gemini/gemini-2.0-flash", temperature=1.0, max_tokens=512)
repair_agent = SimpleRepairAgent(
    buggy_code=buggy_humaneval[0]['buggy_code'],
    llm=gemini
)
fixed_code = repair_agent.run()
print("========= Fixed Code =========")
print(fixed_code)
print("========= DIFF =========")
print(get_diff(buggy_humaneval[0]['buggy_code'], fixed_code))


In [None]:
from tqdm import tqdm

def gemini_repair(model, bug_dataset, workdir) -> float:
    """Repair every bug with ``SimpleRepairAgent`` and save each result.

    For each bug the repaired code is written to ``<workdir>/<name>/0.py``,
    where ``<name>`` is the task id with ``/`` replaced by ``_``.

    NOTE(review): the signature is annotated ``-> float`` but no value is returned.
    """
    for bug in tqdm(bug_dataset):
        agent = SimpleRepairAgent(buggy_code=bug['buggy_code'], llm=model)
        # run the repair agent
        patched_code = agent.run()

        task_dir = os.path.join(workdir, bug["task_id"].replace("/", "_"))
        os.makedirs(task_dir, exist_ok=True)
        with open(os.path.join(task_dir, '0.py'), 'w') as out:
            out.write(patched_code)

# Repair the whole dataset; the results land in ./gemini_repair/<task>/0.py.
gemini_repair(gemini, buggy_humaneval, "gemini_repair")


In [None]:
!yes Y | evalplus.evaluate --dataset humaneval --samples gemini_repair --i-just-wanna-run

## Repair with test execution feedback

In [None]:
from evalplus.data import get_human_eval_plus
# Load the HumanEval+ problems; later cells index this by task id and read the
# "prompt" and "canonical_solution" fields.
human_eval_plus = get_human_eval_plus()

In [None]:
class RunTestTool(Tool):
    """A tool that executes a function for a test input.

    The LLM supplies only ``function_invocation``; the agent passes in the
    current code. When a canonical solution is configured, the same
    invocation is also run against it to report the expected output.
    """
    tool_description = (
        "Execute the current version of the code on one test input and report the result. "
        "Provide a single Python expression that calls the function under test, "
        "for example `my_function([1, 2, 3], 2)`. When a reference solution is available, "
        "the expected output is reported as well."
    )
    tool_param_dict = {
        "name": "run_test",
        "description": tool_description,
        "input_schema": {
            "type": "object",
            "properties": {
                "function_invocation": {
                    "description": "A Python expression that calls the function under test with concrete arguments.",
                    "type": "string",
                },
            },
            "required": ["function_invocation"],
        },
    }

    def __init__(self, canonical_solution: str = None):
        """Build the litellm tool description and remember the optional reference solution."""
        self.tool_param = ChatCompletionToolParam(
            type="function",
            function=ChatCompletionToolParamFunctionChunk(
                name=self.tool_param_dict["name"],
                description=self.tool_param_dict["description"],
                parameters=self.tool_param_dict["input_schema"],
            )
        )
        self.canonical_solution = canonical_solution

    def _execute(self, function_definition_code: str, function_invocation: str):
        """Execute the function with the given code and function invocation.

        Returns the value of ``function_invocation`` evaluated after running
        ``function_definition_code``; any exception raised by either propagates.
        """
        # SECURITY: exec/eval run model-generated code with no sandbox or timeout.
        # Only use this in a disposable environment.
        #
        # A single namespace serves as both globals and locals. With two separate
        # dicts, top-level imports and helper functions end up in the locals dict,
        # where the defined functions cannot see them (their globals is the other dict).
        namespace = {}
        exec(function_definition_code, namespace)
        return eval(function_invocation, namespace)

    def call(self, code: str, function_invocation) -> ToolResult:
        """Run the tests on the code snippet.

        Returns:
            A ``ToolResult`` with the execution output (plus the expected
            output when a canonical solution is set), or a ``ToolError`` if
            the invocation fails on the canonical solution or on ``code``.
        """
        expected_output_message = ""
        if self.canonical_solution:
            try:
                gold_result = self._execute(self.canonical_solution, function_invocation)
                expected_output_message = f"\n\nExpected output:\n```\n{gold_result}\n```"
            except Exception as e:
                # The reference solution rejected the input, so the input itself is bad.
                return ToolError(output=f"Error: The test input is not valid.\n{str(e)}")
        try:
            result = self._execute(code, function_invocation)
            return ToolResult(output=f"Execution output:\n```\n{result}\n```" + expected_output_message,
                              data={"result": result})
        except Exception as e:
            return ToolError(output=f"Error: {str(e)}")


In [None]:
# Utility function to dump message to JSON
from json import JSONEncoder


class MessageEncoder(JSONEncoder):
    """JSON encoder that serializes otherwise unsupported objects via ``__dict__``."""

    def default(self, o):
        """Return ``o.__dict__`` for objects that have one.

        Objects without ``__dict__`` are handed to the base class, which
        raises the ``TypeError`` that ``json`` callers expect for
        unserializable values.
        """
        try:
            return o.__dict__
        except AttributeError:
            return super().default(o)

In [None]:
class RepairAgent(Agent):
    """Multi-turn repair agent that can edit the code and run it on test inputs."""

    def __init__(self, buggy_code: str, llm: LLM, canonical_solution: str = None):
        self.buggy_code = buggy_code
        self.tools = [TextEditorTool(), RunTestTool(canonical_solution=canonical_solution), ]
        self.llm = llm
        self.canonical_solution = canonical_solution

    def build_user_message(self) -> str:
        """Build the initial user message for the LLM.

        The code is shown with 1-indexed line numbers; the tool names are read
        from ``self.tools`` so the prompt always matches the tool definitions.
        """
        tool_names = ", ".join(
            f"`{tool.tool_param['function']['name']}`" for tool in self.tools
        )
        numbered_code = "\n".join(
            f"{number}: {line}"
            for number, line in enumerate(self.buggy_code.splitlines(), start=1)
        )
        return (
            "The following Python code contains a bug. Each line is prefixed with its "
            "1-indexed line number followed by a colon; the prefix is not part of the code.\n\n"
            f"```\n{numbered_code}\n```\n\n"
            f"You can use these tools: {tool_names}. Run the code on test inputs to understand "
            "the failure, edit the code to fix the bug, and run it again to check your fix. "
            "When you are confident the code is correct, reply without calling any tool."
        )

    def run(self) -> tuple[str, list]:
        """Let the LLM call tools until it stops or the iteration cap is hit.

        Every tool receives the current code as ``code`` plus the arguments
        supplied by the model. A result whose ``data`` contains ``new_code``
        replaces the current code.

        Returns:
            ``(current_code, messages)``: the code after the last applied edit
            and the full message trajectory.
        """
        messages = [
            {
                "role": "user",
                "content": self.build_user_message(),
            },
        ]
        current_code = self.buggy_code

        # Set a maximum number of iterations to prevent infinite loops
        for _ in range(20):
            tools = [tool.tool_param for tool in self.tools]
            response = self.llm.completion(messages, tools)

            model_message = response.choices[0].message
            messages.append(model_message.model_dump())

            tool_calls = model_message.tool_calls
            if not tool_calls:
                # The model answered without a tool call: it considers the repair done.
                break

            for tool_call in tool_calls:
                tool_name = tool_call.function.name
                try:
                    tool = self.get_tool(tool_name)
                    arguments = json.loads(tool_call.function.arguments)
                    tool_result = tool(code=current_code, **arguments)
                except (ValueError, TypeError) as e:
                    # Unknown tool, malformed JSON, or arguments the tool does not accept.
                    # Report it to the model so it can correct the call.
                    tool_result = ToolError(output=f"Error: invalid tool call.\n{e}")

                if not isinstance(tool_result, ToolError) and tool_result.data:
                    current_code = tool_result.data.get("new_code", current_code)

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_result.output,
                    "name": tool_name,
                })
        return current_code, messages

In [None]:

# Run the feedback-driven repair agent on the first task. The reference solution
# handed to the agent is the HumanEval+ prompt followed by its canonical solution.
gemini = GeminiLLM(model_name="gemini/gemini-2.0-flash", temperature=1.0, max_tokens=512)
task_id = buggy_humaneval[0]['task_id']
canonical_solution = human_eval_plus[task_id]["prompt"] + human_eval_plus[task_id]["canonical_solution"]
repair_agent = RepairAgent(
    buggy_code=buggy_humaneval[0]['buggy_code'],
    llm=gemini,
    canonical_solution=canonical_solution,
)
fixed_code, messages = repair_agent.run()
print("========= Fixed Code =========")
print(fixed_code)
print("========= DIFF =========")
print(get_diff(buggy_humaneval[0]['buggy_code'], fixed_code))


In [None]:
from tqdm import tqdm

def gemini_repair(model, bug_dataset, workdir, restart=True) -> float:
    """Repair every bug with ``RepairAgent`` and save the code and trajectory.

    For each bug, ``<workdir>/<name>/0.py`` receives the repaired code and
    ``<workdir>/<name>/traj.json`` the message trajectory, where ``<name>`` is
    the task id with ``/`` replaced by ``_``. With ``restart=False``, bugs
    whose ``0.py`` already exists are skipped.

    NOTE(review): the signature is annotated ``-> float`` but no value is returned.
    """
    for bug in tqdm(bug_dataset):
        task_id = bug["task_id"]
        task_dir = os.path.join(workdir, task_id.replace("/", "_"))
        solution_path = os.path.join(task_dir, '0.py')
        if not restart and os.path.exists(solution_path):
            continue

        problem = human_eval_plus[task_id]
        reference = problem["prompt"] + problem["canonical_solution"]
        agent = RepairAgent(
            buggy_code=bug['buggy_code'],
            llm=model,
            canonical_solution=reference,
        )
        # run the repair agent
        patched_code, trajectory = agent.run()

        os.makedirs(task_dir, exist_ok=True)
        with open(solution_path, 'w') as out:
            out.write(patched_code)
        with open(os.path.join(task_dir, 'traj.json'), 'w') as out:
            json.dump(trajectory, out, indent=2, cls=MessageEncoder)


# Repair the whole dataset with execution feedback; the results land in
# ./gemini_repair_with_execution_feedback/<task>/ (0.py and traj.json).
gemini_repair(gemini, buggy_humaneval, "gemini_repair_with_execution_feedback")


In [None]:
!yes Y | evalplus.evaluate --dataset humaneval --samples gemini_repair_with_execution_feedback --i-just-wanna-run