# Phase 2: Configurable CloudTrail Analysis

This notebook allows you to analyze CloudTrail data for any date or date range, with automatic validation that Phase 1 data exists.

In [None]:
import sys
from pathlib import Path


# Add src to path
src_path = Path('../src').resolve()
sys.path.insert(0, str(src_path))

import pandas as pd
import json
from datetime import datetime, timedelta

from phase2.duckdb_connector import DuckDBConnector
from phase2.query_templates import QueryTemplates
from phase2.cloudtrail_schema import CloudTrailSchema
from phase2.data_validator import Phase1DataValidator
from common.logging_config import setup_logging

## Configuration - Set Your Date Range Here

In [2]:
# ===== CONFIGURATION SECTION =====
# Modify these parameters for your analysis

# Date configuration (YYYY-MM-DD format)
START_DATE = "2025-07-25"  # Required: Start date for analysis
END_DATE = "2025-07-31"
# END_DATE = None            # Optional: End date for range analysis (None for single day)

# Alternative: Uncomment for date range analysis
# START_DATE = "2025-08-06"
# END_DATE = "2025-08-07"

# Paths
BASE_DATA_PATH = "../data"
DB_PATH = f"../data/analysis_{START_DATE.replace('-', '')}.duckdb"

# Logging
LOG_LEVEL = "INFO"

print(f"📅 Analysis Configuration:")
print(f"  Start Date: {START_DATE}")
print(f"  End Date: {END_DATE if END_DATE else 'Same as start date'}")
print(f"  Data Path: {BASE_DATA_PATH}")
print(f"  Database: {DB_PATH}")

📅 Analysis Configuration:
  Start Date: 2025-07-25
  End Date: 2025-07-31
  Data Path: ../data
  Database: ../data/analysis_20250725.duckdb


## Phase 1 Data Validation

In [3]:
# Setup logging
logger = setup_logging(log_level=LOG_LEVEL)

# Initialize data validator
validator = Phase1DataValidator(BASE_DATA_PATH)

print("🔍 Checking Phase 1 data availability...")

# Check what dates are available
available_ranges = validator.get_available_date_ranges()
print(f"\n📊 Available data dates: {len(available_ranges)} days")
if available_ranges:
    print(f"  Range: {min(available_ranges)} to {max(available_ranges)}")
    print(f"  Dates: {', '.join(available_ranges[:10])}{'...' if len(available_ranges) > 10 else ''}")
else:
    print("  ❌ No processed data found! Please run Phase 1 first.")

# Validate requested date range
all_available, available_dates, missing_dates = validator.validate_date_range(START_DATE, END_DATE)

print(f"\n✅ Validation Results:")
print(f"  Requested dates available: {all_available}")
print(f"  Available dates: {available_dates}")
if missing_dates:
    print(f"  ❌ Missing dates: {missing_dates}")
    print(f"  ⚠️  Please run Phase 1 for missing dates before proceeding.")

# Count events for the date range
event_count = validator.count_events_for_date_range(START_DATE, END_DATE)
print(f"  📈 Estimated files to process: {event_count}")

if not all_available:
    print("\n⚠️  WARNING: Some requested dates are missing. Analysis will only include available dates.")
    proceed = input("Do you want to proceed with available data only? (y/n): ")
    if proceed.lower() != 'y':
        print("Analysis cancelled. Please run Phase 1 for missing dates.")
        raise SystemExit("Analysis cancelled by user")

🔍 Checking Phase 1 data availability...

📊 Available data dates: 14 days
  Range: 07-30-.ipynb_checkpoints to 2025-08-07
  Dates: 07-30-.ipynb_checkpoints, 2025-07-25, 2025-07-26, 2025-07-27, 2025-07-28, 2025-07-29, 2025-07-30, 2025-07-31, 2025-08-01, 2025-08-02...
[2025-08-08 14:43:07] [INFO] [cloudtrail_analyzer.data_validator] [validate_date_range] - Date validation: 7 available, 0 missing

✅ Validation Results:
  Requested dates available: True
  Available dates: ['2025-07-25', '2025-07-26', '2025-07-27', '2025-07-28', '2025-07-29', '2025-07-30', '2025-07-31']
  📈 Estimated files to process: 2041


## Initialize DuckDB and Load Data

In [4]:
if not available_dates:
    raise SystemExit("No data available for analysis. Please run Phase 1 first.")

print("🔗 Initializing DuckDB connection...")
db = DuckDBConnector(DB_PATH)

# Get data path for the validated date range
data_path = validator.get_data_path_for_dates(START_DATE, END_DATE)
print(f"📂 Using data path: {data_path}")

# Create CloudTrail view with date filtering
print("📊 Creating CloudTrail view...")
success = db.create_cloudtrail_view(data_path)

if not success:
    raise SystemExit("Failed to create CloudTrail view. Check data path and format.")

print("✅ CloudTrail view created successfully")

# Test data loading and apply date filter
date_filter = f"eventTime >= '{START_DATE}'"
if END_DATE:
    date_filter += f" AND eventTime <= '{END_DATE} 23:59:59'"

test_query = f"SELECT COUNT(*) as total_events FROM cloudtrail WHERE {date_filter}"
result = db.execute_query(test_query)
total_events = result.iloc[0]['total_events']

print(f"\n📈 Data loaded successfully:")
print(f"  Total events in date range: {total_events:,}")

if total_events == 0:
    print("⚠️  No events found in the specified date range. Check your date configuration.")

🔗 Initializing DuckDB connection...
[2025-08-08 14:43:11] [INFO] [cloudtrail_analyzer.duckdb_connector] [connect] - Connected to DuckDB at ../data/analysis_20250725.duckdb
📂 Using data path: /home/ec2-user/prj_ws/pyprj/ct-ddb-json/data/processed
📊 Creating CloudTrail view...
[2025-08-08 14:43:13] [INFO] [cloudtrail_analyzer.duckdb_connector] [create_cloudtrail_view] - Created CloudTrail view 'cloudtrail' from /home/ec2-user/prj_ws/pyprj/ct-ddb-json/data/processed
✅ CloudTrail view created successfully
[2025-08-08 14:43:17] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 1 rows

📈 Data loaded successfully:
  Total events in date range: 93,923


## Data Overview for Selected Date Range

In [5]:
if total_events > 0:
    # Data overview with date filtering
    overview_query = f"""
    SELECT 
        COUNT(*) as total_events,
        COUNT(DISTINCT eventSource) as unique_services,
        COUNT(DISTINCT eventName) as unique_events,
        COUNT(DISTINCT sourceIPAddress) as unique_ips,
        MIN(eventTime) as earliest_event,
        MAX(eventTime) as latest_event
    FROM cloudtrail 
    WHERE {date_filter}
    """
    
    overview = db.execute_query(overview_query)
    
    print("="*60)
    print(f"📊 CLOUDTRAIL ANALYSIS: {START_DATE}" + (f" to {END_DATE}" if END_DATE else ""))
    print("="*60)
    
    for col in overview.columns:
        value = overview.iloc[0][col]
        print(f"  {col.replace('_', ' ').title()}: {value}")
    
    # Top services for the date range
    services_query = f"""
    SELECT 
        eventSource,
        COUNT(*) as event_count,
        COUNT(DISTINCT eventName) as unique_events
    FROM cloudtrail 
    WHERE {date_filter}
    GROUP BY eventSource
    ORDER BY event_count DESC
    LIMIT 10
    """
    
    services = db.execute_query(services_query)
    print(f"\n🏆 Top AWS Services ({START_DATE}" + (f" to {END_DATE}" if END_DATE else "") + "):")
    for _, row in services.iterrows():
        print(f"  {row['eventSource']}: {row['event_count']:,} events")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[2025-08-08 14:43:27] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 1 rows
📊 CLOUDTRAIL ANALYSIS: 2025-07-25 to 2025-07-31
  Total Events: 93923
  Unique Services: 132
  Unique Events: 583
  Unique Ips: 55
  Earliest Event: 2025-07-25 00:01:03
  Latest Event: 2025-07-31 23:58:14


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[2025-08-08 14:43:31] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 10 rows

🏆 Top AWS Services (2025-07-25 to 2025-07-31):
  tagging.amazonaws.com: 27,800 events
  sts.amazonaws.com: 21,488 events
  s3.amazonaws.com: 15,141 events
  config.amazonaws.com: 5,741 events
  kms.amazonaws.com: 4,584 events
  ssm.amazonaws.com: 4,323 events
  ec2.amazonaws.com: 3,015 events
  logs.amazonaws.com: 2,237 events
  events.amazonaws.com: 883 events
  notifications.amazonaws.com: 794 events


## Security Analysis for Date Range

In [6]:
if total_events > 0:
    print("\n" + "="*50)
    print("🔒 SECURITY ANALYSIS")
    print("="*50)
    
    # Initialize query templates
    templates = QueryTemplates()
    
    # Modify security queries to include date filter
    unusual_template = templates.get_template('unusual_api_calls')
    if unusual_template:
        # Add date filter to the query
        modified_query = unusual_template.sql.replace(
            "FROM cloudtrail", 
            f"FROM cloudtrail WHERE {date_filter}"
        )
        
        print(f"\n🔍 {unusual_template.name} ({START_DATE}" + (f" to {END_DATE}" if END_DATE else "") + "):")
        unusual_results = db.execute_query(modified_query)
        
        if not unusual_results.empty:
            print(f"  Found {len(unusual_results)} unusual API patterns")
            for _, row in unusual_results.head(5).iterrows():
                print(f"  - {row['eventName']}: {row['call_count']} calls from {row['unique_ips']} IPs")
        else:
            print("  ✅ No unusual API calls detected")
    
    # Check for privilege escalation in date range
    priv_template = templates.get_template('privilege_escalation')
    if priv_template:
        # Add date filter to existing WHERE clause
        modified_query = priv_template.sql.replace(
            "WHERE eventName IN", 
            f"WHERE {date_filter} AND eventName IN"
        )
        
        print(f"\n🔐 {priv_template.name}:")
        priv_results = db.execute_query(modified_query)
        
        if not priv_results.empty:
            print(f"  ⚠️  Found {len(priv_results)} privilege escalation events")
            for _, row in priv_results.head(3).iterrows():
                print(f"  - {row['eventTime']}: {row['eventName']} by {row['userName']}")
        else:
            print("  ✅ No privilege escalation events detected")


🔒 SECURITY ANALYSIS

🔍 Unusual API Calls (2025-07-29 to 2025-07-31):
[2025-08-08 11:50:20] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 227 rows
  Found 227 unusual API patterns
  - ListScripts: 1 calls from 1 IPs
  - RunInstances: 1 calls from 1 IPs
  - SharedSnapshotVolumeCreated: 1 calls from 1 IPs
  - GetBuckets: 1 calls from 1 IPs
  - DescribeImageBuilders: 1 calls from 1 IPs

🔐 Privilege Escalation Events:
[2025-08-08 11:50:23] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 9350 rows
  ⚠️  Found 9350 privilege escalation events
  - 2025-07-31 23:58:09: AssumeRole by None
  - 2025-07-31 23:52:26: AssumeRole by None
  - 2025-07-31 23:49:08: AssumeRole by None


## Custom Analysis for Date Range

In [7]:
if total_events > 0:
    print("\n" + "="*50)
    print("🔧 CUSTOM ANALYSIS")
    print("="*50)
    
    # Timeline analysis for the date range
    timeline_query = f"""
    SELECT 
        DATE(CAST(eventTime AS TIMESTAMP)) as event_date,
        HOUR(CAST(eventTime AS TIMESTAMP)) as event_hour,
        COUNT(*) as event_count,
        COUNT(DISTINCT eventSource) as unique_services
    FROM cloudtrail 
    WHERE {date_filter}
    GROUP BY DATE(CAST(eventTime AS TIMESTAMP)), HOUR(CAST(eventTime AS TIMESTAMP))
    ORDER BY event_date, event_hour
    """
    
    print(f"\n📈 Hourly Activity Timeline:")
    timeline = db.execute_query(timeline_query)
    
    if not timeline.empty:
        for _, row in timeline.iterrows():
            print(f"  {row['event_date']} {row['event_hour']:02d}:00 - {row['event_count']:,} events ({row['unique_services']} services)")
    
    # Top users/roles in the date range
    users_query = f"""
    SELECT 
        json_extract_string(userIdentity, '$.type') as user_type,
        json_extract_string(userIdentity, '$.userName') as user_name,
        COUNT(*) as activity_count,
        COUNT(DISTINCT eventName) as unique_actions
    FROM cloudtrail 
    WHERE {date_filter}
      AND json_extract_string(userIdentity, '$.type') IS NOT NULL
    GROUP BY user_type, user_name
    ORDER BY activity_count DESC
    LIMIT 10
    """
    
    print(f"\n👥 Top Active Users/Roles:")
    users = db.execute_query(users_query)
    
    if not users.empty:
        for _, row in users.iterrows():
            user_name = row['user_name'] if row['user_name'] else 'N/A'
            print(f"  {row['user_type']}: {user_name} - {row['activity_count']:,} actions")
    else:
        print("  No user activity data found")


🔧 CUSTOM ANALYSIS

📈 Hourly Activity Timeline:
[2025-08-08 11:50:26] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 73 rows
  2025-07-29 00:00:00 00:00 - 157 events (26 services)
  2025-07-29 00:00:00 01:00 - 596 events (32 services)
  2025-07-29 00:00:00 02:00 - 151 events (24 services)
  2025-07-29 00:00:00 03:00 - 745 events (20 services)
  2025-07-29 00:00:00 04:00 - 1,115 events (34 services)
  2025-07-29 00:00:00 05:00 - 125 events (27 services)
  2025-07-29 00:00:00 06:00 - 1,442 events (35 services)
  2025-07-29 00:00:00 07:00 - 268 events (30 services)
  2025-07-29 00:00:00 08:00 - 127 events (24 services)
  2025-07-29 00:00:00 09:00 - 217 events (24 services)
  2025-07-29 00:00:00 10:00 - 1,029 events (18 services)
  2025-07-29 00:00:00 11:00 - 256 events (31 services)
  2025-07-29 00:00:00 12:00 - 181 events (30 services)
  2025-07-29 00:00:00 13:00 - 126 events (20 services)
  2025-07-29 00:00:00 14:00 - 131 events (28 services)
  2025-07-29

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[2025-08-08 11:50:29] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 2 rows
  AssumedRole: N/A - 30,047 actions
  AWSService: N/A - 11,382 actions


## Export Results (Optional)

In [8]:
# Optional: Export analysis results
EXPORT_RESULTS = False  # Set to True to export

if EXPORT_RESULTS and total_events > 0:
    export_date = START_DATE.replace('-', '')
    export_path = Path(f"../data/reports/analysis_{export_date}.csv")
    export_path.parent.mkdir(exist_ok=True)
    
    # Export summary data
    summary_query = f"""
    SELECT 
        eventTime,
        eventSource,
        eventName,
        sourceIPAddress,
        json_extract_string(userIdentity, '$.type') as userType
    FROM cloudtrail 
    WHERE {date_filter}
    ORDER BY eventTime DESC
    LIMIT 1000
    """
    
    summary_data = db.execute_query(summary_query)
    summary_data.to_csv(export_path, index=False)
    print(f"\n💾 Results exported to: {export_path}")
else:
    print("\n💾 Export disabled. Set EXPORT_RESULTS = True to export analysis results.")


💾 Export disabled. Set EXPORT_RESULTS = True to export analysis results.


## Custom Queries - Access Key and Role Analysis

In [9]:
# Test: Check if access key exists in database without date filter
ACCESS_KEY_ID = "ASIAY5XLNU5IYJANPK4H"

test_query = f"""
SELECT 
    COUNT(*) as total_events,
    MIN(eventTime) as earliest_event,
    MAX(eventTime) as latest_event
FROM cloudtrail 
WHERE json_extract_string(userIdentity, '$.accessKeyId') = '{ACCESS_KEY_ID}'
"""

print(f"🔍 Testing if access key {ACCESS_KEY_ID} exists in database (no date filter):")
test_results = db.execute_query(test_query)

if test_results.iloc[0]['total_events'] > 0:
    print(f"  ✅ Found {test_results.iloc[0]['total_events']} events for this access key")
    print(f"  📅 Date range: {test_results.iloc[0]['earliest_event']} to {test_results.iloc[0]['latest_event']}")
    print(f"  ⚠️  Note: These events may be outside your configured date range ({START_DATE}" + (f" to {END_DATE}" if END_DATE else "") + ")")
else:
    print(f"  ❌ Access key not found in database")

🔍 Testing if access key ASIAY5XLNU5IYJANPK4H exists in database (no date filter):
[2025-08-08 11:50:39] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 1 rows
  ✅ Found 2 events for this access key
  📅 Date range: 2025-07-29 23:58:13 to 2025-07-29 23:58:13
  ⚠️  Note: These events may be outside your configured date range (2025-07-29 to 2025-07-31)


In [10]:
if total_events > 0:
    print("\n" + "="*50)
    print("🔑 ACCESS KEY ACTIVITY ANALYSIS")
    print("="*50)
    
    # Query for specific Access Key ID activity
    ACCESS_KEY_ID = "ASIAY5XLNU5IYJANPK4H"  # Change this to your target access key
    
    access_key_query = f"""
    SELECT 
        eventTime,
        eventName,
        eventSource,
        sourceIPAddress,
        userAgent,
        json_extract_string(userIdentity, '$.accessKeyId') as accessKeyId,
        json_extract_string(userIdentity, '$.type') as userType,
        json_extract_string(userIdentity, '$.userName') as userName,
        awsRegion,
        requestParameters
    FROM cloudtrail 
    WHERE {date_filter}
      AND json_extract_string(userIdentity, '$.accessKeyId') = '{ACCESS_KEY_ID}'
    ORDER BY eventTime DESC
    """
    
    print(f"🔍 Searching for activity by Access Key: {ACCESS_KEY_ID}")
    access_key_results = db.execute_query(access_key_query)
    
    if not access_key_results.empty:
        print(f"  📊 Found {len(access_key_results)} events for this access key")
        print(f"  📅 Time range: {access_key_results['eventTime'].min()} to {access_key_results['eventTime'].max()}")
        print(f"  🌐 Unique services: {access_key_results['eventSource'].nunique()}")
        print(f"  🎯 Unique actions: {access_key_results['eventName'].nunique()}")
        print(f"  🌍 Unique IPs: {access_key_results['sourceIPAddress'].nunique()}")
        
        print(f"\n🏆 Top Activities:")
        top_activities = access_key_results['eventName'].value_counts().head(10)
        for activity, count in top_activities.items():
            print(f"  - {activity}: {count} times")
        
        print(f"\n🌐 Source IPs:")
        unique_ips = access_key_results['sourceIPAddress'].value_counts()
        for ip, count in unique_ips.items():
            print(f"  - {ip}: {count} events")
    else:
        print(f"  ❌ No activity found for access key: {ACCESS_KEY_ID}")


🔑 ACCESS KEY ACTIVITY ANALYSIS
🔍 Searching for activity by Access Key: ASIAY5XLNU5IYJANPK4H
[2025-08-08 11:50:44] [INFO] [cloudtrail_analyzer.duckdb_connector] [execute_query] - Query returned 2 rows
  📊 Found 2 events for this access key
  📅 Time range: 2025-07-29 23:58:13 to 2025-07-29 23:58:13
  🌐 Unique services: 1
  🎯 Unique actions: 1
  🌍 Unique IPs: 1

🏆 Top Activities:
  - DescribeStacks: 2 times

🌐 Source IPs:
  - ssm-quicksetup.amazonaws.com: 2 events


In [None]:
if total_events > 0:
    print("\n" + "="*50)
    print("👤 ROLE ACTIVITY ANALYSIS")
    print("="*50)
    
    # Query for specific Role activity
    ROLE_NAME = "AWSServiceRoleForSupport"  # Change this to your target role
    
    role_query = f"""
    SELECT 
        eventTime,
        eventName,
        eventSource,
        sourceIPAddress,
        userAgent,
        json_extract_string(userIdentity, '$.type') as userType,
        json_extract_string(userIdentity, '$.userName') as userName,
        json_extract_string(userIdentity, '$.sessionContext.sessionIssuer.userName') as roleName,
        json_extract_string(userIdentity, '$.arn') as userArn,
        awsRegion,
        requestParameters
    FROM cloudtrail 
    WHERE {date_filter}
      AND (json_extract_string(userIdentity, '$.userName') LIKE '%{ROLE_NAME}%'
           OR json_extract_string(userIdentity, '$.sessionContext.sessionIssuer.userName') LIKE '%{ROLE_NAME}%'
           OR json_extract_string(userIdentity, '$.arn') LIKE '%{ROLE_NAME}%')
    ORDER BY eventTime DESC
    """
    
    print(f"🔍 Searching for activity by Role: {ROLE_NAME}")
    role_results = db.execute_query(role_query)
    
    if not role_results.empty:
        print(f"  📊 Found {len(role_results)} events for this role")
        print(f"  📅 Time range: {role_results['eventTime'].min()} to {role_results['eventTime'].max()}")
        print(f"  🌐 Unique services: {role_results['eventSource'].nunique()}")
        print(f"  🎯 Unique actions: {role_results['eventName'].nunique()}")
        print(f"  🌍 Unique IPs: {role_results['sourceIPAddress'].nunique()}")
        
        print(f"\n🏆 Top Activities:")
        top_activities = role_results['eventName'].value_counts().head(10)
        for activity, count in top_activities.items():
            print(f"  - {activity}: {count} times")
        
        print(f"\n🌐 Source IPs:")
        unique_ips = role_results['sourceIPAddress'].value_counts()
        for ip, count in unique_ips.items():
            print(f"  - {ip}: {count} events")
        
        print(f"\n👤 User Types:")
        user_types = role_results['userType'].value_counts()
        for user_type, count in user_types.items():
            print(f"  - {user_type}: {count} events")
    else:
        print(f"  ❌ No activity found for role: {ROLE_NAME}")

## Summary and Cleanup

In [None]:
print("\n" + "="*60)
print("📋 ANALYSIS SUMMARY")
print("="*60)

print(f"\n📅 Analysis Period: {START_DATE}" + (f" to {END_DATE}" if END_DATE else ""))
print(f"📊 Events Analyzed: {total_events:,}" if total_events > 0 else "📊 No events found")
print(f"📂 Data Source: {data_path}")
print(f"💾 Database: {DB_PATH}")

if available_dates:
    print(f"✅ Available Dates: {', '.join(available_dates)}")
if missing_dates:
    print(f"❌ Missing Dates: {', '.join(missing_dates)}")

print("\n🎯 Key Capabilities Demonstrated:")
print("  ✅ Configurable date range analysis")
print("  ✅ Phase 1 data validation")
print("  ✅ Date-filtered security analysis")
print("  ✅ Timeline and user activity analysis")
print("  ✅ Flexible query framework")

# Close database connection
db.close()
print("\n🔌 Database connection closed.")
print("\n" + "="*60)
print("✅ Configurable Analysis Complete!")
print("="*60)