In [2]:
import gc
import json
import re
import sys
from io import StringIO

import torch
import yaml
from transformers import AutoModelForCausalLM, AutoTokenizer

# Release memory left over from earlier runs in this kernel.
# Run the garbage collector first so objects that are only kept alive by
# reference cycles are destroyed before the CUDA allocator is asked to give
# cached blocks back; emptying the cache before collecting cannot release them.
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()  # Release unused cached GPU memory.
    torch.cuda.ipc_collect()  # Reclaim GPU memory released by CUDA IPC.


In [3]:
def tool(func):
    """
    A decorator that marks a function as a tool.

    The function itself is returned (no wrapper is created), so its name,
    signature, docstring and behavior are unchanged. It is additionally tagged
    with an ``is_tool = True`` attribute so that tools can be told apart from
    ordinary functions at runtime.

    Args:
        func: The callable to mark.
    Returns:
        The same callable object that was passed in.
    """
    try:
        func.is_tool = True
    except (AttributeError, TypeError):
        # Some callables (e.g. built-ins) do not accept new attributes;
        # they are still returned unchanged.
        pass
    return func


In [4]:
# Local (Kaggle input) directory holding the DeepSeek-R1 distill checkpoint.
model_name = "/kaggle/input/deepseek-r1/transformers/deepseek-r1-distill-qwen-1.5b/2"
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    # Use the GPU when one is present; otherwise fall back to the CPU so the
    # notebook still loads on a machine without CUDA instead of failing here.
    device_map="cuda" if torch.cuda.is_available() else "cpu",
)
tokenizer = AutoTokenizer.from_pretrained(model_name)


In [24]:
# Default system prompt asking the model for a single fenced Python block.
_CODE_SYSTEM_PROMPT = (
    "You are a coding assistant. Generate valid Python code that solves the following problem. "
    "Your response should consist only of a single Python code block enclosed in triple backticks. "
    "Do not include any commentary, explanation, or extra text—just the code."
)

# Fence language tags that are accepted as Python source ("" = untagged fence).
_PYTHON_FENCE_TAGS = {"", "py", "python", "python3"}

# A closed Markdown code fence: group 1 is the language tag, group 2 the body.
_FENCED_BLOCK_RE = re.compile(r"```[ \t]*([^\n`]*?)[ \t]*\r?\n(.*?)```", re.DOTALL)


def _first_python_block(text):
    """Return the body of the first closed Python (or untagged) fence in text, or None."""
    for tag, body in _FENCED_BLOCK_RE.findall(text):
        if tag.strip().lower() in _PYTHON_FENCE_TAGS:
            return body.lstrip("\r\n").rstrip()
    return None


def _unterminated_python_block(text):
    """Return the text after a final, unclosed Python fence (truncated output), or None."""
    # An odd number of fence markers means the last one was opened but never closed.
    if text.count("```") % 2 == 0:
        return None
    tag, newline, body = text[text.rfind("```") + 3:].partition("\n")
    if not newline or tag.strip().lower() not in _PYTHON_FENCE_TAGS:
        return None
    return body.lstrip("\r\n").rstrip() or None


def _extract_code(response):
    """Pull the Python source out of a raw model response."""
    # Reasoning models such as the DeepSeek-R1 distill loaded in this notebook
    # may emit their reasoning first, closed by a "</think>" tag. Code fences in
    # that part are drafts, so prefer what follows the last closing tag.
    answer = response.rsplit("</think>", 1)[-1]
    code = _first_python_block(answer)
    if code is None:
        # Generation may stop (token limit) before the closing fence is written.
        code = _unterminated_python_block(answer)
    if code is None and answer != response:
        # Nothing usable after the reasoning: fall back to a block inside it.
        code = _first_python_block(response)
    if code is None:
        code = answer.strip()
    return code


def generate_code(prompt, system=_CODE_SYSTEM_PROMPT, max_new_tokens=3000):
    """
    Generate Python code from a natural language prompt using the loaded LLM.

    Uses the module-level ``model`` and ``tokenizer``.

    Args:
        prompt: The problem description sent as the user message.
        system: The system message; defaults to an instruction to answer with
            a single fenced Python code block.
        max_new_tokens: Upper bound on the number of generated tokens.
    Returns:
        The extracted code as a string without the surrounding fence. If the
        response contains no code fence, the stripped response text (after any
        ``</think>`` tag) is returned as-is.
    """
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": prompt}
    ]
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.eos_token_id
    )
    # Each output sequence starts with the prompt tokens; keep only the completion.
    completion_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(completion_ids, skip_special_tokens=True)[0]
    return _extract_code(response)

def execute_code(code: str, function_to_test: str = None, test_args=None, test_kwargs=None):
    """
    Execute code with exec in a fresh namespace and return everything it printed.

    SECURITY: ``exec`` runs the code with the full privileges of this process.
    The fresh namespace only isolates variable names; it is not a sandbox. Do
    not feed untrusted code to this function outside a disposable environment.

    Args:
        code: The Python source to execute.
        function_to_test: Optional name of a function defined by ``code`` to
            call after the code has run.
        test_args: Positional arguments for that function (default: none).
        test_kwargs: Keyword arguments for that function (default: none).
    Returns:
        The captured standard output. A successful function call adds a
        "Function output: <result>" line; any failure adds a line starting
        with "Error during execution:".
    """
    namespace = {}
    captured = StringIO()
    previous_stdout = sys.stdout
    sys.stdout = captured
    try:
        exec(code, namespace)
        if function_to_test:
            target = namespace.get(function_to_test)
            if not callable(target):
                # Report this explicitly; a bare KeyError would print only the name.
                print(f"Error during execution: function '{function_to_test}' is not defined by the code.")
            else:
                result = target(*(test_args or []), **(test_kwargs or {}))
                print("Function output:", result)
    except Exception as e:
        # Include the exception type: messages such as "division by zero" or
        # "'x'" are hard to act on without it.
        print(f"Error during execution: {type(e).__name__}: {e}")
    finally:
        # Always restore stdout, even if the executed code raised.
        sys.stdout = previous_stdout
    return captured.getvalue()

def run_tests_on_code(code: str, test_cases: list) -> dict:
    """
    Run a series of tests on the provided code.

    Each test case is a dict with:
      - 'function': Name of the function to test.
      - 'args': List of positional arguments (optional).
      - 'kwargs': Dict of keyword arguments (optional).
      - 'expected': The expected output.

    Args:
        code: Python source defining the functions under test. It is run with
            ``exec`` in a fresh namespace (not a sandbox).
        test_cases: List of test case dicts; ``None`` is treated as no tests.
    Returns:
        ``{"error": message}`` if the code itself fails to execute or
        ``test_cases`` is not a list. Otherwise a dict mapping each test index
        to a result dict whose 'status' is 'passed', 'failed' or 'error'.
    """
    namespace = {}
    try:
        exec(code, namespace)
    except Exception as e:
        return {"error": f"Error during code execution: {e}"}

    if test_cases is None:
        return {}
    if not isinstance(test_cases, (list, tuple)):
        return {"error": f"Test cases must be a list, got {type(test_cases).__name__}."}

    results = {}
    for i, test in enumerate(test_cases):
        if not isinstance(test, dict):
            results[i] = {'status': 'error', 'error': f"Test case must be a dict, got {type(test).__name__}."}
            continue
        func_name = test.get('function')
        # "or" also covers keys that are present but empty/None (e.g. "args:" in YAML).
        args = test.get('args') or []
        kwargs = test.get('kwargs') or {}
        expected = test.get('expected')
        if func_name not in namespace:
            results[i] = {'status': 'error', 'error': f"Function '{func_name}' not defined."}
            continue
        try:
            output = namespace[func_name](*args, **kwargs)
        except Exception as e:
            results[i] = {'status': 'error', 'error': f"Exception during execution: {e}"}
            continue
        status = 'passed' if output == expected else 'failed'
        results[i] = {'status': status, 'output': output, 'expected': expected}
    return results

def refine_code(prompt, error_message, previous_code):
    """
    Ask the model to fix previously generated code.

    Builds a feedback prompt containing the original task, the previous code
    and the full execution output, and instructs the model to return only a
    Python code block enclosed in triple backticks. If an "unterminated string
    literal" error is detected, stricter formatting instructions are added.

    Args:
        prompt: The original task description. It is included in the feedback
            prompt so the model still knows what the code is meant to do.
        error_message: The execution output / error text to show to the model.
        previous_code: The code that produced ``error_message``.
    Returns:
        The refined code as returned by ``generate_code``.
    """
    additional_instruction = ""
    if "unterminated string literal" in error_message:
        additional_instruction = (
            " Your entire response must be exactly a valid Python code block, "
            "starting with ```python on a new line and ending with ``` on a new line, with no extra commentary."
        )
    # No separate "not fenced" branch: the closing sentence below already asks
    # for a fenced block on every request.

    feedback_prompt = (
        f"Original task:\n{prompt}\n\n"
        f"The following code produced the output below:\n\n{previous_code}\n\n"
        f"Execution Output:\n{error_message}\n\n"
        "Please refine the code to fix the issue." + additional_instruction +
        " Return only the updated code enclosed in triple backticks."
    )
    return generate_code(feedback_prompt)



In [9]:
'''from io import StringIO
import sys

def safe_execute_code(code: str, function_to_test: str = None, test_args=None, test_kwargs=None):
    """
    Execute code in a sandboxed environment by restricting available built-ins.
    
    Parameters:
      - code (str): The Python code to execute.
      - function_to_test (str, optional): Name of a function defined in the code to test.
      - test_args (list, optional): List of positional arguments for the function.
      - test_kwargs (dict, optional): Dictionary of keyword arguments for the function.
      
    Returns:
      - output (str): Captured output from executing the code.
    """
    # Define a set of allowed built-in functions.
    allowed_builtins = {
        'print': print,
        'range': range,
        'len': len,
        'abs': abs,
        # Add other safe built-ins as necessary.
    }
    # Create a restricted globals dictionary with only the allowed built-ins.
    safe_globals = {"__builtins__": allowed_builtins}
    
    # Create a local namespace for code execution.
    local_namespace = {}
    old_stdout = sys.stdout
    sys.stdout = mystdout = StringIO()
    
    try:
        # Execute the code in the sandboxed environment.
        exec(code, safe_globals, local_namespace)
        # If we want to test a specific function, call it with the provided arguments.
        if function_to_test:
            test_args = test_args or []
            test_kwargs = test_kwargs or {}
            result = local_namespace[function_to_test](*test_args, **test_kwargs)
            print("Function output:", result)
    except Exception as e:
        print("Error during safe execution:", e)
    finally:
        # Restore the original standard output.
        sys.stdout = old_stdout
        
    # Return the captured output.
    output = mystdout.getvalue()
    return output

# Example usage:
safe_output = safe_execute_code(generated_code, function_to_test="gcd", test_args=[48, 18])
print("Safe Execution Output:\n", safe_output)


Safe Execution Output:
 Function output: 6



In [25]:
@tool
def generate_code_tool(prompt: str) -> str:
    """
    Tool wrapper that turns a natural language prompt into Python code.

    Args:
        prompt: Description of the problem to solve.
    Returns:
        The code string produced by ``generate_code`` for that prompt.
    """
    generated = generate_code(prompt)
    return generated

@tool
def execute_code_tool(code: str, function_to_test: str, test_args: str = "[]", test_kwargs: str = "{}") -> str:
    """
    Tool for executing Python code and testing a specific function.

    Args:
        code: The Python code to execute.
        function_to_test: The name of the function to test.
        test_args: A JSON-formatted list of positional arguments.
        test_kwargs: A JSON-formatted object (dictionary) of keyword arguments.
    Returns:
        The execution output, or a message starting with
        "Error parsing arguments:" when the JSON is invalid or has the wrong
        top-level type.
    """
    try:
        args = json.loads(test_args)
        kwargs = json.loads(test_kwargs)
    except Exception as e:
        return f"Error parsing arguments: {e}"
    # Valid JSON can still be the wrong shape (e.g. "5" or "null"); reject it
    # here rather than letting the call fail with an unrelated unpacking error.
    if not isinstance(args, list):
        return f"Error parsing arguments: test_args must be a JSON list, got {type(args).__name__}."
    if not isinstance(kwargs, dict):
        return f"Error parsing arguments: test_kwargs must be a JSON object, got {type(kwargs).__name__}."
    return execute_code(code, function_to_test, args, kwargs)

@tool
def test_code_tool(code: str, test_cases: str) -> str:
    """
    Tool for running a series of tests on provided code.

    Args:
        code: The Python code containing function definitions.
        test_cases: A YAML string representing a list of test case dictionaries.
    Returns:
        The test results converted to a string, or a message starting with
        "Error parsing test cases:" when the YAML is invalid or is not a list.
    """
    try:
        tests = yaml.safe_load(test_cases)
    except Exception as e:
        return f"Error parsing test cases: {e}"
    # Empty YAML parses to None and a mapping parses to a dict; neither is a
    # list of test cases, so report that instead of failing later.
    if not isinstance(tests, list):
        return f"Error parsing test cases: expected a YAML list of test cases, got {type(tests).__name__}."
    results = run_tests_on_code(code, tests)
    return str(results)

@tool
def final_answer(answer: str) -> str:
    """
    Tool for returning the final answer.

    Args:
        answer: The answer to hand back.
    Returns:
        The ``answer`` argument, unchanged.
    """
    return answer


In [26]:
class MyModel:
    """Plain container that bundles a model, its tokenizer and a few settings."""

    def __init__(
        self,
        model,
        tokenizer,
        max_tokens=2096,
        temperature=0.5,
        model_id="custom-model",
    ):
        """
        Store the given objects and settings on the instance unchanged.

        Args:
            model: The model object to hold.
            tokenizer: The tokenizer object to hold.
            max_tokens: Stored as ``self.max_tokens``.
            temperature: Stored as ``self.temperature``.
            model_id: Stored as ``self.model_id``.
        """
        # The loaded objects.
        self.model, self.tokenizer = model, tokenizer
        # Generation settings.
        self.max_tokens, self.temperature = max_tokens, temperature
        # Identifier label.
        self.model_id = model_id

# Label the wrapper with the checkpoint that is actually loaded in this notebook
# (the DeepSeek-R1 distill of Qwen 1.5B), not an unrelated model name.
my_model = MyModel(model=model, tokenizer=tokenizer, model_id="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")


In [27]:
class CodeAgent:
    """
    Agent that generates code for a prompt, executes it and refines it on failure.

    The generate / execute / refine steps are done by the module-level
    ``generate_code``, ``execute_code`` and ``refine_code`` functions.
    """

    # Text that execute_code writes into its output when the executed code failed.
    _ERROR_MARKER = "Error during execution:"

    def __init__(self, model, tools, max_steps=6, prompt_templates=None, name="CodeAgent", 
                 description="An AI agent that generates, executes, and tests Python code."):
        """
        Args:
            model: Model wrapper kept on the instance.
            tools: List of tool functions kept on the instance.
            max_steps: Maximum number of execution attempts made by ``run``.
            prompt_templates: Optional prompt templates kept on the instance.
            name: Name of the agent.
            description: Short description of the agent.
        """
        self.model = model
        self.tools = tools  # List of tool functions.
        self.max_steps = max_steps
        self.prompt_templates = prompt_templates
        self.name = name
        self.description = description

    def _is_success(self, output, expected_output):
        """Decide whether an execution output counts as a successful run."""
        if expected_output is not None:
            # Substring check against the captured output.
            return expected_output in output
        # Without an expectation, accept any run that did not report an error.
        return self._ERROR_MARKER not in output

    def run(self, prompt, function_to_test=None, test_args=None, test_kwargs=None, expected_output=None):
        """
        Generate code for ``prompt`` and refine it until it runs as expected.

        Args:
            prompt: Natural language description of the code to generate.
            function_to_test: Optional name of a generated function to call.
            test_args: Positional arguments for that function.
            test_kwargs: Keyword arguments for that function.
            expected_output: Optional text that must occur in the execution
                output. When omitted, a run succeeds if it reports no error.
        Returns:
            A ``(code, output)`` tuple: the last code that was executed and the
            output it produced. If ``max_steps`` is 0 nothing is executed and
            the output is an empty string.
        """
        # generate_code already returns the code without its Markdown fence,
        # so there is no fence check here: it would fire on every valid result
        # and replace good code through a needless refinement. Malformed code
        # is caught by executing it below.
        current_code = generate_code(prompt)
        output = ""

        for iteration in range(self.max_steps):
            print(f"Iteration {iteration}:\nGenerated Code:\n{current_code}\n")
            output = execute_code(current_code, function_to_test, test_args, test_kwargs)
            print("Execution Output:\n", output)

            if self._is_success(output, expected_output):
                return current_code, output

            # Do not refine after the last attempt: the refined code would never
            # be executed, and returning it with this output would mismatch.
            if iteration == self.max_steps - 1:
                break

            # Create a detailed error message with full execution output.
            if expected_output is None:
                error_message = output
            else:
                error_message = f"{output}\nExpected output containing '{expected_output}' was not found."
            print("Refining code due to error:", error_message)
            current_code = refine_code(prompt, error_message, current_code)

        return current_code, output


In [28]:
# Optional: Load Prompt Templates from a YAML file.
try:
    with open("prompts.yaml", "r", encoding="utf-8") as stream:
        prompt_templates = yaml.safe_load(stream)
except FileNotFoundError:
    # The templates are optional: run without them when the file is absent.
    prompt_templates = None
except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
    # The file exists but could not be read or parsed. Continue without
    # templates, but say so instead of hiding the problem.
    print(f"Warning: could not load prompts.yaml ({e}); continuing without prompt templates.")
    prompt_templates = None

# Instantiate Our CodeAgent with Relevant Tools.
agent = CodeAgent(
    model=my_model,
    tools=[
        final_answer,
        generate_code_tool,
        execute_code_tool,
        test_code_tool
    ],
    max_steps=6,
    prompt_templates=prompt_templates,
    name="MyCodeAgent",
    description="An AI agent that generates, executes, and tests Python code."
)

# Example Interaction (Main Block): generate gcd() and check gcd(48, 18) == 6.
if __name__ == "__main__":
    prompt_example = "Write a Python function named gcd that computes the greatest common divisor of two numbers."
    final_code, final_output = agent.run(
        prompt=prompt_example,
        function_to_test="gcd",
        test_args=[48, 18],
        expected_output="6"
    )
    print("Final Code:\n", final_code)
    print("Final Output:\n", final_output)


Iteration 0:
Generated Code:
def gcd(a, b):
    if a == 0:
        return abs(b)
    if b == 0:
        return abs(a)
    while b != 0:
        a, b = b, a % b
    return abs(a)

Execution Output:
 Function output: 6

Final Code:
 def gcd(a, b):
    if a == 0:
        return abs(b)
    if b == 0:
        return abs(a)
    while b != 0:
        a, b = b, a % b
    return abs(a)
Final Output:
 Function output: 6



In [29]:
# Fibonacci example: ask the agent for a recursive fib() and try it on n = 10.
prompt_fib = (
    "Write a Python function named fib that computes the nth Fibonacci number recursively."
)

# fib(10) should be 55. The expectation is a substring check on the captured
# output, so adjust it as needed.
final_code, final_output = agent.run(
    prompt_fib,
    "fib",
    [10],
    expected_output="55",
)
print("Final Code for Fibonacci Function:\n", final_code)
print("Final Output for Fibonacci Function:\n", final_output)


Iteration 0:
Generated Code:
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

Execution Output:
 Function output: 55

Final Code for Fibonacci Function:
 def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
Final Output for Fibonacci Function:
 Function output: 55



In [30]:
# Quicksort example: the printed form of the sorted list must appear in the output.
prompt_quicksort = (
    "Write a Python function named quicksort that sorts a list of integers "
    "in ascending order using the QuickSort algorithm."
)
final_code_qs, final_output_qs = agent.run(
    prompt_quicksort,
    "quicksort",
    [[1, 4, 5, 9, 2, 6, 3]],
    expected_output="[1, 2, 3, 4, 5, 6, 9]",
)
print("Final Code for Quicksort:\n", final_code_qs)
print("Final Output for Quicksort:\n", final_output_qs)


Iteration 0:
Generated Code:
Okay, I need to write a Python function called quicksort that sorts a list of integers in ascending order using the QuickSort algorithm. Let me think about how to approach this.

First, I remember that QuickSort works by selecting a pivot element and partitioning the list into elements less than, equal to, and greater than the pivot. Then, recursively sorting the sublists. So, the steps are: if the list has one or zero elements, it's sorted. Choose a pivot, partition the list, and recursively sort the partitions.

I'll start by writing a function called quicksort that takes a list as an argument. The base case is if the list has one or zero elements, return it. Otherwise, select the pivot, which is the first element of the list.

Next, I need to partition the list into less than, equal to, and greater than the pivot. I'll create three empty lists: less, equal, and greater.

I'll loop through each element in the original list. For each element, if it's less 