In [214]:
import sagemaker
import boto3
import os
import pandas as pd
from IPython.display import display as dp

# Switch: True  -> use the default bucket of a SageMaker session,
#         False -> use the bucket name filled in below.
DEFAULT_BUCKET = True
if not DEFAULT_BUCKET:
    bucket = "<사용자 버켓 이름>"
else:
    sagemaker_session = sagemaker.session.Session()
    bucket = sagemaker_session.default_bucket()

print("now using bucket: ", bucket)

now using bucket:  sagemaker-ap-northeast-2-242201274000


In [215]:
# Region configured for the default boto3 session.
region = boto3.Session().region_name
# SageMaker session object (a session is also created in the bucket-selection cell).
sagemaker_session = sagemaker.session.Session()
# IAM execution role; later passed to the processor and to pipeline.upsert().
role = sagemaker.get_execution_role()
print('SG Role: ', role)

SG Role:  arn:aws:iam::242201274000:role/service-role/AmazonSageMaker-ExecutionRole-20240923T230631


In [216]:
# preprocessing_code = 'src/ad_preprocessing.py'
# %store preprocessing_code

In [217]:
# base_output_dir = 'opt/ml/processing/output' 

# # Describe an environment similar to the Docker container's input folder
# base_preproc_input_dir = 'opt/ml/processing/input'
# os.makedirs(base_preproc_input_dir, exist_ok=True)

# # Specify the output folder for the training data.
# base_preproc_output_train_dir = 'opt/ml/processing/output/'
# os.makedirs(base_preproc_output_train_dir, exist_ok=True)

# query_result_s3_uri = "s3://kefico-source/AD-Athena-Query-Result"
# database = "kefico-bigdata"
# table = "v2lc"
# test_name = "Severe1_Below1.12s_8"

# query = f"SELECT * FROM \"{database}\".\"{table}\" WHERE partition_1 = '{test_name}'"
# print(query)

In [218]:
# ! python {preprocessing_code} --base_output_dir {base_output_dir} \
#                               --query_result_s3_uri {query_result_s3_uri} \
#                               --database {database} \
#                               --table {table} \
#                               --test_name {test_name} 

In [219]:
# Relative path of the preprocessing script; it is handed to the processing step as its `code`.
preprocessing_code = 'src/ad_parquet_preprocessing.py'
# Persist the value with IPython's %store magic so it can be restored later with `%store -r`.
%store preprocessing_code

Stored 'preprocessing_code' (str)


In [220]:
# Relative paths (no leading slash): a local stand-in for the container's
# /opt/ml/processing folders, created under the current working directory.
base_output_dir = 'opt/ml/processing/output'
base_preproc_input_dir = 'opt/ml/processing/input/raw/'
base_preproc_output_dir = 'opt/ml/processing/output/'

# Make sure the local input and output folders exist (no error if they already do).
os.makedirs(name=base_preproc_input_dir, exist_ok=True)
os.makedirs(name=base_preproc_output_dir, exist_ok=True)

In [221]:
# Identifiers of the dataset to process; each one becomes a path segment of the S3 input prefix.
controller_name = "V2LC"
main_test_name = "HEV_P2_ACOverLoad_IG1_1"
sub_test_name = "Severe1_Above2.83s_1"

In [222]:
# Persist the three identifiers with IPython's %store magic so they can be restored later with `%store -r`.
%store controller_name
%store main_test_name
%store sub_test_name

Stored 'controller_name' (str)
Stored 'main_test_name' (str)
Stored 'sub_test_name' (str)


In [223]:
# ! python {preprocessing_code} --base_preproc_input_dir {base_preproc_input_dir} \
#                               --base_preproc_output_dir {base_preproc_output_dir}

In [224]:
# Define the pipeline parameters
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# S3 input prefix assembled from the controller / main test / sub test names.
input_data_uri = f's3://kefico-source/main_parquet_dir/{controller_name}/{main_test_name}/{sub_test_name}/'
print(input_data_uri)

# Named parameters with default values; they are registered on the Pipeline object.
input_data = ParameterString(name="InputData", default_value=input_data_uri)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.r5.8xlarge")
processing_instance_count = ParameterInteger(name='ProcessingInstanceCount', default_value=1)

s3://kefico-source/main_parquet_dir/V2LC/HEV_P2_ACOverLoad_IG1_1/Severe1_Above2.83s_1/


In [225]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Version tag of the SageMaker scikit-learn container image.
framework_version = '1.0-1'

# Processor for the preprocessing job. Instance type and count are pipeline
# parameters rather than fixed values.
sklearn_processor = SKLearnProcessor(
    role=role,
    base_job_name='sklearn-AD-Demo-process',
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [226]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Processing step that runs `preprocessing_code` with the scikit-learn processor.
#
# The input source is the `input_data` pipeline parameter, not the plain
# `input_data_uri` string. With the plain string, the "InputData" parameter was
# registered on the pipeline but never referenced by any step, so overriding it
# when starting an execution had no effect. The parameter's default value is
# still `input_data_uri`, so a default run reads the same S3 prefix as before.
step_process = ProcessingStep(
    name='AD-Demo-Basic-Process',
    processor=sklearn_processor,
    inputs=[
        # Downloaded into the container at /opt/ml/processing/input/raw
        ProcessingInput(source=input_data, destination='/opt/ml/processing/input/raw')
    ],
    outputs=[
        # Everything written to /opt/ml/processing/output is exported as output 'train'
        ProcessingOutput(output_name='train', source='/opt/ml/processing/output')
    ],
    code=preprocessing_code
)

In [227]:
from sagemaker.workflow.pipeline import Pipeline

project_prefix = "Kefico-Anomaly-Detection"
# The pipeline is named after the project prefix.
pipeline_name = project_prefix

# Single-step pipeline exposing the three parameters defined earlier.
pipeline = Pipeline(
    steps=[step_process],
    parameters=[processing_instance_type, processing_instance_count, input_data],
    name=pipeline_name,
)

In [228]:
import json

# pipeline.definition() is parsed with json.loads so the definition can be inspected as a dict.
definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.4xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://kefico-source/main_parquet_dir/V2LC/HEV_P2_ACOverLoad_IG1_1/Severe1_Above2.83s_1/'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AD-Demo-Basic-Process',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/p

In [229]:
# Register the pipeline, then start an execution
pipeline.upsert(role_arn=role)
execution = pipeline.start()



In [230]:
# Show the execution's metadata, including its current PipelineExecutionStatus.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:242201274000:pipeline/Kefico-Anomaly-Detection',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:242201274000:pipeline/Kefico-Anomaly-Detection/execution/n6v7cryhxohr',
 'PipelineExecutionDisplayName': 'execution-1731564771647',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 11, 14, 6, 12, 51, 542000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 11, 14, 6, 12, 51, 542000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:242201274000:user-profile/d-l0dltcg6j4kj/default-20240923T230629',
  'UserProfileName': 'default-20240923T230629',
  'DomainId': 'd-l0dltcg6j4kj',
  'IamIdentity': {'Arn': 'arn:aws:sts::242201274000:assumed-role/AmazonSageMaker-ExecutionRole-20240923T230631/SageMaker',
   'PrincipalId': 'AROATQZCSE2IHHIBWPD6G:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:242201274000:user-profile/d

In [231]:
# Block until the pipeline execution finishes. wait() raises when the execution
# ends in a failed state. Before re-raising, print every step's status and
# failure reason so the cause is visible here instead of only a generic waiter
# error. The original exception still propagates unchanged.
try:
    execution.wait()
except Exception:
    for step in execution.list_steps():
        print(step.get('StepName'), step.get('StepStatus'), step.get('FailureReason', ''))
    raise

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [None]:
# List the steps of this execution; the artifact helper below reads the same list.
execution.list_steps()

In [None]:
# 전처리 후 결과 파일 경로 추출
import boto3

def get_proc_artifact(execution, client, kind):
    """Return the S3 URI of one output of the pipeline's processing job.

    The last entry of ``execution.list_steps()`` is treated as the processing
    step. The job name is the final ``/``-separated segment of that step's
    processing-job ARN. The job is then described through ``client`` and its
    full list of configured outputs is printed before one URI is returned.

    Args:
        execution: Object exposing ``list_steps()``.
        client: Object exposing ``describe_processing_job(ProcessingJobName=...)``.
        kind: Index into the job's output list (0 --> train, 1 --> test).

    Returns:
        The ``S3Uri`` of the selected output.
    """
    last_step = execution.list_steps()[-1]
    job_arn = last_step['Metadata']['ProcessingJob']['Arn']
    job_name = job_arn.rsplit('/', 1)[-1]

    job_description = client.describe_processing_job(ProcessingJobName=job_name)
    job_outputs = job_description['ProcessingOutputConfig']['Outputs']
    print(job_outputs)
    return job_outputs[kind]['S3Output']['S3Uri']


# boto3 SageMaker client; get_proc_artifact uses it to describe the processing job.
client = boto3.client("sagemaker")

# kind=0 selects the first configured output of the processing job.
train_preproc_dir_artifact = get_proc_artifact(execution, client, kind=0)
# test_preproc_dir_artifact = get_proc_artifact(execution, client, kind=1)

print('output-train: ', train_preproc_dir_artifact)
# print('output-test : ', test_preproc_dir_artifact)