# Preventing SQL Injection Attacks with Amazon Bedrock AgentCore Gateway Interceptors

## Overview

This notebook demonstrates how to **prevent SQL injection attacks** using **Amazon Bedrock AgentCore Gateway interceptors**. The interceptor examines tool arguments before they reach database tools, using pattern matching to identify and block SQL injection attempts.

### Why Prevent SQL Injection at the Gateway?

When building AI agents that interact with databases, SQL injection remains a critical security threat:

- **Tool-Level Protection**: Block SQL injection attempts before they reach database tools
- **Centralized Security**: Apply injection detection consistently across all database tools
- **Pattern-Based Detection**: Use regex patterns to identify SQL injection indicators
- **Zero Trust Architecture**: Don't rely on downstream tools to sanitize inputs
- **Fast and Cost-Effective**: No external API calls; detection happens in milliseconds
- **Compliance**: Meet security requirements for database access controls

The Gateway interceptor provides a **centralized enforcement point** that validates tool arguments before any database query executes, without modifying individual tool implementations.

### Agent-Level vs Tool-Level Protection

**Important:** Bedrock Guardrails applied at the agent level protect the agent itself against prompt injection. Once the agent decides to call a tool, however, the prompt has already passed that check. To prevent SQL injection, the interceptor instead analyzes the tool arguments (query parameters) before the database tool executes.

---

## What This Tutorial Covers

This tutorial implements SQL injection prevention using a **REQUEST interceptor** with **pattern matching**:

🛡️ **SQL Injection Prevention (REQUEST interceptor + Pattern Matching)**  
   - Intercepts tool calls before they reach database tools
   - Analyzes tool arguments using SQL injection pattern matching
   - **Detects**: Stacked queries, SQL comments, UNION SELECT, tautologies, time-based injection
   - Blocks malicious queries and returns security warnings
   - Allows legitimate queries to proceed to database tools
   - **Demo Approach**: Heuristic detection; production should use parameterized queries

![SQL Injection Prevention Architecture](images/sql-injection-prevention.png)

---

## Why Use Gateway Interceptors?

Gateway Interceptors allow you to:

- **Tool Argument Validation**: Analyze tool parameters before they reach sensitive systems
- **SQL Injection Detection**: Use pattern matching to identify SQL injection indicators
- **Flexible Security**: Adapt detection logic without changing tool implementations
- **Audit & Monitoring**: Log all security events and blocked attempts
- **Request Blocking**: Reject malicious requests before database access
- **Recursive Scanning**: Check all string fields in tool arguments, not just top-level

Because interceptors are attached at the **Gateway layer**, they protect **any** underlying tool or MCP server without modifying application code.
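
The recursive scanning listed above can be sketched as a small generator that walks nested tool arguments and yields every string it finds. This is a minimal illustration, not the code deployed in `src/lambda/lambda_function.py` (which this notebook does not show); the function name `iter_string_fields` and the sample arguments are hypothetical.

```python
from typing import Any, Iterator, Tuple


def iter_string_fields(value: Any, path: str = "arguments") -> Iterator[Tuple[str, str]]:
    """Yield (path, text) for every string found anywhere inside value."""
    if isinstance(value, str):
        yield path, value
    elif isinstance(value, dict):
        for key, item in value.items():
            yield from iter_string_fields(item, f"{path}.{key}")
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            yield from iter_string_fields(item, f"{path}[{index}]")
    # Numbers, booleans, and None carry no SQL text, so they are skipped.


# Hypothetical tool arguments with a suspicious value nested two levels down
arguments = {
    "query": "customers in Portland",
    "options": {"filters": [{"field": "city", "value": "x' OR 1=1"}], "limit": 10},
}
fields = list(iter_string_fields(arguments))
for field_path, text in fields:
    print(f"{field_path}: {text}")
```

Each yielded string can then be passed to whatever detection function the interceptor uses, so a payload hidden in a nested field is checked the same way as a top-level `query`.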

---

## Tutorial Details

| Information              | Details                                                                      |
|--------------------------|------------------------------------------------------------------------------|
| **Tutorial type**        | Interactive                                                                  |
| **AgentCore components** | Amazon Bedrock AgentCore Gateway, Gateway Interceptors                      |
| **Gateway Target type**  | MCP Server (Lambda-based database tool)                                     |
| **Interceptor types**    | AWS Lambda (REQUEST)                                                        |
| **Inbound Auth IdP**     | Amazon Cognito (CUSTOM_JWT authorizer)                                      |
| **Security Pattern**     | SQL injection detection using pattern matching                              |
| **Tutorial components**  | Gateway, Lambda Interceptor, Amazon Cognito, MCP tools                      |
| **Tutorial vertical**    | Cross-vertical (applicable to any AI agent with database access)            |
| **Example complexity**   | Intermediate                                                                 |
| **SDK used**             | boto3                                                                        |

---

## Prerequisites

To execute this tutorial you will need:

- Jupyter notebook (Python kernel)
- AWS credentials with permissions for:
  - AWS Lambda
  - AWS IAM
  - Amazon Cognito
  - Amazon Bedrock AgentCore services (control plane)
- Python 3.9 or higher
- Basic understanding of AWS Lambda, IAM roles, Amazon Cognito, and Amazon Bedrock AgentCore Gateway

> ⚠️ **Note:** The Cleanup section at the end deletes the AWS resources created by this tutorial (Gateway, Lambdas, IAM roles, etc.). Only run it when you're ready to tear everything down.

> 📝 **Production Note:** This demo uses heuristic pattern matching to detect SQL injection. In production, the recommended deterministic control is to disallow raw SQL and require structured query templates or parameterized execution.
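
To make the production note concrete, here is a minimal sketch of parameterized execution using Python's built-in `sqlite3` module. The table, rows, and the `get_customer` helper are hypothetical and exist only for illustration; the tutorial's own tool uses mock data and no real database.

```python
import sqlite3

# In-memory database with made-up rows, for illustration only
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO customers (customer_id, name) VALUES (?, ?)",
    [(12345, "Alice Example"), (22222, "Bob Example")],
)


def get_customer(connection, customer_id):
    # The SQL text is fixed; the value is bound separately by the driver,
    # so it is never parsed as SQL.
    return connection.execute(
        "SELECT customer_id, name FROM customers WHERE customer_id = ?",
        (customer_id,),
    ).fetchall()


legit_rows = get_customer(conn, 12345)
attack_rows = get_customer(conn, "1 OR 1=1")
print(legit_rows)
print(attack_rows)
```

Because the injected text is treated as a single value rather than as part of the statement, the tautology matches no rows instead of returning the whole table.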


---

## Part 1: Setup & Deployment

### Step 1.0: Install Required Dependencies

Install all necessary Python packages for this tutorial.

In [1]:
%pip install -r requirements.txt


### Step 1.1: Import Required Libraries

In [2]:
import boto3
import json
import time
import sys
from pathlib import Path
from datetime import datetime
from botocore.exceptions import ClientError

# Add parent directory to path for utils
utils_dir = Path.cwd().parent
sys.path.insert(0, str(utils_dir))

import utils

print("✓ Libraries imported")

# Generate unique identifier for this deployment
DEPLOYMENT_ID = datetime.now().strftime('%Y%m%d-%H%M%S')
print(f"\nDeployment ID: {DEPLOYMENT_ID}")

✓ Libraries imported

Deployment ID: 20260220-125255


### Step 1.2: Configure Deployment Variables

In [3]:
# Configuration
REGION = "us-east-1"  
LAMBDA_FUNCTION_NAME = f"interceptor-lambda-{DEPLOYMENT_ID}"
LAMBDA_ROLE_NAME = f"interceptor-lambda-role-{DEPLOYMENT_ID}"
GATEWAY_NAME = f"interceptor-gateway-{DEPLOYMENT_ID}"

# Initialize clients
gateway_client = boto3.client('bedrock-agentcore-control', region_name=REGION)
cognito_client = boto3.client('cognito-idp', region_name=REGION)

print("Configuration:")
print(f"  Lambda Function: {LAMBDA_FUNCTION_NAME}")
print(f"  Lambda Role: {LAMBDA_ROLE_NAME}")
print(f"  Gateway Name: {GATEWAY_NAME}")
print(f"  Region: {REGION}")


Configuration:
  Lambda Function: interceptor-lambda-20260220-125255
  Lambda Role: interceptor-lambda-role-20260220-125255
  Gateway Name: interceptor-gateway-20260220-125255
  Region: us-east-1


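### Step 1.3: Review SQL Injection Detection Patterns

The detection rules are built into the interceptor Lambda; this cell only lists them for reference.

In [4]: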
# SQL Injection Detection Configuration
print("Configuring SQL injection detection patterns...")

# The Lambda function uses built-in pattern matching to detect SQL injection
# No external services required - detection happens entirely in the Lambda

SQL_PATTERNS_DETECTED = [
    "Statement Stacking (; followed by SQL keywords)",
    "SQL Comments (--, /*, */)",
    "UNION SELECT Combinations",
    "Tautologies (OR 1=1, AND 1=1)",
    "Time-Based Injection (SLEEP, WAITFOR DELAY, BENCHMARK)"
]

print("✓ SQL injection detection configured")
print("\nHigh-signal patterns detected:")
for pattern in SQL_PATTERNS_DETECTED:
    print(f"  • {pattern}")

print("\n📝 Note: This is a DEMO using heuristic pattern matching.")
print("   Production should use parameterized queries as the primary defense.")

Configuring SQL injection detection patterns...
✓ SQL injection detection configured

High-signal patterns detected:
  • Statement Stacking (; followed by SQL keywords)
  • SQL Comments (--, /*, */)
  • UNION SELECT Combinations
  • Tautologies (OR 1=1, AND 1=1)
  • Time-Based Injection (SLEEP, WAITFOR DELAY, BENCHMARK)

📝 Note: This is a DEMO using heuristic pattern matching.
   Production should use parameterized queries as the primary defense.


### Step 1.4: Create IAM Role for Lambda Interceptor

Grant the Lambda permissions to execute and write CloudWatch logs. No other permissions are needed because SQL injection detection runs inside the function.

In [5]:
# Create IAM role for Lambda interceptor using utils
print("Creating IAM role for Lambda interceptor...")

LAMBDA_ROLE_ARN = utils.create_lambda_role(
    role_name=LAMBDA_ROLE_NAME,
    description='Role for AgentCore Lambda Interceptor for SQL injection prevention'
)

print(f"  ARN: {LAMBDA_ROLE_ARN}")
print("\n✓ Lambda role created with basic execution permissions")
print("  No additional permissions needed - SQL detection is built into Lambda")

Creating IAM role for Lambda interceptor...
✓ IAM role created: interceptor-lambda-role-20260220-125255
  ARN: arn:aws:iam::824050487885:role/interceptor-lambda-role-20260220-125255

✓ Lambda role created with basic execution permissions
  No additional permissions needed - SQL detection is built into Lambda


### Step 1.5: Deploy Lambda Interceptor Function

The Lambda intercepts incoming tool calls and checks their arguments for SQL injection patterns using built-in pattern matching before allowing them to reach tools.

In [6]:
# Deploy Lambda interceptor using utils
print("Deploying Lambda interceptor...")
print("  Lambda uses built-in SQL injection pattern detection")
print("  No environment variables or external services required")

LAMBDA_ARN = utils.deploy_lambda_function(
    function_name=LAMBDA_FUNCTION_NAME,
    role_arn=LAMBDA_ROLE_ARN,
    lambda_code_path='src/lambda/lambda_function.py',
    description='AgentCore Request Lambda Interceptor to prevent SQL injection using pattern matching',
    timeout=30,
    memory_size=256,
    region=REGION
)

print(f"  ARN: {LAMBDA_ARN}")

Deploying Lambda interceptor...
  Lambda uses built-in SQL injection pattern detection
  No environment variables or external services required
✓ Lambda created: interceptor-lambda-20260220-125255
  ARN: arn:aws:lambda:us-east-1:824050487885:function:interceptor-lambda-20260220-125255


### Step 1.5a: Grant Gateway Permission to Invoke Lambda

Add permissions for the Gateway to invoke the Lambda interceptor function.

In [7]:
# Grant Gateway permission to invoke the Lambda interceptor
print("\nGranting Gateway permission to invoke Lambda...")

utils.grant_gateway_invoke_permission(
    function_name=LAMBDA_FUNCTION_NAME,
    region=REGION
)


Granting Gateway permission to invoke Lambda...
✓ Gateway invoke permission added to Lambda
  Principal: bedrock-agentcore.amazonaws.com


### Step 1.6: Create Amazon Cognito User Pool & App Client

Create Cognito user pool for Gateway authentication using OAuth client credentials flow.

In [8]:
# Create Cognito User Pool and Client for Gateway authentication using utils
print("Creating Cognito User Pool and Client...")

USER_POOL_NAME = f"gateway-pool-{DEPLOYMENT_ID}"
RESOURCE_SERVER_ID = 'gateway'
RESOURCE_SERVER_NAME = 'Gateway Resource Server'
SCOPES = [{'ScopeName': 'tools', 'ScopeDescription': 'Access to gateway tools'}]

# Create or get user pool
USER_POOL_ID = utils.get_or_create_user_pool(cognito_client, USER_POOL_NAME)
print(f"  Pool ID: {USER_POOL_ID}")

# Create or get resource server
utils.get_or_create_resource_server(cognito_client, USER_POOL_ID, RESOURCE_SERVER_ID, RESOURCE_SERVER_NAME, SCOPES)

# Wait for resource server to propagate
print("  Waiting for resource server to propagate...")
time.sleep(3)

# Create M2M client with client credentials flow
CLIENT_NAME = f"gateway-client-{DEPLOYMENT_ID}"
CLIENT_ID, CLIENT_SECRET = utils.get_or_create_m2m_client(
    cognito_client,
    USER_POOL_ID,
    CLIENT_NAME,
    RESOURCE_SERVER_ID,
    SCOPES=[f"{RESOURCE_SERVER_ID}/tools"]
)

print(f"✓ User Pool Client created: {CLIENT_NAME}")
print(f"  Client ID: {CLIENT_ID}")
print(f"  Client Secret: {CLIENT_SECRET[:20]}...")

# Construct OAuth URLs
POOL_DOMAIN = USER_POOL_ID.replace('_', '').lower()
COGNITO_DOMAIN = f"https://{POOL_DOMAIN}.auth.{REGION}.amazoncognito.com"
DISCOVERY_URL = f"https://cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}/.well-known/openid-configuration"
TOKEN_URL = f"{COGNITO_DOMAIN}/oauth2/token"

print(f"\n✓ OAuth Configuration:")
print(f"  Discovery URL: {DISCOVERY_URL}")
print(f"  Token URL: {TOKEN_URL}")
print(f"  Scope: {RESOURCE_SERVER_ID}/tools")

Creating Cognito User Pool and Client...
Creating new user pool
Domain created as well
  Pool ID: us-east-1_aIkMDH2t0
creating new resource server
  Waiting for resource server to propagate...
creating new m2m client
✓ User Pool Client created: gateway-client-20260220-125255
  Client ID: 7dm5r6f9gif83s0auevtc1mcmj
  Client Secret: 1ehm8lsc214e1fac4e66...

✓ OAuth Configuration:
  Discovery URL: https://cognito-idp.us-east-1.amazonaws.com/us-east-1_aIkMDH2t0/.well-known/openid-configuration
  Token URL: https://us-east-1aikmdh2t0.auth.us-east-1.amazoncognito.com/oauth2/token
  Scope: gateway/tools
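
For reference, an OAuth 2.0 client credentials request to a token endpoint such as the `TOKEN_URL` above can be sketched as follows. This only builds the headers and form body; it is an illustration of the flow, not the implementation of the `utils` helpers, which this notebook does not show. The function name `build_token_request` and the example credentials are hypothetical.

```python
import base64
from urllib.parse import urlencode


def build_token_request(client_id, client_secret, scope):
    """Return (headers, body) for an OAuth 2.0 client credentials token request."""
    # The client authenticates with HTTP Basic auth: base64("client_id:client_secret")
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    body = urlencode({"grant_type": "client_credentials", "scope": scope})
    return headers, body


# Placeholder credentials, not real values
headers, body = build_token_request("example-client-id", "example-secret", "gateway/tools")
print(body)
```

Sending it would look like `requests.post(TOKEN_URL, headers=headers, data=body)`, with the `access_token` field of the JSON response used as the Bearer token for Gateway calls.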


### Step 1.7: Create Gateway with Request Interceptor

**Why REQUEST Interceptor?**  
The interceptor processes incoming requests before they reach tools, allowing us to analyze tool arguments and block SQL injection attempts before any tool executes.

In [9]:
# Create Gateway IAM role
gateway_iam_role = utils.create_agentcore_gateway_role_with_region(GATEWAY_NAME, REGION)
GATEWAY_ROLE_ARN = gateway_iam_role['Role']['Arn']

print(f"✓ Gateway role created: {GATEWAY_ROLE_ARN}")

# Wait for role propagation
time.sleep(10)

# Create Gateway with Lambda interceptor
print(f"\nCreating Gateway with REQUEST interceptor...")

try:
    gateway_response = gateway_client.create_gateway(
        name=GATEWAY_NAME,
        protocolType="MCP",
        protocolConfiguration={
            "mcp": {
                "supportedVersions": ["2025-03-26"]
            }
        },
        interceptorConfigurations=[
            {
                "interceptor": {
                    "lambda": {
                        "arn": LAMBDA_ARN
                    }
                },
                "interceptionPoints": ["REQUEST"],
                "inputConfiguration": {
                    "passRequestHeaders": True  
                }
            }
        ],
        authorizerType="CUSTOM_JWT",
        authorizerConfiguration={
            "customJWTAuthorizer": {
                "discoveryUrl": DISCOVERY_URL,
                "allowedClients": [CLIENT_ID]
            }
        },
        roleArn=GATEWAY_ROLE_ARN
    )
    
    GATEWAY_ID = gateway_response.get('gatewayId')
    print(f"✓ Gateway created: {GATEWAY_ID}")
    
except Exception as e:
    print(f"\n✗ Failed to create Gateway: {e}")
    raise


attaching role policy agentcore-interceptor-gateway-20260220-125255-role
✓ Gateway role created: arn:aws:iam::824050487885:role/agentcore-interceptor-gateway-20260220-125255-role

Creating Gateway with REQUEST interceptor...
✓ Gateway created: interceptor-gateway-20260220-125255-49ymabaidj


### Step 1.8: Wait for Gateway to be Ready

In [10]:
# Wait for Gateway to be ready by polling its status
print("\nWaiting for Gateway to be ready...")

max_attempts = 30
for attempt in range(max_attempts):
    try:
        # boto3 raises ClientError on non-2xx responses, so no HTTP status check is needed
        response = gateway_client.get_gateway(gatewayIdentifier=GATEWAY_ID)
    except ClientError as e:
        # Treat API errors as transient: report and retry on the next attempt
        print(f"  [{attempt + 1}/{max_attempts}] Error: {e}")
        time.sleep(10)
        continue

    status = response.get('status', 'UNKNOWN')
    print(f"  [{attempt + 1}/{max_attempts}] Status: {status}")

    if status == 'READY':
        GATEWAY_URL = response.get('gatewayUrl')
        print(f"\n✓ Gateway is ready!")
        print(f"  URL: {GATEWAY_URL}")

        # Show interceptor configuration
        if 'interceptorConfigurations' in response:
            interceptor_configs = response['interceptorConfigurations']
            print(f"\n  Interceptor Configuration:")
            for idx, config in enumerate(interceptor_configs):
                print(f"    [{idx}] Interception Points: {config.get('interceptionPoints', [])}")
                print(f"    [{idx}] Lambda ARN: {config.get('interceptor', {}).get('lambda', {}).get('arn', 'N/A')}")
                print(f"    [{idx}] Pass Headers: {config.get('inputConfiguration', {}).get('passRequestHeaders', False)}")
        break
    if status == 'FAILED':
        # Raised outside the try block so the failure is not caught and retried
        print(f"\n✗ Gateway creation failed")
        print(f"  Details: {response}")
        raise Exception("Gateway failed")

    time.sleep(10)
else:
    print(f"\n⚠ Timeout waiting for Gateway")
    raise Exception("Gateway timeout")



Waiting for Gateway to be ready...
  [1/30] Status: READY

✓ Gateway is ready!
  URL: https://interceptor-gateway-20260220-125255-49ymabaidj.gateway.bedrock-agentcore.us-east-1.amazonaws.com/mcp

  Interceptor Configuration:
    [0] Interception Points: ['REQUEST']
    [0] Lambda ARN: arn:aws:lambda:us-east-1:824050487885:function:interceptor-lambda-20260220-125255
    [0] Pass Headers: True


### Step 1.9: Register Sample Database Tools with Gateway

Deploy sample database tool Lambda (customer query tool) and register it as a Gateway target.

**Note:** This tool uses mock data - no real database required. It simulates what would happen with a real database query interface.

In [14]:
# Deploy tool Lambdas and register as Gateway targets
print("Deploying tool Lambda functions...")

# Import tool modules
sys.path.insert(0, str(Path.cwd()))
from src.tools import customer_query_tool

# Create IAM role for tool Lambdas using utils
TOOL_ROLE_ARN = utils.create_lambda_role(
    role_name=f"tool-lambda-role-{DEPLOYMENT_ID}",
    description='Role for tool Lambda functions'
)

# Deploy tool Lambda functions
tools_to_deploy = [
    ('customer_query_tool', customer_query_tool),
]

deployed_tools = []

for tool_name, tool_module in tools_to_deploy:
    print(f"  Deploying {tool_name}...")
    
    function_name = f"{tool_name.replace('_', '-')}-{DEPLOYMENT_ID}"
    tool_code_path = Path(tool_module.__file__)
    
    lambda_arn = utils.deploy_lambda_function(
        function_name=function_name,
        role_arn=TOOL_ROLE_ARN,
        lambda_code_path=str(tool_code_path),
        environment_vars={'TOOL_NAME': tool_name},
        description=f'{tool_name} function - mock database query tool',
        region=REGION
    )
    
    tool_definition = getattr(tool_module, 'TOOL_DEFINITION', {
        "name": tool_name,
        "description": f"{tool_name} function"
    })
    
    deployed_tools.append({
        'tool_name': tool_name,
        'function_name': function_name,
        'lambda_arn': lambda_arn,
        'tool_definition': tool_definition
    })

print(f"✓ Deployed {len(deployed_tools)} tool Lambdas")

# Register tools as Gateway targets
print("\nRegistering tools as Gateway targets...")
created_targets = []

for tool in deployed_tools:
    print(f"  Registering {tool['tool_name']}...")
    
    try:
        response = gateway_client.create_gateway_target(
            gatewayIdentifier=GATEWAY_ID,
            name=f"{tool['tool_name'].replace('_', '-')}-target",
            targetConfiguration={
                "mcp": {
                    "lambda": {
                        "lambdaArn": tool["lambda_arn"],
                        "toolSchema": {"inlinePayload": [tool["tool_definition"]]}
                    }
                }
            },
            credentialProviderConfigurations=[{
                "credentialProviderType": "GATEWAY_IAM_ROLE"
            }]
        )
        
        target_id = response['targetId']
        print(f"    ✓ Target created: {target_id}")
        
        # Wait for target to be READY
        for attempt in range(18):
            status_response = gateway_client.get_gateway_target(
                gatewayIdentifier=GATEWAY_ID,
                targetId=target_id
            )
            status = status_response.get('status')
            
            if status == 'READY':
                print(f"    ✓ Target is READY")
                created_targets.append({
                    'tool_name': tool['tool_name'],
                    'target_id': target_id,
                    'lambda_arn': tool['lambda_arn']
                })
                break
            elif status == 'FAILED':
                print(f"    ✗ Target FAILED")
                break
            
            time.sleep(10)
            
    except Exception as e:
        print(f"    ✗ Failed to create target: {e}")

# Summary
print(f"\n✓ Deployed {len(deployed_tools)} tool Lambdas")
print(f"✓ Created {len(created_targets)} gateway targets")

if len(created_targets) < len(deployed_tools):
    print(f"⚠ Warning: Not all targets were created successfully")

# Store for cleanup
DEPLOYED_TOOL_FUNCTIONS = [t['function_name'] for t in deployed_tools]
CREATED_TARGET_IDS = [t['target_id'] for t in created_targets]


Deploying tool Lambda functions...
⚠ Role already exists: tool-lambda-role-20260220-125255
  Deploying customer_query_tool...
⚠ Lambda already exists: customer-query-tool-20260220-125255
✓ Deployed 1 tool Lambdas

Registering tools as Gateway targets...
  Registering customer_query_tool...
    ✓ Target created: SRDCLFDPLL
    ✓ Target is READY

✓ Deployed 1 tool Lambdas
✓ Created 1 gateway targets


## Part 2: Test the Interceptor

### Step 2.1: Test SQL Injection Prevention

Test the interceptor with both legitimate queries and SQL injection attempts to verify that malicious queries are blocked.

#### What to Expect:

The Lambda interceptor will:

1. **Intercept the tool call** before it reaches the database tool
2. **Extract tool arguments** (including the query parameter)
3. **Analyze using SQL injection pattern matching** to detect malicious patterns
4. **Block malicious queries** and return a generic security warning
5. **Allow legitimate queries** to proceed to the database tool

#### SQL Injection Patterns Detected:

The Lambda function detects high-signal SQL injection indicators:

- **Statement Stacking**: Semicolon followed by SQL keywords (`;DROP TABLE`, `;DELETE FROM`)
- **SQL Comments**: Comment tokens that can hide malicious code (`--`, `/*`, `*/`)
- **UNION SELECT**: Attempts to combine queries for data exfiltration
- **Tautologies**: Always-true conditions (`OR 1=1`, `AND 1=1`)
- **Time-Based Injection**: Delay functions for blind injection (`SLEEP()`, `WAITFOR DELAY`, `BENCHMARK()`)
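
The pattern list above can be sketched as a small table of regular expressions checked in order. This is an illustrative approximation, not the deployed Lambda code, which this notebook does not show. The rule IDs `STACKED_QUERY`, `TAUTOLOGY_OR`, and `UNION_SELECT` appear in the example logs in this section; `SQL_COMMENT`, `TAUTOLOGY_AND`, and `TIME_BASED`, along with the exact regexes, are assumptions.

```python
import re

# Hypothetical rule table: (rule ID, compiled pattern), checked in order
SQLI_RULES = [
    ("STACKED_QUERY", re.compile(
        r";\s*(?:DROP|DELETE|INSERT|UPDATE|ALTER|CREATE|TRUNCATE|EXEC)\b", re.IGNORECASE)),
    ("SQL_COMMENT", re.compile(r"--|/\*|\*/")),
    ("UNION_SELECT", re.compile(r"\bUNION\s+(?:ALL\s+)?SELECT\b", re.IGNORECASE)),
    # \1 requires the same number on both sides, e.g. 1=1 or 42=42
    ("TAUTOLOGY_OR", re.compile(r"\bOR\s+(\d+)\s*=\s*\1\b", re.IGNORECASE)),
    ("TAUTOLOGY_AND", re.compile(r"\bAND\s+(\d+)\s*=\s*\1\b", re.IGNORECASE)),
    ("TIME_BASED", re.compile(
        r"\b(?:SLEEP|BENCHMARK)\s*\(|\bWAITFOR\s+DELAY\b", re.IGNORECASE)),
]


def detect_sql_injection(text):
    """Return the ID of the first matching rule, or None if nothing matches."""
    for rule_id, pattern in SQLI_RULES:
        if pattern.search(text):
            return rule_id
    return None


samples = [
    "Show me customer information for customer ID 12345",
    "SELECT * FROM customers; DROP TABLE customers; --",
    "SELECT * FROM customers WHERE id = '1' OR 1=1",
    "SELECT name FROM customers UNION SELECT password FROM users",
]
for sample in samples:
    print(detect_sql_injection(sample), "|", sample)
```

Heuristics like these trade precision for speed: a harmless free-text value containing `--` would also trip the comment rule, which is one reason parameterized execution remains the primary defense.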

#### Example Scenarios:

**Legitimate Query (ALLOWED):**
```
Tool Argument: {"query": "Show me customer information for customer ID 12345"}
Result: ✓ Request proceeds to database tool
```

**SQL Injection with Stacked Query (BLOCKED):**
```
Tool Argument: {"query": "SELECT * FROM customers; DROP TABLE customers; --"}
Result: ✗ Request blocked
Error: {"category": "SQL_INJECTION_DETECTED", "message": "Request blocked by security policy"}
Log: [SECURITY] SQL injection detected | rule=STACKED_QUERY
```

**SQL Injection with Tautology (BLOCKED):**
```
Tool Argument: {"query": "SELECT * FROM customers WHERE id = '1' OR 1=1"}
Result: ✗ Request blocked
Error: {"category": "SQL_INJECTION_DETECTED"}
Log: [SECURITY] SQL injection detected | rule=TAUTOLOGY_OR
```

**SQL Injection with UNION (BLOCKED):**
```
Tool Argument: {"query": "SELECT name FROM customers UNION SELECT password FROM users"}
Result: ✗ Request blocked
Log: [SECURITY] SQL injection detected | rule=UNION_SELECT
```

#### Security Note:

- **Caller receives**: Generic error message with category only (no attack details)
- **Logs contain**: Request ID, tool name, rule ID, query hash (no sensitive data)
- **Detection**: Happens in milliseconds with no external API calls
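
The split between what the caller sees and what is logged can be sketched as two small helpers. The error body mirrors the blocked response shown in this notebook's test output, and the log prefix follows the example log lines above. The helper names, the extra log fields, and the 16-character hash truncation are assumptions, not the deployed Lambda's format.

```python
import hashlib


def build_block_response(request_id):
    """Build the generic JSON-RPC error returned to the caller (no attack details)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {
            "code": -32000,
            "message": "Request blocked by security policy",
            "data": {
                "category": "SQL_INJECTION_DETECTED",
                "security_policy": "sql_injection_prevention",
            },
        },
    }


def build_security_log(request_id, tool_name, rule_id, query):
    """Build a server-side log line that records a hash instead of the raw query."""
    query_hash = hashlib.sha256(query.encode("utf-8")).hexdigest()[:16]
    return (
        f"[SECURITY] SQL injection detected | rule={rule_id} "
        f"| request_id={request_id} | tool={tool_name} | query_sha256={query_hash}"
    )


blocked = build_block_response(2)
log_line = build_security_log(
    2, "customer_query_tool", "STACKED_QUERY",
    "SELECT * FROM customers; DROP TABLE customers; --",
)
print(log_line)
```

Hashing the query lets operators correlate repeated attempts without writing potentially sensitive query text to the logs, while the caller learns only the category.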



In [15]:
# Test the SQL injection prevention interceptor
import requests

print("Testing SQL injection prevention interceptor...")
print(f"Using pattern matching for SQL injection detection")
print(f"Gateway URL: {GATEWAY_URL}")

# Get OAuth token
token_data = utils.get_token(
    user_pool_id=USER_POOL_ID,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    scope_string="gateway/tools",
    REGION=REGION
)

if 'error' in token_data:
    print(f"✗ Token request failed: {token_data['error']}")
else:
    token = token_data['access_token']
    print(f"✓ Token obtained")

Testing SQL injection prevention interceptor...
Using pattern matching for SQL injection detection
Gateway URL: https://interceptor-gateway-20260220-125255-49ymabaidj.gateway.bedrock-agentcore.us-east-1.amazonaws.com/mcp
7dm5r6f9gif83s0auevtc1mcmj
✓ Token obtained


### Step 2.2: Test Legitimate Query (Should Pass)

Test a legitimate customer query that should pass through the interceptor without being blocked.

**Expected Result:**
- Pattern matching analyzes the query and finds no SQL injection patterns
- The request proceeds to the database tool
- Customer data is returned successfully

In [16]:
# Test a legitimate query (should pass)
print("\n" + "="*60)
print("Test 1: Legitimate Query (Should PASS)")
print("="*60)

# Reuse the token from previous step
if 'token' in locals():
    # Call the database tool with a legitimate query
    mcp_request = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "id": 1,
        "params": {
            "name": "customer-query-tool-target___customer_query_tool",
            "arguments": {
                "query": "Show me customer information for customer ID 12345"
            }
        }
    }
    
    response = requests.post(
        GATEWAY_URL,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        },
        json=mcp_request
    )
    
    result = response.json()
    print(f"\nResponse:")
    print(json.dumps(result, indent=2))
else:
    print("✗ No token available. Please run Step 2.1 first.")


Test 1: Legitimate Query (Should PASS)

Response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "isError": false,
    "content": [
      {
        "type": "text",
        "text": "{\"statusCode\":200,\"body\":{\"tool\":\"customer_query_tool\",\"query\":\"Show me customer information for customer ID 12345\",\"data_source\":\"mock_database\",\"results\":[{\"customer_id\":22222,\"name\":\"Bob Davis\",\"email\":\"carol.miller@example.com\",\"city\":\"Portland\",\"account_status\":\"Inactive\",\"total_orders\":13,\"lifetime_value\":9151.23}],\"result_count\":1,\"success\":true,\"note\":\"This is simulated data. In production, this would query a real database. The Gateway interceptor protects against SQL injection attacks.\"}}"
      }
    ]
  }
}


### Step 2.3: Test SQL Injection Attempt - Stacked Query (Should Block)

Test a SQL injection attempt using statement stacking to execute multiple queries.

**Expected Result:**
- Pattern matching detects the stacked query pattern (`;` followed by SQL keyword)
- The request is blocked before reaching the database tool
- A generic error response is returned (no attack details exposed)
- Detailed rule ID is logged server-side only

In [17]:
# Test SQL injection attempt - Stacked Query
print("\n" + "="*60)
print("Test 2: SQL Injection Attempt - Stacked Query (Should BLOCK)")
print("="*60)

if 'token' in locals():
    # Attempt SQL injection with stacked query (DROP TABLE)
    mcp_request = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "id": 2,
        "params": {
            "name": "customer-query-tool-target___customer_query_tool",
            "arguments": {
                "query": "SELECT * FROM customers WHERE id = 1; DROP TABLE customers; --"
            }
        }
    }
    
    response = requests.post(
        GATEWAY_URL,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        },
        json=mcp_request
    )
    
    result = response.json()
    print(f"\nResponse:")
    print(json.dumps(result, indent=2))
    
    # Check if blocked
    if 'error' in result:
        print("\n✓ SQL injection attempt was BLOCKED")
        print(f"  Category: {result.get('error', {}).get('data', {}).get('category', 'N/A')}")
        print(f"  Message: {result.get('error', {}).get('message', 'N/A')}")
        print("\n  Note: Detailed rule ID is logged server-side only (not exposed to caller)")
    else:
        print("\n✗ WARNING: SQL injection was NOT blocked!")
else:
    print("✗ No token available. Please run Step 2.1 first.")


Test 2: SQL Injection Attempt - Stacked Query (Should BLOCK)

Response:
{
  "jsonrpc": "2.0",
  "id": 2,
  "error": {
    "code": -32000,
    "message": "Request blocked by security policy",
    "data": {
      "category": "SQL_INJECTION_DETECTED",
      "security_policy": "sql_injection_prevention"
    }
  }
}

✓ SQL injection attempt was BLOCKED
  Category: SQL_INJECTION_DETECTED
  Message: Request blocked by security policy

  Note: Detailed rule ID is logged server-side only (not exposed to caller)
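
The two test cells above share most of their code, so the request building and the blocked check could be factored into helpers and reused for the tautology and UNION examples from Step 2.1. This is a sketch under the assumption that tool names follow the `<target-name>___<tool-name>` form used in the requests above; the helper names are hypothetical and nothing is sent to the Gateway here.

```python
def gateway_tool_name(tool_name):
    """Compose the name used in tools/call: '<target-name>___<tool-name>'."""
    target_name = f"{tool_name.replace('_', '-')}-target"
    return f"{target_name}___{tool_name}"


def build_tool_call(request_id, tool_name, query):
    """Build a JSON-RPC tools/call request body for the Gateway."""
    return {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "id": request_id,
        "params": {"name": gateway_tool_name(tool_name), "arguments": {"query": query}},
    }


def was_blocked(result):
    """True when the response carries the interceptor's block category."""
    category = result.get("error", {}).get("data", {}).get("category")
    return category == "SQL_INJECTION_DETECTED"


# Remaining example payloads from the scenarios in Step 2.1
EXTRA_CASES = [
    ("tautology", "SELECT * FROM customers WHERE id = '1' OR 1=1"),
    ("union", "SELECT name FROM customers UNION SELECT password FROM users"),
]
requests_to_send = [
    build_tool_call(index, "customer_query_tool", query)
    for index, (_, query) in enumerate(EXTRA_CASES, start=3)
]
print(requests_to_send[0]["params"]["name"])
```

Each body can then be posted with `requests.post(GATEWAY_URL, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, json=body)` and the parsed response passed to `was_blocked`.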


---

## Part 3: Cleanup - Delete All Resources

⚠️ **WARNING: This will DELETE all resources created in Part 1!**

Only run this section if you want to clean up everything.

### Step 3.1: Delete Created Resources

In [None]:
# Cleanup - Delete all created resources using utils
print("Starting cleanup...")

# 1. Delete gateway targets
if 'CREATED_TARGET_IDS' in globals() and 'GATEWAY_ID' in globals():
    utils.delete_gateway_targets(gateway_client, GATEWAY_ID, CREATED_TARGET_IDS)
    # Wait for target deletions to complete before deleting gateway
    time.sleep(5)

# 2. Delete gateway
if 'GATEWAY_ID' in globals():
    utils.delete_gateway(gateway_client, GATEWAY_ID)
    print("✓ Deleted gateway")

# 3. Delete Lambda functions (tools + interceptor)
lambda_functions_to_delete = []
if 'DEPLOYED_TOOL_FUNCTIONS' in globals():
    lambda_functions_to_delete.extend(DEPLOYED_TOOL_FUNCTIONS)
if 'LAMBDA_FUNCTION_NAME' in globals():
    lambda_functions_to_delete.append(LAMBDA_FUNCTION_NAME)

if lambda_functions_to_delete:
    utils.delete_lambda_functions(lambda_functions_to_delete, REGION)

# 4. Delete IAM roles
if 'LAMBDA_ROLE_NAME' in globals():
    utils.delete_iam_role(LAMBDA_ROLE_NAME)
if 'DEPLOYMENT_ID' in globals():
    utils.delete_iam_role(f"tool-lambda-role-{DEPLOYMENT_ID}")
    utils.delete_iam_role(f"agentcore-{GATEWAY_NAME}-role")

# 5. Delete Cognito user pool
if 'USER_POOL_ID' in globals():
    utils.delete_cognito_user_pool(USER_POOL_ID, REGION)

print("\n✓ Cleanup complete!")

---

# Summary

This notebook demonstrates SQL injection prevention using a Lambda REQUEST interceptor:

1. ✅ **Setup** - Created the Lambda interceptor, IAM roles, Cognito user pool, Gateway with REQUEST interception, and a mock database tool target
2. ✅ **Test** - Verified that a legitimate query passes and that a stacked-query injection attempt is blocked by pattern matching
3. ✅ **Cleanup** - Deleted all resources

## What We Demonstrated

- **Lambda REQUEST interceptor** that analyzes tool arguments before they reach database tools
- **Pattern-based SQL injection detection** for stacked queries, SQL comments, UNION SELECT, tautologies, and time-based injection
- **Centralized security enforcement** at the Gateway layer
- **Gateway integration** with custom security interceptors
- **Complete resource lifecycle** management

## Next Steps

- Replace raw SQL with structured query templates or parameterized execution as the primary defense
- Tune the detection patterns for your workload to balance false positives against missed attacks
- Integrate with security information and event management (SIEM) systems
- Monitor CloudWatch logs for security events and blocked attempts