In [63]:
import sys
sys.path.append("/Users/arshath/play/naptha/devagent/agents")

from dotenv import load_dotenv
load_dotenv()

from nest_asyncio import apply
apply()

In [2]:
import json
import dspy
import pickle
import json
import time
import subprocess
import traceback
from typing import Dict, Any
from IPython.display import display, Markdown, HTML
from pydantic import BaseModel
from typing import Optional, Union, Dict, Any, List

from doc_ingestion.fetcher import WebsiteFetcher
from doc_ingestion.crawl4ai import create_crawl4ai_fetcher
from doc_ingestion.analyzer import DocumentAnalyzer
from test_plan import TestPlanGenerator
from concurrent.futures import ThreadPoolExecutor, as_completed

dspy.configure(lm=dspy.LM("gemini/gemini-2.5-flash", max_tokens=30000))

In [3]:
# fetcher = WebsiteFetcher()
# contents = fetcher.fetch("https://openweathermap.org/api")
# pages = json.loads(contents.content['pages'])

In [4]:
# crawl4ai_fetcher = create_crawl4ai_fetcher(
#     crawl_type="deep", 
#     max_pages=100, 
#     max_depth=3, 
#     keywords=["api", "documentation"]
# )

# crawl4ai_contents = crawl4ai_fetcher.fetch("https://openweathermap.org/api")

In [5]:
# with open('crewai-data.pkl', 'wb') as f:
#     pickle.dump(crawl4ai_contents, f)

# Reload the cached crawl result instead of re-crawling the site.
# NOTE(review): pickle.load can run arbitrary code from the file; only load
# caches that were written by this notebook.
with open('crewai-data.pkl', 'rb') as crawl_cache:
    crawl4ai_contents = pickle.load(crawl_cache)

In [6]:
# analyzer = DocumentAnalyzer(max_workers=10)
# doc_analysis = analyzer.analyze(crawl4ai_contents)

# import pickle
# with open('doc-analysis.pkl', 'wb') as f:
#     pickle.dump(doc_analysis, f)

In [7]:
import pickle
# Reload the cached document analysis instead of re-running the analyzer.
# NOTE(review): pickle.load can run arbitrary code from the file; only load
# caches that were written by this notebook.
with open('doc-analysis.pkl', 'rb') as analysis_cache:
    doc_analysis = pickle.load(analysis_cache)

In [9]:
import dspy
from test_plan import PageTestPlanExtractor, TestPlanConsolidator, DEFAULT_TEST_PLAN_INSTRUCTIONS

# Wrap PageTestPlanExtractor in dspy.ChainOfThought; process_page() calls this
# object once for every analysed page.
page_extractor = dspy.ChainOfThought(PageTestPlanExtractor)
# consolidator = dspy.ChainOfThought(TestPlanConsolidator)


In [54]:
def process_page(page, analysis):
    """Build the test plan for one documentation page.

    Args:
        page: Key of the page in ``doc_analysis.pages`` (passed on as ``page_url``).
        analysis: Analysis object for that page; serialised with
            ``model_dump_json()`` before being handed to the extractor.

    Returns:
        Whatever ``page_extractor`` returns for this page.
    """
    extractor_inputs = {
        "page_url": page,
        "page_analysis": analysis.model_dump_json(),
        # Raw content is looked up through the module-level ``doc_analysis``.
        "raw_content": doc_analysis.pages[page].content,
        "agent_instructions": DEFAULT_TEST_PLAN_INSTRUCTIONS,
    }
    return page_extractor(**extractor_inputs)

In [11]:
# Remember where each page sits in doc_analysis.pages so the results can be
# put back into that order once all workers have finished.
page_order = {page: position for position, page in enumerate(doc_analysis.pages)}

page_test_plans = []
with ThreadPoolExecutor(max_workers=15) as executor:
    # Submit all tasks
    future_to_page = {executor.submit(process_page, page, analysis): (page, analysis)
                      for page, analysis in doc_analysis.pages.items()}

    # Collect results as they complete
    for future in as_completed(future_to_page):
        page, analysis = future_to_page[future]
        try:
            page_test_plan = future.result()
            page_test_plans.append((page, analysis, page_test_plan))
        except Exception as exc:
            # Best effort: a failing page is reported and left out of the results.
            print(f'Page {page} generated an exception: {exc}')

# as_completed() yields futures in completion order, which changes from run to
# run. Later cells index into this list by position, so restore input order to
# keep those lookups reproducible.
page_test_plans.sort(key=lambda entry: page_order[entry[0]])

In [25]:
# Pick one (page, analysis, test plan) triple to experiment with below.
page_index = 4
page_url, page_analysis, page_test_plan = page_test_plans[page_index]

# Raw page content as stored on the analysis object.
page_content = page_analysis.model_dump()['content']

# Scenarios extracted for this page, plus one of them serialised to JSON.
page_test_scenarios = page_test_plan.test_scenarios
test_scenario = page_test_scenarios[1].model_dump_json()

In [26]:
page_url

'https://openweathermap.org/api/air-pollution'

In [None]:
import dspy
import subprocess
import sys
import time
import traceback
from typing import Dict, Any, List
from pydantic import BaseModel, Field
import json
import os
from enum import Enum

from pydantic import BaseModel, Field
from enum import Enum

class TestStatus(Enum):
    """Three-level outcome label for an executed test scenario.

    Every member's value is the same upper-case text as its name, which is
    also the label the executor agent is asked to emit.
    """

    # Scenario ran as written.
    PASSED = "PASSED"
    # Scenario succeeded, but only with workarounds or minor issues.
    MINOR_FAILURE = "MINOR_FAILURE"
    # Scenario failed.
    MAJOR_FAILURE = "MAJOR_FAILURE"

class TestResult(BaseModel):
    """Outcome of one executed test scenario plus optional tracking details.

    The first five fields describe the verdict; the remaining fields are
    generic tracking data that all default to empty.
    """
    # Name of the scenario that was run.
    scenario_name: str
    # Despite the name, this is the three-level TestStatus, not a boolean.
    passed: TestStatus = Field(description="Minor failure if the test passed but there were some minor issues, major failure if the test failed")
    # Wall-clock duration in seconds.
    execution_time: float
    # Text of the agent's final verdict.
    output: str
    # Explanation when the scenario did not pass; empty otherwise.
    error_message: str = ""

    # Enhanced tracking - GENERIC for all tool types
    steps_taken: List[str] = []
    code_generated: List[str] = []  # All code that was generated
    # Results from each tool call. Values are typed Any because the tool
    # functions in this notebook record booleans and integers (success flags,
    # return codes, counts), which a Dict[str, str] field would reject.
    tool_results: List[Dict[str, Any]] = []
    detected_patterns: List[Dict[str, Any]] = []  # Generic pattern detection
    final_reasoning: str = ""  # Agent's final reasoning

def extract_code(code_string: str) -> str:
    """Extract clean Python code from a markdown-wrapped code string.

    Handles, in order:
      * surrounding whitespace,
      * one pair of wrapping single quotes,
      * an opening code fence, with or without a language tag, and a closing fence,
      * escaped newlines (a literal backslash followed by ``n``).

    Escaped newlines are only expanded when the input contains no real
    newline, i.e. when the whole program arrived as a single escaped line.
    Code that already has real line breaks is left alone, because there a
    backslash-n is almost always part of a string literal and expanding it
    would split that literal across lines.

    Args:
        code_string: Raw code text, possibly fenced and/or escaped.

    Returns:
        The code with wrappers removed and outer whitespace stripped.
    """
    import re  # local import: `re` is not among this notebook's top-level imports

    text = code_string.strip()

    # Drop one pair of wrapping single quotes.
    if text.startswith("'") and text.endswith("'"):
        text = text[1:-1].strip()

    # Decide up front whether this is a single-line, escaped payload.
    is_escaped_blob = "\n" not in text

    # Opening fence followed by an optional language tag and a real or escaped
    # line break: drop the whole fence line.
    opening_fence = re.match(r"```[A-Za-z0-9_+.-]*[ \t]*(?:\r?\n|\\n)", text)
    if opening_fence:
        text = text[opening_fence.end():]
    elif text.startswith('```python'):
        # Fence with code on the same line.
        text = text[9:]
    elif text.startswith('```'):
        text = text[3:]

    text = text.rstrip()
    if text.endswith('```'):
        text = text[:-3]

    if is_escaped_blob:
        text = text.replace('\\n', '\n')
    return text.strip()

# Context (e.g. API keys) that the tool functions read at call time
_global_context = {}


def _fresh_execution_details():
    """Build a new tracking record with every field at its initial value."""
    return {
        'code_generated': [],
        'tool_results': [],
        'detected_patterns': [],  # Generic patterns instead of HTTP-specific
        'execution_success': False,
        'status_success': False,  # Generic status success (not just HTTP)
        'assertion_success': False,
        'pattern_success': False  # Based on success/failure indicators
    }


# Detailed tracking for the test currently being executed
_execution_details = _fresh_execution_details()


def reset_execution_details():
    """Replace the module-level tracking record with a fresh, empty one."""
    global _execution_details
    _execution_details = _fresh_execution_details()

def set_global_context(context: dict):
    """Store a shallow copy of ``context`` for the tool functions to read.

    A falsy ``context`` (``None`` or an empty dict) resets the stored
    context to an empty dict.
    """
    global _global_context
    if context:
        _global_context = context.copy()
    else:
        _global_context = {}

def set_environment_variables_direct(context: dict):
    """Export every truthy entry of ``context`` as an environment variable.

    Each value is converted with ``str()`` before being stored in
    ``os.environ``. Entries with falsy values are skipped. A confirmation
    line is printed per variable using the same masking rule as
    ``check_available_api_keys``: the first 8 characters followed by ``***``
    for values longer than 8 characters, and just ``***`` for shorter ones,
    so that short secrets are never printed in full.

    Args:
        context: Mapping of environment variable name to value.
    """
    for env_key, value in context.items():
        if not value:
            continue
        text = str(value)
        os.environ[env_key] = text
        masked = text[:8] + "***" if len(text) > 8 else "***"
        print(f"Set {env_key} = {masked}")

# DSPy ReAct Tools - these need simple signatures
def check_available_api_keys():
    """Describe which API keys the current global context provides.

    Returns a human-readable report listing every key with a truthy value,
    with the value masked (first 8 characters plus ``***``, or just ``***``
    for values of 8 characters or fewer). Returns a warning string when the
    context is empty or holds no truthy values.
    """
    # NOTE(review): a second `check_available_api_keys` is defined further
    # down in this cell; once the cell has run, that later definition is the
    # one bound to this name.
    context = _global_context
    if not context:
        return "⚠️  NO CONTEXT PROVIDED - No API keys available"

    def mask(raw):
        text = str(raw)
        return text[:8] + "***" if len(text) > 8 else "***"

    # Only keys that actually carry a value are listed.
    listed = [f"✅ {key}: {mask(value)}" for key, value in context.items() if value]
    if not listed:
        return "⚠️  NO API KEYS IN CONTEXT!"

    return (
        "🔑 AVAILABLE API KEYS:\n\n"
        + "\n".join(listed) + "\n\n"
        + "💡 Use os.getenv('KEY_NAME') to access these in your code\n"
        + f"Available keys: {', '.join(context.keys())}"
    )

def execute_code(code: str):
    """Execute Python code with environment variables from the global context.

    The code is cleaned with ``extract_code``, the global context (if any) is
    exported to ``os.environ``, and the code is run with ``exec`` in a fresh
    globals dict that provides ``time``, ``json``, ``subprocess``, ``sys``
    and a ``print`` replacement that records each call's text.

    SECURITY NOTE: this runs arbitrary code in the current process with full
    builtins. Only use it with code you are prepared to run on this machine.

    Args:
        code: Python source, optionally wrapped in markdown fences.

    Returns:
        A status string that starts with one of:
          * ``SUCCESS:`` followed by timing and the captured output,
          * ``MISSING_DEPENDENCY:`` followed by the module that failed to import,
          * ``ERROR:`` followed by timing, the error text and a traceback.
    """
    context = _global_context
    start_time = time.time()

    try:
        # Clean the code first
        clean_code = extract_code(code)

        # Set environment variables directly from context
        if context:
            set_environment_variables_direct(context)

        # Capture output
        captured_output = []

        def capture_print(*args, **kwargs):
            # Honour `sep` so the captured text matches what the code printed.
            separator = kwargs.get('sep')
            if separator is None:
                separator = ' '
            line = separator.join(str(arg) for arg in args)
            captured_output.append(line)
            print(line)  # Still print to console

        # Basic execution environment
        exec_globals = {
            '__builtins__': __builtins__,
            'time': time,
            'json': json,
            'subprocess': subprocess,
            'sys': sys,
            'print': capture_print,
        }

        # Execute the code. SystemExit is not an Exception subclass, so a
        # script calling sys.exit() would otherwise escape this function and
        # abort the caller. A zero/None status counts as normal completion;
        # anything else is reported through the regular error path.
        try:
            exec(clean_code, exec_globals)
        except SystemExit as exit_request:
            exit_status = exit_request.code
            if exit_status not in (None, 0):
                raise RuntimeError(f"Script called exit with status {exit_status!r}") from None

        execution_time = time.time() - start_time
        output = '\n'.join(captured_output) if captured_output else "Code executed successfully (no output)"
        print(f"Output: {output}")

        return f"SUCCESS: Code executed in {execution_time:.2f}s\nOutput:\n{output}"

    except ImportError as e:
        execution_time = time.time() - start_time
        # Prefer the module name carried by the exception; fall back to
        # parsing the message when it is not available.
        if isinstance(e, ModuleNotFoundError) and e.name:
            missing_module = e.name
        else:
            missing_module = str(e).split("'")[1] if "'" in str(e) else str(e)
        return f"MISSING_DEPENDENCY: {missing_module}\nError: {str(e)}\nTime: {execution_time:.2f}s"

    except Exception as e:
        execution_time = time.time() - start_time
        error_trace = traceback.format_exc()
        return f"ERROR: Code execution failed in {execution_time:.2f}s\nError: {str(e)}\nTrace:\n{error_trace}"

def install_package(package_name: str):
    """Install a package with pip and record the attempt.

    Runs ``<python> -m pip install <package_name>`` with a 60 second timeout
    and appends a summary dict to ``_execution_details['tool_results']``.

    Args:
        package_name: Name handed to ``pip install``.

    Returns:
        A message describing success, pip failure (with stderr), or the
        exception raised while trying.
    """
    global _execution_details

    def record(**fields):
        # Every entry carries the tool name and the requested package.
        _execution_details['tool_results'].append(
            {'tool': 'install_package', 'package': package_name, **fields}
        )

    try:
        print(f"Attempting to install {package_name}...")
        pip_command = [sys.executable, '-m', 'pip', 'install', package_name]
        completed = subprocess.run(
            pip_command,
            capture_output=True,
            text=True,
            timeout=60,
        )

        installed = completed.returncode == 0
        record(success=installed, return_code=completed.returncode)

        if not installed:
            return f"Failed to install {package_name}: {completed.stderr}"
        return f"Successfully installed {package_name}"

    except Exception as exc:
        record(success=False, error=str(exc))
        return f"Installation error: {str(exc)}"

def check_environment():
    """List installed packages via pip and record the attempt.

    Runs ``<python> -m pip list`` with a 30 second timeout, appends a summary
    dict to ``_execution_details['tool_results']`` and returns the first 500
    characters of pip's output.

    Returns:
        A message with the truncated package listing, or a failure message
        if running pip raised an exception.
    """
    global _execution_details

    try:
        listing = subprocess.run(
            [sys.executable, '-m', 'pip', 'list'],
            capture_output=True,
            text=True,
            timeout=30,
        )
        stdout = listing.stdout

        # Counts output lines (header and trailing blank included), not packages.
        line_total = len(stdout.split('\n')) if stdout else 0
        _execution_details['tool_results'].append({
            'tool': 'check_environment',
            'success': listing.returncode == 0,
            'package_count': line_total,
        })

        return f"Environment check completed. Installed packages:\n{stdout[:500]}..."

    except Exception as exc:
        _execution_details['tool_results'].append({
            'tool': 'check_environment',
            'success': False,
            'error': str(exc),
        })
        return f"Environment check failed: {str(exc)}"

def check_available_api_keys():
    """Report the API keys in the global context and record the lookup.

    Appends a summary dict to ``_execution_details['tool_results']`` and
    returns a human-readable report. Values are masked: the first 8
    characters plus ``***`` for values longer than 8 characters, just
    ``***`` otherwise. Keys with falsy values are not listed individually.
    """
    global _execution_details
    context = _global_context

    if not context:
        _execution_details['tool_results'].append({
            'tool': 'check_available_api_keys',
            'success': False,
            'key_count': 0,
        })
        return "⚠️  NO CONTEXT PROVIDED - No API keys available"

    def mask(raw):
        text = str(raw)
        return text[:8] + "***" if len(text) > 8 else "***"

    # Only keys that actually carry a value are listed.
    available_keys = [f"✅ {key}: {mask(value)}" for key, value in context.items() if value]

    _execution_details['tool_results'].append({
        'tool': 'check_available_api_keys',
        'success': bool(available_keys),
        'key_count': len(available_keys),
        'keys': list(context.keys()),
    })

    if not available_keys:
        return "⚠️  NO API KEYS IN CONTEXT!"

    return (
        "🔑 AVAILABLE API KEYS:\n\n"
        + "\n".join(available_keys) + "\n\n"
        + "💡 Use os.getenv('KEY_NAME') to access these in your code\n"
        + f"Available keys: {', '.join(context.keys())}"
    )

# Input/output contract for the test-executor agent built from this class.
# NOTE(review): the class docstring and the `desc=` strings are presumably
# sent to the language model as instructions -- treat any edit to them as a
# behavior change, not a documentation change.
class TestExecutorSignature(dspy.Signature):
    """You are a test execution agent that runs test scenarios for developer tools.
    
    You can execute code, install dependencies, check the environment, and check available API keys.
    Your goal is to successfully execute the test scenario and determine if it PASSED, MINOR_FAILURE, or MAJOR_FAILURE.

    CLASSIFICATION CRITERIA:
    
    PASSED: 
    - Test scenario executed successfully without any issues
    - All assertions passed as expected
    - No code modifications or workarounds were needed
    - API responses matched expected format exactly
    
    MINOR_FAILURE: 
    - Test scenario succeeded but required workarounds or adjustments
    - API responses were in different format than expected (but still valid)
    - Had to modify test logic due to documentation inconsistencies  
    - Missing dependencies that were easily installable
    - Minor authentication or configuration issues that were resolved
    - Expected format didn't match actual API behavior
    
    MAJOR_FAILURE:
    - Test scenario completely failed to execute
    - Authentication failed and couldn't be resolved
    - Critical dependencies missing and couldn't be installed
    - API endpoints don't exist or are completely non-functional
    - Code errors that couldn't be resolved
    
    Available tools:
    - check_available_api_keys(): See what API keys are available
    - execute_code(code): Run Python code 
    - install_package(name): Install missing packages
    - check_environment(): List installed packages
    
    IMPORTANT: If you need to modify test expectations or make workarounds due to 
    documentation inconsistencies, classify as MINOR_FAILURE, not PASSED.
    """
    
    # Inputs: both are passed as JSON strings.
    scenario: str = dspy.InputField(desc="JSON string of the test scenario to execute")
    context: str = dspy.InputField(desc="JSON string of context with API keys and configurations")
    
    # Output: free text expected to contain one of the three status labels;
    # run_test_with_react() maps it onto TestStatus.
    execution_result: str = dspy.OutputField(desc="Final result: PASSED, MINOR_FAILURE, or MAJOR_FAILURE with explanation")

# ReAct agent that works through a scenario using the four tool functions
# defined above, limited to 10 iterations.
test_executor_agent = dspy.ReAct(
    TestExecutorSignature,
    tools=[check_available_api_keys, execute_code, install_package, check_environment],
    max_iters=10,
)

def _classify_execution_result(verdict_text):
    """Map the agent's free-text verdict onto a TestStatus.

    An explicit failure label always wins over the word "passed", because
    failure explanations routinely contain it ("all assertions passed after
    a workaround"). If both failure labels occur, the one mentioned first is
    used. Text without any label is treated as MAJOR_FAILURE.
    """
    text = str(verdict_text or "").upper()
    failure_hits = [
        (text.find(label), status)
        for label, status in (
            ("MINOR_FAILURE", TestStatus.MINOR_FAILURE),
            ("MAJOR_FAILURE", TestStatus.MAJOR_FAILURE),
        )
        if label in text
    ]
    if failure_hits:
        return min(failure_hits, key=lambda hit: hit[0])[1]
    if "PASSED" in text:
        return TestStatus.PASSED
    return TestStatus.MAJOR_FAILURE  # Default to MAJOR_FAILURE if unknown


def run_test_with_react(scenario, page_content: str, context: Dict[str, str] = None) -> "TestResult":
    """Run a test scenario using the ReAct agent.

    Args:
        scenario: Scenario object; must provide ``name`` and ``model_dump_json()``.
        page_content: Documentation page content given to the agent.
        context: Optional mapping (e.g. API keys) made available to the tool
            functions through ``set_global_context``.

    Returns:
        A TestResult. Any exception raised while running the agent is caught
        and reported as a MAJOR_FAILURE result instead of propagating.
    """
    context = context or {}
    start_time = time.time()

    try:
        # Set global context for tools to access
        set_global_context(context)

        # Run the ReAct agent
        # NOTE(review): the signature's `context` input is described as API
        # keys/configuration, but the page content is what gets passed here;
        # the keys themselves only reach the tools via set_global_context().
        # Confirm this is intended.
        result = test_executor_agent(
            scenario=scenario.model_dump_json(),
            context=json.dumps(page_content)
        )

        verdict = str(result.execution_result or "")
        reasoning = getattr(result, 'reasoning', "") or ""

        print("=== REACT AGENT RESULT ===")
        print(f"Execution Result: {verdict}")
        if reasoning:
            print(f"Reasoning: {reasoning}")

        # Parse the result
        passed = _classify_execution_result(verdict)

        # Extract steps (and any executed code) from the trajectory. Steps
        # are stored under numbered keys; stop at the first missing index.
        steps = []
        code_generated = []
        trajectory = getattr(result, 'trajectory', None) or {}
        step_index = 0
        while f'thought_{step_index}' in trajectory and f'tool_name_{step_index}' in trajectory:
            thought = trajectory[f'thought_{step_index}']
            tool = trajectory[f'tool_name_{step_index}']
            steps.append(f"Step {step_index+1}: {thought} -> Used {tool}")

            tool_args = trajectory.get(f'tool_args_{step_index}')
            if tool == 'execute_code' and isinstance(tool_args, dict) and 'code' in tool_args:
                code_generated.append(str(tool_args['code']))
            step_index += 1

        return TestResult(
            scenario_name=scenario.name,
            passed=passed,
            execution_time=time.time() - start_time,
            output=verdict,
            error_message="" if passed == TestStatus.PASSED else verdict,
            steps_taken=steps,
            code_generated=code_generated,
            final_reasoning=reasoning
        )

    except Exception as e:
        print(f"ERROR in run_test_with_react: {str(e)}")
        traceback.print_exc()
        return TestResult(
            scenario_name=scenario.name,
            passed=TestStatus.MAJOR_FAILURE,
            execution_time=time.time() - start_time,
            output="",
            error_message=f"ReAct agent error: {str(e)}",
            steps_taken=["Agent execution failed"],
            code_generated=[],
        )

In [96]:
# Read the API key from the environment (load_dotenv() at the top of the
# notebook loads it from a local .env file). Never commit the literal key to
# the notebook. A missing variable raises KeyError here, before the agent runs.
context = {
    "OPEN_WEATHER_API_KEY": os.environ["OPEN_WEATHER_API_KEY"]
}
result = run_test_with_react(
    page_test_scenarios[2],
    page_content,
    context
)

Set OPEN_WEATHER_API_KEY = 7a4834a9***
Constructed URL: http://api.openweathermap.org/data/2.5/air_pollution/forecast?lat=50&lon=50&appid=***
HTTP Status Code: 200
Response JSON: {
  "coord": {
    "lon": 50,
    "lat": 50
  },
  "list": [
    {
      "main": {
        "aqi": 2
      },
      "components": {
        "co": 116.04,
        "no": 0.01,
        "no2": 0.29,
        "o3": 68.39,
        "so2": 0.12,
        "pm2_5": 3.25,
        "pm10": 12.44,
        "nh3": 0.38
      },
      "dt": 1752548400
    },
    {
      "main": {
        "aqi": 2
      },
      "components": {
        "co": 116.74,
        "no": 0.02,
        "no2": 0.27,
        "o3": 69.83,
        "so2": 0.14,
        "pm2_5": 3.16,
        "pm10": 12.64,
        "nh3": 0.37
      },
      "dt": 1752552000
    },
    {
      "main": {
        "aqi": 2
      },
      "components": {
        "co": 117.51,
        "no": 0.02,
        "no2": 0.27,
        "o3": 70.14,
        "so2": 0.

In [94]:
result.model_dump()

{'scenario_name': 'Get Forecast Air Pollution Data with Valid API Key and Coordinates',
 'passed': <TestStatus.MINOR_FAILURE: 'MINOR_FAILURE'>,
 'execution_time': 20.008936882019043,
 'output': 'MINOR_FAILURE: Test scenario succeeded but required a workaround/adjustment because the \'coord\' field in the API response was an object `{"lon": 50, "lat": 50}` instead of the expected list `[50.0, 50.0]` as specified in the test steps.',
 'error_message': 'MINOR_FAILURE: Test scenario succeeded but required a workaround/adjustment because the \'coord\' field in the API response was an object `{"lon": 50, "lat": 50}` instead of the expected list `[50.0, 50.0]` as specified in the test steps.',
 'steps_taken': ["Step 1: I need to retrieve the OpenWeather API key to construct the request URL. I will use `check_available_api_keys` to see if it's available in the environment. -> Used check_available_api_keys",
  "Step 2: The OpenWeather API key is available. I will now execute the Python code to 