# SageMaker Processing with Custom Container for Anomaly Detection

This notebook demonstrates how to run anomaly detection using SageMaker Processing with a custom Docker container. The solution implements region-level breach detection using the RRCF (Robust Random Cut Forest) algorithm.

## Features
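- **Self-contained data**: Generates its own synthetic input data (the inference script `standalone_region_training_test.py` and a `Dockerfile` must still be present in the working directory)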
- **Custom Docker Container**: Uses Python 3.10 with scientific computing libraries
- **Anomaly Detection**: Implements RRCF algorithm for breach detection
- **End-to-End Workflow**: From local testing to production deployment

## Step 1: Generate Synthetic Data

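First, we generate a synthetic dataset with the same schema as the original data and save it to `synthetic_data.parquet`. About 30% of the rows fall on the target snapshot date (2025-11-08).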

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
import os

def generate_synthetic_data(num_rows=6018, output_file='synthetic_data.parquet',
                            target_date=datetime(2025, 11, 8), seed=42):
    """Generate a synthetic cohort-level dataset and optionally save it as parquet.

    ``int(num_rows * 0.3)`` rows are stamped with ``target_date``. The rest are
    spread over the weekly snapshots of the preceding 26 weeks, never on the
    target date itself.

    Args:
        num_rows: Total number of rows to generate. ``0`` yields an empty
            DataFrame that still carries the expected columns and dtypes.
        output_file: Parquet path to write. Pass ``None`` to skip writing
            (e.g. when no parquet engine is installed).
        target_date: Snapshot week that receives the emphasized share of rows.
            Anything accepted by ``pd.Timestamp`` works.
        seed: Seed for the function's private random generators; the same seed
            always yields the same data. Global ``random``/``numpy`` state is
            left untouched.

    Returns:
        pandas.DataFrame with columns marketplace_set, marketplace_id,
        snapshot_week, sortability, iog_type, destination_zip_assigned_region,
        inregion_units, total_units, metric_new.
    """
    # Private generators: reproducible for a given seed without reseeding the
    # caller's global random / numpy.random state as a side effect.
    py_rng = random.Random(seed)
    np_rng = np.random.RandomState(seed)

    # Dimension values used to build cohorts
    marketplace_ids = [44571, 44572, 44573, 44574, 44575]
    sortabilities = ['sortable', 'non_conveyable']
    iog_types = ['FBA', 'P3P_DF', 'PREMIUM_PRO_SELLER', 'ARIPL']
    regions = ['BLR_CLUSTER', 'HYD_CLUSTER', 'KOL_CLUSTER', 'AMD_CLUSTER', 'HRA_CLUSTER', 'BOM_CLUSTER']

    # Every (region, sortability, iog_type) combination is a candidate cohort
    cohort_combinations = [
        (region, sortability, iog_type)
        for region in regions
        for sortability in sortabilities
        for iog_type in iog_types
    ]

    # Weekly snapshots (freq='W' is Sunday-anchored) over the 26 weeks up to the
    # target. The target is unioned in when it is not itself a Sunday, and since
    # it is the range end it is always the last entry.
    target_date = pd.Timestamp(target_date)
    week_index = pd.date_range(start=target_date - timedelta(weeks=26), end=target_date, freq='W')
    if target_date not in week_index:
        week_index = week_index.union([target_date])
    history_weeks = week_index[:-1]  # every snapshot except the target date

    def make_row(total_low, inregion_low, pick_week):
        """Draw one synthetic row; the fixed RNG call order keeps a seed reproducible."""
        region, sortability, iog_type = py_rng.choice(cohort_combinations)
        total_units = np_rng.randint(total_low, 600000)
        # randint's upper bound is exclusive, so inregion_units < total_units
        inregion_units = np_rng.randint(inregion_low, min(total_units, 400000))
        metric_new = np_rng.uniform(0.1, 0.95)  # ratio-like metric
        return {
            'marketplace_set': 'IN',
            'marketplace_id': py_rng.choice(marketplace_ids),
            'snapshot_week': pick_week(),
            'sortability': sortability,
            'iog_type': iog_type,
            'destination_zip_assigned_region': region,
            'inregion_units': inregion_units,
            'total_units': total_units,
            'metric_new': metric_new,
        }

    target_date_rows = int(num_rows * 0.3)  # 30% of rows land on the target date
    historical_rows = num_rows - target_date_rows

    data = [make_row(5000, 1000, lambda: target_date) for _ in range(target_date_rows)]
    data.extend(
        make_row(1000, 100, lambda: py_rng.choice(history_weeks))
        for _ in range(historical_rows)
    )

    # Explicit columns so an empty result (num_rows=0) still has the full schema
    columns = [
        'marketplace_set', 'marketplace_id', 'snapshot_week', 'sortability', 'iog_type',
        'destination_zip_assigned_region', 'inregion_units', 'total_units', 'metric_new',
    ]
    df = pd.DataFrame(data, columns=columns)

    # Ensure correct data types
    df = df.astype({
        'marketplace_set': 'object',
        'marketplace_id': 'int64',
        'sortability': 'object',
        'iog_type': 'object',
        'destination_zip_assigned_region': 'object',
        'inregion_units': 'int64',
        'total_units': 'int64',
        'metric_new': 'float64',
    })
    df['snapshot_week'] = pd.to_datetime(df['snapshot_week'])

    if output_file is not None:
        df.to_parquet(output_file, index=False)
        print(f"Generated synthetic data: {output_file}")

    print(f"Shape: {df.shape}")
    print(f"Target date ({target_date:%Y-%m-%d}) rows: {len(df[df['snapshot_week'] == target_date])}")
    print(f"Columns: {list(df.columns)}")

    return df

# Build the synthetic dataset (written to synthetic_data.parquet) and show its column dtypes
df = generate_synthetic_data()
print("\nData types:", df.dtypes, sep="\n")

## Step 2: Local Inference Testing

Install the Python dependencies and run the anomaly detection script locally for the target date (2025-11-08), writing its results to `./output`.

In [None]:
import subprocess
import sys

# Make sure the packages needed by the inference script are installed in this kernel
dependencies = ['pandas', 'joblib', 'numpy', 'rrcf', 'scipy', 'pyarrow', 'swifter']
subprocess.run([sys.executable, '-m', 'pip', 'install', *dependencies], check=True)

# Run the standalone inference script for the target date; check=True aborts the cell on failure
inference_cmd = [
    sys.executable, 'standalone_region_training_test.py',
    '--dataset_date', '2025-11-08',
    '--output_path', './output',
]
subprocess.run(inference_cmd, check=True)

output_dir = './output'
print("Inference completed successfully!")
print(f"Output saved to: {os.path.abspath(output_dir)}")

# Report every file the script produced together with its size
for entry in os.listdir(output_dir):
    entry_size = os.path.getsize(os.path.join(output_dir, entry))
    print(f"Generated file: {entry} ({entry_size} bytes)")

## Step 3: Analyze Results

Load and examine the anomaly detection results.

In [None]:
# Load the breach-detection results written by the local inference run
df = pd.read_parquet('./output/iris_region_breach_output.parquet')
breach_flags = df['breach']

print("Results Summary:")
print(f"Shape: {df.shape}")
print(f"Breaches detected: {breach_flags.sum()}/{len(df)} cohorts")
print(f"Total impact: {df['Impact'].sum():.2f}")

print("\nBreach Status:")
print(breach_flags.value_counts())

print("\nTop 5 cohorts by impact:")
top_by_impact = df.nlargest(5, 'Impact')
print(top_by_impact[['cohort', 'Impact', 'anomaly_score', 'breach']])

print("\nBreached cohorts:")
breached = df.loc[breach_flags == 1]
if breached.empty:
    print("No breaches detected")
else:
    print(breached[['cohort', 'IRIS', 'anomaly_score', 'Impact']])

## Step 4: Build Docker Container

Build the `linux/amd64` Docker image used for local testing and for SageMaker Processing.

In [None]:
# Build the Docker image for linux/amd64 (the ml.m5.large instance used for the
# SageMaker job is x86_64). --load exports the result into the local image store,
# so the `docker run` / `docker tag` steps below can find it even when the active
# buildx builder is not the default "docker" driver (e.g. a docker-container builder,
# which otherwise keeps the result only in its build cache).
build_cmd = [
    'docker', 'buildx', 'build',
    '--platform', 'linux/amd64',
    '--load',
    '-t', 'sagemaker-processing-test-amd64',
    '.',
]
result = subprocess.run(build_cmd, capture_output=True, text=True)

if result.returncode == 0:
    print("✅ Docker image built successfully!")
else:
    print("❌ Docker build failed:")
    print("STDERR:", result.stderr)
    print("STDOUT:", result.stdout)

## Step 5: Test Docker Container Locally

Test the Docker container with the synthetic data.

In [None]:
# Create host directories that are bind-mounted at the container's processing paths
os.makedirs('test-input', exist_ok=True)
os.makedirs('test-output', exist_ok=True)

# Copy synthetic data to test input
import shutil
shutil.copy("./synthetic_data.parquet", "./test-input/data.parquet")

# Remove any result left over from a previous run so the existence check below
# reflects this run only; a stale file would otherwise be reported as success.
docker_output_file = './test-output/iris_region_breach_output.parquet'
if os.path.exists(docker_output_file):
    os.remove(docker_output_file)

# Run Docker container
result = subprocess.run([
    'docker', 'run', '--rm',
    '-v', f'{os.getcwd()}/test-input:/opt/ml/processing/input',
    '-v', f'{os.getcwd()}/test-output:/opt/ml/processing/output',
    'sagemaker-processing-test-amd64',
    '--dataset_date', '2025-11-08',
    '--output_path', '/opt/ml/processing/output'
], capture_output=True, text=True)

print("Docker container execution:")
print("Exit code:", result.returncode)
if result.returncode == 0:
    print("✅ Container ran successfully!")

    # Check output
    if os.path.exists(docker_output_file):
        df = pd.read_parquet(docker_output_file)
        print(f"Output shape: {df.shape}")
        print(f"Breaches detected: {df['breach'].sum()}/{len(df)} cohorts")
        print("✅ Docker output file generated successfully!")
    else:
        print("❌ Output file not found")
else:
    # Show both streams (as the build step does); script logs may go to either one
    print("❌ Container failed:")
    print("STDERR:", result.stderr)
    print("STDOUT:", result.stdout)

## Step 6: Verify Docker Output

Examine the output from the Docker container test.

In [None]:
# Inspect the parquet file produced by the local Docker run, if it exists
output_file = './test-output/iris_region_breach_output.parquet'

if not os.path.exists(output_file):
    print("Output file not found. Run the Docker test first.")
else:
    df = pd.read_parquet(output_file)
    print(f"Shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")
    print(f"Data types:\n{df.dtypes}")
    print(f"\nFirst 3 rows:\n{df.head(3)}")
    print("\n✅ Docker output verified successfully!")

## Step 7: Deploy to SageMaker Processing

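Push the container image to Amazon ECR and provision what the SageMaker Processing job needs: an S3 bucket with the input data and an IAM execution role. The job itself is launched in Step 8.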

In [None]:
import boto3
import time
import json
from datetime import datetime

# Region used for every regional AWS resource in this workflow (ECR, S3, SageMaker)
aws_region = 'us-west-2'

# Initialize clients
ecr_client = boto3.client('ecr', region_name=aws_region)
s3_client = boto3.client('s3', region_name=aws_region)
sagemaker_client = boto3.client('sagemaker', region_name=aws_region)
iam_client = boto3.client('iam')  # IAM is global; no region needed

# Configuration
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
ecr_repo_name = 'sagemaker-processing-test'
bucket_name = f'sagemaker-processing-bucket-{timestamp}'
job_name = f'sagemaker-processing-job-{timestamp}'
account_id = boto3.client('sts').get_caller_identity()['Account']
ecr_registry = f'{account_id}.dkr.ecr.{aws_region}.amazonaws.com'
image_uri = f'{ecr_registry}/{ecr_repo_name}:amd64'

print(f"Starting SageMaker processing job: {job_name}")

# 1. Create ECR repository (idempotent: an existing repository is reused)
try:
    ecr_client.create_repository(repositoryName=ecr_repo_name)
    print(f"✅ Created ECR repository: {ecr_repo_name}")
except ecr_client.exceptions.RepositoryAlreadyExistsException:
    print(f"✅ ECR repository already exists: {ecr_repo_name}")

# 2. Login and push Docker image to ECR.
#    Argument lists instead of shell strings (no shell=True); the login password is
#    piped to `docker login` via stdin rather than through a shell pipeline.
login_password = subprocess.run(
    ['aws', 'ecr', 'get-login-password', '--region', aws_region],
    stdout=subprocess.PIPE, text=True, check=True,
).stdout
subprocess.run(
    ['docker', 'login', '--username', 'AWS', '--password-stdin', ecr_registry],
    input=login_password, text=True, check=True,
)
subprocess.run(['docker', 'tag', 'sagemaker-processing-test-amd64:latest', image_uri], check=True)
subprocess.run(['docker', 'push', image_uri], check=True)
print(f"✅ Pushed Docker image to ECR: {image_uri}")

# 3. Create S3 bucket. us-east-1 rejects an explicit LocationConstraint,
#    so the configuration is only passed for other regions.
create_bucket_kwargs = {'Bucket': bucket_name}
if aws_region != 'us-east-1':
    create_bucket_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': aws_region}
s3_client.create_bucket(**create_bucket_kwargs)
print(f"✅ Created S3 bucket: {bucket_name}")

# 4. Upload input data
s3_client.upload_file('./synthetic_data.parquet', bucket_name, 'input/data.parquet')
print("✅ Uploaded input data to S3")

# 5. Create/verify IAM role
role_name = 'SageMakerExecutionRole'
role_arn = f'arn:aws:iam::{account_id}:role/{role_name}'

try:
    iam_client.get_role(RoleName=role_name)
    print(f"✅ IAM role exists: {role_name}")
except iam_client.exceptions.NoSuchEntityException:
    # Create a role that the SageMaker service is allowed to assume
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "sagemaker.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }
        ]
    }

    iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy)
    )

    # Attach broad managed policies (convenient for a demo, not least-privilege)
    policies = [
        'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess',
        'arn:aws:iam::aws:policy/AmazonS3FullAccess'
    ]

    for policy in policies:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy)

    print(f"✅ Created IAM role: {role_name}")
    # IAM changes propagate eventually; a fixed 10 s wait may not always suffice
    time.sleep(10)

## Step 8: Create and Monitor SageMaker Processing Job

In [None]:
# 6. Create processing job.
#    The S3 input prefix is copied to /opt/ml/processing/input before the container
#    starts; /opt/ml/processing/output is uploaded to S3 once the job ends.
processing_input = {
    'InputName': 'input-data',
    'S3Input': {
        'S3Uri': f's3://{bucket_name}/input/',
        'LocalPath': '/opt/ml/processing/input',
        'S3DataType': 'S3Prefix',
        'S3InputMode': 'File',
    },
}
processing_output = {
    'OutputName': 'output-data',
    'S3Output': {
        'S3Uri': f's3://{bucket_name}/output/',
        'LocalPath': '/opt/ml/processing/output',
        'S3UploadMode': 'EndOfJob',
    },
}
cluster_config = {
    'InstanceCount': 1,
    'InstanceType': 'ml.m5.large',
    'VolumeSizeInGB': 30,
}
container_spec = {
    'ImageUri': image_uri,
    'ContainerArguments': ['--dataset_date', '2025-11-08', '--output_path', '/opt/ml/processing/output'],
}

sagemaker_client.create_processing_job(
    ProcessingJobName=job_name,
    ProcessingResources={'ClusterConfig': cluster_config},
    AppSpecification=container_spec,
    ProcessingInputs=[processing_input],
    ProcessingOutputConfig={'Outputs': [processing_output]},
    RoleArn=role_arn,
)
print(f"✅ Created processing job: {job_name}")

# 7. Poll the job every 30 seconds until it reaches a terminal state
print("Waiting for processing job to complete...")
terminal_states = ('Completed', 'Failed', 'Stopped')
response = sagemaker_client.describe_processing_job(ProcessingJobName=job_name)
status = response['ProcessingJobStatus']
print(f"Status: {status}")
while status not in terminal_states:
    time.sleep(30)
    response = sagemaker_client.describe_processing_job(ProcessingJobName=job_name)
    status = response['ProcessingJobStatus']
    print(f"Status: {status}")

if status != 'Completed':
    print(f"❌ Processing job failed with status: {status}")
    if 'FailureReason' in response:
        print(f"Failure reason: {response['FailureReason']}")
else:
    print("✅ Processing job completed successfully!")
    # Fetch the result file the container wrote to the output prefix
    s3_client.download_file(bucket_name, 'output/iris_region_breach_output.parquet', 'sagemaker_output.parquet')
    print("✅ Downloaded output file: sagemaker_output.parquet")

## Step 9: Verify SageMaker Results

Examine the output from the SageMaker Processing job.

In [None]:
# Inspect the result downloaded from the SageMaker Processing job, if it exists
output_file = './sagemaker_output.parquet'

if not os.path.exists(output_file):
    print("SageMaker output file not found. Run the processing job first.")
else:
    df = pd.read_parquet(output_file)
    print(f"Shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")
    print(f"Data types:\n{df.dtypes}")
    print(f"\nFirst 3 rows:\n{df.head(3)}")
    print(f"\nBreaches detected: {df['breach'].sum()}/{len(df)} cohorts")
    print("\n✅ SageMaker processing completed successfully!")

## Cleanup

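Remove the local Docker test directories (`test-input`, `test-output`). This does not delete the AWS resources created above (S3 bucket, ECR repository, IAM role); remove those separately to avoid ongoing charges.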

In [None]:
# Optional: delete the local Docker test directories created earlier
import shutil

for scratch_dir in ('test-input', 'test-output'):
    if os.path.exists(scratch_dir):
        shutil.rmtree(scratch_dir)
        print(f"Removed: {scratch_dir} directory")

print("\n✅ Cleanup completed!")