In [1]:
import json, re
from scripts.bugsinpy_utils import *
from datetime import datetime
import time
import os

from dotenv import load_dotenv

load_dotenv()

True

In [2]:
# Alternative backends kept for reference (disabled):
# from ollama import Client
# client = Client(host="https://dirty-tables-mix.loca.lt")

# from openai import OpenAI
# client = OpenAI(api_key=os.environ["OPENAI_KEY"])

# Active backend: the OpenAI SDK client pointed at the OpenRouter endpoint.
# The API key is read from the OPENROUTER_KEY environment variable; a missing
# variable raises KeyError on this line.
from openai import OpenAI
client = OpenAI(api_key=os.environ["OPENROUTER_KEY"], base_url="https://openrouter.ai/api/v1")


In [10]:
# System message that opens every fresh conversation built by generate_code.
# It is sent to the model verbatim, so any edit here changes model behaviour.
SYSTEM_PROMPT = """Given a stack trace, code snippet, and optionally a test file, identify the root cause of the error and output the fully corrected version of the entire function always, regardless of how much code changes.
- The error is not in the test file, you cannot change the test file as a fix. Do not change the name of the given functions and do not add any new functions as a fix. All errors can be fixed through a change in the given functions
- Analyze the stack trace and connect it to errors in the code snippet.
- Consider assertions, inputs, and behavior from the test file if provided.
- Reason step-by-step about the possible fix; document this step-by-step reasoning in your output.
- After thoroughly understanding and explaining your reasoning, summarize why you believe your fix is appropriate.
- Do not use any new imports than what is already given to you.
- Each changed function needs its own ```python tags, do not put 2 functions in the same tag.
- Always output the entire function source code with the fix applied, matching the original style and programming language. Do not change the function name.
- Format your output code using ```python tags; only the function code should be inside these tags. If there's multiple functions, put them in separate ```python tags
"""

# Run label: used as the prefix of the results file name written by the save
# cell. It is NOT the model id sent to the API (that is set in generate_code).
MODEL_NAME = "baseline_all_deepseek-r1-0528"
# When False, the main loop continues the conversation stored for each bug in
# PREVIOUS_RESULTS_PATH instead of starting a new one.
FIRST_TIME=False
# JSON results file from an earlier run; read for bugs listed in passed_bugs
# and whenever FIRST_TIME is False.
PREVIOUS_RESULTS_PATH="tmp/ast/results/llm/single/baseline_all_deepseek-r1-0528_08_01_2025__17_03_54.json"

In [11]:
def generate_code(trace_back, chunks, previous_chat=None, model="deepseek/deepseek-r1-0528"):
    """Ask the chat model for a fix and return the updated conversation.

    Args:
        trace_back: Traceback text of the failing test.
        chunks: Iterable of source-code strings; joined with newlines into the
            "Code: ..." message. Only used when a new conversation is started.
        previous_chat: Optional existing list of chat messages. When given, the
            traceback is sent as a follow-up "New error" message and ``chunks``
            is ignored. The list is extended in place (and also returned).
        model: Model id passed to the chat-completions endpoint.

    Returns:
        The message list; its last element is the assistant reply
        (``{"role": "assistant", "content": <str>}``).

    Raises:
        Whatever the API client raises. In that case ``previous_chat`` is left
        untouched, so retrying does not duplicate the "New error" message.
    """
    if previous_chat is None:
        previous_chat = []
        new_messages = [
            {
                "role": "system",
                "content": SYSTEM_PROMPT,
            },
            {
                "role": "user",
                "content": f"Traceback: {trace_back}",
            },
            {
                "role": "user",
                "content": "Code: " + "\n".join(chunks),
            },
        ]
    else:
        new_messages = [{"role": "user", "content": f"New error: {trace_back}"}]

    # Send history + pending messages without touching the caller's list yet:
    # the history is only committed once the request has succeeded.
    completion = client.chat.completions.create(
        extra_body={}, model=model, messages=previous_chat + new_messages
    )
    print(completion.choices)

    # The API may return a null content; normalise to "" so the reply can be
    # fed to extract_code and serialised like any other message.
    reply = completion.choices[0].message.content
    new_messages.append({
        "role": "assistant",
        "content": reply if reply is not None else "",
    })

    previous_chat.extend(new_messages)
    return previous_chat


def extract_code(text: str):
    """Pull Python function definitions out of the fenced code blocks in ``text``.

    Only fences tagged as Python are considered. The tag is matched
    case-insensitively and may be ``python``, ``py`` or ``python3``, optionally
    followed by spaces/tabs and a CRLF or LF line ending. Inside each block,
    every ``def name(`` starts a function that runs until the next ``def`` at
    the beginning of a line, or to the end of the block.

    Args:
        text: Model reply (markdown). ``None`` or an empty string yields no
            functions.

    Returns:
        ``(func_codes, func_names)``: two parallel lists holding the stripped
        source of each function and its name, in order of appearance.
    """
    if not text:
        return [], []

    # The optional tag group only decides where the block body starts; the
    # overall fence span (opening ``` to the next ```) is unaffected by it.
    fence_pattern = r"```(?:([a-zA-Z0-9]+)[ \t]*\r?\n)?(.*?)```"
    # Group 1: whole function source; group 2: function name.
    def_pattern = r"(def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\(.*?(?:(?=^def\s)|\Z))"
    python_tags = {"python", "py", "python3"}

    func_codes = []
    func_names = []

    for lang, block in re.findall(fence_pattern, text, re.DOTALL):
        if lang.lower() not in python_tags:
            continue
        # Split the block into individual functions.
        for found in re.finditer(def_pattern, block, re.DOTALL | re.MULTILINE):
            func_codes.append(found.group(1).strip())
            func_names.append(found.group(2))

    return func_codes, func_names

In [12]:
# Accumulates one result record (dict) per processed bug. The main loop appends
# to it and skips any bug that already has a record here, so re-running this
# cell resets that progress.
llm_results = []

In [13]:
project_name = "youtube-dl"
bugs = get_bugs(project_name)
bugs = list(map(str, sorted(map(int, bugs))))  # numeric order, string ids

# Bugs whose record is copied unchanged from PREVIOUS_RESULTS_PATH instead of
# being sent to the model again.
passed_bugs = [
    "2",
    "16",
    "17",
    "27",
    "31",
    "32",
    "33",
]


def _load_previous_results(path):
    """Read a saved results file and index its records by bug id (first record wins)."""
    with open(path, "r", encoding="utf-8") as f:
        records = json.load(f)
    by_bug = {}
    for record in records:
        by_bug.setdefault(record["bug"], record)
    return by_bug


# Loaded lazily, once, the first time a bug actually needs its earlier record.
previous_by_bug = None

for bug in bugs:
    trace_back = get_raw_traceback(project_name, bug)
    bug_info = get_bug_info(project_name, bug)
    buggy_commit_id = bug_info["buggy_commit_id"]
    checkout_to_commit(project_name, buggy_commit_id, silent=True)
    changed_file = parse_changed_files(project_name, bug)
    function_names = parse_changed_function_names_2(project_name, bug)
    # Only single-file bugs are handled.
    if len(changed_file) > 1:
        continue

    changed_file = changed_file[0]
    print(bug, function_names)

    # Resume support: a bug already recorded in this kernel session is not redone.
    if any(result["bug"] == bug for result in llm_results):
        print("Skipped")
        continue

    # Look up this bug's own earlier record. A missing record is handled
    # explicitly below rather than silently reusing another bug's data.
    previous_record = None
    if bug in passed_bugs or not FIRST_TIME:
        if previous_by_bug is None:
            previous_by_bug = _load_previous_results(PREVIOUS_RESULTS_PATH)
        previous_record = previous_by_bug.get(bug)

    if bug in passed_bugs:
        if previous_record is None:
            raise KeyError(
                f"Bug {bug} is listed in passed_bugs but has no record in {PREVIOUS_RESULTS_PATH}"
            )
        llm_results.append(previous_record)
        print(f"Bug_id: {bug} is done")
        continue

    # Source handed to the model: the failing test function(s) plus the
    # functions changed by the real fix, keyed by function name.
    chunks = {}
    test_functions = extract_function_name_from_traceback(trace_back)
    for test_function in test_functions:
        _tree, sources = test_to_source_code(
            project_name, bug_info["test_file"], test_function, max_depth=0
        )
        for (file_path, func_name), src in sources.items():
            chunks[func_name] = src

    for name in function_names:
        chunks[name] = grab_chunk(project_name, bug, name)["code"]

    if previous_record is not None:
        # Continue the stored conversation (only reachable when FIRST_TIME is False).
        chats = generate_code(
            trace_back, list(chunks.values()), previous_record["previous_chat"]
        )
    else:
        if not FIRST_TIME:
            print(f"No previous result for bug {bug}; starting a new conversation")
        chats = generate_code(trace_back, list(chunks.values()))

    codes, changed_function_names = extract_code(chats[-1]["content"])
    llm_results.append(
        {
            "project": project_name,
            "bug": bug,
            "file": changed_file,
            "actual_function": function_names,
            "given_functions": list(chunks.keys()),
            "previous_chat": chats,
            "changes_function_class_names": changed_function_names,
            "change": codes,
        }
    )

    print(f"Bug_id: {bug} is done")

1 {'_match_one': 'youtube_dl/utils.py'}
[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='The new error indicates that the `\'!is_live\'` expression is returning `False` when the context `{\'is_live\': False}` expects `True`. This occurs because the unary operator `\'!\'` only checks for key absence, not boolean negation. \n\nThe fix:\n1. Modify the `UNARY_OPERATORS` for `\'!\'` to handle boolean values by returning their logical negation\n2. Maintain the original key absence check for non-boolean values\n\nHere\'s the corrected function:\n\n```python\ndef _match_one(filter_part, dct):\n    COMPARISON_OPERATORS = {\n        \'<\': operator.lt,\n        \'<=\': operator.le,\n        \'>\': operator.gt,\n        \'>=\': operator.ge,\n        \'=\': operator.eq,\n        \'!=\': operator.ne,\n    }\n    operator_rex = re.compile(r\'\'\'(?x)\\s*\n        (?P<key>[a-z_]+)\n        \\s*(?P<op>%s)(?P<none_inclusive>\\s*\\?)?\\s*\n        (?:\n       

In [None]:
# Display the result records collected so far (can be large: each record
# carries the full chat history).
llm_results 

In [14]:
# Persist the collected records as pretty-printed JSON. The file name combines
# the run label (MODEL_NAME) with the current local time, so every run writes
# a new file.
now = datetime.now()
timestamp = now.strftime("%m_%d_%Y__%H_%M_%S")
output_path = f"tmp/ast/results/llm/single/{MODEL_NAME}_{timestamp}.json"

os.makedirs("tmp/ast/results/llm/single", exist_ok=True)
with open(output_path, "w", encoding="utf-8") as out_file:
    serialized = json.dumps(llm_results, indent=2)
    out_file.write(serialized)

In [3]:
# Exploratory cell: for a hand-picked set of bugs, print the changed function
# names and the keys ("<file>@<function>") of the sources that
# test_to_source_code returns for the failing test with max_depth=10.
# Note: this cell reassigns project_name, bugs, passed_bugs and chunks.
project_name = "youtube-dl"
bugs = [
    str(bug_number)
    for bug_number in sorted(int(raw_id) for raw_id in get_bugs(project_name))
]
bugs = ["2"]  # narrow the run down to a single bug
# bugs = ["1", "2", "3", "5", "6"]
passed_bugs = []

for bug in bugs:
    trace_back = get_raw_traceback(project_name, bug)
    bug_info = get_bug_info(project_name, bug)
    buggy_commit_id = bug_info["buggy_commit_id"]
    checkout_to_commit(project_name, buggy_commit_id, silent=True)
    changed_file = parse_changed_files(project_name, bug)
    function_names = parse_changed_function_names(project_name, bug)
    # Bugs touching more than one file are skipped.
    if len(changed_file) > 1:
        continue
    changed_file = changed_file[0]

    chunks = {}
    for test_function in extract_function_name_from_traceback(trace_back):
        tree, sources = test_to_source_code(
            project_name, bug_info["test_file"], test_function, max_depth=10
        )
        chunks.update(
            (file_path + "@" + func_name, src)
            for (file_path, func_name), src in sources.items()
        )

    print(bug, function_names, list(chunks.keys()))

