# Batch Inference with Nova Embeddings

This notebook demonstrates an end-to-end batch inference workflow with the Nova Embeddings model. We'll create 100 text files, build the JSONL input file that references them, submit the batch job, monitor its status, and download the results.

In [None]:
# Restore variables from setup notebook
# `%store -r <name>` reloads a value that was previously persisted with `%store`;
# both names are expected to have been saved by the setup notebook before this runs.
%store -r s3_bucket
print(f"Using S3 bucket: {s3_bucket}")  # bucket that receives every upload made by this notebook
%store -r region_name
print(f"Using region: {region_name}")  # region handed to every boto3 client created below

In [None]:
import json
import os
import boto3
from pathlib import Path

## Step 1: Download and Prepare Text Dataset

We'll download a public text dataset and create individual text files for batch inference.

In [None]:
from datasets import load_dataset
from pathlib import Path

# Fetch the OpenAI HumanEval dataset and work with its 'test' split
ds = load_dataset("openai/openai_humaneval")
test_data = ds['test']

# Local folder that receives one .txt file per dataset row
text_dir = Path('text_dataset')
text_dir.mkdir(exist_ok=True)

# Write at most the first 100 rows; each entry collected is (file name, local path)
text_files = []
row_count = min(100, len(test_data))
for idx in range(row_count):
    sample = test_data[idx]
    # Prompt plus its reference solution gives richer text content to embed
    text_content = f"{sample['prompt']}\n\n# Solution:\n{sample['canonical_solution']}"

    filename = f"text_{idx + 1:03d}.txt"
    filepath = text_dir / filename
    filepath.write_text(text_content, encoding='utf-8')
    text_files.append((filename, str(filepath)))

print(f"Created {len(text_files)} text files in {text_dir}")
sample_names = [name for name, _path in text_files[:5]]
print(f"Sample files: {sample_names}")

# Copy every generated text file into the bucket under a shared key prefix
s3_client = boto3.client('s3', region_name=region_name)
s3_prefix = 'batch-inference/'

# (file name, s3:// URI) for each object whose upload succeeded
uploaded_texts = []

for filename, filepath in text_files:
    s3_key = s3_prefix + filename
    s3_uri = f"s3://{s3_bucket}/{s3_key}"

    try:
        s3_client.upload_file(filepath, s3_bucket, s3_key)
    except Exception as e:
        # Best effort: report the failure and carry on with the remaining files
        print(f"Error uploading {filename}: {e}")
    else:
        uploaded_texts.append((filename, s3_uri))

print(f"\nUploaded {len(uploaded_texts)} text files to S3")

## Step 2: Prepare JSONL Input File

Create the JSONL file with the required format for batch inference.

In [None]:
# Get AWS account ID for the bucketOwner parameter of every S3 source location
sts_client = boto3.client('sts', region_name=region_name)
account_id = sts_client.get_caller_identity()['Account']

# Fail fast: with no uploaded objects there is nothing to embed, and the
# first-record preview at the end of this cell would raise an opaque IndexError.
if not uploaded_texts:
    raise RuntimeError(
        "No text files were uploaded to S3; fix the upload step before building the JSONL input"
    )

# One record per uploaded text file; recordId is 1-based and zero-padded (record001, ...)
jsonl_records = [
    {
        "recordId": f"record{i:03d}",
        "modelInput": {
            "taskType": "SINGLE_EMBEDDING",
            "singleEmbeddingParams": {
                "embeddingPurpose": "GENERIC_INDEX",
                "embeddingDimension": 3072,
                "text": {
                    "source": {
                        "s3Location": {
                            "uri": s3_uri,
                            "bucketOwner": account_id
                        }
                    },
                    "truncationMode": "END"
                }
            }
        }
    }
    for i, (filename, s3_uri) in enumerate(uploaded_texts, 1)
]

# Write JSONL file: exactly one JSON document per line
jsonl_filename = 'batch_inference_input.jsonl'
with open(jsonl_filename, 'w', encoding='utf-8') as f:
    f.writelines(json.dumps(record) + '\n' for record in jsonl_records)

print(f"Created {jsonl_filename} with {len(jsonl_records)} records")
print(f"\nFirst record example:")
print(json.dumps(jsonl_records[0], indent=2))

## Step 3: Upload JSONL File to S3

Upload the input file to S3 for batch processing.

In [None]:
# Object key of the JSONL input file inside the bucket
input_s3_key = f"batch-inference/{jsonl_filename}"
# Point the job at the JSONL object itself rather than the whole 'batch-inference/'
# prefix: that prefix also holds the uploaded .txt files and the 'output/' folder.
input_s3_uri = f"s3://{s3_bucket}/{input_s3_key}"

s3_client.upload_file(jsonl_filename, s3_bucket, input_s3_key)
print(f"Uploaded input file to: {input_s3_uri}")

# Also define output location
output_s3_uri = f"s3://{s3_bucket}/batch-inference/output/"
print(f"Output will be written to: {output_s3_uri}")

## Step 4: Create IAM Role for Batch Inference

Create the required IAM role with permissions for Bedrock batch inference.

In [None]:
import json

iam_client = boto3.client('iam', region_name=region_name)
role_name = 'BedrockBatchExecutionRole'
policy_name = 'BedrockBatchExecutionPolicy'

# Trust policy for Bedrock service: lets bedrock.amazonaws.com assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

# Permissions policy: model invocation, object and list access on this notebook's
# bucket, and CloudWatch Logs writes
permissions_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": f"arn:aws:s3:::{s3_bucket}/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": f"arn:aws:s3:::{s3_bucket}"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

# Take the ARN from the IAM response instead of assembling it by hand, so a role
# that lives under a non-default path or in another partition is still addressed correctly.
try:
    # Check if role exists
    bedrock_role_arn = iam_client.get_role(RoleName=role_name)['Role']['Arn']
    print(f"Role {role_name} already exists")
except iam_client.exceptions.NoSuchEntityException:
    # Create role
    created_role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy)
    )
    bedrock_role_arn = created_role['Role']['Arn']
    print(f"Created role: {bedrock_role_arn}")

# Attach (or refresh) the inline policy on every run. A role left over from an
# earlier run may have been created for a different bucket and would otherwise
# never receive permissions for the current s3_bucket.
iam_client.put_role_policy(
    RoleName=role_name,
    PolicyName=policy_name,
    PolicyDocument=json.dumps(permissions_policy)
)

print(f"Using role: {bedrock_role_arn}")

## Step 5: Start Batch Inference Job

Build the job configuration and submit the batch inference job to Amazon Bedrock.


In [None]:
from datetime import datetime

# Job name carries a second-resolution local timestamp as its suffix
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
job_name = "nova-embeddings-batch-" + timestamp

# Echo the identifiers that the job submission will use
for label, shown in (("Job name", job_name), ("Role ARN", bedrock_role_arn)):
    print(f"{label}: {shown}")

In [None]:
# Bedrock client used to submit the batch job (and to query it in later cells)
bedrock_client = boto3.client('bedrock', region_name=region_name)

# Where the job reads its JSONL input from and where it writes its results
input_config = {
    's3InputDataConfig': {
        's3InputFormat': 'JSONL',
        's3Uri': input_s3_uri
    }
}
output_config = {
    's3OutputDataConfig': {
        's3Uri': output_s3_uri
    }
}

# Keyword arguments for create_model_invocation_job, in display order
job_payload = dict(
    jobName=job_name,
    roleArn=bedrock_role_arn,
    modelId='amazon.nova-2-multimodal-embeddings-v1:0',
    inputDataConfig=input_config,
    outputDataConfig=output_config,
    timeoutDurationInHours=24,
)

print("Job payload:")
print("\n".join(f"  {key}: {value}" for key, value in job_payload.items()))

In [None]:
try:
    # Start the batch inference job
    response = bedrock_client.create_model_invocation_job(**job_payload)
    
    print(f"Request ID: {response.get("ResponseMetadata").get("RequestId")}") 
    print(f"Job ARN: {response.get("jobArn")}")
    
    # Store job ARN for later use
    job_arn = response['jobArn']
    %store job_arn
    
except Exception as e:
    print(f"Failed to start batch job: {e}")
    print("\nCommon issues:")
    print("1. IAM role doesn't exist or lacks permissions")
    print("2. S3 bucket/objects are not accessible")
    print("3. Model ID is incorrect or not enabled")

## Step 6: Monitor Job Status

Check the status of your batch inference job.

In [None]:
# Fetch the current job description and pretty-print it as JSON
try:
    import json
    from datetime import datetime

    def json_serial(obj):
        """Fallback for json.dumps: render datetime values as ISO-8601 strings."""
        if not isinstance(obj, datetime):
            raise TypeError(f"Type {type(obj)} not serializable")
        return obj.isoformat()

    job_details = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)

    # The response contains datetime objects, hence the custom default handler
    print(json.dumps(job_details, indent=2, default=json_serial))

except Exception as e:
    print(f"Error checking job status: {e}")

## Step 7: Download Results

Download the batch inference results when the job is completed.

In [None]:
import json
from pathlib import Path

# Check if job is completed; if so, download its output files and preview the results
try:
    job_details = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)
    job_status = job_details['status']

    if job_status != 'Completed':
        print(f"Job status: {job_status} - Results not ready yet")
    else:
        print("Job completed! Downloading results...")

        # Create results directory
        results_dir = Path('batch_results')
        results_dir.mkdir(exist_ok=True)

        # Extract job ID from job ARN (the last '/'-separated segment)
        job_id = job_details['jobArn'].split('/')[-1]

        # List objects in the output S3 location with job ID
        output_prefix = f'batch-inference/output/{job_id}'
        response = s3_client.list_objects_v2(
            Bucket=s3_bucket,
            Prefix=output_prefix
        )

        # Skip zero-length "folder" placeholder keys; they have no file name to download to
        output_objects = [
            obj for obj in response.get('Contents', [])
            if not obj['Key'].endswith('/')
        ]

        if not output_objects:
            print("No output files found")

        for obj in output_objects:
            s3_key = obj['Key']
            filename = Path(s3_key).name
            local_path = results_dir / filename

            # Download the file
            s3_client.download_file(s3_bucket, s3_key, str(local_path))
            print(f"Downloaded: {filename}")

            # Batch output is written as '<input name>.jsonl.out', so accept that
            # suffix as well as plain '.jsonl' when deciding what to preview.
            if filename.endswith(('.jsonl', '.jsonl.out')):
                print(f"\nSample results from {filename}:")
                with open(local_path, 'r', encoding='utf-8') as f:
                    shown = 0
                    # Stream the file: each record carries a full embedding vector,
                    # so avoid loading every line just to show the first three.
                    for line in f:
                        if not line.strip():
                            continue
                        shown += 1
                        result = json.loads(line)
                        print(f"Record {shown}: {result['recordId']} - Status: {result.get('modelOutput', {}).get('status', 'N/A')}")
                        if shown == 3:  # Show first 3 results
                            break

except Exception as e:
    print(f"Error downloading results: {e}")