# AI Agent Command Injection - Hands-On Lab

**Part of HackLearn Pro - Module #10**

This interactive notebook demonstrates AI agent command injection vulnerabilities and defense strategies.

## Learning Objectives

1. Understand how AI agents can be exploited to execute unauthorized commands
2. Implement SQL injection attacks via AI agents
3. Demonstrate OS command injection vulnerabilities
4. Build secure defenses using parameterized queries, command allowlists, and shell-free subprocess calls
5. Implement multi-layer defense systems

## Disclaimer

**EDUCATIONAL USE ONLY**: The techniques demonstrated here are for authorized security testing and education. Unauthorized access to systems is illegal. Always obtain proper authorization before conducting security assessments.

## Setup

Install required packages for this lab:

In [None]:
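# Install dependencies
# The validation code in this lab uses the Pydantic v1 API (validator, Field(regex=...)),
# which Pydantic 2.x rejects, so pin the major version below 2.
!pip install -q "pydantic<2" typing-extensions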

In [None]:
import sqlite3
import subprocess
import shlex
import re
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, validator, Field
import json

## Exercise 1: SQL Injection via AI Agent (Vulnerable)

This exercise demonstrates how an AI agent using string interpolation can be exploited for SQL injection.

### Scenario
An AI chatbot helps users search a database of products. The agent constructs SQL queries based on user input without proper sanitization.

In [None]:
# Create a sample database
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()

# Create products table
cursor.execute('''
CREATE TABLE products (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    price REAL NOT NULL,
    category TEXT NOT NULL
)
''')

# Insert sample data
products = [
    (1, 'Laptop', 999.99, 'Electronics'),
    (2, 'Smartphone', 699.99, 'Electronics'),
    (3, 'Desk Chair', 249.99, 'Furniture'),
    (4, 'Monitor', 399.99, 'Electronics'),
    (5, 'Keyboard', 79.99, 'Accessories')
]

cursor.executemany('INSERT INTO products VALUES (?, ?, ?, ?)', products)
conn.commit()

print("Database created with sample products")
print("\nProducts table:")
cursor.execute('SELECT * FROM products')
for row in cursor.fetchall():
    print(row)

In [None]:
def vulnerable_ai_search(user_prompt: str) -> List[tuple]:
    """
    VULNERABLE: This AI agent uses string interpolation for SQL queries.
    An attacker can inject malicious SQL commands via the user_prompt.
    """
    # WARNING: Direct string interpolation - UNSAFE!
    query = f"SELECT * FROM products WHERE name LIKE '%{user_prompt}%' OR category LIKE '%{user_prompt}%';"
    
    print(f"[AI Agent] Executing query: {query}")
    cursor.execute(query)
    return cursor.fetchall()

# Normal usage
print("\n=== Normal Usage ===")
results = vulnerable_ai_search("Laptop")
print(f"Found {len(results)} products:")
for row in results:
    print(row)

In [None]:
# SQL Injection Attack
print("\n=== SQL Injection Attack ===")
malicious_prompt = "' OR '1'='1"
print(f"Attacker input: {malicious_prompt}")

results = vulnerable_ai_search(malicious_prompt)
print(f"\nAttack successful! Retrieved {len(results)} products (all records):")
for row in results:
    print(row)

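print("\n[ANALYSIS] The attacker bypassed the search filter by injecting SQL logic.")
print("The query became: SELECT * FROM products WHERE name LIKE '%' OR '1'='1%' OR category LIKE '%' OR '1'='1%';")
print("The injected quote closes the pattern early, leaving name LIKE '%', which matches every row.")
print("Note that '1'='1%' is false here; the leftover '%' spoils the tautology, but the bare wildcard already returns all records.")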

## Exercise 2: SQL Injection Defense (Secure)

Now we'll implement the secure version using parameterized queries.

In [None]:
def secure_ai_search(user_prompt: str) -> List[tuple]:
    """
    SECURE: This AI agent uses parameterized queries to prevent SQL injection.
    User input is treated as data, not executable SQL code.
    """
    # Use parameterized query with ? placeholders
    query = "SELECT * FROM products WHERE name LIKE ? OR category LIKE ?;"
    
    # Prepare the search pattern
    search_pattern = f"%{user_prompt}%"
    
    print("[Secure AI Agent] Executing parameterized query")
    print(f"Query template: {query}")
    print(f"Parameters: ('{search_pattern}', '{search_pattern}')")
    
    cursor.execute(query, (search_pattern, search_pattern))
    return cursor.fetchall()

# Test with normal input
print("\n=== Normal Usage (Secure) ===")
results = secure_ai_search("Laptop")
print(f"Found {len(results)} products:")
for row in results:
    print(row)

In [None]:
# Attempt SQL Injection (will fail)
print("\n=== SQL Injection Attempt (Blocked) ===")
malicious_prompt = "' OR '1'='1"
print(f"Attacker input: {malicious_prompt}")

results = secure_ai_search(malicious_prompt)
print(f"\nAttack blocked! Retrieved {len(results)} products:")
for row in results:
    print(row)

print("\n[ANALYSIS] The injection attempt was treated as literal text.")
print("The database searched for products containing the string \"' OR '1'='1\".")
print("No SQL injection occurred because user input was parameterized.")
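
Parameterization stops SQL from being injected, but `%` and `_` inside the bound value still act as LIKE wildcards, so a search for `%` alone would match every row. The following is a minimal sketch of one way to match user text literally, assuming a hypothetical helper `escape_like` and a throwaway `items` table that are not part of the lab:

```python
import sqlite3

def escape_like(text: str, escape_char: str = "\\") -> str:
    """Escape LIKE wildcards so user text is matched literally (hypothetical helper)."""
    return (
        text.replace(escape_char, escape_char * 2)
        .replace("%", escape_char + "%")
        .replace("_", escape_char + "_")
    )

def literal_search(conn: sqlite3.Connection, user_text: str) -> list:
    """Parameterized substring search that treats % and _ as ordinary characters."""
    pattern = f"%{escape_like(user_text)}%"
    # The ESCAPE clause tells SQLite which character marks a literal wildcard.
    sql = "SELECT name FROM items WHERE name LIKE ? ESCAPE '\\'"
    return conn.execute(sql, (pattern,)).fetchall()

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE items (name TEXT)")
demo.executemany("INSERT INTO items VALUES (?)", [("Laptop",), ("100% Cotton Shirt",)])

print(literal_search(demo, "%"))       # only the row containing a literal percent sign
print(literal_search(demo, "Laptop"))
```

Whether wildcard escaping is wanted depends on the product: some search boxes intentionally expose wildcards to users.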

## Exercise 3: OS Command Injection (Vulnerable)

This exercise demonstrates how an AI agent executing shell commands can be exploited.

In [None]:
def vulnerable_system_command(user_command: str) -> str:
    """
    VULNERABLE: This AI agent executes system commands based on user input.
    Attackers can inject additional commands using shell metacharacters.
    """
    # WARNING: Using shell=True with user input - UNSAFE!
    print(f"[AI Agent] Executing: {user_command}")
    
    try:
        result = subprocess.run(
            user_command,
            shell=True,  # DANGEROUS: Interprets shell metacharacters
            capture_output=True,
            text=True,
            timeout=5
        )
        return result.stdout + result.stderr
    except Exception as e:
        return f"Error: {e}"

# Normal usage
print("\n=== Normal Usage ===")
output = vulnerable_system_command("echo Hello World")
print(f"Output: {output}")

In [None]:
# Command Injection Attack
print("\n=== Command Injection Attack ===")
malicious_command = "echo Safe Command && echo INJECTED_COMMAND"
print(f"Attacker input: {malicious_command}")

output = vulnerable_system_command(malicious_command)
print(f"Output:\n{output}")

print("[ANALYSIS] The attacker chained commands using '&&'.")
print("Both the intended command and the injected command were executed.")
print("In a real attack, 'INJECTED_COMMAND' could be malicious (data exfiltration, backdoors, etc.).")
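
If a shell truly cannot be avoided, quoting every untrusted value with `shlex.quote` makes a POSIX shell treat it as a single literal word. The sketch below assumes a hypothetical helper `build_shell_line`; it only builds and inspects the command line, and it is a fallback rather than a replacement for avoiding `shell=True`:

```python
import shlex

def build_shell_line(program: str, untrusted_args: list) -> str:
    """Build a POSIX shell command line with every untrusted argument quoted (hypothetical helper)."""
    return " ".join([program] + [shlex.quote(arg) for arg in untrusted_args])

payload = "Safe Command && echo INJECTED_COMMAND"
line = build_shell_line("echo", [payload])
print(line)

# Splitting the line the way a POSIX shell would shows that the payload
# survives as one argument instead of becoming a chained second command.
print(shlex.split(line))
```

`shlex.quote` targets POSIX shells only, so this approach should not be assumed to protect `cmd.exe` or PowerShell.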

## Exercise 4: OS Command Injection Defense (Secure)

Now we'll implement a secure command execution system using allowlists and proper subprocess handling.

In [None]:
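# Define allowed commands and their safe arguments
# allowed_args: None accepts free-form text arguments; a list accepts only the listed values
SAFE_COMMANDS = {
    'echo': {'base': 'echo', 'allowed_args': None},
    'date': {'base': 'date', 'allowed_args': []},
    'pwd': {'base': 'pwd', 'allowed_args': []}
}

def secure_system_command(command_name: str, args: Optional[List[str]] = None) -> str:
    """
    SECURE: This AI agent uses an allowlist and subprocess.run without shell=True.
    Only pre-approved commands can be executed, and arguments are validated.
    """
    # Avoid a mutable default argument; copy the caller's list
    args = list(args) if args else []

    # Check if command is allowed
    if command_name not in SAFE_COMMANDS:
        return f"[BLOCKED] Command '{command_name}' is not in the allowlist."

    # Validate arguments against the command's configuration
    command_config = SAFE_COMMANDS[command_name]
    allowed_args = command_config['allowed_args']
    if allowed_args is not None:
        rejected = [arg for arg in args if arg not in allowed_args]
        if rejected:
            return f"[BLOCKED] Arguments {rejected} are not allowed for '{command_name}'."

    # Build the command list
    command_list = [command_config['base']] + args

    print(f"[Secure AI Agent] Executing: {command_list}")

    try:
        result = subprocess.run(
            command_list,
            shell=False,  # SAFE: No shell interpretation
            capture_output=True,
            text=True,
            timeout=5
        )
        return result.stdout + result.stderr
    except Exception as e:
        return f"Error: {e}"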

# Test with allowed command
print("\n=== Normal Usage (Secure) ===")
output = secure_system_command('echo', ['Hello', 'World'])
print(f"Output: {output}")

In [None]:
# Attempt Command Injection (will fail)
print("\n=== Command Injection Attempt (Blocked) ===")

# Attempt 1: Disallowed command
print("\nAttempt 1: Execute disallowed command")
output = secure_system_command('rm', ['-rf', '/'])
print(f"Result: {output}")

# Attempt 2: Command chaining via arguments
print("\nAttempt 2: Command chaining via arguments")
output = secure_system_command('echo', ['Hello', '&&', 'echo', 'INJECTED'])
print(f"Output: {output}")
print("[ANALYSIS] The '&&' is treated as a literal argument, not a shell operator.")
print("Without shell=True, metacharacters have no special meaning.")
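
Dropping the shell neutralizes metacharacters, but an argument that starts with `-` is still parsed as an option by the program itself (GNU `date`, for example, accepts `-f FILE` to read input from a file). A minimal sketch of an argument policy follows; `ARG_POLICY` and `build_argv` are hypothetical names, and the per-command limits are illustrative assumptions:

```python
from typing import Dict, List

# Hypothetical policy: maximum number of positional arguments per allowed command.
ARG_POLICY: Dict[str, int] = {"echo": 10, "date": 0, "pwd": 0}

def build_argv(command: str, args: List[str]) -> List[str]:
    """Return an argv list, or raise ValueError if the request violates the policy."""
    if command not in ARG_POLICY:
        raise ValueError(f"command not allowed: {command!r}")
    if len(args) > ARG_POLICY[command]:
        raise ValueError(f"too many arguments for {command!r}")
    for arg in args:
        # A leading dash would be parsed by the program as an option, not as data.
        if arg.startswith("-"):
            raise ValueError(f"option-like argument rejected: {arg!r}")
    return [command] + list(args)

print(build_argv("echo", ["Hello", "&&", "World"]))

for bad in (("date", ["-f", "/etc/passwd"]), ("echo", ["-n"]), ("rm", ["-rf", "/"])):
    try:
        build_argv(*bad)
    except ValueError as err:
        print("blocked:", err)
```

The returned list is meant to be passed to `subprocess.run` with `shell=False`, as in the exercise above.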

## Exercise 5: Input Validation with Pydantic

Implement strong input validation using Pydantic models to enforce type safety and constraints.

In [None]:
class SecureCommandRequest(BaseModel):
    """Pydantic model for validating command requests."""
    command: str = Field(..., regex=r'^[a-zA-Z0-9_-]+$', max_length=50)
    arguments: List[str] = Field(default_factory=list, max_items=10)
    
    @validator('arguments', each_item=True)
    def validate_arguments(cls, arg: str) -> str:
        # Reject arguments with shell metacharacters
        dangerous_chars = ['&', '|', ';', '>', '<', '`', '$', '(', ')', '{', '}']
        if any(char in arg for char in dangerous_chars):
            raise ValueError(f"Argument contains dangerous characters: {arg}")
        return arg

class SecureSearchRequest(BaseModel):
    """Pydantic model for validating database search requests."""
    query: str = Field(..., min_length=1, max_length=200)
    category: Optional[str] = Field(None, regex=r'^[a-zA-Z0-9\s]+$')
    
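    @validator('query')
    def sanitize_query(cls, value: str) -> str:
        # Reject (rather than rewrite) queries containing SQL keywords or quote/comment characters.
        # Quotes are included so that payloads such as ' OR '1'='1 are caught.
        # This denylist is a coarse extra layer; parameterized queries remain the real defense.
        prohibited_tokens = ['SELECT', 'DROP', 'DELETE', 'UPDATE', 'INSERT', 'UNION', '--', ';', "'", '"']
        for token in prohibited_tokens:
            if token.upper() in value.upper():
                raise ValueError(f"Query contains prohibited token: {token}")
        return value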

# Test validation
print("=== Testing Input Validation ===")

# Valid request
try:
    valid_search = SecureSearchRequest(query="Laptop", category="Electronics")
    print(f"\n✓ Valid search request: {valid_search.dict()}")
except Exception as e:
    print(f"✗ Validation failed: {e}")

# Invalid request (SQL injection attempt)
try:
    malicious_search = SecureSearchRequest(query="' OR '1'='1", category="Electronics")
    print(f"\n✗ Malicious search request accepted: {malicious_search.dict()}")
except Exception as e:
    print(f"\n✓ Malicious request blocked: {e}")

# Invalid command (metacharacters)
try:
    malicious_command = SecureCommandRequest(command="echo", arguments=["Hello", "&&", "rm", "-rf"])
    print(f"\n✗ Malicious command accepted: {malicious_command.dict()}")
except Exception as e:
    print(f"\n✓ Malicious command blocked: {e}")
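
The argument validator above rejects a fixed list of dangerous characters. An allowlist inverts that: it names the characters that are permitted and rejects everything else, including characters nobody thought to list (a newline, for instance). The sketch below uses only the standard library; `SAFE_ARG` and `CommandRequest` are hypothetical names, and the permitted character set is an assumption to be tuned per application:

```python
import re
from dataclasses import dataclass, field
from typing import List

# Hypothetical allowlist: letters, digits, space, and a few harmless punctuation marks.
SAFE_ARG = re.compile(r"[A-Za-z0-9_.,:@/ -]{1,100}")

@dataclass
class CommandRequest:
    command: str
    arguments: List[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Validate on construction so an invalid request can never exist.
        if not re.fullmatch(r"[A-Za-z0-9_-]{1,50}", self.command):
            raise ValueError(f"invalid command name: {self.command!r}")
        if len(self.arguments) > 10:
            raise ValueError("too many arguments")
        for arg in self.arguments:
            if not SAFE_ARG.fullmatch(arg):
                raise ValueError(f"argument has characters outside the allowlist: {arg!r}")

print(CommandRequest("echo", ["Hello", "World"]))

for bad_args in (["&&"], ["$(id)"], ["line1\nline2"]):
    try:
        CommandRequest("echo", bad_args)
    except ValueError as err:
        print("blocked:", err)
```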

## Exercise 6: Multi-Layer Defense System

Implement a comprehensive defense-in-depth approach combining multiple security controls.

In [None]:
from datetime import datetime, timezone

class SecureAIAgent:
    """
    Demonstration AI agent with multi-layer security (a teaching example, not hardened for production).
    
    Security layers:
    1. Input validation (Pydantic)
    2. Command allowlisting
    3. Parameterized queries
    4. Rate limiting (basic)
    5. Audit logging
    """
    
    def __init__(self):
        self.allowed_commands = {'echo', 'date', 'pwd'}
        self.request_count = 0
        self.max_requests = 100
        self.audit_log = []
    
    def log_action(self, action: str, details: Dict[str, Any], success: bool):
        """Audit logging for security monitoring."""
        log_entry = {
            # Record a real UTC timestamp so log entries can be ordered and correlated
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'action': action,
            'details': details,
            'success': success
        }
        self.audit_log.append(log_entry)
        print(f"[AUDIT] {action}: {'SUCCESS' if success else 'BLOCKED'} - {details}")
    
    def check_rate_limit(self) -> bool:
        """Simple rate limiting: a lifetime cap on requests, with no time window."""
        if self.request_count >= self.max_requests:
            return False
        self.request_count += 1
        return True
    
    def execute_search(self, query: str) -> List[tuple]:
        """Secure database search with validation and parameterized queries."""
        # Layer 1: Rate limiting
        if not self.check_rate_limit():
            self.log_action('DATABASE_SEARCH', {'query': query}, False)
            raise Exception("Rate limit exceeded")
        
        # Layer 2: Input validation
        try:
            validated_request = SecureSearchRequest(query=query)
        except Exception as e:
            self.log_action('DATABASE_SEARCH', {'query': query, 'error': str(e)}, False)
            raise
        
        # Layer 3: Parameterized query
        sql_query = "SELECT * FROM products WHERE name LIKE ? OR category LIKE ?;"
        search_pattern = f"%{validated_request.query}%"
        
        cursor.execute(sql_query, (search_pattern, search_pattern))
        results = cursor.fetchall()
        
        self.log_action('DATABASE_SEARCH', {'query': query, 'results': len(results)}, True)
        return results
    
    def execute_command(self, command: str, args: List[str]) -> str:
        """Secure command execution with validation and allowlisting."""
        # Layer 1: Rate limiting
        if not self.check_rate_limit():
            self.log_action('COMMAND_EXECUTION', {'command': command}, False)
            raise Exception("Rate limit exceeded")
        
        # Layer 2: Input validation
        try:
            validated_request = SecureCommandRequest(command=command, arguments=args)
        except Exception as e:
            self.log_action('COMMAND_EXECUTION', {'command': command, 'error': str(e)}, False)
            raise
        
        # Layer 3: Command allowlist
        if validated_request.command not in self.allowed_commands:
            self.log_action('COMMAND_EXECUTION', {'command': command, 'error': 'Not in allowlist'}, False)
            raise Exception(f"Command '{command}' not allowed")
        
        # Layer 4: Safe subprocess execution (use the validated values, not the raw inputs)
        command_list = [validated_request.command] + validated_request.arguments
        result = subprocess.run(command_list, shell=False, capture_output=True, text=True, timeout=5)
        
        self.log_action('COMMAND_EXECUTION', {'command': command, 'args': args}, True)
        return result.stdout + result.stderr

# Test the secure agent
print("=== Testing Multi-Layer Defense System ===")
agent = SecureAIAgent()

# Valid operations
print("\n--- Valid Operations ---")
results = agent.execute_search("Laptop")
print(f"Search results: {len(results)} products found\n")

output = agent.execute_command("echo", ["Hello", "World"])
print(f"Command output: {output}")

# Invalid operations
print("\n--- Attack Attempts ---")
try:
    agent.execute_search("' OR '1'='1")
except Exception as e:
    print(f"✓ SQL injection blocked: {e}\n")

try:
    agent.execute_command("rm", ["-rf", "/"])
except Exception as e:
    print(f"✓ Dangerous command blocked: {e}\n")

try:
    agent.execute_command("echo", ["Hello", "&&", "evil"])
except Exception as e:
    print(f"✓ Command injection blocked: {e}\n")

print("\n--- Audit Log ---")
for entry in agent.audit_log:
    print(f"{entry['action']}: {entry['success']} - {entry['details']}")
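
The agent's `check_rate_limit` counts requests over the object's whole lifetime, so the budget never refills. A time-windowed limiter is one common alternative. The sketch below is an assumption-laden illustration, not part of the lab: `SlidingWindowLimiter` is a hypothetical class, and its clock is injectable so the behavior can be shown without sleeping:

```python
import time
from collections import deque
from typing import Callable, Deque

class SlidingWindowLimiter:
    """Allow at most max_requests calls within any window_seconds span (hypothetical class)."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self._stamps: Deque[float] = deque()

    def allow(self) -> bool:
        now = self.clock()
        # Discard timestamps that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self.window_seconds:
            self._stamps.popleft()
        if len(self._stamps) >= self.max_requests:
            return False
        self._stamps.append(now)
        return True

# Demonstration with a fake clock instead of real waiting.
fake_now = [0.0]
limiter = SlidingWindowLimiter(max_requests=2, window_seconds=60, clock=lambda: fake_now[0])
print(limiter.allow(), limiter.allow(), limiter.allow())  # third call exceeds the limit
fake_now[0] = 61.0
print(limiter.allow())  # earlier requests have expired
```

A real deployment would usually track one limiter per user or API key rather than a single global one.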

## Key Takeaways

1. **Never use string interpolation for SQL queries** - Always use parameterized queries with placeholders
2. **Never use shell=True with user input** - Use subprocess.run with shell=False and command lists
3. **Implement allowlists, not denylists** - Only permit explicitly approved commands and operations
4. **Use strong input validation** - Pydantic models enforce type safety and constraints at the input layer
5. **Apply defense-in-depth** - Multiple security layers provide redundancy if one control fails
6. **Audit all actions** - Logging enables detection and forensic analysis of attacks
7. **Apply rate limiting** - Limits abuse and slows automated attacks
8. **Enforce least privilege** - AI agents should hold only the minimum permissions they need

## Real-World Impact

- **CVE-2025-32711 (EchoLeak)**: CVSS 9.3, Microsoft 365 Copilot zero-click injection
- **CVE-2024-5565 (Vanna AI)**: CVSS 8.1, prompt injection to Python RCE
- **CVE-2024-6091 (AutoGPT)**: CVSS 9.8, shell command bypass
- **IBM Cost of a Data Breach Report 2025**: organizations with high levels of shadow AI saw breach costs about $670K higher on average

## Next Steps

1. Implement sandboxing with gVisor or similar runtime isolation
2. Add content security policies for web-based AI agents
3. Deploy commercial AI security platforms (Microsoft Security Copilot, Palo Alto Prisma AIRS)
4. Conduct regular security audits of AI agent tool integrations
5. Monitor the OWASP Top 10 for LLM Applications (2023 list: LLM07 Insecure Plugin Design, LLM08 Excessive Agency; the 2025 list renumbers Excessive Agency as LLM06)

## Resources

- OWASP LLM Top 10: https://owasp.org/www-project-top-10-for-large-language-model-applications/
- CVE Database: https://cve.mitre.org
- gVisor Sandboxing: https://gvisor.dev
- Pydantic Documentation: https://docs.pydantic.dev
- IBM Cost of Data Breach Report: https://www.ibm.com/reports/data-breach