# Threat Hunting Query Generation - Practical Examples

This notebook demonstrates the threat hunting query generator with real-world examples.

## Setup

Make sure the backend API is running on `http://localhost:8000` and that Ollama is running with the `llama3.2` model.

In [None]:
import requests
import json
import pandas as pd
from datetime import datetime

# Configuration
API_BASE_URL = "http://localhost:8000/api"

def generate_query(description, query_type='spl', include_mitre=True, timeout=120):
    """Request a threat hunting query from the backend API.

    Sends a POST to ``{API_BASE_URL}/generate-query`` and returns the decoded
    JSON body when the server answers with HTTP 200. Every failure mode
    (network error, timeout, non-200 status, undecodable body) is reported as
    a dict holding a single ``'error'`` key rather than raised, so callers
    such as ``print_result`` can handle all outcomes the same way.

    Args:
        description: Text describing the activity to hunt for; sent as
            ``description``.
        query_type: Target query language, sent as ``query_type``. The
            notebook uses ``'spl'``, ``'kql'`` and ``'dsl'``.
        include_mitre: Flag sent as ``include_mitre`` (presumably asks the
            backend to attach a MITRE ATT&CK mapping -- confirm against the
            API).
        timeout: Seconds to wait for the server, passed to ``requests.post``.
            The default is generous because the backend calls an LLM; pass
            ``None`` to wait indefinitely.

    Returns:
        dict: The parsed JSON response, or ``{'error': <message>}``.
    """
    payload = {
        'description': description,
        'query_type': query_type,
        'include_mitre': include_mitre,
    }

    try:
        response = requests.post(
            f"{API_BASE_URL}/generate-query",
            json=payload,
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        # Backend down, DNS failure, timeout, ...: report it like an HTTP error
        # instead of letting a traceback interrupt the notebook.
        return {'error': f"Request failed: {exc}"}

    if response.status_code != 200:
        return {'error': f"HTTP {response.status_code}: {response.text}"}

    try:
        return response.json()
    except ValueError as exc:
        # A 200 response whose body is not valid JSON.
        return {'error': f"Invalid JSON in response: {exc}"}

def print_result(result):
    """Pretty-print a query generation result to stdout.

    Args:
        result: Dict returned by ``generate_query``. If it contains an
            ``'error'`` key only the error is printed. Otherwise the
            following optional keys are rendered when present and truthy:
            ``'query_type'``, ``'query'``, ``'explanation'``,
            ``'mitre_technique'`` (dict with ``'id'``, ``'name'``,
            ``'description'``) and ``'validation_result'`` (dict with
            ``'valid'``, ``'warnings'``, ``'optimization_suggestions'``).

    Returns:
        None. All output goes to stdout.
    """
    if 'error' in result:
        print(f"❌ Error: {result['error']}")
        return

    rule = "=" * 80
    thin_rule = "-" * 80

    print("\n" + rule)
    print("✅ QUERY GENERATED SUCCESSFULLY")
    print(rule)

    # `or` also covers an explicit null value; .get()'s default only applies
    # when the key is missing, and None has no .upper().
    query_type = result.get('query_type') or 'N/A'
    print(f"\n📝 Query Type: {query_type.upper()}")
    print("\n🔍 Generated Query:")
    print(thin_rule)
    print(result.get('query', 'N/A'))
    print(thin_rule)

    if result.get('explanation'):
        print("\n💡 Explanation:")
        print(result['explanation'])

    if result.get('mitre_technique'):
        technique = result['mitre_technique']
        print("\n🎯 MITRE ATT&CK Technique:")
        print(f"  ID: {technique.get('id', 'N/A')}")
        print(f"  Name: {technique.get('name', 'N/A')}")
        print(f"  Description: {technique.get('description', 'N/A')}")

    if result.get('validation_result'):
        val = result['validation_result']
        status = "✅ Valid" if val.get('valid') else "❌ Invalid"
        print(f"\n{status}")

        if val.get('warnings'):
            print("\n⚠️  Warnings:")
            for warning in val['warnings']:
                print(f"  - {warning}")

        if val.get('optimization_suggestions'):
            print("\n💡 Optimization Suggestions:")
            for suggestion in val['optimization_suggestions']:
                print(f"  - {suggestion}")

    print("\n" + rule + "\n")

## Example 1: Brute Force Detection (Splunk SPL)

In [None]:
# Scenario: brute-force logins originating outside the network (Splunk SPL).
description = (
    "Find all failed login attempts from external IP addresses "
    "in the last 24 hours, focusing on brute force attacks"
)

# Generate the query, then render the API response.
result = generate_query(description=description, query_type="spl")
print_result(result)

## Example 2: Ransomware Detection (KQL)

In [None]:
# Scenario: ransomware indicators (KQL).
description = (
    "Detect ransomware activity by looking for mass file encryption "
    "events and shadow copy deletion"
)

# Generate the query, then render the API response.
result = generate_query(description=description, query_type="kql")
print_result(result)

## Example 3: Data Exfiltration (Elasticsearch DSL)

In [None]:
# Scenario: large outbound transfers to cloud storage (Elasticsearch DSL).
description = (
    "Identify potential data exfiltration by detecting large outbound "
    "data transfers to external cloud storage services"
)

# Generate the query, then render the API response.
result = generate_query(description=description, query_type="dsl")
print_result(result)

## Example 4: Lateral Movement (Splunk SPL)

In [None]:
# Scenario: lateral movement over RDP/SMB (Splunk SPL).
description = (
    "Hunt for lateral movement using RDP and SMB connections "
    "between internal hosts"
)

# Generate the query, then render the API response.
result = generate_query(description=description, query_type="spl")
print_result(result)

## Example 5: PowerShell Exploitation (KQL)

In [None]:
# Scenario: encoded or base64 PowerShell commands (KQL).
description = (
    "Find suspicious PowerShell commands "
    "with encoded payloads or base64 encoding"
)

# Generate the query, then render the API response.
result = generate_query(description=description, query_type="kql")
print_result(result)

## Example 6: Credential Dumping (Splunk SPL)

In [None]:
# Scenario: credential dumping via LSASS access (Splunk SPL).
description = (
    "Detect credential dumping attempts using tools like Mimikatz "
    "through LSASS access patterns"
)

# Generate the query, then render the API response.
result = generate_query(description=description, query_type="spl")
print_result(result)

## Batch Testing Multiple Scenarios

In [None]:
# Scenarios for the batch run. Each entry carries a display name, the
# description sent to the API, and the query language to generate.
test_scenarios = [
    dict(
        name='Port Scanning',
        description='Identify port scanning activity from internal hosts',
        query_type='spl',
    ),
    dict(
        name='DNS Tunneling',
        description='Detect DNS tunneling for command and control communications',
        query_type='kql',
    ),
    dict(
        name='Admin Account Abuse',
        description='Find unauthorized use of administrative accounts',
        query_type='dsl',
    ),
    dict(
        name='Web Shell Detection',
        description='Hunt for web shells in web server directories',
        query_type='spl',
    ),
]

# Run every scenario through the API, recording one summary row per scenario
# and printing the full result as it arrives.
results = []
for scenario in test_scenarios:
    print(f"\n🔬 Testing: {scenario['name']}")
    result = generate_query(scenario['description'], scenario['query_type'])

    # print_result treats a falsy validation_result as absent; do the same
    # here. `.get(key, {})` only covers a missing key, so a null value would
    # otherwise fail on the nested .get().
    validation = result.get('validation_result') or {}

    results.append({
        'scenario': scenario['name'],
        'query_type': scenario['query_type'],
        # Error responses carry 'error' instead of 'query'.
        'success': 'query' in result,
        'valid': bool(validation.get('valid', False)),
        'mitre_mapped': bool(result.get('mitre_technique')),
    })

    print_result(result)

# Summarise the batch run: one row per scenario, then the share of scenarios
# that produced a query, passed validation, and received a MITRE mapping.
df = pd.DataFrame(results)
print("\n📊 BATCH TEST SUMMARY")
print("="*80)
print(df)
print("\n")
if df.empty:
    # No rows means no columns to index and nothing to average.
    print("No results to summarize.")
else:
    for label, column in (
        ("Success Rate", 'success'),
        ("Validation Rate", 'valid'),
        ("MITRE Mapping Rate", 'mitre_mapped'),
    ):
        # The mean of a boolean column is the fraction of True values.
        print(f"{label}: {df[column].astype(bool).mean() * 100:.1f}%")

## Testing All Query Types

In [None]:
# Generate a query for the same threat description in each supported language.
description = "Detect suspicious processes spawned by Microsoft Office applications"

print("\n🔬 Testing same threat across all query types...\n")

for qtype in ['spl', 'kql', 'dsl']:
    # Banner naming the query language under test.
    print(f"\n{'='*80}")
    print(f"Testing {qtype.upper()}")
    print(f"{'='*80}")
    result = generate_query(description, query_type=qtype)
    print_result(result)

## Check MITRE ATT&CK Techniques

In [None]:
# Get all MITRE techniques from the API and show them as a table, followed by
# a per-tactic count when the payload includes a 'tactic' field.
try:
    response = requests.get(f"{API_BASE_URL}/mitre-techniques", timeout=30)
except requests.exceptions.RequestException as exc:
    # Backend unreachable or timed out: report it instead of raising.
    print(f"Error: {exc}")
else:
    if response.status_code == 200:
        data = response.json()
        techniques = data.get('techniques', [])

        print(f"\n📚 Available MITRE ATT&CK Techniques: {len(techniques)}\n")

        # Create DataFrame for better visualization
        df_techniques = pd.DataFrame(techniques)

        # Show only the columns the payload actually has; an empty or partial
        # payload would otherwise raise KeyError on the column selection.
        display_columns = [
            column for column in ('id', 'name', 'tactic')
            if column in df_techniques.columns
        ]
        if display_columns:
            print(df_techniques[display_columns].head(20))
        else:
            print("No technique details to display.")

        # Group by tactic
        if 'tactic' in df_techniques.columns:
            tactic_counts = df_techniques['tactic'].value_counts()
            print("\n\n📊 Techniques by Tactic:\n")
            print(tactic_counts)
    else:
        print(f"Error: {response.status_code}")

## Conclusion

This notebook demonstrates:
- Query generation for multiple SIEM platforms (Splunk SPL, Microsoft Sentinel KQL — formerly Azure Sentinel — and Elasticsearch DSL)
- MITRE ATT&CK technique mapping
- Query validation and optimization
- Real-world threat hunting scenarios

## Next Steps

1. Test generated queries against actual SIEM platforms
2. Collect analyst feedback on query accuracy
3. Fine-tune the LLM prompts based on results
4. Build a library of validated queries for common threats