In [None]:
import boto3
import sagemaker

# Session bootstrap: region and account id are used below to build the ECR image
# URIs, and the execution role is handed to the processors, estimator and model.
sagemaker_session = sagemaker.session.Session()
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
account = (
    sagemaker_session.boto_session
    .client("sts")
    .get_caller_identity()["Account"]
)

In [None]:
# Display the SageMaker session object created above.
sagemaker_session

In [None]:
# Display the execution role returned by sagemaker.get_execution_role().
role

In [None]:
account  # displays the AWS account id; clear this output before sharing the notebook

## Train hyperparameter

In [None]:
n_epochs = 20  # plain int; passed to the estimator as the 'n_epochs' hyperparameter

## Preprocess hyperparameter

In [None]:
# Plain notebook-level integers (not pipeline parameters).
# month_len is forwarded to the preprocessing job as the `--month_len` argument.
month_len = 2
# A commented-out note in the preprocessing-parameter cell suggests 100000 for test runs.
test_data_num = 0

## Pipeline Embedding Parameters

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
# Fixed S3 locations: hash keys, text embedding model, model.tar.gz.
# TODO: model.tar.gz should probably be versioned once a day.

# --- Compute resources for the embedding processing job ---
embedding_instance_count = ParameterInteger(
    name="EmbeddingInstanceCount",
    default_value=1,
)
embedding_instance_type = ParameterString(
    name="EmbeddingInstanceType",
    default_value="ml.p3.8xlarge",
)
embedding_instance_volume = ParameterInteger(
    name="EmbeddingInstanceVolume",
    default_value=100,
)

# --- Container image, built from the current account and region ---
embedding_image_uri = ParameterString(
    name="EmbeddingImage",
    default_value=f"{account}.dkr.ecr.{region}.amazonaws.com/sagemaker-test:embedding-step",
)

# --- S3 inputs of the embedding step ---
text_model_uri = ParameterString(
    name="TextModel",
    default_value="s3://chatie-ml-sagemaker/text_embedding_model/text_embedding_model.tar.gz",
)
trained_model_zip = ParameterString(
    name="TrainedModel",
    default_value="s3://chatie-ml-sagemaker/model/model.tar.gz",
)
# NOTE(review): this default appears to point at the output folder of one specific,
# timestamped earlier job in a hard-coded account bucket; confirm it is still valid.
hash_keys = ParameterString(
    name="HashKeys",
    default_value="s3://sagemaker-ap-northeast-2-095239156209/sagemaker-test-2022-11-04-06-42-23-649/output/output_hash/hash_keys.json",
)

## Pipeline Preprocess Parameter

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# --- Container image for the preprocessing job (current account / region) ---
preprocessing_image_uri = ParameterString(
    name="PreprocessingImage",
    default_value=f"{account}.dkr.ecr.{region}.amazonaws.com/sagemaker-test:preprocessing-step",
)

# --- Compute resources for the preprocessing job ---
preprocessing_instance_count = ParameterInteger(
    name="PreprocessingInstanceCount",
    default_value=1,
)
preprocessing_instance_type = ParameterString(
    name="PreprocessingInstanceType",
    default_value="ml.m5.12xlarge",
)
preprocessing_instance_volume = ParameterInteger(
    name="PreprocessingInstanceVolume",
    default_value=100,
)

# The pipeline-parameter versions of month_len / test_data_num are disabled;
# both are defined as plain integers in the "Preprocess hyperparameter" cell.
# month_len = ParameterInteger(
#     name="MonthLength",
#     default_value=2
# )
# for test, if you want to test pipeline, set 100000
# test_data_num = ParameterInteger(
#     name="TestDataNumber",
#     default_value=0
# )

## Pipeline Train Parameter

In [None]:
# --- Container image for the training job ---
# NOTE(review): the default tag is `preprocessing-step`, the same tag used by
# PreprocessingImage. Confirm this is intended and not a copy/paste leftover.
train_image_uri = ParameterString(
    name="TrainImage",
    default_value=f"{account}.dkr.ecr.{region}.amazonaws.com/sagemaker-test:preprocessing-step",
)

# --- Compute resources for the training job ---
train_instance_count = ParameterInteger(
    name="TrainInstanceCount",
    default_value=1,
)
train_instance_type = ParameterString(
    name="TrainInstanceType",
    default_value="ml.p3.8xlarge",
)
train_instance_volume = ParameterInteger(
    name="TrainInstanceVolume",
    default_value=100,
)

# The pipeline-parameter version of n_epochs is disabled; n_epochs is a plain
# integer defined in the "Train hyperparameter" cell.
# n_epochs = ParameterInteger(
#     name="EpochNumber",
#     default_value=20 # set 1 if you want to test pipeline
# )

### Pipeline Train Code Uri 

In [None]:
# Fixed S3 location of the training code. It is used both as the destination of
# the preprocessing step's "code" output and as the estimator's source_dir.
train_code_uri = "s3://chatie-ml-sagemaker/code/code.tar.gz"

## Pipeline Create Model Parameter

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# --- Inference container image (current account / region) ---
deploy_model_image_uri = ParameterString(
    name="DeployImage",
    default_value=f"{account}.dkr.ecr.{region}.amazonaws.com/sagemaker-test:inference-step",
)

# In CreateModelInput the instance type is only used to decide whether a GPU
# is used or not (source: SageMaker Python SDK).
endpoint_instance_type = ParameterString(
    name="DeployInstanceType",
    default_value="ml.g4dn.8xlarge",
)

# deploy_code_uri has to be hard-coded (a processing step's `code` only accepts
# an absolute path), so this directory and deploy_code_uri must be kept in sync.
deploy_code_dir = ParameterString(
    name="DeployCode",
    default_value="s3://chatie-ml-sagemaker/deploy_code/",
)


## Pipeline Deploy Parameter

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# --- Resources for the processing job that runs the deploy script ---
deploy_process_instance_type = ParameterString(
    name="DeployProcessInstanceType",
    default_value="ml.m5.xlarge",
)
deploy_process_instance_count = ParameterInteger(
    name="DeployProcessInstanceCount",
    default_value=1,
)

# --- Number of instances for the endpoint ---
# NOTE(review): this parameter is registered on the pipeline but is not passed
# to any step in this notebook; confirm how the deploy script receives it.
endpoint_instance_count = ParameterInteger(
    name="DeployInstanceCount",
    default_value=2,
)


### Pipeline Deploy Code Uri

In [None]:
# Must be updated whenever deploy_code_dir changes (the script lives in that directory).
deploy_code_uri = "s3://chatie-ml-sagemaker/deploy_code/deploy_model.py"

## Embedding Step

In [None]:
from sagemaker.processing import Processor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Processor for the embedding step; image, instance count/type and volume size
# all come from the Embedding* pipeline parameters.
processor = Processor(
    role=role,
    image_uri=embedding_image_uri,
    instance_type=embedding_instance_type,
    instance_count=embedding_instance_count,
    volume_size_in_gb=embedding_instance_volume,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Embedding step: consumes the trained model archive, the text embedding model
# and the hash keys, and publishes the five outputs listed below.
step_embedd = ProcessingStep(
    name="Embedd-Step",
    processor=processor,
    inputs=[
        ProcessingInput(
            source=trained_model_zip,
            destination="/opt/ml/processing/input_model",
        ),
        ProcessingInput(
            source=text_model_uri,
            destination="/opt/ml/processing/text_embedding_model",
        ),
        ProcessingInput(
            source=hash_keys,
            destination="/opt/ml/processing/whole_embedding",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="output_hash",
            source="/opt/ml/processing/output_hash",
        ),
        ProcessingOutput(
            output_name="whole_embedding",
            source="/opt/ml/processing/output_embedding",
        ),
        ProcessingOutput(
            output_name="sim_dic",
            source="/opt/ml/processing/output_similarity",
        ),
        # With an explicit destination the files are stored directly there,
        # without an extra folder being created inside.
        ProcessingOutput(
            output_name="deploy_code",
            source="/opt/ml/processing/output_deploy_code",
            destination=deploy_code_dir,
        ),
        # This output is not needed on days when training runs.
        ProcessingOutput(
            output_name="model",
            source="/opt/ml/processing/output_model",
        ),
    ],
)

## Preprocessing Step

In [None]:
from sagemaker.processing import Processor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Processor for the preprocessing step; image, instance count/type and volume
# size all come from the Preprocessing* pipeline parameters.
# NOTE: this rebinds the name `processor` used by the embedding step. That is
# safe on a top-to-bottom run because step_embedd already holds its own
# reference, but the cells must not be executed out of order.
processor = Processor(
    image_uri=preprocessing_image_uri,
    role=role,
    instance_count=preprocessing_instance_count,
    instance_type=preprocessing_instance_type,
    # Wire in PreprocessingInstanceVolume: it was declared and registered on the
    # pipeline but never passed here, so changing it had no effect on the job.
    volume_size_in_gb=preprocessing_instance_volume,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join

# Preprocessing step: reads whole_embedding.pickle from the embedding step's
# "whole_embedding" output and publishes the training data plus the training code.
# NOTE(review): test_data_num is defined in the notebook but not forwarded to the
# job here; confirm whether the container expects it.
step_preprocess = ProcessingStep(
    name="Preprocess-Step",
    processor=processor,
    inputs=[
        ProcessingInput(
            # Join resolves at execution time to "<whole_embedding S3 URI>/whole_embedding.pickle".
            source=Join(
                on="",
                values=[
                    step_embedd.properties.ProcessingOutputConfig.Outputs["whole_embedding"].S3Output.S3Uri,
                    "/whole_embedding.pickle",
                ],
            ),
            destination="/opt/ml/processing/whole_embedding",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="interaction_data",
            source="/opt/ml/processing/interaction_data",
        ),
        ProcessingOutput(
            output_name="encoding_data",
            source="/opt/ml/processing/encoding_data",
        ),
        ProcessingOutput(
            output_name="embedding",
            source="/opt/ml/processing/embedding",
        ),
        # The code output is written to the fixed location the estimator reads as source_dir.
        ProcessingOutput(
            output_name="code",
            source="/opt/ml/processing/code",
            destination=train_code_uri,
        ),
    ],
    # month_len is a plain int, so its value is fixed when the definition is built.
    job_arguments=["--month_len", str(month_len)],
    depends_on=[],
)

## Train Step

In [None]:
from sagemaker.huggingface import HuggingFace
from sagemaker.workflow.functions import Join

# HuggingFace estimator running train.py from the code archive at train_code_uri,
# on the custom training image with the Train* pipeline parameters.
estimator = HuggingFace(
    entry_point="train.py",
    source_dir=train_code_uri,
    image_uri=train_image_uri,
    py_version="py38",
    role=role,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    volume_size=train_instance_volume,
    input_mode="Pipe",
    # n_epochs is a plain int, so its value is fixed when the definition is built.
    hyperparameters={"n_epochs": n_epochs},
)

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Training step: every channel is named after the processing output it reads.
step_train = TrainingStep(
    name="Train-Step",
    estimator=estimator,
    inputs={
        # Channels fed by Preprocess-Step outputs.
        **{
            channel: TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri
            )
            for channel in ("interaction_data", "embedding", "encoding_data")
        },
        # Channels fed by Embedd-Step outputs.
        **{
            channel: TrainingInput(
                s3_data=step_embedd.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri
            )
            for channel in ("whole_embedding", "sim_dic")
        },
    },
    depends_on=[step_embedd, step_preprocess],
)

## Create Model Step

In [None]:
from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.workflow.functions import Join

# Model definition served from the custom inference image.
model = HuggingFaceModel(
    role=role,
    image_uri=deploy_model_image_uri,
    pytorch_version="1.12.1",
    # In the 7-day batch the model artifact is the estimator (training step) output.
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
)

In [None]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


# Here the instance type is only used to decide whether a GPU is used or not
# (source: SageMaker Python SDK).
inputs = CreateModelInput(instance_type=endpoint_instance_type)

# Registers the trained model as a SageMaker model once training has finished.
step_create_model = CreateModelStep(
    name="Recommender-Model",
    model=model,
    inputs=inputs,
    depends_on=[step_train],
)

## Deploy Step

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep

# Scikit-learn processing container used to run the deploy script.
deploy_model_processor = SKLearnProcessor(
    base_job_name="recommender-deploy-model",
    framework_version="0.23-1",
    role=role,
    instance_count=deploy_process_instance_count,
    instance_type=deploy_process_instance_type,
)

In [None]:
# Deploy step: runs the script at deploy_code_uri with the created model's name,
# the region, the endpoint instance type and a fixed endpoint name.
# NOTE(review): endpoint_instance_count (DeployInstanceCount) is not forwarded
# here; confirm how the deploy script determines the instance count.
step_deploy = ProcessingStep(
    name="Recommender-Endpoint",
    processor=deploy_model_processor,
    code=deploy_code_uri,
    job_arguments=[
        "--model_name", step_create_model.properties.ModelName,
        "--region", region,
        "--endpoint_instance_type", endpoint_instance_type,
        "--endpoint_name", "Recommender-Endpoint",
    ],
    depends_on=[step_create_model],
)

## Pipeline

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the 7-day recommender pipeline.
#
# `parameters` must contain only pipeline Parameter objects. month_len,
# test_data_num and n_epochs were listed here before, but in this notebook they
# are plain Python ints (their ParameterInteger versions are commented out), so
# they are not execution-time parameters and have been removed from the list.
# Their values are fixed when the definition is built: month_len goes into the
# preprocessing job arguments and n_epochs into the estimator hyperparameters.
pipeline_name = "recommender-7day-pipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        # embedd
        embedding_instance_count,
        embedding_instance_type,
        embedding_instance_volume,
        embedding_image_uri,
        text_model_uri,
        trained_model_zip,
        hash_keys,

        # preprocess
        preprocessing_instance_count,
        preprocessing_instance_type,
        preprocessing_instance_volume,
        preprocessing_image_uri,

        # train
        train_instance_count,
        train_instance_type,
        train_instance_volume,
        train_image_uri,

        # create_model
        deploy_model_image_uri,
        endpoint_instance_type,
        deploy_code_dir,

        # deploy model
        deploy_process_instance_count,
        deploy_process_instance_type,
        endpoint_instance_count,
    ],
    steps=[step_embedd, step_preprocess, step_train, step_create_model, step_deploy],
)
        

In [None]:
import json

# Parse the pipeline's JSON definition into a Python object and display it for inspection.
definition = json.loads(pipeline.definition())
definition

In [None]:
# Create or update the pipeline in SageMaker using the notebook's execution role.
pipeline.upsert(role_arn=role)

In [None]:
# Start an execution; no overrides are passed, so every parameter uses its default value.
execution = pipeline.start()

In [None]:
# Display the description of the execution that was just started.
execution.describe()