[SageMaker Pipelines template](https://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html)

In [2]:
%cd ..

/root/rl-market-simulator


# Set Up Your Environment

In [3]:
# AWS api services setup
import boto3
import sagemaker
import sagemaker.session

# Training estimator
from sagemaker.rl import RLEstimator, RLToolkit, RLFramework

# Sagemaker pipeline dependencies
import json
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.tuner import ContinuousParameter, HyperparameterTuner
from sagemaker.workflow.steps import TuningStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

# From custom code
from src.utils import global_parameters as gp

# Create TFM-v3 Sagemaker Pipeline
[Pipeline overall structure](https://confluence.tuigroup.com/display/MLL/3.1.3.+Model+Development)

# Step 1: Upload model dependencies for the RL training

In [4]:
# Resolve the AWS context used throughout the notebook: the SageMaker session
# and its default bucket, the current region, and the execution role.
sagemaker_session = sagemaker.session.Session()
default_bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name
role = sagemaker.get_execution_role()

# Pipeline session handed to the RL estimator further down.
pipeline_session = PipelineSession()

# Point the shared results location at the "tfm-v3" prefix of the default bucket.
gp.base_uri = "s3://{}/tfm-v3".format(default_bucket)

# Upload the artefacts the pipeline reads from S3 to the shared base prefix.

# 1 - processed training data
local_path = "src/data/interim/tfm_data_last_365_days.csv"
input_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=gp.base_uri)
print(input_data_uri)

# 2 - source code directory
# NOTE(review): this targets the same prefix as the data file, so the returned
# URI is the base prefix itself (see the printed output) - confirm this is intended.
local_path = "src"
src_code_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=gp.base_uri)
print(src_code_uri)

s3://sagemaker-eu-central-1-961105418118/tfm-v3/tfm_data_last_365_days.csv
s3://sagemaker-eu-central-1-961105418118/tfm-v3


Define model dependencies pipeline parameters

This code block defines the following parameters for your pipeline:

- processing_instance_count – The instance count of the processing job.
- input_data – The Amazon S3 location of the input data.
- input_booking_curve_model – The Amazon S3 location of the booking curves machine learning model (note: this parameter is not defined in the code cell below).
- input_source_code – The Amazon S3 location of the source code.

In [5]:
# Pipeline parameters and their default values.

# Instance count used by the processing jobs.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# S3 location of the uploaded training data.
input_data = ParameterString(name="InputData", default_value=input_data_uri)

# S3 location of the uploaded source code.
input_source_code = ParameterString(name="SourceCode", default_value=src_code_uri)

# Step 2: Process TFM data
check: https://stackoverflow.com/questions/69046990/how-to-pass-dependency-files-to-sagemaker-sklearnprocessor-and-use-it-in-pipelin

In [6]:
# Processor that runs the TFM data preprocessing script.
sklearn_processor = SKLearnProcessor(
    role=role,
    base_job_name="tfm-v3-dependencies",
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
)

# Processing step: mounts the input data and the source code into the container
# and publishes /opt/ml/processing/train as the "train" output.
step_process = ProcessingStep(
    name="Process-TFM-data",
    processor=sklearn_processor,
    code="src/pipeline/booking_curve_preprocessing.py",
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input/data",
        ),
        ProcessingInput(
            source=input_source_code,
            destination="/opt/ml/processing/input/code/src",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
        ),
    ],
)

# Step 3: Generate booking curves (training)

In [7]:
# Processor used by the "Fit-booking-curves" step.
sklearn_train = SKLearnProcessor(
    role=role,
    base_job_name="Fit-booking-curves",
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
)

In [8]:
# Booking-curve fitting step. It consumes the "train" output of the
# preprocessing step together with the uploaded source code.
step_train = ProcessingStep(
    name="Fit-booking-curves",
    processor=sklearn_train,
    code="src/pipeline/extract_booking_curves.py",
    inputs=[
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            destination="/opt/ml/processing/train",
        ),
        ProcessingInput(
            source=input_source_code,
            destination="/opt/ml/processing/input/code/src",
        ),
    ],
)

# Step 4: Evaluation step (check recall score)

In [9]:
# Container image for the evaluation job (XGBoost framework image, version 1.0-1).
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.0-1",
    py_version="py3",
    region=region,
    instance_type="ml.m5.xlarge",
)

# Script processor that runs the booking-curve evaluation script with python3.
script_eval = ScriptProcessor(
    role=role,
    image_uri=image_uri,
    command=["python3"],
    base_job_name="script-tfm-booking-curve-eval",
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

# Property file pointing at evaluation.json inside the "evaluation" output, so
# that later steps can read values from it with JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

In [10]:
# Evaluation step: /opt/ml/processing/evaluation is published as the
# "evaluation" output and registered as a property file.
step_eval = ProcessingStep(
    name="TFM-V3-Eval-Booking-Curves",
    processor=script_eval,
    code="src/pipeline/evaluation-booking-curves.py",
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
        ),
    ],
    property_files=[evaluation_report],
)

# The step declares no inputs from the fitting step, so order it explicitly.
step_eval.add_depends_on([step_train])

# Step 5: Define fail step if the recall criterion is not met

In [11]:
# Fail step for the recall gate. It terminates the execution with a message that
# reports the training recall score read from the booking-curve evaluation report.
# The previous message rendered as "recall value < <recall>", i.e. it compared the
# score with itself; it now states the score and the threshold it missed.
# Keep the 0.70 in the message in sync with the threshold of the recall condition.
step_fail = FailStep(
    name="TFM-V3-Failed-Recall-Standard",
    error_message=Join(
        on=" ",
        values=[
            "Execution failed: training recall score",
            JsonGet(
                step_name=step_eval.name,
                property_file=evaluation_report,
                json_path="booking_curve_metrics.performance_metrics.training_recall_score",
            ),
            "is below the required minimum of 0.70",
        ],
    ),
)

# Step 6: Check if the selected model is compliant with the RL training

In [12]:
# Processor for the RL dependency-loading step.
sklearn_processor = SKLearnProcessor(
    role=role,
    base_job_name="tfm-v3-dependencies",
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
)

# Step that loads the RL dependencies: it receives the input data, the source
# code and the "evaluation" output of the booking-curve evaluation step.
step_process_rl = ProcessingStep(
    name="Load-RL-Dependencies",
    processor=sklearn_processor,
    code="src/pipeline/preprocessing-rl.py",
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input/data",
        ),
        ProcessingInput(
            source=input_source_code,
            destination="/opt/ml/processing/input/code/src",
        ),
        ProcessingInput(
            source=step_eval.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
            destination="/opt/ml/processing/evaluation",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="markup_probability_prediction",
            source="/opt/ml/processing/quality_check",
        ),
    ],
)

# Step 7: Define a Condition Step to Verify Model Recall threshold

In [13]:
# Recall gate: the condition holds when 0.70 <= training recall score, where the
# score is read from the evaluation report of the booking-curve evaluation step.
cond_lte = ConditionLessThanOrEqualTo(
    left=0.70,  # minimum accepted recall (70%)
    right=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="booking_curve_metrics.performance_metrics.training_recall_score",
    ),
)

In [14]:
# Branch on the recall gate: run the RL dependency step when the condition
# holds, otherwise fail the execution.
step_cond = ConditionStep(
    name="Training-Recall-Evaluation",
    conditions=[cond_lte],
    else_steps=[step_fail],
    if_steps=[step_process_rl],
)

# Evaluate the condition only after the evaluation step has run.
step_cond.add_depends_on([step_eval])

# Step 8: Define RL Training step

In [15]:
# Output location: root of the session's default bucket.
sage_session = sagemaker.session.Session()
s3_bucket = sage_session.default_bucket()
s3_output_path = f"s3://{s3_bucket}/"
print(f"S3 bucket path: {s3_output_path}")

# MLflow tracking: DNS name of the load balancer in front of the Fargate
# cluster where the MLflow server is installed, and the experiment to log to.
tracking_uri = 'http://mll-mlflow-development-1-72adb7f6eb3c1c02.elb.eu-central-1.amazonaws.com'
rl_experiment_name = 'tfm-v3-rlestimator+mlflow+pipelines'

# Prefix used as the base name of the RL training jobs.
job_name_prefix = "rl-market-simulator"

# RL estimator metrics: each metric is extracted from the training logs with
# "<name>: <number>", where the number may be signed, fractional and/or carry
# an exponent.
metric_definitions = [
    {
        'Name': metric_name,
        'Regex': metric_name + ': ([-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?)',
    }
    for metric_name in (
        'episode_reward_mean',
        'episode_reward_max',
        'episode_reward_min',
        'training_iteration',
        'entropy',
    )
]

# RL estimator hyperparameters passed to the training entry point.
run_hyperparameters = dict(
    tracking_uri=tracking_uri,
    experiment_name=rl_experiment_name,
    training_iteration=gp.MAX_ITERATIONS,
    gamma=0.50,
    horizon=gp.AGENT_MAX_STEPS,
    lr=0.001,
    s3_bucket_output=s3_bucket,
)

# Search ranges explored by the hyperparameter tuner.
rl_hyperparameter_ranges = dict(
    gamma=ContinuousParameter(min_value=0.01, max_value=0.99),
    lr=ContinuousParameter(min_value=0.0001, max_value=0.001),
)

# Reinforcement learning estimator (Ray toolkit on TensorFlow). It is bound to
# the pipeline session, so it is used to build a pipeline step further down.
rl_estimator = RLEstimator(
    base_job_name=job_name_prefix,
    entry_point='./pipeline/train_rl_agent-new.py',
    source_dir='src',
    dependencies=["src/common/sagemaker_rl"],
    toolkit=RLToolkit.RAY,
    toolkit_version="1.6.0",
    framework=RLFramework.TENSORFLOW,
    role=role,
    sagemaker_session=pipeline_session,
    instance_type='ml.m5.large',
    instance_count=1,
    use_spot_instances=True,  # use spot instance
    max_run=7200,  # seconds
    max_wait=7200,  # seconds
    output_path=s3_output_path,
    debugger_hook_config=False,
    metric_definitions=metric_definitions,
    hyperparameters=run_hyperparameters,
)

# Hyperparameter tuner wrapping the RL estimator: maximises the mean episode
# reward over the configured ranges, 50 jobs in total, at most 10 in parallel.
rl_tuner = HyperparameterTuner(
    estimator=rl_estimator,
    objective_metric_name='episode_reward_mean',
    objective_type='Maximize',
    hyperparameter_ranges=rl_hyperparameter_ranges,
    metric_definitions=metric_definitions,
    max_jobs=50,
    max_parallel_jobs=10,
    base_tuning_job_name='tfm-v3-training',
)

S3 bucket path: s3://sagemaker-eu-central-1-961105418118/


In [16]:
# Tuning step: its arguments come from rl_tuner.fit(), which is given two
# file-mode input channels, "train" and "booking_curves".
step_train_rl = TuningStep(
    name="TFM-V3-Training",
    step_args=rl_tuner.fit(
        inputs={
            "train": TrainingInput(
                s3_data=gp.logreg_model_train_data_path,
                input_mode='File',
            ),
            "booking_curves": TrainingInput(
                s3_data=gp.logreg_model_path,
                input_mode='File',
            ),
        },
    ),
)

# Run the tuning only after the RL dependency step.
step_train_rl.add_depends_on([step_process_rl])



# Step 9: Evaluate RL training convergence

In [17]:
# Container image for the RL evaluation job (XGBoost framework image, version 1.0-1).
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.0-1",
    py_version="py3",
    region=region,
    instance_type="ml.m5.xlarge",
)

# Script processor that runs the RL evaluation script with python3.
script_eval_rl = ScriptProcessor(
    role=role,
    image_uri=image_uri,
    command=["python3"],
    base_job_name="script-tfm-rl-eval",
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

# Property file pointing at evaluation.json inside the "evaluation" output of
# the RL evaluation step, so that later steps can read it with JsonGet.
evaluation_report_rl = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

In [18]:
# RL evaluation step: /opt/ml/processing/evaluation is published as the
# "evaluation" output and registered as a property file.
step_eval_rl = ProcessingStep(
    name="TFM-V3-Eval-RL",
    processor=script_eval_rl,
    code="src/pipeline/evaluation.py",
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
        ),
    ],
    property_files=[evaluation_report_rl],
)

# The step declares no inputs from the tuning step, so order it explicitly.
step_eval_rl.add_depends_on([step_train_rl])

# Step 10: Define fail step if criteria is not met

In [19]:
# Fail step for the training-convergence gate. It terminates the execution with a
# message that reports the p-value read from the RL evaluation report.
# The previous message rendered as "p-value < <p-value>", i.e. it compared the
# value with itself; it now states the value and the level it missed.
# Keep the 0.05 in the message in sync with the level of the convergence condition.
step_fail_rl = FailStep(
    name="TFM-V3-Failed-Training-Convergence",
    error_message=Join(
        on=" ",
        values=[
            "Execution failed: training convergence p-value",
            JsonGet(
                step_name=step_eval_rl.name,
                property_file=evaluation_report_rl,
                json_path="reward_mean_convergence_metrics.statistical_test.p_value",
            ),
            "is below the 0.05 significance level",
        ],
    ),
)

# Step 11: Define a Condition Step to Verify Model training convergence

In [20]:
# Convergence gate: the condition holds when 0.05 <= p-value, where the p-value
# is read from the evaluation report of the RL evaluation step.
cond_lte_rl = ConditionLessThanOrEqualTo(
    left=0.05,  # 5% significance level
    right=JsonGet(
        step_name=step_eval_rl.name,
        property_file=evaluation_report_rl,
        json_path="reward_mean_convergence_metrics.statistical_test.p_value",
    ),
)

In [21]:
# Branch on the convergence gate: nothing further runs when the condition
# holds, otherwise the execution is failed.
step_cond_rl = ConditionStep(
    name="Training-Convergence-Evaluation",
    conditions=[cond_lte_rl],
    else_steps=[step_fail_rl],
    if_steps=[],
)

# Evaluate the condition only after the RL evaluation step has run.
step_cond_rl.add_depends_on([step_eval_rl])

# Step 12: Create a pipeline

In [22]:
# Assemble the pipeline from its parameters and top-level steps. The RL
# dependency step and the two fail steps are not listed here because they are
# attached through the condition steps' branches.
pipeline_name = "TFM-v3-pipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        input_data,
        input_source_code,
    ],
    steps=[
        step_process,
        step_train,
        step_eval,
        step_cond,
        step_train_rl,
        step_eval_rl,
        step_cond_rl,
    ],
)

# Step 13: Start the Pipeline
Examine the JSON pipeline definition to ensure that it's well-formed.

In [23]:
# Render the pipeline definition and parse it as JSON; the parsed structure is
# displayed as the cell output, and parsing raises if the JSON is malformed.
json.loads(pipeline.definition())

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-eu-central-1-961105418118/tfm-v3/tfm_data_last_365_days.csv'},
  {'Name': 'SourceCode',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-eu-central-1-961105418118/tfm-v3'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Process-TFM-data',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '492215442770.dkr.ecr.eu-central-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does. The role passed in is used by SageMaker Pipelines to create all of the jobs defined in the steps.

In [24]:
# Create the pipeline, or update it if it already exists, passing the execution role.
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:961105418118:pipeline/tfm-v3-pipeline',
 'ResponseMetadata': {'RequestId': 'bdb8d03d-8ec5-436d-818d-6e0bfa2a1e66',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'bdb8d03d-8ec5-436d-818d-6e0bfa2a1e66',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Mon, 10 Oct 2022 15:09:04 GMT'},
  'RetryAttempts': 0}}

Start a pipeline execution.

In [25]:
# Start a pipeline execution and keep the returned handle for the status query below.
execution = pipeline.start()

Check steps execution

In [31]:
# List the steps of the execution with their status, timing and metadata.
execution.list_steps()

[{'StepName': 'Training-Convergence-Evaluation',
  'StartTime': datetime.datetime(2022, 10, 10, 19, 31, 58, 474000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 19, 31, 58, 748000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Condition': {'Outcome': 'True'}}},
 {'StepName': 'TFM-V3-Eval-RL',
  'StartTime': datetime.datetime(2022, 10, 10, 19, 27, 41, 169000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 19, 31, 57, 956000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:eu-central-1:961105418118:processing-job/pipelines-jpiyur3jy65t-tfm-v3-eval-rl-tu3m54nrze'}}},
 {'StepName': 'TFM-V3-Training',
  'StartTime': datetime.datetime(2022, 10, 10, 15, 37, 51, 942000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 19, 27, 39, 746000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Tunin