In [None]:
# Install into the kernel's own environment (%pip, not !pip). Versions are pinned
# to match the processing container built from ../docker/Dockerfile.
%pip install jinja2==3.1.4 pdfplumber==0.11.2 boto3==1.34.131

In [None]:
import os
import sys

# Project root that contains the `src` package (one level above this notebook).
new_package_root = "../"

# Put the project root first on sys.path so `src.*` imports resolve to this
# checkout. Drop any existing copies first so re-running the cell does not keep
# prepending duplicate entries.
_package_root_abs = os.path.abspath(new_package_root)
sys.path[:] = [entry for entry in sys.path if entry != _package_root_abs]
sys.path.insert(0, _package_root_abs)

In [None]:
import boto3
import pdfplumber
from jinja2 import Environment, FileSystemLoader

# Import only the helpers this notebook uses. The former star import hid where
# names came from, and `boto3` was only available to this cell if `src.utils`
# happened to re-export it.
from src.utils import find_suitable_image_size, generate_conversation

# Bedrock runtime client passed to `generate_conversation` in the preview loop.
bedrock_client = boto3.client(service_name='bedrock-runtime')

In [None]:
# The Jinja loader is rooted at ../src, so every template name below carries the
# `template/` sub-directory prefix.
template_dir = '../src'
env = Environment(loader=FileSystemLoader(template_dir))

# Load the system prompt plus the text that goes before/after the page content.
(
    system_prompt_template,
    user_prompt_pre_template,
    user_prompt_post_template,
) = (
    env.get_template(template_name)
    for template_name in (
        'template/system_prompt_template.jinja',
        'template/user_prompt_pre_template.jinja',
        'template/user_prompt_post_template.jinja',
    )
)

In [None]:
# Local PDF to preview: replace the placeholder with a real file path.
document = "<Your PDF data path goes here>"
# Shared by the preview loop below; call `pdf_obj.close()` when finished to
# release the underlying file.
pdf_obj = pdfplumber.open(document)

In [None]:
# Bedrock model used for this interactive preview.
model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"
# Number of leading pages to preview (keeps the number of Bedrock calls small).
MAX_PREVIEW_PAGES = 3
# Image file passed to the model with every page.
# NOTE(review): nothing in this cell writes this file; presumably
# `find_suitable_image_size(page)` renders the current page there. Confirm in
# src/utils, otherwise every request reuses a stale (or missing) image.
input_image = './output_image.png'

# The system prompt and the post-page prompt take no per-page variables, so
# render them once instead of on every iteration.
system_text = system_prompt_template.render()
input_text_post = user_prompt_post_template.render()

for idx, page in enumerate(pdf_obj.pages[:MAX_PREVIEW_PAGES]):
    # layout=True asks pdfplumber to approximate the page's visual layout in text.
    PDF_TEXT = page.extract_text(layout=True)

    # NOTE(review): PAGE_NUMBER is the 0-based loop index, while pdfplumber's
    # `page.page_number` is 1-based. Confirm which one the template expects.
    input_text_pre = user_prompt_pre_template.render(
        PDF_TEXT=PDF_TEXT, FILENAME=document, PAGE_NUMBER=idx
    )

    # The returned size is not used below; the call is kept in case it has side
    # effects (see the note on `input_image`).
    suitable_image_size = find_suitable_image_size(page)

    response = generate_conversation(
        bedrock_client,
        model_id,
        system_text,
        input_text_pre,
        input_text_post,
        input_image,
    )

    print(response['output']['message']['content'][0]['text'])
    print("\n\n ########################## \n\n")

In [None]:
### SageMaker Processing Job

In [None]:
%%writefile ../docker/Dockerfile

FROM python:3.10-slim
# asyncio is part of the standard library since Python 3.4. The PyPI "asyncio"
# distribution is a legacy backport that is not needed on 3.10 and can conflict
# with the stdlib module, so it is not installed.
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip3 install --no-cache-dir boto3==1.34.131 pdfplumber==0.11.2 Jinja2==3.1.4

ENTRYPOINT ["python3"]

In [None]:
import boto3

# The ECR registry hostname needs the account id and region of the active
# AWS credentials.
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.Session().region_name

ecr_repository = 'sagemaker-processing-async-custom-container'
tag = ':latest'
# Shape: <account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>
processing_repository_uri = f'{account_id}.dkr.ecr.{region}.amazonaws.com/{ecr_repository}{tag}'

In [None]:
# Build the processing image from ../docker (its Dockerfile is the default) and
# tag it locally. SageMaker ml.m5 instances are x86_64; when building on an ARM
# host (e.g. Apple Silicon) also pass `--platform linux/amd64`.
# # If inside SageMaker Studio Notebook
# !docker build --network sagemaker -t {ecr_repository + tag} ../docker
!docker build -t {ecr_repository + tag} ../docker

In [None]:
# Authenticate the local Docker daemon against this account's ECR registry. The
# token is piped to `docker login` via stdin so it never appears on the command line.
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com

In [None]:
# Create the ECR repository only when it does not exist yet. A plain
# `create-repository` fails with RepositoryAlreadyExistsException when the
# notebook is re-run. The region matches the one used for `docker login`.
!aws ecr describe-repositories --repository-names {ecr_repository} --region {region} > /dev/null 2>&1 || aws ecr create-repository --repository-name {ecr_repository} --region {region}

In [None]:
# Point the fully-qualified ECR name at the locally built image.
!docker tag {ecr_repository}{tag} {processing_repository_uri}

In [None]:
# Upload the tagged image to ECR so SageMaker can pull it.
!docker push {processing_repository_uri}

In [None]:
# Smoke test: the image's entrypoint is `python3`; with no arguments and no stdin
# attached, the container is expected to start and exit straight away.
# # If inside SageMaker Studio Notebook
# !docker run --network sagemaker --rm {processing_repository_uri}
!docker run --rm {processing_repository_uri}

In [None]:
## Processor/ScriptProcessor
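### Processor: the script must first be packaged into the Docker image (Processor需要先包script進入docker)
### ScriptProcessor: runs a local script that is uploaded together with the job (ScriptProcessor可以從local的script進入程式)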

In [None]:
### Method 1: SageMaker Python SDK

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
import sagemaker

# IAM execution role assumed by the processing job; replace the placeholders
# with a real role ARN.
role = "arn:aws:iam::<accountId>:role/service-role/<roleName>"

In [None]:
input_s3_path = "s3://<input bucket>/input"
script_s3_path = "s3://<script bucket>/script"
output_s3_path = "s3://<output bucket>/output"

# Use the same model id, volume size and runtime cap as the boto3
# `create_processing_job` variant below, so both launch paths configure the job
# identically.
# NOTE(review): assumes start.py reads BEDROCK_MODEL_ID from the environment, as
# the boto3 variant implies. Confirm.
processing_model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

processor = ScriptProcessor(
    image_uri="<imageUri>",
    command=['python3'],
    base_job_name="<jobName>",
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=30,
    max_runtime_in_seconds=3600,
    env={'BEDROCK_MODEL_ID': processing_model_id},
)

# ScriptProcessor uploads ../src/start.py and runs it with `python3` in the
# container. The S3 inputs are copied to the given local paths before the script
# starts; /opt/ml/processing/output is uploaded to S3 when the job ends.
processor.run(
    code="../src/start.py",
    inputs=[
        ProcessingInput(
            source=input_s3_path,
            destination='/opt/ml/processing/input'),
        ProcessingInput(
            source=script_s3_path,
            destination='/opt/ml/processing/script')
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination=output_s3_path)
    ],
)

In [None]:
### Method 2: boto3 (AWS SDK for Python)

In [None]:
import boto3
import os
import json
import uuid
from datetime import datetime, timezone

s3_client = boto3.client('s3')
sagemaker_client = boto3.client('sagemaker')

# Placeholders: replace with real bucket names, role ARN and ECR image URI.
input_bucket = "input bucket"
script_bucket = "script bucket"
output_bucket = "output bucket"
role_arn = "sagemaker execution role"
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
ecr_image_uri = 'ecr image url'

# Unique job name from a UTC timestamp plus a random UUID. An aware UTC datetime
# is used because datetime.utcnow() is deprecated since Python 3.12. The name is
# 15 + 14 + 1 + 32 = 62 characters, within the 63-character ProcessingJobName limit.
timestamp = datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')
unique_id = uuid.uuid4().hex
job_name = f"processing-job-{timestamp}-{unique_id}"


def _s3_prefix_input(input_name, s3_uri, local_path):
    """Return one ProcessingInputs entry that copies an S3 prefix to ``local_path``."""
    return {
        'InputName': input_name,
        'S3Input': {
            'S3Uri': s3_uri,
            'LocalPath': local_path,
            'S3DataType': 'S3Prefix',
            'S3InputMode': 'File',
            'S3DataDistributionType': 'FullyReplicated',
            'S3CompressionType': 'None'
        }
    }


response = sagemaker_client.create_processing_job(
    ProcessingJobName=job_name,
    RoleArn=role_arn,
    AppSpecification={
        'ImageUri': ecr_image_uri,
        # start.py arrives through the 'code' input below, so it is expected at
        # the root of the script bucket.
        'ContainerEntrypoint': ['python3', '/opt/ml/processing/script/start.py'],
    },
    ProcessingInputs=[
        _s3_prefix_input('input-data', f's3://{input_bucket}', '/opt/ml/processing/input'),
        _s3_prefix_input('code', f's3://{script_bucket}', '/opt/ml/processing/script'),
    ],
    ProcessingOutputConfig={
        'Outputs': [
            {
                'OutputName': 'output-data',
                'S3Output': {
                    'S3Uri': f's3://{output_bucket}',
                    'LocalPath': '/opt/ml/processing/output',
                    'S3UploadMode': 'EndOfJob'
                }
            }
        ]
    },
    StoppingCondition={
        'MaxRuntimeInSeconds': 3600,
    },
    ProcessingResources={
        'ClusterConfig': {
            'InstanceCount': 1,
            'InstanceType': 'ml.m5.xlarge',
            'VolumeSizeInGB': 30
        }
    },
    Environment={
        'BEDROCK_MODEL_ID': model_id
    }
)

# Display the ARN of the job that was just created.
response['ProcessingJobArn']

In [None]:
### Monitor and clean up tools

In [None]:
import boto3
import os
import json
import time
from datetime import datetime, timedelta

def is_processing_job_running(sagemaker_client, name_contains=None):
    """Return True if at least one SageMaker processing job is InProgress.

    Args:
        sagemaker_client: A boto3 SageMaker client (anything exposing
            ``list_processing_jobs``).
        name_contains: Optional substring filter, forwarded as ``NameContains``.
            Only jobs whose name contains it are considered, so unrelated jobs
            in the same account/region don't count. Default: no filter.

    Returns:
        bool: True as soon as any listing page contains a matching job.
    """
    request = {'StatusEquals': 'InProgress', 'MaxResults': 1}
    if name_contains:
        request['NameContains'] = name_contains

    # A page can come back empty while a NextToken is still present, so keep
    # paging until a job shows up or the listing is exhausted.
    while True:
        response = sagemaker_client.list_processing_jobs(**request)
        if response.get('ProcessingJobSummaries'):
            return True
        next_token = response.get('NextToken')
        if not next_token:
            return False
        request['NextToken'] = next_token

In [None]:
# Fresh clients for the monitoring helpers in this section.
sagemaker_client = boto3.client('sagemaker')
s3_client = boto3.client('s3')

# True while any processing job in this account/region is InProgress.
is_processing_job_running(sagemaker_client)

In [None]:
from dateutil.tz import tzutc

def get_recently_uploaded_files(s3_client, bucket, minutes=1, prefix=''):
    """List keys in ``bucket`` whose LastModified falls within the last ``minutes``.

    Args:
        s3_client: A boto3 S3 client (anything exposing ``get_paginator``).
        bucket: Name of the bucket to scan.
        minutes: Length of the look-back window (default 1, as before).
        prefix: Only consider keys under this prefix (default: whole bucket).

    Returns:
        list[str]: Matching object keys, in listing order.
    """
    from datetime import timezone  # LastModified is compared with an aware UTC time

    start_time = datetime.now(timezone.utc) - timedelta(minutes=minutes)

    # A single list_objects_v2 call returns at most 1,000 keys, so page through
    # the whole listing instead of inspecting only the first response.
    recent_files = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            if obj['LastModified'] > start_time:
                recent_files.append(obj['Key'])
    return recent_files

In [None]:
# Keys that landed in the input bucket within the default look-back window.
get_recently_uploaded_files(s3_client=s3_client, bucket=input_bucket)

In [None]:
### Clean up

In [None]:
import boto3
import os

def get_files_without_extension(bucket_name, prefix='', s3_client=None):
    """Return the set of object base names with directory and last extension removed.

    For example, key ``"out/report_output.json"`` contributes ``"report_output"``.
    Keys ending in ``/`` (folder placeholders) are skipped because they have no
    file name and would otherwise add an empty string.

    Args:
        bucket_name: Bucket to list.
        prefix: Only keys under this prefix are listed (default: whole bucket).
        s3_client: Optional boto3 S3 client to reuse; a new one is created when
            omitted.

    Returns:
        set[str]: The base names found.
    """
    if s3_client is None:
        s3_client = boto3.client('s3')

    names = set()
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get('Contents', []):
            base_name = os.path.basename(obj['Key'])
            if not base_name:
                continue
            names.add(os.path.splitext(base_name)[0])
    return names

def delete_processed_files(input_bucket, output_bucket, input_prefix='', output_prefix='',
                           dry_run=False, s3_client=None):
    """Delete input objects whose processed counterpart exists in the output bucket.

    An input key such as ``input/report.pdf`` counts as processed when the output
    bucket (under ``output_prefix``) holds an object whose base name without
    extension is ``report_output``. Matching uses base names only, so identically
    named files under different input prefixes are treated alike.

    Args:
        input_bucket: Bucket whose processed objects are removed.
        output_bucket: Bucket holding the ``*_output`` results.
        input_prefix: Restrict the input scan to this prefix (default: whole bucket).
        output_prefix: Restrict the output scan to this prefix (default: whole bucket).
        dry_run: When True, only report what would be deleted (default False).
        s3_client: Optional boto3 S3 client to reuse for the input bucket;
            created when omitted.

    Returns:
        list[str]: Input keys deleted (or that would be deleted when ``dry_run``).
    """
    if s3_client is None:
        s3_client = boto3.client('s3')

    # Get files from output bucket
    processed_files = get_files_without_extension(output_bucket, output_prefix)
    print(processed_files)

    # Collect matching input keys first, then delete them.
    to_delete = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=input_bucket, Prefix=input_prefix):
        for obj in page.get('Contents', []):
            file_name = os.path.splitext(os.path.basename(obj['Key']))[0]
            if file_name + '_output' in processed_files:
                to_delete.append(obj['Key'])

    for key in to_delete:
        if dry_run:
            print(f"Would delete {key} from input bucket")
        else:
            print(f"Deleting {key} from input bucket")
            s3_client.delete_object(Bucket=input_bucket, Key=key)
    return to_delete

def main():
    """Run the input-bucket cleanup with the settings below.

    WARNING: a notebook kernel executes cells with ``__name__ == "__main__"``, so
    running this cell calls ``main()`` immediately. It deletes every input object
    that already has a ``<name>_output`` counterpart in the output bucket.
    Replace the placeholder bucket names before running it.
    """
    delete_processed_files(
        input_bucket='your input bucket',
        output_bucket='your output bucket',
        input_prefix='',   # Use if your files are in a specific folder
        output_prefix='',  # Use if your output files are in a specific folder
    )


if __name__ == "__main__":
    main()