In [None]:
import os
import ast
import astor 

def extract_source_functions(source_dir, logger=None):
    """
    Extract function definitions from Python files in the given source directory.
    
    Args:
        source_dir (str): Directory containing Python source files to analyze
        logger: Optional logger object for logging information
        
    Returns:
        list: List of dictionaries containing function names and code
    """
    source_functions = []
    for file in os.listdir(os.path.abspath(source_dir)):
        if file.endswith(".py"):
            file_path = os.path.join(os.path.abspath(source_dir), file)
            if logger:
                logger.info(f"Analyzing source file: {file_path}")
                
            with open(file_path, 'r') as f:
                content = f.read()
            
            # Use AST to extract functions
            try:
                tree = ast.parse(content)
                for node in ast.walk(tree):
                    if isinstance(node, ast.FunctionDef):
                        function_name = node.name
                        function_body = astor.to_source(node)
                        source_functions.append({
                            'name': function_name,
                            'code': function_body
                        })
            except SyntaxError as e:
                if logger:
                    logger.error(f"Syntax error in {file_path}: {str(e)}")
    
    if logger:
        logger.info(f"Found {len(source_functions)} functions in source code")
        for func in source_functions:
            logger.debug(f"Function: {func['name']}")
            
    return source_functions

In [14]:
# Extract source code functions
source_dir = "v1"
source_functions = extract_source_functions(source_dir)
source_functions

[{'name': '__init__',
  'code': 'def __init__(self):\n    self.result = 0\n    self.memory = 0\n'},
 {'name': 'add', 'code': 'def add(self, a, b):\n    return a + b\n'},
 {'name': 'subtract',
  'code': 'def subtract(self, a, b):\n    if b < 0:\n        return a + abs(b)\n    return a - b\n'},
 {'name': 'multiply', 'code': 'def multiply(self, a, b):\n    return a * b\n'},
 {'name': 'divide', 'code': 'def divide(self, a, b):\n    return a / b\n'},
 {'name': 'power',
  'code': 'def power(self, a, b):\n    if b < 0:\n        return 0\n    return a ** b\n'},
 {'name': 'square_root',
  'code': "def square_root(self, a):\n    if a < 0:\n        raise ValueError('Cannot calculate square root of negative number')\n    return a ** 0.499\n"},
 {'name': 'factorial',
  'code': "def factorial(self, n):\n    if not isinstance(n, int) or n < 0:\n        raise ValueError('Factorial is only defined for non-negative integers')\n    if n == 0:\n        return 0\n    result = 1\n    for i in range(1, n + 1

In [19]:
import os
import ast
import time
import astor 
import torch
import numpy as np
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModel

def generate_embedding(text, tokenizer, model, max_length=512):
    """
    Generate an embedding for a piece of code text using the provided tokenizer and model.
    
    Args:
        text (str): The code text to embed
        tokenizer: The tokenizer to use for tokenizing the text
        model: The model to use for generating embeddings
        max_length (int, optional): Maximum token length. Defaults to 512.
        
    Returns:
        numpy.ndarray: The embedding vector for the provided code
    """
    tokens = tokenizer.tokenize(text)
    tokens = tokens[:max_length]  # Truncate to max length
    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    input_ids = torch.tensor([tokenizer.cls_token_id] + token_ids + [tokenizer.eos_token_id]).unsqueeze(0)
    
    # Move input_ids to the same device as the model
    device = next(model.parameters()).device
    input_ids = input_ids.to(device)
    
    with torch.no_grad():
        output = model(input_ids)
        embedding = output.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()
    
    return embedding

In [16]:
tokenizer = AutoTokenizer.from_pretrained("microsoft/unixcoder-base")
model = AutoModel.from_pretrained("microsoft/unixcoder-base")

2025-04-01 10:08:11.289747: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-01 10:08:11.737986: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:479] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-04-01 10:08:11.953535: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:10575] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-04-01 10:08:11.954479: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1442] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-04-01 10:08:12.151888: I tensorflow/core/platform/cpu_feature_gua

In [None]:
# For source functions
model.to('cuda')
function_embeddings = []
for func in tqdm(source_functions, desc="Embedding functions"):
    embedding = generate_embedding(func['code'], tokenizer, model)
    function_embeddings.append(embedding)

# For test cases
test_embeddings = []
for test in tqdm(tests, desc="Embedding tests"):
    embedding = generate_embedding(test['code'], tokenizer, model)
    test_embeddings.append(embedding)

Embedding functions: 100%|██████████| 20/20 [00:01<00:00, 17.77it/s]


In [24]:
len(function_embeddings)

20

In [None]:
import os
import ast
import time
import astor 
import torch
import numpy as np
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModel


def submod_ordering(tests, source_dir="../v1", logger=None):
    """
    Prioritize tests using a submodular optimization approach with code embeddings.
    Uses UnixCoder to embed functions and test cases, then greedily selects tests
    that maximize marginal similarity gain. 
    """
    
    if logger:
        logger.info("Loading UnixCoder model for code embeddings...")
    
    # Load UnixCoder model and tokenizer
    try:
        tokenizer = AutoTokenizer.from_pretrained("microsoft/unixcoder-base")
        model = AutoModel.from_pretrained("microsoft/unixcoder-base")
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model.to(device)
        if logger:
            logger.info(f"Successfully loaded UnixCoder model and tokenizer to {device}")
    except Exception as e:
        if logger:
            logger.error(f"Error loading UnixCoder: {str(e)}")
    
    # Extract source code functions
    source_functions = extract_source_functions(source_dir, logger)
    
    # Generate embeddings for source functions
    function_embeddings = []
    for func in tqdm(source_functions, desc="Embedding functions"):
        embedding = generate_embedding(func['code'], tokenizer, model)
        function_embeddings.append(embedding)
    
    function_embeddings = np.array(function_embeddings)
        
    if logger:
        logger.info(f"Generated embeddings for {len(function_embeddings)} source functions")
    
    # Generate embeddings for test cases
    test_embeddings = []
    for test in tqdm(tests, desc="Embedding tests"):
        embedding = generate_embedding(test['code'], tokenizer, model)
        test_embeddings.append(embedding)
    
    test_embeddings = np.array(test_embeddings)
    
    if logger:
        logger.info(f"Generated embeddings for {len(test_embeddings)} test cases")
    
    # Submodular function optimization with greedy algorithm
    if logger:
        logger.info("Running submodular optimization...")
    
    def similarity(embedding_a, embedding_b):
        """Calculate cosine similarity between two embeddings"""
        return np.dot(embedding_a, embedding_b) / (np.linalg.norm(embedding_a) * np.linalg.norm(embedding_b))
    
    def calculate_gain(selected_indices, candidate_index):
        """
        Calculate the marginal gain of adding a new test
        This implements a facility location objective function, which is submodular
        """
        if not selected_indices:
            # For the first selection, just use max similarity to any function
            similarities = [similarity(test_embeddings[candidate_index], func_embedding) 
                           for func_embedding in function_embeddings]
            return np.mean(similarities)
        
        # For subsequent selections, calculate marginal gain
        gain = 0
        for func_idx, func_embedding in enumerate(function_embeddings):
            # Current max similarity for this function from selected tests
            current_max = max([similarity(test_embeddings[idx], func_embedding) for idx in selected_indices])
            # Max similarity if we add the candidate test
            new_sim = similarity(test_embeddings[candidate_index], func_embedding)
            # Add the marginal gain (limited to positive values)
            gain += max(0, new_sim - current_max)
        
        return gain
    
    # Greedy selection
    remaining_indices = set(range(len(tests)))
    selected_indices = []
    prioritized_tests = []
    
    start_time = time.time()
    while remaining_indices:
        best_gain = -float('inf')
        best_idx = None
        
        for idx in remaining_indices:
            gain = calculate_gain(selected_indices, idx)
            if gain > best_gain:
                best_gain = gain
                best_idx = idx
        
        selected_indices.append(best_idx)
        remaining_indices.remove(best_idx)
        prioritized_tests.append(tests[best_idx])
        
        if logger and len(selected_indices) % 10 == 0:
            logger.info(f"Selected {len(selected_indices)}/{len(tests)} tests")
    
    end_time = time.time()
    if logger:
        logger.info(f"Submodular optimization complete in {end_time - start_time:.2f} seconds")
        logger.info(f"First 5 selected tests:")
        for i, test in enumerate(prioritized_tests[:5]):
            logger.info(f"  {i+1}. {test['full_name']}")
    
    return prioritized_tests

In [3]:
def number_to_ascii(number):
    """
    Convert a number to its ASCII representation
    
    Args:
        number (int): The number to convert
    
    Returns:
        str: ASCII representation of the number
    """
    try:
        # Convert number to ASCII using chr()
        return chr(number)
    except ValueError:
        return "Invalid input. Number must be between 0 and 127."
    except Exception as e:
        return f"Error: {str(e)}"

In [7]:
import ast
import astunparse  # you may need to install this with pip install astunparse

def display_ast(code_string):
    """
    Extract and display the Abstract Syntax Tree (AST) for the given code
    
    Args:
        code_string (str): The code to analyze
    
    Returns:
        None: Prints AST information to console
    """
    # Parse the code into an AST
    tree = ast.parse(code_string)
    
    # # Print the raw AST representation
    # print("Raw AST:")
    # print(ast.dump(tree, indent=2))
    
    # Print a more readable version using astunparse if available
    try:
        print("\nAST as code:")
        print(astunparse.dump(tree))
    except:
        pass
    
    # Walk through the AST to display key elements
    print("\nKey elements in the AST:")
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            print(f"Function: {node.name}")
            print(f"  Arguments: {[arg.arg for arg in node.args.args]}")
        elif isinstance(node, ast.Try):
            print(f"Try block with {len(node.handlers)} exception handlers")
        elif isinstance(node, ast.Return):
            print(f"Return statement at line {node.lineno}")

In [8]:
import inspect

# Get source code from the function
function_source = inspect.getsource(number_to_ascii)
display_ast(function_source)


AST as code:
Module(
  body=[FunctionDef(
    name='number_to_ascii',
    args=arguments(
      posonlyargs=[],
      args=[arg(
        arg='number',
        annotation=None,
        type_comment=None)],
      vararg=None,
      kwonlyargs=[],
      kw_defaults=[],
      kwarg=None,
      defaults=[]),
    body=[
      Expr(value=Constant(
        value='\n    Convert a number to its ASCII representation\n    \n    Args:\n        number (int): The number to convert\n    \n    Returns:\n        str: ASCII representation of the number\n    ',
        kind=None)),
      Try(
        body=[Return(value=Call(
          func=Name(
            id='chr',
            ctx=Load()),
          args=[Name(
            id='number',
            ctx=Load())],
          keywords=[]))],
        handlers=[
          ExceptHandler(
            type=Name(
              id='ValueError',
              ctx=Load()),
            name=None,
            body=[Return(value=Constant(
              value='Invalid i