## 02. Upstage OCR 시작/중지 (API / Batch)
### 1. Subscribe to the model package

In [1]:

import json
import time
from itertools import islice

import boto3

In [2]:
# Bedrock and SageMaker are used in us-east-1.
region_name = "us-east-1"

boto3_session = boto3.Session(region_name=region_name)
sagemaker_runtime = boto3.client("sagemaker-runtime", region_name=region_name)
sagemaker_client = boto3.client('sagemaker', region_name=region_name)
# NOTE(review): no region is pinned for S3, so the default configured region is used;
# the bucket name suggests ap-northeast-2 — confirm this is intended.
s3 = boto3.client('s3')

# Outside SageMaker Studio the role is given explicitly (inside: role = get_execution_role()).
sagemaker_role = "AmazonSageMaker-ExecutionRole-20240820T145192"
sagemaker_role_arn = "arn:aws:iam::761482380245:role/service-role/AmazonSageMaker-ExecutionRole-20240820T145192"
s3_bucket = "gsshop-video-analysis-761482380245-ap-northeast-2"

# Sample trigger marker. The marker file is named "_SUCCESS" everywhere else in this notebook.
SUCCESS_MARKER = "_SUCCESS"
key = f"images/trigger/prd_01/vrid_00003/{SUCCESS_MARKER}"
# Folder containing the marker: strip only the trailing marker name (keeps the trailing "/").
prefix = key[:-len(SUCCESS_MARKER)]

endpoint_config_name = "Upstage-Document-OCR-config"
endpoint_name = "endpoint-Upstage-Document-OCR"



In [22]:
model_package_name = "upstage-document-ocr-240704-r1-8b4651227fa23fc6925dedbc4b4d1437"

# AWS Marketplace model-package ARN of the Upstage Document OCR package for each region.
# The region inside every ARN must match its dictionary key.
model_package_map = {
    "us-east-1": f"arn:aws:sagemaker:us-east-1:865070037744:model-package/{model_package_name}",
    "us-east-2": f"arn:aws:sagemaker:us-east-2:057799348421:model-package/{model_package_name}",
    "us-west-1": f"arn:aws:sagemaker:us-west-1:382657785993:model-package/{model_package_name}",
    "us-west-2": f"arn:aws:sagemaker:us-west-2:594846645681:model-package/{model_package_name}",
    "ca-central-1": f"arn:aws:sagemaker:ca-central-1:470592106596:model-package/{model_package_name}",
    "eu-central-1": f"arn:aws:sagemaker:eu-central-1:446921602837:model-package/{model_package_name}",
    "eu-west-1": f"arn:aws:sagemaker:eu-west-1:985815980388:model-package/{model_package_name}",
    "eu-west-2": f"arn:aws:sagemaker:eu-west-2:856760150666:model-package/{model_package_name}",
    "eu-west-3": f"arn:aws:sagemaker:eu-west-3:843114510376:model-package/{model_package_name}",
    "eu-north-1": f"arn:aws:sagemaker:eu-north-1:136758871317:model-package/{model_package_name}",
    "ap-southeast-1": f"arn:aws:sagemaker:ap-southeast-1:192199979996:model-package/{model_package_name}",
    "ap-southeast-2": f"arn:aws:sagemaker:ap-southeast-2:666831318237:model-package/{model_package_name}",
    "ap-northeast-2": f"arn:aws:sagemaker:ap-northeast-2:745090734665:model-package/{model_package_name}",
    "ap-northeast-1": f"arn:aws:sagemaker:ap-northeast-1:977537786026:model-package/{model_package_name}",
    "ap-south-1": f"arn:aws:sagemaker:ap-south-1:077584701553:model-package/{model_package_name}",
    "sa-east-1": f"arn:aws:sagemaker:sa-east-1:270155090741:model-package/{model_package_name}",
}

# ValueError subclasses Exception, so existing handlers keep working.
if region_name not in model_package_map:
    raise ValueError(f"Current boto3 session region {region_name} is not supported.")

model_package_arn = model_package_map[region_name]

print(f"Model Package: '{model_package_arn}'")

Model Package: 'arn:aws:sagemaker:us-east-1:865070037744:model-package/upstage-document-ocr-240704-r1-8b4651227fa23fc6925dedbc4b4d1437'


### 2. Create an endpoint


In [24]:
# Instance type for real-time inference and the SageMaker model name used by the cells below.
real_time_inference_instance_type = "ml.g5.2xlarge"
model_name = "Upstage-Document-OCR"

In [29]:
# 2-1. Create the SageMaker model from the subscribed model package (one-time step).
# Network isolation is enabled for the model container.
response = sagemaker_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=sagemaker_role_arn,
    PrimaryContainer={'ModelPackageName': model_package_arn},
    EnableNetworkIsolation=True,
)
print(f"모델 생성 성공. 모델 response: {response}")



모델 생성 성공. 모델 response: {'ModelArn': 'arn:aws:sagemaker:us-east-1:761482380245:model/Upstage-Document-OCR', 'ResponseMetadata': {'RequestId': 'e9ca129a-cfd2-4cb8-aa88-4276857f767c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e9ca129a-cfd2-4cb8-aa88-4276857f767c', 'content-type': 'application/x-amz-json-1.1', 'content-length': '82', 'date': 'Mon, 02 Sep 2024 04:35:35 GMT'}, 'RetryAttempts': 0}}


# 참조 - 모델 생성 (최초 1회) CLI로 생성

```bash
# The model package ARN is in us-east-1, so the model must be created in that region.
export REGION=us-east-1
export MODEL_NAME=Upstage-Document-OCR
export MODEL_PACKAGE_ARN=arn:aws:sagemaker:us-east-1:865070037744:model-package/upstage-document-ocr-240704-r1-8b4651227fa23fc6925dedbc4b4d1437
export SAGEMAKER_ROLE_ARN=arn:aws:iam::761482380245:role/service-role/AmazonSageMaker-ExecutionRole-20240820T145192

# Report success only when create-model exits with status 0.
aws sagemaker create-model \
    --region "$REGION" \
    --model-name "$MODEL_NAME" \
    --primary-container '{"ModelPackageName": "'"$MODEL_PACKAGE_ARN"'"}' \
    --execution-role-arn "$SAGEMAKER_ROLE_ARN" \
    --enable-network-isolation \
    && echo "모델 생성 성공."
```

In [33]:
# 2-2. Create the endpoint configuration (one-time step). Its name is derived from the model
# name; a single "AllTraffic" variant runs one instance of the real-time inference type.
endpoint_config_name = f"{model_name}-config"
print(f"Endpoint config name: '{endpoint_config_name}'")

response = sagemaker_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        dict(
            VariantName='AllTraffic',
            ModelName=model_name,
            InstanceType=real_time_inference_instance_type,
            InitialInstanceCount=1,
        )
    ],
)
print(f"Endpoint configuration 생성성공. response: {response}")



Endpoint config name: 'Upstage-Document-OCR-config'
Endpoint configuration 생성성공. response: {'EndpointConfigArn': 'arn:aws:sagemaker:us-east-1:761482380245:endpoint-config/Upstage-Document-OCR-config', 'ResponseMetadata': {'RequestId': '6058b6fd-cef0-4e71-8e0b-8ec0769626f9', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6058b6fd-cef0-4e71-8e0b-8ec0769626f9', 'content-type': 'application/x-amz-json-1.1', 'content-length': '108', 'date': 'Mon, 02 Sep 2024 04:39:57 GMT'}, 'RetryAttempts': 0}}


# 참조 - 2-2. 모델 Endpoint Config 생성 CLI로 생성

```bash

# Must be the same region the model was created in.
export REGION=us-east-1
export ENDPOINT_CONFIG_NAME=Upstage-Document-OCR-config
export MODEL_NAME=Upstage-Document-OCR
export REAL_TIME_INFERENCE_INSTANCE_TYPE=ml.g5.2xlarge

# Report success only when create-endpoint-config exits with status 0.
aws sagemaker create-endpoint-config \
    --region "$REGION" \
    --endpoint-config-name "$ENDPOINT_CONFIG_NAME" \
    --production-variants '[{
        "VariantName": "AllTraffic",
        "ModelName": "'"$MODEL_NAME"'",
        "InstanceType": "'"$REAL_TIME_INFERENCE_INSTANCE_TYPE"'",
        "InitialInstanceCount": 1
    }]' \
    && echo "엔드포인트 구성 생성 성공."
```

In [35]:
import re
# Used when a trigger creates a separate endpoint per run; batch runs can use a fixed name.
# Endpoint-name sanitizing helper.
def sanitize_endpoint_name(name, max_length=50):
    """Build an endpoint name of the form "endpoint-<sanitized name>".

    Every character other than ASCII letters, digits and hyphens is removed, leading and
    trailing hyphens are stripped, and the result is cut to ``max_length`` characters.
    Hyphens exposed at the end by that cut are stripped too, so the name never ends in "-".

    Args:
        name: Source string, e.g. an S3 prefix such as "images/trigger/prd_01/vrid_00003/".
        max_length: Maximum length of the sanitized part (the "endpoint-" prefix adds 9).

    Returns:
        The endpoint name, e.g. "endpoint-imagestriggerprd01vrid00003".

    Raises:
        ValueError: if nothing usable remains after sanitizing (the result would be the
            invalid name "endpoint-").
    """
    sanitized = re.sub(r'[^a-zA-Z0-9-]', '', name).strip('-')
    # Truncation can land right after a hyphen; strip it again so the name stays valid.
    sanitized = sanitized[:max_length].rstrip('-')

    if not sanitized:
        raise ValueError(f"Cannot derive an endpoint name from {name!r}")

    return 'endpoint-' + sanitized

# Per-trigger endpoint name derived from the sample prefix. A batch run can instead use a fixed
# name such as "endpoint-Upstage-Document-OCR".
endpoint_name = sanitize_endpoint_name(name=prefix)



In [7]:
# 2-3. Create and deploy the model endpoint.
# NOTE: fixed names are assigned here, which override the vrid-derived endpoint_name from the
# previous cell; use that value instead when a unique per-video endpoint is wanted.
import upstage_ocr_endpoint

endpoint_name = "endpoint-Upstage-Document-OCR"
endpoint_config_name = "Upstage-Document-OCR-config"

upstage_ocr_endpoint.create_upstage_ocr_endpoint(
    config_name=endpoint_config_name,
    endpoint_name=endpoint_name,
)


Creating endpoint: 'endpoint-Upstage-Document-OCR'
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: InService
Created endpoint: 'endpoint-Upstage-Document-OCR'


### 3. Athena 테이블 생성

In [55]:
# 3-1. Create the database in the Glue Data Catalog. An existing database is reported rather
# than raised, so the cell can be re-run.
glue_client = boto3.client('glue')
database_name = "video_analysis"

try:
    response = glue_client.create_database(
        DatabaseInput={
            'Name': database_name
        }
    )
    print("Database created successfully")
except glue_client.exceptions.AlreadyExistsException:
    print(f"Database {database_name} already exists")

Database created successfully


In [60]:
# 3-2. Create the Iceberg log table through Athena and wait for the DDL to finish.
athena_client = boto3.client('athena')
table_location = "s3://gsshop-video-analysis-761482380245-ap-northeast-2/logs/tables/video_analysis_logs/"
athena_output_location = "s3://gsshop-video-analysis-761482380245-ap-northeast-2/logs/athena_output/"

# IF NOT EXISTS keeps the cell re-runnable once the table exists. The table lives in the
# database created in the previous cell.
query = f"""
CREATE TABLE IF NOT EXISTS {database_name}.video_analysis_logs (
    vrid STRING,
    current_time TIMESTAMP,
    batch_execution_time DOUBLE,
    status STRING,
    error_message STRING,
    partition_date DATE
)
PARTITIONED BY (partition_date)
LOCATION '{table_location}'
TBLPROPERTIES (
    'table_type'='ICEBERG',
    'format'='parquet',
    'write_target_data_file_size_bytes'='536870912',
    'optimize_rewrite_delete_file_threshold'='10'
)
"""

response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': database_name
    },
    ResultConfiguration={
        'OutputLocation': athena_output_location
    }
)

# start_query_execution only submits the query. Poll until it reaches a terminal state, so
# success is reported only when the DDL actually succeeded.
query_execution_id = response['QueryExecutionId']
while True:
    query_status = athena_client.get_query_execution(
        QueryExecutionId=query_execution_id
    )['QueryExecution']['Status']
    if query_status['State'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

if query_status['State'] != 'SUCCEEDED':
    raise RuntimeError(
        f"Table creation {query_status['State']}: {query_status.get('StateChangeReason', '')}"
    )
print("Table created successfully", response)


Table created successfully {'QueryExecutionId': 'f7aaefcf-4598-4741-a05d-52d5cfdebfbd', 'ResponseMetadata': {'RequestId': 'cc9f8aab-579e-4651-b59d-3db313e8c01c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Mon, 02 Sep 2024 06:42:35 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '59', 'connection': 'keep-alive', 'x-amzn-requestid': 'cc9f8aab-579e-4651-b59d-3db313e8c01c'}, 'RetryAttempts': 0}}


In [5]:
import athena_iceberg_functions

# Usage example: one round trip through the log-table helpers (insert -> read -> update -> delete).
# NOTE(review): the positional arguments presumably follow the table columns
# (vrid, current_time, batch_execution_time, status, error_message, partition_date) — confirm.

# Insert one sample row.
athena_iceberg_functions.insert_data('video123', '2023-06-01 12:00:00', 10.5, 'SUCCESS', '', '2023-06-01')

# Read the partition back; the first returned row is the header, so it is skipped.
results = athena_iceberg_functions.read_data('2023-06-01')
for row in (results or [])[1:]:
    print(row['Data'])

# Change the sample row's status, then remove it again.
athena_iceberg_functions.update_data('video123', 'FAILED', '2023-06-01')
athena_iceberg_functions.delete_data('video123', '2023-06-01')

Insert operation SUCCEEDED
[{'VarCharValue': 'video123'}, {'VarCharValue': '2023-06-01 12:00:00.000000'}, {'VarCharValue': '10.5'}, {'VarCharValue': 'SUCCESS'}, {'VarCharValue': ''}, {'VarCharValue': '2023-06-01'}]
Update operation SUCCEEDED
Delete operation SUCCEEDED


### 4. Batch Main 함수

In [8]:
import io
from PIL import Image
import s3_functions
import upstage_ocr_endpoint
import bedrock_functions
import athena_iceberg_functions
from datetime import datetime

# Batch settings:
# - OCR endpoint that every frame is sent to
# - bucket holding the frames
# - staging folder scanned for '_SUCCESS' markers
endpoint_name = "endpoint-Upstage-Document-OCR"
bucket = "gsshop-video-analysis-761482380245-ap-northeast-2"
staging_prefix = "images/staging/"

# Pixel box (left, upper, right, lower) of each frame that is sent to OCR.
# NOTE(review): fixed coordinates — presumably tuned to the video frame layout; confirm.
OCR_CROP_BOX = (990, 150, 1230, 640)
SUCCESS_MARKER = "_SUCCESS"
# Pillow modes the JPEG encoder writes directly; any other mode (RGBA, P, LA, ...) is converted.
_JPEG_WRITABLE_MODES = {"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"}


def _crop_to_jpeg_bytes(image_data):
    """Crop raw image bytes to OCR_CROP_BOX and return the crop re-encoded as JPEG bytes."""
    with Image.open(io.BytesIO(image_data)) as img:
        cropped_img = img.crop(OCR_CROP_BOX)
        # e.g. PNG frames with alpha/palette would otherwise fail with "cannot write mode ... as JPEG".
        if cropped_img.mode not in _JPEG_WRITABLE_MODES:
            cropped_img = cropped_img.convert("RGB")
        buffer = io.BytesIO()
        cropped_img.save(buffer, format="JPEG")
    return buffer.getvalue()


def _log_status(vrid, logged_at, execution_time, status, error_message=''):
    """Record one status row for `vrid` through athena_iceberg_functions.insert_data."""
    athena_iceberg_functions.insert_data(
        vrid,
        logged_at.strftime('%Y-%m-%d %H:%M:%S'),
        execution_time,
        status,
        error_message,
        logged_at.strftime('%Y-%m-%d'),
    )


def _build_ocr_input(vrid, image_keys):
    """OCR every frame in `image_keys` and return the <input_ocr_list> text passed to Bedrock.

    Frames whose OCR call raises are skipped (best effort); frame ids are the positions in
    `image_keys`, so skipped frames leave gaps in the numbering.
    """
    parts = ["<input_ocr_list>\n"]
    for frame_id, s3_key in enumerate(image_keys):
        image_data, content_type = s3_functions.download_from_s3(bucket, s3_key)
        cropped_image_data = _crop_to_jpeg_bytes(image_data)

        # NOTE(review): the payload is re-encoded as JPEG but the S3 object's Content-Type is
        # forwarded unchanged; confirm the endpoint accepts this for non-JPEG source frames.
        try:
            response = upstage_ocr_endpoint.invoke_endpoint(endpoint_name, cropped_image_data, content_type)
            original_ocr_text = response["text"]
        except Exception as e:
            # Any OCR failure (e.g. an endpoint 503) skips only this frame.
            print(e)
            print(f"vrid : {vrid}, {frame_id} 번째 이미지에 텍스트 없음")
            continue

        parts.append(
            "<frame>\n"
            f"  <frame_id>{frame_id}</frame_id>\n"
            f"  <original_ocr_text>{original_ocr_text}</original_ocr_text>\n"
            "</frame>\n"
        )
    parts.append("</input_ocr_list>")
    return "".join(parts)


def video_analyze(key):
    """Analyse one video whose frames sit in the folder of the marker object `key`.

    Steps:
      1. Log STARTED.
      2. OCR every frame in the folder.
      3. Send the OCR text to Bedrock.
      4. Save the JSON result to results/<vrid>.json.
      5. Move the folder from staging to done.
      6. Log SUCCESS.
    On any error a FAILED row is logged and the exception is re-raised.

    Args:
        key: S3 key of the marker, e.g. "images/staging/prd_01/vrid_00001/_SUCCESS".
    """
    # The vrid is the name of the folder that contains the marker.
    vrid = key.split('/')[-2]
    # Strip only the trailing marker name; the folder prefix keeps its trailing "/".
    prefix = key[:-len(SUCCESS_MARKER)] if key.endswith(SUCCESS_MARKER) else key

    start_time = datetime.now()
    _log_status(vrid, start_time, 0, 'STARTED')

    try:
        # 2-4. List the objects of the video folder.
        keys = s3_functions.list_s3_keys(bucket, prefix)
        print(f"Total keys found: {len(keys)}")

        # 2-5. OCR every frame (the folder object itself and the marker are not images).
        marker_key = f"{prefix}{SUCCESS_MARKER}"
        image_keys = [k for k in keys if k != prefix and k != marker_key]
        input_ocr_list = _build_ocr_input(vrid, image_keys)
        print("Upstage Call Ended")

        # 2-6. Call Bedrock with the collected OCR text.
        result_json = bedrock_functions.get_final_result(input_ocr_list)

        # 2-7. Save the JSON result to S3.
        s3_functions.save_result_json(bucket, f"results/{vrid}.json", result_json)

        # 2-8. Move the analysed folder from staging to done. Only the first "staging" is
        # replaced, so ids that happen to contain "staging" are not rewritten.
        destination_folder = prefix.replace("staging", "done", 1)
        s3_functions.move_s3_folder(bucket, prefix, destination_folder)

        end_time = datetime.now()
        _log_status(vrid, end_time, (end_time - start_time).total_seconds(), 'SUCCESS')

    except Exception as e:
        end_time = datetime.now()
        print(str(e))
        # NOTE(review): single quotes become backticks, presumably because insert_data builds
        # its SQL by string interpolation — confirm, and prefer proper escaping there.
        _log_status(
            vrid,
            end_time,
            (end_time - start_time).total_seconds(),
            'FAILED',
            str(e).replace("'", "`"),
        )
        raise


# Maximum number of videos processed by one run of this cell.
MAX_VIDEOS_PER_RUN = 10

# 3-1. Find the '_SUCCESS' marker files under the staging prefix.
success_files = s3_functions.find_success_files(bucket, staging_prefix)
print(f"Found {len(success_files)} '_SUCCESS' files:")

# 3-2. Process the markers, at most MAX_VIDEOS_PER_RUN of them. video_analyze logs a FAILED
# row and re-raises, so one failing video stops the remaining ones in this run.
if success_files:
    for success_key in islice(success_files, MAX_VIDEOS_PER_RUN):
        print(f"{success_key} Video Analysis Start!!")
        video_analyze(success_key)
        print(f"{success_key} Video Analysis End!!")

    print("GS SHOP Video Analysis Success!!")



Found 3 '_SUCCESS' files:
images/staging/prd_01/vrid_00001/_SUCCESS Video Analysis Start!!
Insert operation SUCCEEDED
Total keys found: 62
An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (503) from primary with message "{
  "code": 503,
  "type": "InternalServerException",
  "message": "Prediction failed"
}
". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/endpoint-Upstage-Document-OCR in account 761482380245 for more information.
vrid : vrid_00001, 3 번째 이미지에 텍스트 없음
An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (503) from primary with message "{
  "code": 503,
  "type": "InternalServerException",
  "message": "Prediction failed"
}
". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/endpoint-Upstage-Document-OCR in account 761482380245 for m

In [8]:
# 3-9. Delete the Upstage OCR endpoint after the batch run.
upstage_ocr_endpoint.delete_upstage_ocr_endpoint(endpoint_name=endpoint_name)

Deleted endpoint: 'endpoint-Upstage-Document-OCR'


## [참고] 테스트용 데이터 초기화

In [22]:
## Reset the S3 test data.
import s3_functions

bucket = "gsshop-video-analysis-761482380245-ap-northeast-2"
## Reference: restore folders.
### 2-9. Move a folder after analysis (local reference copy; the loop below calls
### s3_functions.move_s3_folder).
def move_s3_folder(bucket, source_folder, destination_folder):
    """Move every object under `source_folder` to the same relative key under `destination_folder`.

    Each object is copied, then the original is deleted. Every page of the listing is
    processed, because list_objects_v2 returns at most 1000 keys per call.
    """
    paginator = s3.get_paginator('list_objects_v2')
    # Collect all keys first so deleting objects does not interfere with the pagination.
    old_keys = [
        obj['Key']
        for page in paginator.paginate(Bucket=bucket, Prefix=source_folder)
        for obj in page.get('Contents', [])
    ]

    if not old_keys:
        print(f"No objects found in {source_folder}")
        return

    for old_key in old_keys:
        new_key = old_key.replace(source_folder, destination_folder, 1)

        # Copy the object to its new key.
        s3.copy_object(
            Bucket=bucket,
            CopySource={'Bucket': bucket, 'Key': old_key},
            Key=new_key
        )

        # Delete the original object.
        s3.delete_object(Bucket=bucket, Key=old_key)

        print(f"Moved {old_key} to {new_key}")

# Move every analysed video folder under images/done/ back to images/staging/ so the batch can
# be re-run on the same test data. Dedicated names are used so the globals `staging_prefix`,
# `success_files`, `prefix` and `key` used by other cells are not overwritten.
done_prefix = "images/done/"
done_success_files = s3_functions.find_success_files(bucket, done_prefix)

for done_key in done_success_files:
    # Folder of the marker: strip only the trailing "_SUCCESS".
    source_folder = done_key[:-len("_SUCCESS")] if done_key.endswith("_SUCCESS") else done_key
    # Replace only the leading done prefix, not every "done" in the path.
    destination_folder = source_folder.replace(done_prefix, "images/staging/", 1)
    s3_functions.move_s3_folder(bucket, source_folder, destination_folder)



In [None]:
# Delete every generated result JSON under results/ (test-data reset).
result_prefix = "results/"

paginator = s3.get_paginator('list_objects_v2')
# Collect the keys from every page first (list_objects_v2 returns at most 1000 per call),
# then delete them.
result_keys = [
    obj['Key']
    for page in paginator.paginate(Bucket=bucket, Prefix=result_prefix)
    for obj in page.get('Contents', [])
]

for result_key in result_keys:
    print(result_key)
    s3.delete_object(Bucket=bucket, Key=result_key)



results/vrid_00001.json
results/vrid_00002.json
results/vrid_00003.json
