In [1]:
import json
import logging
import sys
import warnings
from pathlib import Path
import boto3
import os
import ipytest

from sagemaker.session import Session
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession

# %load_ext autoreload
# %autoreload 2
%load_ext dotenv
%dotenv

ipytest.autoconfig(raise_on_error=True)
logging.getLogger('sagemaker').setLevel(logging.ERROR)
warnings.filterwarnings('ignore')

sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\kamil\AppData\Local\sagemaker\sagemaker\config.yaml


In [2]:
CODE_FOLDER = Path("code")
CODE_FOLDER.mkdir(parents=True, exist_ok=True)
INFERENCE_CODE_FOLDER = CODE_FOLDER / "inference"
INFERENCE_CODE_FOLDER.mkdir(parents=True, exist_ok=True)

sys.path.extend([f"./{CODE_FOLDER}", f"./{INFERENCE_CODE_FOLDER}"])

In [3]:
sagemaker_session = Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")

sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\kamil\AppData\Local\sagemaker\sagemaker\config.yaml


In [4]:
BUCKET = os.environ["BUCKET"]
region = boto3.Session().region_name

DUMMY_ROLE = "arn:aws:iam::111111111111:role/service-role/AmazonSageMaker-ExecutionRole-11111111111111"
S3_LOCATION = f"s3://{BUCKET}/football"
role = os.environ["ROLE"]
LOCAL_MODE = False

architecture = !(uname -m)
IS_ARM64_ARCHITECTURE = architecture[0] == "arm64"

if LOCAL_MODE:
    config = {
        "session": LocalPipelineSession(default_bucket=BUCKET),
        "instance_type": "local",
        "image": "sagemaker-xgboost-training-toolkit-local" if IS_ARM64_ARCHITECTURE else None
    }
else:
    config = {
        "session": PipelineSession(default_bucket=BUCKET) if not LOCAL_MODE else None,
        "instance_type": "ml.m5.xlarge",
        "image": None,
    }

config["framework_version"] = "1.7-1"
config["py_version"] = "py310"

sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\kamil\AppData\Local\sagemaker\sagemaker\config.yaml


In [5]:
from sagemaker.s3 import S3Uploader

df_local_path = str(os.environ['DATA_FILEPATH_X'])
y_local_path = str(os.environ['DATA_FILEPATH_Y'])

S3Uploader.upload(local_path=df_local_path, desired_s3_uri=f"{S3_LOCATION}/data", sagemaker_session=config['session'])
S3Uploader.upload(local_path=y_local_path, desired_s3_uri=f"{S3_LOCATION}/data", sagemaker_session=config['session'])

's3://football-data-kamil/football/data/y.csv'

In [6]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="15d")

In [7]:
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.sklearn import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.parameters import ParameterString

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/data",
)

processor = SKLearnProcessor(
    base_job_name="split-and-transform-data",
    framework_version="1.2-1",
    instance_type=config["instance_type"],
    instance_count=2,
    role=role,
    sagemaker_session=config['session'],
)

split_and_transform_data_step = ProcessingStep(
    name="split-and-transform-data",
    step_args=processor.run(
        code=f"{CODE_FOLDER}/preprocessor/preprocessor.py",
        inputs=[
            ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),
        ],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
            ProcessingOutput(output_name="model", source="/opt/ml/processing/model"),
            ProcessingOutput(output_name="train-baseline", source="/opt/ml/processing/train-baseline"),
            ProcessingOutput(output_name="test-baseline", source="/opt/ml/processing/test-baseline"),
        ]
    ),
    cache_config=cache_config,
)

In [8]:
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.retry import (
    StepRetryPolicy,
    StepExceptionTypeEnum,
    SageMakerJobExceptionTypeEnum,
    SageMakerJobStepRetryPolicy
)


def create_training_step(estimator):
    """Create a SageMaker TrainingStep using the provided estimator."""
    return TrainingStep(
        name="train-model",
        step_args=estimator.fit(
            inputs={
                "train": TrainingInput(
                    s3_data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                        "train"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "pipeline": TrainingInput(
                    s3_data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                        "model"
                    ].S3Output.S3Uri,
                    content_type="application/tar+gzip",
                ),
            },
        ),
        # cache_config=cache_config,
        retry_policies=[
            StepRetryPolicy(
                exception_types=[
                    StepExceptionTypeEnum.SERVICE_FAULT,
                    StepExceptionTypeEnum.THROTTLING
                ],
                expire_after_mins=5,
                interval_seconds=10,
                backoff_rate=2.0
            ),
            SageMakerJobStepRetryPolicy(
                exception_types=[SageMakerJobExceptionTypeEnum.RESOURCE_LIMIT],
                expire_after_mins=120,
                interval_seconds=60,
                backoff_rate=2.0
            ),
            SageMakerJobStepRetryPolicy(
                failure_reason_types=[
                    SageMakerJobExceptionTypeEnum.INTERNAL_ERROR,
                    SageMakerJobExceptionTypeEnum.CAPACITY_ERROR,
                ],
                max_attempts=3,
                interval_seconds=10,
                backoff_rate=2.0
            )
        ]
    )

In [None]:
lifecycle_role_name = "lifecycle-ecr-role"
lifecycle_role_arn = None

 # Attach the custom policy for ecr:PutLifecyclePolicy
ecr_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "ecr:PutLifecyclePolicy"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

iam_client.put_role_policy(
    RoleName=lifecycle_role_name,
    PolicyName='PutLifecyclePolicy',
    PolicyDocument=json.dumps(ecr_policy_document)
)

In [13]:
IMAGE_NAME = "xgb-clf-custom-training-container"

lifecycle_ecr = f"""
{{
    "rules": [
        {{
            "rulePriority": 1,
            "description": "Keep only one untagged image, expire all others",
            "selection": {{
                "tagStatus": "untagged",
                "countType": "imageCountMoreThan",
                "countNumber": 1
            }},
            "action": {{
                "type": "expire"
            }}
        }}
    ]
}}
"""

response = boto3.client("ecr").put_lifecycle_policy(
    registryId=os.environ["ACCOUNT_ID"],
    repositoryName=IMAGE_NAME,
    lifecyclePolicyText=lifecycle_ecr
)

ClientError: An error occurred (AccessDeniedException) when calling the PutLifecyclePolicy operation: User: arn:aws:iam::284415450706:user/kamil_admin is not authorized to perform: ecr:PutLifecyclePolicy on resource: arn:aws:ecr:eu-north-1:284415450706:repository/xgb-clf-custom-training-container because no identity-based policy allows the ecr:PutLifecyclePolicy action

In [None]:
from pythonProject.program.code.containers.SageMakerContainerBuilder import SageMakerContainerBuilder

builder = SageMakerContainerBuilder(
    code_folder=CODE_FOLDER,
    image_name=IMAGE_NAME,
    local_mode=LOCAL_MODE
)

training_container_image = builder.build_and_push()
print(training_container_image)
# training_container_image = builder.get_image_uri()
# print(training_container_image)

In [None]:
from sagemaker.xgboost import XGBoost
from sagemaker.estimator import Estimator

use_spot_instances = True and not LOCAL_MODE
max_run = 500
max_wait = 800 if use_spot_instances else None
instance_type = config['instance_type']

COMET_API_KEY = os.environ["COMET_API_KEY"]
COMET_PROJECT_NAME = os.environ["COMET_PROJECT_NAME"]

xgb_estimator = Estimator(
    image_uri=training_container_image,
    instance_count=2,
    instance_type=config["instance_type"],
    sagemaker_session=config["session"],
    role=role,
    use_spot_instances=use_spot_instances,
    max_run=max_run,
    max_wait=max_wait,
    disable_profiler=True,
    environment={
        # "COMET_API_KEY": COMET_API_KEY,
        # "COMET_PROJECT_NAME": COMET_PROJECT_NAME,
    },
)

xgb_train_model_step = create_training_step(xgb_estimator)

In [None]:
session_pipeline = Pipeline(
    name="football-pipeline",
    parameters=[dataset_location],
    steps=[
        split_and_transform_data_step,
        xgb_train_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session_pipeline.upsert(role_arn=role)
session_pipeline.start()

In [11]:
USE_TUNING_STEP = False and not LOCAL_MODE

In [12]:
from sagemaker.tuner import HyperparameterTuner
from sagemaker.parameter import IntegerParameter, ContinuousParameter

hyperparameter_ranges = {
    'eta': ContinuousParameter(min_value=0.05, max_value=0.3, scaling_type="Logarithmic"),
    'max_depth': IntegerParameter(min_value=5, max_value=15, scaling_type="Auto"),
    'subsample': ContinuousParameter(min_value=0.7, max_value=1.0, scaling_type="Auto"),
    'colsample_bytree': ContinuousParameter(min_value=0.7, max_value=1.0, scaling_type="Logarithmic"),
    'lambda': ContinuousParameter(min_value=5, max_value=12, scaling_type="Logarithmic"),
    'alpha': ContinuousParameter(min_value=1, max_value=10, scaling_type="Logarithmic"),
    'min_child_weight': ContinuousParameter(min_value=0.4, max_value=1.0, scaling_type="Auto"),
    'scale_pos_weight': ContinuousParameter(min_value=0.8, max_value=2.2, scaling_type="Auto"),
}

objective_type = "Maximize"
metric_definitions = [
    {
        'Name': 'validation:logloss',
        'Regex': r".*\[[0-9]+\].*#011validation_0-logloss:([-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?).*"
    },
    {
        'Name': 'validation:auc',
        'Regex': r".*\[[0-9]+\].*#011validation_0-auc:([-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?).*"
    },
    {
        'Name': 'train:logloss',
        'Regex': r".*\[[0-9]+\].*#011train-logloss:([-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?).*"
    },
    {
        'Name': 'train:auc',
        'Regex': r".*\[[0-9]+\].*#011train-auc:([-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?).*"
    }
]

metric_name = "validation:auc"
strategy = "Bayesian"

tuner = HyperparameterTuner(
    base_tuning_job_name='xgboost-tuning',
    estimator=xgb_estimator,
    objective_metric_name=metric_name,
    objective_type=objective_type,
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    max_jobs=5,
    max_parallel_jobs=2,
    early_stopping_type='Auto'
)


In [13]:
from sagemaker.workflow.steps import TuningStep

tune_model_step = TuningStep(
    name="tune-model",
    step_args=tuner.fit(
        inputs={
            "train": TrainingInput(
                s3_data=
                split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "pipeline": TrainingInput(
                s3_data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "model"
                ].S3Output.S3Uri,
                content_type="application/tar+gzip",
            ),
        },
    ),
    cache_config=cache_config,
)

In [14]:
from sagemaker import ScriptProcessor
from sagemaker.xgboost import XGBoostProcessor

evaluation_processor = XGBoostProcessor(
    base_job_name="evaluation-processor",
    image_uri=config["image"],
    framework_version=config["framework_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

In [15]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="evaluation-report", output_name="evaluation", path="evaluation.json"
)

In [16]:
model_assets = xgb_train_model_step.properties.ModelArtifacts.S3ModelArtifacts

if USE_TUNING_STEP:
    model_assets = tune_model_step.get_top_model_s3_uri(
        top_k=0,
        s3_bucket=config["session"].default_bucket(),
    )

In [17]:
evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    code=f"{(CODE_FOLDER / 'evaluate/evaluation.py').as_posix()}",
    step_args=evaluation_processor.run(
        inputs=[
            ProcessingInput(
                source=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
            ProcessingInput(
                source=model_assets,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "model"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/pipeline",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="evaluation", source="/opt/ml/processing/evaluation"
            ),
        ],
        code=f"{CODE_FOLDER}/evaluate/evaluation.py",
    ),
    property_files=[evaluation_report],
    cache_config=cache_config,
    retry_policies=[
        SageMakerJobStepRetryPolicy(
            failure_reason_types=[
                SageMakerJobExceptionTypeEnum.INTERNAL_ERROR,
            ],
            max_attempts=3,
            interval_seconds=7,
            backoff_rate=2.0
        )
    ]
)

In [18]:
MODEL_PACKAGE_GROUP = os.environ["MODEL_PACKAGE_GROUP"]

In [19]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.functions import Join

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                evaluate_model_step.properties.ProcessingOutputConfig.Outputs[
                    "evaluation"
                ].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    ),
)

In [20]:
from sagemaker.workflow.model_step import ModelStep


def create_registration_step(
        model,
        model_package_group_name,
        approval_status="Approved",
        content_types=["text/csv"],
        response_types=["text/csv"],
        model_metrics=None,
        drift_check_baselines=None,
):
    """Create a Registration Step using the supplied parameters."""
    return ModelStep(
        name="register",
        step_args=model.register(
            model_package_group_name=model_package_group_name,
            approval_status=approval_status,
            model_metrics=model_metrics,
            drift_check_baselines=drift_check_baselines,
            content_types=content_types,
            response_types=response_types,
            inference_instances=[config["instance_type"]],
            transform_instances=[config["instance_type"]],
            framework_version=config["framework_version"],
            domain="MACHINE_LEARNING",
            task="CLASSIFICATION",
            framework="XGBOOST",
        ),
    )

In [21]:
from sagemaker.xgboost import XGBoostModel

custom_xgb_model = XGBoostModel(
    name="football",
    model_data=model_assets,
    entry_point="inference.py",
    source_dir=(CODE_FOLDER / "inference").as_posix(),
    framework_version=config["framework_version"],
    sagemaker_session=config["session"],
    role=role,
)

In [22]:
GROUND_TRUTH_LOCATION = f"{S3_LOCATION}/monitoring/groundtruth"
DATA_QUALITY_LOCATION = f"{S3_LOCATION}/monitoring/data-quality"
MODEL_QUALITY_LOCATION = f"{S3_LOCATION}/monitoring/model-quality"

In [23]:
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import (
    DataQualityCheckConfig,
    QualityCheckStep,
)

data_quality_check_config = DataQualityCheckConfig(
    baseline_dataset=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs["train-baseline"].S3Output.S3Uri,
    dataset_format=DatasetFormat.csv(header=True), output_s3_uri=DATA_QUALITY_LOCATION, )

check_job_config = CheckJobConfig(instance_type="ml.c5.xlarge", instance_count=1, volume_size_in_gb=20, sagemaker_session=config["session"], role=role, )

data_quality_baseline_step = QualityCheckStep(
    name="generate-data-quality-baseline",
    check_job_config=check_job_config,
    quality_check_config=data_quality_check_config,
    model_package_group_name=MODEL_PACKAGE_GROUP,
    skip_check=True,
    register_new_baseline=True,
    cache_config=cache_config,
)

In [24]:
create_model_step = ModelStep(
    name="create-model",
    step_args=custom_xgb_model.create(instance_type=config["instance_type"]),
)

In [25]:
from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name=create_model_step.properties.ModelName,
    instance_type=config["instance_type"],
    instance_count=1,
    strategy="MultiRecord",
    accept="text/csv",
    assemble_with="Line",
    output_path=f"{S3_LOCATION}/transform",
    sagemaker_session=config["session"],
)

In [26]:
from sagemaker.workflow.steps import TransformStep

generate_test_predictions_step = TransformStep(
    name="generate-test-predictions",
    step_args=transformer.transform(
        data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
            "test-baseline"
        ].S3Output.S3Uri,
        join_source="Input",
        split_type="Line",
        content_type="text/csv",
        output_filter="$[-3,-2]",
    ),
    cache_config=cache_config,
)

In [27]:
from sagemaker.workflow.quality_check_step import ModelQualityCheckConfig

model_quality_baseline_step = QualityCheckStep(
    name="generate-model-quality-baseline",
    check_job_config=CheckJobConfig(
        instance_type="ml.m5.xlarge",
        instance_count=2,
        volume_size_in_gb=20,
        sagemaker_session=config["session"],
        role=role,
    ),
    quality_check_config=ModelQualityCheckConfig(
        baseline_dataset=generate_test_predictions_step.properties.TransformOutput.S3OutputPath,
        dataset_format=DatasetFormat.csv(header=False),
        problem_type="MulticlassClassification",
        ground_truth_attribute="_c0",
        inference_attribute="_c1",
        output_s3_uri=MODEL_QUALITY_LOCATION,
    ),
    model_package_group_name=MODEL_PACKAGE_GROUP,
    skip_check=True,
    register_new_baseline=True,
    cache_config=cache_config,
)

In [28]:
from sagemaker.drift_check_baselines import DriftCheckBaselines

model_quality_model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

model_quality_drift_check_baselines = DriftCheckBaselines(
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
)

In [29]:
register_model_step = create_registration_step(
    custom_xgb_model,
    MODEL_PACKAGE_GROUP,
    approval_status='PendingManualApproval',
    content_types=["text/csv", "application/json"],
    response_types=["text/csv", "application/json"],
    model_metrics=model_quality_model_metrics,
    drift_check_baselines=model_quality_drift_check_baselines,
)

In [30]:
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.parameters import ParameterFloat
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo

f1_threshold = ParameterFloat(name="f1_threshold", default_value=0.6)

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ",
        values=[
            "Execution failed because the model's f1 result was lower than",
            f1_threshold,
        ],
    ),
)

condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluate_model_step.name,
        property_file=evaluation_report,
        json_path="metrics.f1.value",
    ),
    right=f1_threshold,
)

In [31]:
lambda_role_name = "lambda-deployment-role"
lambda_role_arn = None

try:
    response = iam_client.create_role(
        RoleName=lambda_role_name,
        AssumeRolePolicyDocument=json.dumps(
            {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": ["lambda.amazonaws.com", "events.amazonaws.com"],
                        },
                        "Action": "sts:AssumeRole",
                    },
                ],
            },
        ),
        Description="Lambda Endpoint Deployment",
    )

    lambda_role_arn = response["Role"]["Arn"]

    iam_client.attach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        RoleName=lambda_role_name,
    )

    iam_client.attach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
        RoleName=lambda_role_name,
    )

    print(f'Role "{lambda_role_name}" created with ARN "{lambda_role_arn}".')
except iam_client.exceptions.EntityAlreadyExistsException:
    response = iam_client.get_role(RoleName=lambda_role_name)
    lambda_role_arn = response["Role"]["Arn"]
    print(f'Role "{lambda_role_name}" already exists with ARN "{lambda_role_arn}".')

Role "lambda-deployment-role" already exists with ARN "arn:aws:iam::284415450706:role/lambda-deployment-role".


In [32]:
from sagemaker.lambda_helper import Lambda

ENDPOINT = "football-endpoint"
DATA_CAPTURE_DESTINATION = f"{S3_LOCATION}/monitoring/data-capture"
DATA_CAPTURE_PERCENTAGE = 100

deploy_lambda_fn = Lambda(
    function_name="deployment_fn",
    execution_role_arn=lambda_role_arn,
    script=(CODE_FOLDER / "lambda" / "lambda.py").as_posix(),
    handler="lambda.lambda_handler",
    timeout=600,
    session=sagemaker_session,
    runtime="python3.11",
    environment={
        "Variables": {
            "ENDPOINT": ENDPOINT,
            "DATA_CAPTURE_DESTINATION": DATA_CAPTURE_DESTINATION,
            "DATA_CAPTURE_PERCENTAGE": str(DATA_CAPTURE_PERCENTAGE),
            "ROLE": role,
            "MODEL_PACKAGE_GROUP": MODEL_PACKAGE_GROUP
        },
    },
)

deploy_lambda_fn_response = deploy_lambda_fn.upsert()

In [33]:
event_pattern = f"""
{{
  "source": ["aws.sagemaker"],
  "detail-type": ["SageMaker Model Package State Change"],
  "detail": {{
    "ModelPackageGroupName": ["{MODEL_PACKAGE_GROUP}"],
    "ModelApprovalStatus": ["Approved"]
  }}
}}
"""

In [34]:
rule_name = "PendingModelApprovedRule"

events_client = boto3.client("events")
rule_response = events_client.put_rule(
    Name=rule_name,
    EventPattern=event_pattern,
    State="ENABLED",
    RoleArn=role,
)

In [35]:
response = events_client.put_targets(
    Rule=rule_name,
    Targets=[
        {
            "Id": "1",
            "Arn": deploy_lambda_fn_response["FunctionArn"],
        },
    ],
)

In [36]:
lambda_function_name = deploy_lambda_fn_response["FunctionName"]
lambda_client = boto3.client("lambda")

try:
    response = lambda_client.add_permission(
        Action="lambda:InvokeFunction",
        FunctionName=lambda_function_name,
        Principal="events.amazonaws.com",
        SourceArn=rule_response["RuleArn"],
        StatementId="EventBridge",
    )
except lambda_client.exceptions.ResourceConflictException:
    print(f'Function "{lambda_function_name}" already has the specified permission.')

Function "deployment_fn" already has the specified permission.


In [37]:
from sagemaker.workflow.lambda_step import LambdaStep


def create_deployment_step(register_model_step):
    """Create a Deploy Step using the supplied parameters."""
    return LambdaStep(
        name="deploy",
        lambda_func=deploy_lambda_fn,
        inputs={
            "model_package_arn": register_model_step.properties.ModelPackageArn,
        },
    )

In [38]:
deploy_step = create_deployment_step(register_model_step)

condition_step = ConditionStep(
    name="check-model-f1-score",
    conditions=[condition],
    if_steps=(
        [
            create_model_step,
            generate_test_predictions_step,
            model_quality_baseline_step,
            register_model_step,
        ]
    ),
    else_steps=[fail_step],
)

In [39]:
session_pipeline = Pipeline(
    name="football-pipeline",
    parameters=[dataset_location, f1_threshold],
    steps=[
        split_and_transform_data_step,
        tune_model_step if USE_TUNING_STEP else xgb_train_model_step,
        evaluate_model_step,
        data_quality_baseline_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session_pipeline.upsert(role_arn=role)

Using provided s3_resource
Using provided s3_resource


{'PipelineArn': 'arn:aws:sagemaker:eu-north-1:284415450706:pipeline/football-pipeline',
 'ResponseMetadata': {'RequestId': '7670f990-ed9b-46e4-84b3-e0dae378de94',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7670f990-ed9b-46e4-84b3-e0dae378de94',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Mon, 29 Jul 2024 19:49:02 GMT'},
  'RetryAttempts': 0}}

In [40]:
session_pipeline.start()

# session_pipeline.start(
#     parameters=dict(
#         dataset_location=''
#     ))

_PipelineExecution(arn='arn:aws:sagemaker:eu-north-1:284415450706:pipeline/football-pipeline/execution/674x71l3hsvq', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x000001861AA225F0>)