# FHIR Test Plan Generator

This notebook generates a consolidated test plan markdown file from FHIR Implementation Guide requirements. The output serves as a complete specification that can be used by an LLM to generate executable test scripts.

#### What it does

- Processes each requirement from a markdown input file
- Based on the IG capability statement, generates comprehensive test specifications including:
  - Testability assessment (automatically testable, attestation, or not testable) and level of complexity
  - Implementation strategy with specific FHIR operations
  - Prerequisites, required inputs (including FHIR resources), and expected outputs
  - Validation criteria
- Creates a single, well-structured markdown file with a table of contents

#### How to use

1. **Setup**: The certificate path in `setup_clients()` may need to be adjusted for your environment. Store API keys in a `.env` file. Make sure you have API keys for at least one of:
   - Anthropic Claude (`ANTHROPIC_API_KEY`)
   - Google Gemini (`GEMINI_API_KEY`) 
   - OpenAI GPT-4 (`OPENAI_API_KEY`)

2. **Input**: A markdown file with requirements in the following format:
   ```markdown
   # REQ-ID
   **Summary**: Requirement summary
   **Description**: Detailed description
   **Verification**: Test approach
   **Actor**: System component responsible
   **Conformance**: SHALL/SHOULD/MAY
   **Conditional**: True/False
   **Source**: Original requirement sources
   ---
   ```
   And an IG capability statement file in markdown format.

3. **Run**: Execute the `run_test_plan_generator()` function and follow the prompts:
   - Select which LLM to use
   - Provide the path to your requirements file
   - Enter the Implementation Guide name
   - Specify the output directory, or use the default

4. **Output**: A single markdown file is generated, named in the format:
   `[llm]_test_plan_[timestamp].md`
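
As a quick pre-flight check for the setup step, the sketch below reports which providers have a key available. The helper `available_providers` and the `KEY_NAMES` mapping are hypothetical and not part of the notebook; the provider labels are assumed to mirror the keys used in `API_CONFIGS`, and the function only inspects the mapping it is given.

```python
import os
from typing import List, Mapping

# Environment variable expected for each provider (names taken from the setup step).
KEY_NAMES = {
    "claude": "ANTHROPIC_API_KEY",
    "gemini": "GEMINI_API_KEY",
    "gpt": "OPENAI_API_KEY",
}

def available_providers(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the providers whose API key is present and non-empty in `env`."""
    return [name for name, var in KEY_NAMES.items() if env.get(var, "").strip()]

# Report what the current process environment provides.
found = available_providers()
print("Configured providers:", ", ".join(found) or "none")
```

If the keys live in a `.env` file, they need to be loaded into the environment (for example with `load_dotenv()`) before a check like this can see them.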

In [57]:
import re
import os
import logging
import time
import json
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from collections import defaultdict
import httpx

import pandas as pd
from dotenv import load_dotenv
from anthropic import Anthropic, RateLimitError
import google.generativeai as gemini
from openai import OpenAI
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

# Set up logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


In [58]:
# Constants
PROJECT_ROOT = Path.cwd().parent  # Go up one level to project root
# Pass relative components: a leading '/' would make os.path.join discard PROJECT_ROOT
OUTPUT_DIR = os.path.join(PROJECT_ROOT, 'reqs_extraction', 'test_plan_output')
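
When building paths such as `OUTPUT_DIR`, keep in mind that `os.path.join` discards every earlier component as soon as a later one is absolute. The sketch below illustrates this with `posixpath` (which is what `os.path` resolves to on Linux and macOS) so the result does not depend on the platform; the project root shown is a made-up example.

```python
import posixpath

root = "/home/user/project"  # hypothetical project root

# A leading slash makes the second argument absolute, so `root` is dropped.
dropped = posixpath.join(root, "/reqs_extraction/test_plan_output")

# Relative components are appended to `root` as intended.
kept = posixpath.join(root, "reqs_extraction", "test_plan_output")

print(dropped)
print(kept)
```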


In [59]:

# API Configuration
API_CONFIGS = {
    "claude": {
        "model_name": "claude-3-5-sonnet-20241022", 
        "max_tokens": 8192,
        "temperature": 0.3,  # Lower temperature for more consistent output
        "batch_size": 5,
        "delay_between_chunks": 1,
        "delay_between_batches": 3,
        "requests_per_minute": 900,
        "max_requests_per_day": 20000,
        "delay_between_requests": 0.1
    },
    "gemini": {
        "model": "models/gemini-1.5-pro-001",
        "max_tokens": 8192,
        "temperature": 0.3,
        "batch_size": 5,
        "delay_between_chunks": 2,
        "delay_between_batches": 5,
        "requests_per_minute": 900,
        "max_requests_per_day": 50000,
        "delay_between_requests": 0.1,
        "timeout": 60
    },
    "gpt": {
        "model": "gpt-4o",
        "max_tokens": 8192,
        "temperature": 0.3,
        "batch_size": 5,
        "delay_between_chunks": 2,
        "delay_between_batches": 5,
        "requests_per_minute": 450,
        "max_requests_per_day": 20000,
        "delay_between_requests": 0.15
    }
}

# System prompts for test generation
SYSTEM_PROMPT = """You are a specialized FHIR testing engineer with expertise in healthcare interoperability.
Your task is to analyze FHIR Implementation Guide requirements and generate practical, implementable test specifications."""



In [60]:
CONSOLIDATED_TEST_PLAN_WITH_CAPABILITY_PROMPT = """
Analyze the following FHIR Implementation Guide requirement and create a comprehensive test specification, 
taking into account the relevant Capability Statement information. 

For the requirement:
{requirement}

Relevant Capability Statement information for this requirement:
{capability_info}

Create a structured test specification with the following sections:

1. Requirement ID

2. Requirement Analysis:
   - Testability Assessment: Classify as automatically testable, an attestation, or not testable due to being too vague or covered by the validator
   - Complexity: Simple, Moderate, or Complex
   - Prerequisites: Required system configurations, data, or setup

3. Test Implementation Strategy:
   - Required inputs including required FHIR resources and expected outputs for the test
   - Required FHIR Operations: List any specific API calls/operations needed (ensure these are supported in the Capability Statement)
   - Validation Criteria: Specific checks to verify conformance; what assertions or results should there be to indicate passing of a test

Format your response as markdown with clear headers.
"""

# New prompt to identify requirement groups
REQUIREMENT_GROUPING_PROMPT = """
Analyze the following requirement from a FHIR Implementation Guide and identify the most appropriate category or group it belongs to.

Requirement:
{requirement}

Group the requirements by the resource profiles that make up the implementation guide from which these requirements were extracted:
Endpoint, HealthcareService, InsurancePlan, Location, Network, Organization, OrganizationAffiliation, Practitioner, and PractitionerRole

- Plan-Net Endpoint: The technical details of an endpoint that can be used for electronic services, such as a portal or FHIR REST services, messaging or operations, or DIRECT messaging.
- Plan-Net HealthcareService: The HealthCareService resource typically describes services offered by an organization/practitioner at a location. The resource may be used to encompass a variety of services covering the entire healthcare spectrum, including promotion, prevention, diagnostics, pharmacy, hospital and ambulatory care, home care, long-term care, and other health-related and community services.
- Plan-Net InsurancePlan: An InsurancePlan is a discrete package of health insurance coverage benefits that are offered under a particular network type. A given payer’s products typically differ by network type and/or covered benefits. A plan pairs a product’s covered benefits with the particular cost sharing structure offered to a consumer. A given product may comprise multiple plans (i.e. each plan offers different cost sharing requirements for the same set of covered benefits). InsurancePlan describes a health insurance offering comprised of a list of covered benefits (i.e. the product), costs associated with those benefits (i.e. the plan), and additional information about the offering, such as who it is owned and administered by, a coverage area, contact information, etc.
- Plan-Net Location: A Location is the physical place where healthcare services are provided, practitioners are employed, organizations are based, etc. Locations can range in scope from a room in a building to a geographic region/area.
- Plan-Net Network: A Network refers to a healthcare provider insurance network. A healthcare provider insurance network is an aggregation of organizations and individuals that deliver a set of services across a geography through health insurance products/plans. A network is typically owned by a payer. In the Plan-Net IG, individuals and organizations are represented as participants in a Plan-Net Network through the practitionerRole and Plan-Net-organizationAffiliation resources, respectively.
- Plan-Net Organization: An organization is a formal or informal grouping of people or organizations with a common purpose, such as a company, institution, corporation, community group, or healthcare practice. Guidance: When the contact is a department name, rather than a human (e.g., patient help line), include a blank family and given name, and provide the department name in contact.name.text
- Plan-Net OrganizationAffiliation: The OrganizationAffiliation resource describes relationships between two or more organizations, including the services one organization provides another, the location(s) where they provide services, the availability of those services, electronic endpoints, and other relevant information.
- Plan-Net Practitioner: Practitioner is a person who is directly or indirectly involved in the provisioning of healthcare.
- Plan-Net PractitionerRole: PractitionerRole typically describes details about a provider. When the provider is a practitioner, there may be a relationship to an organization. A provider renders services to patients at a location. Practitioner participation in healthcare provider insurance networks may be direct or through their role at an organization. PractitionerRole involves either the actual or potential (hence the optionality on Practitioner) of an individual to play this role on behalf of or under the auspices of an organization. The absence of a Practitioner resource does not imply that the Organization itself is playing the role of a Practitioner, instead it implies that that role has been established by the Organization and MAY apply that to a specific Practitioner.

Return only the category name that best represents this requirement's grouping, with no additional text or explanation. DO NOT group by actor type (e.g. client and server), only resource type.
"""

In [61]:
def create_rate_limiter():
    """Create a rate limiter state dictionary for all APIs"""
    return {
        api: {
            'requests': [],
            'daily_requests': 0,
            'last_reset': time.time()
        }
        for api in API_CONFIGS.keys()
    }

def check_rate_limits(rate_limiter: dict, api: str):
    """Check and wait if rate limits would be exceeded"""
    if api not in rate_limiter:
        raise ValueError(f"Unknown API: {api}")
        
    now = time.time()
    state = rate_limiter[api]
    config = API_CONFIGS[api]
    
    # Reset daily counts if needed
    day_seconds = 24 * 60 * 60
    if now - state['last_reset'] >= day_seconds:
        state['daily_requests'] = 0
        state['last_reset'] = now
    
    # Check daily limit
    if state['daily_requests'] >= config['max_requests_per_day']:
        raise Exception(f"{api} daily request limit exceeded")
    
    # Remove old requests outside the current minute
    state['requests'] = [
        req_time for req_time in state['requests']
        if now - req_time < 60
    ]
    
    # Wait if at rate limit
    if len(state['requests']) >= config['requests_per_minute']:
        sleep_time = 60 - (now - state['requests'][0])
        if sleep_time > 0:
            time.sleep(sleep_time)
        state['requests'] = state['requests'][1:] 
    
    # Add minimum delay between requests
    if state['requests'] and now - state['requests'][-1] < config['delay_between_requests']:
        time.sleep(config['delay_between_requests'])
    
    # Record this request with a fresh timestamp, since `now` is stale after any sleep above
    state['requests'].append(time.time())
    state['daily_requests'] += 1
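
The per-minute check above is a sliding-window limiter. A minimal sketch of that idea in isolation follows, assuming a hypothetical helper `wait_needed` that takes the current time as an argument so it can be exercised without actually sleeping; it is not the notebook's implementation.

```python
from collections import deque
from typing import Deque

def wait_needed(timestamps: Deque[float], limit: int, window: float, now: float) -> float:
    """Return how many seconds to wait before another request fits in the window."""
    # Drop timestamps that have aged out of the window.
    while timestamps and now - timestamps[0] >= window:
        timestamps.popleft()
    if len(timestamps) < limit:
        return 0.0
    # The oldest remaining request must age out before a new one is allowed.
    return window - (now - timestamps[0])

# Two requests at t=0s and t=10s fill a limit of 2 per 60 seconds.
history: Deque[float] = deque()
for t in (0.0, 10.0):
    assert wait_needed(history, limit=2, window=60.0, now=t) == 0.0
    history.append(t)

# A third request at t=20s has to wait until the first one is 60s old.
print(wait_needed(history, limit=2, window=60.0, now=20.0))  # 40.0
```

Passing the clock in explicitly keeps the window logic separate from `time.sleep`, which makes the limiter easy to test.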

In [62]:
def setup_clients():
    """Initialize a client for each LLM service that has an API key configured"""
    clients = {}
    try:
        # Claude setup
        anthropic_api_key = os.getenv('ANTHROPIC_API_KEY')
        if anthropic_api_key:
            # Use the Homebrew OpenSSL certificate bundle if present; adjust for your machine
            verify_path = '/opt/homebrew/etc/openssl@3/cert.pem'
            http_client = httpx.Client(
                verify=verify_path if os.path.exists(verify_path) else True,
                timeout=60.0
            )
            clients["claude"] = Anthropic(
                api_key=anthropic_api_key,
                http_client=http_client
            )
        
        # Gemini setup
        gemini_api_key = os.getenv('GEMINI_API_KEY')
        if gemini_api_key:
            gemini.configure(api_key=gemini_api_key)
            clients["gemini"] = gemini.GenerativeModel(
                model_name=API_CONFIGS["gemini"]["model"],
                generation_config={
                    "max_output_tokens": API_CONFIGS["gemini"]["max_tokens"],
                    "temperature": API_CONFIGS["gemini"]["temperature"]
                }
            )
        
        # OpenAI setup
        openai_api_key = os.getenv('OPENAI_API_KEY')
        if openai_api_key:
            clients["gpt"] = OpenAI(
                api_key=openai_api_key,
                timeout=60.0
            )
        
        # At least one provider must be configured
        if not clients:
            raise ValueError(
                "No API keys found: set ANTHROPIC_API_KEY, GEMINI_API_KEY, or OPENAI_API_KEY"
            )
        
        return clients
        
    except Exception as e:
        logger.error(f"Error setting up clients: {str(e)}")
        raise

In [63]:
def parse_capability_statement(file_path: str) -> Dict[str, Any]:
    """
    Parse a FHIR Capability Statement markdown file into a structured dictionary
    
    Args:
        file_path: Path to the Capability Statement markdown file
        
    Returns:
        Dictionary containing structured Capability Statement information
    """
    with open(file_path, 'r') as f:
        content = f.read()
    
    # Extract resource capabilities
    resource_sections = {}
    
    # Find resource sections - they typically start with "#### ResourceName"
    resource_matches = re.finditer(r'#### ([A-Za-z]+)\n', content)
    
    for match in resource_matches:
        resource_name = match.group(1)
        start_pos = match.start()
        
        # Find the next resource section or end of document
        next_match = re.search(r'#### ([A-Za-z]+)\n', content[start_pos + len(match.group(0)):])
        if next_match:
            end_pos = start_pos + len(match.group(0)) + next_match.start()
            resource_section = content[start_pos:end_pos]
        else:
            resource_section = content[start_pos:]
        
        # Extract specific capabilities
        search_params = []
        search_param_section = re.search(r'Search Parameter Summary:.*?\| Conformance \| Parameter \| Type \| Example \|\n\| --- \| --- \| --- \| --- \|(.*?)(?:\n\n---|\Z)', 
                                       resource_section, re.DOTALL)
        
        if search_param_section:
            param_lines = search_param_section.group(1).strip().split('\n')
            for line in param_lines:
                if '|' in line:
                    parts = [p.strip() for p in line.split('|')]
                    if len(parts) >= 5 and parts[1] and parts[2]:
                        conformance = parts[1].replace('**', '')
                        param_name = parts[2]
                        param_type = parts[3]
                        search_params.append({
                            'name': param_name,
                            'type': param_type,
                            'conformance': conformance
                        })
        
        # Extract supported operations
        operations = []
        operations_section = re.search(r'Supported Operations:(.*?)(?:\n\n|\Z)', resource_section, re.DOTALL)
        if operations_section:
            op_lines = operations_section.group(1).strip().split('\n')
            for line in op_lines:
                if line.strip():
                    operations.append(line.strip())
        
        # Extract includes and revincludes
        includes = []
        includes_section = re.search(r'A Server \*\*SHALL\*\* be capable of supporting the following \_includes:(.*?)(?:\n\n|\Z)', 
                                   resource_section, re.DOTALL)
        if includes_section:
            include_lines = includes_section.group(1).strip().split('\n')
            for line in include_lines:
                if line.strip():
                    include_match = re.search(r'([A-Za-z]+):([A-Za-z\-]+)', line)
                    if include_match:
                        includes.append(f"{include_match.group(1)}:{include_match.group(2)}")
        
        revincludes = []
        revincludes_section = re.search(r'A Server \*\*SHALL\*\* be capable of supporting the following \_revincludes:(.*?)(?:\n\n|\Z)', 
                                      resource_section, re.DOTALL)
        if revincludes_section:
            revinclude_lines = revincludes_section.group(1).strip().split('\n')
            for line in revinclude_lines:
                if line.strip():
                    revinclude_match = re.search(r'([A-Za-z]+):([A-Za-z\-]+)', line)
                    if revinclude_match:
                        revincludes.append(f"{revinclude_match.group(1)}:{revinclude_match.group(2)}")
        
        resource_sections[resource_name] = {
            'search_parameters': search_params,
            'operations': operations,
            'includes': includes,
            'revincludes': revincludes
        }
    
    # Extract general capabilities
    general_capabilities = {}
    general_section = re.search(r'### FHIR RESTful Capabilities(.*?)(?:###|$)', content, re.DOTALL)
    if general_section:
        shall_match = re.search(r'The Plan-Net Server \*\*SHALL\*\*:(.*?)(?:The Plan-Net Server \*\*SHOULD\*\*:|\n\n\*\*Security:\*\*|\Z)', 
                              general_section.group(1), re.DOTALL)
        should_match = re.search(r'The Plan-Net Server \*\*SHOULD\*\*:(.*?)(?:\n\n\*\*Security:\*\*|\Z)', 
                               general_section.group(1), re.DOTALL)
        
        if shall_match:
            shall_items = re.findall(r'\d+\.\s*(.*?)(?:\n\d+\.|\Z)', shall_match.group(1), re.DOTALL)
            general_capabilities['SHALL'] = [item.strip() for item in shall_items]
        
        if should_match:
            should_items = re.findall(r'\d+\.\s*(.*?)(?:\n\d+\.|\Z)', should_match.group(1), re.DOTALL)
            general_capabilities['SHOULD'] = [item.strip() for item in should_items]
    
    return {
        'resources': resource_sections,
        'general_capabilities': general_capabilities
    }
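
To make the expected input layout concrete, here is a small sketch that applies the same search-parameter table pattern to a hand-written excerpt. The sample text and the helper name `parse_search_params` are illustrative assumptions, not content from a real capability statement.

```python
import re
from typing import Dict, List

# Hypothetical excerpt in the layout the parser expects.
SAMPLE = """#### Location

Search Parameter Summary:

| Conformance | Parameter | Type | Example |
| --- | --- | --- | --- |
| **SHALL** | address | string | `GET [base]/Location?address=[address]` |
| **SHOULD** | name | string | `GET [base]/Location?name=[name]` |
"""

TABLE_RE = re.compile(
    r'Search Parameter Summary:.*?\| Conformance \| Parameter \| Type \| Example \|\n'
    r'\| --- \| --- \| --- \| --- \|(.*?)(?:\n\n---|\Z)',
    re.DOTALL,
)

def parse_search_params(section: str) -> List[Dict[str, str]]:
    """Extract name, type, and conformance from the search-parameter table rows."""
    match = TABLE_RE.search(section)
    if not match:
        return []
    params = []
    for line in match.group(1).strip().split('\n'):
        # Splitting on '|' yields an empty first cell, then the table columns.
        parts = [p.strip() for p in line.split('|')]
        if len(parts) >= 5 and parts[1] and parts[2]:
            params.append({
                'name': parts[2],
                'type': parts[3],
                'conformance': parts[1].replace('**', ''),
            })
    return params

for param in parse_search_params(SAMPLE):
    print(param)
```

Because the pattern matches the header row literally, a table with different column names or spacing yields no parameters rather than an error.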

In [None]:
def extract_relevant_capability_info(requirement: Dict[str, str], capability_statement: Dict[str, Any]) -> str:
    """
    Extract relevant capability statement information for a specific requirement
    
    Args:
        requirement: Requirement dictionary
        capability_statement: Parsed capability statement
        
    Returns:
        Formatted string with relevant capability information
    """
    # Determine which resource types are relevant to this requirement
    requirement_text = f"{requirement.get('description', '')} {requirement.get('summary', '')}"
    resource_types = []
    
    # IG FHIR resource types
    fhir_resources = [
        "Patient", "Practitioner", "Organization", "Location", "Endpoint", 
        "HealthcareService", "PractitionerRole", "OrganizationAffiliation",
        "InsurancePlan", "Network"
    ]
    
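    # Check if requirement mentions specific resources. Match whole words (with an
    # optional plural "s") so that "PractitionerRole" is not also counted as "Practitioner"
    for resource in fhir_resources:
        if re.search(rf'\b{resource}s?\b', requirement_text):
            resource_types.append(resource)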
    
    # If no specific resources found, check for general requirements
    if not resource_types:
        # If it's a server requirement
        if "Server" in requirement.get('actor', ''):
            resource_types = ["General Server Capabilities"]
        # If it's a client requirement
        elif "Client" in requirement.get('actor', '') or "Application" in requirement.get('actor', ''):
            resource_types = ["General Client Capabilities"]
    
    # Build relevant capability information
    relevant_info = "### Applicable Capability Statement Information\n\n"
    
    # Add general capabilities
    relevant_info += "#### General Capabilities\n"
    if "general_capabilities" in capability_statement:
        for level in ["SHALL", "SHOULD"]:
            if level in capability_statement["general_capabilities"]:
                relevant_info += f"\n**{level}**:\n"
                for item in capability_statement["general_capabilities"][level]:
                    relevant_info += f"- {item}\n"
    
    # Add resource-specific capabilities
    for resource_type in resource_types:
        if resource_type in capability_statement.get("resources", {}):
            resource_info = capability_statement["resources"][resource_type]
            
            relevant_info += f"\n#### {resource_type} Resource Capabilities\n"
            
            # Add search parameters
            if resource_info.get("search_parameters"):
                relevant_info += "\n**Supported Search Parameters**:\n"
                for param in resource_info["search_parameters"]:
                    relevant_info += f"- {param['name']} ({param['type']}): {param['conformance']}\n"
            
            # Add operations
            if resource_info.get("operations"):
                relevant_info += "\n**Supported Operations**:\n"
                for op in resource_info["operations"]:
                    relevant_info += f"- {op}\n"
            
            # Add includes
            if resource_info.get("includes"):
                relevant_info += "\n**Supported _includes**:\n"
                for include in resource_info["includes"]:
                    relevant_info += f"- {include}\n"
            
            # Add revincludes
            if resource_info.get("revincludes"):
                relevant_info += "\n**Supported _revincludes**:\n"
                for revinclude in resource_info["revincludes"]:
                    relevant_info += f"- {revinclude}\n"
    
    return relevant_info
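
Several resource names are prefixes of others (`Practitioner` and `PractitionerRole`, `Organization` and `OrganizationAffiliation`), so the matching strategy affects which capability sections end up in the prompt. The sketch below contrasts plain substring matching with a whole-word regular expression; both helper names are hypothetical.

```python
import re
from typing import List

RESOURCES = ["Practitioner", "PractitionerRole", "Organization", "OrganizationAffiliation"]

def mentioned_substring(text: str) -> List[str]:
    """Substring test: a longer name also triggers its shorter prefix."""
    return [r for r in RESOURCES if r in text]

def mentioned_whole_word(text: str) -> List[str]:
    """Whole-word test, tolerating a plural 's'."""
    return [r for r in RESOURCES if re.search(rf'\b{r}s?\b', text)]

text = "The server SHALL support search on PractitionerRole."
print(mentioned_substring(text))   # ['Practitioner', 'PractitionerRole']
print(mentioned_whole_word(text))  # ['PractitionerRole']
```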

In [65]:
def parse_requirements_file(file_path: str) -> List[Dict[str, str]]:
    """
    Parse an INCOSE requirements markdown file into a structured list of requirements
    
    Args:
        file_path: Path to the requirements markdown file
        
    Returns:
        List of dictionaries containing structured requirement information
    """
    with open(file_path, 'r') as f:
        content = f.read()
    
    # Split by requirement sections (separated by ---)
    req_sections = content.split('---')
    
    requirements = []
    for section in req_sections:
        if not section.strip():
            continue
            
        # Parse requirement data
        req_data = {}
        
        # Extract ID from format "# REQ-XX"
        id_match = re.search(r'#\s+([A-Z0-9\-]+)', section)
        if id_match:
            req_data['id'] = id_match.group(1)
        
        # Extract other fields
        for field in ['Summary', 'Description', 'Verification', 'Actor', 'Conformance', 'Conditional', 'Source']:
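            # \Z (end of string) terminates the last field; in a raw string a doubled
            # backslash would instead match a literal backslash followed by "Z"
            pattern = rf'\*\*{field}\*\*:\s*(.*?)(?:\n\*\*|\n---|\Z)'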
            field_match = re.search(pattern, section, re.DOTALL)
            if field_match:
                req_data[field.lower()] = field_match.group(1).strip()
        
        if req_data:
            requirements.append(req_data)
    
    return requirements
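
The field extraction can be tried on a single block in isolation. The following sketch uses a made-up requirement and a hypothetical helper `parse_section`; it shows how each field runs up to the next bold label, and how the last field runs to the end of the text.

```python
import re
from typing import Dict

FIELDS = ['Summary', 'Description', 'Verification', 'Actor',
          'Conformance', 'Conditional', 'Source']

# Hypothetical requirement block, as it looks after splitting the file on '---'.
SECTION = """# REQ-01
**Summary**: Support Location search by address
**Description**: The server SHALL support searching Location
by the address parameter.
**Actor**: Server
**Conformance**: SHALL
**Source**: Section 4.2
"""

def parse_section(section: str) -> Dict[str, str]:
    """Parse one requirement block into a dictionary of lower-cased field names."""
    data: Dict[str, str] = {}
    id_match = re.search(r'#\s+([A-Z0-9\-]+)', section)
    if id_match:
        data['id'] = id_match.group(1)
    for field in FIELDS:
        # A field ends at the next bold label, a separator, or the end of the text (\Z).
        pattern = rf'\*\*{field}\*\*:\s*(.*?)(?:\n\*\*|\n---|\Z)'
        field_match = re.search(pattern, section, re.DOTALL)
        if field_match:
            data[field.lower()] = field_match.group(1).strip()
    return data

print(parse_section(SECTION))
```

Fields that are absent from a block (here `Verification` and `Conditional`) are simply left out of the result, which is why later code reads them with `.get()`.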

In [66]:
def format_requirement_for_prompt(requirement: Dict[str, str]) -> str:
    """
    Format a requirement dictionary into markdown for inclusion in prompts
    
    Args:
        requirement: Requirement dictionary
        
    Returns:
        Formatted markdown string
    """
    formatted = f"# {requirement.get('id', 'UNKNOWN-ID')}\n"
    formatted += f"**Summary**: {requirement.get('summary', '')}\n"
    formatted += f"**Description**: {requirement.get('description', '')}\n"
    formatted += f"**Verification**: {requirement.get('verification', '')}\n"
    formatted += f"**Actor**: {requirement.get('actor', '')}\n"
    formatted += f"**Conformance**: {requirement.get('conformance', '')}\n"
    formatted += f"**Conditional**: {requirement.get('conditional', '')}\n"
    formatted += f"**Source**: {requirement.get('source', '')}\n"
    
    return formatted

In [67]:
@retry(
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5),
    retry=retry_if_exception_type((RateLimitError, TimeoutError))
)
def make_llm_request(client, api_type: str, prompt: str, system_prompt: str, rate_limit_func) -> str:
    """Make rate-limited API request with retries"""
    rate_limit_func()
    
    config = API_CONFIGS[api_type]
    
    try:
        if api_type == "claude":
            response = client.messages.create(
                model=config["model_name"],
                max_tokens=config["max_tokens"],
                temperature=config["temperature"],  # Apply the configured temperature, as for the other providers
                messages=[{
                    "role": "user", 
                    "content": prompt
                }],
                system=system_prompt
            )
            return response.content[0].text
            
        elif api_type == "gemini":
            response = client.generate_content(
                prompt,
                generation_config={
                    "max_output_tokens": config["max_tokens"],
                    "temperature": config["temperature"]
                }
            )
            if hasattr(response, 'text'):
                return response.text
            elif response.candidates:
                return response.candidates[0].content.parts[0].text
            else:
                raise ValueError("No response generated from Gemini API")
                    
        elif api_type == "gpt":
            response = client.chat.completions.create(
                model=config["model"],
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=config["max_tokens"],
                temperature=config["temperature"]
            )
            return response.choices[0].message.content
            
    except Exception as e:
        logging.error(f"Error in {api_type} API request: {str(e)}")
        raise

In [68]:
def identify_requirement_group(
    client, 
    api_type: str,
    requirement: Dict[str, str],
    rate_limit_func
) -> str:
    """
    Identify the appropriate group for a requirement using LLM
    
    Args:
        client: The API client
        api_type: API type (claude, gemini, gpt)
        requirement: Requirement dictionary
        rate_limit_func: Function to check rate limits
        
    Returns:
        Identified group name
    """
    # Use actor field as a possible hint if available
    actor = requirement.get('actor', '').strip()
    if actor and len(actor) > 3 and actor not in ['System', 'User', 'All']:
        # Simple heuristic - if actor is specific enough, it might be a good grouping
        possible_groups = ['Client', 'Server', 'Patient', 'Practitioner', 'Organization', 'HealthcareService']
        for group in possible_groups:
            if group.lower() in actor.lower():
                return group
    
    # Use LLM to identify group
    logger.info(f"Identifying group for requirement {requirement.get('id', 'unknown')} using {api_type}...")
    
    # Format requirement as markdown
    formatted_req = format_requirement_for_prompt(requirement)
    
    # Create prompt with the requirement
    prompt = REQUIREMENT_GROUPING_PROMPT.format(requirement=formatted_req)
    
    # Make the API request with simplified system prompt
    group_system_prompt = "You are a FHIR expert who categorizes requirements by their functional or resource type."
    group_name = make_llm_request(client, api_type, prompt, group_system_prompt, rate_limit_func).strip()
    
    # Clean up response (in case model returns extra text)
    if '\n' in group_name:
        group_name = group_name.split('\n')[0].strip()
    
    return group_name
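
Since the model's reply is used directly as a grouping key, one possible hardening step is to map it onto the known category list before use. This is a sketch under assumptions, not part of the notebook: `normalize_group` and `CATEGORIES` are hypothetical names, and the list simply repeats the resource profiles named in the grouping prompt.

```python
from typing import Optional

CATEGORIES = [
    "Endpoint", "HealthcareService", "InsurancePlan", "Location", "Network",
    "Organization", "OrganizationAffiliation", "Practitioner", "PractitionerRole",
]

def normalize_group(reply: str) -> Optional[str]:
    """Map a raw model reply to one of CATEGORIES, or None if nothing matches."""
    stripped = reply.strip()
    if not stripped:
        return None
    # Keep only the first line and drop markdown emphasis, quotes, and punctuation.
    name = stripped.splitlines()[0].strip().strip('*`"\'. ')
    # Accept replies such as "Plan-Net Location" as well as "Location".
    if name.lower().startswith("plan-net"):
        name = name[len("plan-net"):].strip()
    for category in CATEGORIES:
        if category.lower() == name.lower():
            return category
    return None

print(normalize_group("Plan-Net Location\nBecause it concerns addresses."))  # Location
```

A caller could fall back to a label such as `Uncategorized` when the function returns `None`.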

In [69]:
def generate_test_specification_with_capability(
    client, 
    api_type: str,
    requirement: Dict[str, str],
    capability_statement: Dict[str, Any],
    rate_limit_func
) -> str:
    """
    Generate a comprehensive test specification for a single requirement, considering capability statement
    
    Args:
        client: The API client
        api_type: API type (claude, gemini, gpt)
        requirement: Requirement dictionary
        capability_statement: Parsed capability statement
        rate_limit_func: Function to check rate limits
        
    Returns:
        Test specification for the requirement
    """
    logger.info(f"Generating test specification for {requirement.get('id', 'unknown')} using {api_type}...")
    
    # Format requirement as markdown
    formatted_req = format_requirement_for_prompt(requirement)
    
    # Extract relevant capability information
    capability_info = extract_relevant_capability_info(requirement, capability_statement)
    
    # Create prompt with the requirement and capability information
    prompt = CONSOLIDATED_TEST_PLAN_WITH_CAPABILITY_PROMPT.format(
        requirement=formatted_req,
        capability_info=capability_info
    )
    
    # Make the API request
    return make_llm_request(client, api_type, prompt, SYSTEM_PROMPT, rate_limit_func)

In [70]:
def generate_consolidated_test_plan(
    api_type: str,
    requirements_file: str,
    capability_statement_file: Optional[str] = None,
    ig_name: str = "FHIR Implementation Guide",
    output_dir: str = OUTPUT_DIR
) -> Dict[str, Any]:
    """
    Process requirements and generate a consolidated test plan
    
    Args:
        api_type: API type (claude, gemini, gpt)
        requirements_file: Path to requirements markdown file
        capability_statement_file: Path to capability statement markdown file (optional)
        ig_name: Name of the Implementation Guide
        output_dir: Directory for output files
        
    Returns:
        Dictionary containing path to output file
    """
    logger.info(f"Starting test plan generation with {api_type} for {ig_name}")
    
    # Initialize API clients and rate limiters
    clients = setup_clients()
    client = clients[api_type]
    config = API_CONFIGS[api_type]
    rate_limiter = create_rate_limiter()
    
    def check_limits():
        check_rate_limits(rate_limiter, api_type)
    
    # Create output directory
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    try:
        # Parse requirements from file
        requirements = parse_requirements_file(requirements_file)
        logger.info(f"Parsed {len(requirements)} requirements from {requirements_file}")
        
        # Parse capability statement if provided
        capability_statement = None
        if capability_statement_file and os.path.exists(capability_statement_file):
            capability_statement = parse_capability_statement(capability_statement_file)
            logger.info(f"Parsed capability statement from {capability_statement_file}")
        
        # Identify groups for each requirement
        req_groups = {}
        for req in requirements:
            req_id = req.get('id', 'UNKNOWN-ID')
            req_groups[req_id] = identify_requirement_group(client, api_type, req, check_limits)
            # Add small delay to avoid rate limiting
            time.sleep(0.5)
        
        # Group requirements by identified category
        grouped_requirements = defaultdict(list)
        for req in requirements:
            req_id = req.get('id', 'UNKNOWN-ID')
            group = req_groups.get(req_id, 'Uncategorized')
            grouped_requirements[group].append(req)
            
        # Log the grouping results
        logger.info(f"Requirements grouped into {len(grouped_requirements)} categories")
        for group, reqs in grouped_requirements.items():
            logger.info(f"Group '{group}': {len(reqs)} requirements")
        
        # Output file
        test_plan_path = os.path.join(
            output_dir, 
            f"{api_type}_test_plan_{timestamp}.md"
        )
        
        # Initialize test plan content
        test_plan = f"# Consolidated Test Plan for {ig_name}\n\n"
        test_plan += f"## Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
        
        # Add capability statement reference if used
        if capability_statement:
            test_plan += "## Capability Statement\n\n"
            test_plan += f"This test plan incorporates constraints and requirements from the {ig_name} Capability Statement.\n\n"
        
        test_plan += "## Table of Contents\n\n"
        
        # Add group headers to TOC
        for group in sorted(grouped_requirements.keys()):
            test_plan += f"- [{group}](#{group.lower().replace(' ', '-')})\n"
            for req in grouped_requirements[group]:
                req_id = req.get('id', 'UNKNOWN-ID')
                req_summary = req.get('summary', 'No summary')
                test_plan += f"  - [{req_id}: {req_summary}](#{req_id.lower()})\n"
        
        # Process each group and its requirements
        test_plan += "\n## Test Specifications\n\n"
        
        for group in sorted(grouped_requirements.keys()):
            # Add group header with anchor for TOC linking
            test_plan += f"<a id='{group.lower().replace(' ', '-')}'></a>\n\n"
            test_plan += f"## {group}\n\n"
            
            # Process each requirement in the group
            for i, req in enumerate(grouped_requirements[group]):
                req_id = req.get('id', 'UNKNOWN-ID')
                logger.info(f"Processing requirement for group '{group}': {req_id}")
                
                # Generate test specification with capability statement if available
                if capability_statement:
                    test_spec = generate_test_specification_with_capability(
                        client, api_type, req, capability_statement, check_limits
                    )
                else:
                    test_spec = generate_test_specification(client, api_type, req, check_limits)
                
                # Add to test plan content with proper anchor for TOC linking
                test_plan += f"<a id='{req_id.lower()}'></a>\n\n"
                test_plan += f"### {req_id}: {req.get('summary', 'No summary')}\n\n"
                test_plan += f"**Description**: {req.get('description', '')}\n\n"
                test_plan += f"**Actor**: {req.get('actor', '')}\n\n"
                test_plan += f"**Conformance**: {req.get('conformance', '')}\n\n"
                test_plan += f"{test_spec}\n\n"
                test_plan += "---\n\n"
                
                # Add delay between requests
                if i < len(grouped_requirements[group]) - 1:  # No need to delay after the last request
                    time.sleep(config["delay_between_chunks"])
            
            # Add spacing between groups
            test_plan += "\n\n"
        
        # Save consolidated test plan
        with open(test_plan_path, 'w', encoding='utf-8') as f:
            f.write(test_plan)
        logger.info(f"Consolidated test plan saved to {test_plan_path}")
        
        return {
            "requirements_count": len(requirements),
            "group_count": len(grouped_requirements),
            "test_plan_path": test_plan_path
        }
        
    except Exception:
        # logger.exception records the message together with the full traceback
        logger.exception("Error processing requirements")
        raise
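
The table of contents above builds group anchors with `group.lower().replace(' ', '-')`. If the LLM returns a group name containing punctuation such as `/`, `&`, or parentheses, those characters end up inside the anchor id. A minimal sketch of a stricter slug helper is shown below. `make_anchor` is a hypothetical name that does not exist in this notebook, and it would have to be applied to both the TOC link and the matching `<a id=...>` tag so the two stay identical.

```python
import re


def make_anchor(text: str) -> str:
    """Turn a heading into a lowercase, hyphen-separated anchor id.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    # Collapse every run of characters other than a-z and 0-9 into one hyphen.
    slug = re.sub(r"[^a-z0-9]+", "-", text.lower())
    # Drop any hyphen left over at the start or end.
    return slug.strip("-")


# Example usage when building a TOC entry for a group
group = "Search Parameters & Operations"
toc_line = f"- [{group}](#{make_anchor(group)})\n"
```

Because the helper only keeps ASCII letters and digits, group names written entirely in other scripts would collapse to an empty string; that trade-off is an assumption of this sketch.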

In [71]:
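def run_test_plan_generator():
    """Interactively collect inputs and generate the consolidated test plan.

    Returns the result dict from generate_consolidated_test_plan(), or None
    if the requirements file is missing or generation fails.
    """
    # Load environment variables
    load_dotenv()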
    
    # Get input from user or set default values
    print("\nFHIR IG Test Plan Generator")
    print("=" * 50)
    
    # Let user select the API
    print("\nSelect the API to use:")
    print("1. Claude")
    print("2. Gemini")
    print("3. GPT-4")
    api_choice = input("Enter your choice (1-3, default 1): ") or "1"
    
    api_mapping = {
        "1": "claude",
        "2": "gemini",
        "3": "gpt"
    }
    
    api_type = api_mapping.get(api_choice, "claude")
    
    # Get requirements file path
    requirements_file = input("\nEnter path to requirements markdown file: ")
    
    # Check if requirements file exists
    if not os.path.exists(requirements_file):
        logger.error(f"Requirements file not found: {requirements_file}")
        print(f"Error: Requirements file not found at {requirements_file}")
        return None
    
    # Get capability statement file path (optional)
    capability_statement_file = input("\nEnter path to Capability Statement markdown file (optional, press Enter to skip): ")
    
    if capability_statement_file and not os.path.exists(capability_statement_file):
        logger.warning(f"Capability Statement file not found: {capability_statement_file}")
        print(f"Warning: Capability Statement file not found at {capability_statement_file}. Proceeding without it.")
        capability_statement_file = None
    
    # Get IG name
    ig_name = input("\nEnter Implementation Guide name (default 'FHIR Implementation Guide'): ") or "FHIR Implementation Guide"
    
    # Get output directory
    output_dir = input(f"\nEnter output directory (default '{OUTPUT_DIR}'): ") or OUTPUT_DIR
    
    print(f"\nProcessing requirements with {api_type.capitalize()}...")
    if capability_statement_file:
        print(f"Including Capability Statement from {capability_statement_file}")
    print("This may take several minutes depending on the number of requirements.")
    
    try:
        # Process requirements and generate test plan
        result = generate_consolidated_test_plan(
            api_type=api_type,
            requirements_file=requirements_file,
            capability_statement_file=capability_statement_file,
            ig_name=ig_name,
            output_dir=output_dir
        )
        
        # Output results
        print("\n" + "="*80)
        print("Test plan generation complete!")
        print(f"Processed {result['requirements_count']} requirements")
        print(f"Grouped into {result['group_count']} categories")
        print(f"Consolidated test plan: {result['test_plan_path']}")
        print("="*80)
        
        return result
        
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        print(f"\nError occurred during processing: {str(e)}")
        print("Check the log for more details.")
        return None
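
In the cell above, `api_mapping.get(api_choice, "claude")` falls back to Claude for any unrecognized entry, so a typo such as `4` silently selects a different provider than intended. One possible alternative, sketched here under the assumption that failing fast is preferable, is to validate the choice explicitly. `API_MAPPING` and `resolve_api_choice` are hypothetical names introduced only for this example.

```python
# Same menu numbers and API identifiers as in run_test_plan_generator()
API_MAPPING = {"1": "claude", "2": "gemini", "3": "gpt"}


def resolve_api_choice(raw: str, default: str = "1") -> str:
    """Map a menu entry to an API identifier, rejecting unknown entries.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    # An empty entry (user pressed Enter) selects the default option.
    choice = raw.strip() or default
    if choice not in API_MAPPING:
        valid = ", ".join(sorted(API_MAPPING))
        raise ValueError(f"Invalid choice {choice!r}; expected one of: {valid}")
    return API_MAPPING[choice]
```

The caller could catch the `ValueError` and prompt again rather than proceeding with an unintended model.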

In [72]:
# Run the generator when executed in a notebook cell
if __name__ == "__main__":
    run_test_plan_generator()


FHIR IG Test Plan Generator

Select the API to use:
1. Claude
2. Gemini
3. GPT-4


2025-04-16 10:40:41,784 - __main__ - INFO - Starting test plan generation with claude for Plan Net
2025-04-16 10:40:41,811 - __main__ - INFO - Parsed 10 requirements from /Users/ceadams/Documents/onclaive/onclaive/reqs_extraction/revised_reqs/claude_reqs_list_v220250416_103916.md
2025-04-16 10:40:41,813 - __main__ - INFO - Parsed capability statement from /Users/ceadams/Documents/onclaive/onclaive/full-ig/markdown7_cleaned/CapabilityStatement_plan_net.md



Processing requirements with Claude...
Including Capability Statement from /Users/ceadams/Documents/onclaive/onclaive/full-ig/markdown7_cleaned/CapabilityStatement_plan_net.md
This may take several minutes depending on the number of requirements.


2025-04-16 10:40:43,326 - __main__ - INFO - Identifying group for requirement REQ-04 using claude...
2025-04-16 10:40:45,596 - httpx - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
2025-04-16 10:40:46,098 - __main__ - INFO - Identifying group for requirement REQ-05 using claude...
2025-04-16 10:40:47,336 - httpx - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
2025-04-16 10:40:47,838 - __main__ - INFO - Identifying group for requirement REQ-06 using claude...
2025-04-16 10:40:48,924 - httpx - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
2025-04-16 10:40:49,425 - __main__ - INFO - Identifying group for requirement REQ-07 using claude...
2025-04-16 10:40:50,778 - httpx - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
2025-04-16 10:40:51,284 - __main__ - INFO - Identifying group for requirement REQ-08 using claude...
2025-04-16 10:40:52,917 - httpx


Test plan generation complete!
Processed 10 requirements
Grouped into 9 categories
Consolidated test plan: /Users/ceadams/Documents/onclaive/onclaive/test_kit_dev/test_plan_output/claude_test_plan_20250416_104041.md
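
Since the generated file relies on `<a id='...'></a>` tags as targets for its table of contents, a quick consistency check can confirm that every TOC link has a matching anchor. The sketch below assumes the exact link and anchor formats written by `generate_consolidated_test_plan()`; `find_broken_toc_links` is a hypothetical helper, not part of the notebook.

```python
import re


def find_broken_toc_links(markdown_text: str) -> list:
    """Return the TOC link targets that have no matching <a id='...'> anchor.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    # Targets of in-page links such as [REQ-04: Summary](#req-04)
    targets = set(re.findall(r"\]\(#([^)]+)\)", markdown_text))
    # Anchor ids in the format the generator writes: <a id='req-04'></a>
    anchors = set(re.findall(r"<a id='([^']+)'></a>", markdown_text))
    return sorted(targets - anchors)


# Small in-memory example; a real check would read the generated file instead.
sample = (
    "- [Search](#search)\n"
    "  - [REQ-04: Example](#req-04)\n"
    "<a id='search'></a>\n"
)
broken = find_broken_toc_links(sample)
```

In this sample, `req-04` is linked but never defined, so it is the only entry reported. To check a real run, the text could be read from the path stored in `result['test_plan_path']`.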
