### This notebook generates realistic compliance evaluation scenarios using AWS Bedrock's Retrieval-Augmented Generation (RAG) capabilities. It draws on the NIST control framework and on specific organizational policies generated against that framework. The target is 1,000 complex, multi-policy scenarios (500 compliant, 500 non-compliant) that simulate real-world compliance situations (like employee onboarding, data access requests, or security incidents); the number actually produced in a run is set by the `batch_size` and `total_scenarios` arguments in the final cell.


In [None]:
import boto3
import json
import time
from typing import List, Dict

# Default knowledge base ID used by generate_compliance_scenarios().
KNOWLEDGE_BASE_ID = 'T8EW10IU3Z'  # 183023889407-us-east-1-compliance-rule-generator-kb-s3
# Bedrock inference-profile ARNs, keyed by an informal quality/cost tier label.
# The trailing comments record problems observed with individual entries.
MODELS = {
    'premium': 'arn:aws:bedrock:us-east-1:183023889407:inference-profile/global.anthropic.claude-opus-4-5-20251101-v1:0', # not available
    'good': 'arn:aws:bedrock:us-east-1:183023889407:inference-profile/global.anthropic.claude-sonnet-4-5-20250929-v1:0', # times out
    'balanced': 'arn:aws:bedrock:us-east-1:183023889407:inference-profile/us.anthropic.claude-sonnet-4-20250514-v1:0',
    'fast_cheap': 'arn:aws:bedrock:us-east-1:183023889407:inference-profile/us.anthropic.claude-haiku-4-5-20251001-v1:0',
    'aws_native_premier': 'arn:aws:bedrock:us-east-1:183023889407:inference-profile/us.amazon.nova-premier-v1:0',
    'aws_native_pro': 'arn:aws:bedrock:us-east-1:183023889407:inference-profile/us.amazon.nova-pro-v1:0'
}
# Default model for generate_compliance_scenarios().
# NOTE(review): 'good' is annotated above as "times out", and the run cell
# passes MODELS['balanced'] explicitly — confirm whether this default should change.
MODEL_ARN = MODELS['good']

# Client used for knowledge-base retrieval (retrieve).
bedrock_agent_runtime = boto3.client('bedrock-agent-runtime', region_name='us-east-1')
# Client used for model invocation (converse).
bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1')

In [None]:
def retrieve_kb_context(knowledge_base_id: str, batch_size: int) -> str:
    """Retrieve policy text from the knowledge base as one context string.

    Issues a single vector-search ``retrieve`` call and concatenates the text
    of the returned chunks. This is not the whole knowledge base: the request
    asks for at most 100 results.

    Args:
        knowledge_base_id: ID of the Bedrock knowledge base to query.
        batch_size: Scenarios per generation batch. Only used to word the
            query (it asks for ``batch_size * 15`` policies); it does not
            change the number of results requested.

    Returns:
        The retrieved chunk texts joined by blank lines, or an empty string
        when no text chunks come back.

    Raises:
        Any exception raised by the Bedrock client call propagates to the
        caller.
    """
    kb_query = f"""Retrieve {batch_size * 15} different company policies covering NIST control families including: AC (Access Control), AT (Awareness and Training), AU (Audit and Accountability),
    CA (Assessment, Authorization, and Monitoring), CM (Configuration Management), CP (Contingency Planning), IA (Identification and Authentication), IR (Incident Response), MA (Maintenance),
    MP (Media Protection), PE (Physical and Environmental Protection), PL (Planning), PM (Program Management), PS (Personnel Security), PT (PII Processing and Transparency), RA (Risk Assessment),
    SA (System and Services Acquisition), SC (System and Communications Protection), SI (System and Information Integrity), SR (Supply Chain Risk Management)."""

    kb_response = bedrock_agent_runtime.retrieve(
        knowledgeBaseId=knowledge_base_id,
        retrievalQuery={'text': kb_query},
        retrievalConfiguration={'vectorSearchConfiguration': {'numberOfResults': 100}}
    )

    results = kb_response.get('retrievalResults', [])

    # A result without a text payload would otherwise raise KeyError and
    # discard every chunk that was retrieved; skip those results instead.
    all_context = []
    for result in results:
        text = (result.get('content') or {}).get('text')
        if text:
            all_context.append(text)

    print(f"Retrieved {len(all_context)} context chunks from knowledge base")
    skipped = len(results) - len(all_context)
    if skipped:
        print(f"Skipped {skipped} retrieval results without text content")
    return '\n\n'.join(all_context)

def _scenario_tool_config() -> Dict:
    """Build the Converse toolConfig that forces a reply via the scenario_json tool."""
    scenario_schema = {
        "type": "object",
        "properties": {
            "scenario-id": {"type": "string"},
            "scenario-detail": {"type": "string"},
            "is-compliant": {"type": "boolean"}
        },
        "required": ["scenario-id", "scenario-detail", "is-compliant"]
    }
    return {
        "tools": [{
            "toolSpec": {
                "name": "scenario_json",
                "description": "Return compliance scenarios as JSON",
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "scenarios": {
                                "type": "array",
                                "items": scenario_schema
                            }
                        },
                        "required": ["scenarios"]
                    }
                }
            }
        }],
        "toolChoice": {"tool": {"name": "scenario_json"}}
    }


def generate_scenario_batch(context: str, batch_num: int, batch_size: int, model_arn: str) -> List[Dict]:
    """Generate one batch of scenarios with the Bedrock Converse API.

    Even-numbered batches ask for compliant scenarios and odd-numbered
    batches for non-compliant ones. The model is forced to answer through
    the ``scenario_json`` tool, so the result arrives as structured data
    rather than free text. Scenario dicts are returned exactly as the model
    produced them, with the hyphenated keys ``scenario-id``,
    ``scenario-detail`` and ``is-compliant``.

    Args:
        context: Policy text to ground the scenarios in.
        batch_num: Zero-based batch index; selects compliant vs non-compliant
            and the starting scenario ID named in the prompt.
        batch_size: Number of scenarios requested in this batch.
        model_arn: Model or inference-profile ARN (or a bare model ID).

    Returns:
        The list of scenario dicts from the tool call, or an empty list when
        the model did not finish with a tool call.

    Raises:
        Any exception raised by the Bedrock client call propagates to the
        caller.
    """
    is_compliant = batch_num % 2 == 0

    prompt = f"""Based on the following comprehensive compliance policies and NIST controls, generate {batch_size} realistic compliance scenarios 
    that are {'compliant' if is_compliant else 'non-compliant'}.
    
    KNOWLEDGE BASE CONTEXT:
    {context}

    Each scenario must:
    - Reference multiple policies from the context above
    - Include specific business details (roles, systems, data, actions)
    - Be realistic and detailed (200+ words)
    - Have ID format: SCENARIO-{batch_num * batch_size + 1:04d} onwards
    
    Return as JSON array with fields: scenario-id, scenario-detail, is-compliant"""

    # Only the last path segment of an ARN is passed as modelId; a value
    # without '/' is used unchanged.
    model_id = model_arn.split('/')[-1] if '/' in model_arn else model_arn
    response = bedrock_runtime.converse(
        modelId=model_id,
        messages=[{"role": "user", "content": [{"text": prompt}]}],
        toolConfig=_scenario_tool_config(),
        inferenceConfig={"maxTokens": 4096, "temperature": 0.7}
    )

    stop_reason = response.get('stopReason')
    if stop_reason != 'tool_use':
        # Report why the batch is empty (for example the output was cut off
        # at the maxTokens limit) instead of returning [] with no explanation.
        print(f"Batch {batch_num + 1}: no scenarios returned (stopReason={stop_reason!r})")
        return []

    for content_block in response['output']['message']['content']:
        if 'toolUse' in content_block:
            return content_block['toolUse']['input'].get('scenarios', [])
    return []

def generate_compliance_scenarios(
    knowledge_base_id: str = KNOWLEDGE_BASE_ID,
    model_arn: str = MODEL_ARN,
    batch_size: int = 10,
    total_scenarios: int = 100
) -> List[Dict]:
    """Fetch knowledge-base context once, then generate scenarios batch by batch.

    Runs ``total_scenarios // batch_size`` batches; any remainder is not
    generated. A failed batch is reported and skipped, so the result may hold
    fewer scenarios than requested. If the knowledge-base retrieval itself
    fails, the error is printed and an empty list is returned.
    """
    try:
        kb_context = retrieve_kb_context(knowledge_base_id, batch_size)
    except Exception as kb_error:
        print(f"Error retrieving from KB: {kb_error}")
        return []

    collected = []
    batch_count = total_scenarios // batch_size
    for batch_label in range(1, batch_count + 1):
        # batch_label is the 1-based number shown in messages; the generator
        # itself takes the 0-based index.
        try:
            batch = generate_scenario_batch(kb_context, batch_label - 1, batch_size, model_arn)
            collected.extend(batch)
            print(f"Batch {batch_label}: Generated {len(batch)} scenarios")
        except Exception as batch_error:
            print(f"Error in batch {batch_label}: {batch_error}")
        # Pause between requests, including after a failed batch.
        time.sleep(2)

    return collected

In [None]:
def save_scenarios_to_file(scenarios: List[Dict], output_path: str):
    """Print the scenarios and write them, with summary counts, to a JSON file.

    The compliance flag is read from the hyphenated key ``is-compliant``,
    which is the field name the scenario tool schema uses. The underscore
    form ``is_compliant`` is accepted as a fallback. A scenario with neither
    key is counted as non-compliant, so the two counts always sum to
    ``total_scenarios``.

    Args:
        scenarios: Scenario dicts to save.
        output_path: Path of the JSON file to create or overwrite.
    """
    print(json.dumps(scenarios, indent=2))

    # Build the whole payload before opening the file, so a problem with the
    # data cannot leave a truncated output file behind.
    compliant_count = sum(
        1 for s in scenarios
        if s.get('is-compliant', s.get('is_compliant', False))
    )
    payload = {
        'total_scenarios': len(scenarios),
        'compliant_count': compliant_count,
        'non_compliant_count': len(scenarios) - compliant_count,
        'scenarios': scenarios
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(payload, f, indent=2)

In [None]:
# Small sample run: 4 scenarios in 2 batches of 2, using the 'balanced' model
# entry rather than the module default MODEL_ARN. Batch 1 requests compliant
# scenarios and batch 2 non-compliant ones.
scenarios = generate_compliance_scenarios(
    knowledge_base_id=KNOWLEDGE_BASE_ID,
    model_arn=MODELS['balanced'],
    batch_size=2,
    total_scenarios=4
)
# Print the generated scenarios and write them, with summary counts, to this path.
save_scenarios_to_file(scenarios, '/home/sagemaker-user/ScenariosGenerated.json')