In [1]:
# Pin s3fs to the release this notebook was last run with so a fresh kernel installs the same version.
%pip install s3fs==2024.10.0

Note: you may need to restart the kernel to use updated packages.


In [2]:
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.properties import PropertyFile
from sagemaker import get_execution_role
from time import gmtime, strftime, sleep
from importlib.metadata import version, PackageNotFoundError



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Fetched defaults config from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
# Look up the execution role for this session; it is passed to pipeline.upsert() further down.
sm_role = get_execution_role()

In [4]:
# Packages whose locally installed versions are pinned in requirements.txt.
packages = ['s3fs', 'sagemaker']

# Pin each package to the version installed in this kernel. A package that is not
# installed is reported and skipped instead of aborting the cell with
# PackageNotFoundError, which also makes the "no packages" branch below reachable.
requirements = []
for package_name in packages:
    try:
        requirements.append(f'{package_name}=={version(package_name)}')
    except PackageNotFoundError:
        print(f"Package '{package_name}' is not installed; skipping it.")

if requirements:
    with open('requirements.txt', 'w') as f:
        f.write('\n'.join(requirements))
    print("\nNew requirements.txt file created with the following content:")
    print('\n'.join(requirements))
else:
    print("\nNo requirements.txt file created as no packages were found")


New requirements.txt file created with the following content:
s3fs==2024.10.0
sagemaker==2.240.0


In [5]:
# Show the default locations of the SageMaker SDK configuration files
import os
from platformdirs import site_config_dir, user_config_dir

# The admin config location is printed first, then the user config location
for locate_config_dir in (site_config_dir, user_config_dir):
    print(os.path.join(locate_config_dir("sagemaker"), "config.yaml"))

/etc/xdg/sagemaker/config.yaml
/home/sagemaker-user/.config/sagemaker/config.yaml


In [6]:
%%writefile config.yaml

# Default settings for the SageMaker Python SDK's RemoteFunction module.
SchemaVersion: '1.0'
SageMaker:
    PythonSDK:
        Modules:
            RemoteFunction:
                # Default instance type; the pipeline step in this notebook passes its own instance_type.
                InstanceType: ml.m5.xlarge
                # Pinned package list written by the requirements cell earlier in this notebook.
                Dependencies: ./requirements.txt
                # NOTE(review): presumably ships the local working directory with the job - confirm against the SDK docs.
                IncludeLocalWorkDir: true
                CustomFileFilter:
                    IgnoreNamePatterns: # files or directories to ignore
                        - "*.ipynb" # all notebook files
                        - "*.md" # all markdown files
                        - "__pycache__"

Overwriting config.yaml


In [7]:
# Copy the configuration file to the user config file location.
# This uses the standard library instead of shell magics so that paths containing
# spaces or shell metacharacters are handled correctly and no POSIX shell is needed.
# NOTE: any existing user-level SageMaker config.yaml is overwritten.
import shutil

user_config_path = os.path.join(user_config_dir("sagemaker"), "config.yaml")
os.makedirs(os.path.dirname(user_config_path), exist_ok=True)
shutil.copyfile("config.yaml", user_config_path)

In [8]:
# Derive the pipeline name from the project name plus a UTC day-hour-minute-second timestamp
project = "alt-llm-eval-poc"
current_timestamp = strftime('%d-%H-%M-%S', gmtime())
pipeline_name = "-".join((project, "pipeline", current_timestamp))
pipeline_name

'alt-llm-eval-poc-pipeline-27-16-46-05'

In [9]:
# Instance types for processing and training (both use the same size)
process_instance_type = train_instance_type = "ml.m5.large"

In [10]:
# Create the local directory that will hold the preprocessing module written in the next cell
!mkdir -p custom_processing

In [11]:
%%writefile custom_processing/preprocessing.py
import os
import json
import sys

def preprocess(
    config_file_a,
    config_file_b,
    config_file_c,
    pipeline_run_name=None,
    run_id=None,
    always_fail="no"):
    """Placeholder pre-processing step that reports its outcome as a status dict.

    Args:
        config_file_a: Location of the first configuration file (currently only printed).
        config_file_b: Location of the second configuration file (currently only printed).
        config_file_c: Location of the third configuration file (currently only printed).
        pipeline_run_name: Accepted but currently unused.
        run_id: Accepted but currently unused.
        always_fail: When its string form equals "yes" (case-insensitive), the
            function returns a failure status immediately. ``None`` is treated
            as "do not fail".

    Returns:
        dict: ``{"run_status": "Success" | "Failure", "message": <str>}``.
    """
    # Normalise the flag before comparing: calling .lower() directly on None or
    # another non-string value would raise AttributeError and crash the step.
    fail_flag = "" if always_fail is None else str(always_fail)

    if fail_flag.lower() == "yes":
        print(f"custom_processing/preprocessing exiting: given always_fail value of: {always_fail}")
        return {
            "run_status": "Failure",
            "message": "Failing due to always_fail parameter being set to True"
        }

    print(f"custom_processing/preprocessing continuing: given always_fail value of: {always_fail}")

    try:
        # Check access to config files here

        # For now just output the S3 URIs
        config_files = [config_file_a, config_file_b, config_file_c]
        for cf in config_files:
            print(f"custom_processing/preprocessing: given config file: {cf}")

        # Read in the configuration file here and use the values to drive the processing

        # Do the processing of the files here

        # Indicate success
        output_data = {
            "run_status": "Success",
            "message": "Processing completed successfully.",
        }

    except Exception as e:
        # Report any error as a failure status instead of raising, so the caller
        # can branch on run_status
        output_data = {
            "run_status": "Failure",
            "message": str(e)
        }

    return output_data

Overwriting custom_processing/preprocessing.py


In [12]:
# Python function code is in the local files
from custom_processing.preprocessing import preprocess

In [13]:
# Local smoke test: call the function directly with the default always_fail value
success_case_args = {
    "config_file_a": "a.yaml",
    "config_file_b": "b.yaml",
    "config_file_c": "c.yaml",
}
pre_proc_status = preprocess(**success_case_args)
pre_proc_status['run_status']

custom_processing/preprocessing continuing: given always_fail value of: no
custom_processing/preprocessing: given config file: a.yaml
custom_processing/preprocessing: given config file: b.yaml
custom_processing/preprocessing: given config file: c.yaml


'Success'

In [14]:
# Local smoke test: call the function directly with always_fail set to "Yes"
failure_case_args = {
    "config_file_a": "a.yaml",
    "config_file_b": "b.yaml",
    "config_file_c": "c.yaml",
    "always_fail": "Yes",
}
pre_proc_status = preprocess(**failure_case_args)
pre_proc_status['run_status']

custom_processing/preprocessing exiting: given always_fail value of: Yes


'Failure'

In [15]:
import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker
import mlflow
from time import gmtime, strftime, sleep
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep,
    CacheConfig
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.quality_check_step import (
    DataQualityCheckConfig,
    ModelQualityCheckConfig,
    QualityCheckStep,
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionEquals,
    ConditionNot,
    ConditionOr
)
from sagemaker.workflow.parallelism_config import ParallelismConfiguration
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import (
    Join,
    JsonGet
)
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig 
from sagemaker.image_uris import retrieve
from sagemaker.workflow.function_step import step
from sagemaker.workflow.step_outputs import get_step
from sagemaker.model_monitor import DatasetFormat, model_monitoring
from IPython.display import HTML

# Display the version of the imported SageMaker SDK
sagemaker.__version__

'2.240.0'

### Setup pipeline parameters
SageMaker Pipelines supports [parameterization](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html), which allows you to specify input parameters at runtime without changing your pipeline code. You can use the parameter classes available under the [`sagemaker.workflow.parameters`](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parameters) module.
Parameters have a default value, which you can override by specifying parameter values when starting a pipeline execution.

In [16]:
def _string_parameter(name, default_value):
    """Build a ParameterString with the given name and default value."""
    return ParameterString(name=name, default_value=default_value)


# Locations of the three configuration files passed to the preprocessing step
config_file_a_param = _string_parameter("Config_File_A_URI", "a.yaml")
config_file_b_param = _string_parameter("Config_File_B_URI", "b.yaml")
config_file_c_param = _string_parameter("Config_File_C_URI", "c.yaml")

# Passed to preprocess() as always_fail
always_fail_param = _string_parameter("AlwaysFail", "no")

# Instance type for the preprocessing step; defaults to process_instance_type
process_instance_type_param = _string_parameter("ProcessingInstanceType", process_instance_type)

In [17]:
# Wrap the local preprocess function as a pipeline step
preprocess_step_factory = step(
    preprocess,
    name=f"{project}--preprocessing",
    instance_type=process_instance_type_param,
    keep_alive_period_in_seconds=3600,
)

# Bind the step's arguments to the pipeline parameters and the execution ID
step_preprocess = preprocess_step_factory(
    config_file_a=config_file_a_param,
    config_file_b=config_file_b_param,
    config_file_c=config_file_c_param,
    pipeline_run_name=ExecutionVariables.PIPELINE_EXECUTION_ID,
    always_fail=always_fail_param,
)

In [18]:
# Preprocessing counts as OK only when its returned run_status equals 'Success'
preprocess_succeeded = ConditionEquals(left=step_preprocess['run_status'], right='Success')
condition_not_ok = ConditionNot(preprocess_succeeded)

# Fail step with a fixed error message
step_fail = FailStep(
    error_message="Data pre-processing failed",
    name=f"{project}-fail",
)

# Condition step: run the fail step when preprocessing did not report success
step_conditional_exit = ConditionStep(
    name=f"{project}-processing-status",
    conditions=[condition_not_ok],
    if_steps=[step_fail],
)


In [19]:
# Parameters that can be overridden when an execution is started
pipeline_parameters = [
    config_file_a_param,
    config_file_b_param,
    config_file_c_param,
    process_instance_type_param,
    always_fail_param,
]

# Create the pipeline object with the condition step as its only listed step
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=[step_conditional_exit],
    pipeline_definition_config=PipelineDefinitionConfig(use_custom_job_prefix=True),
)

In [20]:
# The upsert operation serializes the function code, arguments, and other artifacts to S3, where they can be accessed during the pipeline's runtime
pipeline.upsert(role_arn=sm_role)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.Dependencies
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.IncludeLocalWorkDir
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.CustomFileFilter.IgnoreNamePatterns


2025-03-27 16:47:54,359 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-west-2-396913714882/alt-llm-eval-poc-pipeline-27-16-46-05/alt-llm-eval-poc--preprocessing/2025-03-27-16-47-51-607/function
2025-03-27 16:47:54,499 sagemaker.remote_function INFO     Uploading serialized function arguments to s3://sagemaker-us-west-2-396913714882/alt-llm-eval-poc-pipeline-27-16-46-05/alt-llm-eval-poc--preprocessing/2025-03-27-16-47-51-607/arguments
2025-03-27 16:47:54,692 sagemaker.remote_function INFO     Copied dependencies file at './requirements.txt' to '/tmp/tmprs4humml/requirements.txt'
2025-03-27 16:47:54,717 sagemaker.remote_function INFO     Successfully uploaded dependencies and pre execution scripts to 's3://sagemaker-us-west-2-396913714882/alt-llm-eval-poc-pipeline-27-16-46-05/alt-llm-eval-poc--preprocessing/2025-03-27-16-47-51-607/pre_exec_script_and_dependencies'
2025-03-27 16:47:54,720 sagemaker.remote_function INFO     Copied user workspace 

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:396913714882:pipeline/alt-llm-eval-poc-pipeline-27-16-46-05',
 'ResponseMetadata': {'RequestId': 'ba8cb763-642b-4afa-a8d1-8c5e09911719',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ba8cb763-642b-4afa-a8d1-8c5e09911719',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '105',
   'date': 'Thu, 27 Mar 2025 16:47:55 GMT'},
  'RetryAttempts': 0}}

### Execute the pipeline
The first pipeline execution takes about 4 minutes. Note the usage of the `keep_alive_period_in_seconds` parameter in the step definition, which enables warm pool reuse. `CacheConfig` can additionally be used to cache step results (for example in a Training step), but this notebook defines no Training step and does not configure `CacheConfig`.
A subsequent pipeline execution takes about 3 minutes thanks to the warm pool (and to caching, where it is configured).

In [21]:
# Display the name of the pipeline that is about to be started
pipeline_name

'alt-llm-eval-poc-pipeline-27-16-46-05'

In [22]:
# Start an execution without overriding any parameters, then show its description
pipeline_execution = pipeline.start()
pipeline_execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:396913714882:pipeline/alt-llm-eval-poc-pipeline-27-16-46-05',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:396913714882:pipeline/alt-llm-eval-poc-pipeline-27-16-46-05/execution/5l0y0z76e8gu',
 'PipelineExecutionDisplayName': 'execution-1743094075312',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2025, 3, 27, 16, 47, 55, 241000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2025, 3, 27, 16, 47, 55, 241000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-2:396913714882:user-profile/d-jogucphocgiz/toby-2024-11-07',
  'UserProfileName': 'toby-2024-11-07',
  'DomainId': 'd-jogucphocgiz',
  'IamIdentity': {'Arn': 'arn:aws:sts::396913714882:assumed-role/AmazonSageMaker-ExecutionRole-20241022T112182/SageMaker',
   'PrincipalId': 'AROAVY2PGU3BPA645TTQE:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-2:396913714882:user-profile/d-jogucphoc

In [23]:
# Pause for 3 seconds before listing the steps of the execution
import time
time.sleep(3)

In [24]:
# List the steps of the first execution together with their current status
pipeline_execution.list_steps()

[{'StepName': 'alt-llm-eval-poc--preprocessing',
  'StartTime': datetime.datetime(2025, 3, 27, 16, 47, 55, 946000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:396913714882:training-job/preprocess-5l0y0z76e8gu-BDKGKa85jn'}},
  'AttemptCount': 1}]

In [25]:
# Pause for 3 seconds before starting the second execution
time.sleep(3)

In [26]:
# Parameter override for the next execution: set AlwaysFail to "Yes"
runtime_parameters = dict(AlwaysFail="Yes")

In [27]:
# Start a second execution with the runtime parameter overrides, then show its description
pipeline_execution_fail = pipeline.start(parameters=runtime_parameters)
pipeline_execution_fail.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:396913714882:pipeline/alt-llm-eval-poc-pipeline-27-16-46-05',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:396913714882:pipeline/alt-llm-eval-poc-pipeline-27-16-46-05/execution/605x9kv1q06j',
 'PipelineExecutionDisplayName': 'execution-1743094083899',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2025, 3, 27, 16, 48, 3, 840000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2025, 3, 27, 16, 48, 3, 840000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-2:396913714882:user-profile/d-jogucphocgiz/toby-2024-11-07',
  'UserProfileName': 'toby-2024-11-07',
  'DomainId': 'd-jogucphocgiz',
  'IamIdentity': {'Arn': 'arn:aws:sts::396913714882:assumed-role/AmazonSageMaker-ExecutionRole-20241022T112182/SageMaker',
   'PrincipalId': 'AROAVY2PGU3BPA645TTQE:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-2:396913714882:user-profile/d-jogucphocgi

In [28]:
# Pause for 3 seconds before listing the steps of the second execution
time.sleep(3)

In [29]:
# List the steps of the second execution together with their current status
pipeline_execution_fail.list_steps()

[{'StepName': 'alt-llm-eval-poc--preprocessing',
  'StartTime': datetime.datetime(2025, 3, 27, 16, 48, 4, 941000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:396913714882:training-job/preprocess-605x9kv1q06j-uf6rJ0g0lL'}},
  'AttemptCount': 1}]