In [1]:
import os

# https://www.kaggle.com/competitions/ai-mathematical-olympiad-progress-prize-2/discussion/560682#3113134
os.environ["TRITON_PTXAS_PATH"] = "/usr/local/cuda/bin/ptxas"

In [2]:
import io
import time
import shutil

import pandas as pd
import polars as pl

import kaggle_evaluation.konwinski_prize_inference_server
from typing import List, Tuple, Dict, Optional

start_time = time.time()

The evaluation API requires that you set up a server which will respond to inference requests. We have already defined the server; you just need write the predict function. When we evaluate your submission on the hidden test set the client defined in `konwinski_prize_gateway` will run in a different container with direct access to the hidden test set and hand off the data.
#
Your code will always have access to the published copies of the files.

In [3]:
instance_count: Optional[int] = None


def get_number_of_instances(num_instances: int) -> None:
    """The very first message from the gateway will be the total number of instances to be served.
    You don't need to edit this function.
    """
    global instance_count
    instance_count = num_instances

# Initialize LLM

In [None]:
from vllm import LLM, SamplingParams, RequestOutput
import warnings

warnings.simplefilter("ignore")

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

if os.getenv("KAGGLE_KERNEL_RUN_TYPE") or os.getenv("KAGGLE_IS_COMPETITION_RERUN"):
    llm_model_pth: str = (
        "/kaggle/input/m/shelterw/deepseek-r1/transformers/deepseek-r1-distill-qwen-14b-awq/1"
        #"/kaggle/input/mistral-small-24b/transformers/mistral-small-24b-base-2501/1"
        #"/kaggle/input/deepseek-r1/transformers/deepseek-r1-distill-qwen-14b/2"
        #"/kaggle/input/m/shelterw/deepseek-r1/transformers/deepseek-r1-distill-qwen-32b-awq/1"
    )
# else:
#     llm_model_pth: str = "/root/volume/KirillR/QwQ-32B-Preview-AWQ"


BATCH_SIZE: int = 6
VALIDATION_COPY_COUNT: int = 1
MAX_TOKENS: int = 4096

MAX_NUM_SEQS: int = 6
MAX_MODEL_LEN: int = 32_768

llm: LLM = LLM(
    llm_model_pth,
    max_num_seqs=MAX_NUM_SEQS,  # Maximum number of sequences per iteration. Default is 256
    max_model_len=MAX_MODEL_LEN,  # Model context length
    trust_remote_code=True,  # Trust remote code (e.g., from HuggingFace) when downloading the model and tokenizer
    tensor_parallel_size=4,  # The number of GPUs to use for distributed execution with tensor parallelism
    gpu_memory_utilization=0.95,  # The ratio (between 0 and 1) of GPU memory to reserve for the model
    seed=42,#seed=2024,
)

In [5]:
tokenizer = llm.get_tokenizer()


def count_tokens(text: str) -> int:
    return len(tokenizer.encode(text))

# Helper functions

In [6]:
import os


def stringify_directory(directory: str) -> str:
    full_paths: List[str] = []

    for root, dirs, files in os.walk(directory):
        for file in files:
            full_path: str = os.path.join(root, file)
            full_paths.append(full_path)
    return "\n".join(full_paths)

In [7]:
import re


def extract_file_query(xml_content: str) -> Dict[str, List[str]]:
    import xml.etree.ElementTree as ET

    # Prepare a data structure to collect results
    parsed_data: Dict[str, List[str]] = {}
    pattern: str = r"<root>(.*?)</root>"
    matches: List[str] = re.findall(pattern, xml_content, re.DOTALL)

    for match in matches:
        try:
            # Parse the XML
            root = ET.fromstring("<root>" + match + "</root>")

            # Find all <entry> elements
            for entry in root.findall("entry"):
                # Extract the <filepath> text
                filepath = entry.find("filepath")
                filepath_text: Optional[str] = (
                    filepath.text.strip()
                    if filepath is not None and filepath.text is not None
                    else None
                )

                # Locate <strings_to_search> container
                strings_container = entry.find("strings_to_search")

                # Gather each <string_to_search> text
                search_strings: List[str] = []
                if strings_container is not None:
                    for s in strings_container.findall("string_to_search"):
                        if s.text is not None:
                            search_strings.append(s.text.strip())

                # Store in a dictionary: { filepath: [search_strings...] }
                parsed_data[filepath_text] = search_strings  # type: ignore
        except:
            print("Error parsing output")
            print(xml_content)
            return {}

    return parsed_data

In [8]:
reading_prompt: str = (
    """
You will be implementing a git diff patch to solve an issue with the code repository.
You will first need to select files in the file directory.

This is the problem statement.

{problem_statement}

This is the file directory

<directory>
{directory_string}
</directory>

Which files should be inspected so that we can solve the problem?
When we inspect each file, what strings should be searched?

Return the strings to search in this format

(explanation)

<root>
    <entry>
        <filepath>filepath</filepath>  
        <strings_to_search>
            <string_to_search>string_to_search</string_to_search>
            ...
            <string_to_search>string_to_search</string_to_search>
        </strings_to_search>
    </entry>
    <entry>
        <filepath>filepath</filepath>
        <strings_to_search>
            <string_to_search>string_to_search</string_to_search>
            ...
            <string_to_search>string_to_search</string_to_search>
        </strings_to_search>
    </entry>
    ...
</root>
...

Notes:
- Make sure to encode each entry between <root> and </root>
- Return the FULL filepath - exactly as specified in <directory> and </directory>
    - Example: <filepath>repo/path/to/directory/file.py</filepath>
- If you are searching for a word instead of a substring, maybe add spaces or brackets before and after the string
    - For example, if you are searching for uses of the function `calculate`, use ` calculate(` as the search string instead of `calculate`
- Prefer searching longer strings
    - Avoid searching for strings that might appear in many parts of the codebase
- Search the test files as well to understand the feature behavior
    - Also search for the relevant function calls in the test files
""".strip()
)


def get_selection_query(
    directory_string: str, problem_statement: str
) -> Tuple[List[str], List[Dict[str, List[str]]]]:
    sampling_params: SamplingParams = SamplingParams(
        temperature=0.6,  # randomness of the sampling
        min_p=0.01,
        skip_special_tokens=True,  # Whether to skip special tokens in the output
        max_tokens=MAX_TOKENS,
    )

    list_of_messages: List[List[Dict[str, str]]] = [
        [
            {
                "role": "user",
                "content": reading_prompt.format(
                    problem_statement=problem_statement[:20_000],
                    directory_string=directory_string[:30_000],
                ),
            },
        ]
        for _ in range(BATCH_SIZE)
    ]

    prompt_texts: List[str] = [
        (
            tokenizer.apply_chat_template(
                conversation=messages, tokenize=False, add_generation_prompt=True
            )  # type: ignore
        )
        + "<think>\n"
        for messages in list_of_messages
    ]
    # print(prompt_texts)

    print("get_selection_query", [count_tokens(text) for text in prompt_texts])
    request_outputs: list[RequestOutput] = llm.generate(
        prompt_texts, sampling_params=sampling_params
    )
    if not request_outputs:
        return [], []
    response_texts: List[str] = [
        request_output.outputs[0].text for request_output in request_outputs
    ]
    print("get_selection_query", [count_tokens(text) for text in response_texts])

    completion_texts = [
        prompt_text + response_text
        for prompt_text, response_text in zip(prompt_texts, response_texts)
    ]
    file_queries: List[Dict[str, List[str]]] = [
        extract_file_query(response_text) for response_text in response_texts
    ]
    return completion_texts, file_queries

In [9]:
REPO_PATH: str = "repo"


def fetch_file_contents(
    files_to_search: Dict[str, List[str]], context_lines: int = 12, max_gap: int = 0
) -> str:
    from io import StringIO
    from typing import Tuple

    def find_lines_in_files_with_context(
        search_map: Dict[str, List[str]], context_lines: int = context_lines
    ) -> List[List[List[Tuple[int, str]]]]:
        """
        Given a dictionary mapping file paths to a list of search terms,
        open each file and gather *snippets* of lines that contain any
        of those search terms, including 'context_lines' before and after.

        Returns a list of lists:
        [
          [  # For file1
             [ (line_number, text), (line_number, text), ... ],
             [ ... ],
          ],
          [  # For file2
             ...
          ],
          ...
        ]
        """
        all_matches_per_file: List[List[List[Tuple[int, str]]]] = []

        for path, terms in search_map.items():
            if not os.path.isfile(path):
                # If the file is not found, record an empty list
                all_matches_per_file.append([])
                continue

            with open(path, "r", encoding="utf-8", errors="replace") as f:
                lines = f.readlines()

            file_snippets: List[List[Tuple[int, str]]] = []
            num_lines: int = len(lines)

            for i, line in enumerate(lines, start=1):
                if any(t in line for t in terms):
                    start_idx: int = max(1, i - context_lines)
                    end_idx: int = min(num_lines, i + context_lines)
                    snippet: List[Tuple[int, str]] = []
                    for snippet_no in range(start_idx, end_idx + 1):
                        text_content: str = lines[snippet_no - 1].rstrip("\n")
                        snippet.append((snippet_no, text_content))
                    file_snippets.append(snippet)

            all_matches_per_file.append(file_snippets)

        return all_matches_per_file

    # ---------------------------------------------------------
    # 3. MERGE OVERLAPPING/ADJACENT SNIPPETS
    # ---------------------------------------------------------

    def merge_file_snippets(
        file_snippets: List[List[Tuple[int, str]]], gap: int = 0
    ) -> List[List[Tuple[int, str]]]:
        """
        Merge overlapping or nearly adjacent snippets in a single file’s snippet list.
        """
        intervals: List[Tuple[int, int, List[Tuple[int, str]]]] = []
        for snippet in file_snippets:
            if snippet:
                start_line: int = snippet[0][0]
                end_line: int = snippet[-1][0]
                intervals.append((start_line, end_line, snippet))

        intervals.sort(key=lambda x: x[0])  # sort by start line

        merged: List[Tuple[int, int, List[Tuple[int, str]]]] = []
        for start, end, snippet in intervals:
            if not merged:
                merged.append((start, end, snippet))
                continue

            prev_start, prev_end, prev_snippet = merged[-1]
            if start <= prev_end + gap:
                new_end: int = max(end, prev_end)
                combined_dict: Dict[int, str] = {}
                for ln, txt in prev_snippet:
                    combined_dict[ln] = txt
                for ln, txt in snippet:
                    combined_dict[ln] = txt
                merged_snippet: List[Tuple[int, str]] = [
                    (ln, combined_dict[ln]) for ln in sorted(combined_dict)
                ]
                merged[-1] = (prev_start, new_end, merged_snippet)
            else:
                merged.append((start, end, snippet))

        # Extract just the merged snippet portion
        return [x[2] for x in merged]

    def merge_all_snippets(
        all_files_snips: List[List[List[Tuple[int, str]]]], gap: int = 0
    ) -> List[List[List[Tuple[int, str]]]]:
        """
        Merge snippet blocks within each file.
        all_files_snips is a list-of-lists:
          [
            [ snippetA, snippetB, ... ],  # file 1
            [ snippetC, snippetD, ... ],  # file 2
          ]
        """
        merged: List[List[List[Tuple[int, str]]]] = []
        for snips in all_files_snips:
            merged.append(merge_file_snippets(snips, gap=gap))
        return merged

    # ---------------------------------------------------------
    # 4. RUN LOGIC: generate files, search, merge, and BUILD A STRING
    # ---------------------------------------------------------

    has_any_matches: bool = False

    # 1) Gather snippets around each match
    context_snippets: List[List[List[Tuple[int, str]]]] = (
        find_lines_in_files_with_context(files_to_search, context_lines=context_lines)
    )

    # 2) Merge overlapping snippets
    merged_snips: List[List[List[Tuple[int, str]]]] = merge_all_snippets(
        context_snippets, gap=max_gap
    )

    # 3) Build a string (instead of printing)
    output = StringIO()

    # Header
    output.write("Sample files created successfully.\n\n")
    output.write("Search Results (by file, merging any overlapping context):\n\n")

    # For each file
    for (filepath, terms), snippet_list in zip(files_to_search.items(), merged_snips):
        output.write(f"[file name]: {filepath[len(REPO_PATH) + 1:]}\n")
        terms_searched_as_str = "\n".join(terms)
        output.write(f"[terms searched]:\n{terms_searched_as_str}\n")
        output.write("[file content begin]\n")
        if not snippet_list:
            output.write("  No matches found.\n")
        else:
            has_any_matches = True
            for snippet_idx, snippet in enumerate(snippet_list, start=1):
                snippet_start: int = snippet[0][0]
                snippet_end: int = snippet[-1][0]
                output.write(
                    f"\nMatch #{snippet_idx}, lines {snippet_start} to {snippet_end}:\n"
                )
                for line_no, text in snippet:
                    output.write(f"  {line_no:3d} | {text}\n")
                output.write("\n")
        output.write("[file content end]\n\n")

    file_content_string: str = output.getvalue()

    if has_any_matches:
        return file_content_string
    return ""

In [10]:
import re

def extract_patch_string(text: str) -> Optional[str]:
    pattern: str = r"\n```diff\n(.*?)\n```"
    matches: List[str] = re.findall(pattern, text, re.DOTALL)
    if not matches:
        return None
    return matches[-1] + "\n"

In [11]:
patching_prompt: str = (
    """
You will be implementing a git diff patch to solve an issue with the code repository.
This is the problem statement.

{problem_statement}

These are the files that is thought to be relevant

{file_content_string}

Write a git diff within ```diff and ``` that fully fixes the problem.
The git diff should not cause other tests to fail.

Example:

```diff
--- a/first.txt
+++ b/first.txt
@@ -1,3 +1,3 @@
 start
-first change
+new first change
 middle
@@ -7,4 +7,4 @@
 some content
-second change
+new second change
 more content
--- a/second.txt
+++ b/second.txt
@@ -1,3 +1,3 @@
 beginning
-old line
+new line
 end
```

Reminder
- Put your diff within ```diff and ``` and make sure the diff is valid.
- Only the last diff printed will be considered.
""".strip()
)

import re


def get_patch_string(
    problem_statement: str, file_content_strings: List[str], num_candidates: int = 3
) -> Tuple[List[str], List[Optional[str]]]:
    """
    Generates patch candidates using diverse sampling by varying the sampling temperature.
    Returns:
      - A list of combined completion texts (one per file)
      - A list of patch strings (one per file), selected from the candidate pool.
    """
    # Identify indices for non-empty file content strings.
    inference_idx_to_input_idx: List[int] = [
        idx for idx, fc in enumerate(file_content_strings) if fc != ""
    ]
    
    # Prepare the messages for each file.
    list_of_messages: List[List[Dict[str, str]]] = [
        [
            {
                "role": "user",
                "content": patching_prompt.format(
                    problem_statement=problem_statement[:20_000],
                    file_content_string=file_content_strings[idx][:30_000],
                ),
            },
        ]
        for idx in inference_idx_to_input_idx
    ]
    
    # Initialize dictionaries to store candidates for each file.
    candidate_completion_texts = {idx: [] for idx in inference_idx_to_input_idx}
    candidate_patch_strings = {idx: [] for idx in inference_idx_to_input_idx}
    
    # Loop to generate multiple candidates per input.
    for candidate in range(num_candidates):
        # Vary the temperature slightly for diversity.
        candidate_sampling_params = SamplingParams(
            temperature=0.6 + candidate * 0.1,
            min_p=0.01,
            skip_special_tokens=True,
            max_tokens=MAX_TOKENS,
        )
        
        # Build prompt texts for all messages.
        prompt_texts: List[str] = [
            tokenizer.apply_chat_template(
                conversation=messages, tokenize=False, add_generation_prompt=True
            ) + "<think>\n"
            for messages in list_of_messages
        ]
        
        print("get_patch_string (candidate {}):".format(candidate), [count_tokens(text) for text in prompt_texts])
        
        # Generate responses for this candidate round.
        request_outputs: List[RequestOutput] = llm.generate(
            prompt_texts, sampling_params=candidate_sampling_params
        )
        response_texts_from_inference: List[str] = [
            request_output.outputs[0].text for request_output in request_outputs
        ]
        print("Candidate {} responses:".format(candidate), [count_tokens(text) for text in response_texts_from_inference])
        
        # Combine prompt and response, then extract patch string.
        completion_texts_from_inference = [
            prompt_text + response_text
            for prompt_text, response_text in zip(prompt_texts, response_texts_from_inference)
        ]
        patch_strings_from_inference: List[Optional[str]] = [
            extract_patch_string(response_text)
            for response_text in response_texts_from_inference
        ]
        
        # Save the candidate outputs per file.
        for idx, (comp_text, patch_str) in zip(inference_idx_to_input_idx, 
                                                 zip(completion_texts_from_inference, patch_strings_from_inference)):
            candidate_completion_texts[idx].append(comp_text)
            candidate_patch_strings[idx].append(patch_str)
    
    # For each file, select the first candidate with a valid (non-None) patch.
    final_completion_texts: List[str] = ["" for _ in file_content_strings]
    final_patch_strings: List[Optional[str]] = [None for _ in file_content_strings]
    
    for idx in inference_idx_to_input_idx:
        selected_patch = None
        selected_completion = ""
        for comp_text, patch_str in zip(candidate_completion_texts[idx], candidate_patch_strings[idx]):
            if patch_str is not None:
                selected_patch = patch_str
                selected_completion = comp_text
                break
        final_patch_strings[idx] = selected_patch
        final_completion_texts[idx] = selected_completion

    return final_completion_texts, final_patch_strings


In [12]:
from pathlib import Path

verifying_prompt: str = (
    """
This is the problem statement.

{problem_statement}

These are the files that is thought to be relevant, which may not be complete.

{file_content_string}

This is the proposed patch to fix the problem.

{patch_string}

Evaluate whether the patch works
- The patch fully fixes the problem described in the problem statement.
- The patch does not cause side effects and make any other tests fail.

End your response with exactly either of
- <label>Yes</label>, this fixes the problem.
- <label>No</label>, this does not fix the problem.

Reminder
- Only evaluate, do not provide suggestion on how to fix.
- Remember to write exactly either of <label>Yes</label> or <label>No</label> in the last line
""".strip()
)


def is_valid_patch_format(patch_string: str) -> bool:
    """
    A quick check to confirm if a patch could be valid.
    """
    if not(isinstance(patch_string, str)):
        return False
    try:
        patch_set = unidiff.PatchSet(patch_string)
        if len(patch_set) == 0:
            return False
    except Exception:
        return False
    return True


def patch_dry_run_succeeds(patch_string: str, repo_path: str = REPO_PATH, timeout: int = 60) -> bool:
    """
    A robust check if the patch will proceed without any errors.
    Should be run after `is_valid_patch_format()`: the patch
    command can hang if the inputs are sufficiently invalid.

    Args:
        patch_path: Path to a file containing the patch.
        repo_path: Path to the directory to be patched.
        timeout: Number of seconds before the dry run will be cancelled.
    """
    with open("patch.txt", "w") as f:
        f.write(patch_string)
    patch_path = "/kaggle/working/patch.txt"

    cmd = f"patch --quiet --dry-run -p1 -i {patch_path} -d {repo_path}"
    try:
        subprocess.run(cmd, shell=True, check=True, timeout=timeout)
        return True
    except subprocess.CalledProcessError:
        return False


def get_verification(
    problem_statement: str,
    file_content_strings: List[str],
    patch_strings: List[Optional[str]],
    repo_path: str,
) -> Tuple[List[List[str]], List[List[bool]]]:
    assert len(file_content_strings) == len(patch_strings)
    sampling_params: SamplingParams = SamplingParams(
        temperature=0.6,  # randomness of the sampling
        min_p=0.01,
        skip_special_tokens=True,  # Whether to skip special tokens in the output
        max_tokens=MAX_TOKENS,
    )

    inference_idx_to_input_idx: list[int] = [
        input_idx
        for _ in range(VALIDATION_COPY_COUNT)
        for input_idx, patch_string in enumerate(patch_strings)
        if patch_string is not None and is_valid_patch_format(patch_string) # and patch_dry_run_succeeds(patch_string, repo_path)
    ]
    print(inference_idx_to_input_idx)

    list_of_messages: List[List[Dict[str, str]]] = [
        [
            {
                "role": "user",
                "content": verifying_prompt.format(
                    problem_statement=problem_statement[:20_000],
                    file_content_string=file_content_strings[input_idx][:30_000],
                    patch_string=patch_strings[input_idx],
                ),
            },
        ]
        for input_idx in inference_idx_to_input_idx
    ]

    prompt_texts: List[str] = [
        (
            tokenizer.apply_chat_template(
                conversation=messages, tokenize=False, add_generation_prompt=True
            )  # type: ignore
        )
        + "<think>\n"
        for messages in list_of_messages
    ]
    # print(prompt_texts)

    print("get_verification", [count_tokens(text) for text in prompt_texts])
    request_outputs: list[RequestOutput] = llm.generate(
        prompt_texts, sampling_params=sampling_params
    )
    response_texts: List[str] = [
        request_output.outputs[0].text for request_output in request_outputs
    ]
    print("get_verification", [count_tokens(text) for text in response_texts])

    completion_texts = [
        prompt_text + response_text
        for prompt_text, response_text in zip(prompt_texts, response_texts)
    ]
    judgments_flattened: List[bool] = [
        "<label>Yes</label>" in response_text for response_text in response_texts
    ]
    print(judgments_flattened)

    judgments_aggregated: List[List[bool]] = [[] for _ in file_content_strings]
    completion_text_aggregated: List[List[str]] = [[] for _ in patch_strings]
    for inference_idx, (completion_text, judgement) in enumerate(
        zip(completion_texts, judgments_flattened)
    ):
        input_idx = inference_idx_to_input_idx[inference_idx]
        completion_text_aggregated[input_idx].append(completion_text)
        judgments_aggregated[input_idx].append(judgement)
    print(judgments_aggregated)

    return completion_text_aggregated, judgments_aggregated

In [13]:
import unidiff
import subprocess


def choose_patch_string(
    patch_strings: list[Optional[str]], judgments_aggregated: List[List[bool]], repo_path: str
) -> tuple[list[int], Optional[str]]:
    best_score = -4
    best_patch_string = None

    scores = []
    for judgments, patch_string in zip(judgments_aggregated, patch_strings):

        if patch_string is None:
            score = -3
            scores.append(score)
            continue
        
        if not is_valid_patch_format(patch_string):
            score = -2
            scores.append(score)
            continue
        
        if not patch_dry_run_succeeds(patch_string, repo_path):
            score = -1
            scores.append(score)
            continue
        
        score = judgments.count(True)
        scores.append(score)
        
        if score > best_score:
            best_score = score
            best_patch_string = patch_string

    return scores, best_patch_string

# Predict function

In [14]:
def predict_inner(problem_statement: str, directory: str) -> Optional[str]:
    directory_string = stringify_directory(directory)
    
    # Run file selection once
    selection_completion_texts, file_queries = get_selection_query(
        directory_string, problem_statement
    )
    file_content_strings: List[str] = [
        fetch_file_contents(file_query) for file_query in file_queries
    ]
    
    max_iterations = 3  # Maximum number of refinement iterations
    best_patch = None
    best_score = -float("inf")
    final_iteration_data = None  # Will hold data from the best iteration
    
    # Iterative refinement loop
    for iteration in range(max_iterations):
        print(f"Iteration {iteration + 1} of {max_iterations}")
        # Generate patch candidates (you might also adjust sampling parameters here)
        patch_completion_texts, patch_strings = get_patch_string(
            problem_statement, file_content_strings
        )
        # Verify the generated patches
        verification_completion_texts_aggregated, judgments_aggregated = get_verification(
            problem_statement, file_content_strings, patch_strings, directory
        )
        # Choose the best patch from the current iteration
        scores, patch_candidate = choose_patch_string(patch_strings, judgments_aggregated, directory)
        
        # For simplicity, we use the maximum score from this iteration (could also use an average)
        iteration_best = max(scores) if scores else -float("inf")
        print(f"Iteration {iteration + 1} best score: {iteration_best}")
        
        # Update our best candidate if this iteration is better
        if iteration_best > best_score:
            best_score = iteration_best
            best_patch = patch_candidate
            final_iteration_data = {
                "patch_completion_text": patch_completion_texts,
                "patch_strings": patch_strings,
                "scores": scores,
                "judgments_aggregated": judgments_aggregated,
            }
        
        # Break early if a satisfactory patch is found (e.g., at least one positive judgment)
        if best_score >= 1:
            print("Satisfactory patch found; breaking early.")
            break

    # Logging the iteration data if not in a competition rerun.
    if not os.getenv("KAGGLE_IS_COMPETITION_RERUN") and final_iteration_data is not None:
        data = {
            "problem_statement": [problem_statement] * len(file_queries),
            "selection_completion_text": selection_completion_texts,
            "selection_completion_length": [
                count_tokens(text) for text in selection_completion_texts
            ],
            "file_query": file_queries,
            "file_content_string": file_content_strings,
            "patch_completion_text": final_iteration_data["patch_completion_text"],
            "patch_completion_length": [
                count_tokens(text) for text in final_iteration_data["patch_completion_text"]
            ],
            "patch_string": final_iteration_data["patch_strings"],
            "score": final_iteration_data["scores"],
            "best_score": best_score,
        }
        # Optionally log verification details for each copy.
        for copy_idx in range(VALIDATION_COPY_COUNT):
            data[f"verification_completion_text_{copy_idx}"] = [
                completion_texts[copy_idx] if completion_texts else None
                for completion_texts in verification_completion_texts_aggregated
            ]
            data[f"verification_completion_length_{copy_idx}"] = [
                count_tokens(completion_texts[copy_idx]) if completion_texts else None
                for completion_texts in verification_completion_texts_aggregated
            ]
            data[f"judgment_{copy_idx}"] = [
                judgments[copy_idx] if judgments else None
                for judgments in judgments_aggregated
            ]
        data["judgment_count_true"] = [judgments.count(True) for judgments in judgments_aggregated]
        pd.DataFrame(data).to_csv(
            f"{str(int(time.time() - start_time)).zfill(5)}.csv", index=False
        )
    
    return best_patch


In [15]:
import io
from typing import Optional, List

skip_prediction: bool = False


def predict(
    problem_statement: str,
    repo_archive: io.BytesIO,
    pip_packages_archive: io.BytesIO,
    env_setup_cmds_templates: List[str],
) -> Optional[str]:
    """Replace this function with your inference code.
    Args:
        problem_statement: The text of the git issue.
        repo_archive: A BytesIO buffer path with a .tar containing the codebase that must be patched. The gateway will make this directory available immediately before this function runs.
    """
    global skip_prediction
    if skip_prediction:
        return None

    with open("repo_archive.tar", "wb") as f:
        f.write(repo_archive.read())
    repo_path: str = REPO_PATH
    if os.path.exists(repo_path):
        shutil.rmtree(repo_path)
    shutil.unpack_archive("repo_archive.tar", extract_dir=repo_path)
    os.remove("repo_archive.tar")

    patch_string: Optional[str] = None
    patch_string = predict_inner(
        problem_statement=problem_statement, directory=repo_path
    )
    shutil.rmtree(repo_path)

    if not os.getenv("KAGGLE_IS_COMPETITION_RERUN"):
        skip_prediction = True

    print("submitted patch_string")
    print(patch_string)

    if patch_string is None:
        return None

    return patch_string

# Get predict data without server

In [16]:
# import os
# import zipfile

# # !mkdir -p /kaggle/tmp/konwinski-prize-alt
# os.makedirs("/kaggle/tmp/konwinski-prize-alt", exist_ok=True)

# # !unzip -q -o /kaggle/input/konwinski-prize/data.a_zip -d /kaggle/tmp/konwinski-prize-alt/ 2>/dev/null || true
# try:
#     with zipfile.ZipFile("/kaggle/input/konwinski-prize/data.a_zip", "r") as zip_ref:
#         zip_ref.extractall("/kaggle/tmp/konwinski-prize-alt/")
# except:
#     pass

In [17]:
# import pandas as pd


# def get_problem(problem_index: int) -> Tuple[str, str, io.BytesIO]:
#     df = pd.read_parquet("/kaggle/tmp/konwinski-prize-alt/data/data.parquet")

#     problem_statement: str = df["problem_statement"][problem_index]
#     repo_path: str = (
#         f"/kaggle/tmp/konwinski-prize-alt/data/repos/repo__{df['instance_id'][problem_index]}"
#     )

#     import shutil
#     import tempfile

#     with tempfile.TemporaryDirectory() as tmpdir:
#         shutil.make_archive(os.path.join(tmpdir, "a_repo"), "tar", repo_path)
#         with open(os.path.join(tmpdir, "a_repo.tar"), "rb") as f:
#             repo_archive = io.BytesIO(f.read())

#     return problem_statement, repo_path, repo_archive

In [18]:
# demo_problem_index: int = 0

# if os.getenv("KAGGLE_KERNEL_RUN_TYPE") == "Interactive" and not os.getenv(
#     "KAGGLE_IS_COMPETITION_RERUN"
# ):
#     problem_statement, repo_path, repo_archive = get_problem(
#         problem_index=demo_problem_index
#     )

#     print(repo_path)
#     print(problem_statement)
#     print(len(list(repo_archive)))
#     print(len(list(repo_archive)))

In [19]:
# if os.getenv("KAGGLE_KERNEL_RUN_TYPE") == "Interactive" and not os.getenv(
#     "KAGGLE_IS_COMPETITION_RERUN"
# ):
#     skip_prediction = False
#     problem_statement, repo_path, repo_archive = get_problem(
#         problem_index=demo_problem_index
#     )
#     patch_string = predict(problem_statement, repo_archive, io.BytesIO(), [])

In [20]:
# if (
#     os.getenv("KAGGLE_KERNEL_RUN_TYPE") == "Interactive"
#     and not os.getenv("KAGGLE_IS_COMPETITION_RERUN")
#     and patch_string is not None
# ):
#     import polars as pl

#     df = pl.read_parquet("/kaggle/tmp/konwinski-prize-alt/data/data.parquet")

#     import kaggle_evaluation.konwinski_prize_gateway

#     k_prize_gateway = kaggle_evaluation.konwinski_prize_gateway.KPrizeGateway()
#     k_prize_gateway.unpack_data_paths()

#     results = k_prize_gateway._evaluate_instance(
#         instance=df.row(demo_problem_index, named=True),
#         patch=patch_string,
#     )

#     from collections import Counter
#     print(
#         demo_problem_index, Counter(result.unit_test_outcome for result in results[1:])
#     )

In [21]:
# if (
#     os.getenv("KAGGLE_KERNEL_RUN_TYPE") == "Interactive"
#     and not os.getenv("KAGGLE_IS_COMPETITION_RERUN")
#     and patch_string is not None
# ):
#     from kaggle_evaluation.konwinski_prize_gateway import UnitTestOutcome

#     for result in results[1:]:
#         if result.unit_test_outcome != UnitTestOutcome.PASSED:
#             print(result.test_name)
#             print(result.fail_description)
            

When your notebook is run on the hidden test set, inference_server.serve must be called within 15 minutes of the notebook starting or the gateway will throw an error. If you need more than 15 minutes to load your model you can do so during the very first predict call, which does not have the usual 30 minute response deadline.

# Evaluation with inference server

In [22]:
skip_prediction = False

In [23]:
inference_server = (
    kaggle_evaluation.konwinski_prize_inference_server.KPrizeInferenceServer(
        get_number_of_instances, predict
    )
)

if os.getenv("KAGGLE_IS_COMPETITION_RERUN"):
    inference_server.serve()
else:
    inference_server.run_local_gateway(
        data_paths=(
            "/kaggle/input/konwinski-prize/",  # Path to the entire competition dataset
            "/kaggle/tmp/konwinski-prize/",  # Path to a scratch directory for unpacking data.a_zip.
        ),  # type: ignore
        
        use_concurrency=True,
    )

Existing uv installation found. Skipping uv installation.
Installing Python 3.11...


GatewayRuntimeError: (<GatewayRuntimeErrorType.SERVER_RAISED_EXCEPTION: 3>, 'Cannot use chat template functions because tokenizer.chat_template is not set and no template argument was passed! For information about writing templates and setting the tokenizer.chat_template attribute, please see the documentation at https://huggingface.co/docs/transformers/main/en/chat_templating')