# Azure AI Foundry Security Pen-Testing Tracing with ADX

This notebook demonstrates advanced tracing capabilities for security pen-testing scenarios using Azure AI Foundry with Azure Data Explorer (ADX). We'll simulate a security team running various penetration tests while capturing comprehensive telemetry data.

## Simple 3-Step Setup

### Step 1: Deploy Resources
```bash
cd terraform
./deploy-adx-complete.sh
```

### Step 2: Load Environment
```bash
source ../../.env
```

### Step 3: Run This Notebook
Run all cells below; no further manual setup is needed.

---

## Scenario Overview
- **Context**: Security team conducting comprehensive pen-testing
- **Goal**: Generate 100+ realistic security test traces
- **Tools**: OpenAI models for security analysis, ADX for data storage and analytics
- **Outcome**: Rich dataset for security analytics and cost optimization

## What This Notebook Does
- **Automatically connects** to your deployed ADX cluster  
- **Generates 120 realistic** security test scenarios  
- **Uses AI** to analyze vulnerabilities and generate recommendations  
- **Exports data** to ADX for advanced analytics  
- **Provides KQL queries** for immediate insights

## 1. Initialize Environment

The cells below import the required libraries, set up an OpenTelemetry tracer that prints spans to the console, and connect to your deployed resources.

In [1]:
# Import Required Libraries
import os, json, random, time, uuid, math, hashlib
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional
import pandas as pd

# Azure AI Foundry imports
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential

# Environment configuration
from dotenv import load_dotenv

# OpenTelemetry / tracing
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Initialize tracer provider
if not isinstance(trace.get_tracer_provider(), TracerProvider):
    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

tracer = trace.get_tracer("security.pentest")

print("✅ All libraries imported successfully!")
print("Starting security pen-testing tracing simulation...")
print("Next: Environment configuration will load...")

✅ All libraries imported successfully!
Starting security pen-testing tracing simulation...
Next: Environment configuration will load...


In [2]:
# Load environment variables
load_dotenv()

print("Loading configuration from environment...")

# Configuration from environment
AZURE_AI_PROJECT_ENDPOINT = os.getenv("PROJECT_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o-mini")
AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01")

# ADX Configuration (auto-configured by deployment script)
ADX_CLUSTER_URI = os.getenv("ADX_CLUSTER_URI")
ADX_DATABASE_NAME = os.getenv("ADX_DATABASE_NAME", "TracingDB")

print("\nConfiguration Loaded:")
print(f"   Azure AI Project:  ✅ {'Configured' if AZURE_AI_PROJECT_ENDPOINT else 'Missing'}")
print(f"   Deployment: {AZURE_OPENAI_DEPLOYMENT_NAME}")
print(f"   ADX Cluster:  ✅ {'Configured' if ADX_CLUSTER_URI else 'Missing'}")
print(f"   ADX Database: {ADX_DATABASE_NAME}")

# Configuration validation
config_ok = True
if not AZURE_AI_PROJECT_ENDPOINT:
    print("\nPROJECT_ENDPOINT ❌ not set. Please check:")
    print("   1. Did you run './deploy-adx-complete.sh'?")
    print("   2. Did you run 'source ../../.env'?")
    config_ok = False

if not ADX_CLUSTER_URI:
    print("\nADX_CLUSTER_URI ❌ not set. Please run:")
    print("   cd ../terraform && ./deploy-adx-complete.sh")
    config_ok = False

if config_ok:
    print("\nAll configuration ✅ looks good! Proceeding with setup...")
else:
    print("\nConfiguration issues detected. Notebook will continue with limited functionality.")

# Initialize Azure AI Project Client
try:
    if AZURE_AI_PROJECT_ENDPOINT:
        project_client = AIProjectClient(
            credential=DefaultAzureCredential(),
            endpoint=AZURE_AI_PROJECT_ENDPOINT,
        )
        print("AI Project Client initialized ✅ successfully!")
    else:
        project_client = None
        print("AI Project Client not initialized - using mock mode")
except Exception as e:
    print(f"❌ Error initializing AI Project Client: {e}")
    print("Continuing with mock mode...")
    project_client = None

Loading configuration from environment...

Configuration Loaded:
   Azure AI Project:  ✅ Missing
   Deployment: gpt-4o-mini
   ADX Cluster:  ✅ Configured
   ADX Database: TracingDB

PROJECT_ENDPOINT ❌ not set. Please check:
   1. Did you run './deploy-adx-complete.sh'?
   2. Did you run 'source ../../.env'?

Configuration issues detected. Notebook will continue with limited functionality.
AI Project Client not initialized - using mock mode
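
The validation above checks each variable by hand. As a minimal sketch, the same check can be written as one helper that reports every missing variable at once. The helper name `find_missing_env` and the sample values are illustrative assumptions, not part of the deployment script.

```python
import os
from typing import Iterable, List, Mapping, Optional


def find_missing_env(required: Iterable[str], env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Hypothetical usage with an explicit mapping instead of the real environment
sample_env = {"ADX_CLUSTER_URI": "https://example.kusto.windows.net", "PROJECT_ENDPOINT": ""}
missing = find_missing_env(
    ["PROJECT_ENDPOINT", "ADX_CLUSTER_URI", "ADX_DATABASE_NAME"], sample_env
)
print("Missing:", ", ".join(missing) if missing else "none")
```

Passing the mapping explicitly keeps the helper easy to test; calling it without the second argument reads from `os.environ`.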


## Authentication

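This step authenticates with your Azure CLI login and reads connection settings from environment variables.

Prerequisites:
- Run `az login` once in a terminal
- `.env` (or prior cell) provides: `ADX_CLUSTER_URI`, `ADX_CLUSTER_NAME`, `AZURE_RESOURCE_GROUP`, and optionally `ADX_DATABASE_NAME` (defaults to `TracingDB`)
- The Azure CLI `kusto` commands are available, since the cell calls `az kusto cluster show` and `az kusto cluster start`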

In [None]:
# Authentication & ADX client setup
import os, time
from azure.identity import AzureCliCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
from azure.core.exceptions import AzureError

ADX_CLUSTER_URI = os.getenv("ADX_CLUSTER_URI")
ADX_DATABASE_NAME = os.getenv("ADX_DATABASE_NAME", "TracingDB")
ADX_CLUSTER_NAME = os.getenv("ADX_CLUSTER_NAME")
RESOURCE_GROUP = os.getenv("AZURE_RESOURCE_GROUP")

if not ADX_CLUSTER_URI or not ADX_CLUSTER_NAME or not RESOURCE_GROUP:
    raise ValueError("Missing required ADX env vars (ADX_CLUSTER_URI, ADX_CLUSTER_NAME, AZURE_RESOURCE_GROUP)")

azure_credential = AzureCliCredential()

kcsb_query = KustoConnectionStringBuilder.with_az_cli_authentication(ADX_CLUSTER_URI)
adx_client = KustoClient(kcsb_query)

# Ingest endpoint
INGEST_URI = ADX_CLUSTER_URI.replace("https://", "https://ingest-")
kcsb_ingest = KustoConnectionStringBuilder.with_az_cli_authentication(INGEST_URI)
adx_ingest_client = QueuedIngestClient(kcsb_ingest)

import subprocess
try:
    state = subprocess.check_output([
        "az","kusto","cluster","show","--name",ADX_CLUSTER_NAME,"--resource-group",RESOURCE_GROUP,"--query","state","-o","tsv"
    ], text=True).strip()
    if state != "Running":
        subprocess.run([
            "az","kusto","cluster","start","--name",ADX_CLUSTER_NAME,"--resource-group",RESOURCE_GROUP,"--no-wait"
        ], check=False)
        for _ in range(16):
            time.sleep(15)
            state = subprocess.check_output([
                "az","kusto","cluster","show","--name",ADX_CLUSTER_NAME,"--resource-group",RESOURCE_GROUP,"--query","state","-o","tsv"
            ], text=True).strip()
            if state == "Running":
                break
    if state != "Running":
        raise RuntimeError(f"Cluster not Running (state={state})")
except Exception as e:
    # Chain the original exception so the root cause stays in the traceback
    raise RuntimeError(f"Cluster state check failed: {e}") from e

for attempt in range(4):
    try:
        r = adx_client.execute("", ".show databases")
        dbs = [row[0] for row in r.primary_results[0]]
        if ADX_DATABASE_NAME in dbs:
            print("✅ Auth OK | Cluster Running | DB:", ADX_DATABASE_NAME)
        else:
            print("⚠️ DB not found yet, present DBs:", dbs)
        break
    except Exception as ex:
        if attempt == 3:
            raise
        time.sleep(5)
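
The cell above derives the ingestion endpoint by string replacement. A slightly more defensive sketch parses the URI first, so a URI that already points at the ingest endpoint is not prefixed twice. `to_ingest_uri` is a hypothetical helper and the hostname is a placeholder.

```python
from urllib.parse import urlsplit, urlunsplit


def to_ingest_uri(cluster_uri: str) -> str:
    """Prefix the host of an absolute cluster URI with 'ingest-' exactly once."""
    parts = urlsplit(cluster_uri)
    host = parts.netloc
    if not host:
        raise ValueError(f"Not an absolute URI: {cluster_uri!r}")
    if not host.startswith("ingest-"):
        host = "ingest-" + host
    return urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))


# Placeholder cluster name and region, used only for illustration
print(to_ingest_uri("https://mycluster.westeurope.kusto.windows.net"))
```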

In [None]:
# Status
print("Clients: Azure ✅, ADX ✅, DB:", ADX_DATABASE_NAME)
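
The cluster start-up wait in the authentication cell is a fixed polling loop. The same pattern can be factored into a small reusable helper. This is only a sketch: `wait_until` and its parameters are assumptions, and the simulated state sequence stands in for repeated `az kusto cluster show` calls.

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    attempts: int = 16,
    delay_seconds: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call `check` up to `attempts` times, sleeping between tries.

    Returns True on the first successful check, False if every attempt fails.
    """
    for attempt in range(attempts):
        if check():
            return True
        if attempt < attempts - 1:
            sleep(delay_seconds)
    return False


# Simulated cluster states standing in for repeated CLI queries
states = iter(["Stopped", "Starting", "Running"])
ready = wait_until(lambda: next(states) == "Running", attempts=5, delay_seconds=0.0)
print("Cluster ready:", ready)
```

Injecting `sleep` as a parameter lets tests run the loop without real delays.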

## ADX Schema Ready

**If you ran the deployment script in Step 1, your ADX schema is already configured.**

### What Was Set Up:
- ✅ **3 Tables**: OTelTraces, SecurityTraces, LLMInteractions
- ✅ **3 JSON Mappings**: For data ingestion
- ✅ **7 Analytics Functions**: Ready-to-use KQL functions
- ✅ **EventHub Data Connection**: Live streaming enabled


## 2. Security Testing Framework

The next cell defines the data models and the synthetic test catalog (test types, severity levels, target systems, and testers). It needs no additional configuration.

In [None]:
# Security Testing Data Models
@dataclass
class SecurityTest:
    test_id: str
    test_type: str
    test_name: str
    target: str
    severity: str
    status: str
    duration: float
    findings: Dict[str, Any]
    recommendations: List[str]
    tester_info: Dict[str, str]
    environment: str
    timestamp: datetime

@dataclass
class LLMInteraction:
    interaction_id: str
    trace_id: str
    span_id: str
    model: str
    tokens_used: int
    prompt_tokens: int
    completion_tokens: int
    temperature: float
    max_tokens: int
    prompt_hash: str
    response_length: int
    processing_time: float
    cost: float
    success: bool
    error_message: str
    timestamp: datetime

# Security Testing Configuration
SECURITY_TEST_TYPES = [
    "vulnerability_scan",
    "penetration_test", 
    "code_analysis",
    "infrastructure_assessment",
    "social_engineering",
    "web_application_test",
    "network_security_test",
    "database_security_test",
    "mobile_security_test",
    "cloud_security_test"
]

SEVERITY_LEVELS = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]
TEST_STATUSES = ["PASSED", "FAILED", "VULNERABLE", "INCONCLUSIVE", "BLOCKED"]
ENVIRONMENTS = ["development", "staging", "production", "test"]

# Target systems for testing
TARGET_SYSTEMS = [
    {"name": "web-app-01", "type": "web_application", "ip": "10.0.1.100"},
    {"name": "api-gateway", "type": "api", "ip": "10.0.1.101"},
    {"name": "database-01", "type": "database", "ip": "10.0.2.50"},
    {"name": "file-server", "type": "file_system", "ip": "10.0.2.51"},
    {"name": "email-server", "type": "email", "ip": "10.0.3.100"},
    {"name": "cloud-storage", "type": "cloud", "ip": "external"},
    {"name": "mobile-app", "type": "mobile", "ip": "external"},
    {"name": "network-device", "type": "network", "ip": "10.0.0.1"},
    {"name": "workstation-01", "type": "endpoint", "ip": "10.0.4.100"},
    {"name": "legacy-system", "type": "legacy", "ip": "10.0.5.50"}
]

# Security team members
SECURITY_TESTERS = [
    {"name": "Alice Johnson", "role": "Senior Penetration Tester", "specialization": "web_apps"},
    {"name": "Bob Smith", "role": "Network Security Specialist", "specialization": "infrastructure"},
    {"name": "Carol Davis", "role": "Code Security Analyst", "specialization": "code_analysis"},
    {"name": "David Wilson", "role": "Cloud Security Engineer", "specialization": "cloud"},
    {"name": "Eve Brown", "role": "Mobile Security Tester", "specialization": "mobile"},
]

print("Security testing framework configured!")
print(f"Test types: {len(SECURITY_TEST_TYPES)}")
print(f"Target systems: {len(TARGET_SYSTEMS)}")
print(f"Security team: {len(SECURITY_TESTERS)} members")
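
The data models carry `datetime` fields, which `json.dumps` cannot serialize directly. One way to handle this is to combine `dataclasses.asdict` with a `default` hook. The sketch below uses a trimmed-down stand-in dataclass; `MiniTest` and `to_json` are hypothetical names, not the notebook's `SecurityTest`.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, List


@dataclass
class MiniTest:
    """Reduced stand-in for a security test record (illustrative only)."""
    test_id: str
    severity: str
    findings: Dict[str, Any]
    recommendations: List[str]
    timestamp: datetime


def to_json(record: Any) -> str:
    """Serialize a dataclass instance, rendering datetimes as ISO 8601 strings."""
    def encode(value: Any) -> str:
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"Cannot serialize {type(value).__name__}")

    return json.dumps(asdict(record), default=encode)


sample = MiniTest(
    "t-001", "HIGH", {"risk_score": 72}, ["Patch the web server"],
    datetime(2024, 1, 15, 9, 30, 0),
)
encoded = to_json(sample)
print(encoded)
```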

## 3. AI-Powered Security Analysis

These functions send simulated scan results to your deployed model for analysis. When no AI project client is available, they return randomized mock responses instead.
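
As a worked example of the token-cost arithmetic used in this section: rates are easier to audit when they are kept per million tokens in one table. The rates below mirror the constants hard-coded in this notebook ($0.15 and $0.60 per million tokens); treat them as placeholders rather than current pricing. `RATES_PER_MILLION` and `estimate_cost` are hypothetical names.

```python
from typing import Dict, Tuple

# (prompt rate, completion rate) in USD per one million tokens.
# Placeholder values copied from this notebook's constants, not live pricing.
RATES_PER_MILLION: Dict[str, Tuple[float, float]] = {
    "gpt-4o-mini": (0.15, 0.60),
}


def estimate_cost(prompt_tokens: int, completion_tokens: int, model: str = "gpt-4o-mini") -> float:
    """Return the approximate cost in USD for one request."""
    prompt_rate, completion_rate = RATES_PER_MILLION[model]
    return (prompt_tokens * prompt_rate + completion_tokens * completion_rate) / 1_000_000


cost = estimate_cost(500, 300)
print(f"${cost:.6f}")
```

With 500 prompt tokens and 300 completion tokens, the arithmetic is (500 × 0.15 + 300 × 0.60) / 1,000,000, or roughly $0.000255.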

In [None]:
# Get OpenAI client
if project_client:
    openai_client = project_client.get_openai_client(api_version=AZURE_OPENAI_API_VERSION)
else:
    openai_client = None
    print("OpenAI client not available - using mock responses")

# Helper function to calculate token cost (approximate)
def calculate_cost(prompt_tokens: int, completion_tokens: int, model: str = "gpt-4o-mini") -> float:
    """Calculate approximate cost in USD for token usage.

    The rates are hard-coded approximations; `model` is accepted for
    readability at call sites but does not change them.
    """
    cost_per_prompt_token = 0.00015 / 1000  # $0.00015 per 1K tokens ($0.15 per 1M)
    cost_per_completion_token = 0.0006 / 1000  # $0.0006 per 1K tokens ($0.60 per 1M)
    
    return (prompt_tokens * cost_per_prompt_token) + (completion_tokens * cost_per_completion_token)

@tracer.start_as_current_span("analyze_vulnerability_report")
def analyze_vulnerability_report(scan_results: str, target_system: str, test_type: str) -> Dict[str, Any]:
    """Analyze vulnerability scan results using LLM"""
    current_span = trace.get_current_span()
    start_time = time.time()
    
    # Add span attributes
    current_span.set_attribute("analysis.target_system", target_system)
    current_span.set_attribute("analysis.test_type", test_type)
    current_span.set_attribute("analysis.input_length", len(scan_results))
    
    prompt = f"""
    As a senior cybersecurity analyst, analyze the following vulnerability scan results for {target_system}:
    
    Scan Results:
    {scan_results}
    
    Provide a comprehensive analysis including:
    1. Risk severity assessment (CRITICAL, HIGH, MEDIUM, LOW, INFO)
    2. Exploitability analysis
    3. Business impact assessment
    4. Remediation recommendations
    5. Timeline for fixes
    
    Format your response as JSON with the following structure:
    {{
        "severity": "CRITICAL|HIGH|MEDIUM|LOW|INFO",
        "exploitability": "IMMEDIATE|HIGH|MEDIUM|LOW|NONE",
        "business_impact": "description",
        "vulnerabilities_found": [list of vulnerabilities],
        "recommendations": [list of actionable recommendations],
        "timeline": "IMMEDIATE|1_WEEK|1_MONTH|QUARTERLY",
        "confidence": 0.95
    }}
    """
    
    if openai_client:
        try:
            response = openai_client.chat.completions.create(
                model=AZURE_OPENAI_DEPLOYMENT_NAME,
                messages=[
                    {"role": "system", "content": "You are a senior cybersecurity analyst with expertise in vulnerability assessment and penetration testing."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=800
            )
            
            processing_time = time.time() - start_time
            
            # Extract and parse response
            analysis_text = response.choices[0].message.content
            
            # Try to parse JSON response
            try:
                analysis_result = json.loads(analysis_text)
            except json.JSONDecodeError:
                # Fallback if JSON parsing fails
                analysis_result = {
                    "severity": "MEDIUM",
                    "exploitability": "MEDIUM", 
                    "business_impact": analysis_text[:200],
                    "vulnerabilities_found": ["Parsing error - raw response available"],
                    "recommendations": ["Review raw analysis output"],
                    "timeline": "1_WEEK",
                    "confidence": 0.7,
                    "raw_response": analysis_text
                }
            
            # Record LLM interaction
            llm_interaction = LLMInteraction(
                interaction_id=str(uuid.uuid4()),
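                # OpenTelemetry exposes these IDs as integers; hex-encode them
                # to match the str fields of LLMInteraction
                trace_id=format(current_span.get_span_context().trace_id, "032x"),
                span_id=format(current_span.get_span_context().span_id, "016x"),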
                model=AZURE_OPENAI_DEPLOYMENT_NAME,
                tokens_used=response.usage.total_tokens,
                prompt_tokens=response.usage.prompt_tokens,
                completion_tokens=response.usage.completion_tokens,
                temperature=0.3,
                max_tokens=800,
                prompt_hash=hashlib.md5(prompt.encode()).hexdigest(),
                response_length=len(analysis_text),
                processing_time=processing_time,
                cost=calculate_cost(response.usage.prompt_tokens, response.usage.completion_tokens),
                success=True,
                error_message="",
                timestamp=datetime.now()
            )
            
            # Add span attributes
            current_span.set_attribute("llm.tokens_used", response.usage.total_tokens)
            current_span.set_attribute("llm.processing_time", processing_time)
            current_span.set_attribute("llm.cost", llm_interaction.cost)
            current_span.set_attribute("analysis.severity", analysis_result.get("severity", "UNKNOWN"))
            
            return {
                "analysis": analysis_result,
                "llm_interaction": llm_interaction,
                "success": True
            }
            
        except Exception as e:
            processing_time = time.time() - start_time
            error_msg = str(e)
            
            current_span.record_exception(e)
            current_span.set_attribute("error.message", error_msg)
            
            # Record failed LLM interaction
            llm_interaction = LLMInteraction(
                interaction_id=str(uuid.uuid4()),
                # Hex-encode the integer OpenTelemetry IDs to match the str fields
                trace_id=format(current_span.get_span_context().trace_id, "032x"),
                span_id=format(current_span.get_span_context().span_id, "016x"),
                model=AZURE_OPENAI_DEPLOYMENT_NAME,
                tokens_used=0,
                prompt_tokens=0,
                completion_tokens=0,
                temperature=0.3,
                max_tokens=800,
                prompt_hash=hashlib.md5(prompt.encode()).hexdigest(),
                response_length=0,
                processing_time=processing_time,
                cost=0.0,
                success=False,
                error_message=error_msg,
                timestamp=datetime.now()
            )
            
            return {
                "analysis": {"severity": "UNKNOWN", "error": error_msg},
                "llm_interaction": llm_interaction,
                "success": False
            }
    else:
        # Mock response when OpenAI client is not available
        processing_time = time.time() - start_time
        
        mock_analysis = {
            "severity": random.choice(SEVERITY_LEVELS),
            "exploitability": random.choice(["IMMEDIATE", "HIGH", "MEDIUM", "LOW", "NONE"]),
            "business_impact": f"Mock analysis for {target_system} - {test_type}",
            "vulnerabilities_found": [f"Mock vulnerability in {target_system}"],
            "recommendations": ["Mock recommendation 1", "Mock recommendation 2"],
            "timeline": random.choice(["IMMEDIATE", "1_WEEK", "1_MONTH", "QUARTERLY"]),
            "confidence": 0.85
        }
        
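        # Draw prompt and completion counts first so the total stays consistent with them
        mock_prompt_tokens = random.randint(100, 400)
        mock_completion_tokens = random.randint(100, 400)
        
        llm_interaction = LLMInteraction(
            interaction_id=str(uuid.uuid4()),
            # Hex-encode the integer OpenTelemetry IDs to match the str fields
            trace_id=format(current_span.get_span_context().trace_id, "032x"),
            span_id=format(current_span.get_span_context().span_id, "016x"),
            model="mock-model",
            tokens_used=mock_prompt_tokens + mock_completion_tokens,
            prompt_tokens=mock_prompt_tokens,
            completion_tokens=mock_completion_tokens,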
            temperature=0.3,
            max_tokens=800,
            prompt_hash=hashlib.md5(prompt.encode()).hexdigest(),
            response_length=len(str(mock_analysis)),
            processing_time=processing_time,
            cost=random.uniform(0.01, 0.05),
            success=True,
            error_message="",
            timestamp=datetime.now()
        )
        
        current_span.set_attribute("analysis.severity", mock_analysis["severity"])
        current_span.set_attribute("analysis.mode", "mock")
        
        return {
            "analysis": mock_analysis,
            "llm_interaction": llm_interaction,
            "success": True
        }

print("AI-powered security analysis functions ready!")
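
Models often wrap JSON in Markdown code fences or surround it with prose, which sends a bare `json.loads` call straight to the fallback branch. A more tolerant parser is sketched below, assuming the response contains at most one top-level JSON object. It tries the raw text first and then the outermost brace-delimited span. `extract_json_object` is a hypothetical helper, not part of any SDK.

```python
import json
from typing import Any, Dict, Optional


def extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first parseable JSON object found in `text`, or None."""
    candidates = [text.strip()]
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        # Outermost brace-delimited span, e.g. JSON wrapped in fences or prose
        candidates.append(text[start:end + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


# Build a fenced sample response without writing a literal fence in the source
fence = "`" * 3
fenced = f'Here is the analysis:\n{fence}json\n{{"severity": "HIGH", "confidence": 0.9}}\n{fence}'
print(extract_json_object(fenced))
```

When the helper returns `None`, the existing fallback dictionary with the raw response is still the right last resort.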

## 4. Security Test Simulation

Functions to generate synthetic security test scenarios with plausible, template-based vulnerability findings.

In [None]:
def generate_realistic_scan_results(target: Dict[str, str], test_type: str) -> str:
    """Generate realistic vulnerability scan results"""
    
    vulnerability_templates = {
        "vulnerability_scan": [
            f"CVE-2024-{random.randint(1000, 9999)}: SQL Injection vulnerability in {target['name']}",
            f"CVE-2024-{random.randint(1000, 9999)}: Cross-Site Scripting (XSS) in web interface",
            f"Open port {random.randint(1000, 9999)} detected on {target['ip']}",
            f"Outdated software version detected: {random.choice(['Apache', 'Nginx', 'MySQL', 'PHP'])} {random.randint(1, 3)}.{random.randint(0, 9)}",
            f"Weak SSL/TLS configuration on {target['ip']}:443",
            f"Missing security headers in HTTP response",
            f"Directory traversal vulnerability detected",
            f"Weak password policy implementation"
        ],
        "penetration_test": [
            f"Successfully exploited buffer overflow in {target['name']}",
            f"Privilege escalation achieved on {target['ip']}",
            f"Unauthorized access to sensitive directory: /etc/passwd",
            f"Password brute force attack successful: admin/password123",
            f"Remote code execution via {random.choice(['RFI', 'LFI', 'Command Injection'])}",
            f"Session hijacking vulnerability exploited",
            f"Authentication bypass discovered",
            f"File upload vulnerability allows arbitrary code execution"
        ],
        "code_analysis": [
            f"SAST finding: Hardcoded credentials in {target['name']}/config.py",
            f"Insecure deserialization vulnerability detected",
            f"Missing input validation in API endpoint /api/users",
            f"Use of deprecated cryptographic functions",
            f"Insufficient error handling exposes stack traces",
            f"SQL injection in database query construction",
            f"Cross-site request forgery (CSRF) vulnerability",
            f"Insecure random number generation"
        ],
        "infrastructure_assessment": [
            f"Default credentials found on {target['ip']}",
            f"Unpatched system: {random.randint(15, 45)} critical updates missing",
            f"Network segmentation issue: {target['ip']} accessible from DMZ",
            f"Backup files exposed in web directory",
            f"Database server {target['ip']} allows anonymous connections",
            f"Firewall misconfiguration allows unauthorized access",
            f"Unencrypted data transmission detected",
            f"Weak access controls on administrative interfaces"
        ],
        "social_engineering": [
            f"Phishing campaign: {random.randint(15, 40)}% click rate",
            f"USB drop test: {random.randint(5, 25)}% insertion rate",
            f"Tailgating attempt successful at main entrance",
            f"Phone-based social engineering: Password reset successful",
            f"Pretexting attack: Obtained IT support credentials",
            f"Baiting attack with malicious USB drives",
            f"Watering hole attack targeting company website",
            f"Spear-phishing targeting executives"
        ],
        "web_application_test": [
            f"Cross-Site Scripting (XSS) vulnerability in {target['name']}",
            f"SQL injection in login form",
            f"Insecure direct object references",
            f"Session management flaws detected",
            f"Authentication bypass vulnerability",
            f"Cross-Site Request Forgery (CSRF) vulnerability",
            f"Insufficient input validation",
            f"Information disclosure through error messages"
        ],
        "network_security_test": [
            f"Open ports detected: {random.randint(20, 100)} services exposed",
            f"Weak network encryption protocols in use",
            f"Network sniffing reveals sensitive data",
            f"Man-in-the-middle attack successful",
            f"DNS spoofing vulnerability detected",
            f"Network segmentation bypass possible",
            f"Wireless security vulnerabilities found",
            f"Network device default credentials detected"
        ],
        "database_security_test": [
            f"Database user with excessive privileges",
            f"Unencrypted sensitive data in database",
            f"SQL injection vectors in stored procedures",
            f"Database backup files accessible",
            f"Weak database authentication mechanisms",
            f"Database audit logging disabled",
            f"Database version contains known vulnerabilities",
            f"Database connection string exposure"
        ],
        "mobile_security_test": [
            f"Mobile app stores sensitive data unencrypted",
            f"Insecure API endpoints in mobile application",
            f"Mobile app certificate pinning bypass",
            f"Hardcoded secrets in mobile application",
            f"Insecure data storage on mobile device",
            f"Mobile app authentication bypass",
            f"Mobile application reverse engineering possible",
            f"Insecure mobile communication protocols"
        ],
        "cloud_security_test": [
            f"Cloud storage bucket publicly accessible",
            f"IAM permissions overly permissive",
            f"Cloud configuration drift detected",
            f"Unencrypted cloud storage volumes",
            f"Cloud API keys exposed in source code",
            f"Cloud security group misconfiguration",
            f"Cloud logging and monitoring gaps",
            f"Cloud container vulnerabilities detected"
        ]
    }
    
    # Get the available templates for this test type
    available_templates = vulnerability_templates.get(test_type, [f"Generic security finding for {target['name']}"])
    
    # Pick between 1 and 4 findings, capped at the number of available templates,
    # so random.sample never requests more items than the list holds
    max_findings = min(len(available_templates), 4)
    num_findings = random.randint(1, max_findings)
    findings = random.sample(available_templates, num_findings)
    
    return "\n".join([
        f"=== Security Scan Results for {target['name']} ({target['ip']}) ===",
        f"Scan Type: {test_type}",
        f"Scan Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"Target: {target['type']}",
        "",
        "FINDINGS:",
        *[f"- {finding}" for finding in findings],
        "",
        f"Total Issues Found: {len(findings)}",
        f"Scan Status: COMPLETED"
    ])

@tracer.start_as_current_span("simulate_security_test")
def simulate_security_test(target: Dict[str, str], test_type: str, tester: Dict[str, str], environment: str) -> SecurityTest:
    """Simulate a complete security test with AI analysis"""
    current_span = trace.get_current_span()
    test_start = time.time()
    
    # Generate test ID and basic info
    test_id = str(uuid.uuid4())
    current_span.set_attribute("test.id", test_id)
    current_span.set_attribute("test.type", test_type)
    current_span.set_attribute("test.target", target['name'])
    current_span.set_attribute("test.tester", tester['name'])
    current_span.set_attribute("test.environment", environment)
    
    # Simulate test execution time
    execution_time = random.uniform(30, 300)  # 30 seconds to 5 minutes
    time.sleep(0.1)  # Brief pause for realism
    
    # Generate scan results
    scan_results = generate_realistic_scan_results(target, test_type)
    current_span.add_event("scan_completed", {"results_length": len(scan_results)})
    
    # Analyze results using AI
    with tracer.start_as_current_span("ai_analysis") as analysis_span:
        analysis_result = analyze_vulnerability_report(scan_results, target['name'], test_type)
        
        if analysis_result["success"]:
            analysis = analysis_result["analysis"]
            llm_interaction = analysis_result["llm_interaction"]
            
            # Store LLM interaction for cost tracking
            current_span.set_attribute("ai.tokens_used", llm_interaction.tokens_used)
            current_span.set_attribute("ai.cost", llm_interaction.cost)
            
            # Collect LLM interaction
            llm_interactions.append(llm_interaction)
        else:
            analysis = {"severity": "UNKNOWN", "error": "AI analysis failed"}
            llm_interaction = analysis_result["llm_interaction"]
            
            # Still collect failed interactions for tracking
            llm_interactions.append(llm_interaction)
    
    # Determine test status based on findings
    severity = analysis.get("severity", "MEDIUM")
    if severity in ["CRITICAL", "HIGH"]:
        status = random.choice(["FAILED", "VULNERABLE"])
    elif severity == "MEDIUM":
        status = random.choice(["FAILED", "VULNERABLE", "PASSED"])
    else:
        status = random.choice(["PASSED", "PASSED", "INCONCLUSIVE"])
    
    # Generate findings and recommendations
    findings = {
        "scan_results": scan_results,
        "ai_analysis": analysis,
        "risk_score": random.randint(1, 100),
        "cvss_score": round(random.uniform(0.1, 10.0), 1),
        "affected_assets": [target['name']],
        "evidence": f"Evidence collected during {test_type} on {target['name']}"
    }
    
    recommendations = analysis.get("recommendations", [
        f"Patch vulnerabilities found in {target['name']}",
        f"Review {test_type} findings and implement security controls",
        "Conduct follow-up testing after remediation"
    ])
    
    test_duration = time.time() - test_start
    
    # Create security test record
    security_test = SecurityTest(
        test_id=test_id,
        test_type=test_type,
        test_name=f"{test_type.replace('_', ' ').title()} - {target['name']}",
        target=target['name'],
        severity=severity,
        status=status,
        duration=test_duration,
        findings=findings,
        recommendations=recommendations,
        tester_info={
            "name": tester['name'],
            "role": tester['role'],
            "specialization": tester['specialization']
        },
        environment=environment,
        timestamp=datetime.now()
    )
    
    # Add final span attributes
    current_span.set_attribute("test.status", status)
    current_span.set_attribute("test.severity", severity)
    current_span.set_attribute("test.duration", test_duration)
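    # Count the vulnerabilities reported by the analysis; len(findings) would
    # only count the keys of the findings dict
    current_span.set_attribute("test.findings_count", len(analysis.get("vulnerabilities_found", [])))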
    
    current_span.add_event("test_completed", {
        "status": status,
        "severity": severity,
        "duration": test_duration
    })
    
    return security_test

print("Security test simulation functions ready!")
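
Because the simulation draws from the module-level `random` state, two runs produce different datasets. If reproducible traces are useful, for example to compare query results across runs, one option is to draw from a dedicated seeded `random.Random` instance. `pick_findings` and the short template list are illustrative stand-ins for the notebook's generator.

```python
import random
from typing import List, Sequence


def pick_findings(templates: Sequence[str], rng: random.Random, max_findings: int = 4) -> List[str]:
    """Sample between 1 and `max_findings` distinct templates using `rng`."""
    upper = min(len(templates), max_findings)
    if upper == 0:
        return []
    return rng.sample(list(templates), rng.randint(1, upper))


templates = [
    "SQL injection in login form",
    "Session management flaws detected",
    "Insufficient input validation",
]

# Two generators with the same seed yield the same selection
first = pick_findings(templates, random.Random(42))
second = pick_findings(templates, random.Random(42))
print(first == second)
```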

## 5. Data Export Functions

Functions to export data to ADX or save locally if ADX is unavailable.
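
For the local fallback, newline-delimited JSON keeps the same one-record-per-line shape that the ADX export builds. A minimal sketch follows; `save_records_locally` and the file name are assumptions.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable


def save_records_locally(records: Iterable[Dict[str, Any]], path: Path) -> int:
    """Write one JSON document per line and return the number of records written."""
    count = 0
    with path.open("w", encoding="utf-8") as handle:
        for record in records:
            # default=str covers values such as datetime that json cannot encode
            handle.write(json.dumps(record, default=str) + "\n")
            count += 1
    return count


# Illustrative records and a temporary output location
out_path = Path(tempfile.gettempdir()) / "security_traces_sample.jsonl"
written = save_records_locally(
    [
        {"testType": "vulnerability_scan", "severity": "HIGH"},
        {"testType": "code_analysis", "severity": "LOW"},
    ],
    out_path,
)
print(f"Wrote {written} records to {out_path}")
```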

In [None]:
# Storage for collected data
security_tests: List[SecurityTest] = []
llm_interactions: List[LLMInteraction] = []

def export_to_adx(security_tests: List[SecurityTest], llm_interactions: List[LLMInteraction]) -> bool:
    """Export collected data to Azure Data Explorer"""
    if not adx_ingest_client or not ADX_DATABASE_NAME:
        print("ADX not configured - export skipped")
        return False
    
    try:
        import io
        
        # Convert security tests to JSON for ingestion
        security_data = []
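        for test in security_tests:
            # Render the duration as hh:mm:ss.fff from whole milliseconds so that
            # values of 60 seconds or more still form a valid timespan
            duration_ms = int(test.duration * 1000)
            duration_str = (
                f"{duration_ms // 3_600_000:02d}:{duration_ms // 60_000 % 60:02d}:"
                f"{duration_ms // 1000 % 60:02d}.{duration_ms % 1000:03d}"
            )
            security_record = {
                "timestamp": test.timestamp.isoformat(),
                "traceId": test.test_id,
                "spanId": str(uuid.uuid4()),
                "testType": test.test_type,
                "testName": test.test_name,
                "target": test.target,
                "severity": test.severity,
                "status": test.status,
                "duration": duration_str,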
                "findings": test.findings,
                "recommendations": test.recommendations,
                "testerInfo": test.tester_info,
                "environment": test.environment
            }
            security_data.append(security_record)
        
        # Convert LLM interactions to JSON for ingestion
        llm_data = []
        for interaction in llm_interactions:
            llm_record = {
                "timestamp": interaction.timestamp.isoformat(),
                "traceId": str(interaction.trace_id),
                "spanId": str(interaction.span_id),
                "model": interaction.model,
                "tokensUsed": interaction.tokens_used,
                "promptTokens": interaction.prompt_tokens,
                "completionTokens": interaction.completion_tokens,
                "temperature": interaction.temperature,
                "maxTokens": interaction.max_tokens,
                "promptHash": interaction.prompt_hash,
                "responseLength": interaction.response_length,
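                # ADX timespan layout: hh:mm:ss.fff (calls may take longer than one second)
                "processingTime": f"{int(interaction.processing_time // 3600):02d}:{int(interaction.processing_time // 60 % 60):02d}:{int(interaction.processing_time % 60):02d}.{int((interaction.processing_time % 1) * 1000):03d}",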
                "cost": interaction.cost,
                "success": interaction.success,
                "errorMessage": interaction.error_message
            }
            llm_data.append(llm_record)
        
        # Define ingestion properties
        security_ingestion_props = IngestionProperties(
            database=ADX_DATABASE_NAME,
            table="SecurityTraces",
            data_format=DataFormat.JSON,
            ingestion_mapping_reference="SecurityTracesMapping"
        )
        
        llm_ingestion_props = IngestionProperties(
            database=ADX_DATABASE_NAME,
            table="LLMInteractions", 
            data_format=DataFormat.JSON,
            ingestion_mapping_reference="LLMInteractionsMapping"
        )
        
        # Ingest security test data using StringIO
        if security_data:
            security_json = "\n".join([json.dumps(record) for record in security_data])
            security_stream = io.StringIO(security_json)
            adx_ingest_client.ingest_from_stream(
                security_stream,
                ingestion_properties=security_ingestion_props
            )
            print(f" Exported {len(security_data)} security test records to ADX")
        
        # Ingest LLM interaction data using StringIO
        if llm_data:
            llm_json = "\n".join([json.dumps(record) for record in llm_data])
            llm_stream = io.StringIO(llm_json)
            adx_ingest_client.ingest_from_stream(
                llm_stream,
                ingestion_properties=llm_ingestion_props
            )
            print(f" Exported {len(llm_data)} LLM interaction records to ADX")
        
        return True
        
    except Exception as e:
        print(f"❌ Error exporting to ADX: {e}")
        print(f"   Error details: {type(e).__name__}: {e}")
        return False

def save_data_locally(security_tests: List[SecurityTest], llm_interactions: List[LLMInteraction]):
    """Save data locally as JSON files"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    # Save security tests
    security_data = []
    for test in security_tests:
        security_data.append({
            "timestamp": test.timestamp.isoformat(),
            "test_id": test.test_id,
            "test_type": test.test_type,
            "test_name": test.test_name,
            "target": test.target,
            "severity": test.severity,
            "status": test.status,
            "duration": test.duration,
            "findings": test.findings,
            "recommendations": test.recommendations,
            "tester_info": test.tester_info,
            "environment": test.environment
        })
    
    with open(f"security_tests_{timestamp}.json", "w") as f:
        json.dump(security_data, f, indent=2, default=str)
    
    # Save LLM interactions
    llm_data = []
    for interaction in llm_interactions:
        llm_data.append({
            "timestamp": interaction.timestamp.isoformat(),
            "interaction_id": interaction.interaction_id,
            "trace_id": str(interaction.trace_id),
            "span_id": str(interaction.span_id),
            "model": interaction.model,
            "tokens_used": interaction.tokens_used,
            "prompt_tokens": interaction.prompt_tokens,
            "completion_tokens": interaction.completion_tokens,
            "temperature": interaction.temperature,
            "max_tokens": interaction.max_tokens,
            "prompt_hash": interaction.prompt_hash,
            "response_length": interaction.response_length,
            "processing_time": interaction.processing_time,
            "cost": interaction.cost,
            "success": interaction.success,
            "error_message": interaction.error_message
        })
    
    with open(f"llm_interactions_{timestamp}.json", "w") as f:
        json.dump(llm_data, f, indent=2, default=str)
    
    print(f" Data saved locally:")
    print(f"    security_tests_{timestamp}.json ({len(security_data)} records)")
    print(f"    llm_interactions_{timestamp}.json ({len(llm_data)} records)")

def collect_llm_interaction_from_result(analysis_result: Dict[str, Any]):
    """Helper function to collect LLM interactions from analysis results"""
    if "llm_interaction" in analysis_result:
        llm_interactions.append(analysis_result["llm_interaction"])

print(" Data export functions ready!")

## 6. Generate Security Test Data (Main Simulation)

**This is where the magic happens!** 

Run the cell below to generate 20 realistic security test scenarios with AI-powered analysis. The simulation will:

- ✅ **Create diverse test scenarios** across 10 different security test types
- ✅ **Analyze findings with AI** using your deployed models  
- ✅ **Generate realistic vulnerabilities** and recommendations
- ✅ **Track costs and token usage** for budget management
- ✅ **Stage the results** for export to ADX in section 7

**Estimated time:** 2-3 minutes  
**What you'll see:** Progress updates every 10 tests
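
The simulation biases each tester toward test types that match their specialization. A minimal sketch of that weighting follows; the tester dictionary, the test-type names, and the `pick_test_type` helper are made up for illustration and only stand in for the notebook's `SECURITY_TESTERS` and `SECURITY_TEST_TYPES`.

```python
import random
from typing import Dict, List, Optional


def pick_test_type(
    tester: Dict[str, str],
    test_types: List[str],
    bias: float = 0.7,
    rng: Optional[random.Random] = None,
) -> str:
    """Pick a test type, favoring ones that mention the tester's specialization."""
    rng = rng or random.Random()
    specialized = [t for t in test_types if tester["specialization"] in t]
    # With probability `bias`, choose among the specialized types (if any exist)
    if specialized and rng.random() < bias:
        return rng.choice(specialized)
    # Otherwise fall back to a uniform choice over all types
    return rng.choice(test_types)


# Hypothetical sample data
example_types = ["web_sql_injection", "web_xss", "network_port_scan", "api_auth_bypass"]
example_tester = {"name": "Tester A", "specialization": "web"}

rng = random.Random(42)
picks = [pick_test_type(example_tester, example_types, rng=rng) for _ in range(1000)]
web_share = sum(p.startswith("web") for p in picks) / len(picks)
print(f"Share of web tests: {web_share:.2f}")
```

With two of the four sample types matching the specialization, the expected share of web tests is about 0.85 (0.7 + 0.3 × 0.5).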

In [None]:
# Reset variables for clean simulation
security_tests = []
llm_interactions = []

# Configuration for test generation
NUM_TESTS = 20  # Quick run; set to 120 for the full dataset described in the overview
BATCH_SIZE = 10  # Process in batches for better progress tracking

print(f"🚀 Starting comprehensive security testing simulation...")
print(f"🎯 Target: {NUM_TESTS} security tests")
print(f"🖥️  Targets: {len(TARGET_SYSTEMS)} systems")
print(f"👥 Testers: {len(SECURITY_TESTERS)} team members")
print(f"🔧 Test types: {len(SECURITY_TEST_TYPES)} different types")
print("=" * 60)

# Track progress and metrics
start_time = time.time()
successful_tests = 0
failed_tests = 0
total_cost = 0.0
total_tokens = 0

for batch_num in range(0, NUM_TESTS, BATCH_SIZE):
    batch_end = min(batch_num + BATCH_SIZE, NUM_TESTS)
    batch_size = batch_end - batch_num
    
    print(f"\n🔄 Processing batch {batch_num//BATCH_SIZE + 1}/{(NUM_TESTS-1)//BATCH_SIZE + 1} (Tests {batch_num+1}-{batch_end})")
    
    batch_start_time = time.time()
    
    for test_num in range(batch_num, batch_end):
        # Randomly select test parameters for diversity
        target = random.choice(TARGET_SYSTEMS)
        test_type = random.choice(SECURITY_TEST_TYPES)
        tester = random.choice(SECURITY_TESTERS)
        environment = random.choice(ENVIRONMENTS)
        
        # Weight test types toward the tester's specialization:
        # 70% of the time, pick from the types that match it (if any exist)
        specialized_tests = [t for t in SECURITY_TEST_TYPES if tester['specialization'] in t]
        if specialized_tests and random.random() < 0.7:
            test_type = random.choice(specialized_tests)
        
        try:
            # Run the security test simulation
            with tracer.start_as_current_span(f"security_test_batch_{batch_num//BATCH_SIZE + 1}"):
                security_test = simulate_security_test(target, test_type, tester, environment)
                security_tests.append(security_test)
                successful_tests += 1
                
                # Show progress every 10 tests
                if (test_num + 1) % 10 == 0:
                    print(f"   ✅ Completed test {test_num + 1}: {test_type} on {target['name']} ({security_test.status})")
                
        except Exception as e:
            print(f"   ❌ Failed test {test_num + 1}: {e}")
            failed_tests += 1
    
    batch_duration = time.time() - batch_start_time
    print(f"  ⏱️  Batch completed in {batch_duration:.1f}s")
    
    # Small delay between batches to avoid overwhelming the system
    if batch_end < NUM_TESTS:
        time.sleep(0.5)

# Calculate summary statistics
total_duration = time.time() - start_time
total_cost = sum(interaction.cost for interaction in llm_interactions)
total_tokens = sum(interaction.tokens_used for interaction in llm_interactions)

print("\n" + "=" * 60)
print("🏆 SECURITY TESTING SIMULATION COMPLETED!")
print("=" * 60)
print(f"📊 Summary Statistics:")
print(f"   ✅ Successful tests: {successful_tests}")
print(f"   ❌ Failed tests: {failed_tests}")
print(f"   ⏱️  Total duration: {total_duration:.1f} seconds")
print(f"   💰 Total AI cost: ${total_cost:.4f}")
print(f"   🔢 Total tokens used: {total_tokens:,}")
print(f"   ⚡ Tests per second: {successful_tests/total_duration:.2f}")

# Analyze results by category
print(f"\n📈 Test Distribution:")
test_type_counts = {}
severity_counts = {}
status_counts = {}
environment_counts = {}

for test in security_tests:
    test_type_counts[test.test_type] = test_type_counts.get(test.test_type, 0) + 1
    severity_counts[test.severity] = severity_counts.get(test.severity, 0) + 1
    status_counts[test.status] = status_counts.get(test.status, 0) + 1
    environment_counts[test.environment] = environment_counts.get(test.environment, 0) + 1

print("\nBy Test Type:")
for test_type, count in sorted(test_type_counts.items()):
    print(f"   {test_type}: {count} tests")

print("\nBy Severity:")
severity_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]
for severity, count in sorted(severity_counts.items(), key=lambda x: severity_order.index(x[0]) if x[0] in severity_order else len(severity_order)):
    print(f"   {severity}: {count} tests")

print("\nBy Status:")
for status, count in sorted(status_counts.items()):
    print(f"   {status}: {count} tests")

print("\nBy Environment:")
for env, count in sorted(environment_counts.items()):
    print(f"   {env}: {count} tests")

print(f"\n✅ Ready for ADX export: {len(security_tests)} security tests, {len(llm_interactions)} LLM interactions")

## 7. Export to Azure Data Explorer

The cell below exports all generated data to your ADX cluster for advanced analytics.
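
Once the export succeeds, the cell prints a link to the database in the ADX Web UI, derived from the cluster URI. The sketch below shows that URL construction in isolation; the cluster URI and database name are placeholders, and `adx_web_url` is a hypothetical helper name.

```python
from urllib.parse import urlparse


def adx_web_url(cluster_uri: str, database: str) -> str:
    """Build the ADX Web UI link for a cluster URI and database name."""
    # Keep only the host part; fall back to the raw value if no scheme was given
    host = urlparse(cluster_uri).netloc or cluster_uri
    return f"https://dataexplorer.azure.com/clusters/{host}/databases/{database}"


# Placeholder values for illustration only
print(adx_web_url("https://mycluster.westus.kusto.windows.net", "ExampleDB"))
```

Parsing the URI rather than stripping the `https://` prefix also tolerates a trailing slash in the configured value.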

In [None]:
# Export data to ADX
print("🔄 Attempting export to Azure Data Explorer...")
print("=" * 50)

# Redefine export_to_adx for this cell. This version overrides the one from
# section 5: it writes PascalCase column names and passes no ingestion
# mapping reference.
def export_to_adx(security_tests_data, llm_interactions_data):
    """
    Export security test data and LLM interactions to Azure Data Explorer
    Returns True if successful, False otherwise
    """
    try:
        # Check if ADX clients are available
        if not adx_client or not adx_ingest_client:
            print("❌ ADX clients not initialized - cannot export to ADX")
            return False
        
        if not ADX_DATABASE_NAME:
            print("❌ ADX database name not configured")
            return False
            
        print("🔄 Starting ADX export...")
        
        # Convert security tests to JSON for ingestion
        security_data_json = []
        for test in security_tests_data:
            security_data_json.append({
                "TestId": test.test_id,
                "TimeStamp": test.timestamp.isoformat(),
                "TestType": test.test_type,
                "TestName": test.test_name,
                "Target": test.target,
                "Severity": test.severity,
                "Status": test.status,
                "Duration": test.duration,
                "Findings": test.findings,
                "Recommendations": test.recommendations,
                "TesterInfo": test.tester_info,
                "Environment": test.environment
            })
        
        # Convert LLM interactions to JSON for ingestion
        llm_data_json = []
        for interaction in llm_interactions_data:
            llm_data_json.append({
                "InteractionId": interaction.interaction_id,
                "TraceId": str(interaction.trace_id),
                "SpanId": str(interaction.span_id),
                "TimeStamp": interaction.timestamp.isoformat(),
                "Model": interaction.model,
                "TokensUsed": interaction.tokens_used,
                "PromptTokens": interaction.prompt_tokens,
                "CompletionTokens": interaction.completion_tokens,
                "Temperature": interaction.temperature,
                "MaxTokens": interaction.max_tokens,
                "PromptHash": interaction.prompt_hash,
                "ResponseLength": interaction.response_length,
                "ProcessingTime": interaction.processing_time,
                "Cost": interaction.cost,
                "Success": interaction.success,
                "ErrorMessage": interaction.error_message
            })
        
        # Ingest security tests data
        print(f"📊 Ingesting {len(security_data_json)} security test records...")
        
        # Use the already imported IngestionProperties and DataFormat
        security_props = IngestionProperties(
            database=ADX_DATABASE_NAME,
            table="SecurityTraces",
            data_format=DataFormat.JSON
        )
        
        # Convert to JSON string
        import json
        security_json_str = '\n'.join([json.dumps(record) for record in security_data_json])
        
        # Ingest security data using from_stream method
        from io import StringIO
        security_stream = StringIO(security_json_str)
        adx_ingest_client.ingest_from_stream(
            security_stream,
            ingestion_properties=security_props
        )
        
        # Ingest LLM interactions data
        print(f"🤖 Ingesting {len(llm_data_json)} LLM interaction records...")
        
        llm_props = IngestionProperties(
            database=ADX_DATABASE_NAME,
            table="LLMInteractions", 
            data_format=DataFormat.JSON
        )
        
        llm_json_str = '\n'.join([json.dumps(record) for record in llm_data_json])
        llm_stream = StringIO(llm_json_str)
        
        # Ingest LLM data using from_stream method
        adx_ingest_client.ingest_from_stream(
            llm_stream,
            ingestion_properties=llm_props
        )
        
        print("✅ Data successfully submitted to ADX ingestion queue")
        return True
        
    except Exception as e:
        print(f"❌ ADX export failed: {e}")
        print(f"Error type: {type(e).__name__}")
        return False

# Verify we have data to export
print(f"📊 Data summary:")
print(f"   Security tests: {len(security_tests)}")
print(f"   LLM interactions: {len(llm_interactions)}")

if len(security_tests) == 0 or len(llm_interactions) == 0:
    print("❌ No data found to export. Please run the simulation cell first.")
else:
    print(f"✅ Data is ready for export")
    
    # Attempt ADX export with error handling
    print("\n🔄 Attempting ADX export...")
    try:
        export_success = export_to_adx(security_tests, llm_interactions)
        
        if export_success:
            print("\n🏆 Data export to ADX completed successfully!")
            print("\n🔗 Access your data:")
            if ADX_CLUSTER_URI:
                # Build the Web UI link from the cluster host name
                cluster_host = ADX_CLUSTER_URI.replace("https://", "")
                print(f"   ADX Web UI: https://dataexplorer.azure.com/clusters/{cluster_host}/databases/{ADX_DATABASE_NAME}")
                print(f"   Database: {ADX_DATABASE_NAME}")
            print("   Tables: SecurityTraces, LLMInteractions")
            
            print("\n⏱️  Data ingestion note:")
            print("   ADX data may take 2-5 minutes to appear in queries")
            print("   Use 'SecurityTraces | count' to verify data arrival")
            
        else:
            print("\n❌ ADX export failed")
            print("💡 Please check ADX connection and configuration")
    
    except Exception as e:
        print(f"\n❌ Export function failed: {e}")
        print(f"Error type: {type(e).__name__}")
        print("💡 Please check ADX connection and configuration")

print("\n🚀 Next Steps:")
print("1. 📊 Query data using Azure Data Explorer Web UI")
print("2. 📈 Create dashboards for security metrics")
print("3. 🚨 Set up alerts for critical findings")
print("4. 💰 Monitor LLM costs and token usage")
print("5. 📅 Analyze security trends over time")

# Create a summary report
print("\n" + "=" * 50)
print("📋 SECURITY TESTING SUMMARY REPORT")
print("=" * 50)

# Ensure we have data to analyze
if len(security_tests) > 0:
    # High-level metrics
    critical_high_tests = [t for t in security_tests if t.severity in ["CRITICAL", "HIGH"]]
    vulnerable_tests = [t for t in security_tests if t.status in ["FAILED", "VULNERABLE"]]
    
    if len(llm_interactions) > 0:
        total_cost = sum(interaction.cost for interaction in llm_interactions)
        total_tokens = sum(interaction.tokens_used for interaction in llm_interactions)
        cost_per_test = total_cost / len(security_tests)
        avg_tokens = total_tokens / len(security_tests)
    else:
        total_cost = 0
        total_tokens = 0
        cost_per_test = 0
        avg_tokens = 0

    print(f"🚨 Critical/High Severity: {len(critical_high_tests)} tests ({len(critical_high_tests)/len(security_tests)*100:.1f}%)")
    print(f"⚠️  Vulnerable Systems: {len(vulnerable_tests)} tests ({len(vulnerable_tests)/len(security_tests)*100:.1f}%)")
    print(f"💰 Average Cost per Test: ${cost_per_test:.4f}")
    print(f"🔢 Average Tokens per Test: {avg_tokens:.0f}")

    # Most vulnerable targets
    target_vulnerability_counts = {}
    for test in security_tests:
        if test.status in ["FAILED", "VULNERABLE"]:
            target_vulnerability_counts[test.target] = target_vulnerability_counts.get(test.target, 0) + 1

    if target_vulnerability_counts:
        print("\n🎯 Most Vulnerable Targets:")
        for target, count in sorted(target_vulnerability_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
            print(f"   {target}: {count} vulnerabilities")

    # Most expensive test types by AI cost
    test_type_costs = {}
    test_type_counts_for_cost = {}
    for interaction in llm_interactions:
        # Find corresponding test
        for test in security_tests:
            if str(interaction.trace_id) == test.test_id:
                test_type_costs[test.test_type] = test_type_costs.get(test.test_type, 0) + interaction.cost
                test_type_counts_for_cost[test.test_type] = test_type_counts_for_cost.get(test.test_type, 0) + 1
                break

    if test_type_costs:
        print("\n💸 Most Expensive Test Types (AI Analysis):")
        for test_type, total_cost_type in sorted(test_type_costs.items(), key=lambda x: x[1], reverse=True)[:5]:
            avg_cost = total_cost_type / test_type_counts_for_cost.get(test_type, 1)
            print(f"   {test_type}: ${total_cost_type:.4f} total (${avg_cost:.4f} avg)")

    print("\n🏆 Security pen-testing simulation completed successfully!")
    print(f"✅ Generated {len(security_tests)} comprehensive security test records")
    print(f"🤖 Performed {len(llm_interactions)} AI-powered security analyses")
else:
    print("❌ No security test data found to analyze.")
    print("💡 Make sure to run the simulation cell first.")

## 8. Ready-to-Use Analytics Queries

**Your data is now in ADX! Use these 5 powerful KQL queries for immediate insights:**

1. **Security Vulnerability Dashboard** - Overview of all vulnerabilities
2. **Target System Risk Analysis** - Which systems are most at risk
3. **Security Tester Performance** - Team effectiveness metrics  
4. **LLM Cost Analysis** - AI usage and cost optimization
5. **Security Trends Over Time** - Trend analysis and patterns

**Run the cell below to execute all queries automatically** (if ADX is connected) or copy them to ADX Web UI.
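
The risk analysis query weights findings by severity (critical × 10, high × 5, medium × 2, low × 1). As a local cross-check while data is still ingesting, the same score can be sketched in plain Python; the record layout and sample values below are hypothetical, and `risk_by_target` is an illustrative helper rather than part of the notebook.

```python
from collections import defaultdict
from typing import Dict, Iterable

# Severity weights taken from the RiskScore expression in the risk analysis query
SEVERITY_WEIGHTS = {"CRITICAL": 10, "HIGH": 5, "MEDIUM": 2, "LOW": 1}


def risk_by_target(records: Iterable[Dict[str, str]]) -> Dict[str, int]:
    """Sum severity weights per target; unknown severities (e.g. INFO) add 0."""
    scores: Dict[str, int] = defaultdict(int)
    for record in records:
        scores[record["target"]] += SEVERITY_WEIGHTS.get(record["severity"], 0)
    return dict(scores)


# Hypothetical sample records
sample_records = [
    {"target": "web-portal", "severity": "CRITICAL"},
    {"target": "web-portal", "severity": "LOW"},
    {"target": "payments-api", "severity": "HIGH"},
    {"target": "payments-api", "severity": "INFO"},
]
ranked = sorted(risk_by_target(sample_records).items(), key=lambda kv: kv[1], reverse=True)
for target, score in ranked:
    print(f"{target}: {score}")
```

Comparing these local scores with the query output is a quick way to confirm that severity values arrived in ADX unchanged.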

In [None]:
# Define 5 comprehensive KQL queries for security analytics

kql_queries = {
    "1. Security Vulnerability Dashboard": """
// Security Vulnerability Overview Dashboard
SecurityTraces
| where TimeStamp >= ago(7d)
| summarize 
    TotalTests = count(),
    FailedTests = countif(Status in ("FAILED", "VULNERABLE")),
    PassedTests = countif(Status == "PASSED"), 
    CriticalIssues = countif(Severity == "CRITICAL"),
    HighIssues = countif(Severity == "HIGH"),
    MediumIssues = countif(Severity == "MEDIUM"),
    LowIssues = countif(Severity == "LOW"),
    UniqueTargets = dcount(Target),
    AverageTestDuration = avg(Duration)
    by TestType
| extend SuccessRate = round(PassedTests * 100.0 / TotalTests, 1)
| project TestType, TotalTests, FailedTests, CriticalIssues, HighIssues, SuccessRate, UniqueTargets, AverageTestDuration
| order by CriticalIssues desc, HighIssues desc
""",
    
    "2. Target System Risk Analysis": """
// Risk Analysis by Target System
SecurityTraces
| where TimeStamp >= ago(30d)
| summarize 
    TotalTests = count(),
    FailedTests = countif(Status in ("FAILED", "VULNERABLE")),
    CriticalIssues = countif(Severity == "CRITICAL"),
    HighIssues = countif(Severity == "HIGH"),
    MediumIssues = countif(Severity == "MEDIUM"),
    LowIssues = countif(Severity == "LOW"),
    TestTypes = make_set(TestType),
    Environments = make_set(Environment)
    by Target
| extend 
    RiskScore = CriticalIssues * 10 + HighIssues * 5 + MediumIssues * 2 + LowIssues,
    FailureRate = round(FailedTests * 100.0 / TotalTests, 1)
| project Target, TotalTests, FailedTests, FailureRate, CriticalIssues, HighIssues, MediumIssues, LowIssues, RiskScore, TestTypes, Environments
| order by RiskScore desc, FailureRate desc
""",
    
    "3. Security Tester Performance Metrics": """
// Security Team Performance Analysis
SecurityTraces
| where TimeStamp >= ago(30d)
| extend TesterName = tostring(TesterInfo.name)
| summarize 
    TestsCompleted = count(),
    VulnerabilitiesFound = countif(Status in ("FAILED", "VULNERABLE")),
    CriticalFindings = countif(Severity == "CRITICAL"),
    HighFindings = countif(Severity == "HIGH"),
    AvgTestDuration = avg(Duration),
    UniqueTargets = dcount(Target),
    TestTypesSpecialty = make_set(TestType)
    by TesterName
| extend 
    EfficiencyScore = round(VulnerabilitiesFound * 100.0 / TestsCompleted, 1),
    ProductivityScore = round(TestsCompleted / AvgTestDuration, 2)
| project TesterName, TestsCompleted, VulnerabilitiesFound, EfficiencyScore,
         CriticalFindings, ProductivityScore, UniqueTargets, TestTypesSpecialty
| order by EfficiencyScore desc
""",
    
    "4. LLM Cost and Token Usage Analysis": """
// AI/LLM Cost Analysis and Optimization
LLMInteractions
| join kind=inner SecurityTraces on $left.TraceId == $right.TraceId
| where TimeStamp >= ago(7d)
| summarize 
    TotalInteractions = count(),
    TotalCost = sum(Cost),
    TotalTokens = sum(TokensUsed),
    AvgTokensPerCall = avg(TokensUsed),
    AvgCostPerCall = avg(Cost),
    SuccessfulCalls = countif(Success == true),
    FailedCalls = countif(Success == false)
    by TestType, Model
| extend 
    SuccessRate = round(SuccessfulCalls * 100.0 / TotalInteractions, 1),
    CostPerToken = round(TotalCost / TotalTokens, 6)
| project TestType, Model, TotalInteractions, TotalCost, TotalTokens, 
         AvgTokensPerCall, AvgCostPerCall, CostPerToken, SuccessRate
| order by TotalCost desc
""",
    
    "5. Time-based Security Trends": """
// Security Testing Trends Over Time
SecurityTraces
| where TimeStamp >= ago(30d)
| summarize 
    TestsRun = count(),
    VulnerabilitiesFound = countif(Status in ("FAILED", "VULNERABLE")),
    CriticalFindings = countif(Severity == "CRITICAL"),
    HighFindings = countif(Severity == "HIGH"),
    AvgTestDuration = avg(Duration),
    UniqueTargets = dcount(Target)
    by bin(TimeStamp, 1d)
| extend 
    VulnerabilityRate = round(VulnerabilitiesFound * 100.0 / TestsRun, 1)
| project TimeStamp, TestsRun, VulnerabilitiesFound, VulnerabilityRate, 
         CriticalFindings, HighFindings, UniqueTargets
| order by TimeStamp desc
"""
}

# Function to execute KQL queries
def execute_kql_query(query_name, query):
    """Execute a KQL query and return results as DataFrame"""
    try:
        if adx_client:
            result = adx_client.execute(ADX_DATABASE_NAME, query)
            # Handle both V1 and V2 response formats
            if hasattr(result, 'to_dataframe'):
                # V1 format - direct to_dataframe method
                df = result.to_dataframe()
            elif hasattr(result, 'primary_results') and len(result.primary_results) > 0:
                # V2 format - access primary results table
                table = result.primary_results[0]
                if hasattr(table, 'to_dataframe'):
                    df = table.to_dataframe()
                else:
                    # Manual conversion from KustoResultTable
                    import pandas as pd
                    df = pd.DataFrame(table.raw_rows, columns=[col.column_name for col in table.columns])
            else:
                # Fallback for other response formats
                df = pd.DataFrame()
            return df
        else:
            print("ADX client not available")
            return pd.DataFrame()
    except Exception as e:
        print(f"Query execution error: {e}")
        return pd.DataFrame()

print("🔍 Executing Security Analytics Queries on ADX")
print("="*50)

query_results = {}

for query_name, query in kql_queries.items():
    print(f"\n🔍 {query_name}")
    print("-" * 40)
    print("KQL Query:")
    print("```kusto")
    print(query)
    print("```")
    
    if adx_client:
        try:
            df = execute_kql_query(query_name, query)
            if not df.empty:
                print("\nResults:")
                print(df.to_string(index=False, max_rows=10))
                query_results[query_name] = df
            else:
                print("No results returned (data may still be ingesting)")
        except Exception as e:
            print(f"Query execution error: {e}")
    else:
        print("\n❌ ADX not configured - query ready for execution in ADX Web UI")
    
    print("\n" + "="*50)

# Summary of query execution
if query_results:
    print(f"\n✅ Successfully executed {len(query_results)} queries!")
    print("\n📊 Query Results Summary:")
    for query_name, df in query_results.items():
        print(f"   {query_name}: {len(df)} rows")
else:
    print("\n❌ Queries are ready for execution in Azure Data Explorer Web UI")
    print("\n📋 To run these queries:")
    print("1. Open Azure Data Explorer Web UI")
    print("2. Connect to your cluster")
    print("3. Select the TracingDB database") 
    print("4. Copy and paste the KQL queries above")

## 9. Summary and Next Steps

### 🏆 What We Just Accomplished!

**✅ Simple 3-Step Process Completed:**
1. **Deployed Resources** → `./deploy-adx-complete.sh` created everything  
2. **Loaded Environment** → `source ../../.env` configured settings
3. **Ran Notebook** → Generated realistic security tests with AI analysis (20 by default; set `NUM_TESTS = 120` for the full dataset)

**📊 Generated Comprehensive Security Dataset:**
- ✅ **Realistic security test scenarios** across multiple test types (`NUM_TESTS` controls the count)
- ✅ **AI-powered vulnerability analysis** with risk scores and recommendations  
- ✅ **Complete cost tracking** for LLM usage optimization
- ✅ **Exported to ADX** for enterprise-scale analytics
- ✅ **5 ready-to-use KQL queries** for immediate insights

### 🎯 Immediate Benefits

1. **📈 Data-Driven Security** - Quantify your security posture with real metrics
2. **💰 Cost Optimization** - Track and optimize AI usage costs  
3. **🎯 Risk Prioritization** - Identify which systems need attention first
4. **👥 Team Performance** - Measure security team effectiveness
5. **📋 Executive Reporting** - Generate professional security reports

### 🚀 Next Steps (Choose Your Path)

#### **⚡ Quick Wins (5 minutes)**
Access your ADX cluster using the URL shown in the cell above (dynamically generated based on your deployment).

#### **📊 Business Intelligence (30 minutes)**  
- Create Power BI dashboards using ADX as data source
- Set up automated weekly security reports
- Configure cost monitoring alerts

#### **🏭 Production Integration (1-2 hours)**
- Integrate with real security tools (Nessus, Burp Suite, etc.)
- Set up automated vulnerability scanning workflows  
- Connect to your SIEM/SOAR platform

#### **🧠 Advanced Analytics (2-4 hours)**
- Implement ML-based anomaly detection
- Create predictive security models
- Build custom security metrics dashboards

### 💡 Pro Tips

**Security Best Practices:**
   - Set up Microsoft Entra ID (formerly Azure AD) authentication for ADX access
   - Use managed identities for service connections
   - Implement data retention policies for compliance

**📈 Performance Optimization:**
   - Use materialized views for frequent queries
   - Set up data partitioning for large datasets
   - Configure auto-scaling for cluster capacity

**💰 Cost Management:**
   - Review query resource usage with `.show queries`
   - Use query optimization techniques
   - Set up budget alerts for unexpected usage

### 🎯 Your Production-Ready Security Analytics Pipeline

**🏢 Enterprise Features Available:**
   - Automated pen-testing workflows
   - Real-time security dashboards  
   - Cost-optimized AI analysis
   - Compliance reporting templates
   - Executive-ready insights

**📱 Access Your Data:**
   - ADX Web UI: Use the URL generated above
   - Azure AI Foundry: `https://ai.azure.com`

**💰 Monitor Costs:**
   - Use the LLM cost analysis query regularly
   - Set budget alerts in Azure Cost Management
   - Optimize prompts to reduce token usage

**🔄 Keep Data Fresh:**
   - Re-run this notebook weekly for trend analysis
   - Integrate with CI/CD for automated security testing
   - Schedule regular security assessment cycles


**From Zero to Production Security Analytics in 3 Simple Steps** ✨

In [None]:
# 🏆 SECURITY ANALYTICS READY!
print("="*60)
print("🎉 SECURITY PEN-TESTING TRACING COMPLETED!")
print("="*60)

print(f"\n📊 What You Generated:")
print(f"   Security Tests: {len(security_tests)}")
print(f"   AI Analyses: {len(llm_interactions)}")
if llm_interactions:
    total_cost = sum(i.cost for i in llm_interactions)
    total_tokens = sum(i.tokens_used for i in llm_interactions)
    print(f"   Total AI Cost: ${total_cost:.4f}")
    print(f"   Total Tokens: {total_tokens:,}")

print(f"\n🔗 Your Resources:")
if ADX_CLUSTER_URI:
    # Construct the ADX Web UI URL from the cluster URI
    adx_web_url = f"https://dataexplorer.azure.com/clusters/{ADX_CLUSTER_URI.replace('https://', '')}/databases/{ADX_DATABASE_NAME}"
    print(f"   ADX Web UI: {adx_web_url}")
    print(f"   Database: {ADX_DATABASE_NAME}")
print(f"   Azure AI Foundry: https://ai.azure.com")

# Show local files if any were created
import os
json_files = [f for f in os.listdir('.') if f.endswith('.json') and ('security_tests' in f or 'llm_interactions' in f)]
if json_files:
    print(f"\n💾 Local Backup Files:")
    for file in sorted(json_files)[-2:]:  # Show last 2 files
        if os.path.exists(file):
            print(f"   {file}")

print(f"\n🚀 Next Steps:")
print(f"  1. 📊 Open ADX Web UI and explore your data")
print(f"  2. 📈 Use the KQL queries from section 8")
print(f"  3. 📋 Create dashboards and reports")
print(f"  4. 🔄 Re-run this notebook weekly for trends")

print(f"✨ From zero to production in just 3 simple steps!")