# DeepSeek OCR on Amazon SageMaker

This notebook demonstrates how to deploy and use **DeepSeek OCR** on Amazon SageMaker with two types of endpoints: asynchronous and synchronous.

## Endpoint Types

### 1. Async Endpoint (Recommended for Production)
- **Timeout**: 15 minutes by default, configurable up to 60 minutes per request
- **Best for**: Large PDFs (100+ pages), batch processing, background jobs
- **Method**: `invoke_endpoint_async()` with S3 input/output
- **Benefits**: Can scale to zero when idle (with an auto-scaling policy), handles large workloads, no result truncation

### 2. Sync Endpoint
- **Timeout**: 60 seconds
- **Best for**: Single images, small documents, real-time UI
- **Method**: `invoke_endpoint()` with direct JSON
- **Benefits**: Immediate response, simpler workflow

## What is DeepSeek OCR?

DeepSeek OCR is a vision-language model for optical character recognition. It can:
- Extract text from images (documents, invoices, receipts, whiteboards)
- Convert documents to structured formats like Markdown
- Process multi-page PDFs
- Provide bounding box coordinates (grounding mode)

**Model**: [deepseek-ai/DeepSeek-OCR](https://huggingface.co/deepseek-ai/DeepSeek-OCR) (3B parameters)
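
As a rough illustration of how these capabilities map to requests, the sketch below builds the JSON payload used later in this notebook (`prompt` plus `image_base64` or `pdf_base64`). The `free_ocr` prompt is the one this notebook uses; the two `<|grounding|>` prompts are taken from the model card as of this writing and should be treated as assumptions to verify. `PROMPTS` and `build_payload` are hypothetical names introduced here.

```python
import base64

# "free_ocr" is the prompt used throughout this notebook. The grounding
# prompts are assumptions based on the model card; verify before relying on them.
PROMPTS = {
    "free_ocr": "<image>\nFree OCR.",
    "markdown": "<image>\n<|grounding|>Convert the document to markdown.",
    "grounded_ocr": "<image>\n<|grounding|>OCR this image.",
}


def build_payload(file_bytes: bytes, mode: str = "free_ocr", is_pdf: bool = False) -> dict:
    """Build the request payload in the shape this notebook's container expects."""
    if mode not in PROMPTS:
        raise ValueError(f"Unknown mode {mode!r}; expected one of {sorted(PROMPTS)}")
    # PDFs and images travel under different keys, as in process_async() below
    key = "pdf_base64" if is_pdf else "image_base64"
    return {
        "prompt": PROMPTS[mode],
        key: base64.b64encode(file_bytes).decode("utf-8"),
    }
```

Whether the container returns bounding boxes inline in `text` or in a separate field depends on the image you built, so inspect one grounded response before parsing it.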

---

## Prerequisites

Before running this notebook:
1. Build the `deepseek-ocr-sagemaker-byoc` Docker image and push it to ECR using CodeBuild (the cells below expect the `latest` tag)
2. Ensure your SageMaker execution role has permissions for ECR, S3, and SageMaker
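
A quick way to confirm the first prerequisite before deploying is to ask ECR whether the image tag exists. The sketch below is one possible check, not part of the original notebook: `ecr_image_uri` and `image_exists` are hypothetical helpers, and the ECR client is passed in so the functions stay easy to test.

```python
def ecr_image_uri(account: str, region: str, repo: str, tag: str = "latest") -> str:
    """Compose the image URI in the same form the setup cell uses."""
    return f"{account}.dkr.ecr.{region}.amazonaws.com/{repo}:{tag}"


def image_exists(ecr_client, repo: str, tag: str = "latest") -> bool:
    """Return True if repo:tag is present in ECR, False if the repo or tag is missing."""
    try:
        resp = ecr_client.describe_images(
            repositoryName=repo,
            imageIds=[{"imageTag": tag}],
        )
    except Exception as err:
        # botocore errors expose the service error code under err.response
        code = getattr(err, "response", {}).get("Error", {}).get("Code", "")
        if code in ("ImageNotFoundException", "RepositoryNotFoundException"):
            return False
        raise  # permissions or network problems should surface, not be hidden
    return bool(resp.get("imageDetails"))
```

In the notebook this could be called as `image_exists(boto3.client("ecr"), "deepseek-ocr-sagemaker-byoc")`. An access-denied error here usually points at the second prerequisite, the execution role's ECR permissions.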


---

## 1. Deploy Async Endpoint

We'll deploy an **Async Inference endpoint** using SageMaker's `AsyncInferenceConfig`. This enables:
- Long processing times (15 minutes by default, up to 60 minutes per request)
- Automatic S3 storage for results
- Scaling to zero when idle, once an auto-scaling policy is attached
- Cost-effective batch processing
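
Scaling to zero is not automatic: the endpoint created below keeps one instance running until a scaling policy is attached. The following is a minimal sketch of such a policy using Application Auto Scaling, assuming the `AllTraffic` variant name from this notebook; the function names, `max_instances`, `target_backlog`, and the cooldown values are illustrative choices, not tuned settings.

```python
def scale_to_zero_requests(endpoint_name, variant_name="AllTraffic",
                           max_instances=2, target_backlog=5.0):
    """Build kwargs for register_scalable_target() and put_scaling_policy()."""
    resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
    dimension = "sagemaker:variant:DesiredInstanceCount"
    target = {
        "ServiceNamespace": "sagemaker",
        "ResourceId": resource_id,
        "ScalableDimension": dimension,
        "MinCapacity": 0,  # allows the variant to drop to zero instances
        "MaxCapacity": max_instances,
    }
    policy = {
        "PolicyName": f"{endpoint_name}-backlog-tracking",
        "ServiceNamespace": "sagemaker",
        "ResourceId": resource_id,
        "ScalableDimension": dimension,
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            "TargetValue": target_backlog,  # queued requests per instance
            "CustomizedMetricSpecification": {
                "MetricName": "ApproximateBacklogSizePerInstance",
                "Namespace": "AWS/SageMaker",
                "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
                "Statistic": "Average",
            },
            "ScaleInCooldown": 300,
            "ScaleOutCooldown": 300,
        },
    }
    return target, policy


def apply_scale_to_zero(autoscaling_client, endpoint_name, **options):
    """Register the variant and attach the backlog-tracking policy."""
    target, policy = scale_to_zero_requests(endpoint_name, **options)
    autoscaling_client.register_scalable_target(**target)
    autoscaling_client.put_scaling_policy(**policy)
```

Once the endpoint is in service, this could be applied with `apply_scale_to_zero(boto3.client("application-autoscaling"), async_endpoint_name)`. The AWS documentation also describes an extra step-scaling policy on the `HasBacklogWithoutCapacity` metric for waking an endpoint that is already at zero instances; that part is not shown here.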


In [None]:
import boto3
import sagemaker
import time
from sagemaker import get_execution_role

# Setup
region = boto3.Session().region_name
account = boto3.client('sts').get_caller_identity()['Account']
image = f"{account}.dkr.ecr.{region}.amazonaws.com/deepseek-ocr-sagemaker-byoc:latest"
role = get_execution_role()
sm = boto3.client('sagemaker')

print(f"Region: {region}")
print(f"Account: {account}")
print(f"Image URI: {image}")
print(f"Role: {role}")

In [None]:
# Setup S3 buckets for async inference
import boto3

s3 = boto3.client('s3')

# Define bucket names
async_input_bucket = f"sagemaker-async-{region}-{account}"
async_output_bucket = f"sagemaker-async-output-{region}-{account}"

print("Creating S3 buckets for async inference...")

def ensure_bucket(name, label):
    """Create the bucket unless this account already owns it."""
    # us-east-1 rejects an explicit LocationConstraint, so omit it there
    kwargs = {}
    if region != 'us-east-1':
        kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region}
    try:
        s3.create_bucket(Bucket=name, **kwargs)
        print(f"✓ Created {label} bucket: {name}")
    except s3.exceptions.BucketAlreadyOwnedByYou:
        print(f"✓ {label.capitalize()} bucket already exists: {name}")

ensure_bucket(async_input_bucket, "input")
ensure_bucket(async_output_bucket, "output")

print("\n✅ S3 buckets ready for async inference")

In [None]:
# Create unique names for async endpoint
async_model_name = f"deepseek-ocr-async-{int(time.time())}"
async_endpoint_config_name = f"{async_model_name}-cfg"
async_endpoint_name = f"{async_model_name}-ep"

print(f"Async Model Name: {async_model_name}")
print(f"Async Endpoint Config: {async_endpoint_config_name}")
print(f"Async Endpoint Name: {async_endpoint_name}")

In [None]:
# Create async model (same container as sync endpoint)
print("Creating async SageMaker model...")
sm.create_model(
    ModelName=async_model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={
        'Image': image,
        'Mode': 'SingleModel',
        'Environment': {
            'MODEL_ID': 'deepseek-ai/DeepSeek-OCR',
            'HF_HUB_ENABLE_HF_TRANSFER': '1'
        }
    }
)
print(f"✓ Async model created: {async_model_name}")

In [None]:
# Create async endpoint configuration
print("Creating async endpoint configuration...")

sm.create_endpoint_config(
    EndpointConfigName=async_endpoint_config_name,
    ProductionVariants=[{
        'VariantName': 'AllTraffic',
        'ModelName': async_model_name,
        'InitialInstanceCount': 1,
        'InstanceType': 'ml.g5.2xlarge'
    }],
    AsyncInferenceConfig={  # Including this block makes the endpoint asynchronous
        'OutputConfig': {
            'S3OutputPath': f"s3://{async_output_bucket}/async-results/"
        }
    }
)

print(f"✓ Async endpoint config created: {async_endpoint_config_name}")
print(f"  Output S3: s3://{async_output_bucket}/async-results/")

In [None]:
# Create async endpoint
print("Creating async endpoint (this takes ~5-10 minutes)...")
sm.create_endpoint(
    EndpointName=async_endpoint_name,
    EndpointConfigName=async_endpoint_config_name
)
print(f"✓ Async endpoint creation started: {async_endpoint_name}")
print("\nWaiting for async endpoint to be in service...")

# Wait for endpoint
waiter = sm.get_waiter('endpoint_in_service')
waiter.wait(EndpointName=async_endpoint_name)

print(f"\n✅ Async endpoint is ready: {async_endpoint_name}")
print(f"   Input bucket: s3://{async_input_bucket}/")
print(f"   Output bucket: s3://{async_output_bucket}/async-results/")
print("\nThis endpoint supports:")
print("  ✓ Long-running requests (15 minutes by default)")
print("  ✓ invoke_endpoint_async() method")
print("  ✓ S3-based input/output")
print("  ✓ Automatic result storage (no truncation)")
print("  ✓ Scaling to zero when idle (requires an auto-scaling policy)")

## 2. Test Async Endpoint

The async workflow involves:
1. Encode image/PDF to base64 and create JSON payload
2. Upload JSON to S3 input bucket
3. Call `invoke_endpoint_async()` with S3 input location
4. Poll S3 output bucket for results
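
The four steps above assume the request succeeds. If the container returns an error, nothing is ever written to the output location, so a poller that only watches that key waits until it times out. The sketch below polls for a failure record as well. It assumes the `invoke_endpoint_async` response exposes a `FailureLocation` field next to `OutputLocation` (check your boto3 version), and the names `split_s3_uri` and `wait_for_async_result` are hypothetical.

```python
import json
import time
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    return parsed.netloc, parsed.path.lstrip("/")


def _try_read(s3_client, uri):
    """Return the object's bytes, or None if the key does not exist yet."""
    bucket, key = split_s3_uri(uri)
    try:
        return s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    except Exception as err:
        code = getattr(err, "response", {}).get("Error", {}).get("Code", "")
        if code == "NoSuchKey":
            return None
        raise  # anything else (permissions, throttling) is a real error


def wait_for_async_result(s3_client, output_location, failure_location=None,
                          max_wait=900, interval=10, sleep=time.sleep):
    """Poll S3 until the result appears, a failure record appears, or time runs out.

    Returns the parsed JSON result, or None on timeout.
    Raises RuntimeError if a failure record is found.
    """
    deadline = time.monotonic() + max_wait
    while True:
        body = _try_read(s3_client, output_location)
        if body is not None:
            return json.loads(body)
        if failure_location:
            failure = _try_read(s3_client, failure_location)
            if failure is not None:
                raise RuntimeError(f"Async inference failed: {failure[:500]!r}")
        if time.monotonic() >= deadline:
            return None
        sleep(interval)
```

A caller would pass `response["OutputLocation"]` and `response.get("FailureLocation")` from the invoke call. Injecting `sleep` keeps the function testable without real waiting.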


In [None]:
import json
import base64
import time
from pathlib import Path
from urllib.parse import urlparse
import boto3

# Setup runtime client for async inference
runtime = boto3.client('sagemaker-runtime')

def process_async(file_path, prompt="<image>\nFree OCR."):
    """Complete async workflow for image or PDF"""
    
    # Step 1: Read and encode file
    print(f"Processing: {file_path}")
    with open(file_path, 'rb') as f:
        file_data = f.read()
        file_base64 = base64.b64encode(file_data).decode('utf-8')
    
    # Step 2: Create JSON payload
    is_pdf = str(file_path).lower().endswith('.pdf')  # also accepts pathlib.Path
    payload = {
        "prompt": prompt,
        "pdf_base64" if is_pdf else "image_base64": file_base64
    }
    
    # Step 3: Upload to S3
    s3_key = f"inputs/{Path(file_path).stem}.json"
    s3.put_object(
        Bucket=async_input_bucket,
        Key=s3_key,
        Body=json.dumps(payload),
        ContentType='application/json'
    )
    input_location = f"s3://{async_input_bucket}/{s3_key}"
    print(f"  ✓ Uploaded to: {input_location}")
    
    # Step 4: Invoke async
    print(f"  ✓ Invoking async endpoint...")
    response = runtime.invoke_endpoint_async(
        EndpointName=async_endpoint_name,
        InputLocation=input_location,
        ContentType='application/json'
    )
    output_location = response['OutputLocation']
    print(f"  ✓ Request submitted: {output_location}")
    
    # Step 5: Wait for result
    parsed = urlparse(output_location)
    bucket = parsed.netloc
    key = parsed.path.lstrip('/')
    
    print(f"  ⏳ Waiting for result...")
    start = time.time()
    max_wait = 900  # 15 minutes
    
    while time.time() - start < max_wait:
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            result = json.loads(obj['Body'].read())
            elapsed = int(time.time() - start)
            print(f"  ✅ Result ready after {elapsed}s!\n")
            return result
        except s3.exceptions.NoSuchKey:
            time.sleep(10)
        except Exception as e:
            print(f"  ⚠️  Error: {e}")
            time.sleep(10)
    
    print(f"  ⏱ Timeout after {max_wait}s")
    return None

print("✓ Async helper function defined")

### Example 1: Invoice Processing

Process a business invoice using the async endpoint.


In [None]:
# Process invoice with async endpoint
result = process_async("Invoice_3.jpg")

if result:
    print("OCR Result:")
    print("=" * 80)
    text = result['text']
    if len(text) > 1000:
        print(text[:1000])
        print(f"\n... (truncated, full text in S3)")
    else:
        print(text)
    print("=" * 80)
    print(f"\nTotal length: {len(text)} characters")
else:
    print("❌ Failed to get result")

### Example 2: Multi-Page PDF

Process a research paper PDF with multiple pages. This demonstrates the async endpoint's ability to handle long-running tasks.


In [None]:
# Process PDF with async endpoint
result = process_async("1706.03762v7.pdf")

if result:
    print("OCR Result:")
    print("=" * 80)
    text = result['text']
    print(text[:1000])
    if len(text) > 1000:
        print(f"\n... (showing first 1000 of {len(text)} characters)")
    print("=" * 80)
    print(f"\nPages: {result.get('pages', 'N/A')}")
    print(f"Total length: {len(text)} characters")
    print("\n✅ Full OCR text stored in S3 - no truncation!")
else:
    print("❌ Failed to get result")

---

## 3. Deploy Sync Endpoint (Optional)

For real-time applications requiring immediate responses, deploy a synchronous endpoint. Note the 60-second timeout limit.
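
Besides the timeout, a sync request carries the whole base64-encoded file in the request body, so it helps to estimate the payload size before invoking. The sketch below does that arithmetic. The payload limit is deliberately a required argument because the quota is an assumption you should look up for your account; `base64_len`, `fits_sync_payload`, and `overhead_bytes` are hypothetical names.

```python
import math


def base64_len(n_bytes: int) -> int:
    """Length of the padded base64 encoding of n_bytes of data."""
    # Every 3 input bytes become 4 output characters, rounded up to a full group
    return 4 * math.ceil(n_bytes / 3)


def fits_sync_payload(file_bytes: int, limit_bytes: int, overhead_bytes: int = 1024) -> bool:
    """Check whether a file, once base64-encoded and wrapped in JSON, fits the limit.

    overhead_bytes is a rough allowance for the prompt and JSON syntax.
    """
    return base64_len(file_bytes) + overhead_bytes <= limit_bytes
```

For example, `fits_sync_payload(Path("Invoice_3.jpg").stat().st_size, limit_bytes=...)` can decide whether to call the sync endpoint or fall back to `process_async()`.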


In [None]:
# Create unique names for resources
model_name = f"deepseek-ocr-byoc-{int(time.time())}"
endpoint_config_name = f"{model_name}-cfg"
endpoint_name = f"{model_name}-ep"

print(f"Model Name: {model_name}")
print(f"Endpoint Config: {endpoint_config_name}")
print(f"Endpoint Name: {endpoint_name}")

In [None]:
# Create SageMaker Model
print("Creating SageMaker model...")
sm.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={
        'Image': image,
        'Mode': 'SingleModel',
        'Environment': {
            'MODEL_ID': 'deepseek-ai/DeepSeek-OCR',
            'HF_HUB_ENABLE_HF_TRANSFER': '1'
        }
    }
)
print(f"✓ Model created: {model_name}")

In [None]:
# Create Endpoint Configuration
print("Creating endpoint configuration...")
sm.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            'VariantName': 'AllTraffic',
            'ModelName': model_name,
            'InitialInstanceCount': 1,
            'InstanceType': 'ml.g5.2xlarge'  # 24GB GPU, 8 vCPUs, 32GB RAM
        }
    ]
)
print(f"✓ Endpoint config created: {endpoint_config_name}")

In [None]:
# Create and wait for sync endpoint
print("Creating sync endpoint (this takes ~5-10 minutes)...")
sm.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name
)
print(f"✓ Endpoint creation started: {endpoint_name}")
print("\nWaiting for endpoint to be in service...")

# Wait for endpoint
waiter = sm.get_waiter('endpoint_in_service')
waiter.wait(EndpointName=endpoint_name)

print(f"\n✓ Sync endpoint is ready: {endpoint_name}")

## 4. Test Sync Endpoint

Test the sync endpoint with a single invoice example.


In [None]:
import json
import base64
from pathlib import Path

# Setup runtime client for inference
runtime = boto3.client('sagemaker-runtime')

def invoke_ocr(payload):
    """Helper function to invoke the endpoint"""
    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=json.dumps(payload)
    )
    return json.loads(response['Body'].read())

print("✓ Helper function defined")

### Example: Invoice Processing

Process an invoice with immediate response.


In [None]:
# Read local invoice image
invoice_path = Path("Invoice_3.jpg")
with open(invoice_path, "rb") as f:
    img_data = f.read()
    img_base64 = base64.b64encode(img_data).decode("utf-8")

payload = {
    "prompt": "<image>\nFree OCR.",
    "image_base64": img_base64
}

print(f"Processing invoice image ({invoice_path.stat().st_size / 1024:.1f} KB)...\n")
result = invoke_ocr(payload)

print("✅ SUCCESS!\n")
print("OCR Result:")
print("=" * 80)
# Show first 1000 characters if output is long
text = result["text"]
if len(text) > 1000:
    print(text[:1000])
    print("\n... (truncated) ...")
else:
    print(text)
print("=" * 80)
print(f"\nTotal length: {len(text)} characters")

---

## 5. Cleanup Resources

**Important**: SageMaker endpoints incur charges while running (about $1.52/hour per `ml.g5.2xlarge` endpoint; pricing varies by region). Delete endpoints when not in use.
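
To put the hourly figure in perspective, the small sketch below multiplies it out. It uses the $1.52/hour rate quoted above as its default; `endpoint_cost` is a hypothetical helper, and the result ignores S3 storage and data transfer.

```python
def endpoint_cost(hours: float, endpoints: int = 1, hourly_rate: float = 1.52) -> float:
    """Estimate instance charges in USD for endpoints left running."""
    return round(hours * endpoints * hourly_rate, 2)


# Both endpoints from this notebook left running for one day
print(endpoint_cost(24, endpoints=2))

# A single endpoint forgotten for 30 days
print(endpoint_cost(24 * 30))
```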


In [None]:
# Display current resources
print("Sync Endpoint Resources:")
print(f"  Endpoint: {endpoint_name}")
print(f"  Endpoint Config: {endpoint_config_name}")
print(f"  Model: {model_name}")

print("\nAsync Endpoint Resources:")
print(f"  Endpoint: {async_endpoint_name}")
print(f"  Endpoint Config: {async_endpoint_config_name}")
print(f"  Model: {async_model_name}")

print("\nRun the next cell to delete these resources.")

In [None]:
# Delete sync endpoint resources
print("=" * 80)
print("Cleaning up SYNC endpoint resources...")
print("=" * 80)

try:
    print("Deleting sync endpoint...")
    sm.delete_endpoint(EndpointName=endpoint_name)
    print(f"✓ Sync endpoint deleted: {endpoint_name}")
except Exception as e:
    print(f"⚠ Could not delete sync endpoint: {e}")

try:
    print("Deleting sync endpoint configuration...")
    sm.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    print(f"✓ Sync endpoint config deleted: {endpoint_config_name}")
except Exception as e:
    print(f"⚠ Could not delete sync endpoint config: {e}")

try:
    print("Deleting sync model...")
    sm.delete_model(ModelName=model_name)
    print(f"✓ Sync model deleted: {model_name}")
except Exception as e:
    print(f"⚠ Could not delete sync model: {e}")

# Delete async endpoint resources
print("\n" + "=" * 80)
print("Cleaning up ASYNC endpoint resources...")
print("=" * 80)

try:
    print("Deleting async endpoint...")
    sm.delete_endpoint(EndpointName=async_endpoint_name)
    print(f"✓ Async endpoint deleted: {async_endpoint_name}")
except Exception as e:
    print(f"⚠ Could not delete async endpoint: {e}")

try:
    print("Deleting async endpoint configuration...")
    sm.delete_endpoint_config(EndpointConfigName=async_endpoint_config_name)
    print(f"✓ Async endpoint config deleted: {async_endpoint_config_name}")
except Exception as e:
    print(f"⚠ Could not delete async endpoint config: {e}")

try:
    print("Deleting async model...")
    sm.delete_model(ModelName=async_model_name)
    print(f"✓ Async model deleted: {async_model_name}")
except Exception as e:
    print(f"⚠ Could not delete async model: {e}")

print("\n" + "=" * 80)
print("✓ Cleanup completed!")
print("=" * 80)
print("\nCleanup finished. Review any warnings above; the S3 buckets are not deleted and may still incur storage charges.")

## Summary

This notebook demonstrated:
- ✓ Deploying **Async Inference endpoint** with AsyncInferenceConfig
- ✓ Processing documents with S3-based async workflow
- ✓ Handling large PDFs that would exceed the 60-second sync timeout
- ✓ Deploying **Sync endpoint** for real-time processing
- ✓ Managing and cleaning up SageMaker resources

### When to Use Each Endpoint

| Use Case | Endpoint Type | Reason |
|----------|---------------|--------|
| Single image | Sync | Fast (2-5s) response |
| Real-time UI | Sync | Immediate feedback |
| Large PDF (10+ pages) | Async | Avoids timeout |
| Batch processing | Async | Cost-effective scaling |
| Background jobs | Async | Fire-and-forget |
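
The table can be encoded as a small routing helper if one application talks to both endpoints. This is only a sketch: `choose_endpoint` is a hypothetical function, and the 10-page threshold simply mirrors the table row rather than a measured limit.

```python
def choose_endpoint(is_pdf: bool, pages: int = 1, interactive: bool = True,
                    async_page_threshold: int = 10) -> str:
    """Return "sync" or "async" following the use-case table."""
    if not interactive:
        return "async"  # batch processing and background jobs
    if is_pdf and pages >= async_page_threshold:
        return "async"  # large PDFs risk hitting the sync timeout
    return "sync"       # single images and real-time UI requests
```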

### Production Enhancements

For production use:
1. **Configure SNS topics** - Get notifications when async jobs complete
2. **Set up auto-scaling** - Handle variable workloads
3. **Add S3 lifecycle policies** - Auto-delete old results
4. **Monitor with CloudWatch** - Track processing times and costs
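
Items 1 and 3 are mostly configuration. The sketch below builds the two request fragments: an `AsyncInferenceConfig` with SNS topics (and an optional failure path), and an S3 lifecycle rule that expires old results. The function names, the 30-day default, and the topic ARNs you pass in are illustrative assumptions; the topics must already exist and the execution role needs permission to publish to them.

```python
def async_config_with_notifications(output_path, success_topic_arn,
                                    error_topic_arn, failure_path=None):
    """Build an AsyncInferenceConfig dict for create_endpoint_config()."""
    output = {
        "S3OutputPath": output_path,
        "NotificationConfig": {
            "SuccessTopic": success_topic_arn,
            "ErrorTopic": error_topic_arn,
        },
    }
    if failure_path:
        output["S3FailurePath"] = failure_path  # where failed responses are written
    return {"OutputConfig": output}


def expire_results_rule(prefix="async-results/", days=30):
    """Build a LifecycleConfiguration dict that deletes results after `days`."""
    return {
        "Rules": [{
            "ID": f"expire-{prefix.strip('/')}-after-{days}d",
            "Filter": {"Prefix": prefix},
            "Status": "Enabled",
            "Expiration": {"Days": days},
        }]
    }
```

The first dict would replace the `AsyncInferenceConfig` argument in the endpoint configuration cell. The second can be applied with `s3.put_bucket_lifecycle_configuration(Bucket=async_output_bucket, LifecycleConfiguration=expire_results_rule())`.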

### Resources

- **Model**: [deepseek-ai/DeepSeek-OCR](https://huggingface.co/deepseek-ai/DeepSeek-OCR)
- **SageMaker Async Inference**: [AWS Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference.html)
- **SageMaker BYOC**: [Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms.html)
