**Generative Models for Code** -- Final Project<br><br>
**Maria Gancayco (mig2131@columbia.edu)**<br>
**Stephen Wright (svw2112@columbia.edu)**<br>
*Due:* Wednesday, 12 Dec 2024 at 11:59pm ET

### Imports and Setup

In [None]:
# Setup: Environment and Memory Management

import torch
import gc
from pathlib import Path
from dataclasses import dataclass
from typing import Optional

# Check and display GPU availability for transparency.
# get_device_name(0) is only queried when CUDA is available; otherwise a
# placeholder string is printed instead.
print("CUDA available:", torch.cuda.is_available())
print("GPU device name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU found")

# Memory management utilities
def clear_memory() -> None:
    """
    Release cached CUDA memory (when a GPU is present) and run the garbage collector.

    Meant to be called between model loads so that memory held by a previously
    loaded model does not linger while the next one is being set up.
    """
    cuda_present = torch.cuda.is_available()
    if cuda_present:
        # Hand PyTorch's cached blocks back to the CUDA driver
        torch.cuda.empty_cache()
    # Collect unreachable Python objects regardless of device
    gc.collect()

def get_memory_status() -> None:
    """
    Print the current GPU memory usage in megabytes (MB).

    Two figures are reported:
        - Allocated: value of ``torch.cuda.memory_allocated()``
        - Reserved: value of ``torch.cuda.memory_reserved()``

    Nothing is printed when CUDA is not available.
    """
    if not torch.cuda.is_available():
        return

    bytes_per_mb = 1024**2  # byte counts are converted to MB for readability
    allocated = torch.cuda.memory_allocated() / bytes_per_mb
    reserved = torch.cuda.memory_reserved() / bytes_per_mb
    print(f"GPU Memory: Allocated: {allocated:.2f}MB, Reserved: {reserved:.2f}MB")
# Free cached memory first so the status printed below reflects a clean baseline
clear_memory()
# Initialize by checking current memory status
get_memory_status()

CUDA available: True
GPU device name: NVIDIA A100-SXM4-40GB
GPU Memory: Allocated: 0.00MB, Reserved: 0.00MB


In [None]:
# Configuration and Setup

@dataclass
class ExperimentConfig:
    """
    Bundle of hyperparameters and settings used for a model evaluation run.

    The first six fields are required; the remaining ones carry defaults.

    Attributes:
        model_name: Name or path of the model to evaluate.
        batch_size: Number of samples processed per batch.
        learning_rate: Learning rate for optimization.
        num_epochs: Number of training epochs.
        max_seq_length: Maximum sequence length for input tokenization.
        gradient_accumulation_steps: Number of steps over which gradients are accumulated.
        warmup_steps: Warmup steps for the learning-rate scheduler (``None`` when unset).
        weight_decay: L2 regularization factor.
        eval_steps: How often (in steps) evaluation runs.
        save_steps: How often (in steps) checkpoints are saved.
        logging_steps: How often (in steps) training metrics are logged.
    """
    # --- required settings ---
    model_name: str
    batch_size: int
    learning_rate: float
    num_epochs: int
    max_seq_length: int
    gradient_accumulation_steps: int

    # --- optional settings with defaults ---
    warmup_steps: Optional[int] = None
    weight_decay: float = 0.01
    eval_steps: int = 100
    save_steps: int = 100
    logging_steps: int = 10

# Set up results directory for storing evaluation outputs.
# The path is relative, so it resolves against the notebook's current working directory.
results_dir = Path("./results")
results_dir.mkdir(parents=True, exist_ok=True)  # Create directory if it doesn't exist

print("Configuration and directories initialized!")

Configuration and directories initialized!


In [None]:
# Concrete configuration for the DeepSeek-Coder 7B run; fields not listed here
# keep the defaults declared on ExperimentConfig.
config = ExperimentConfig(
    model_name="deepseek-ai/deepseek-coder-7b-instruct-v1.5",
    batch_size=1,                    # Small batch size due to model size
    learning_rate=5e-5,             # Conservative learning rate for fine-tuning
    num_epochs=3,                   # Number of training epochs
    max_seq_length=512,            # Maximum sequence length for input processing
    gradient_accumulation_steps=32, # Accumulate gradients to simulate larger batch size
    warmup_steps=100               # Warmup steps for learning rate scheduler
)

In [None]:
# Model Dependencies and Imports

# Install core dependencies for transformer model handling and evaluation
!pip install transformers torch timeout-decorator

# Import required libraries
import torch  # PyTorch for deep learning operations
from transformers import (
    AutoTokenizer,         # For tokenization of input text
    AutoModelForCausalLM   # For loading pre-trained causal language models
)
from typing import List, Dict  # Type hints for better code documentation
import timeout_decorator
!pip install datasets
from datasets import load_dataset
import numpy as np

In [None]:
# Load the HumanEval benchmark; later cells read its 'test' split
# (prompt, canonical_solution, entry_point and test columns).
dataset = load_dataset("openai_humaneval")

In [None]:
# Model Loading and Code Generation

def load_model_and_tokenizer(config: ExperimentConfig) -> tuple[AutoModelForCausalLM, AutoTokenizer]:
    """
    Load the causal language model and tokenizer named by ``config.model_name``.

    Args:
        config: Experiment configuration; only ``model_name`` is read here.

    Returns:
        A ``(model, tokenizer)`` pair.

    Raises:
        Exception: Any loading error is printed and then re-raised unchanged.
    """
    try:
        # Free memory first so a previously loaded model does not trigger an OOM
        clear_memory()

        print(f"Loading {config.model_name}...")

        # trust_remote_code=True permits custom tokenizer/model code shipped with the checkpoint
        tokenizer = AutoTokenizer.from_pretrained(config.model_name, trust_remote_code=True)

        # Memory-conscious loading options
        loading_options = {
            "trust_remote_code": True,
            "torch_dtype": torch.bfloat16,   # bfloat16 weights for a smaller footprint
            "device_map": "auto",            # automatic placement across available devices
            "low_cpu_mem_usage": True,       # keep CPU memory low while loading
        }
        model = AutoModelForCausalLM.from_pretrained(config.model_name, **loading_options)

        # Turn on gradient checkpointing when the model exposes the hook
        if hasattr(model, "gradient_checkpointing_enable"):
            model.gradient_checkpointing_enable()

        print("Model loaded successfully!")
        get_memory_status()  # report memory usage after loading
    except Exception as err:
        print(f"Error loading model: {err!s}")
        raise

    return model, tokenizer

def generate_code(
    model: AutoModelForCausalLM,
    tokenizer: AutoTokenizer,
    prompt: str,
    max_new_tokens: int = 512,
    temperature: float = 0.8,
    top_p: float = 0.95,
    top_k: int = 50
) -> str:
    """
    Sample one completion for ``prompt`` using the model's chat template.

    Args:
        model: Causal language model used for generation.
        tokenizer: Tokenizer providing ``apply_chat_template`` and ``decode``.
        prompt: Content of the single user message sent to the model.
        max_new_tokens: Upper bound on newly generated tokens.
        temperature: Sampling temperature.
        top_p: Nucleus sampling threshold.
        top_k: Top-k sampling parameter.

    Returns:
        The decoded text generated after the prompt, or ``""`` if any error
        occurs (the error is printed, not raised).
    """
    try:
        # Wrap the prompt as a one-turn conversation
        conversation = [{"role": "user", "content": prompt}]
        print("Generating inputs...")
        input_ids = tokenizer.apply_chat_template(
            conversation,
            add_generation_prompt=True,
            return_tensors="pt"
        )
        input_ids = input_ids.to(model.device)

        print("Generating outputs...")
        sampling_options = dict(
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            num_return_sequences=1,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )
        generated = model.generate(input_ids, **sampling_options)

        # Drop the prompt tokens so only the newly generated part is decoded
        prompt_length = len(input_ids[0])
        completion_ids = generated[0][prompt_length:]
        return tokenizer.decode(completion_ids, skip_special_tokens=True)

    except Exception as err:
        print(f"Error in code generation: {err!s}")
        return ""


In [None]:
# Initialize model and tokenizer using configuration.
# The pair is kept in module-level names so later cells can pass it to the generators.
deepseek_7b_model, deepseek_7b_tokenizer = load_model_and_tokenizer(config)

### HumanEval+ Test Case Generation

In [None]:
import re
def generate_humaneval_plus_tests(model_type, deepseek_model=None, deepseek_tokenizer=None, num_total_tests=100):
    """
    Generate extra ("HumanEval+") test cases for HumanEval problems with the chosen model.

    For each problem, the reference assertions are pulled out of the dataset's
    ``check(candidate)`` function and embedded in a prompt that asks the model
    for performance, edge-case and error tests. Cleaned outputs are appended to
    ``{model_type}_test_case_generation_results.txt`` in the layout that
    ``extract_test_suites`` later parses.

    Args:
        model_type: ``"semcoder"``, ``"gpt-4"``, or any string containing
            ``"deepseek"``. Any other value produces no tests.
        deepseek_model: Model passed to ``generate_code`` for DeepSeek runs.
        deepseek_tokenizer: Tokenizer passed to ``generate_code`` for DeepSeek runs.
        num_total_tests: Stop once at least this many ``def test_`` functions
            have been generated in total.

    Returns:
        ``(results, total_tests_generated)`` where ``results`` is a list of
        dicts with keys ``problem_id``, ``entry_point``, ``tests``, ``num_tests``.

    Note:
        The ``semcoder`` and ``gpt-4`` branches rely on module-level names
        (``semcoder``, ``evaluator``, ``openai``) that must already exist in
        the notebook; a missing name is reported by the per-problem error
        handler and the problem is skipped.
    """
    dataset = load_dataset("openai_humaneval")
    results = []
    total_tests_generated = 0
    with open(f'{model_type}_test_case_generation_results.txt', 'w') as f:
        for i, problem in enumerate(dataset['test']):
            if total_tests_generated >= num_total_tests:
                break

            prompt = problem['prompt']
            entry_point = problem['entry_point']
            test_code = problem['test']

            # Extract working test cases from the first paragraph of check(candidate)
            check_match = re.search(r'def check\(candidate\):\s*(.*?)(?=\n\n|$)', test_code, re.DOTALL)
            test_cases = re.findall(r'assert.*?(?=\n|$)', check_match.group(1) if check_match else '')

            # The prompt below indexes test_cases[0] and test_cases[-1]; without
            # this guard a problem with no extractable assertion would raise
            # IndexError outside the try block and abort the whole run.
            if not test_cases:
                print(f"Skipping problem {i}: no reference assertions could be extracted")
                continue

            test_prompt = f"""
Please provide executable test cases for this function:
{prompt}

Working test examples:
{test_cases}

Include these types of tests:
1. Performance test:
def test_{entry_point}_perf():
    {test_cases[0].replace('candidate', entry_point)}

2. Edge case test:
def test_{entry_point}_edge():
    {test_cases[-1].replace('candidate', entry_point)}

3. Error test:
def test_{entry_point}_error():
    with pytest.raises(TypeError):
        {entry_point}(None)

Only provide executable test cases. No placeholders."""

            try:
                generated_tests, cleaned_tests = None, None
                if model_type == "semcoder":
                    generated_tests = semcoder.generate_code(test_prompt)
                    cleaned_tests = evaluator.clean_generated_code(generated_tests)
                elif "deepseek" in model_type:
                    generated_tests = generate_code(deepseek_model, deepseek_tokenizer, test_prompt, max_new_tokens=4096)
                    cleaned_tests = clean_deepseek_generated_code(generated_tests)
                elif model_type == "gpt-4":
                    response = openai.ChatCompletion.create(
                        model="gpt-4",
                        messages=[
                            {"role": "system", "content": "You are a helpful assistant."},
                            {"role": "user", "content": test_prompt}
                        ],
                        max_tokens=4096,
                        temperature=0.8,
                        top_p=0.95
                    )
                    generated_tests = response["choices"][0]["message"]["content"].strip()
                    # TODO: rename clean_deepseek_generated_code, since it is also used for OpenAI output
                    cleaned_tests = clean_deepseek_generated_code(generated_tests)

                if cleaned_tests:
                    num_tests = len(re.findall(r'def test_', cleaned_tests))
                    total_tests_generated += num_tests

                    result = {
                        'problem_id': i,
                        'entry_point': entry_point,
                        'tests': cleaned_tests,
                        'num_tests': num_tests
                    }
                    results.append(result)

                    print(f"Generated {num_tests} enhanced tests")
                    print(f"Total tests so far: {total_tests_generated}/{num_total_tests}")
                    print("\nTest prompt:")
                    print(test_prompt)
                    print("\nGenerated tests:")
                    print(generated_tests)
                    print("\nCleaned tests:")
                    print(cleaned_tests)

                    # This header layout is what extract_test_suites splits on; keep it in sync
                    f.write(f"Generated {num_tests} enhanced tests\n")
                    f.write(f"Total tests so far: {total_tests_generated}/{num_total_tests}")
                    f.write("\nGenerated tests:\n")
                    f.write(cleaned_tests + "\n")
                else:
                    print("No valid tests generated")

            except Exception as e:
                # Best effort: one failing problem must not stop the whole generation run
                print(f"Error generating tests: {str(e)}")
                continue

    return results, total_tests_generated

In [None]:
# Generate HumanEval+ tests with the DeepSeek 7B model loaded above.
# "deepseek_7b" contains "deepseek", which selects the DeepSeek branch of the
# generator and also names the output file.
print("Generating HumanEval+ test cases...")
plus_results, total_plus_tests = generate_humaneval_plus_tests("deepseek_7b", deepseek_model=deepseek_7b_model, deepseek_tokenizer=deepseek_7b_tokenizer, num_total_tests=100)

### General Utilities

In [None]:
import re

def extract_test_suites(content: str) -> list[str]:
    """
    Extract test suites from a generation-results file and make them self-executing.

    The content is split on the "Generated N enhanced tests / Total tests so
    far: a/b / Generated tests:" header written by the test generator. Each
    block is then rebuilt in this order: import lines, top-level ``assert``
    statements, top-level ``with pytest.raises(...)`` blocks, and finally each
    ``def test_*():`` function followed by a call to it.

    Blocks that mention ``unittest.TestCase`` are passed through untouched.

    Args:
        content: Full text of a results file.

    Returns:
        One formatted suite string per non-empty block that yielded any parts.
    """
    # Split content into test suite blocks
    test_blocks = re.split(r'Generated \d+ enhanced tests\nTotal tests so far: \d+/\d+\n+Generated tests:', content)

    # Remove empty blocks
    test_blocks = [block.strip() for block in test_blocks if block.strip()]

    formatted_suites = []
    for block in test_blocks:
        if "unittest.TestCase" in block:
            print("FORMATTED TEST SUITE:")
            print(block)
            formatted_suites.append(block)
            continue

        print("ORIGINAL TEST SUITE:")
        print(block)
        suite_parts = []

        # Collect top-level imports. Both "import x" and "from x import y" are
        # kept; dropping the latter would leave the tests with undefined names.
        import_statements = re.findall(r'^(?:import|from \S+ import) [^\n]+', block, re.MULTILINE)

        # Extract function-based tests (body = consecutive lines indented by 4+ spaces)
        test_functions = re.finditer(r'def (test_\w+)\(\):\n((?:[ ]{4}.*\n?)+)', block)

        # Extract standalone assert statements (not within functions):
        # asserts that start at column 0
        standalone_asserts = re.finditer(r'^assert [^\n]+$', block, re.MULTILINE)

        # Extract standalone pytest.raises statements. The body line may end at
        # a newline or at the end of the block (blocks are stripped, so a
        # trailing statement has no final newline).
        standalone_raises = re.finditer(r'^with pytest\.raises\([^\)]+\):\n[ ]{4}[^\n]+$', block, re.MULTILINE)

        # Add imports if they exist
        if import_statements:
            suite_parts.extend(import_statements)
            suite_parts.append("")  # Add blank line after imports

        # Add standalone asserts
        suite_parts.extend(match.group(0) for match in standalone_asserts)

        # Add standalone pytest.raises
        suite_parts.extend(match.group(0).rstrip() for match in standalone_raises)

        # Add function-based tests, each followed by a call so that exec() runs it
        for match in test_functions:
            func_name = match.group(1)
            func_body = match.group(2).rstrip()
            suite_parts.append(f"def {func_name}():\n{func_body}\n{func_name}()")

        if suite_parts:
            formatted_suite = "\n".join(suite_parts)
            print("FORMATTED TEST SUITE:")
            print(formatted_suite)
            print("-" * 50)
            formatted_suites.append(formatted_suite)

    return formatted_suites

def process_file_path(file_path: str) -> list[str]:
    """Read the file at ``file_path`` and return its formatted test suite strings."""
    with open(file_path, 'r') as handle:
        raw_text = handle.read()
    suites = extract_test_suites(raw_text)
    return suites

def process_file_content(content: str) -> list[str]:
    """Return the formatted test suite strings found in already-loaded file text."""
    suites = extract_test_suites(content)
    return suites

In [None]:
import pytest
def execute_test_case(code: str, test_case: str) -> bool:
    """
    Execute a solution followed by a test suite in a fresh namespace.

    Args:
        code: Source of the function(s) under test.
        test_case: Source of the tests; ``pytest`` is imported into the
            namespace before they run.

    Returns:
        True if everything ran without raising (or exited with a zero/None
        status), False otherwise.

    SECURITY: both arguments are passed to ``exec`` with full interpreter
    privileges. The test code is model-generated, so run this only inside a
    disposable environment.
    """
    # Local import keeps this function usable on its own; the module is
    # already cached when the notebook has imported it at top level.
    import pytest

    namespace = {}
    try:
        # Execute the function code
        exec(code, namespace)
        # Execute the test case
        exec("import pytest", namespace)
        exec(test_case, namespace)
    except SystemExit as exit_request:
        # Generated suites may call sys.exit()/unittest.main(); SystemExit is a
        # BaseException and would otherwise abort the surrounding evaluation loop.
        return exit_request.code in (None, 0)
    except pytest.fail.Exception:
        # Raised when pytest.raises() fails (expected exception wasn't raised);
        # it derives from BaseException, so `except Exception` would miss it.
        return False
    except Exception:
        # Any other failure: assertion errors, runtime errors, timeouts, ...
        return False
    return True

def check_syntax(code: str) -> bool:
    """Return True when ``code`` compiles as a Python module, False on SyntaxError."""
    try:
        compile(code, '<string>', 'exec')
    except SyntaxError:
        return False
    else:
        return True
@timeout_decorator.timeout(5)
def evaluate_single_test_suite(solution: str,
                               generated_tests: str) -> Dict:
    """
    Check one generated test suite against a solution, under a 5-second timeout.

    Args:
        solution: Source of the reference solution.
        generated_tests: Source of the generated tests.

    Returns:
        Dict with ``syntax_valid`` (solution plus tests compile together) and
        ``execution_success`` (tests ran without failure; always False when
        the syntax check fails).
    """
    combined_source = solution + "\n" + generated_tests
    syntax_valid = check_syntax(combined_source)

    # Tests are only executed when the combined source compiles
    # TODO:- consider using thread pool for parallel test execution
    execution_success = execute_test_case(solution, generated_tests) if syntax_valid else False

    outcome = {}
    outcome["syntax_valid"] = syntax_valid
    outcome["execution_success"] = execution_success
    return outcome
def evaluate_test_suite(model_type, dataset, n_tasks, test_suites):
    """
    Evaluate pre-extracted test suites against the HumanEval canonical solutions.

    Suite ``i`` is run against dataset problem ``i`` (prompt + canonical
    solution). A per-problem report is printed and written to
    ``{model_type}_test_case_generation_accuracy_results.txt``, followed by the
    aggregate metrics.

    NOTE(review): this pairs suites with problems purely by position; confirm
    that ``test_suites`` has no gaps relative to the dataset order.

    Args:
        model_type: Label used in the output file name.
        dataset: HumanEval dataset exposing a ``'test'`` split.
        n_tasks: Number of leading problems/suites to evaluate.
        test_suites: Sequence of test-suite source strings.

    Returns:
        The metrics dict. Only ``syntax_validity`` and ``execution_accuracy``
        are computed (as plain floats); the ``pass@k`` entries stay at 0.0.
        Both computed entries also stay at 0.0 when nothing was evaluated.
    """
    # Read each column once instead of re-materializing it on every iteration
    prompts = dataset['test']["prompt"]
    solutions = dataset['test']["canonical_solution"]
    metrics = {"pass@1": 0.0,      # Single-attempt success rate
               "pass@10": 0.0,     # Success within 10 attempts
               "pass@100": 0.0,    # Success within 100 attempts
               "syntax_validity": 0.0,  # Syntactic correctness
               "execution_accuracy": 0.0  # Functional correctness
    }
    results = []
    with open(f'{model_type}_test_case_generation_accuracy_results.txt', 'w') as f:
        for i in range(n_tasks):
            full_solution = prompts[i] + solutions[i]
            cleaned_tests = test_suites[i]
            result = evaluate_single_test_suite(full_solution, cleaned_tests)

            # (text written to the file, text printed to the console)
            report = [
                (f"PROBLEM {i}:\n", f"PROBLEM {i}:\n"),
                ("CANONICAL SOLUTION:\n", "CANONICAL SOLUTION:\n"),
                (full_solution + "\n", full_solution + "\n"),
                ("CLEANED TESTS:\n", "CLEANED TESTS:\n"),
                (cleaned_tests + "\n", cleaned_tests),
                ("RESULT:\n" + str(result) + "\n", "RESULT:\n" + str(result)),
            ]
            for file_text, console_text in report:
                f.write(file_text)
                print(console_text)

            results.append(result)

        # Guard against n_tasks == 0: the mean of an empty list is nan (with a warning)
        if results:
            metrics["syntax_validity"] = float(np.mean([r["syntax_valid"] for r in results]))
            metrics["execution_accuracy"] = float(np.mean([r["execution_success"] for r in results]))
        f.write(str(metrics))

    return metrics

def clean_deepseek_generated_code(code: str) -> str:
        """Clean up generated code to extract only the functions."""
        lines = code.split('\n')
        cleaned_lines = []
        found_start = False
        found_test_func_call = False
        for line in lines:
            if line.startswith('```python'):
                found_start = True
            elif line.startswith('```'):
                if found_test_func_call: break
                else: found_start = False
            elif found_start:
                if line.startswith('test_') and line.endswith('()'):
                    found_test_func_call = True
                cleaned_lines.append(line)

        return '\n'.join(cleaned_lines).strip()

In [None]:
# Parse the DeepSeek 7B generation results into executable test suites.
# NOTE(review): the generator writes this file relative to the working
# directory; the absolute /content path assumes the notebook runs from /content.
deepseek_7b_extracted_test_suites = process_file_path("/content/deepseek_7b_test_case_generation_results.txt")

In [None]:
# Evaluate every extracted suite; suite i is run against dataset problem i.
evaluate_test_suite("deepseek_7b", dataset, len(deepseek_7b_extracted_test_suites), deepseek_7b_extracted_test_suites)

### SemCoder Simple Prompt Results

In [None]:
def clean_generated_code(code: str) -> str:
    """
    Reduce generated text to its function definitions.

    A line whose stripped form starts with ``def `` opens a function and is
    kept. While inside a function, lines indented by at least four spaces and
    blank lines are kept; the first other line closes the function and is
    replaced by an empty separator line. Everything outside a function is
    dropped. The joined result is stripped of surrounding whitespace.
    """
    kept_lines = []
    inside_def = False

    for raw_line in code.split('\n'):
        stripped = raw_line.strip()

        if stripped.startswith('def '):
            inside_def = True
            kept_lines.append(raw_line)
            continue

        if not inside_def:
            continue

        if raw_line.startswith('    ') or not stripped:
            # Still part of the function body (or a blank line within it)
            kept_lines.append(raw_line)
        else:
            # Non-indented content ends the function
            inside_def = False
            kept_lines.append('')

    return '\n'.join(kept_lines).strip()

### DeepSeek Simple Prompt Results

In [None]:
from typing import List, Dict
import numpy as np
import timeout_decorator
from datasets import load_dataset

In [None]:
# Module-level metrics dict. evaluate_model (defined below) writes its
# aggregate results into this same object and returns it; the pass@k entries
# are initialized here but not updated by that function.
metrics = {
            "pass@1": 0.0,      # Single-attempt success rate
            "pass@10": 0.0,     # Success within 10 attempts
            "pass@100": 0.0,    # Success within 100 attempts
            "syntax_validity": 0.0,  # Syntactic correctness
            "execution_accuracy": 0.0  # Functional correctness
}
# TODO:- rename to accommodate that we're also using this for GPT-4

def evaluate_model(model, dataset, model_type, tokenizer, n_tasks: int = None):
    """
    Prompt a model for test cases on HumanEval problems and score the results.

    For each of the first ``n_tasks`` problems (all problems when ``None``),
    the prompt + canonical solution is embedded in a fixed instruction prompt,
    tests are generated, cleaned, and evaluated. A per-problem report is
    printed and written to ``{model_type}_test_case_generation_results.txt``.

    Args:
        model: Model object; used with ``generate_code`` for ``"deepseek"`` and
            via ``model.generate_code`` for ``"semcoder"``.
        dataset: HumanEval dataset exposing a ``'test'`` split.
        model_type: ``"deepseek"`` or ``"semcoder"``; other values generate nothing.
        tokenizer: Tokenizer used for the DeepSeek path.
        n_tasks: Number of problems to evaluate.

    Returns:
        The module-level ``metrics`` dict, with ``syntax_validity`` and
        ``execution_accuracy`` updated in place.
    """
    test_split = dataset['test']
    solutions = test_split["canonical_solution"]
    task_count = len(solutions) if n_tasks is None else n_tasks

    results = []
    with open(f'{model_type}_test_case_generation_results.txt', 'w') as f:
        for i in range(task_count):
            full_solution = dataset['test']["prompt"][i] + solutions[i]

            # The leading whitespace inside this literal is part of the prompt text
            prompt = f"""
              Please provide and execute a set of test cases for the following function:
              {full_solution}

              Please do not include natural language or anything that cannot be compiled/executed.
              Please only provided the test cases and their immediate execution.

              Example:
              def test_hello_with_name():
                  assert hello("Alice") == "Hello, Alice"
                  assert hello("Bob") == "Hello, Bob"
              test_hello_with_name()

              def test_hello_without_name():
                  assert hello(None) == "Hello, world"
                  assert hello("") == "Hello, world"
              test_hello_without_name()
              """

            is_deepseek = model_type == "deepseek"
            generated_tests = ""
            if is_deepseek:
                generated_tests = generate_code(
                    model,
                    tokenizer,
                    prompt,
                    max_new_tokens=4096
                )
            elif model_type == "semcoder":
                generated_tests = model.generate_code(prompt, max_new_tokens=4096)

            # Cleaning is only wired up for DeepSeek output (no-op for now otherwise)
            if is_deepseek:
                cleaned_tests = clean_deepseek_generated_code(generated_tests)
            else:
                cleaned_tests = ""
            result = evaluate_single_test_suite(full_solution, cleaned_tests)

            # (text written to the file, text printed to the console)
            report = (
                (f"PROBLEM {i}:\n", f"PROBLEM {i}:\n"),
                ("CANONICAL SOLUTION:\n", "CANONICAL SOLUTION:\n"),
                (full_solution + "\n", full_solution + "\n"),
                ("GENERATED TESTS:\n", "GENERATED TESTS:\n"),
                (generated_tests + "\n", generated_tests),
                ("CLEANED TESTS:\n", "CLEANED TESTS:\n"),
                (cleaned_tests + "\n", cleaned_tests),
                ("RESULT:\n" + str(result) + "\n", "RESULT:\n" + str(result)),
            )
            for file_text, console_text in report:
                f.write(file_text)
                print(console_text)

            results.append(result)

        # Calculate aggregate metrics
        syntax_flags = [r["syntax_valid"] for r in results]
        execution_flags = [r["execution_success"] for r in results]
        metrics["syntax_validity"] = np.mean(syntax_flags)
        metrics["execution_accuracy"] = np.mean(execution_flags)
        f.write(str(metrics))
    return metrics

In [None]:
# NOTE(review): TestCaseEvaluator is not defined in the visible part of this
# notebook; confirm that the class is defined or imported before this cell runs.
evaluator = TestCaseEvaluator()

In [None]:
# Run the simple-prompt evaluation for DeepSeek on 100 tasks and print each metric.
# NOTE(review): `model` and `tokenizer` are not assigned in the visible cells,
# and this call passes no dataset, unlike the module-level
# evaluate_model(model, dataset, model_type, tokenizer, n_tasks) defined above —
# confirm which implementation `evaluator.evaluate_model` refers to.
metrics = evaluator.evaluate_model(model, "deepseek", tokenizer, 100)
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")
# Colab-only: download the results file to the local machine
from google.colab import files
files.download('deepseek_test_case_generation_results.txt')

### Standardized SemCoder Results

In [None]:
# Run the same evaluation for SemCoder on 100 tasks and print each metric.
# NOTE(review): `semcoder` and `tokenizer` are not assigned in the visible
# cells — confirm they are defined before this cell runs.
metrics = evaluator.evaluate_model(semcoder, "semcoder", tokenizer, 100)
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")
# Colab-only: download the results file to the local machine
from google.colab import files
files.download('semcoder_test_case_generation_results.txt')

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


PROBLEM 0:

CANONICAL SOLUTION:

from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """
    for idx, elem in enumerate(numbers):
        for idx2, elem2 in enumerate(numbers):
            if idx != idx2:
                distance = abs(elem - elem2)
                if distance < threshold:
                    return True

    return False


GENERATED TESTS:


              Please provide and execute a set of test cases for the following function:
              from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


PROBLEM 1:

CANONICAL SOLUTION:

from typing import List


def separate_paren_groups(paren_string: str) -> List[str]:
    """ Input to this function is a string containing multiple groups of nested parentheses. Your goal is to
    separate those group into separate strings and return the list of those.
    Separate groups are balanced (each open brace is properly closed) and not nested within each other
    Ignore any spaces in the input string.
    >>> separate_paren_groups('( ) (( )) (( )( ))')
    ['()', '(())', '(()())']
    """
    result = []
    current_string = []
    current_depth = 0

    for c in paren_string:
        if c == '(':
            current_depth += 1
            current_string.append(c)
        elif c == ')':
            current_depth -= 1
            current_string.append(c)

            if current_depth == 0:
                result.append(''.join(current_string))
                current_string.clear()

    return result


GENERATED TESTS:


              Please 

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


PROBLEM 2:

CANONICAL SOLUTION:



def truncate_number(number: float) -> float:
    """ Given a positive floating point number, it can be decomposed into
    and integer part (largest integer smaller than given number) and decimals
    (leftover part always smaller than 1).

    Return the decimal part of the number.
    >>> truncate_number(3.5)
    0.5
    """
    return number % 1.0


GENERATED TESTS:


              Please provide and execute a set of test cases for the following function:
              

def truncate_number(number: float) -> float:
    """ Given a positive floating point number, it can be decomposed into
    and integer part (largest integer smaller than given number) and decimals
    (leftover part always smaller than 1).

    Return the decimal part of the number.
    >>> truncate_number(3.5)
    0.5
    """
    return number % 1.0


              Please do not include natural language or anything that cannot be compiled/executed.
              Please only provid

KeyboardInterrupt: 

 ### Code Coverage Assessment

In [None]:
# First, install required packages
!pip install pytest pytest-cov coverage
from google.colab import files  # Colab-specific import

In [None]:
import os
import re
import tempfile
import subprocess
import statistics
from typing import Dict, List, Tuple
import json
from pathlib import Path
from google.colab import files  # Colab-specific import

In [None]:
def calculate_aggregate_metrics(results, target_score_name) -> Dict:
    """
    Summarize one numeric score across a list of result dicts.

    Args:
        results: List of dicts; entries lacking ``target_score_name`` are ignored.
        target_score_name: Key of the score to aggregate.

    Returns:
        Dict with mean, median, min and max (keys suffixed with the score
        name), ``std_dev`` (sample standard deviation, 0 for a single value)
        and ``total_entries_analyzed``; or ``{'error': ...}`` when there are
        no results or no entry carries the score.
    """
    if not results:
        return {'error': 'No valid results to analyze'}

    scores = []
    for entry in results:
        if target_score_name in entry:
            scores.append(entry[target_score_name])

    if not scores:
        return {'error': 'No valid score values found'}

    summary = {}
    summary[f'mean_{target_score_name}'] = statistics.mean(scores)
    summary[f'median_{target_score_name}'] = statistics.median(scores)
    summary[f'min_{target_score_name}'] = min(scores)
    summary[f'max_{target_score_name}'] = max(scores)
    # The sample standard deviation needs at least two data points
    if len(scores) > 1:
        summary['std_dev'] = statistics.stdev(scores)
    else:
        summary['std_dev'] = 0
    summary['total_entries_analyzed'] = len(scores)
    return summary

In [None]:
class TestCoverageAnalyzer:
    def __init__(self, input_file: str = "", output_dir: str = "/content/coverage_results"):
        """Initialize the analyzer with input file path and output directory."""
        self.input_file = input_file
        self.output_dir = output_dir
        self.coverage_results = []
        os.makedirs(output_dir, exist_ok=True)

    def create_test_files(self, solution: str, tests: str, temp_dir: str) -> Tuple[str, str]:
        """Create temporary Python files for the solution and tests."""
        # Create solution file
        solution_file = Path(temp_dir) / "solution.py"
        with open(solution_file, 'w') as f:
            f.write(solution)

        # Create test file with proper imports for Colab
        test_file = Path(temp_dir) / "test_solution.py"
        with open(test_file, 'w') as f:
            f.write("import sys\n")
            f.write(f"sys.path.append('{temp_dir}')\n")
            f.write("from solution import *\n")
            f.write(tests)

        return str(solution_file), str(test_file)

    def get_coverage_data():
        with open('coverage.json') as f:
            coverage_data = json.load(f)
            for file_path, file_data in coverage_data['files'].items():
                  if 'solution.py' in file_path:
                     return {
                        'line_coverage': file_data['summary']['percent_covered'],
                        'total_lines': file_data['summary']['num_statements'],
                        'covered_lines': file_data['summary']['covered_lines'],
                        'missing_lines': file_data['summary']['missing_lines']
                     }
    def run_coverage_analysis(self, solution_file: str, test_file: str, temp_dir: str) -> Dict:
        """Run pytest with coverage and return results."""
        try:
            orig_dir = os.getcwd()
            os.chdir(temp_dir)

            # Run pytest with coverage using python -m to ensure proper module resolution
            cmd = ['python3', '-m', 'pytest', '--cov=solution',
                '--cov-report=json', 'test_solution.py', '-v']

            env = os.environ.copy()
            env['PYTHONPATH'] = temp_dir  # Ensure proper module resolution

            result = subprocess.run(cmd, capture_output=True, text=True, env=env)
            if os.path.exists('coverage.json'): return get_coverage_data()
            return {'error': 'No coverage data generated'}

        except subprocess.CalledProcessError as e:
            print(f"Command output: {e.output}")
            return {'error': f'pytest failed: {str(e)}'}
        except Exception as e:
            print(f"Exception details: {str(e)}")
            return {'error': f'Analysis failed: {str(e)}'}
        finally:
            os.chdir(orig_dir)

    def analyze_all_entries(self) -> Dict:
        with open(self.input_file, 'r') as f:
            content = f.read()

        entries = content.split('CANONICAL SOLUTION:')[1:]  # Skip first empty split

        for i, entry in enumerate(entries):
            try:
                # Add back the header since we split on it
                entry = 'CANONICAL SOLUTION:' + entry

                with tempfile.TemporaryDirectory() as temp_dir:
                    solution, tests = extract_sections(entry)
                    if not tests.strip():
                        continue

                    solution_file, test_file = self.create_test_files(solution, tests, temp_dir)

                    result = self.run_coverage_analysis(solution_file, test_file, temp_dir)
                    print(result)
                    if 'line_coverage' in result:
                        self.coverage_results.append(result)
            except Exception as e:
                print(f"Error processing entry {i}: {str(e)}")
                continue

        return calculate_aggregate_metrics(self.coverage_results, "line_coverage")

In [None]:
# Measure line coverage of each canonical solution under the DeepSeek-generated suites.
# Suite `index` is paired with dataset problem `index`.
# NOTE(review): `extracted_test_suites` is not assigned in the visible cells;
# presumably it is deepseek_7b_extracted_test_suites — confirm.
coverage_analyzer = TestCoverageAnalyzer()
deep_seek_coverage_results = []
for index, test_suite in enumerate(extracted_test_suites):
  solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
  with tempfile.TemporaryDirectory() as temp_dir:
    solution_file, test_file = coverage_analyzer.create_test_files(solution, test_suite, temp_dir)
    result = coverage_analyzer.run_coverage_analysis(solution_file, test_file, temp_dir)
    # Only runs that produced a coverage figure are aggregated
    if 'line_coverage' in result:
      deep_seek_coverage_results.append(result)
print(calculate_aggregate_metrics(deep_seek_coverage_results, "line_coverage"))

{'mean_line_coverage': 97.34693877551021, 'median_line_coverage': 100.0, 'min_line_coverage': 21.428571428571427, 'max_line_coverage': 100.0, 'std_dev': 13.428673596709753, 'total_entries_analyzed': 35}


In [None]:
# Same coverage measurement for the SemCoder-generated suites.
# NOTE(review): `semcoder_extracted_test_suites` is not assigned in the
# visible cells — confirm it is produced before this cell runs.
semcoder_coverage_results = []
for index, test_suite in enumerate(semcoder_extracted_test_suites):
  solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
  with tempfile.TemporaryDirectory() as temp_dir:
    solution_file, test_file = coverage_analyzer.create_test_files(solution, test_suite, temp_dir)
    result = coverage_analyzer.run_coverage_analysis(solution_file, test_file, temp_dir)
    # Only runs that produced a coverage figure are aggregated
    if 'line_coverage' in result:
      semcoder_coverage_results.append(result)
print(calculate_aggregate_metrics(semcoder_coverage_results, "line_coverage"))

{'mean_line_coverage': 96.75324675324676, 'median_line_coverage': 100.0, 'min_line_coverage': 21.428571428571427, 'max_line_coverage': 100.0, 'std_dev': 14.40695307294402, 'total_entries_analyzed': 33}


### Measuring Novelty and Diversity

#### Measuring with LLM as Judge

In [None]:
!pip install anthropic

In [None]:
from anthropic import Anthropic
import json
from google.colab import userdata
def _parse_json_object(text: str) -> Optional[dict]:
    """Return the JSON object contained in *text*, or None if none can be decoded.

    The whole string is tried first. If that fails (for example because the
    reply is wrapped in a Markdown code fence or surrounded by extra prose),
    the span from the first '{' to the last '}' is tried instead.
    """
    candidates = [text]
    start, end = text.find('{'), text.rfind('}')
    if start != -1 and end > start:
        candidates.append(text[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        # Only a JSON object matches the schema requested in the prompt.
        if isinstance(parsed, dict):
            return parsed
    return None


def analyze_novelty_with_claude(source_function: str, generated_tests: str, original_tests: Optional[str] = None) -> Optional[dict]:
    """Use Claude API to analyze test novelty.

    Args:
        source_function: Source code of the function under test.
        generated_tests: The generated test suite to be judged.
        original_tests: The reference test suite to compare against, if any.

    Returns:
        The decoded JSON analysis (keys requested in the prompt:
        ``novelty_score``, ``novel_aspects``, ``unique_scenarios``,
        ``coverage_assessment``, ``recommendations``), or ``None`` when the
        reply contains no text or no decodable JSON object.
    """

    anthropic = Anthropic(api_key=userdata.get('ANTHROPIC_API_KEY'))

    # Avoid rendering a missing reference suite as the literal string "None".
    original_tests_text = original_tests if original_tests is not None else "(none provided)"

    prompt = f"""
As an expert test engineer, analyze the semantic novelty and diversity of the generated test cases for the given function. Consider the function's purpose, edge cases, and expected behaviors.

Source Function:

{source_function}


Generated Test Suite:

{generated_tests}

Original Test Suite:

{original_tests_text}

Please analyze:
1. How well do the tests cover different aspects of the function's behavior?
2. What novel testing scenarios are introduced?
3. Are there important edge cases or boundary conditions tested?
4. How diverse are the test inputs and scenarios?
5. Are the tests relevant to the function's purpose?

Provide your analysis in the following JSON format:
{{
    "novelty_score": <float between 0.0 and 1.0>,
    "novel_aspects": [<list of strings describing novel aspects>],
    "unique_scenarios": [<list of strings describing unique test scenarios>],
    "coverage_assessment": <string describing overall test coverage>,
    "recommendations": [<list of strings with suggested additional test cases>]
}}
Do not provide any other additonal text other than the JSON in order to facilitate
text processing.

"""

    message = anthropic.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=4096,
        temperature=0,  # Use 0 for consistent analysis
        messages=[{
            "role": "user",
            "content": prompt
        }]
    )

    # An empty content list (or a first block without text) has nothing to parse.
    try:
        response_text = message.content[0].text
    except (IndexError, AttributeError):
        print("Failed to parse Claude's response as JSON")
        return None

    analysis = _parse_json_object(response_text)
    if analysis is None:
        print("Failed to parse Claude's response as JSON")
    return analysis

In [None]:
# Ask Claude to judge the novelty of every DeepSeek-generated suite against the
# dataset's own tests. Every result (including failures) is kept in
# deep_seek_novelty_results so the list stays index-aligned with the suites.
deep_seek_novelty_results = []
for index, test_suite in enumerate(extracted_test_suites):
    solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
    original_tests = dataset['test']["test"][index]
    result = analyze_novelty_with_claude(solution, test_suite, original_tests)
    print(result)
    deep_seek_novelty_results.append(result)
# analyze_novelty_with_claude returns None when the reply cannot be parsed, so
# aggregate only over entries that actually carry a novelty score.
print(calculate_aggregate_metrics(
    [r for r in deep_seek_novelty_results if isinstance(r, dict) and "novelty_score" in r],
    "novelty_score",
))

{'novelty_score': 0.6, 'novel_aspects': ['Tests for error handling (TypeError)', 'Tests for performance edge case'], 'unique_scenarios': ['Passing None as input', 'Large list with close elements'], 'coverage_assessment': 'The generated test suite covers some important aspects like error handling and performance edge cases, but lacks comprehensive coverage of boundary conditions and diverse input scenarios.', 'recommendations': ['Test with empty list', 'Test with list containing duplicate values', 'Test with list containing negative numbers', 'Test with threshold values at or near 0', 'Test with large threshold values']}
{'novelty_score': 0.6, 'novel_aspects': ['Tests for error handling (passing None as input)', 'Tests for performance (large input string)'], 'unique_scenarios': ['Empty string input', 'Nested parentheses within a group', 'Consecutive groups with no spaces', 'Single group with no spaces'], 'coverage_assessment': 'The tests cover a good range of scenarios, including edge c

In [None]:
# Aggregate the novelty scores over the first 34 DeepSeek results only.
# NOTE(review): the hard-coded cut-off presumably drops a trailing entry that
# could not be scored (analyze_novelty_with_claude returns None when the reply
# is not valid JSON) -- confirm, and prefer filtering out None entries instead.
print(calculate_aggregate_metrics(deep_seek_novelty_results[:34], "novelty_score"))

{'mean_novelty_score': 0.6558823529411765, 'median_novelty_score': 0.7, 'min_novelty_score': 0.4, 'max_novelty_score': 0.8, 'std_dev': 0.07859052479933758, 'total_entries_analyzed': 34}


In [None]:
# Ask Claude to judge the novelty of every SemCoder-generated suite against the
# dataset's own tests. Every result (including failures) is kept in
# semcoder_novelty_results so the list stays index-aligned with the suites.
semcoder_novelty_results = []
for index, test_suite in enumerate(semcoder_extracted_test_suites):
    solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
    original_tests = dataset['test']["test"][index]
    result = analyze_novelty_with_claude(solution, test_suite, original_tests)
    semcoder_novelty_results.append(result)
    print(result)
# analyze_novelty_with_claude returns None when the reply cannot be parsed, so
# aggregate only over entries that actually carry a novelty score.
print(calculate_aggregate_metrics(
    [r for r in semcoder_novelty_results if isinstance(r, dict) and "novelty_score" in r],
    "novelty_score",
))

{'novelty_score': 0.6, 'novel_aspects': ['Tests for error handling (TypeError)', 'Tests for performance edge case'], 'unique_scenarios': ['Passing None as input', 'Large list with close elements'], 'coverage_assessment': 'The generated test suite covers some important aspects like error handling and performance edge cases, but lacks comprehensive coverage of boundary conditions and diverse input scenarios.', 'recommendations': ['Test with empty list', 'Test with list containing duplicate elements', 'Test with list containing negative numbers', 'Test with threshold values at or near 0', 'Test with large threshold values']}
{'novelty_score': 0.6, 'novel_aspects': ['Tests for error handling (passing None as input)', 'Tests for performance (large input string)'], 'unique_scenarios': ['Empty string input', 'Nested parentheses', 'Single group of parentheses', 'Multiple groups of parentheses', 'Unbalanced parentheses (not tested)'], 'coverage_assessment': 'The tests cover a good range of scen

In [None]:
# prompt: Write the contents of semcoder_novelty_results and deep_seek_novelty_results to their own respective files that I can then download

import json

# Assuming deep_seek_novelty_results and semcoder_novelty_results are lists of dictionaries
# as produced by your analyze_novelty_with_claude function.


def write_results_to_file(results, filename):
    """Write *results* to *filename* as JSON indented by four spaces.

    Args:
        results: Any JSON-serialisable object (here: a list of analysis dicts,
            possibly containing None entries).
        filename: Path of the file to create or overwrite.

    Raises:
        TypeError: If *results* contains a value json cannot serialise. The
            target file is not created or truncated in that case.
    """
    # Serialise before opening the file so a failure cannot leave a
    # half-written (or emptied) file behind.
    serialized = json.dumps(results, indent=4)
    # Explicit encoding keeps the output independent of the platform default.
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(serialized)


# Persist both novelty result lists as JSON files in the current working directory.
write_results_to_file(deep_seek_novelty_results, 'deep_seek_novelty_results.json')
write_results_to_file(semcoder_novelty_results, 'semcoder_novelty_results.json')

# Colab-only helper, used here to download the files that were just written.
from google.colab import files

files.download('deep_seek_novelty_results.json')
files.download('semcoder_novelty_results.json')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

#### Measuring with Patterns

In [None]:
from typing import Dict, List
import re
from collections import defaultdict

class CoveragePatternAnalyzer:
    """Analyzes test coverage patterns focusing on types of test cases.

    Assertions are pulled out of raw test source text and each one is matched
    against a table of regular expressions grouped by category
    (``self.patterns``). The analysis is purely textual: nothing is executed.
    """

    # Single-line string literals. They are removed before counting brackets so
    # that inputs such as "(" or '[' do not make an assertion look unbalanced.
    _STRING_LITERAL = re.compile(
        r'"(?:\\.|[^"\\])*"'
        r"|'(?:\\.|[^'\\])*'"
    )

    def __init__(self):
        # category -> {pattern name -> regex applied to one extracted assertion}
        self.patterns = {
            'edge_cases': {
                'empty_input': r'assert.*(?:empty|\[\]|\{\}|\(\)|""|\'\'|\b==\s*\[\]|\b==\s*""|\b==\s*\'\')',
                'null_input': r'assert.*(?:None|null)',
                'single_element': r'assert.*\[[^,\]]+\]'
            },
            'boundary_testing': {
                'zero_values': r'assert.*\b0\b',
                'negative_values': r'assert.*-\d+',
                'large_values': r'assert.*\d{5,}'
            },
            'error_handling': {
                'exception_testing': r'with\s+pytest\.raises\([^)]+\)',
                'invalid_input': r'assert.*(invalid|wrong|incorrect|bad)'
            },
            'functionality': {
                # The alternation is grouped so that all three keywords require
                # a preceding "assert"; ungrouped, a bare "typical" or
                # "standard" anywhere in the text would match.
                'typical_case': r'assert.*(?:normal|typical|standard)',
                'complex_input': r'assert.*(?:\[.*,.*,.*\]|\{.*:.*,.*:.*\}|\(.*,.*,.*\))'
            }
        }

    @classmethod
    def _bracket_balance(cls, line: str) -> tuple:
        """Return (square-bracket balance, parenthesis balance) of *line*.

        Brackets inside complete single-line string literals are ignored.
        """
        code = cls._STRING_LITERAL.sub('', line)
        return (code.count('[') - code.count(']'),
                code.count('(') - code.count(')'))

    def _extract_assertions(self, test_code: str) -> List[str]:
        """Extract assertions with improved handling of multi-line and truncated assertions.

        A line starts a new assertion when it contains ``pytest.raises`` or
        begins with ``assert``. Following lines are appended (joined with a
        single space) until the brackets opened so far are balanced: only
        parentheses are tracked for ``pytest.raises`` lines, parentheses and
        square brackets for plain asserts. An assertion that is still open when
        the next one starts, or when the input ends, is kept with ``' ...'``
        appended to mark it as truncated.

        Args:
            test_code: Raw test source text.

        Returns:
            The extracted assertions, in order of appearance.
        """
        assertions = []
        current_assertion = None
        in_raises_block = False
        bracket_count = 0
        paren_count = 0

        for raw_line in test_code.split('\n'):
            line = raw_line.strip()

            # Skip empty lines
            if not line:
                continue

            starts_raises = 'pytest.raises' in line
            if starts_raises or line.startswith('assert'):
                # A new assertion begins; keep an unfinished one instead of
                # silently discarding it.
                if current_assertion is not None:
                    assertions.append(current_assertion + ' ...')
                current_assertion = line
                in_raises_block = starts_raises
                bracket_count, paren_count = self._bracket_balance(line)
            elif current_assertion is not None:
                # Continuation of a multi-line assertion
                current_assertion += ' ' + line
                bracket_delta, paren_delta = self._bracket_balance(line)
                bracket_count += bracket_delta
                paren_count += paren_delta
            else:
                # Not part of any assertion (imports, defs, calls, ...)
                continue

            # pytest.raises lines only need balanced parentheses; plain
            # asserts also need balanced square brackets.
            if paren_count == 0 and (in_raises_block or bracket_count == 0):
                assertions.append(current_assertion)
                current_assertion = None
                in_raises_block = False

        # Handle any remaining incomplete assertion
        if current_assertion is not None:
            assertions.append(current_assertion + ' ...')

        return assertions

    def analyze_test_suite(self, test_code: str, verbose: bool = True) -> Dict:
        """Analyze a test suite and return detailed coverage metrics.

        Args:
            test_code: Raw test source text (one suite or several concatenated).
            verbose: When True (the default), print every extracted assertion
                before analysing it.

        Returns:
            ``{'error': 'No assertions found'}`` when nothing was extracted.
            Otherwise a dict with one entry per category that matched at least
            once (``total_matches``, ``coverage_ratio``, ``pattern_breakdown``)
            plus an ``'overall'`` entry with ``total_assertions``,
            ``patterns_per_assertion``, ``pattern_coverage`` (fraction of all
            defined patterns that matched at least once), ``uncategorized`` and
            ``uncategorized_assertions``.
        """
        assertions = self._extract_assertions(test_code)
        if verbose:
            for assertion in assertions:
                print(assertion)

        total_assertions = len(assertions)
        if total_assertions == 0:
            return {'error': 'No assertions found'}

        # One set of "category:name" labels per assertion, index-aligned with `assertions`.
        assertion_patterns = [set() for _ in assertions]
        pattern_counts = defaultdict(lambda: defaultdict(int))
        uncategorized_assertions = []

        # Analyze each assertion
        for labels, assertion in zip(assertion_patterns, assertions):
            for category, patterns in self.patterns.items():
                for name, pattern in patterns.items():
                    if re.search(pattern, assertion):
                        labels.add(f"{category}:{name}")
                        pattern_counts[category][name] += 1

            if not labels:
                uncategorized_assertions.append(assertion)

        # Calculate per-category metrics. An assertion counts once per category
        # even when several patterns of that category match it.
        results = {}
        for category, counts in pattern_counts.items():
            prefix = f"{category}:"
            category_assertions = sum(
                1 for labels in assertion_patterns
                if any(label.startswith(prefix) for label in labels)
            )
            results[category] = {
                'total_matches': category_assertions,
                'coverage_ratio': category_assertions / total_assertions,
                'pattern_breakdown': dict(counts)
            }

        # Add overall metrics
        matched_patterns = sum(
            1 for counts in pattern_counts.values()
            for count in counts.values() if count > 0
        )
        total_patterns = sum(len(patterns) for patterns in self.patterns.values())
        results['overall'] = {
            'total_assertions': total_assertions,
            'patterns_per_assertion': sum(len(labels) for labels in assertion_patterns) / total_assertions,
            'pattern_coverage': matched_patterns / total_patterns,
            'uncategorized': len(uncategorized_assertions),
            'uncategorized_assertions': uncategorized_assertions
        }

        return results

In [None]:
# Single analyzer instance reused for every pattern analysis below.
pattern_analyzer = CoveragePatternAnalyzer()

In [None]:
# Concatenate all DeepSeek suites (separated by a blank line) so they can be
# analysed as one body of test code.
# NOTE(review): earlier cells iterate `extracted_test_suites`; confirm that
# `deepseek_extracted_test_suites` is defined and refers to the same data.
deepseek_extracted_test_suites_str = "\n\n".join(deepseek_extracted_test_suites)

In [None]:
# Pattern-based analysis of the combined DeepSeek suites. The call also prints
# every assertion it extracts, which is the output shown below this cell.
deep_seek_pattern_analyzer_results = pattern_analyzer.analyze_test_suite(deepseek_extracted_test_suites_str)

assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True
assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False
with pytest.raises(TypeError):
assert separate_paren_groups('(()()) ((())) () ((())()())') == ['()', '(())', '(()())', '((()))', '(((())))']
assert separate_paren_groups('( ) (( )) (( )( ))') == ['()', '(())', '(()())']
with pytest.raises(TypeError):
assert below_zero([]) == False
assert below_zero([1, -2, 2, -2, 5, -5, 4, -4]) == True
with pytest.raises(TypeError):
assert abs(mean_absolute_deviation([1.0, 2.0, 3.0]) - 2.0/3.0) < 1e-6
assert abs(mean_absolute_deviation([1.0, 2.0, 3.0, 4.0, 5.0]) - 6.0/5.0) < 1e-6
with pytest.raises(TypeError):
assert intersperse([], 7) == []
assert intersperse([5, 6, 3, 2], 8) == [5, 8, 6, 8, 3, 8, 2]
assert intersperse([2, 2, 2], 2) == [2, 2, 2, 2, 2]
with pytest.raises(TypeError):
assert parse_nested_parens('(()()) ((())) () ((())()())') == [2, 3, 1, 3]
assert parse_nested_parens('(()(())((())))') == [4]
with pytes

In [None]:
# Show each result group (pattern category or 'overall') followed by its metrics.
for item, item_metrics in deep_seek_pattern_analyzer_results.items():
    print(item)
    print(item_metrics)

boundary_testing
{'total_matches': 35, 'coverage_ratio': 0.18617021276595744, 'pattern_breakdown': {'zero_values': 24, 'negative_values': 9, 'large_values': 6}}
functionality
{'total_matches': 48, 'coverage_ratio': 0.2553191489361702, 'pattern_breakdown': {'complex_input': 48}}
error_handling
{'total_matches': 40, 'coverage_ratio': 0.2127659574468085, 'pattern_breakdown': {'exception_testing': 40}}
edge_cases
{'total_matches': 37, 'coverage_ratio': 0.19680851063829788, 'pattern_breakdown': {'empty_input': 29, 'single_element': 9, 'null_input': 1}}
overall
{'total_assertions': 188, 'patterns_per_assertion': 0.8829787234042553, 'pattern_coverage': 0.8, 'uncategorized': 53, 'uncategorized_assertions': ["assert is_palindrome('a') == True", "assert is_palindrome('aa') == True", "assert is_palindrome('ab') == False", "assert make_palindrome('x') == 'x'", "assert make_palindrome('xyz') == 'xyzyx'", "assert make_palindrome('xyx') == 'xyx'", "assert make_palindrome('jerry') == 'jerryrrej'", "as

In [None]:
# Concatenate all SemCoder suites (separated by a blank line) so they can be
# analysed as one body of test code.
semcoder_extracted_test_suites_str = "\n\n".join(semcoder_extracted_test_suites)

In [None]:
# Pattern-based analysis of the combined SemCoder suites. The call also prints
# every assertion it extracts, which is the output shown below this cell.
semcoder_pattern_analyzer_results = pattern_analyzer.analyze_test_suite(semcoder_extracted_test_suites_str)

assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True
assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False
with pytest.raises(TypeError):
assert separate_paren_groups('( ) (( )) (( )( ))') == ['()', '(())', '(()())']
with pytest.raises(TypeError):
assert truncate_number(3.5) == 0.5
assert abs(truncate_number(123.456) - 0.456) < 1e-6
with pytest.raises(TypeError):
assert below_zero([]) == False
assert below_zero([1, -2, 2, -2, 5, -5, 4, -4]) == True
with pytest.raises(TypeError):
assert abs(mean_absolute_deviation([1.0, 2.0, 3.0]) - 2.0/3.0) < 1e-6
assert abs(mean_absolute_deviation([1.0, 2.0, 3.0, 4.0, 5.0]) - 6.0/5.0) < 1e-6
with pytest.raises(TypeError):
assert intersperse([], 7) == []
assert intersperse([2, 2, 2], 2) == [2, 2, 2, 2, 2]
with pytest.raises(TypeError):
assert parse_nested_parens('(()()) ((())) () ((())()())') == [2, 3, 1, 3]
assert parse_nested_parens('(()(())((())))') == [4]
with pytest.raises(TypeError):
assert filter_by_substring([], '

In [None]:
# Show each result group (pattern category or 'overall') followed by its metrics.
for item, item_metrics in semcoder_pattern_analyzer_results.items():
    print(item)
    print(item_metrics)

boundary_testing
{'total_matches': 21, 'coverage_ratio': 0.2079207920792079, 'pattern_breakdown': {'zero_values': 15, 'negative_values': 7, 'large_values': 2}}
functionality
{'total_matches': 25, 'coverage_ratio': 0.24752475247524752, 'pattern_breakdown': {'complex_input': 25}}
error_handling
{'total_matches': 34, 'coverage_ratio': 0.33663366336633666, 'pattern_breakdown': {'exception_testing': 34}}
edge_cases
{'total_matches': 24, 'coverage_ratio': 0.2376237623762376, 'pattern_breakdown': {'empty_input': 22, 'single_element': 3, 'null_input': 1}}
overall
{'total_assertions': 101, 'patterns_per_assertion': 1.0792079207920793, 'pattern_coverage': 0.8, 'uncategorized': 12, 'uncategorized_assertions': ["assert make_palindrome('jerry') == 'jerryrrej'", "assert string_xor('0101', '0000') == '0101'", 'assert greatest_common_divisor(3, 7) == 1', 'assert greatest_common_divisor(144, 60) == 12', "assert count_distinct_characters('Jerry jERRY JeRRRY') == 5", "assert how_many_times('john doe', 'j

### GPT-4 Results

In [None]:
!pip install openai==0.28 #the latest version is acting crazy weird- ugh rollback

In [None]:
# Run test generation with the "gpt-4" model. No DeepSeek tokenizer is passed
# and num_total_tests is set to 100.
generate_humaneval_plus_tests("gpt-4", deep_seek_tokenizer=None, num_total_tests=100) #gpt-4 doesn't have reasoning training; only gpt-4o- interesting case comparison

In [None]:
# Load the GPT-4 test suites from the saved results file under /content.
# NOTE(review): presumably this is the file written by the generation call above -- confirm.
gpt_4_extracted_test_suites = process_file_path("/content/gpt-4_test_case_generation_results.txt")

In [None]:
# Evaluate the GPT-4 suites against the dataset.
# NOTE(review): the meaning of the literal 21 is defined by evaluate_test_suite,
# which is not visible here -- confirm and consider a named constant.
evaluate_test_suite("gpt-4",dataset, 21, gpt_4_extracted_test_suites)

PROBLEM 0:

CANONICAL SOLUTION:

from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """
    for idx, elem in enumerate(numbers):
        for idx2, elem2 in enumerate(numbers):
            if idx != idx2:
                distance = abs(elem - elem2)
                if distance < threshold:
                    return True

    return False


CLEANED TESTS:

import pytest

def test_has_close_elements_1():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True
test_has_close_elements_1()
def test_has_close_elements_2():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False
test_has_close_elements_2()
def test_has_close_elements_3():
    assert has_close_elemen

In [None]:
# Line coverage of each canonical solution when exercised by the matching
# GPT-4-generated test suite. Only runs that report a 'line_coverage' value
# are kept for the aggregate statistics printed at the end.
gpt_4_coverage_analyzer = TestCoverageAnalyzer()
gpt_4_coverage_results = []
for index, test_suite in enumerate(gpt_4_extracted_test_suites):
    # The runnable solution is the prompt (signature + docstring) followed by the canonical body.
    solution = "".join(
        dataset['test'][column][index] for column in ("prompt", "canonical_solution")
    )
    # Each run gets its own scratch directory, removed when the block exits.
    with tempfile.TemporaryDirectory() as temp_dir:
        solution_file, test_file = gpt_4_coverage_analyzer.create_test_files(solution, test_suite, temp_dir)
        result = gpt_4_coverage_analyzer.run_coverage_analysis(solution_file, test_file, temp_dir)
        if 'line_coverage' not in result:
            continue
        gpt_4_coverage_results.append(result)
print(calculate_aggregate_metrics(gpt_4_coverage_results, "line_coverage"))

In [None]:
# Ask Claude to judge the novelty of every GPT-4-generated suite against the
# dataset's own tests. Every result (including failures) is kept in
# gpt_4_novelty_results so the list stays index-aligned with the suites.
gpt_4_novelty_results = []
for index, test_suite in enumerate(gpt_4_extracted_test_suites):
    solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
    original_tests = dataset['test']["test"][index]
    result = analyze_novelty_with_claude(solution, test_suite, original_tests)
    print(result)
    gpt_4_novelty_results.append(result)
# analyze_novelty_with_claude returns None when the reply cannot be parsed, so
# aggregate only over entries that actually carry a novelty score.
print(calculate_aggregate_metrics(
    [r for r in gpt_4_novelty_results if isinstance(r, dict) and "novelty_score" in r],
    "novelty_score",
))

{'novelty_score': 0.7, 'novel_aspects': ['Tests for error handling with invalid inputs (None, string)', 'Tests for edge cases with threshold values close to 0 and 1'], 'unique_scenarios': ['Tests with duplicate numbers in the input list', 'Tests with numbers very close to the threshold value'], 'coverage_assessment': 'The generated tests cover a good range of scenarios, including typical cases, edge cases, and error handling. However, some important edge cases and boundary conditions are still missing.', 'recommendations': ['Test with an empty list input', 'Test with a list containing only one element', 'Test with a list containing only duplicate elements', 'Test with a threshold value of 0', 'Test with a threshold value of a very large number']}
{'novelty_score': 0.7, 'novel_aspects': ['Tests for error handling with invalid input types (None, int, list)', 'Tests for edge cases like empty string and string with only spaces'], 'unique_scenarios': ['Testing with None input', 'Testing wit

In [None]:
# Save the GPT-4 novelty results as JSON and download the file. Relies on
# write_results_to_file and the Colab `files` import from the earlier export cell.
write_results_to_file(gpt_4_novelty_results, 'gpt_4_novelty_results.json')
files.download('gpt_4_novelty_results.json')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>