### Table of Content
- [Load dataset](#load-dataset)
- [Exploratory Data Analysis](#Exploratory-Data-Analysis)
- [Setup LLM](#Setup-LLM)
- [Analyze repo issues & Retrieve relevant codes](#Analyze-repo-issues-&-Retrieve-relevant-codes)
- [Retrieve relevant codes](#Retrieve-relevant-codes)
- [Generate patches](#Generate-patches)
- [Validate patches](#Validate-patches)

In [1]:
import os
import pandas as pd

# Define paths
comp_dir = "konwinski-prize"
comp_kaggle_evaluation_dir = os.path.join(comp_dir, "kaggle_evaluation")
comp_kprize_setup_dir = os.path.join(comp_dir, "kprize_setup")

comp_data_zip_path = os.path.join(comp_dir, "data.a_zip")
comp_data_dir = os.path.join(comp_dir, "data")
comp_data_parquet_path = os.path.join(comp_data_dir, "data.parquet")
comp_conda_packages_dir = os.path.join(comp_data_dir, "conda_packages")
comp_pip_packages_dir = os.path.join(comp_data_dir, "pip_packages")
comp_repo_configs_dir = os.path.join(comp_data_dir, "repo_configs")
comp_repos_dir = os.path.join(comp_data_dir, "repos")

### Load dataset

From the competition readme and our earlier investigation we know that the dataframe contains the following:

**instance_id (string)**
- Unique string identifier for each instance (GitHub issue)

**repo (string)**
- The GitHub repository relevant to the issue
- Also accessible through the evaluation API

**problem_statement (string)**
- Textual description of the issue
- Also accessible through the evaluation API

**patch (string)**
- The patch that resolves the issue
- Only provided in the train set

**test_patch (string)**
- The patch that resolves the issue
- Only provided in the train set

**pull_number (int)**
- The pull request number that resolved the issue

**base_commit (string)**
- The commit used as the foundation for the provided repository copy

**issue_numbers (int)**
- The original ID number of the GitHub issue

**[PASS_TO_PASS/FAIL_TO_PASS] (list)**
- Lists containing unit tests to be executed for this issue


In [2]:
# Load dataset
kprize_df = pd.read_parquet(comp_data_parquet_path)
kprize_df

Unnamed: 0,instance_id,repo,problem_statement,patch,test_patch,pull_number,base_commit,PASS_TO_PASS,FAIL_TO_PASS,issue_numbers
0,pylint-dev__astroid-2496,pylint-dev/astroid,TypeError: unsupported format string passed to...,diff --git a/ChangeLog b/ChangeLog\nindex 4560...,diff --git a/tests/test_inference.py b/tests/t...,2496,8d3cdbbe6685fd8cf211816bec56c90f38f1859e,[tests/test_inference.py::InferenceUtilsTest::...,[tests/test_inference.py::test_formatted_fstri...,[2492]
1,pylint-dev__astroid-2468,pylint-dev/astroid,Pylint checks against incorrect type with prop...,diff --git a/ChangeLog b/ChangeLog\nindex fdbb...,diff --git a/tests/test_inference.py b/tests/t...,2468,6db3a60553ff538a936d5dda23d67a3924a57f45,[tests/test_inference.py::InferenceUtilsTest::...,[tests/test_inference.py::InferenceTest::test_...,[2467]
2,astropy__astropy-17048,astropy/astropy,QTable cannot take `dimensionless_unscaled` wh...,diff --git a/astropy/table/table.py b/astropy/...,diff --git a/astropy/table/tests/test_table.py...,17048,d60f6b72cd525262bfd179331d9fe4474177918f,[astropy/table/tests/test_table.py::TestSetTab...,[astropy/table/tests/test_table.py::test_qtabl...,[17047]
3,astropy__astropy-16898,astropy/astropy,BUG: tables do not deal well with zero-sized s...,diff --git a/astropy/io/registry/core.py b/ast...,diff --git a/astropy/io/fits/tests/test_connec...,16898,ee6d087baf301c1d08db92e6e5b6d909d57e6fac,[astropy/io/fits/tests/test_connect.py::TestSi...,[astropy/io/fits/tests/test_connect.py::test_z...,[16897]
4,astropy__astropy-16830,astropy/astropy,KeyError: 'version_1_3_or_later' when parsing ...,diff --git a/astropy/io/votable/tree.py b/astr...,diff --git a/astropy/io/votable/tests/test_tre...,16830,e39f486fec48d87aa3677326167954370d7a7bf9,[astropy/io/votable/tests/test_tree.py::test_c...,[astropy/io/votable/tests/test_tree.py::test_v...,"[16825, 16826]"
5,astropy__astropy-16812,astropy/astropy,Provide a way to make a copy of a model with d...,diff --git a/astropy/modeling/core.py b/astrop...,diff --git a/astropy/modeling/tests/test_core....,16812,c241103c11954d3c1cfe3c1840b1ece72479c522,[astropy/modeling/tests/test_core.py::test_Mod...,[astropy/modeling/tests/test_core.py::test_res...,[16593]


### Exploratory Data Analysis

In [3]:
from rich import print as rprint

In [4]:
rprint(f"{kprize_df.shape=}\n")

In [5]:
rprint("\nAny Missing Values?\n")
rprint(kprize_df.isnull().sum())

In [6]:
rprint("\nDatatypes?\n")
kprize_df.info(show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 10 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   instance_id        6 non-null      object
 1   repo               6 non-null      object
 2   problem_statement  6 non-null      object
 3   patch              6 non-null      object
 4   test_patch         6 non-null      object
 5   pull_number        6 non-null      int64 
 6   base_commit        6 non-null      object
 7   PASS_TO_PASS       6 non-null      object
 8   FAIL_TO_PASS       6 non-null      object
 9   issue_numbers      6 non-null      object
dtypes: int64(1), object(9)
memory usage: 608.0+ bytes


In [7]:
rprint("\nRepo Distribution?\n")
rprint(kprize_df['repo'].value_counts())

In [8]:
# Fixes can reference more than one GitHub issue.
rprint("\nNumber of Issues per PR\n")
kprize_df["issue_numbers"].apply(len).value_counts()

1    5
2    1
Name: issue_numbers, dtype: int64

In [9]:
kprize_df['problem_statement_length'] = kprize_df['problem_statement'].apply(lambda x: len(x.split()))
rprint("\nProblem Statement Lengths\n")
display(kprize_df['problem_statement_length'].describe())

count      6.000000
mean     297.166667
std      162.149828
min       72.000000
25%      171.750000
50%      358.000000
75%      407.000000
max      462.000000
Name: problem_statement_length, dtype: float64

In [10]:
rprint("\nPatch Lengths\n")
kprize_df['patch_length'] = kprize_df['patch'].apply(lambda x: len(x))
kprize_df['test_patch_length'] = kprize_df['test_patch'].apply(lambda x: len(x))
display(kprize_df[['patch_length', 'test_patch_length']].describe())

Unnamed: 0,patch_length,test_patch_length
count,6.0,6.0
mean,2337.833333,2255.833333
std,1380.618328,629.710542
min,912.0,1339.0
25%,1382.25,2069.5
50%,2195.0,2214.5
75%,2723.5,2405.25
max,4714.0,3277.0


In [11]:
rprint("\nTest Counts\n")
kprize_df['PASS_TO_PASS_count'] = kprize_df['PASS_TO_PASS'].apply(len)
kprize_df['PASS_TO_PASS_count'] = kprize_df['FAIL_TO_PASS'].apply(len)
display(kprize_df[['PASS_TO_PASS', 'FAIL_TO_PASS', 'PASS_TO_PASS_count', 'PASS_TO_PASS_count']])

Unnamed: 0,PASS_TO_PASS,FAIL_TO_PASS,PASS_TO_PASS_count,PASS_TO_PASS_count.1
0,[tests/test_inference.py::InferenceUtilsTest::...,[tests/test_inference.py::test_formatted_fstri...,2,2
1,[tests/test_inference.py::InferenceUtilsTest::...,[tests/test_inference.py::InferenceTest::test_...,3,3
2,[astropy/table/tests/test_table.py::TestSetTab...,[astropy/table/tests/test_table.py::test_qtabl...,3,3
3,[astropy/io/fits/tests/test_connect.py::TestSi...,[astropy/io/fits/tests/test_connect.py::test_z...,2,2
4,[astropy/io/votable/tests/test_tree.py::test_c...,[astropy/io/votable/tests/test_tree.py::test_v...,1,1
5,[astropy/modeling/tests/test_core.py::test_Mod...,[astropy/modeling/tests/test_core.py::test_res...,2,2


Look at an example problem from the kprize data

In [124]:
import pathlib
from rich.filesize import decimal
from rich.markup import escape
from rich.text import Text
from rich.tree import Tree
from typing import Iterable, Optional, Literal, Any, List, Dict, Union, Tuple, Generator


default_ignore_list: Iterable[str] = ".ipynb_checkpoints", ".DS_Store", ".git", ".idea", ".coverage", ".pytest_cache"


def get_directory_tree(
        directory: pathlib.Path,
        tree: Optional[Tree] = None,
        show_hidden: bool = False,
        inplace: bool = False,
        ignore_list_extras: Iterable[str] = (),
) -> Optional[Tree]:
    """Recursively build a Tree with directory contents.

    Args:
        directory (pathlib.Path): The directory to walk.
        tree (Tree, optional):
            The Tree object to build.
            If not provided, a new Tree is created.
        show_hidden (bool, optional):
            Whether to show hidden files.
        inplace (bool, optional):
            Whether to print the tree in place.
            If False, the tree is returned.
        ignore_list_extras (Iterable[str], optional):
            Additional file extensions to ignore.

    Returns:
        If inplace is False, the Tree object with the directory contents.
        Else, None.
    """
    # Get the ignore list that includes the default and any extras
    ignore_list = sorted(set(default_ignore_list) | set(ignore_list_extras))

    # Create a new Tree if one is not provided
    tree = tree or Tree(label=f"[bold]{directory!s}[/bold] File Tree")  # type: ignore

    # Sort dirs first then by filename
    paths = sorted(
        pathlib.Path(directory).iterdir(),
        key=lambda path: (path.is_file(), path.name.lower()),
    )

    # Sort dirs first then by filename
    for path in paths:

        # Remove hidden files if show_hidden is False
        if path.name.startswith(".") and not show_hidden:
            continue

        # Skip files in the ignore list by suffix or name
        if path.is_file() and (path.suffix in ignore_list or path.name in ignore_list):
            continue

        # Skip directories only by name (not suffix)
        if path.is_dir() and path.name in ignore_list:
            continue

        # Add the directory to the tree
        if path.is_dir():

            # Style directories starting with "__" differently
            style = "dim" if path.name.startswith("__") else ""

            # Add the directory to the tree
            branch = tree.add(
                f"[bold magenta]:open_file_folder: [link file://{path}]{escape(path.name)}",
                style=style,
                guide_style=style,
            )
            get_directory_tree(path, branch)

        # Add the file to the tree
        else:
            main_style = "dim green" if path.name.startswith("_") else "green"
            ext_style = "dim red" if path.name.startswith("_") else "bold red"
            file_size_style = "dim blue" if path.name.startswith("_") else "blue"
            text_filename = Text(path.name, main_style)
            text_filename.highlight_regex(r"\..*$", ext_style)
            text_filename.stylize(f"link file://{path}")
            text_filename.append(f" ({decimal(path.stat().st_size)})", file_size_style)
            if path.suffix == ".py":
                icon = "🐍 "
            elif path.suffix == ".ipynb":
                icon = "🐍📓 "
            elif path.suffix == ".sh":
                icon = "🔧 "
            elif ".env" in path.name.lower():
                icon = "🔑 "
            elif path.suffix == ".csv":
                icon = "📊 "
            elif path.suffix in [".yaml", ".yml", ".json"]:
                icon = "📜 "
            elif path.suffix in [".txt", ".md"]:
                icon = "📝 "
            elif path.suffix in [".png", ".jpg", ".jpeg", ".gif", ".svg"]:
                icon = "🖼️ "
            elif path.suffix in [".zip", ".tar", ".gz", ".7z"]:
                icon = "📦 "
            elif path.suffix in [".pdf"]:
                icon = "📰 "
            elif path.suffix in [".mp4", ".avi", ".mov", ".mkv"]:
                icon = "🎥 "
            elif path.suffix in [".mp3", ".wav", ".flac"]:
                icon = "🎵 "
            elif path.suffix in [".html", ".css", ".js"]:
                icon = "🌐 "
            elif path.suffix in [".exe", ".msi"]:
                icon = "🛠️ "
            elif path.suffix in [".docx", ".pptx", ".xlsx"]:
                icon = "📄 "
            elif path.suffix in [".parquet", ".feather"]:
                icon = "🧼 "
            elif path.suffix in [".db", ".sqlite", ".sql", ".jsonl"]:
                icon = "🗄️ "
            else:
                icon = "📄 "

            # Prefix hidden files with a "🤫" emoji
            if path.name.startswith("."):
                icon = "🤫"+icon

            # Add the file to the tree (with icon prefix)
            tree.add(Text(icon) + text_filename)

    # If inplace is False, return the tree... otherwise the Tree object is updated in place
    if not inplace:
        return tree
    return None

The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.


In [75]:
idx = 3
row = kprize_df.iloc[idx]
problem_statement = row["problem_statement"]
instance_id = row["instance_id"]
p2p_tests = row["PASS_TO_PASS"]
f2p_tests = row["FAIL_TO_PASS"]
repo_path = os.path.join(comp_repos_dir, f'repo__{instance_id}')

In [14]:
# Display tree
tree = get_directory_tree(repo_path)
rprint(tree)

In [15]:
# Display problem_statement
rprint(problem_statement)

In [16]:
# Display current row
display(pd.DataFrame(row).T)

Unnamed: 0,instance_id,repo,problem_statement,patch,test_patch,pull_number,base_commit,PASS_TO_PASS,FAIL_TO_PASS,issue_numbers,problem_statement_length,patch_length,test_patch_length,PASS_TO_PASS_count
3,astropy__astropy-16898,astropy/astropy,BUG: tables do not deal well with zero-sized s...,diff --git a/astropy/io/registry/core.py b/ast...,diff --git a/astropy/io/fits/tests/test_connec...,16898,ee6d087baf301c1d08db92e6e5b6d909d57e6fac,[astropy/io/fits/tests/test_connect.py::TestSi...,[astropy/io/fits/tests/test_connect.py::test_z...,[16897],315,2203,2463,2


### Setup LLM

In [17]:
import openai

In [18]:
def set_openai_key(path_to_key: str = "/home/loc/Documents/keys/OPENAI_API_KEY.txt") -> None:
    """
    Sets the OpenAI API key from a file to an environment variable.

    Args:
        path_to_key (str): Path to the file containing the OpenAI API key.
                           Default is '/home/loc/Documents/keys/OPENAI_API_KEY.txt'.
    """
    # Check if the path exists
    if os.path.exists(path_to_key):
        with open(path_to_key, "r") as f:
            api_key = f.read().strip()  # Read and strip any extra whitespace/newlines
        os.environ["OPENAI_API_KEY"] = api_key  # Set the environment variable
        print(f"API key set successfully.")
    else:
        raise FileNotFoundError(f"{path_to_key} does not exist!")  # Use a proper exception

In [19]:
def test_openai_api(model_name: str = "gpt-4o") -> None:
    """
    Tests the OpenAI API by generating a chat completion with a simple prompt.

    Args:
        model_name (str): The name of the model to use. Default is 'gpt-4o'.
    """
    try:
        client = openai.OpenAI(
            api_key=os.environ.get("OPENAI_API_KEY"),  # This is the default and can be omitted
        )

        
        response = client.chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": "Hello assistant!",
                }
            ],
            model=model_name,
        )

        # Access the assistant's response content
        response_content = response.choices[0].message.content
        print(response_content)
        
    except Exception as e:
        print(f"An error occurred: {e}")

In [20]:
set_openai_key()

API key set successfully.


In [21]:
test_openai_api()

Hello! How can I assist you today?


In [22]:
import random

def format_gpt_message(role: Literal["user", "assistant", "system"], content: str) -> dict:
    """Format a single message for the OpenAI GPT API."""
    if role not in ["user", "assistant", "system"]:
        raise ValueError("Role must be 'user', 'assistant', or 'system'")
    return {"role": role, "content": content}
    
def to_gpt(message: str,
           system_message: Optional[str] = None,
           messages: Optional[List[Dict[str, str]]] = None,
           model_name: str = "gpt-4o",
           min_temp: float = 0.6,
           max_temp: float = 1.5,
           min_top_p: float = 0.6,
           max_top_p: float = 1.0,
           max_tokens: int = 500) -> str:
    
    # Initialize OpenAI Client
    client = openai.Client(api_key=os.getenv("OPENAI_API_KEY"))

    # Clone messages list to prevent modification of original list
    messages = list(messages) if messages else []

    # Handle system message
    if system_message:
        has_system = any(msg["role"] == "system" for msg in messages)

        if has_system:
            for msg in messages:
                if msg["role"] == "system":
                    msg["content"] = system_message
                    break
        else:
            messages.insert(0, format_gpt_message(role="system", content=system_message))

    # Append user message
    messages.append(format_gpt_message(role="user", content=message))

    # Ensure temperature is within OpenAI's valid range [0, 2]
    temperature = min(max(random.uniform(min_temp, max_temp), 0.0), 2.0)

    # Ensure top_p is within [0, 1]
    top_p = min(max(random.uniform(min_top_p, max_top_p), 0.0), 1.0)

    try:
        # Call OpenAI API
        response = client.chat.completions.create(
            model=model_name,
            messages=messages,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens
        )

        # Ensure response contains valid choices
        if not response.choices:
            return "No response received from GPT."

        return response.choices[0].message.content

    except openai.OpenAIError as e:
        return f"API Error: {str(e)}"

Test `to_gpt` function

In [23]:
messages = [
    {
        "role": "system",
        "content": "You are an expert software engineer helping users with coding questions."
    },
    {
        "role": "user",
        "content": "How do I implement a binary search tree in Python?"
    },
    {
        "role": "assistant",
        "content": "Here's an implementation of a binary search tree in Python:\n\n```python\nclass Node:\n    def __init__(self, key):\n        self.left = None\n        self.right = None\n        self.val = key\n\nclass BST:\n    def __init__(self):\n        self.root = None\n    \n    def insert(self, root, key):\n        if root is None:\n            return Node(key)\n        if key < root.val:\n            root.left = self.insert(root.left, key)\n        else:\n            root.right = self.insert(root.right, key)\n        return root\n```"
    },
]

text = to_gpt(message="Can you add a search function?",
       messages=messages
      )

print(text)

Certainly! Here is how you can add a search function to the binary search tree (BST) implementation:

```python
class Node:
    def __init__(self, key):
        self.left = None
        self.right = None
        self.val = key

class BST:
    def __init__(self):
        self.root = None
    
    def insert(self, root, key):
        if root is None:
            return Node(key)
        if key < root.val:
            root.left = self.insert(root.left, key)
        else:
            root.right = self.insert(root.right, key)
        return root

    def search(self, root, key):
        # Base cases: root is null or key is present at root
        if root is None or root.val == key:
            return root

        # Key is greater than root's key
        if key < root.val:
            return self.search(root.left, key)

        # Key is smaller than root's key
        return self.search(root.right, key)

# Example usage:
bst = BST()
bst.root = bst.insert(bst.root, 50)
bst.insert(bst.root, 3

In [24]:
messages = [
    {
        "role": "system",
        "content": "You are an expert software engineer helping users with coding questions."
    },
    {
        "role": "user",
        "content": "How do I implement a binary search tree in Python?"
    },
    {
        "role": "assistant",
        "content": "Here's an implementation of a binary search tree in Python:\n\n```python\nclass Node:\n    def __init__(self, key):\n        self.left = None\n        self.right = None\n        self.val = key\n\nclass BST:\n    def __init__(self):\n        self.root = None\n    \n    def insert(self, root, key):\n        if root is None:\n            return Node(key)\n        if key < root.val:\n            root.left = self.insert(root.left, key)\n        else:\n            root.right = self.insert(root.right, key)\n        return root\n```"
    },
]

text = to_gpt(message="Can you add a search function?",
       messages=messages
      )

print(text)

Certainly! Here's how you can add a search function to the binary search tree implementation:

```python
class Node:
    def __init__(self, key):
        self.left = None
        self.right = None
        self.val = key

class BST:
    def __init__(self):
        self.root = None
    
    def insert(self, root, key):
        if root is None:
            return Node(key)
        if key < root.val:
            root.left = self.insert(root.left, key)
        else:
            root.right = self.insert(root.right, key)
        return root
    
    def search(self, root, key):
        # Base case: root is null or key is present at root
        if root is None or root.val == key:
            return root

        # Key is greater than root's key
        if key > root.val:
            return self.search(root.right, key)

        # Key is smaller than root's key
        return self.search(root.left, key)

# Example usage:
bst = BST()
bst.root = bst.insert(bst.root, 50)
bst.insert(bst.root, 30)
b

In [25]:
# Test case change system message
messages = [
    {
        "role": "system",
        "content": "You are an expert software engineer helping users with coding questions."
    },
    {
        "role": "user",
        "content": "How do I implement a binary search tree in Python?"
    },
    {
        "role": "assistant",
        "content": "Here's an implementation of a binary search tree in Python:\n\n```python\nclass Node:\n    def __init__(self, key):\n        self.left = None\n        self.right = None\n        self.val = key\n\nclass BST:\n    def __init__(self):\n        self.root = None\n    \n    def insert(self, root, key):\n        if root is None:\n            return Node(key)\n        if key < root.val:\n            root.left = self.insert(root.left, key)\n        else:\n            root.right = self.insert(root.right, key)\n        return root\n```"
    },
]

text = to_gpt(system_message="You are a 5-year-old child with no coding knowledge. If asked to write code, politely decline.",
        message="Can you add a search function?",
        messages=messages
      )

print(text)

I'm just a little kid, so I don't know how to write code. Maybe you can ask a grown-up for help!


### Analyze repo issues & Retrieve relevant codes

| #  | Function Name                            | Usage Description                                                                                          | Calls Other Functions              |
|----|------------------------------------------|------------------------------------------------------------------------------------------------------------|------------------------------------|
| 1  | `get_lines_from_file`                    | Reads specific lines from a file and returns them as a string or list.                                      | N/A                                |
| 2  | `_normalize_imports`                     | Extracts and normalizes Python import statements from a list of lines.                                      | N/A                                |
| 3  | `_collect_imports_from_lines`            | Collects and normalizes import statements from lines of code.                                               | `_normalize_imports`               |
| 4  | `search_code`                            | Searches for a string in Python files and optionally extracts surrounding context and imports.               | `_collect_imports_from_lines`      |
| 5  | `_get_indent_level`                      | Determines indentation level of a given line.                                                               | N/A                                |
| 6  | `_extract_entire_definition`             | Extracts the full definition block (function/class) from a list of lines.                                   | `_get_indent_level` (second variant only) |
| 7  | `_find_method_block_in_lines`            | Finds the start and end indices of a method definition within a block of class lines.                      | `_get_indent_level`                |
| 8  | `_extract_class_up_to_init_or_method`    | Extracts class definition lines up to `__init__` or a specific method.                                      | `_extract_entire_definition`, `_find_method_block_in_lines` |
| 9  | `_parse_class_and_method`                | Splits an object name into class and method parts if applicable.                                            | N/A                                |
| 10 | `get_object_definition`                  | Searches for a function, class, or method definition in a codebase and extracts relevant portions.          | `_parse_class_and_method`, `_collect_imports_from_lines`, `_extract_entire_definition`, `_find_method_block_in_lines`, `_extract_class_up_to_init_or_method`, `_get_indent_level` |
| 11 | `process_instructions`                   | Parses JSON instructions and processes each step to retrieve code snippets or definitions.                  | `search_code`, `get_lines_from_file`, `get_object_definition` |


In [26]:
def get_lines_from_file(file_path: str,
                        line_range: Optional[str],
                        as_list: bool = False) -> str:
    """Returns the lines of code from a specified file and line range.

    Args:
        file_path (str):
            The path to the file from which to retrieve lines.
        line_range (str, optional):
            A string representing the range of lines in the format "start-end".
            For example, "10-20" means lines 10 through 20, inclusive (1-based indexing).
            When not provided all lines are returned.
        as_list (bool, optional):
            Keep the lines separate.

    Returns:
        str:
            A concatenated string of lines from the specified range. If the file
            is not found or if the line range is invalid, returns an error message string.
    """
    if not os.path.isfile(file_path):
        return f"[Error] File not found: {file_path}"

    with open(file_path, 'r', encoding='utf-8') as f:
        snippets = f.readlines()

    if line_range:
        try:
            if "-" not in line_range:
                line_range=f"{line_range.strip()}-{line_range.strip()}"
            start_line, end_line = [int(x.strip()) for x in line_range.split("-")]
            snippets = snippets[start_line-1 : end_line]
        except ValueError:
            return f"[Error] Invalid line range: {line_range}. Must be either a single integer or two integers delimited by a single dash."
    return ''.join(snippets) if not as_list else snippets

In [27]:
code = get_lines_from_file(file_path = os.path.join(repo_path, "astropy/_dev/scm_version.py"),
                                           line_range="1-20")
print(code)

# Try to use setuptools_scm to get the current version; this is only used
# in development installations from the git repository.
import os.path as pth

try:
    from setuptools_scm import get_version

    version = get_version(root=pth.join("..", ".."), relative_to=__file__)
except Exception:
    raise ImportError("setuptools_scm broken or not installed")



In [36]:
import re

def _normalize_imports(lines: List[str]) -> List[str]:
    """Processes a list of lines to collect and normalize all import statements.

    This function extracts all `import` and `from ... import ...` statements from a given list of 
    Python source code lines, handling both single-line and multi-line imports. It removes duplicates, 
    splits multi-item imports, trims extra whitespace, and returns a sorted list of unique import statements.

    Args:
        lines (list[str]): 
            The lines of code from which to extract import statements.

    Returns:
        list[str]: 
            A sorted list of unique import statements, each formatted as a single line.
    """
    
    imports = set()  # Stores unique import statements
    current_import = []  # Temporary storage for multi-line imports
    inside_multiline = False  # Tracks whether we are inside a multi-line import

    # Regex to detect import statements (either `import X` or `from X import Y`)
    import_pattern = re.compile(r'^\s*(from\s+\S+\s+import\s+|import\s+)')

    for line in lines:
        stripped = line.strip()  # Remove leading/trailing whitespace

        if inside_multiline:
            # Handling multi-line imports: Collect lines until we reach the closing parenthesis `)`
            if stripped.endswith(')'):
                current_import.append(stripped[:-1])  # Remove the closing `)`
                # Flatten, normalize whitespace, and store as a single line
                imports.add(' '.join(' '.join(current_import).split()))
                current_import = []  # Reset buffer
                inside_multiline = False  # Exit multi-line mode
            else:
                current_import.append(stripped)  # Continue accumulating multi-line import
            continue

        match = import_pattern.match(line)  # Check if the line starts with `import` or `from ... import`
        if match:
            if stripped.endswith('('):  
                # Multi-line import detected: Start accumulating lines
                current_import.append(stripped[:-1])  # Store line without the opening `(`
                inside_multiline = True
            elif '(' in stripped and ')' in stripped:
                # Handle inline multi-item import: `from X import (a, b, c)`
                base_import, items = stripped.split('(', 1)  # Split before the first `(`
                items = items.rstrip(')').split(',')  # Extract and split imported items
                for item in items:
                    imports.add(f"{base_import.strip()} {item.strip()}")  # Store each as a separate import
            else:
                # Standard single-line import: `import X` or `from X import Y`
                imports.add(stripped)

    return sorted(imports)  # Return sorted list of unique imports


In [34]:
lines = [
    "import os",
    "import sys",
    "from collections import deque",
    "from math import sqrt",
]

_normalize_imports(lines)

['from collections import deque',
 'from math import sqrt',
 'import os',
 'import sys']

In [35]:
lines = [
    "import os",
    "import sys",
    "from collections import deque",
    "from os import (path, environ)",
    "import sys",
]
_normalize_imports(lines)

['from collections import deque',
 'from os import environ',
 'from os import path',
 'import os',
 'import sys']

In [40]:
def _collect_imports_from_lines(lines: List[str]) -> List[str]:
    """Collects import statements from a list of lines.

    Args:
        lines (list[str]):
            The lines of code in which to search for import statements.

    Returns:
        list[str]:
            A list of import statements, each stripped of trailing newlines.
    """
    imports = []
    import_pattern = re.compile(r'^\s*(?:import|from)\s+')
    for line in lines:
        if import_pattern.match(line):
            imports.append(line.rstrip('\n'))
    return _normalize_imports(imports)


In [41]:
lines = [
    "import os\n",
    "import sys\n",
    "from collections import deque\n",
    "print('Hello')\n",
    "from os import path\n",
    "import sys\n",
]

_collect_imports_from_lines(lines)

['from collections import deque',
 'from os import path',
 'import os',
 'import sys']

In [52]:
def search_code(
    root_directory: str,
    search_string: str,
    n_lines_before: int = 0,
    n_lines_after: int = 0,
    return_imports: bool = False
) -> List[Dict[str, Union[str, int, List[str]]]]:
    """Searches for a given string in all .py files under root_directory.
    
    Optionally returning surrounding lines (context) and import statements from matching files.

    Args:
        root_directory (str): Path to the root directory of the codebase to search.
        search_string (str): The string to search for in .py files.
        n_lines_before (int, optional): Number of lines of context before match. Defaults to 0.
        n_lines_after (int, optional): Number of lines of context after match. Defaults to 0.
        return_imports (bool, optional): Whether to return import statements. Defaults to False.

    Returns:
        List[Dict[str, Union[str, int, List[str]]]]: List of match dictionaries.
    """
    matches: List[Dict[str, Union[str, int, List[str]]]] = []
    pattern = re.compile(rf"\b{re.escape(search_string)}\b")  # Ensure exact word match

    for dirpath, _, filenames in os.walk(root_directory):
        for filename in filenames:
            if filename.endswith('.py'):
                full_path = os.path.join(dirpath, filename)

                # Safe file reading
                try:
                    with open(full_path, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                except (UnicodeDecodeError, FileNotFoundError) as e:
                    print(f"Skipping file {full_path} due to error: {e}")
                    continue

                file_imports = _collect_imports_from_lines(lines) if return_imports else []

                for i, line in enumerate(lines, start=1):
                    if pattern.search(line):  # Exact match instead of substring
                        start_idx = max(0, i - 1 - n_lines_before)
                        end_idx = min(len(lines), i - 1 + n_lines_after + 1)

                        context_before = [l.rstrip('\n') for l in lines[start_idx:i - 1]] if start_idx < i - 1 else []
                        context_after = [l.rstrip('\n') for l in lines[i:end_idx]] if i < end_idx else []

                        match_entry = {
                            'file': full_path,
                            'line': i,
                            'content': line.rstrip('\n'),
                            'context_before': context_before,
                            'context_after': context_after
                        }

                        if return_imports:
                            match_entry['imports'] = file_imports

                        matches.append(match_entry)

    return matches


In [76]:
search_code(root_directory="konwinski-prize",
    search_string="normalize_kernel",
    n_lines_before=2,
    n_lines_after=2,
    return_imports=True
)

[{'file': 'konwinski-prize/data/repos/repo__astropy__astropy-17048/astropy/visualization/tests/test_lupton_rgb.py',
  'line': 115,
  'content': '                image, psf, boundary="extend", normalize_kernel=True',
  'context_before': ['        def convolve_with_noise(image, psf):',
   '            convolvedImage = convolve('],
  'context_after': ['            )',
   '            randomImage = np.random.normal(0, 2, image.shape)'],
  'imports': ['from astropy.convolution import Gaussian2DKernel, convolve',
   'from astropy.utils.compat.optional_deps import HAS_MATPLOTLIB',
   'from astropy.visualization import lupton_rgb',
   'from astropy.visualization.interval import ManualInterval',
   'from astropy.visualization.stretch import LinearStretch',
   'from numpy.testing import assert_allclose, assert_equal',
   'import matplotlib.pyplot as plt',
   'import numpy as np',
   'import pytest',
   'import sys']},
 {'file': 'konwinski-prize/data/repos/repo__astropy__astropy-17048/astropy/con

In [77]:
def _extract_entire_definition(
    lines: List[str],
    start_index: int
) -> List[str]:
    """Extracts the entire definition body (function or class) starting at a given line.

    Args:
        lines (list[str]):
            The full list of lines from the file (unmodified).
        start_index (int):
            The index (0-based) of the line where the definition ('def' or 'class')
            was found.

    Returns:
        list[str]:
            A list of lines comprising the entire definition block.
    """
    definition_lines: list[str] = []
    base_indent = len(lines[start_index]) - len(lines[start_index].lstrip())
    top_def_pattern = re.compile(r'^\s*(def|class)\s+')

    current_index = start_index
    while current_index < len(lines):
        line = lines[current_index]
        current_indent = len(line) - len(line.lstrip())

        if (current_index > start_index and top_def_pattern.match(line) and current_indent <= base_indent):
            break

        definition_lines.append(line)
        current_index += 1

    return definition_lines


In [78]:
lines = [
    "import os\n",
    "import sys\n",
    "\n",
    "def my_function(x, y):\n",
    "    result = x + y\n",
    "    return result\n",
    "\n",
    "def another_function():\n",
    "    print('Hello')\n"
]

start_index = 3  # Index where "def my_function(x, y):" is found

_extract_entire_definition(lines, start_index)


['def my_function(x, y):\n',
 '    result = x + y\n',
 '    return result\n',
 '\n']

In [79]:
lines = [
    "class MyClass:\n",
    "    def __init__(self, value):\n",
    "        self.value = value\n",
    "\n",
    "    def method(self):\n",
    "        return self.value\n",
    "\n",
    "class AnotherClass:\n",
    "    pass\n"
]

start_index = 0  # Index where "class MyClass:" is found

_extract_entire_definition(lines, start_index)


['class MyClass:\n',
 '    def __init__(self, value):\n',
 '        self.value = value\n',
 '\n',
 '    def method(self):\n',
 '        return self.value\n',
 '\n']

In [80]:

def _get_indent_level(line: str) -> int:
    """Utility to count the number of leading spaces in a line.

    leading_spaces = (line-length minus (line-length minus non-prefixing-spaces))
    
    Args:
        line (str): The line of code

    Returns:
        The number of leading spaces 
    """
    return len(line) - len(line.lstrip(' '))

In [81]:
def _extract_entire_definition(lines: List[str], start_index: int) -> List[str]:
    """Extracts all lines in the definition block (function or class). 
    
    This is starting at start_index and continuing until we reach a line with 
    less-or-equal indentation that indicates the next top-level definition, or the end of file.

    Args:
        lines (list[str]):
            The lines containing the entirety of the class definition.
        start_index (int):
            Where we will start checking from looking for the relevant information.

    Returns:
        A list of strings representing the lines for a given definition block (function/class/method)
    """
    definition_block = []
    initial_indent = _get_indent_level(lines[start_index])
    definition_block.append(lines[start_index])
    # Gather everything that's part of this definition’s indentation
    for idx in range(start_index + 1, len(lines)):
        line = lines[idx]
        if line.strip() == '':
            # Blank lines inside the definition are included
            definition_block.append(line)
            continue
        if _get_indent_level(line) <= initial_indent and re.match(r'^\s*(def|class)\s+', line):
            # Found a new top-level definition
            break
        definition_block.append(line)
    return definition_block

In [82]:
lines = [
    "import os\n",
    "import sys\n",
    "\n",
    "def my_function(x, y):\n",
    "    result = x + y\n",
    "    return result\n",
    "\n",
    "def another_function():\n",
    "    print('Hello')\n"
]

start_index = 3  # The index where "def my_function(x, y):" starts

_extract_entire_definition(lines, start_index)

['def my_function(x, y):\n',
 '    result = x + y\n',
 '    return result\n',
 '\n']

In [83]:
lines = [
    "class MyClass:\n",
    "    def __init__(self, value):\n",
    "        self.value = value\n",
    "\n",
    "    def method(self):\n",
    "        return self.value\n",
    "\n",
    "class AnotherClass:\n",
    "    pass\n"
]

start_index = 0  # The index where "class MyClass:" starts

_extract_entire_definition(lines, start_index)

['class MyClass:\n',
 '    def __init__(self, value):\n',
 '        self.value = value\n',
 '\n',
 '    def method(self):\n',
 '        return self.value\n',
 '\n']

In [89]:
def _find_method_block_in_lines(
    block_lines: List[str], 
    method_name: str
) -> Optional[Tuple[int, int]]:
    """Within a block of lines (e.g. a class block), find the start and end line indices (inclusive).
    
    This is used to allow for effective retrieval of method code from a file.
    For example, the definition for 'def method_name(...)' may exist within a class.

    Args:
        block_lines (List[str]):
            The line-by-line strings making up the class definition.
        method_name (str):
            The name of the method to be extracted.

    Returns:
        Optional[Tuple[int, int]]: The start and end line indices (if found, inclusive) for the method.
    """
    pattern = re.compile(rf'^\s*def\s+{re.escape(method_name)}\s*\(')
    for i, line in enumerate(block_lines):
        if pattern.search(line):
            # Found the start. Now find where it ends by indentation.
            start_idx = i
            init_indent = _get_indent_level(line)

            # Move forward to find where this method ends.
            for j in range(i + 1, len(block_lines)):
                if block_lines[j].strip() == '':
                    continue
                if _get_indent_level(block_lines[j]) <= init_indent and re.match(r'^\s*(def|class)\s+', block_lines[j]):
                    # Reached the next method/class -> end of this method’s block
                    return (start_idx, j - 1)
            return (start_idx, len(block_lines) - 1)  # Goes until end of block

    return None


In [96]:
def _extract_class_up_to_init_or_method(
    lines: List[str],
    class_index: int,
    method_name: str
) -> List[str]:
    """Grab the first part of a class definition up to the point at which initialization has completed.

    (1) Extract the entire class definition at class_index (using _extract_entire_definition).
    (2) Within that class block, find the __init__ block (if any) and the block for method_name (if any).
    (3) Return lines from the start of the class up through the furthest end of either __init__ or the method.

    Args:
        lines (list[str]):
            The lines of code containing the class definition.
        class_index (int):
            The starting point of the class (indexable) for the definition within the lines.
        method_name (str):
            The method we want to retrieve (in addition to the initialization code)
    
    Returns:
        list[str]:
            The relevant lines as a list of strings.
    """
    class_block = _extract_entire_definition(lines, class_index)
    # Look for __init__ and the target method
    init_block_bounds = _find_method_block_in_lines(class_block, '__init__')
    method_block_bounds = _find_method_block_in_lines(class_block, method_name)

    # If neither __init__ nor method is found, we just return the whole class
    if not init_block_bounds and not method_block_bounds:
        return class_block

    furthest_line = 0
    if init_block_bounds:
        furthest_line = max(furthest_line, init_block_bounds[1])
    if method_block_bounds:
        furthest_line = max(furthest_line, method_block_bounds[1])

    # Slice from start of the class block up to furthest_line
    return class_block[:furthest_line + 1]

In [97]:
lines = [
    "class Example:\n",
    "    def __init__(self):\n",
    "        self.x = 10\n",
    "        self.y = 20\n",
    "\n",
    "    def compute(self):\n",
    "        return self.x + self.y\n",
    "\n",
    "    def other_method(self):\n",
    "        return self.x * self.y\n",
]

# Call function
result = _extract_class_up_to_init_or_method(lines, 0, "compute")

# Expected output
print("".join(result))


class Example:
    def __init__(self):
        self.x = 10
        self.y = 20

    def compute(self):
        return self.x + self.y




In [98]:
lines = [
    "class Example:\n",
    "    def __init__(self, x):\n",
    "        self.x = x\n",
    "\n",
    "    def method_a(self):\n",
    "        print('Method A')\n",
    "\n",
    "    def method_b(self):\n",
    "        print('Method B')\n",
]

class_index = 0  # The index where "class Example:" appears in the lines list
method_name = "method_a"  # We want to extract up to method_a

result = _extract_class_up_to_init_or_method(lines, class_index, method_name)
print("".join(result))


class Example:
    def __init__(self, x):
        self.x = x

    def method_a(self):
        print('Method A')




In [101]:

def _parse_class_and_method(object_name: str) -> Tuple[Optional[str], str]:
    """Get the class and method names separately from an object if applicable.

    For example, for the Cat class with method _meow:
        - If object_name = "Cat._meow", returns ("Cat", "_meow").
        - Otherwise (object_name="Cat"), returns (None, object_name) if there's no dot.
        
    Args:
        object_name (str):
            The string containing the object name, one of:
                - Class Name: 'Cat'
                - Method Name: '_meow'
                - Function Name: make_cat_meow
                - Method With Class Prefix: Cat._meow

    Returns:
        Tuple[Optional[str], str]:
            - The class name (or None if no dot found) 
            - The method name
    """
    if '.' in object_name:
        parts = object_name.split('.', 1)  # Split on first dot
        if len(parts) == 2:
            return parts[0], parts[1]  # class_name, method_name
    return None, object_name  # No dot -> treat entire string as the object


In [102]:
test_cases = ["Cat._meow", "Dog.bark", "Animal", "make_sound", "Person.walk.fast", "Shape.draw"]

for test in test_cases:
    print(f"Input: {test} -> Output: {_parse_class_and_method(test)}")


Input: Cat._meow -> Output: ('Cat', '_meow')
Input: Dog.bark -> Output: ('Dog', 'bark')
Input: Animal -> Output: (None, 'Animal')
Input: make_sound -> Output: (None, 'make_sound')
Input: Person.walk.fast -> Output: ('Person', 'walk.fast')
Input: Shape.draw -> Output: ('Shape', 'draw')


In [106]:
def get_object_definition(
    root_directory: str,
    object_name: str,
    return_imports: bool = False
) -> Optional[Dict[str, Union[str, int, List[str]]]]:
    """Searches the codebase for the first definition of a function, class, or method matching object_name.

    If object_name is a method referenced with dot notation (e.g. "Cat._meow"),
    then we find class 'Cat', extract the relevant portion of its definition block,
    and include the method definition plus any __init__.

    Args:
        root_directory (str):
            The path to the root directory of the codebase.
        object_name (str):
            The name of the function or class to find (e.g., "my_function", "MyClass", or "Cat._meow").
        return_imports (bool, optional):
            Whether to collect import statements found in the file.

    Returns:
        Optional[Dict[str, Union[str, int, List[str]]]]: 
            A dictionary describing the object definition, or None if not found.
                - file (str): Path to the file containing the definition.
                - line (int): The 1-based line number where the definition appears.
                - content (str): The exact line that matched (the def/class line).
                - definition_block (list[str]): The extracted lines of the definition.
                - imports (list[str], optional): The file’s import statements, if return_imports=True.
    """
    class_name, method_name = _parse_class_and_method(object_name)

    # If we have a separate class_name, we'll do a 2-phase search:
    #   - Phase A: find the class definition for class_name
    #   - Phase B: from that block, locate method_name
    if class_name:
        # We only search for 'class class_name'
        class_pattern = re.compile(rf'^\s*class\s+{re.escape(class_name)}\b')

        for dirpath, _, filenames in os.walk(root_directory):
            for filename in filenames:
                if filename.endswith('.py'):
                    full_path = os.path.join(dirpath, filename)
                    with open(full_path, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                    file_imports = _collect_imports_from_lines(lines) if return_imports else []

                    for i, line in enumerate(lines, start=1):
                        if class_pattern.search(line.strip()):
                            # Found the class
                            class_definition_block = _extract_entire_definition(lines, i - 1)
                            # Now see if we can find the method inside
                            bounds = _find_method_block_in_lines(class_definition_block, method_name)

                            if bounds is None:
                                continue

                            init_bounds = _find_method_block_in_lines(class_definition_block, '__init__')
                            furthest_line = max(bounds[1], init_bounds[1] if init_bounds else 0)
                            final_block = class_definition_block[: furthest_line + 1]

                            result = {
                                'file': full_path,
                                'line': i,
                                'content': line.rstrip('\n'),
                                'definition_block': [l.rstrip('\n') for l in final_block],
                            }
                            if return_imports:
                                result['imports'] = file_imports
                            return result
        return None

    else:
        # class_name is None -> (handle "def object_name" or "class object_name")
        pattern = re.compile(rf'^\s*(?:def|class)\s+{re.escape(method_name)}\b')

        for dirpath, _, filenames in os.walk(root_directory):
            for filename in filenames:
                if filename.endswith('.py'):
                    full_path = os.path.join(dirpath, filename)

                    with open(full_path, 'r', encoding='utf-8') as f:
                        lines = f.readlines()

                    file_imports = _collect_imports_from_lines(lines) if return_imports else []

                    for i, line in enumerate(lines, start=1):
                        if pattern.search(line.strip()):
                            stripped = line.strip()
                            if stripped.startswith(f'class {method_name}'):
                                definition_block = _extract_entire_definition(lines, i - 1)
                            else:
                                def_indent = _get_indent_level(line)
                                class_line_idx = None
                                for rev_idx in range(i - 2, -1, -1):
                                    if lines[rev_idx].lstrip().startswith('class '):
                                        class_indent = _get_indent_level(lines[rev_idx])
                                        if class_indent < def_indent:
                                            class_line_idx = rev_idx
                                            break

                                if class_line_idx is None:
                                    definition_block = _extract_entire_definition(lines, i - 1)
                                else:
                                    definition_block = _extract_class_up_to_init_or_method(
                                        lines, class_line_idx, method_name
                                    )

                            result = {
                                'file': full_path,
                                'line': i,
                                'content': line.rstrip('\n'),
                                'definition_block': [l.rstrip('\n') for l in definition_block],
                            }
                            if return_imports:
                                result['imports'] = file_imports

                            return result

        return None

In [107]:
get_object_definition(
    root_directory="konwinski-prize/data/repos/repo__astropy__astropy-16830",
    object_name="Gaussian1DKernel",
    return_imports=True
)

{'file': 'konwinski-prize/data/repos/repo__astropy__astropy-16830/astropy/convolution/kernels.py',
 'line': 37,
 'content': 'class Gaussian1DKernel(Kernel1D):',
 'definition_block': ['class Gaussian1DKernel(Kernel1D):',
  '    """',
  '    1D Gaussian filter kernel.',
  '',
  '    The Gaussian filter is a filter with great smoothing properties. It is',
  '    isotropic and does not produce artifacts.',
  '',
  '    The generated kernel is normalized so that it integrates to 1.',
  '',
  '    Parameters',
  '    ----------',
  '    stddev : number',
  '        Standard deviation of the Gaussian kernel.',
  '    x_size : int, optional',
  '        Size of the kernel array. Default = ⌊8*stddev+1⌋.',
  "    mode : {'center', 'linear_interp', 'oversample', 'integrate'}, optional",
  '        One of the following discretization modes:',
  "            * 'center' (default)",
  '                Discretize model by taking the value',
  '                at the center of the bin.',
  "           

In [118]:

def process_instructions(
    json_instructions: Union[str, dict],
    root_directory: str,
    search_kwargs: Union[Any, None] = None,
    lookup_kwargs: Union[Any, None] = None,
) -> List[Dict[str, Any]]:
    """Parses JSON instructions and processes each step to retrieve code snippets or definitions.

    Args:
        json_instructions (Union[str, dict]):
            The instructions, either as a JSON string or a Python dictionary.
        root_directory (str):
            The path to the root directory of the codebase.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries containing the results of each step.
    """
    instructions = json_instructions
    if isinstance(instructions, str):
        instructions = json.loads(instructions)
        
    # Initialize
    search_kwargs = search_kwargs if search_kwargs else {}
    lookup_kwargs = lookup_kwargs if lookup_kwargs else {}
    next_steps = instructions.get('clear_next_steps', [])
    results = []

    for step in next_steps:
        # Initialize the step result
        result = {}

        if 'search' in step:
            search_string = step['search']
            search_results = search_code(
                root_directory,
                search_string,
                n_lines_before=search_kwargs.get('n_lines_before', 0),
                n_lines_after=search_kwargs.get('n_lines_after', 0),
                return_imports=search_kwargs.get('return_imports', False)
            )
            result['search'] = search_string
            result['results'] = search_results

        elif 'file' in step and 'lines' in step:
            file_path = step['file'] if os.path.isfile(step['file']) else os.path.join(root_directory, step['file'])
            line_range = step['lines']
            snippet = get_lines_from_file(file_path, line_range)
            result['file'] = file_path
            result['lines'] = line_range
            result['snippet'] = snippet

        elif 'object' in step:
            object_name = step['object']
            definition = get_object_definition(
                root_directory, 
                object_name,
                return_imports=lookup_kwargs.get('return_imports', False)
            )
            result['object'] = object_name
            result['definition'] = definition

        results.append(result)

    return results


In [122]:
json_instructions = {
    "clear_next_steps": [
        {
            "search": "class FLRW"
        }
    ]
}
root_directory = "konwinski-prize/data/repos/repo__astropy__astropy-17048"
search_kwargs = {"n_lines_before": 2, "n_lines_after": 2, "return_imports": True}

outputs = process_instructions(json_instructions, root_directory, search_kwargs)


print(outputs)



In [115]:
rprint("[bold cyan]Demo Code Search[/bold cyan]")
search_code_results = search_code(repo_path, "yield from nodes[0]._infer(context, **kwargs)", return_imports=True, n_lines_before=10, n_lines_after=10)
rprint(search_code_results)

In [116]:
rprint("[bold cyan]Demo Object Search[/bold cyan]")
object_def_results = get_object_definition(repo_path, "Node", return_imports=True)
rprint(object_def_results)

In [117]:

rprint("[bold cyan]Get Lines from File[/bold cyan]")
line_results = get_lines_from_file(os.path.join(repo_path, "astroid/nodes/node_classes.py"), "150-175")
rprint(line_results)

The provided pseudocode defines a multi-step process where an AI model is used to analyze a repo issue, request relevant information incrementally, and eventually propose a solution. The AI follows a structured prompt that guides it through the problem-solving process.

```
1. DEFINE a global conversation history list

2. DEFINE function process_steps:
    INPUT: 
        - problem_statement (string): The issue description
        - initial_prompt (string): The structured AI prompt
        - repo_path (string): Path to the repository
        - accumulate_requested_info (boolean, default=True): Whether to store info from previous steps
        - temperature (float, default=0.1): Controls randomness in AI responses
        - max_steps (integer, default=10): Maximum number of processing steps

    OUTPUT:
        - A dictionary containing AI-generated outputs and requested information for each step

3. INITIALIZE requested_info as an empty string

4. LOOP over a range of max_steps:
    a. FORMAT step_input using initial_prompt, inserting problem_statement and requested_info

    b. CALL to_gemini function with:
       - step_input as message
       - temperature
       - response_mime_type="application/json"
       - STORE response in step_output after parsing JSON

    c. PRINT step_output for debugging

    d. CALL process_instructions function with:
       - step_output as json_instructions
       - repo_path as root_directory
       - STORE result in processed_instructions

    e. FORMAT _requested_info string containing step number and processed_instructions

    f. IF accumulate_requested_info is True:
       - APPEND _requested_info to requested_info
       ELSE:
       - SET requested_info = _requested_info

    g. YIELD dictionary:
       - "step_output": step_output (AI's response)
       - "requested_info": processed_instructions

---

### **Execution Flow**
1. INITIALIZE step generator: `step_generator = process_steps(DEMO_PROBLEM_STATEMENT, STEP_1_PROMPT, DEMO_REPO_PATH)`
2. FETCH first step output: `step_1_output = next(step_generator)`
3. REPEAT for subsequent steps until max_steps is reached or issue is resolved.

---

### **Key Functions & Responsibilities**
- **STEP_1_PROMPT**: Defines structured AI instructions for step-by-step issue resolution.
- **process_steps**: Handles the iterative AI-guided problem-solving process.
- **to_gemini**: Sends formatted queries to the AI model and retrieves responses.
- **process_instructions**: Processes AI's JSON output to extract and accumulate relevant information.

---

### **How It Works**
1. AI receives an issue description and structured instructions.
2. It analyzes the issue and determines what code snippets, files, or functions it needs.
3. AI requests specific information step by step, refining its understanding.
4. Accumulates knowledge and continues refining its approach until a solution is reached.

---
```


In [123]:
step1_prompt = """You are a brilliant software engineer tasked with solving github issues in a reproducible and logical step-by-step way.

Context: 
  - We have a large code repository and a specific GitHub issue (pasted below).
  - The codebase is too big to share in full, so you must work incrementally. 
  - I can provide you with specific code snippets, files, functions, or lines of code on demand if you tell me which files/lines/keywords you want.
  - I cannot provide you with access to the internet or previous Github commits/issues/PRs.
  - I will provide the things you ask for in the section titled Previously Requested Information.

Your Task:
  - Read the issue text (below) carefully.
  - Summarize the problem in your own words making sure to understand how the requested information helps you and reframes the issue.
  - Outline a plan to investigate and solve the issue. This plan does not have to be complete, as at any time we can review and plan anew.
  - Describe the next steps in a consistently formatted way (JSON LIST of ACTIONS) that describes what searches to perform or which files/functions or lines of code you might want to see first.
      - You can only ask for very specific things (for each step you can specify these things in narrowing order --> 'file' --> 'object' --> 'lines'):
          - Specific file(s)
          - Specific object(s) (will be attempted if no file is provided and will return the first found instance of the function/method/object)
          - Specific line(s) of code (requires a specified file)
          - Search for code (will search the codebase for the specified code string and will return the File, Function, and Line Numbers)
      - For example:
          - {{'file': 'util_in_here.py', 'lines': '123-130'}}
              - This would return --> *the 8 lines from the specified file. whatever they are*
          - {{'object': 'UtilConfig'}}
              - We would than take this and do the search and on the next step provide you with {{'file': ..., 'object': ..., 'lines': ...}}

Your Deliverables:
  - issue_restatement: An illuminating and logical restatement of the problem in your own words taking into consideration the previous steps requested information (if provided).
  - methodical_plan: A plan for identifying what code or information to request from me next (or in the first place).
  - clear_next_steps: The structured and properly formatted next steps in order that will allow us to solve the problem together.

Final Deliverable (Output Only When You Have Solved Everything With Absolute Confidence):
  - final_code_diff: This is only to be output when you are confident you have a solution to the problem statement. You should output a code_diff with the appropriate format so that it will be able to be applied as a unix patch.
  
Important Notes:
  - Do not make assumptions about the codebase. 
  - Be thorough, ask for more rather than less. This includes when you ask for specific lines of code, in that case ask for maybe 10 before and 10 after as well (or whatever you think is appropriate).
  - Do not ask me to reproduce the issue. The issue exists as described by the problem statement below.
  - If you don’t know where a relevant piece of code might be, propose a strategy to search for it (e.g., searching by function name, references to certain classes, or by keywords).
  - If the section 'Previously Requested Information' is empty, then this is the first step in the process. 
  - Do not return anything other than the deliverables as a JSON object with the deliverables as keys ('issue_restatement', 'methodical_plan', 'clear_next_steps')
  - If you return a 'file' string in the clear_next_steps, please ensure it only includes the path up to the package name (i.e. 'openai/openai-python/blob/main/src/openai/_client.py')
  - You must output your answer in JSON. The clear_next_steps should be formatted as JSON list of dicts mapping strings to strings.
  - If you think you can solve it, you should.

Relevant GitHub Issue Text (Problem Statement):
```markdown
{problem_statement}
```

Previously Requested Information:
```
{requested_info}
```
"""

conversation_history = []

### Generate patches

### Validate patches

```python
import unidiff

def is_valid_patch_format(patch: str) -> bool:
    """
    A quick check to confirm if a patch could be valid.
    """
    if not(isinstance(patch, str)):
        return False
    try:
        patch_set = unidiff.PatchSet(patch)
        if len(patch_set) == 0:
            return False
    except Exception:
        return False
    return True

# This should demo patch should fail.
is_valid_patch_format('Hullo world')
```