# STIG to Ansible Prompt Engineering Workflow

This notebook allows manual iteration through the 7-step workflow for converting STIG findings to Ansible playbooks.

You can:
- Test each prompt individually
- Edit prompts in the `prompts/` directory and reload them
- See the intermediate results at each step
- Manually adjust the workflow state between steps

In [None]:
# Import required libraries
import os
import sys
import json
import yaml
import re
from pathlib import Path
from string import Template
from typing import Dict, Any, List, Optional
from datetime import datetime
import asyncio
from IPython.display import display, Markdown, JSON

# Make the project's src/ directory importable (relative path, inserted first on sys.path)
src_dir = '../src'
sys.path.insert(0, src_dir)

# Project-local LLM wrapper; importable once src/ is on sys.path
from llm_interface import LLMInterface

# Create the LLM client that the helper functions and workflow cells below use
llm = LLMInterface()
print(f"LLM initialized: {llm.model_name}")

In [None]:
# Helper functions

def load_prompt(prompt_name: str) -> Dict[str, Any]:
    """Parse ``../prompts/<prompt_name>.yaml`` and return the loaded content.

    Args:
        prompt_name: File stem of the prompt (without the ``.yaml`` suffix).

    Returns:
        Whatever ``yaml.safe_load`` produces for the file.
    """
    yaml_file = Path(f"../prompts/{prompt_name}.yaml")
    with yaml_file.open('r') as handle:
        parsed = yaml.safe_load(handle)
    return parsed

def format_prompt(prompt_data: Dict[str, Any], **kwargs) -> str:
    """Fill the ``$name`` placeholders of a prompt template.

    Args:
        prompt_data: Loaded prompt definition; its ``'template'`` entry is the
            ``string.Template`` source text.
        **kwargs: Values for the placeholders.

    Returns:
        The substituted text. ``safe_substitute`` is used, so placeholders
        without a matching keyword are left in the output unchanged.
    """
    return Template(prompt_data['template']).safe_substitute(**kwargs)

def extract_json_from_response(response: str) -> Optional[Dict[str, Any]]:
    """Extract the first JSON object embedded in an LLM response.

    The text is scanned for ``{`` characters and a real JSON parse is attempted
    at each one, so objects of any nesting depth and objects whose string
    values contain braces (for example Jinja ``{{ var }}`` expressions inside a
    playbook) are handled. Surrounding prose and Markdown code fences are
    skipped naturally because parsing only starts at a ``{``.

    Args:
        response: Raw text returned by the model.

    Returns:
        The first JSON object that parses successfully, or ``None`` when the
        response is empty or contains no valid JSON object.
    """
    if not response:
        return None

    decoder = json.JSONDecoder()
    start = response.find('{')
    while start != -1:
        try:
            # raw_decode parses one JSON value beginning at `start` and ignores
            # whatever text follows it.
            candidate, _end = decoder.raw_decode(response, start)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(candidate, dict):
                return candidate
        # Not valid JSON from this brace; try the next opening brace.
        start = response.find('{', start + 1)

    return None

async def llm_call_with_json(prompt: str, expected_keys: List[str], max_tokens: int = 100) -> Dict[str, Any]:
    """Send a prompt to the LLM and return the JSON object found in its reply.

    The raw reply and the parsed result are printed for inspection.

    Args:
        prompt: Fully formatted prompt text.
        expected_keys: Keys the caller expects; used only to build the fallback.
        max_tokens: Token limit forwarded to the LLM call.

    Returns:
        The extracted JSON object, or a dict mapping every expected key to the
        string ``"unknown"`` when nothing (or an empty object) was extracted.
    """
    raw_reply = await llm.generate_ansible_task_async(prompt=prompt, max_tokens=max_tokens)
    print(f"\nRaw LLM Response:\n{raw_reply}\n")

    parsed = extract_json_from_response(raw_reply)
    if not parsed:
        print("Failed to extract JSON from response")
        return dict.fromkeys(expected_keys, "unknown")

    print(f"Extracted JSON: {json.dumps(parsed, indent=2)}")
    return parsed

# Display helper
def display_prompt(prompt_text: str):
    """Render the prompt as Markdown: a "Prompt:" heading followed by the text in a fenced block."""
    rendered = f"### Prompt:\n```\n{prompt_text}\n```"
    display(Markdown(rendered))


In [None]:
# Load a test finding from the findings directory
findings_file = "../findings/node2.example.com-STIG-20250710162433_findings.json"

with open(findings_file, 'r') as f:
    findings_data = json.load(f)

# The findings live under the top-level 'findings' key of the JSON document
all_findings = findings_data.get('findings', [])

print(f"Loaded {len(all_findings)} findings from the file")

# Prefer the telnet-removal rule we've been testing with; otherwise fall back
# to the first finding (or an empty dict when the file has none)
preferred_rule_id = 'xccdf_org.ssgproject.content_rule_package_telnet_removed'
test_finding = next(
    (entry for entry in all_findings if entry.get('rule_id') == preferred_rule_id),
    None,
)
if not test_finding:
    test_finding = all_findings[0] if all_findings else {}

if not test_finding:
    print("No findings found in the file")
else:
    print(f"Selected test finding: {test_finding.get('rule_id', 'Unknown')}")
    print(f"Title: {test_finding.get('title', 'No title')}")
    print(f"Severity: {test_finding.get('severity', 'Unknown')}")
    # Long text fields are previewed by their first 200 characters
    description_preview = test_finding.get('description', 'No description')[:200]
    print(f"\nDescription: {description_preview}...")
    fix_preview = test_finding.get('fix_text', 'No fix text')[:200]
    print(f"\nFix Text: {fix_preview}...")

In [None]:
# Initialize workflow state
# Every step result starts as None and is filled in by the cell for that step.
selected_rule_id = test_finding.get('rule_id', 'unknown') if test_finding else 'unknown'

state = dict(
    finding=test_finding,
    action_type=None,
    target=None,
    parameters=None,
    task_name=None,
    final_playbook=None,
    validation_result=None,
    annotated_playbook=None,
    errors=[],
    metadata=dict(
        workflow_start=datetime.now().isoformat(),
        rule_id=selected_rule_id,
    ),
)

print("Workflow state initialized")
if not test_finding:
    print("No finding selected - workflow may not work properly")
else:
    print(f"Working with finding: {state['finding'].get('rule_id', 'unknown')}")

## Step 1: Extract Action Type

This step extracts the primary action type from the STIG finding (e.g., remove_package, install_package, or configure_file).

In [None]:
# Step 1: Extract Action Type
prompt_data = load_prompt('extract_action')
print(f"Prompt: {prompt_data['name']}")
print(f"Description: {prompt_data['description']}")

# Format the prompt with finding data
prompt = format_prompt(
    prompt_data,
    title=state['finding'].get('title', ''),
    description=state['finding'].get('description', ''),
    fix_text=state['finding'].get('fix_text', '')
)

display_prompt(prompt)

# Make the LLM call
result = await llm_call_with_json(prompt, ['action_type'], max_tokens=50)

# Update state
state['action_type'] = result.get('action_type', 'unknown')
state['metadata']['step1_complete'] = datetime.now().isoformat()

print(f"\n✅ Extracted action type: {state['action_type']}")

## Step 2: Extract Target

This step extracts the target of the action (e.g., package name, file path, service name)

In [None]:
# Step 2: Extract Target
prompt_data = load_prompt('extract_target')
print(f"Prompt: {prompt_data['name']}")
print(f"Description: {prompt_data['description']}")

# Format the prompt with finding data and previous step result
prompt = format_prompt(
    prompt_data,
    title=state['finding'].get('title', ''),
    fix_text=state['finding'].get('fix_text', ''),
    action_type=state.get('action_type', 'other')
)

display_prompt(prompt)

# Make the LLM call
result = await llm_call_with_json(prompt, ['target'], max_tokens=50)

# Update state
state['target'] = result.get('target', 'unknown')
state['metadata']['step2_complete'] = datetime.now().isoformat()

print(f"\n✅ Extracted target: {state['target']}")

## Step 3: Extract Parameters

This step extracts any parameters needed for the action (e.g., state: absent, mode: 0644)

In [None]:
# Step 3: Extract Parameters
prompt_data = load_prompt('extract_parameters')
print(f"Prompt: {prompt_data['name']}")
print(f"Description: {prompt_data['description']}")

# Format the prompt with finding data and previous step results
prompt = format_prompt(
    prompt_data,
    title=state['finding'].get('title', ''),
    fix_text=state['finding'].get('fix_text', ''),
    action_type=state.get('action_type', 'other'),
    target=state.get('target', 'unknown')
)

display_prompt(prompt)

# Make the LLM call
result = await llm_call_with_json(prompt, ['parameter'], max_tokens=50)

# Update state
state['parameters'] = result.get('parameter', 'default')
state['metadata']['step3_complete'] = datetime.now().isoformat()

print(f"\n✅ Extracted parameters: {state['parameters']}")

## Step 4: Generate Task Name

This step generates a descriptive task name for the Ansible task

In [None]:
# Step 4: Generate Task Name
prompt_data = load_prompt('generate_task_name')
print(f"Prompt: {prompt_data['name']}")
print(f"Description: {prompt_data['description']}")

# Format the prompt with all extracted data
prompt = format_prompt(
    prompt_data,
    rule_id=state['finding'].get('rule_id', ''),
    action_type=state.get('action_type', 'other'),
    target=state.get('target', 'unknown'),
    severity=state['finding'].get('severity', 'medium')
)

display_prompt(prompt)

# Make the LLM call
result = await llm_call_with_json(prompt, ['task_name'], max_tokens=100)

# Update state
state['task_name'] = result.get('task_name', f"STIG Task: {state.get('target', 'unknown')}")
state['metadata']['step4_complete'] = datetime.now().isoformat()

print(f"\n✅ Generated task name: {state['task_name']}")

## Review Extracted Components

Let's review what we've extracted so far before assembling the playbook

In [None]:
# Review extracted components
print("🔍 Extracted Components Summary:\n")

# (label, state key) pairs, printed as a numbered list in this order
component_rows = (
    ("Action Type", 'action_type'),
    ("Target", 'target'),
    ("Parameters", 'parameters'),
    ("Task Name", 'task_name'),
)
for number, (label, state_key) in enumerate(component_rows, start=1):
    print(f"{number}. {label}: {state[state_key]}")

print(f"\nRule ID: {state['finding'].get('rule_id', 'unknown')}")
print(f"Severity: {state['finding'].get('severity', 'unknown')}")

# You can manually adjust these values here if needed
# state['action_type'] = 'remove_package'
# state['target'] = 'telnet'
# state['parameters'] = 'absent'

## Step 5: Assemble Playbook

This step combines all extracted components into a complete Ansible playbook

In [None]:
# Step 5: Assemble Playbook
prompt_data = load_prompt('assemble_playbook')
print(f"Prompt: {prompt_data['name']}")
print(f"Description: {prompt_data['description']}")

# Load the template for reference (if available)
template_content = ""
try:
    with open('../examples/ansible_playbook_template.yaml', 'r') as f:
        template_content = f.read()
    print("✅ Loaded Ansible template for reference")
except Exception as e:
    print(f"⚠️ Could not load template: {e}")

# Format the prompt with all components
prompt = format_prompt(
    prompt_data,
    task_name=state.get('task_name', 'STIG Task'),
    action_type=state.get('action_type', 'other'),
    target=state.get('target', 'unknown'),
    parameters=state.get('parameters', 'default'),
    rule_id=state['finding'].get('rule_id', ''),
    severity=state['finding'].get('severity', 'medium'),
    template_content=template_content
)

# Show a truncated version of the prompt (since template is long)
display_prompt(prompt[:1000] + "\n...\n[Template content truncated]...")

# Make the LLM call
result = await llm_call_with_json(prompt, ['playbook'], max_tokens=500)

# Extract and clean the playbook
playbook = result.get('playbook', '')

# Remove markdown formatting if present
if playbook.startswith('```yaml'):
    playbook = playbook[7:]
if playbook.endswith('```'):
    playbook = playbook[:-3]

# Update state
state['final_playbook'] = playbook
state['metadata']['step5_complete'] = datetime.now().isoformat()

print(f"\n✅ Assembled playbook:")
print("\n" + "="*60)
print(playbook)
print("="*60)

## Step 6: Validate and Fix Playbook

This step validates the generated playbook and fixes any issues

In [None]:
# Step 6: Validate and Fix Playbook
prompt_data = load_prompt('validate_and_fix_playbook')
print(f"Prompt: {prompt_data['name']}")
print(f"Description: {prompt_data['description']}")

# Format the prompt with playbook and template
prompt = format_prompt(
    prompt_data,
    playbook_content=state['final_playbook'],
    template_content=template_content
)

# Show a truncated version
display_prompt(prompt[:1000] + "\n...\n[Content truncated]...")

# Make the LLM call
result = await llm_call_with_json(
    prompt, 
    ['is_valid', 'issues_found', 'fixes_applied', 'fixed_playbook', 'suggestions'], 
    max_tokens=800
)

# Update state
state['validation_result'] = result

# If fixes were applied, update the playbook
if result.get('fixes_applied') and result.get('fixed_playbook'):
    state['final_playbook'] = result['fixed_playbook']
    print(f"\n✅ Applied {len(result['fixes_applied'])} fixes")
else:
    print("\n✅ No fixes needed")

state['metadata']['step6_complete'] = datetime.now().isoformat()

# Display validation results
print(f"\nValidation Results:")
print(f"- Valid: {result.get('is_valid', False)}")
print(f"- Issues Found: {len(result.get('issues_found', []))}")
if result.get('issues_found'):
    for issue in result['issues_found']:
        print(f"  • {issue}")
print(f"- Fixes Applied: {len(result.get('fixes_applied', []))}")
if result.get('fixes_applied'):
    for fix in result['fixes_applied']:
        print(f"  • {fix}")
print(f"- Suggestions: {len(result.get('suggestions', []))}")
if result.get('suggestions'):
    for suggestion in result['suggestions']:
        print(f"  • {suggestion}")

print("\n" + "="*60)
print("Final Validated Playbook:")
print(state['final_playbook'])
print("="*60)

## Step 7: Annotate Playbook

This step adds documentation and comments to the validated playbook

In [None]:
# Step 7: Annotate Playbook
prompt_data = load_prompt('annotate_playbook')
print(f"Prompt: {prompt_data['name']}")
print(f"Description: {prompt_data['description']}")

# Format the prompt with playbook and finding details
prompt = format_prompt(
    prompt_data,
    playbook_content=state['final_playbook'],
    rule_id=state['finding'].get('rule_id', ''),
    title=state['finding'].get('title', ''),
    severity=state['finding'].get('severity', ''),
    description=state['finding'].get('description', ''),
    check_text=state['finding'].get('check_text', ''),
    fix_text=state['finding'].get('fix_text', ''),
    references=', '.join(state['finding'].get('references', []))
)

display_prompt(prompt[:800] + "\n...\n[Content truncated]...")

# Make the LLM call
result = await llm_call_with_json(prompt, ['annotated_playbook'], max_tokens=600)

# Update state
annotated_playbook = result.get('annotated_playbook', '')
if annotated_playbook:
    state['annotated_playbook'] = annotated_playbook
    print("\n✅ Successfully annotated playbook")
else:
    state['annotated_playbook'] = state['final_playbook']
    print("\n⚠️ No annotation returned, keeping original")

state['metadata']['step7_complete'] = datetime.now().isoformat()

print("\n" + "="*60)
print("Final Annotated Playbook:")
print(state['annotated_playbook'])
print("="*60)

## Workflow Summary

Review the complete workflow results

In [None]:
# Workflow Summary
print("🎯 Workflow Summary\n" + "="*60)
print(f"\nFinding: {state['finding'].get('rule_id', 'unknown')}")
# .get() keeps the summary usable when the finding is empty or has no title
print(f"Title: {state['finding'].get('title', 'No title')}")
print(f"Severity: {state['finding'].get('severity', 'unknown')}")

print("\n📊 Extracted Components:")
print(f"  1. Action Type: {state['action_type']}")
print(f"  2. Target: {state['target']}")
print(f"  3. Parameters: {state['parameters']}")
print(f"  4. Task Name: {state['task_name']}")

print("\n✅ Workflow Steps Completed:")
for i in range(1, 8):
    key = f'step{i}_complete'
    if key in state['metadata']:
        print(f"  Step {i}: ✓ ({state['metadata'][key]})")
    else:
        print(f"  Step {i}: ✗")

print("\n📝 Final Output:")
# Both keys always exist in state (initialised to None), so a .get() default
# would never apply; chain with `or` to reach the placeholder text.
final_playbook = (
    state.get('annotated_playbook')
    or state.get('final_playbook')
    or 'No playbook generated'
)
print("\n" + "="*60)
print(final_playbook)
print("="*60)

# Save to file if desired
save_path = f"../playbooks/manual_test_{state['finding'].get('rule_id', 'unknown')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.yml"
print(f"\n💾 To save this playbook, run:")
print(f"with open('{save_path}', 'w') as f:\n    f.write(final_playbook)")

## Testing Different Findings

You can test with different findings by changing the selection criteria

In [None]:
# List available findings to test
print("Available findings to test:\n")
for i, finding in enumerate(all_findings[:10]):  # Show first 10
    # Use .get() with defaults so a finding missing 'severity' or 'title'
    # does not abort the listing with a KeyError.
    severity = finding.get('severity', 'unknown')
    title = finding.get('title') or 'No title'
    print(f"{i}: {finding.get('rule_id', 'unknown')} ({severity})")
    print(f"   {title[:60]}...\n")

# To test a different finding, update test_finding and re-run from cell 4
# test_finding = all_findings[3]  # Change index to test different finding

## Prompt Engineering Notes

Use this cell to document what worked and what didn't

In [None]:
# Prompt Engineering Notes
# 
# What worked:
# - JSON schema in prompts helps enforce structured output
# - Short, focused prompts work better with small models
# - Breaking down complex tasks into simple extractions
# 
# What didn't work:
# - Long, complex prompts with multiple instructions
# - Asking for too much output at once
# 
# Ideas to try:
# - 
# - 
# 

# You can edit prompts in the ../prompts/ directory and re-run cells to test changes