In [1]:
# !pip install -U sagemaker

In [2]:
# Sagemaker Environment Setting

import sys
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession


role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()
pipeline_session = PipelineSession()
model_package_group_name = f"PieplineForIdentifyingHRES"

  from pandas.core import (


sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\gkthd\AppData\Local\sagemaker\sagemaker\config.yaml


Couldn't call 'get_role' to get Role ARN from role name melodyminds to get Role path.


ValueError: The current AWS identity is not a role: arn:aws:iam::381491990157:user/melodyminds, therefore it cannot be used as a SageMaker execution role

In [None]:
# Feature store session configuration
prefix = 'sagemaker-featurestore-HRES'
offline_feature_store_bucket = f"s3://{default_bucket}/{prefix}"
boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

In [None]:
# # Local Setting
# from sagemaker.local import LocalSession

# from sagemaker.workflow.pipeline_context import LocalPipelineSession

# role = 'arn:aws:iam::111111111111:role/service-role/AmazonSageMaker-ExecutionRole-20200101T000001'
# sagemaker_session = LocalSession()
# sagemaker_session.config = {'local': {'local_code': True}}
# pipeline_session = LocalPipelineSession()
# region = "us-west-2"
# default_bucket = "./sagemaker_local_output"
# model_package_group_name = "PipelineForIdentifyingHybridRenewableEnergyStation"

In [None]:
# Parameters for Pipeline Configuration

from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat


processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.t3.medium") 
model_approval_status = ParameterString(name='ModelApprovalStatus', default_value="PendingManualApproval")

mse_threshold = ParameterFloat(name='MseThreshold', default_value=50.0)

In [None]:
# Processor Configuration

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "1.2-1"
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    role=role,
    instance_type=instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-HRES-process",
    sagemaker_session=pipeline_session,
    
)

In [None]:
# Data Collection Pipeline

from sagemaker.workflow.steps import ProcessingStep


data_collection_step = ProcessingStep(
    name="DataCollectionStep",
    processor=sklearn_processor,
    inputs=[],
    outputs=[
        ProcessingOutput(output_name='collected_data', source="/opt/ml/processing/output")
    ],
    code='code/data_collection_script.py'
)

In [None]:
# Data Preprocessing Pipeline

processing_step = ProcessingStep(
    name='HRESProcess',
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=data_collection_step.properties.ProcessingOutputConfig.Outputs['collected_data'].S3Output.S3Uri,
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(output_name='train', source='/opt/ml/processing/train'),
        ProcessingOutput(output_name='validation', source='/opt/ml/processing/validation'),
        ProcessingOutput(output_name='test', source='/opt/ml/processing/test'),
    ],
    code='code/preprocessing.py'
    
)

In [None]:
# Training & HyperParameter Pipeline

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter
from sagemaker.workflow.steps import TuningStep

model_path = f's3://{default_bucket}/HRESTrain'
# model_path = f'{default_bucket}/HRESTrain' # TODO


image_uri = sagemaker.image_uris.retrieve(
    framework='xgboost',
    region=region,
    version='1.0-1',
    py_version='py3',
    instance_type='ml.m5.xlarge',
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name="xgb-HRES-train",
    role=role,
    sagemaker_session=pipeline_session,
)

xgb_train.set_hyperparameters(
    eval_metric="rmse",
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

hyperparameter_ranges ={
    "eta": ContinuousParameter(0.01, 0.5),
    "subsample": ContinuousParameter(0.1, 0.9),
    "alpha": ContinuousParameter(0, 2),
    "gamma": IntegerParameter(1, 10),
    "min_child_weight": IntegerParameter(1, 10),
    "max_depth": IntegerParameter(1, 10),
    "num_round": IntegerParameter(10, 100),
}

tuner = HyperparameterTuner(
    estimator=xgb_train,
    objective_metric_name='validation:rmse',
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=[{"Name": "validation:rmse", "Regex": "validation-rmse:(.*?);"}],
    max_jobs=3, 
    max_parallel_jobs=3, 
    objective_type="Minimize"
)

hpo_args = tuner.fit(
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

training_step = TuningStep(
    name='HRESTrainWithHyperParameterTuning',
    step_args=hpo_args,
)



In [None]:
# Validation Pipeline
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile

script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=['python3'],
    instance_type='ml.m5.xlarge',
    instance_count=1,
    base_job_name='script-HRES-eval',
    role=role,
    sagemaker_session=pipeline_session
)

eval_args = script_eval.run(

    inputs=[
        ProcessingInput(
            source=training_step.get_top_model_s3_uri(top_k=0, s3_bucket=default_bucket),
            destination='/opt/ml/preocessing/model',
        ),
        ProcessingInput(
            source=training_step.get_top_model_s3_uri(top_k=0, s3_bucket=default_bucket),
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(output_name='evaluation', source='opt/ml/processing/evaluation'),
    ],
    code="code/evaluation.py",

)
evaluation_report = PropertyFile(name='EvaluationReport', output_name='evaluation', path='evaluation.json')
evaluation_step = ProcessingStep(
    name='HRESEval',
    step_args=eval_args,
    property_files=[evaluation_report],
)
    
        

In [None]:
# Model Step

from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri='{}/evaluation.json'.format(
            evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
        ),
        content_type='application/json',
    )
)


model = Model(
    image_uri=image_uri,
    model_data=training_step.get_top_model_s3_uri(top_k=0, s3_bucket=default_bucket),
    sagemaker_session=pipeline_session,
    role=role
)
register_model_step_args = model.register(
    content_types=['application/json'],
    response_types=['application/json'],
    inference_instances=['ml.t2.medium', 'ml.m5.xlarge'],
    transform_instances=['ml.m5.xlarge'],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

step_model_create_and_register = ModelStep(
    name="HRESCreateModel",
    step_args=register_model_step_args
)

In [None]:
# Fail Step

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="HRESMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold])
)

In [None]:
# Conditional Step

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='regression_metrics.mse.value',
    ),
    right=mse_threshold,
)

condition_step = ConditionStep(
    name="HRESMSECond",
    conditions=[cond_lte],
    if_steps=[step_model_create_and_register],
    else_steps=[step_fail],
)

In [None]:
# Defining Pipeline
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"HRESPipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        mse_threshold
    ],
    steps=[data_collection_step, processing_step, training_step, evaluation_step, condition_step],
)

In [None]:
import json

pipeline_definition = json.loads(pipeline.definition())
pipeline_definition

In [None]:
pipeline.upsert(role_arn=role)
#pipeline.create()

In [None]:
execution = pipeline.start()

In [None]:
pipeline.describe()

In [None]:
execution.list_steps()

In [None]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))