# EBS Snapshot Tag Compliance & Cleanup Runbook
# WORK-IN-PROGRESS

A hands-on guide to discovering untagged EBS snapshots, cleaning them up, and enforcing tag policies with SCPs.

---

> ‚ö†Ô∏è **IMPORTANT: READ BEFORE RUNNING IN PRODUCTION**
>
> This runbook is both **guided learning** and **solution implementation** in one document.
>
> **The Risk:** When you enable the SCP in Step 8, any workflow, automation, or service that creates EBS snapshots **without the required tags will break**. This includes:
> - Backup solutions (AWS Backup, third-party tools)
> - CI/CD pipelines that snapshot volumes
> - Lambda functions or scripts that create snapshots
> - AWS services like Data Lifecycle Manager (DLM)
> - Manual snapshots from the console (if tags aren't added)
>
> **Before running in a non-dev environment:**
> 1. Read and understand the entire runbook first
> 2. Identify ALL workflows that create snapshots in your environment
> 3. Update those workflows to include required tags
> 4. Test in a sandbox/dev account before production
> 5. Have a rollback plan (SCP detachment instructions included in Cleanup section)
>
> **Recommendation:** Run through this lab in an isolated AWS account first. Understand what each step does before applying to production workloads.

---

# Overview

**Problem:**

EBS snapshots are piling up without proper tags, making cost allocation and resource management a nightmare.

**Solution:**

1. Make sure your AWS accounts are in an AWS Organization
2. Use AWS Config to find non-compliant snapshots (missing tags OR invalid tag values)
3. Tag them
4. Enforce tagging with an SCP so it never happens again

**Required Tags (customize these as needed):**

* Environment - Accepted values: dev, Dev, development, Development, staging, Staging, prod, Prod, production, Production
* CostCenter (any value accepted)

> **Note: This runbook is expected to be ran sequentially skipping a step will likely break it**

## Instructions

### 1. Setup: AWS Credentials

Before running anything, you need to authenticate with AWS. Besides what is seen directly below, configuring your AWS credentials is out of scope for this runbook.

### 1.1 Environment Variables


In [None]:
export AWS_ACCESS_KEY_ID="AKIAIOSFODNN7EXAMPLE"
export AWS_SECRET_ACCESS_KEY="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
export AWS_DEFAULT_REGION="us-east-1"

# Optional
export AWS_SESSION_TOKEN="your-session-token"

### 1.1.1 AWS CLI Profile

In [None]:
# Configure default profile
aws configure

### 1.2 Required IAM Permissisons

Your user must be able to complete the following actions in your aws account to be able to complete this runbook

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ConfigPermissions",
            "Effect": "Allow",
            "Action": [
                "config:DescribeConfigurationRecorderStatus",
                "config:DescribeConfigurationRecorders",
                "config:PutConfigurationRecorder",
                "config:PutDeliveryChannel",
                "config:StartConfigurationRecorder",
                "config:PutConfigRule",
                "config:StartConfigRulesEvaluation",
                "config:GetComplianceDetailsByConfigRule",
                "config:DeleteConfigRule"
            ],
            "Resource": "*"
        },
        {
            "Sid": "EC2Permissions",
            "Effect": "Allow",
            "Action": ["ec2:DescribeSnapshots", "ec2:CreateTags"],
            "Resource": "*"
        },
        {
            "Sid": "IAMPermissions",
            "Effect": "Allow",
            "Action": ["iam:GetRole", "iam:CreateRole", "iam:AttachRolePolicy", "iam:PassRole"],
            "Resource": ["arn:aws:iam::*:role/AWSConfigRole"]
        },
        {
            "Sid": "S3Permissions",
            "Effect": "Allow",
            "Action": ["s3:CreateBucket", "s3:PutBucketPolicy", "s3:HeadBucket"],
            "Resource": ["arn:aws:s3:::aws-config-bucket-*"]
        },
        {
            "Sid": "OrganizationsPermissions",
            "Effect": "Allow",
            "Action": [
                "organizations:DescribeOrganization",
                "organizations:ListRoots",
                "organizations:ListOrganizationalUnitsForParent",
                "organizations:ListPolicies",
                "organizations:CreatePolicy",
                "organizations:AttachPolicy",
                "organizations:ListTargetsForPolicy"
            ],
            "Resource": "*"
        }
    ]
}
```

### 2. Configure your environment variables and dependicies

This runbook utilizes Python3. If you don't feel confortable with Python, use Kiro to convert these steps into AWSCLI steps.

### 2.1 Install Python Dependencies




In [None]:
!pip install boto3 pandas


### 2.2 Quick test - should return your account info


In [None]:
import boto3; boto3.client('sts').get_caller_identity()


### 2.3 Import Dependancies and Test Authenication

In [None]:
import boto3
import pandas as pd
import json
import time
from botocore.exceptions import ClientError

# Initialize clients
config_client = boto3.client('config')
ec2_client = boto3.client('ec2')
organizations_client = boto3.client('organizations')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')

# Get account info
ACCOUNT_ID = sts_client.get_caller_identity()['Account']
REGION = boto3.session.Session().region_name

# This will verify credentials are working and print that to the console
identity = sts_client.get_caller_identity()
print(f"‚úÖ Authenticated as: {identity['Arn']}")
print(f"   Account: {ACCOUNT_ID}")
print(f"   Region: {REGION}")



### 2.3 Configure Tags

In [None]:
REQUIRED_TAGS = ['Environment', 'CostCenter']
VALID_ENVIRONMENT_VALUES = [
    'dev', 'Dev', 'development', 'Development',
    'staging', 'Staging',
    'prod', 'Prod', 'production', 'Production'
]
DEFAULT_TAG_VALUES = {
    'Environment': 'dev',  # Must be one of the $VALID_ENVIRONMENT_VALUES tags
    'CostCenter': 'needs-review'
}
CONFIG_RULE_NAME = 'ebs-snapshot-required-tags'

### 2.4 Lab Setup (Optional): Create Test Snapshots

>üß™ For testing/demo purposes only
This section creates dummy EBS volumes and snapshots so you can run through the entire workflow without needing existing infrastructure.

Create Test EBS Volumes and Snapshots by uncommenting the function at the bottom

In [None]:
def create_lab_environment(num_compliant=3, num_non_compliant=5):
    """Create test volumes and snapshots for lab purposes"""

    created_volumes = []
    created_snapshots = []

    print("üß™ Creating lab environment...\n")

    # Get an available AZ in this region
    try:
        az_response = ec2_client.describe_availability_zones(
            Filters=[{'Name': 'state', 'Values': ['available']}]
        )
        availability_zone = az_response['AvailabilityZones'][0]['ZoneName']
        print(f"üìç Using Availability Zone: {availability_zone}")
    except ClientError as e:
        print(f"‚ùå Error getting AZs: {e}")
        return [], []

    # Create a small EBS volume (we'll delete it after snapshotting)
    # Note: Volume doesn't need to be attached to any EC2 instance!

    try:
        volume = ec2_client.create_volume(
            AvailabilityZone=availability_zone,
            Size=1,  # 1 GB - minimum size, cheapest
            VolumeType='gp3',
            TagSpecifications=[{
                'ResourceType': 'volume',
                'Tags': [{'Key': 'Purpose', 'Value': 'lab-testing'}]
            }]
        )
        volume_id = volume['VolumeId']
        created_volumes.append(volume_id)
        print(f"‚úÖ Created test volume: {volume_id}")

        # Wait for volume to be available
        print("   ‚è≥ Waiting for volume to be available...")
        waiter = ec2_client.get_waiter('volume_available')
        waiter.wait(VolumeIds=[volume_id])

    except ClientError as e:
        print(f"‚ùå Error creating volume: {e}")
        return [], []

    # Create NON-COMPLIANT snapshots (no required tags)
    print(f"\nüì∏ Creating {num_non_compliant} NON-COMPLIANT snapshots (missing tags)...")

    non_compliant_descriptions = [
        "backup-daily-server",
        "prod-database-backup",
        "dev-test-snapshot",
        "staging-app-server",
        "random-snapshot-123"
    ]

    for i in range(num_non_compliant):
        try:
            desc = non_compliant_descriptions[i % len(non_compliant_descriptions)]
            snapshot = ec2_client.create_snapshot(
                VolumeId=volume_id,
                Description=f"LAB-{desc}-{i}",
                TagSpecifications=[{
                    'ResourceType': 'snapshot',
                    'Tags': [
                        {'Key': 'Purpose', 'Value': 'lab-testing'},
                        {'Key': 'CreatedBy', 'Value': 'compliance-lab'}
                        # Intentionally missing Environment and CostCenter!
                    ]
                }]
            )
            created_snapshots.append(snapshot['SnapshotId'])
            print(f"   ‚ùå {snapshot['SnapshotId']} - NO required tags (non-compliant)")

        except ClientError as e:
            print(f"   Error creating snapshot: {e}")

    # Create COMPLIANT snapshots (with required tags)
    print(f"\nüì∏ Creating {num_compliant} COMPLIANT snapshots (with tags)...")

    compliant_configs = [
        {'Environment': 'prod', 'CostCenter': '12345'},
        {'Environment': 'Dev', 'CostCenter': '67890'},
        {'Environment': 'staging', 'CostCenter': '11111'},
    ]

    for i in range(num_compliant):
        try:
            config = compliant_configs[i % len(compliant_configs)]
            snapshot = ec2_client.create_snapshot(
                VolumeId=volume_id,
                Description=f"LAB-compliant-snapshot-{i}",
                TagSpecifications=[{
                    'ResourceType': 'snapshot',
                    'Tags': [
                        {'Key': 'Purpose', 'Value': 'lab-testing'},
                        {'Key': 'CreatedBy', 'Value': 'compliance-lab'},
                        {'Key': 'Environment', 'Value': config['Environment']},
                        {'Key': 'CostCenter', 'Value': config['CostCenter']}
                    ]
                }]
            )
            created_snapshots.append(snapshot['SnapshotId'])
            print(f"   ‚úÖ {snapshot['SnapshotId']} - Environment={config['Environment']}, CostCenter={config['CostCenter']}")

        except ClientError as e:
            print(f"   Error creating snapshot: {e}")

    # Delete the volume (snapshots persist independently)
    print(f"\nüóëÔ∏è  Cleaning up test volume...")
    try:
        ec2_client.delete_volume(VolumeId=volume_id)
        print(f"   ‚úÖ Deleted volume {volume_id}")
        created_volumes.remove(volume_id)
    except ClientError as e:
        print(f"   ‚ö†Ô∏è  Could not delete volume: {e}")

    print(f"\nüìä Lab Environment Summary:")
    print(f"   Total snapshots created: {len(created_snapshots)}")
    print(f"   Non-compliant: {num_non_compliant}")
    print(f"   Compliant: {num_compliant}")
    print(f"\n‚úÖ Lab environment ready! Continue to Step 1.")

    return created_volumes, created_snapshots

# UNCOMMENT TO CREATE LAB ENVIRONMENT
# lab_volumes, lab_snapshots = create_lab_environment(num_compliant=3, num_non_compliant=5)

### 3. Enable AWS Config

AWS Config needs to be running before we can use Config rules. This section will enable it.

### 3.1 Create IAM Role for AWS Config

AWS Config needs an IAM role to read your resources and write to S3.

In [None]:
CONFIG_ROLE_NAME = 'AWSConfigRole'

config_assume_role_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "config.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

def create_config_role():
    """Create IAM role for AWS Config if it doesn't exist"""

    try:
        # Check if role exists
        iam_client.get_role(RoleName=CONFIG_ROLE_NAME)
        print(f"‚úÖ IAM role '{CONFIG_ROLE_NAME}' already exists")
        return f"arn:aws:iam::{ACCOUNT_ID}:role/{CONFIG_ROLE_NAME}"

    except iam_client.exceptions.NoSuchEntityException:
        print(f"Creating IAM role '{CONFIG_ROLE_NAME}'...")

        # Create the role
        response = iam_client.create_role(
            RoleName=CONFIG_ROLE_NAME,
            AssumeRolePolicyDocument=json.dumps(config_assume_role_policy),
            Description='Role for AWS Config to access resources'
        )
        role_arn = response['Role']['Arn']

        # Attach the AWS managed policy for Config
        iam_client.attach_role_policy(
            RoleName=CONFIG_ROLE_NAME,
            PolicyArn='arn:aws:iam::aws:policy/service-role/AWS_ConfigRole'
        )

        print(f"‚úÖ IAM role created: {role_arn}")
        print("   ‚è≥ Waiting 10 seconds for role to propagate...")
        time.sleep(10)

        return role_arn

    except ClientError as e:
        print(f"‚ùå Error with IAM role: {e}")
        return None

# Create role if needed
config_role_arn = create_config_role()

### 3.2 Create S3 Bucket for Config
AWS Config needs an S3 bucket to store configuration snapshots and history.

In [None]:
CONFIG_BUCKET_NAME = f"aws-config-bucket-{ACCOUNT_ID}-{REGION}"

def create_config_bucket():
    """Create S3 bucket for AWS Config if it doesn't exist"""

    try:
        # Check if bucket exists
        s3_client.head_bucket(Bucket=CONFIG_BUCKET_NAME)
        print(f"‚úÖ S3 bucket '{CONFIG_BUCKET_NAME}' already exists")
        return CONFIG_BUCKET_NAME

    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"Creating S3 bucket '{CONFIG_BUCKET_NAME}'...")

            # Create bucket (handle us-east-1 differently)
            if REGION == 'us-east-1':
                s3_client.create_bucket(Bucket=CONFIG_BUCKET_NAME)
            else:
                s3_client.create_bucket(
                    Bucket=CONFIG_BUCKET_NAME,
                    CreateBucketConfiguration={'LocationConstraint': REGION}
                )

            # Add bucket policy for Config
            bucket_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Sid": "AWSConfigBucketPermissionsCheck",
                        "Effect": "Allow",
                        "Principal": {"Service": "config.amazonaws.com"},
                        "Action": "s3:GetBucketAcl",
                        "Resource": f"arn:aws:s3:::{CONFIG_BUCKET_NAME}"
                    },
                    {
                        "Sid": "AWSConfigBucketDelivery",
                        "Effect": "Allow",
                        "Principal": {"Service": "config.amazonaws.com"},
                        "Action": "s3:PutObject",
                        "Resource": f"arn:aws:s3:::{CONFIG_BUCKET_NAME}/AWSLogs/{ACCOUNT_ID}/Config/*",
                        "Condition": {
                            "StringEquals": {
                                "s3:x-amz-acl": "bucket-owner-full-control"
                            }
                        }
                    }
                ]
            }

            s3_client.put_bucket_policy(
                Bucket=CONFIG_BUCKET_NAME,
                Policy=json.dumps(bucket_policy)
            )

            print(f"‚úÖ S3 bucket created: {CONFIG_BUCKET_NAME}")
            return CONFIG_BUCKET_NAME
        else:
            print(f"‚ùå Error checking bucket: {e}")
            return None

# Create bucket if needed
config_bucket = create_config_bucket()

### 3.3 Create Config Recorder and Delivery Channel

Now we set up the actual Config recorder and delivery channel.

**Note: If you want to tag different aws resources types instead of just ec2 snaphots you'll need to change the `resourceTypes` in the `config_client.put_configuration_recorder` function.**

In [None]:
def setup_config_recorder(role_arn):
    """Create or update the AWS Config recorder"""

    try:
        config_client.put_configuration_recorder(
            ConfigurationRecorder={
                'name': 'default',
                'roleARN': role_arn,
                'recordingGroup': {
                    'allSupported': False,
                    'includeGlobalResourceTypes': False,
                    'resourceTypes': [
                        'AWS::EC2::Snapshot'  # Only record snapshots for this use case
                    ]
                }
            }
        )
        print("‚úÖ Config recorder created/updated")
        return True

    except ClientError as e:
        print(f"‚ùå Error creating Config recorder: {e}")
        return False

def setup_delivery_channel(bucket_name):
    """Create or update the delivery channel"""

    try:
        config_client.put_delivery_channel(
            DeliveryChannel={
                'name': 'default',
                's3BucketName': bucket_name,
                'configSnapshotDeliveryProperties': {
                    'deliveryFrequency': 'TwentyFour_Hours'
                }
            }
        )
        print("‚úÖ Delivery channel created/updated")
        return True

    except ClientError as e:
        print(f"‚ùå Error creating delivery channel: {e}")
        return False

def start_config_recorder():
    """Start the Config recorder"""

    try:
        config_client.start_configuration_recorder(
            ConfigurationRecorderName='default'
        )
        print("‚úÖ Config recorder started")
        return True

    except ClientError as e:
        print(f"‚ùå Error starting Config recorder: {e}")
        return False

# Set up Config if not already running
if not config_enabled:
    print("\nüì¶ Setting up AWS Config...\n")

    if config_role_arn and config_bucket:
        setup_config_recorder(config_role_arn)
        setup_delivery_channel(config_bucket)
        start_config_recorder()

        print("\n‚úÖ AWS Config is now enabled!")
        print("   ‚è≥ Wait a few minutes for initial resource discovery...")
    else:
        print("‚ùå Cannot set up Config - missing role or bucket")
else:
    print("‚úÖ AWS Config is already running - skipping setup")

### 3.4 Create the Required Tags Config Rule

This rule will evaluate all EBS snapshots against our required tags.

>**What This Rule Checks:**
>
>1. Tag key `Environment` exists AND value is one of the `allowed values`
>
>2. Tag key `CostCenter` exists (any value)
>
> If either check fails, the snapshot is non-compliant.

In [None]:
def create_required_tags_rule():
    """Create or update the required-tags Config rule for EBS snapshots"""

    # Build the input parameters
    # Format: tag1Key, tag1Value (comma-separated allowed values)
    input_params = {
        'tag1Key': 'Environment',
        'tag1Value': ','.join(VALID_ENVIRONMENT_VALUES),  # Validates the value too!
        'tag2Key': 'CostCenter'
        # No tag2Value = any value is accepted for CostCenter
    }

    try:
        response = config_client.put_config_rule(
            ConfigRule={
                'ConfigRuleName': CONFIG_RULE_NAME,
                'Description': 'Checks EBS snapshots for required tags with valid values',
                'Scope': {
                    'ComplianceResourceTypes': ['AWS::EC2::Snapshot']
                },
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'REQUIRED_TAGS'
                },
                'InputParameters': json.dumps(input_params)
            }
        )
        print(f"‚úÖ Config rule '{CONFIG_RULE_NAME}' created/updated successfully!")
        print(f"\nüìã Rule checks for:")
        print(f"   - Environment tag with value in: {VALID_ENVIRONMENT_VALUES}")
        print(f"   - CostCenter tag (any value)")
        return True

    except ClientError as e:
        print(f"‚ùå Error creating Config rule: {e}")
        return False

# Create the rule
create_required_tags_rule()

### 4. Trigger Rule Evaluation

Force an evaluation so we don't have to wait for the periodic check.


In [None]:
def trigger_evaluation():
    """Manually trigger the Config rule evaluation"""
    try:
        config_client.start_config_rules_evaluation(
            ConfigRuleNames=[CONFIG_RULE_NAME]
        )
        print(f"‚úÖ Evaluation triggered for '{CONFIG_RULE_NAME}'")
        print("   ‚è≥ Wait 1-2 minutes for results...")
        return True

    except ClientError as e:
        print(f"‚ùå Error triggering evaluation: {e}")
        return False

trigger_evaluation()

### 5. Get Non-Compliant Snapshots

Now let's see which snapshots are missing tags.

In [None]:
def get_non_compliant_snapshots():
    """Retrieve all non-compliant EBS snapshots from Config"""

    non_compliant = []
    next_token = None

    while True:
        params = {
            'ConfigRuleName': CONFIG_RULE_NAME,
            'ComplianceTypes': ['NON_COMPLIANT']
        }
        if next_token:
            params['NextToken'] = next_token

        try:
            response = config_client.get_compliance_details_by_config_rule(**params)

            for result in response.get('EvaluationResults', []):
                resource_id = result['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId']
                non_compliant.append(resource_id)

            next_token = response.get('NextToken')
            if not next_token:
                break

        except ClientError as e:
            print(f"‚ùå Error fetching compliance details: {e}")
            break

    print(f"Found {len(non_compliant)} non-compliant snapshots")
    return non_compliant

non_compliant_snapshots = get_non_compliant_snapshots()

### 6. Get Snapshot Details

Let's get more info about these snapshots so we can make smart tagging decisions.

In [None]:
def get_snapshot_details(snapshot_ids):
    """Get detailed info about snapshots including existing tags"""

    if not snapshot_ids:
        print("No snapshots to look up")
        return pd.DataFrame()

    snapshots_data = []

    # EC2 API limits to 200 IDs per call
    batch_size = 200
    for i in range(0, len(snapshot_ids), batch_size):
        batch = snapshot_ids[i:i + batch_size]

        try:
            response = ec2_client.describe_snapshots(SnapshotIds=batch)

            for snap in response['Snapshots']:
                # Get existing tags as dict
                existing_tags = {tag['Key']: tag['Value'] for tag in snap.get('Tags', [])}

                snapshots_data.append({
                    'SnapshotId': snap['SnapshotId'],
                    'VolumeId': snap.get('VolumeId', 'N/A'),
                    'Size_GB': snap['VolumeSize'],
                    'StartTime': snap['StartTime'].strftime('%Y-%m-%d %H:%M'),
                    'Description': snap.get('Description', '')[:50],
                    'ExistingTags': existing_tags,
                    'MissingTags': [t for t in REQUIRED_TAGS if t not in existing_tags]
                })

        except ClientError as e:
            print(f"‚ùå Error describing snapshots: {e}")

    return pd.DataFrame(snapshots_data)

# Get details and display
df_snapshots = get_snapshot_details(non_compliant_snapshots)
print(f"\nüìä Non-Compliant Snapshots Summary:")
print(f"   Total: {len(df_snapshots)}")
if len(df_snapshots) > 0:
    print(f"   Total Size: {df_snapshots['Size_GB'].sum()} GB")
df_snapshots

### 7. Bulk Tag Snapshots

>‚ö†Ô∏è Why Tag First, Enforce Later?
SCPs are powerful‚Äîyou could block all actions on untagged snapshots right now. But that's risky. You might break automation, backups, or workflows you didn't know existed.
The safer approach:

>Tag everything that exists today (this step)
Then enforce tagging on new snapshots only (Step 9)

>The catch: This approach isn't all-encompassing. The SCP only blocks creation of new untagged snapshots. It won't magically fix snapshots that slip through or get their tags removed later. If you don't stay on top of compliance (Step 8), untagged resources will accumulate again.
Consider setting up ongoing monitoring (Lambda + SNS alerts, or periodic Config evaluations) to catch drift.

Now let's fix these snapshots by Applying default tags to everything

### 7.1 Apply Default Tags to All

In [None]:
def bulk_tag_snapshots(snapshot_ids, tags_dict):
    """Apply tags to a list of snapshots"""

    if not snapshot_ids:
        print("No snapshots to tag")
        return

    tags = [{'Key': k, 'Value': v} for k, v in tags_dict.items()]

    success_count = 0
    fail_count = 0

    # EC2 create-tags supports up to 1000 resources per call
    batch_size = 500
    for i in range(0, len(snapshot_ids), batch_size):
        batch = snapshot_ids[i:i + batch_size]

        try:
            ec2_client.create_tags(Resources=batch, Tags=tags)
            success_count += len(batch)
            print(f"‚úÖ Tagged {len(batch)} snapshots")

        except ClientError as e:
            fail_count += len(batch)
            print(f"‚ùå Error tagging batch: {e}")

    print(f"\nüìä Tagging Complete:")
    print(f"   Success: {success_count}")
    print(f"   Failed: {fail_count}")

bulk_tag_snapshots(non_compliant_snapshots, DEFAULT_TAG_VALUES)

### 8. Verify Compliance

Re-run the evaluation and check that everything is now compliant.

In [None]:
def verify_compliance():
    """Re-evaluate and check compliance status"""

    # Trigger re-evaluation
    trigger_evaluation()

    print("‚è≥ Waiting 60 seconds for re-evaluation...")
    time.sleep(60)

    # Check compliance summary
    response = config_client.get_compliance_details_by_config_rule(
        ConfigRuleName=CONFIG_RULE_NAME,
        ComplianceTypes=['NON_COMPLIANT']
    )

    remaining = len(response.get('EvaluationResults', []))

    if remaining == 0:
        print("üéâ All EBS snapshots are now compliant!")
    else:
        print(f"‚ö†Ô∏è  Still {remaining} non-compliant snapshots remaining")

    return remaining

verify_compliance()

### 9. Enforce with SCP

Once you're compliant, lock it down so nobody creates untagged snapshots again.

> ‚ö†Ô∏è Important: SCPs can only be created from the management account
You must run this step from your AWS Organizations management account. Member accounts cannot create or attach SCPs.


### 9.1 Verify You're in the Management Account

In [None]:
def verify_management_account():
    """Check if we're in the management account and get org info"""

    try:
        # Get organization info
        org_response = organizations_client.describe_organization()
        org = org_response['Organization']

        master_account_id = org['MasterAccountId']
        org_id = org['Id']

        print(f"üìã Organization Info:")
        print(f"   Org ID: {org_id}")
        print(f"   Management Account: {master_account_id}")
        print(f"   Current Account: {ACCOUNT_ID}")

        if ACCOUNT_ID == master_account_id:
            print(f"\n‚úÖ You ARE in the management account - SCPs can be created")
            return True, org_id
        else:
            print(f"\n‚ùå You are NOT in the management account!")
            print(f"   Switch to account {master_account_id} to create SCPs")
            return False, org_id

    except organizations_client.exceptions.AWSOrganizationsNotInUseException:
        print("‚ùå AWS Organizations is not enabled for this account")
        return False, None

    except ClientError as e:
        print(f"‚ùå Error checking organization: {e}")
        return False, None

is_management_account, org_id = verify_management_account()

### 9.2 List Organizational Units (OUs)

Before attaching the SCP, you need to know which OUs exist.


In [None]:
def list_org_structure():
    """List all OUs in the organization"""

    def get_children(parent_id, level=0):
        """Recursively get all OUs"""
        ous = []

        # Get OUs under this parent
        try:
            paginator = organizations_client.get_paginator('list_organizational_units_for_parent')
            for page in paginator.paginate(ParentId=parent_id):
                for ou in page['OrganizationalUnits']:
                    ou['Level'] = level
                    ous.append(ou)
                    # Recursively get children
                    ous.extend(get_children(ou['Id'], level + 1))
        except ClientError as e:
            print(f"Error listing OUs: {e}")

        return ous

    try:
        # Get the root
        roots = organizations_client.list_roots()['Roots']
        root_id = roots[0]['Id']

        print(f"üìÇ Organization Structure:\n")
        print(f"Root: {root_id}")

        # Get all OUs
        all_ous = get_children(root_id, level=1)

        for ou in all_ous:
            indent = "  " * ou['Level']
            print(f"{indent}‚îú‚îÄ‚îÄ {ou['Name']} ({ou['Id']})")

        return root_id, all_ous

    except ClientError as e:
        print(f"‚ùå Error listing org structure: {e}")
        return None, []

root_id, all_ous = list_org_structure()

### 9.3 The SCP Policy

Save this SCP as a variable

In [None]:
SCP_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "RequireTagsOnEBSSnapshots",
            "Effect": "Deny",
            "Action": [
                "ec2:CreateSnapshot",
                "ec2:CreateSnapshots"
            ],
            "Resource": "arn:aws:ec2:*::snapshot/*",
            "Condition": {
                "Null": {
                    "aws:RequestTag/Environment": "true",
                    "aws:RequestTag/CostCenter": "true"
                }
            }
        },
        {
            "Sid": "RequireValidEnvironmentValues",
            "Effect": "Deny",
            "Action": [
                "ec2:CreateSnapshot",
                "ec2:CreateSnapshots"
            ],
            "Resource": "arn:aws:ec2:*::snapshot/*",
            "Condition": {
                "StringNotEquals": {
                    "aws:RequestTag/Environment": [
                        "dev",
                        "Dev",
                        "development",
                        "Development",
                        "staging",
                        "Staging",
                        "prod",
                        "Prod",
                        "production",
                        "Production"
                    ]
                }
            }
        }
    ]
}

print("üìã SCP Policy to Apply:")
print(json.dumps(SCP_POLICY, indent=2))

### 10. Create the SCP Policy


In [1]:
def create_scp(policy_name, policy_content):
    """Create an SCP in AWS Organizations"""

    if not is_management_account:
        print("‚ùå Cannot create SCP - not in management account")
        return None

    try:
        # Check if policy already exists
        existing_policies = organizations_client.list_policies(Filter='SERVICE_CONTROL_POLICY')
        for policy in existing_policies['Policies']:
            if policy['Name'] == policy_name:
                print(f"‚ö†Ô∏è  SCP '{policy_name}' already exists with ID: {policy['Id']}")
                return policy['Id']

        # Create new policy
        response = organizations_client.create_policy(
            Content=json.dumps(policy_content),
            Description='Require Environment and CostCenter tags on EBS snapshots',
            Name=policy_name,
            Type='SERVICE_CONTROL_POLICY'
        )
        policy_id = response['Policy']['PolicySummary']['Id']
        print(f"‚úÖ SCP created with ID: {policy_id}")
        return policy_id

    except ClientError as e:
        print(f"‚ùå Error creating SCP: {e}")
        return None

# UNCOMMENT TO CREATE - BE CAREFUL IN PROD!
scp_policy_id = create_scp('RequireEBSSnapshotTags', SCP_POLICY)

### 10. Attach the SCPs to OUs

Creating the SCP doesn't enforce it‚Äîyou must attach it to OUs or accounts.

**Note: You'll need to replace `attach_scp_to_ou(scp_policy_id, 'ou-xxxx-xxxxxxxx', 'Production')` with specific OUs from your account for this to work OR comment the `attach_scp_to_ou(scp_policy_id, 'ou-xxxx-xxxxxxxx', 'Production')` function and uncomment `attach_scp_to_all_ous(scp_policy_id, all_ous)` to place the SCP on all OUs**

In [None]:
def attach_scp_to_ou(policy_id, target_id, target_name=""):
    """Attach an SCP to an OU or account"""

    try:
        organizations_client.attach_policy(
            PolicyId=policy_id,
            TargetId=target_id
        )
        print(f"‚úÖ SCP attached to {target_name} ({target_id})")
        return True

    except organizations_client.exceptions.DuplicatePolicyAttachmentException:
        print(f"‚ö†Ô∏è  SCP already attached to {target_name} ({target_id})")
        return True

    except ClientError as e:
        print(f"‚ùå Error attaching SCP: {e}")
        return False

def attach_scp_to_all_ous(policy_id, ou_list):
    """Attach SCP to all OUs (excluding root)"""

    print(f"\nüîó Attaching SCP to {len(ou_list)} OUs...\n")

    for ou in ou_list:
        attach_scp_to_ou(policy_id, ou['Id'], ou['Name'])

# UNCOMMENT TO ATTACH - THIS ENFORCES THE POLICY!
# Example: Attach to specific OU
attach_scp_to_ou(scp_policy_id, 'ou-xxxx-xxxxxxxx', 'Production')

# Example: Attach to ALL OUs (careful!)
# attach_scp_to_all_ous(scp_policy_id, all_ous)

# Example: Attach to root (applies to entire org - VERY careful!)
# attach_scp_to_ou(scp_policy_id, root_id, 'Root')

### 10.1 Verify SCP Attachments

In [None]:
def list_scp_attachments(policy_id):
    """Show where an SCP is attached"""

    try:
        targets = organizations_client.list_targets_for_policy(PolicyId=policy_id)

        print(f"\nüìé SCP is attached to:")
        for target in targets['Targets']:
            print(f"   - {target['Name']} ({target['TargetId']}) - {target['Type']}")

        if not targets['Targets']:
            print("   ‚ö†Ô∏è  Not attached anywhere yet - SCP is not enforced!")

    except ClientError as e:
        print(f"‚ùå Error listing attachments: {e}")

list_scp_attachments(scp_policy_id)

### Full Lab Cleanup (All-in-One)
If you want to clean up everything created during this lab in one go:

In [None]:
def cleanup_config_iam_role():
    """Delete the IAM role created for AWS Config"""
    
    role_name = 'AWSConfigRole'
    
    try:
        # Detach managed policies first
        attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)
        for policy in attached_policies.get('AttachedPolicies', []):
            iam_client.detach_role_policy(
                RoleName=role_name,
                PolicyArn=policy['PolicyArn']
            )
            print(f"‚úÖ Detached policy: {policy['PolicyName']}")
        
        # Delete the role
        iam_client.delete_role(RoleName=role_name)
        print(f"‚úÖ IAM role '{role_name}' deleted")
        
    except iam_client.exceptions.NoSuchEntityException:
        print(f"‚ö†Ô∏è  Role '{role_name}' doesn't exist")
    except ClientError as e:
        print(f"‚ùå Error deleting role: {e}")


def cleanup_config_infrastructure():
    """Stop Config recorder and delete the S3 bucket we created"""
    
    print("üßπ Cleaning up AWS Config infrastructure...\n")
    
    # 1. Stop the Config recorder
    try:
        config_client.stop_configuration_recorder(
            ConfigurationRecorderName='default'
        )
        print("‚úÖ Config recorder stopped")
    except ClientError as e:
        print(f"‚ö†Ô∏è  Could not stop recorder: {e}")
    
    # 2. Delete the delivery channel (must be done before deleting recorder)
    try:
        config_client.delete_delivery_channel(
            DeliveryChannelName='default'
        )
        print("‚úÖ Delivery channel deleted")
    except ClientError as e:
        print(f"‚ö†Ô∏è  Could not delete delivery channel: {e}")
    
    # 3. Delete the Config recorder
    try:
        config_client.delete_configuration_recorder(
            ConfigurationRecorderName='default'
        )
        print("‚úÖ Config recorder deleted")
    except ClientError as e:
        print(f"‚ö†Ô∏è  Could not delete recorder: {e}")
    
    # 4. Empty and delete the S3 bucket
    bucket_name = f"aws-config-bucket-{ACCOUNT_ID}-{REGION}"
    
    try:
        # First, delete all objects in the bucket
        print(f"\nüóëÔ∏è  Emptying S3 bucket: {bucket_name}")
        
        paginator = s3_client.get_paginator('list_objects_v2')
        
        for page in paginator.paginate(Bucket=bucket_name):
            objects = page.get('Contents', [])
            if objects:
                delete_keys = [{'Key': obj['Key']} for obj in objects]
                s3_client.delete_objects(
                    Bucket=bucket_name,
                    Delete={'Objects': delete_keys}
                )
                print(f"   Deleted {len(delete_keys)} objects")
        
        # Now delete the bucket
        s3_client.delete_bucket(Bucket=bucket_name)
        print(f"‚úÖ S3 bucket '{bucket_name}' deleted")
        
    except s3_client.exceptions.NoSuchBucket:
        print(f"‚ö†Ô∏è  Bucket '{bucket_name}' doesn't exist")
    except ClientError as e:
        print(f"‚ùå Error deleting bucket: {e}")
        print("   You may need to manually empty and delete the bucket")


def cleanup_lab_environment(snapshot_ids):
    """Delete all lab snapshots when done"""
    
    print("üßπ Cleaning up lab environment...\n")
    
    for snap_id in snapshot_ids:
        try:
            ec2_client.delete_snapshot(SnapshotId=snap_id)
            print(f"   ‚úÖ Deleted {snap_id}")
        except ClientError as e:
            print(f"   ‚ùå Error deleting {snap_id}: {e}")

def delete_config_rule():
    """Remove the Config rule"""
    try:
        config_client.delete_config_rule(ConfigRuleName=CONFIG_RULE_NAME)
        print(f"‚úÖ Config rule '{CONFIG_RULE_NAME}' deleted")
    except ClientError as e:
        print(f"‚ùå Error: {e}")

def delete_scp(policy_id):
    """Delete an SCP (must be detached first)"""
    
    try:
        # Detach from everywhere first
        detach_scp_from_all(policy_id)
        
        # Delete the policy
        organizations_client.delete_policy(PolicyId=policy_id)
        print(f"‚úÖ SCP {policy_id} deleted")
        
    except ClientError as e:
        print(f"‚ùå Error deleting SCP: {e}")

def detach_scp_from_all(policy_id):
    """Detach SCP from all targets"""
    
    try:
        targets = organizations_client.list_targets_for_policy(PolicyId=policy_id)
        
        for target in targets['Targets']:
            organizations_client.detach_policy(
                PolicyId=policy_id,
                TargetId=target['TargetId']
            )
            print(f"‚úÖ Detached from {target['Name']} ({target['TargetId']})")
            
    except ClientError as e:
        print(f"‚ùå Error detaching SCP: {e}")

def full_lab_cleanup(snapshot_ids=None, scp_policy_id=None):
    """Clean up ALL resources created during this lab"""
    
    print("=" * 60)
    print("üßπ FULL LAB CLEANUP")
    print("=" * 60)
    
    # 1. Delete lab snapshots
    if snapshot_ids:
        print("\n1Ô∏è‚É£  Deleting lab snapshots...")
        cleanup_lab_environment(snapshot_ids)
    else:
        print("\n1Ô∏è‚É£  No lab snapshots to delete")
    
    # 2. Delete SCP (if created)
    if scp_policy_id:
        print("\n2Ô∏è‚É£  Deleting SCP...")
        delete_scp(scp_policy_id)
    else:
        print("\n2Ô∏è‚É£  No SCP to delete")
    
    # 3. Delete Config rule
    print("\n3Ô∏è‚É£  Deleting Config rule...")
    delete_config_rule()
    
    # 4. Delete Config infrastructure
    print("\n4Ô∏è‚É£  Deleting Config infrastructure (recorder, delivery channel, S3 bucket)...")
    cleanup_config_infrastructure()
    
    # 5. Delete IAM role
    print("\n5Ô∏è‚É£  Deleting IAM role...")
    cleanup_config_iam_role()
    
    print("\n" + "=" * 60)
    print("‚úÖ FULL LAB CLEANUP COMPLETE")
    print("=" * 60)

# UNCOMMENT TO RUN FULL CLEANUP
# full_lab_cleanup(
#     snapshot_ids=lab_snapshots,  # From Lab Setup step
#     scp_policy_id=scp_policy_id  # From Step 8d (if created)
# )

### 11 Done!

---

## Summary of What This Runbook Does

### Problem ‚Üí Solution

| Problem | Solution |
|---------|----------|
| Snapshots without tags | AWS Config rule finds them |
| Can't track costs | Required `CostCenter` tag |
| Unknown environments | Required `Environment` tag with validated values |
| People keep creating untagged snapshots | SCP blocks creation without tags |

### What Gets Created

| Resource | Name |
|----------|------|
| IAM Role | `AWSConfigRole` |
| S3 Bucket | `aws-config-bucket-{account}-{region}` |
| Config Recorder | `default` |
| Config Rule | `ebs-snapshot-required-tags` |
| SCP | `RequireEBSSnapshotTags` |

### Services Used

AWS Config ‚Üí S3 ‚Üí IAM ‚Üí CloudTrail ‚Üí Organizations (SCPs)

### Full Cleanup Included

Everything created can be deleted with the included `full_lab_cleanup()` function.