In [1]:
# Install required packages
!pip install litellm transformers datasets

# Import necessary libraries
import os
import subprocess
import sys
import tempfile
from datasets import load_dataset

print("Dependencies installed and libraries imported successfully.")

Dependencies installed and libraries imported successfully.


In [2]:
# Fetch the Advent of Code dataset from the Hugging Face hub
dataset = load_dataset("isavita/advent-of-code")

# Keep only the training rows whose `solution_lang` field is 'go'
go_solutions = dataset['train'].filter(
    lambda row: row['solution_lang'] == 'go'
)

print("Loaded {} Go solutions from the dataset.".format(len(go_solutions)))

Downloading readme:   0%|          | 0.00/9.28k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/157M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/10027 [00:00<?, ? examples/s]

Filter:   0%|          | 0/10027 [00:00<?, ? examples/s]

Loaded 441 Go solutions from the dataset.


In [5]:
class Constants:
    """Toolchain settings for the language the generated solutions target.

    All values are exposed as static methods so callers can use
    ``Constants.runtime()`` etc. without creating an instance.
    """

    # Interpreter location on the original author's machine. It is still
    # preferred when present so behaviour there is unchanged.
    _PREFERRED_INTERPRETER = "/Users/isavita/.pyenv/shims/python"

    @staticmethod
    def file_ext():
        """Return the source-file extension, without the leading dot."""
        return "py"

    @staticmethod
    def lang():
        """Return the language name."""
        return "python"

    @staticmethod
    def _interpreter():
        """Return the preferred interpreter path, or the running one as a fallback.

        The hard-coded path only exists on one machine; everywhere else the
        interpreter running this notebook (``sys.executable``) is used so the
        returned path can actually be executed.
        """
        preferred = Constants._PREFERRED_INTERPRETER
        if os.path.exists(preferred):
            return preferred
        return sys.executable

    @staticmethod
    def compiler():
        """Return the executable used to compile sources (same as ``runtime()``)."""
        return Constants._interpreter()

    @staticmethod
    def runtime():
        """Return the executable used to run sources."""
        return Constants._interpreter()

print("Constants class defined.")

Constants class defined.


In [6]:
import tempfile
import subprocess
import os

def test_go_execution():
    """Smoke-test the configured runtime by running a Hello World program.

    Despite the name, the program is written for whatever toolchain
    ``Constants`` describes: it is saved to a temporary file with the
    ``Constants.file_ext()`` suffix and executed with ``Constants.runtime()``.
    Prints the program's stdout on success or its stderr on a non-zero exit.
    The temporary file is always removed afterwards.
    """
    # Hello World program
    code = '''
print("Hello, World!")
'''

    # delete=False so the file survives the `with` block and can be executed;
    # it is removed explicitly in the `finally` clause below.
    with tempfile.NamedTemporaryFile(suffix=f'.{Constants.file_ext()}', delete=False, mode='w') as temp_file:
        temp_file.write(code)
        temp_file_path = temp_file.name

    try:
        # Pass an argv list instead of a shell string so paths that contain
        # spaces or shell metacharacters are handed over verbatim.
        result = subprocess.run(
            [Constants.runtime(), temp_file_path],
            capture_output=True,
            text=True,
        )

        if result.returncode == 0:
            print("Program executed successfully.")
            print("Output:", result.stdout.strip())
        else:
            print("Error executing program:")
            print(result.stderr)
    finally:
        # Clean up the temporary file
        os.unlink(temp_file_path)

# Run the test
test_go_execution()

Program executed successfully.
Output: Hello, World!


In [50]:
import litellm
import warnings
import os
from openai import OpenAI

# Suppress the specific warning
warnings.filterwarnings("ignore", message="Field .* has conflict with protected namespace .*")

class LLMWrapper:
    """Small chat-completion client that routes between litellm and OpenAI.

    Model names starting with ``"ft:"`` are treated as fine-tuned models and
    sent through an ``OpenAI`` client created with the key stored in the
    ``OPENAI_API_KEY_FT`` environment variable. Every other model goes through
    ``litellm.completion``. Both paths print the error and return ``None``
    when the call raises.
    """

    def __init__(self, model_name, temperature=0.3, api_base=None):
        """Store the settings and, for fine-tuned models, build the OpenAI client.

        Args:
            model_name: Model identifier; a leading ``"ft:"`` selects the OpenAI path.
            temperature: Sampling temperature forwarded with every request.
            api_base: Optional base URL forwarded to litellm (ignored on the OpenAI path).
        """
        self.model_name = model_name
        self.api_base = api_base
        self.temperature = temperature
        self.is_fine_tuned = model_name.startswith("ft:")
        # Always define the attribute so it can be inspected on any instance.
        self.client = None
        if self.is_fine_tuned:
            self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY_FT"))

    @staticmethod
    def _build_messages(system_prompt, user_prompt):
        """Return the two-message chat payload shared by both backends."""
        return [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

    def generate(self, system_prompt, user_prompt):
        """Return the model's reply text, or ``None`` if the API call failed."""
        if self.is_fine_tuned:
            return self._generate_with_openai(system_prompt, user_prompt)
        return self._generate_with_litellm(system_prompt, user_prompt)

    def _generate_with_litellm(self, system_prompt, user_prompt):
        """Call ``litellm.completion`` and return the first choice's content."""
        try:
            completion_params = {
                "model": self.model_name,
                "messages": self._build_messages(system_prompt, user_prompt),
                "temperature": self.temperature,
            }

            # Only forward api_base when one was configured.
            if self.api_base:
                completion_params["api_base"] = self.api_base

            response = litellm.completion(**completion_params)
            return response.choices[0].message.content
        except Exception as e:
            # Best effort by design: report the failure and let the caller see None.
            print(f"Error in LLM API call: {e}")
            return None

    def _generate_with_openai(self, system_prompt, user_prompt):
        """Call the OpenAI chat-completions endpoint and return the first choice's content."""
        try:
            if not self.client.api_key:
                # The client is built from OPENAI_API_KEY_FT, so name that variable.
                raise ValueError("OPENAI_API_KEY_FT environment variable not set")

            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=self._build_messages(system_prompt, user_prompt),
                temperature=self.temperature,
            )
            return response.choices[0].message.content
        except Exception as e:
            # Same best-effort contract as the litellm path.
            print(f"Error in OpenAI API call: {e}")
            return None

# Test the LLMWrapper with the local Ollama model
def test_llm_wrapper():
    """Ask the local Ollama model for a joke and print the reply or a failure notice."""
    system_prompt = "You are a helpful assistant."
    user_prompt = "Tell me a short joke about programming."

    local_llm = LLMWrapper(
        "ollama/llama3:8b-instruct-q5_K_M",
        0.3,
        api_base="http://localhost:11434",
    )

    print("Testing with local Ollama model:")
    reply = local_llm.generate(system_prompt, user_prompt)

    # A falsy reply (None or empty text) is reported as a failure.
    if not reply:
        print("Failed to get a response from the local LLM.")
        return

    print("Local LLM Response:")
    print(reply)

# Run the test
# test_llm_wrapper()

In [8]:
import threading
import time
import queue

class TimeoutRunner:
    """Run a callable in a background thread and give up after a deadline."""

    @staticmethod
    def run_with_timeout(func, args=(), kwargs=None, timeout_duration=10):
        """Call ``func(*args, **kwargs)`` and wait at most ``timeout_duration`` seconds.

        Args:
            func: Callable to execute.
            args: Positional arguments for ``func``.
            kwargs: Keyword arguments for ``func``; ``None`` means no keyword arguments.
            timeout_duration: Maximum number of seconds to wait for a result.

        Returns:
            The value returned by ``func``. If ``func`` raised an ``Exception``,
            the exception *instance* is returned rather than raised. ``None``
            is returned when no result arrived before the deadline.

        Note:
            The worker is a daemon thread and is not stopped on timeout; it
            keeps running in the background until ``func`` finishes.
        """
        # A None default avoids sharing one dict object between calls.
        call_kwargs = {} if kwargs is None else kwargs
        result_queue = queue.Queue()

        def wrapper():
            try:
                result_queue.put(func(*args, **call_kwargs))
            except Exception as e:
                # Hand the failure to the caller through the same channel.
                result_queue.put(e)

        thread = threading.Thread(target=wrapper, daemon=True)
        thread.start()

        try:
            return result_queue.get(timeout=timeout_duration)
        except queue.Empty:
            return None

# Test the TimeoutRunner
def test_timeout_runner():
    """Exercise TimeoutRunner with one fast and one slow callable and print both results."""

    def quick_function():
        return "Quick function completed"

    def slow_function():
        # Sleeps longer than the 2-second deadline used below.
        time.sleep(5)
        return "Slow function completed"

    runner = TimeoutRunner()

    scenarios = (
        ("Testing quick function (should complete):", quick_function),
        ("\nTesting slow function (should timeout):", slow_function),
    )
    for banner, target in scenarios:
        print(banner)
        outcome = runner.run_with_timeout(target, timeout_duration=2)
        print(outcome)

# Run the test
test_timeout_runner()

Testing quick function (should complete):
Quick function completed

Testing slow function (should timeout):
None


In [51]:
import os
import tempfile
import subprocess

class EvaluateCode:
    """Run a Python program in a scratch directory and capture what it prints.

    The program is written to ``main.py`` inside a temporary directory next to
    an ``input.txt`` holding the input data. The same input is also fed to the
    program on stdin, so it may read it either way.
    """

    def __init__(self, code, input_data, timeout=10):
        """Store the program and its input.

        Args:
            code: Source code to execute.
            input_data: Text written to ``input.txt`` and piped to stdin;
                ``None`` is treated as empty input.
            timeout: Maximum run time in seconds (defaults to 10).
        """
        self.code = code
        self.input_data = input_data
        self.timeout = timeout

    def run(self):
        """Execute the program and return a string describing the outcome.

        Returns:
            The stripped stdout on success, ``"Runtime error: <stderr>"`` when
            the process exits with a non-zero status, ``"Timeout: Execution
            took too long"`` when the deadline is exceeded, or ``"Error:
            <message>"`` if launching the process fails.
        """
        # Writing None to a file raises TypeError, so normalise it up front.
        input_text = "" if self.input_data is None else self.input_data

        with tempfile.TemporaryDirectory() as temp_dir:
            # Save code to a file
            code_file = os.path.join(temp_dir, "main.py")
            with open(code_file, 'w') as f:
                f.write(self.code)

            # Save input to a file
            input_file = os.path.join(temp_dir, "input.txt")
            with open(input_file, 'w') as f:
                f.write(input_text)

            # Run the command in the temporary directory
            try:
                result = subprocess.run(
                    # Use the interpreter running this notebook instead of
                    # whatever "python" happens to resolve to on PATH.
                    [sys.executable, code_file],
                    input=input_text,
                    capture_output=True,
                    text=True,
                    cwd=temp_dir,  # lets the program open "input.txt" by relative path
                    timeout=self.timeout,
                )
                if result.returncode != 0:
                    return f"Runtime error: {result.stderr}"
                return result.stdout.strip()
            except subprocess.TimeoutExpired:
                return "Timeout: Execution took too long"
            except Exception as e:
                return f"Error: {str(e)}"

 

# Test the EvaluateCode class
def test_evaluate_code():
    """Run a two-line sample program through EvaluateCode and print what it produced."""
    sample_input = "43"
    sample_program = '''
print("Enter a number:")
print("The number is: ")
'''

    outcome = EvaluateCode(sample_program, sample_input).run()

    print("Evaluation result:")
    print(outcome)

# Run the test
test_evaluate_code()

Evaluation result:
Enter a number:
The number is:


In [63]:
import re
import os
import signal
import time
from pathlib import Path

class TimeoutException(Exception):
    """Signals that an execution ran past its time limit."""

def timeout_handler(signum, frame):
    """Abort the current execution by raising ``TimeoutException``.

    The ``(signum, frame)`` signature matches what ``signal.signal`` expects
    from a handler; both arguments are ignored.
    NOTE(review): no registration of this handler is visible in this notebook.
    """
    timed_out = TimeoutException("Execution timed out")
    raise timed_out

class ChallengeSolver:
    """Extract code from LLM replies, run it against a challenge and store the outcome."""

    # Maps a language name to the extension used for saved solution files.
    # NOTE(review): OCaml sources conventionally use ".ml"; "ocaml" is kept
    # as-is because files may already have been saved with it - confirm.
    _FILE_EXTENSIONS = {
        'go': 'go',
        'python': 'py',
        'awk': 'awk',
        'zig': 'zig',
        'ruby': 'rb',
        'lua': 'lua',
        'julia': 'jl',
        'php': 'php',
        'elixir': 'exs',
        'perl': 'pl',
        'crystal': 'cr',
        'dart': 'dart',
        'nim': 'nim',
        'clojure': 'clj',
        'scala': 'scala',
        'java': 'java',
        'kotlin': 'kt',
        'ocaml': 'ocaml',
        'groovy': 'groovy',
        'racket': 'rkt',
        'R': 'R',
        'haskell': 'hs',
        'swift': 'swift',
        'objectivec': 'm',
        'pascal': 'pas',
        # Add more languages and their extensions as needed
    }

    def __init__(self, model_name, llm_wrapper, language='python'):
        """Remember the model name, the LLM wrapper and the target language."""
        self.model_name = model_name
        self.llm_wrapper = llm_wrapper
        self.language = language

    def extract_code(self, input_text):
        """Return the body of the first fenced code block, or the text itself if none exists.

        The fence may carry this solver's language tag or no tag at all.
        """
        # re.escape keeps names such as "c++" from being read as regex syntax.
        pattern = rf"```(?:{re.escape(self.language)})?(.*?)```"
        matches = re.findall(pattern, input_text, re.DOTALL)
        return matches[0] if matches else input_text

    def extract_thoughts_and_code(self, response):
        """Split an LLM reply into its ``<thought>`` text and its code.

        Args:
            response: Raw reply text; ``None`` is treated as an empty reply.

        Returns:
            ``(thoughts, code)``. ``thoughts`` is the stripped content of the
            first ``<thought>...</thought>`` element, or a placeholder
            sentence when there is none. ``code`` is the body of the first
            fence tagged with this solver's language; failing that, the body
            of the first fenced block of any kind; failing that, the whole
            reply.
        """
        # The LLM wrapper returns None when the API call fails.
        response = response or ""

        thoughts_pattern = r"<thought>(.*?)</thought>"
        language_fence = rf"```(?:{re.escape(self.language)})\n(.*?)```"
        any_fence = r"```[^\n]*\n(.*?)```"

        thoughts_match = re.search(thoughts_pattern, response, re.DOTALL)
        # Prefer the correctly tagged fence; otherwise accept any fenced block
        # (e.g. "```py" or an untagged fence) rather than returning the prose.
        code_match = (
            re.search(language_fence, response, re.DOTALL)
            or re.search(any_fence, response, re.DOTALL)
        )

        thoughts = thoughts_match.group(1).strip() if thoughts_match else "No explicit thought process provided."
        code = code_match.group(1) if code_match else response

        print(f"Raw response snippet: {response[:200]}...")
        print(f"Extracted thoughts: {thoughts[:200]}{'...' if len(thoughts) > 200 else ''}")

        return thoughts, code

    def valid_solution(self, result, answer):
        """Return True when the program output ``result`` contains the expected ``answer``.

        Both values are converted to strings and stripped. The answer must
        appear as a whole token, i.e. not glued to other letters, digits or
        underscores, so ``"42"`` is not accepted inside ``"1425"``. As a
        fallback the first integer found in the output is compared with the
        answer. An empty answer never validates.
        """
        result = str(result).strip()
        answer = str(answer).strip()

        # An empty string is contained in every output; never accept it.
        if not answer:
            return False

        # Whole-token match: no word character directly before or after.
        token_pattern = rf"(?<!\w){re.escape(answer)}(?!\w)"
        if re.search(token_pattern, result):
            return True

        # Try to extract a number from the result
        numbers = re.findall(r'-?\d+', result)
        return bool(numbers) and numbers[0] == answer

    def solve(self, challenge, code, thoughts):
        """Run ``code`` on the challenge input, save it and report the outcome.

        Returns:
            ``(success, result, code)`` where ``result`` is the string
            produced by ``EvaluateCode.run()``.
        """
        evaluator = EvaluateCode(code, challenge['input'])
        result = evaluator.run()

        # Any output containing "Timeout" is treated as a timed-out run.
        if "Timeout" in result:
            print(f"Challenge {challenge['name']} timed out.")
            self.save_solution(challenge, code, thoughts, suffix="_timeout")
            return False, result, code

        if self.valid_solution(result, challenge['answer']):
            print(f"Solved {challenge['name']} successfully!")
            self.save_solution(challenge, code, thoughts)
            return True, result, code

        print(f"Failed to solve {challenge['name']}")
        print(f"Expected answer: {challenge['answer']}")
        print(f"Actual output: {result}")
        self.save_solution(challenge, code, thoughts, suffix="_failed")
        return False, result, code

    def save_solution(self, challenge_data, code, thoughts, suffix=""):
        """Write the code and the thoughts for a challenge below the user's home directory.

        The code goes to ``<name><suffix>.<extension>`` and the thoughts to
        ``<name><suffix>_<language>.txt`` inside
        ``~/code/advent_generated/training_data/reflection-tuning``.
        """
        home_dir = str(Path.home())
        base_path = f"{home_dir}/code/advent_generated/training_data/reflection-tuning/{challenge_data['name']}"

        # Save code
        code_path = f"{base_path}{suffix}.{self.get_file_extension()}"
        os.makedirs(os.path.dirname(code_path), exist_ok=True)
        with open(code_path, 'w') as f:
            f.write(code)

        # Save thoughts
        thoughts_path = f"{base_path}{suffix}_{self.language}.txt"
        with open(thoughts_path, 'w') as f:
            f.write(thoughts if thoughts else "No thoughts provided.")

        print(f"Saved solution and thoughts for {challenge_data['name']}{suffix}")

    def get_file_extension(self):
        """Return the file extension for this solver's language, or '' if it is unknown."""
        return self._FILE_EXTENSIONS.get(self.language, '')

In [64]:
# Test cases for ChallengeSolver functions

# Create an instance of ChallengeSolver for testing
solver = ChallengeSolver("test_model", None, language='python')

# Test extract_code function
def test_extract_code():
    """Check that extract_code pulls the fenced snippet out of surrounding prose."""
    expected_code = """def hello_world():
        print("Hello, World!")"""
    input_text = """
    Here's some text before the code.
    ```python
    def hello_world():
        print("Hello, World!")
    ```
    And some text after the code.
    """

    extracted_code = solver.extract_code(input_text)

    # Compare after stripping so the indentation around the fence is ignored.
    matches = extracted_code.strip() == expected_code.strip()
    assert matches, f"Expected:\n{expected_code}\nGot:\n{extracted_code}"
    print("extract_code test passed!")

# Test extract_thoughts_and_code function
def test_extract_thoughts_and_code():
    """Check that a reply is split into its <thought> text and its fenced code."""

    def collapse_whitespace(text):
        # Any run of whitespace (including newlines) becomes one space.
        return ' '.join(text.split())

    input_text = """
    Here's some introductory text.
    <thought>
    This is my thought process for solving the problem.
    I think we should approach it this way.
    </thought>

    ```python
    def solve_problem():
        return 42
    ```
    """
    expected_thoughts = "This is my thought process for solving the problem. I think we should approach it this way."
    expected_code = "def solve_problem():\n        return 42"

    thoughts, code = solver.extract_thoughts_and_code(input_text)

    # Show the raw thoughts to make a failing comparison easier to debug
    print("Raw extracted thoughts:")
    print(thoughts)

    thoughts_ok = collapse_whitespace(thoughts) == collapse_whitespace(expected_thoughts)
    assert thoughts_ok, f"Expected thoughts:\n{expected_thoughts}\nGot:\n{thoughts}"

    code_ok = code.strip() == expected_code.strip()
    assert code_ok, f"Expected code:\n{expected_code}\nGot:\n{code}"

    print("extract_thoughts_and_code test passed!")

# Test valid_solution function
def test_valid_solution():
    """Check valid_solution against a table of (output, expected answer, verdict) rows."""
    test_cases = (
        ("42", "42", True),
        ("The answer is 42", "42", True),
        ("The answer is 42", "43", False),
        ("The result: 42", "42", True),
        ("No number here", "42", False),
        ("Multiple numbers: 41, 42, 43", "42", True),
    )

    for result, answer, expected in test_cases:
        verdict = solver.valid_solution(result, answer)
        assert verdict == expected, f"Failed for result '{result}' and answer '{answer}'"

    print("valid_solution test passed!")

# Run all tests
test_extract_code()
test_extract_thoughts_and_code()
test_valid_solution()

print("All tests passed successfully!")

extract_code test passed!
Raw response snippet: 
    Here's some introductory text.
    <thought>
    This is my thought process for solving the problem.
    I think we should approach it this way.
    </thought>

    ```python
    def solve_proble...
Extracted thoughts: This is my thought process for solving the problem.
    I think we should approach it this way.
Raw extracted thoughts:
This is my thought process for solving the problem.
    I think we should approach it this way.
extract_thoughts_and_code test passed!
valid_solution test passed!
All tests passed successfully!


In [65]:
def get_existing_solutions(language):
    """List the base names of solution files already saved for ``language``.

    Looks in ``~/code/advent_generated/training_data/reflection-tuning`` for
    files ending in the extension that ``ChallengeSolver.save_solution`` uses
    for that language (for example ``.py`` for ``python``). Languages missing
    from the extension table fall back to ``.<language>``.

    Returns:
        File names with the extension removed; entries saved with a suffix
        keep it (for example ``"<name>_failed"``). An empty list is returned
        when the directory does not exist.
    """
    home_dir = str(Path.home())
    solutions_dir = os.path.join(home_dir, "code", "advent_generated", "training_data", "reflection-tuning")
    if not os.path.isdir(solutions_dir):
        return []

    # Files are written with the mapped extension, not the language name, so
    # filter on the same extension; otherwise saved work is never detected.
    extension = ChallengeSolver(None, None, language=language).get_file_extension() or language
    wanted_suffix = f".{extension}"

    return [
        os.path.splitext(file_name)[0]
        for file_name in os.listdir(solutions_dir)
        if file_name.endswith(wanted_suffix)
    ]

def solve_attempt(solver, challenge, target_language, retry=False, previous_result=None):
    """Ask the LLM for a solution to ``challenge`` and evaluate it.

    Args:
        solver: Object providing ``llm_wrapper.generate``,
            ``extract_thoughts_and_code`` and ``solve``.
        challenge: Mapping with at least the ``'task'`` and ``'name'`` keys
            (``solver.solve`` reads further keys).
        target_language: Language the solution must be written in.
        retry: When True, use the prompt that reports the previous failure.
        previous_result: Error message or wrong output of the previous attempt;
            only used when ``retry`` is True.

    Returns:
        ``(success, result, code, thoughts)``. If the LLM produced no reply,
        the attempt is reported as failed with an ``"Error: ..."`` result and
        empty code and thoughts, and nothing is executed.
    """
    system_prompt = "You are an expert programmer tasked with solving a coding challenge."

    if retry:
        user_prompt = f"""
        The previous attempt to solve this problem failed. Here's the error message or incorrect output:
        {previous_result}
        
        Without actually running the code, analyze the error or incorrect output and think about how you would fix the issue. Pretend you've spotted the error in the previous solution.
        Your response should follow this structure:
        1. <thought>Your thought process for identifying the issue and how you would approach fixing it</thought>
        2. Your revised code solution enclosed in ```{target_language} ``` tags

        Task:
        {challenge['task']}

        Provide a corrected solution to the above task in {target_language}, addressing the hypothetical error you've identified.
        """
    else:
        user_prompt = f"""
        Write a simple, concise, and efficient solution in {target_language} that reads its input from a file "input.txt" and solves the following task. The program should print the final answer.

        Your response should follow this structure:
        1. <thought>Your thought process for approaching and solving the problem</thought>
        2. Your code solution enclosed in ```{target_language} ``` tags

        Task:
        {challenge['task']}

        Provide an efficient solution to the above task in {target_language}.
        """

    response = solver.llm_wrapper.generate(system_prompt, user_prompt)

    # The wrapper returns None when the API call fails; there is nothing to
    # parse or run in that case, so report a failed attempt instead of crashing.
    if not response:
        print(f"No response from the LLM for {challenge['name']}.")
        return False, "Error: no response from the LLM", "", ""

    thoughts, code = solver.extract_thoughts_and_code(response)

    success, result, code = solver.solve(challenge, code, thoughts)
    return success, result, code, thoughts

def main():
    """Solve the not-yet-saved day 15-25 challenges with the LLM, retrying once on failure.

    Loads the dataset, keeps the Go rows, skips challenges whose name already
    appears among the saved solutions, and processes at most one remaining
    challenge. The outcome of each attempt is saved through
    ``solver.save_solution`` with a suffix describing it.
    """
    target_language = 'python'  # Change this to your target language
    model_name = "claude-3-5-sonnet-20240620"  # Change to your preferred model

    dataset = load_dataset("isavita/advent-of-code")
    go_solutions = dataset['train'].filter(lambda example: example['solution_lang'] == 'go')

    existing_solutions = get_existing_solutions(target_language)

    # Name prefixes "day15_" through "day25_"
    day_prefixes = tuple(f'day{i}_' for i in range(15, 26))
    candidates = []
    for challenge in go_solutions:
        if challenge['name'] in existing_solutions:
            continue
        if challenge['name'].startswith(day_prefixes):
            candidates.append(challenge)
    # Only the first matching challenge is processed per run.
    unsolved_challenges = candidates[:1]

    llm_wrapper = LLMWrapper(model_name)
    solver = ChallengeSolver(model_name, llm_wrapper, language=target_language)

    total_challenges = len(unsolved_challenges)
    print(f"Total unsolved challenges: {total_challenges}")

    def process(challenge):
        """Run the first attempt and, if it simply failed, one retry."""
        success, result, code, thoughts = solve_attempt(solver, challenge, target_language)

        if "Timeout" in result:
            print(f"Challenge {challenge['name']} timed out.")
            # Template text stored in place of the model's own thoughts.
            timeout_thoughts = (
                "<thought>The previous solution appears to be too slow for efficient execution. "
                "Without running the code, I believe the issue might be due to [insert hypothetical reason]. "
                "To optimize it, we could [insert hypothetical optimization strategy].</thought>"
            )
            solver.save_solution(challenge, code, timeout_thoughts, suffix="_timeout")
            return

        if success:
            print(f"Solved {challenge['name']} successfully!")
            solver.save_solution(challenge, code, thoughts)
            return

        print("First attempt failed. Retrying with error information.")
        retry_success, retry_result, retry_code, retry_thoughts = solve_attempt(
            solver, challenge, target_language, retry=True, previous_result=result
        )

        if retry_success:
            print(f"Solved {challenge['name']} successfully after retry!")
            outcome_suffix = "_fixed"
        else:
            print(f"Failed to solve {challenge['name']} even after retry.")
            outcome_suffix = "_failed"
        solver.save_solution(challenge, retry_code, retry_thoughts, suffix=outcome_suffix)

    for i, challenge in enumerate(unsolved_challenges, 1):
        process(challenge)

        print(f"Progress: {(i / total_challenges) * 100:.1f}%")

        time.sleep(1)  # Sleep for 1 second between attempts

if __name__ == "__main__":
    main()

Total unsolved challenges: 1
Raw response snippet: <thought>
To solve this problem, we need to:
1. Parse the input file to extract ingredient properties.
2. Generate all possible combinations of ingredient amounts that sum to 100.
3. For each combinat...
Extracted thoughts: To solve this problem, we need to:
1. Parse the input file to extract ingredient properties.
2. Generate all possible combinations of ingredient amounts that sum to 100.
3. For each combination, calcu...
Failed to solve day15_part1_2015
Expected answer: 13882464
Actual output: 9409920
Saved solution and thoughts for day15_part1_2015_failed
First attempt failed. Retrying with error information.
Raw response snippet: <thought>
The previous solution likely had an issue with the calculation or the way it was searching for the optimal combination. The output of 9409920 seems too low for the best possible score, given...
Extracted thoughts: The previous solution likely had an issue with the calculation or the way it was s