In [1]:
import os
from groq import Groq
import re
from typing import List, Dict, Any
import yaml
from tqdm import tqdm
import json
from openai import OpenAI
import tiktoken
from typing import Dict, Set, Optional, Tuple
import ast

In [2]:
# --- Configuration ---------------------------------------------------------
# The API key is read from the environment so it never has to be committed
# with the notebook; when the variable is unset the value is "" as before.
API_KEY = os.environ.get("OPENAI_API_KEY", "")
# Root of the Django project to scan. Override with the DJANGO_PROJECT_PATH
# environment variable; the default is the original hard-coded location.
DJANGO_PROJECT_PATH = os.environ.get("DJANGO_PROJECT_PATH", "/Users/ryanmarr/Documents/sentry")
MODEL_NAME = "gpt-4o"
# Completion budget: passed as `max_tokens` to the chat-completion request.
# This name used to be reassigned to 30000 a few lines further down, which
# silently replaced the value here and sent an out-of-range max_tokens.
MAX_TOKENS = 2048
TEMPERATURE = 0.1
TOP_P = 1
STREAM = False
# Previously a second `MAX_TOKENS = 30000` assignment.
# NOTE(review): presumably an input-size budget (it sits next to PROMPT_TOKENS) — confirm.
MAX_INPUT_TOKENS = 30000
# NOTE(review): presumably the token count of the fixed prompt template — confirm.
PROMPT_TOKENS = 251

In [3]:
# OpenAI client setup (authenticated with API_KEY)
client = OpenAI(api_key=API_KEY)

In [4]:
def count_tokens_for_gpt4o(text: str) -> int:
    """Count how many tokens `text` occupies for GPT-4o, using tiktoken.

    GPT-4o is tokenized with the ``o200k_base`` encoding; ``cl100k_base`` is
    the GPT-4 / GPT-3.5 encoding and produces different counts.

    Args:
        text (str): The text to tokenize.

    Returns:
        int: Number of tokens, or 0 if tokenization fails (the error is printed).
    """
    try:
        encoding = tiktoken.get_encoding("o200k_base")
        # Source files may literally contain strings such as "<|endoftext|>".
        # By default tiktoken raises on those; disallowed_special=() makes it
        # encode them as ordinary text so the count is still produced.
        return len(encoding.encode(text, disallowed_special=()))
    except Exception as e:
        print(f"Error counting tokens: {e}")
        return 0

In [5]:
def find_django_files(directory: str) -> List[str]:
    """Walk `directory` and collect the path of every ``.py`` file.

    Hidden directories (names starting with a dot) and a few well-known
    non-source directories are pruned from the walk. Files named ``views.py``
    are returned ahead of all other Python files.

    Args:
        directory (str): Root directory to scan.

    Returns:
        List[str]: ``views.py`` paths first, then every other ``.py`` path,
        each group in the order the walk encountered them.
    """
    excluded_names = ['node_modules', '__pycache__', 'venv', 'env', '.git']
    view_modules = []
    other_modules = []

    for root, dirs, files in os.walk(directory):
        # Assigning to the slice prunes the walk in place so os.walk never
        # descends into the skipped directories.
        dirs[:] = [name for name in dirs
                   if not (name.startswith('.') or name in excluded_names)]

        for filename in files:
            if not filename.endswith('.py'):
                continue
            bucket = view_modules if filename == 'views.py' else other_modules
            bucket.append(os.path.join(root, filename))

    return view_modules + other_modules

In [6]:
import ast
import os
from typing import Dict, Set, Optional, List, Any

def find_function_in_ast(tree: ast.AST, function_name: str) -> Optional[ast.FunctionDef]:
    """Find a function definition in the AST by name.

    Both ``def`` and ``async def`` definitions are matched (the returned node
    is an ``ast.AsyncFunctionDef`` in the latter case). The whole tree is
    searched with ``ast.walk``, so methods and nested functions are candidates
    too; the first match in walk order is returned.

    Args:
        tree (ast.AST): Parsed module (or any AST node) to search.
        function_name (str): Name of the function to look for.

    Returns:
        The matching definition node, or None if no function has that name.
    """
    # Async views are parsed as AsyncFunctionDef, a separate node class that
    # is not a subclass of FunctionDef, so both must be listed explicitly.
    function_node_types = (ast.FunctionDef, ast.AsyncFunctionDef)
    for node in ast.walk(tree):
        if isinstance(node, function_node_types) and node.name == function_name:
            return node
    return None

def find_class_in_ast(tree: ast.AST, class_name: str) -> Optional[ast.ClassDef]:
    """Return the first class definition named `class_name`, or None.

    The search uses ``ast.walk`` over the whole tree, so nested classes are
    considered as well as top-level ones.
    """
    candidates = (
        node
        for node in ast.walk(tree)
        if isinstance(node, ast.ClassDef) and node.name == class_name
    )
    return next(candidates, None)

def find_all_imports_in_file(tree: ast.AST, current_file_path: str) -> Dict[str, str]:
    """
    Find all imports in the entire file, not just within a function.

    The scan is the same AST walk that find_local_imports_in_function performs
    on a single function node, so this runs that walk over the whole module
    tree instead of maintaining a second copy of the logic.

    Args:
        tree (ast.AST): Parsed module to scan.
        current_file_path (str): Path of the file `tree` was parsed from; import
            names are resolved relative to it.

    Returns:
        Dict mapping import names (or their aliases) to their file paths.
        Imports that do not resolve to a local file are omitted.
    """
    # The helper only walks whatever node it is handed, so passing the module
    # tree makes it cover every import statement in the file.
    return find_local_imports_in_function(tree, current_file_path)

def find_local_imports_in_function(function_ast: ast.FunctionDef, current_file_path: str) -> Dict[str, str]:
    """
    Find all local imports used within a function.

    Every ``import`` / ``from ... import`` statement found by walking
    `function_ast` is resolved with resolve_local_import_path; statements that
    do not resolve to a file are dropped.

    Relative imports honour their level: the number of leading dots selects
    the directory the module is looked up from. ``from . import name`` (no
    module part) is handled as well: each name is first tried as a sibling
    module, then attributed to the package's ``__init__.py``.

    Args:
        function_ast: Node to scan (any AST node works; only its subtree is walked).
        current_file_path (str): Path of the file the node came from.

    Returns:
        Dict mapping import names (or their aliases) to their file paths
    """
    local_imports = {}

    for node in ast.walk(function_ast):
        if isinstance(node, ast.Import):
            for alias in node.names:
                import_name = alias.asname or alias.name
                import_path = resolve_local_import_path(alias.name, current_file_path)
                if import_path:
                    local_imports[import_name] = import_path

        elif isinstance(node, ast.ImportFrom):
            # ast stores the dots of a relative import in `level`, not in
            # `module` (`from ..pkg import x` -> module="pkg", level=2).
            # The resolver only uses the directory part of the path it is
            # given, so for relative imports hand it a path inside the
            # directory the import is anchored to.
            anchor_path = current_file_path
            if node.level:
                package_dir = os.path.dirname(current_file_path)
                for _ in range(node.level - 1):
                    package_dir = os.path.dirname(package_dir)
                anchor_path = os.path.join(package_dir, "__init__.py")

            if node.module:
                import_path = resolve_local_import_path(node.module, anchor_path)
                if import_path:
                    for alias in node.names:
                        import_name = alias.asname or alias.name
                        local_imports[import_name] = import_path
            elif node.level:
                # `from . import name`: `name` is either a sibling module or
                # something defined in the package's __init__.py.
                for alias in node.names:
                    import_name = alias.asname or alias.name
                    import_path = (
                        resolve_local_import_path(f".{alias.name}", anchor_path)
                        or resolve_local_import_path(".", anchor_path)
                    )
                    if import_path:
                        local_imports[import_name] = import_path

    return local_imports

def resolve_local_import_path(module_name: str, current_file_path: str) -> Optional[str]:
    """
    Resolve a local import path to an actual file path.

    Relative names (leading dots) are looked up from the package directory the
    dots select: one dot is the directory of `current_file_path`, each extra
    dot moves one directory up. Dotted remainders (``.a.b``) are mapped to
    nested paths (``a/b.py``).

    Absolute names are searched heuristically: first in the current directory,
    then in up to three parent directories, and finally, for dotted names,
    every suffix of the dotted path is tried in those same directories.

    Args:
        module_name (str): The module name from the import statement
        current_file_path (str): Path to the current file (only its directory is used)

    Returns:
        Optional[str]: First candidate path that exists, or None if not found
    """
    current_dir = os.path.dirname(current_file_path)

    if module_name.startswith('.'):
        # Relative import, e.g. ".models", "..utils", ".a.b" or just ".".
        remainder = module_name.lstrip('.')
        level = len(module_name) - len(remainder)

        package_dir = current_dir
        for _ in range(level - 1):
            package_dir = os.path.dirname(package_dir)

        if remainder:
            parts = remainder.split('.')
            relative_path = os.path.join(*parts)
            possible_paths = [
                os.path.join(package_dir, f"{relative_path}.py"),
                os.path.join(package_dir, relative_path, "__init__.py"),
                # Package whose implementation lives in a same-named module.
                os.path.join(package_dir, relative_path, f"{parts[-1]}.py"),
            ]
        else:
            # Only dots: the package itself.
            possible_paths = [os.path.join(package_dir, "__init__.py")]
    else:
        # Directories to search, nearest first: the current directory, then
        # up to three ancestors (an empty dirname means there is nothing above).
        search_dirs = [current_dir]
        parent_dir = current_dir
        for _ in range(3):
            parent_dir = os.path.dirname(parent_dir)
            if parent_dir:
                search_dirs.append(parent_dir)

        # Strategies 1 and 2: the name taken literally, in each directory.
        possible_paths = []
        for base_dir in search_dirs:
            possible_paths.extend([
                os.path.join(base_dir, f"{module_name}.py"),
                os.path.join(base_dir, module_name, "__init__.py"),
                os.path.join(base_dir, module_name, f"{module_name}.py"),
            ])

        # Strategy 3: nested module paths (e.g., dependency_test.b). Each
        # suffix of the dotted name is tried, longest first, because the
        # leading packages may already be part of the search directory.
        if '.' in module_name:
            parts = module_name.split('.')
            for i in range(len(parts)):
                module_path = os.path.join(*parts[i:])
                for base_dir in search_dirs:
                    possible_paths.extend([
                        os.path.join(base_dir, f"{module_path}.py"),
                        os.path.join(base_dir, module_path, "__init__.py"),
                    ])

    # First existing candidate wins.
    for path in possible_paths:
        if os.path.exists(path):
            return path

    return None

def expand_imported_file_recursively(import_path: str, import_name: str, visited_files: Set[str], max_depth: int) -> str:
    """
    Recursively expand an imported file, showing all its functions and their dependencies.

    The output lists every class in the file, then every function (``def`` and
    ``async def``). Both lists come from ``ast.walk``, so methods appear once
    inside their class and again in the function list. For each function, the
    imports written inside its body are resolved and expanded in turn while
    `max_depth` allows.

    NOTE(review): only imports inside function bodies are followed from here;
    module-level imports of the imported file are not expanded — confirm that
    this is the intended depth of the "recursive" expansion.

    Args:
        import_path (str): Path to the imported file
        import_name (str): Name of the imported module
        visited_files (Set[str]): Files already on the current expansion path.
            The set is copied, never modified, so callers can share it.
        max_depth (int): Remaining recursion depth; at 0 no dependencies are followed

    Returns:
        str: Recursively expanded content, or a "# Error reading ..." line if
        the file cannot be read or parsed
    """
    # Record this file on the expansion path. Previously nothing was ever added
    # to the set, so the `not in visited_files` check below could not stop
    # A -> B -> A cycles and recursion only ended when max_depth ran out.
    visited_files = set(visited_files or ())
    visited_files.add(import_path)

    try:
        with open(import_path, 'r', encoding='utf-8') as f:
            content = f.read()

        tree = ast.parse(content)

        # Find all functions and classes in this file
        functions = []
        classes = []

        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                functions.append(node)
            elif isinstance(node, ast.ClassDef):
                classes.append(node)

        result = f"# File: {import_path}\n"
        result += f"# Module: {import_name}\n"
        result += "-" * 60 + "\n\n"

        # Show classes first
        if classes:
            result += "# CLASSES:\n"
            for class_node in classes:
                result += f"# Class: {class_node.name}\n"
                result += ast.unparse(class_node) + "\n\n"

        # Show functions and recursively expand their imports
        if functions:
            result += "# FUNCTIONS:\n"
            for func_node in functions:
                func_name = func_node.name
                result += f"# Function: {func_name}\n"
                result += ast.unparse(func_node) + "\n\n"

                # Recursively expand this function's imports
                if max_depth > 0:
                    func_imports = find_local_imports_in_function(func_node, import_path)
                    if func_imports:
                        result += f"# Dependencies of {func_name}:\n"
                        for dep_name, dep_path in func_imports.items():
                            if os.path.exists(dep_path) and dep_path not in visited_files:
                                # The callee copies the set on entry, so sibling
                                # dependencies do not see each other's entries.
                                dep_content = expand_imported_file_recursively(
                                    dep_path, dep_name, visited_files, max_depth - 1
                                )
                                result += f"# From {dep_name}:\n"
                                result += dep_content + "\n\n"

        return result

    except Exception as e:
        return f"# Error reading {import_path}: {str(e)}"

def expand_function_with_imports_recursive(file_path: str, function_name: str, visited_files: Set[str] = None, max_depth: int = 10) -> str:
    """
    Render one function together with the expanded source of every local file
    that its module imports.

    The report starts with a header, then one section per resolvable import
    (produced by expand_imported_file_recursively), and ends with the unparsed
    source of the requested function.

    Args:
        file_path (str): Path to the Django file
        function_name (str): Name of the function to expand
        visited_files (Set[str]): Files already expanded. `file_path` is added
            to this set in place; each import receives its own copy.
        max_depth (int): Recursion budget; a value <= 0 stops immediately

    Returns:
        str: The assembled report, or a single "# ..." line when the file was
        already visited, the depth budget is exhausted, the function is
        missing, or reading/parsing fails
    """
    seen = set() if visited_files is None else visited_files

    # Guard against cycles and runaway recursion before touching the file.
    if max_depth <= 0 or file_path in seen:
        return f"# Circular import or max depth reached: {file_path}"

    seen.add(file_path)

    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            source_text = handle.read()

        module_tree = ast.parse(source_text)

        target = find_function_in_ast(module_tree, function_name)
        if not target:
            return f"# Function '{function_name}' not found in {file_path}"

        body_text = ast.unparse(target)

        # File-level imports first; imports found inside the function override
        # them when both define the same name.
        resolved = dict(find_all_imports_in_file(module_tree, file_path))
        resolved.update(find_local_imports_in_function(target, file_path))

        sections = [
            (name, expand_imported_file_recursively(path, name, seen.copy(), max_depth - 1))
            for name, path in resolved.items()
            if os.path.exists(path)
        ]

        pieces = [
            f"# Function: {function_name}\n",
            f"# File: {file_path}\n",
            "=" * 80 + "\n\n",
        ]

        if sections:
            pieces.append("# RECURSIVELY EXPANDED IMPORTS:\n")
            pieces.append("-" * 40 + "\n")
            for name, expanded in sections:
                pieces.append(f"# From: {name}\n")
                pieces.append(expanded + "\n\n")

        pieces.append("# MAIN FUNCTION IMPLEMENTATION:\n")
        pieces.append("-" * 40 + "\n")
        pieces.append(body_text)

        return "".join(pieces)

    except Exception as exc:
        return f"# Error processing {file_path}: {str(exc)}"

# Helper: collect the names of everything a function calls
def find_function_calls_in_function(function_ast: ast.FunctionDef) -> List[str]:
    """Return the names of all calls made inside `function_ast`.

    A plain call ``foo()`` contributes ``"foo"``; an attribute call
    ``obj.bar()`` contributes only the attribute name ``"bar"``. Calls whose
    target is neither a name nor an attribute are ignored. Names are listed in
    ``ast.walk`` order and repeated once per call.
    """
    called_names = []
    call_nodes = (node for node in ast.walk(function_ast) if isinstance(node, ast.Call))

    for call in call_nodes:
        target = call.func
        if isinstance(target, ast.Name):
            called_names.append(target.id)
        elif isinstance(target, ast.Attribute):
            called_names.append(target.attr)

    return called_names

# Variant that follows the names a function calls rather than its import statements
def expand_function_with_call_chain(file_path: str, function_name: str, visited_files: Set[str] = None) -> str:
    """
    Render a function followed by the expansion of each call it makes.

    Every call name is handed to resolve_local_import_path as if it were a
    module name; when that yields an existing file, the function of the same
    name is expanded from that file recursively. Calls that do not resolve are
    listed as "Could not resolve".

    Args:
        file_path (str): File containing the function.
        function_name (str): Function to expand.
        visited_files (Set[str]): Files already on the current chain. `file_path`
            is added in place; recursive calls receive a copy.

    Returns:
        str: The assembled report, or a single "# ..." line for a revisited
        file, a missing function, or any read/parse error.
    """
    seen = set() if visited_files is None else visited_files

    if file_path in seen:
        return f"# Circular import detected: {file_path}"

    seen.add(file_path)

    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            source_text = handle.read()

        target = find_function_in_ast(ast.parse(source_text), function_name)
        if not target:
            return f"# Function '{function_name}' not found in {file_path}"

        body_text = ast.unparse(target)
        called_names = find_function_calls_in_function(target)

        chunks = [
            f"# Function: {function_name}\n",
            f"# File: {file_path}\n",
            "=" * 80 + "\n\n",
            "# MAIN FUNCTION:\n",
            "-" * 40 + "\n",
            body_text + "\n\n",
        ]

        if called_names:
            chunks.append("# FUNCTION CALL CHAIN:\n")
            chunks.append("-" * 40 + "\n")

            for call_name in called_names:
                resolved = resolve_local_import_path(call_name, file_path)
                if not (resolved and os.path.exists(resolved)):
                    chunks.append(f"# Could not resolve: {call_name}\n\n")
                    continue
                chunks.append(f"# Expanding call to: {call_name}\n")
                nested = expand_function_with_call_chain(resolved, call_name, seen.copy())
                chunks.append(nested + "\n\n")

        return "".join(chunks)

    except Exception as exc:
        return f"# Error: {str(exc)}"

In [7]:
def python_file_to_string(file_path: str) -> str:
    """
    Read a file as UTF-8 text and return its contents.

    This function never raises: every failure is reported with a printed
    message and an empty string is returned instead.

    Args:
        file_path (str): Path to the Python file

    Returns:
        str: The file's contents, or "" when the file is missing, cannot be
        decoded as UTF-8, is not readable, or any other error occurs
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return handle.read()
    except Exception as exc:
        # One handler, one message per failure kind.
        if isinstance(exc, FileNotFoundError):
            message = f"Error: File '{file_path}' not found"
        elif isinstance(exc, UnicodeDecodeError):
            message = f"Error: Unable to decode file '{file_path}' as UTF-8: {exc}"
        elif isinstance(exc, PermissionError):
            message = f"Error: Permission denied reading file '{file_path}'"
        else:
            message = f"Unexpected error reading file '{file_path}': {exc}"
        print(message)
        return ""

In [8]:
# Smoke test: expand function `a` from the dependency_test fixture and print the report.
# NOTE(review): the path below is absolute and machine-specific, so this cell
# only runs where that fixture exists at that location.
file_path = '/Users/ryanmarr/Documents/duct_env/duct/dependency_test/a.py'
content = expand_function_with_imports_recursive(file_path, "a")
print(f'content: {content}')

content: # Function: a
# File: /Users/ryanmarr/Documents/duct_env/duct/dependency_test/a.py

# RECURSIVELY EXPANDED IMPORTS:
----------------------------------------
# From: b
# File: /Users/ryanmarr/Documents/duct_env/duct/dependency_test/b.py
# Module: b
------------------------------------------------------------

# FUNCTIONS:
# Function: b
def b():
    return 'b' + c()



# MAIN FUNCTION IMPLEMENTATION:
----------------------------------------
def a():
    return 'a' + b()


In [9]:
# NOTE(review): `asdfs` is an undefined name, so this cell raises NameError.
# Presumably a deliberate stop so "Run All" does not reach the API-calling
# cells below — remove it before sharing the notebook.
asdfs

NameError: name 'asdfs' is not defined

In [None]:
def _parse_apis_from_response(response: str, file_path: str) -> List[Dict[str, Any]]:
    """Turn the model's reply into a list of API dicts tagged with `file_path`.

    The reply is parsed as JSON; if that fails, the text between the first
    ``{`` and the last ``}`` is tried instead (covers replies wrapped in prose
    or code fences). Always returns a list — empty when nothing usable is found.
    """
    try:
        payload = json.loads(response)
    except json.JSONDecodeError:
        start = response.find('{')
        end = response.rfind('}')
        if start == -1 or end <= start:
            print(f"No JSON structure found in response for {file_path}")
            print(f"Response: {response}")
            return []

        json_str = response[start:end + 1]
        try:
            payload = json.loads(json_str)
        except json.JSONDecodeError:
            print(f"Failed to parse extracted JSON from response for {file_path}")
            print(f"Extracted JSON string: {json_str}")
            return []

    # Valid JSON is not necessarily the requested shape (it may be a bare list,
    # or an object without "apis"); treat anything else as "no APIs".
    apis = payload.get('apis') if isinstance(payload, dict) else None
    if not isinstance(apis, list):
        print(f"Response for {file_path} did not contain an 'apis' list")
        return []

    tagged = []
    for api in apis:
        if isinstance(api, dict):
            api['file_path'] = file_path
            tagged.append(api)
    return tagged


def extract_rest_apis_from_file_with_openai(file_path: str) -> List[Dict[str, Any]]:
    """Use OpenAI to analyze file content and find REST APIs.

    The file is read, embedded in a fixed prompt, and sent as a single chat
    completion. The reply is expected to be JSON of the form
    ``{"apis": [...]}``; each returned entry gets a ``file_path`` key.

    Args:
        file_path (str): Path of the file to analyze. Only ``.py`` files are
            processed.

    Returns:
        List[Dict[str, Any]]: The APIs reported by the model. Always a list:
        empty for non-Python or empty/unreadable files, API failures, and
        replies that cannot be parsed into the expected structure.
    """
    try:
        # Nothing to analyze: skip the request instead of failing on an
        # undefined `content` or paying for a prompt with no source in it.
        if not file_path.endswith('.py'):
            return []

        content = python_file_to_string(file_path)
        if not content.strip():
            return []

        # Skip files that don't contain common Django/API keywords
        # if not any(keyword in content.lower() for keyword in ['api', 'view', 'rest', 'http', 'request', 'response', 'serializer']):
        #     return []

        prompt = f"""
        Analyze this Python file and identify all Django REST API functions or classes. Only return the functions that are REST API endpoints. 
        Only return entire implemenation of the function including function signature and content. 
        
        Look for:
        1. Functions that handle HTTP methods (GET, POST, PUT, DELETE, PATCH)
        2. Functions that process requests and return responses
        3. Any other REST API endpoints
        
        File content:
        {content}
        
        IMPORTANT: Return ONLY valid JSON with this exact structure:
        {{
            "apis": [
                {{
                    "name": "function_or_class_name",
                    "http_method": "GET|POST|PUT|DELETE|PATCH|UNKNOWN",
                    "description": "Brief description of what this API does",
                    "content_django": actual function and entire implementation funtion include function signature and content,
                    "content_dafny": Based on the django function, create a Dafny function specification with preconditions and postconditions assume db schema exists as a dafny type
                }}
            ]
        }}
        
        If no REST APIs are found, return: {{"apis": []}}
        Do not include any text before or after the JSON.
        """

        try:
            completion = client.chat.completions.create(
                model=MODEL_NAME,
                messages=[
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                temperature=TEMPERATURE,
                max_tokens=MAX_TOKENS,
                top_p=TOP_P,
                stream=STREAM
            )

            # `content` can be None (e.g. a refusal); treat that as an empty reply.
            response = (completion.choices[0].message.content or "").strip()

            return _parse_apis_from_response(response, file_path)

        except Exception as e:
            print(f"Error calling OpenAI API for {file_path}: {e}")
            return []

    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return []

In [None]:
"""Analyze Django project and generate Dafny specifications"""

# Find all Python files under DJANGO_PROJECT_PATH
# (find_django_files lists views.py files ahead of the rest)
django_files = find_django_files(DJANGO_PROJECT_PATH)
print(f"Found {len(django_files)} Python files")

In [None]:
#django_files

In [None]:
# Extract REST APIs using OpenAI
all_apis = []
for file_path in tqdm(django_files, desc="Analyzing files"):
    apis = extract_rest_apis_from_file_with_openai(file_path)
    # print(f'apis: {apis}')
    all_apis.extend(apis)

In [None]:
# Display the raw list of extracted API dicts
all_apis

In [None]:
def display_full_api_info(apis: List[Dict[str, Any]]):
    """Print a report of the extracted APIs, grouped by source file.

    For every API the name, HTTP method, description, Django code and Dafny
    specification are printed (with placeholder text for missing keys). A
    summary with the per-method counts, the number of files and the number of
    APIs closes the report.

    Args:
        apis: API dicts; the keys read are 'file_path', 'name', 'http_method',
            'description', 'content_django' and 'content_dafny'.
    """
    heavy_rule = "=" * 100
    light_rule = "-" * 50

    print(heavy_rule)
    print("COMPLETE DJANGO REST API ANALYSIS")
    print(heavy_rule)
    print(f"Total APIs found: {len(apis)}")
    print()

    # Bucket the entries per file, keeping first-seen file order.
    by_file = {}
    for entry in apis:
        by_file.setdefault(entry.get('file_path', 'Unknown'), []).append(entry)

    code_sections = (
        ("📝 DJANGO CODE:", 'content_django', 'No Django code available'),
        ("🔬 DAFNY SPECIFICATION:", 'content_dafny', 'No Dafny specification available'),
    )

    for path, entries in by_file.items():
        print(f"📁 FILE: {path}")
        print(heavy_rule)
        print(f"APIs found: {len(entries)}")
        print()

        for position, entry in enumerate(entries, start=1):
            print(f"🔗 API #{position}: {entry.get('name', 'Unknown')}")
            print(f"   Method: {entry.get('http_method', 'UNKNOWN')}")
            print(f"   Description: {entry.get('description', 'No description')}")
            print()

            for heading, key, placeholder in code_sections:
                print(heading)
                print(light_rule)
                print(entry.get(key, placeholder))
                print()

            print("─" * 100)
            print()

    print(heavy_rule)
    print("SUMMARY STATISTICS")
    print(heavy_rule)

    # Tally how many APIs use each HTTP method.
    method_counts = {}
    for entry in apis:
        verb = entry.get('http_method', 'UNKNOWN')
        method_counts[verb] = 1 + method_counts.get(verb, 0)

    print("HTTP Methods:")
    for verb, total in sorted(method_counts.items()):
        print(f"  {verb}: {total}")

    print()
    print(f"Files analyzed: {len(by_file)}")
    print(f"Total APIs: {len(apis)}")

In [None]:
# Print the per-file report and summary statistics for all extracted APIs
display_full_api_info(all_apis)