# SageMaker Core Pipeline - Data Prep, Training, and Model Creation

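This notebook demonstrates how to build a complete ML pipeline with SageMaker Pipelines. A CoreLab session supplies the AWS session, execution role, and S3 output locations. The pipeline includes:
1. Data Processing - Prepare and split the customer churn dataset
2. Model Training - Train an XGBoost model on the processed data
3. Model Evaluation - Evaluate the trained model on the validation split
4. Model Creation - Create a deployable SageMaker model from the training artifacts
5. Model Registration (optional) - Register the model in the SageMaker Model Registry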

In [8]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Initialize CoreLab Session

In [9]:
from corelab.core.session import CoreLabSession

lab_session = CoreLabSession(
    'xgboost',
    'customer-churn-pipeline',
    default_folder='pipeline_notebook',
    create_run_folder=True,
    aws_profile='sagemaker-role'
)
lab_session.print()
core_session = lab_session.core_session

execution role available: arn:aws:iam::136548476532:role/service-role/AmazonSageMaker-ExecutionRole-20250902T164316
AWS region: eu-central-1
Execution role arn:aws:iam::136548476532:role/service-role/AmazonSageMaker-ExecutionRole-20250902T164316
Output bucket uri: s3://sagemaker-eu-central-1-136548476532/pipeline_notebook/2025-09-17T07-41-57
Framework: xgboost
Project name: customer-churn-pipeline


## Import SageMaker Pipeline Components

Note: pipeline orchestration uses the SageMaker Python SDK's `sagemaker.workflow` module, not `sagemaker-core`.

In [10]:
# Pipeline-specific imports from SageMaker SDK
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.parameters import (
    ParameterFloat,
    ParameterString
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.properties import PropertyFile

# Processing imports - using XGBoostProcessor for better framework integration
from sagemaker.xgboost.processing import XGBoostProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

# Training imports  
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

# Model imports
from sagemaker.model import Model

import time
from datetime import datetime

print("All pipeline modules imported successfully")


All pipeline modules imported successfully


In [11]:
# Create PipelineSession for proper pipeline execution context
pipeline_session = PipelineSession(
    boto_session=lab_session.core_session.boto_session,
    default_bucket=lab_session.core_session.default_bucket(),
    default_bucket_prefix=lab_session.core_session.default_bucket_prefix
)

print(f"📦 Default bucket: {pipeline_session.default_bucket()}")
print(f"📁 Bucket prefix: {pipeline_session.default_bucket_prefix}")

📦 Default bucket: sagemaker-eu-central-1-136548476532
📁 Bucket prefix: pipeline_notebook/2025-09-17T07-41-57


## Define input and output locations

In [12]:

# Define data locations
data_s3_uri = f"s3://sagemaker-example-files-prod-{lab_session.region}/datasets/tabular/synthetic/churn.txt"
pipeline_output_s3_uri = lab_session.jobs_output_s3_uri

print(f"📁 Data S3 URI: {data_s3_uri}")
print(f"📤 Pipeline Output S3 URI: {pipeline_output_s3_uri}")

📁 Data S3 URI: s3://sagemaker-example-files-prod-eu-central-1/datasets/tabular/synthetic/churn.txt
📤 Pipeline Output S3 URI: s3://sagemaker-eu-central-1-136548476532/pipeline_notebook/2025-09-17T07-41-57/jobs


## Define Pipeline Parameters

Pipeline parameters let you override values such as instance types, the train/test split, and XGBoost hyperparameters on each execution without modifying the pipeline code.

In [13]:
# Define pipeline parameters for flexibility

# Processing parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.large"
)

train_test_split = ParameterFloat(
    name="TrainTestSplit",
    default_value=0.33
)

# Training parameters
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.large"
)

max_depth = ParameterString(
    name="MaxDepth",
    default_value="5"
)

num_round = ParameterString(
    name="NumRound",
    default_value="100"
)

print("Pipeline parameters defined")

Pipeline parameters defined
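
At execution time, `pipeline.start(parameters=...)` overrides only the parameters you pass; all others keep their `default_value`. The sketch below mirrors that merge in plain Python so an override dict can be sanity-checked before starting an execution. `resolve_parameters` and its range check on `TrainTestSplit` are hypothetical helpers, not part of the SageMaker SDK, and `DEFAULTS` simply copies the defaults declared above.

```python
# Hypothetical client-side helper: mirrors how execution-time overrides
# replace default parameter values. Not part of the SageMaker SDK.
DEFAULTS = {
    "ProcessingInstanceType": "ml.m5.large",
    "TrainTestSplit": 0.33,
    "TrainingInstanceType": "ml.m5.large",
    "MaxDepth": "5",
    "NumRound": "100",
}


def resolve_parameters(overrides, defaults=DEFAULTS):
    """Return the effective parameter values for one execution."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Catch misspelled parameter names before starting an execution
        raise ValueError(f"Unknown pipeline parameters: {sorted(unknown)}")
    resolved = {**defaults, **overrides}  # overrides win, defaults fill the rest
    split = float(resolved["TrainTestSplit"])
    if not 0.0 < split < 1.0:
        raise ValueError("TrainTestSplit must be strictly between 0 and 1")
    return resolved


print(resolve_parameters({"MaxDepth": "7"}))
```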


## Step 1: Define Processing Step

This step processes raw data and splits it into train, validation, and test sets.
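
`preprocessing.py` itself is not shown in this notebook. As a rough sketch of its core logic, assuming it holds out the `--train-test-split` fraction of rows and divides that holdout evenly between validation and test (the real script may split differently), it could look like this. `split_rows` is a hypothetical name.

```python
import argparse
import random


def split_rows(rows, holdout_fraction, seed=42):
    """Shuffle rows, then split them into train / validation / test.

    Assumption: `holdout_fraction` of the rows is held out and divided
    evenly between validation and test; the rest is used for training.
    """
    if not 0.0 < holdout_fraction < 1.0:
        raise ValueError("holdout_fraction must be between 0 and 1")
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # deterministic shuffle for reproducibility
    n_holdout = int(len(shuffled) * holdout_fraction)
    n_train = len(shuffled) - n_holdout
    n_validation = n_holdout // 2
    train = shuffled[:n_train]
    validation = shuffled[n_train:n_train + n_validation]
    test = shuffled[n_train + n_validation:]
    return train, validation, test


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # The pipeline passes this flag via arguments=["--train-test-split", ...]
    parser.add_argument("--train-test-split", type=float, default=0.33)
    args, _ = parser.parse_known_args()
    train, validation, test = split_rows(range(100), args.train_test_split)
    print(len(train), len(validation), len(test))
```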

In [14]:
from pathlib import Path
import os
# Create an XGBoostProcessor (a FrameworkProcessor): it accepts source_dir, so the script and its requirements.txt are packaged together
xgb_processor = XGBoostProcessor(
    framework_version='1.7-1',
    instance_type=processing_instance_type,
    instance_count=1,
    role=lab_session.role,
    sagemaker_session=pipeline_session,
    volume_size_in_gb=30,
    max_runtime_in_seconds=3600,
    env={"PYTHONUNBUFFERED": "1"},
    base_job_name='churn-preprocessing'
)

src_dir = Path(os.getcwd(), '..', 'processing', 'src').resolve()
print(src_dir, src_dir.exists())
# Use step_args pattern for proper pipeline integration
processor_args = xgb_processor.run(
    code="preprocessing.py",
    source_dir=str(src_dir) + '/',  # Directory with code and requirements.txt
    inputs=[
        ProcessingInput(
            source=data_s3_uri,
            destination="/opt/ml/processing/input/data"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/output/train",
            # destination=f"{pipeline_output_s3_uri}/data/train"
        ),
        ProcessingOutput(
            output_name="validation", 
            source="/opt/ml/processing/output/validation",
            # destination=f"{pipeline_output_s3_uri}/data/validation"
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/output/test",
            # destination=f"{pipeline_output_s3_uri}/data/test"
        )
    ],
    arguments=["--train-test-split", train_test_split.to_string()]
)

step_process = ProcessingStep(
    name="PreprocessCustomerChurnData",
    step_args=processor_args
)

print("✅ Processing step defined")

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.large.


/Users/machiel/Development/crystalline/sagemaker/corelab/labs/answers/processing/src True
✅ Processing step defined




## Step 2: Define Training Step

This step trains an XGBoost model using the processed data from Step 1.
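
The built-in XGBoost algorithm reads `text/csv` training data without a header row and expects the target in the first column, so the processed files must use that layout. A minimal sketch of the conversion, assuming a `Churn?` label column with `True.`/`False.` values and the feature names shown (check the actual columns in churn.txt and preprocessing.py); `to_xgboost_csv` is a hypothetical helper.

```python
import csv
import io


def to_xgboost_csv(records, label_key, feature_keys):
    """Serialize dict records as header-less CSV with the label in column 0.

    Assumption: the raw label is the string "True." or "False.", mapped to 1/0.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for record in records:
        label = 1 if record[label_key] == "True." else 0
        writer.writerow([label] + [record[key] for key in feature_keys])
    return buffer.getvalue()  # no header row, as the built-in algorithm expects


rows = [
    {"Churn?": "False.", "Day Mins": 265.1, "CustServ Calls": 1},
    {"Churn?": "True.", "Day Mins": 129.1, "CustServ Calls": 4},
]
print(to_xgboost_csv(rows, "Churn?", ["Day Mins", "CustServ Calls"]))
```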

In [15]:
# Use the built-in XGBoost algorithm through a generic Estimator: the container provides the training logic, so no train.py is needed
xgboost_estimator = Estimator(
    image_uri=lab_session.retrieve_image('1.7-1'),
    instance_type=training_instance_type,
    instance_count=1,
    role=lab_session.role,
    output_path=f"{pipeline_output_s3_uri}/models",
    sagemaker_session=pipeline_session,
    hyperparameters={
        "max_depth": max_depth,
        "eta": "0.2",
        "gamma": "4",
        "min_child_weight": "6",
        "subsample": "0.8",
        "verbosity": "0",
        "objective": "binary:logistic",
        "num_round": num_round
    }
)

# Use step_args pattern for training step
training_args = xgboost_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)

step_train = TrainingStep(
    name="TrainXGBoostModel",
    step_args=training_args
)

print("✅ Training step defined")

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


✅ Training step defined


## Step 3: Define Evaluation Step

This step evaluates the trained model against the validation dataset created during preprocessing.
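
`evaluate.py` is not shown either. It has to write `evaluation.json` into `/opt/ml/processing/output`, which the `EvaluationReport` property file and the Model Registry metrics below read. The sketch assumes a report layout of `binary_classification_metrics` → metric → `value`/`standard_deviation`, modeled on AWS sample pipelines; verify what your evaluate.py actually emits. Metrics are computed in plain Python to stay dependency-free, and `binary_metrics`/`build_report` are hypothetical names.

```python
import json


def binary_metrics(labels, probabilities, threshold=0.5):
    """Compute accuracy, precision, recall and ROC AUC for a binary classifier."""
    if not labels:
        raise ValueError("labels must not be empty")
    predictions = [1 if p >= threshold else 0 for p in probabilities]
    pairs_yp = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs_yp if y == 1 and p == 1)
    tn = sum(1 for y, p in pairs_yp if y == 0 and p == 0)
    fp = sum(1 for y, p in pairs_yp if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs_yp if y == 1 and p == 0)
    accuracy = (tp + tn) / len(labels)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    # AUC: chance a random positive scores above a random negative (ties count half)
    positives = [s for y, s in zip(labels, probabilities) if y == 1]
    negatives = [s for y, s in zip(labels, probabilities) if y == 0]
    pairs = [(p > n) + 0.5 * (p == n) for p in positives for n in negatives]
    auc = sum(pairs) / len(pairs) if pairs else float("nan")
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "auc": auc}


def build_report(metrics):
    """Wrap metrics in the nested layout assumed for evaluation.json."""
    return {
        "binary_classification_metrics": {
            name: {"value": value, "standard_deviation": "NaN"}
            for name, value in metrics.items()
        }
    }


report = build_report(binary_metrics([1, 0, 1, 0], [0.9, 0.2, 0.4, 0.6]))
print(json.dumps(report, indent=2))
```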


In [None]:
# Create ScriptProcessor for evaluation to run custom metrics
evaluation_src_dir = Path(os.getcwd(), 'src').resolve()
script_processor = ScriptProcessor(
    image_uri=lab_session.retrieve_image('1.7-1'),
    command=['python3'],
    instance_type=processing_instance_type,
    instance_count=1,
    role=lab_session.role,
    sagemaker_session=pipeline_session,
    base_job_name='churn-evaluation',
    volume_size_in_gb=30,
    max_runtime_in_seconds=3600,
    env={'PYTHONUNBUFFERED': '1'}
)

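evaluation_args = script_processor.run(
    # ScriptProcessor.run() has no source_dir parameter, so pass the script path directly
    # (only this single file is uploaded to the container)
    code=str(evaluation_src_dir / 'evaluate.py'),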
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/model'
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri,
            destination='/opt/ml/processing/evaluation'
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation',
            source='/opt/ml/processing/output'
        )
    ]
)

evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='evaluation.json'
)

step_evaluate = ProcessingStep(
    name='EvaluateModel',
    step_args=evaluation_args,
    property_files=[evaluation_report]
)

print('✅ Evaluation step defined')


## Step 4: Define Model Creation Step

This step creates a SageMaker Model from the trained model artifacts.
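
`S3ModelArtifacts` points at the `model.tar.gz` that training writes under the estimator's `output_path`; both the evaluation step and the `Model` below consume it. The built-in XGBoost container stores the booster inside that archive as `xgboost-model`. A sketch of how `evaluate.py` could unpack it; `extract_model_artifact` is a hypothetical helper, and the demo builds a stand-in archive so it runs without AWS.

```python
import tarfile
import tempfile
from pathlib import Path


def extract_model_artifact(tarball_path, dest_dir, model_file="xgboost-model"):
    """Extract model.tar.gz into dest_dir and return the path of the booster file."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    with tarfile.open(tarball_path, "r:gz") as archive:
        if hasattr(tarfile, "data_filter"):
            # Newer Python versions can reject unsafe member paths during extraction
            archive.extractall(dest, filter="data")
        else:
            archive.extractall(dest)
    model_path = dest / model_file
    if not model_path.exists():
        raise FileNotFoundError(f"{model_file} not found in {tarball_path}")
    return model_path


# Demonstration with a stand-in archive; in evaluate.py the real archive would be
# /opt/ml/processing/model/model.tar.gz
with tempfile.TemporaryDirectory() as tmp:
    dummy = Path(tmp, "xgboost-model")
    dummy.write_bytes(b"placeholder")
    tarball = Path(tmp, "model.tar.gz")
    with tarfile.open(tarball, "w:gz") as archive:
        archive.add(dummy, arcname="xgboost-model")
    print(extract_model_artifact(tarball, Path(tmp, "extracted")))
```

Per the AWS documentation for built-in XGBoost 1.3-1 and later, the extracted file can then be loaded with `xgboost.Booster()` followed by `load_model(str(path))`.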

In [16]:
# Create a Model object using pipeline session for consistency
model = Model(
    image_uri=lab_session.retrieve_image('1.7-1'),
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=lab_session.role,
    sagemaker_session=pipeline_session
)

# Use step_args pattern for model creation
model_create_args = model.create(instance_type="ml.m5.large")

step_create_model = ModelStep(
    name="CreateXGBoostModel",
    step_args=model_create_args
)

print("✅ Model creation step defined")

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


✅ Model creation step defined


## Optional: Model Registry Step

Register the model in SageMaker Model Registry for versioning and deployment management.
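
The cell below registers every model with `approval_status="Approved"`. To gate approval on the evaluation results instead, one option is to derive the status from `evaluation.json`. The sketch assumes the report layout `binary_classification_metrics.accuracy.value` and an arbitrary 0.8 threshold; `Approved` and `PendingManualApproval` are valid Model Registry approval statuses, and `choose_approval_status` is a hypothetical helper. Inside the pipeline, the same decision would typically be expressed with a `ConditionStep` reading the `EvaluationReport` property file via `JsonGet`.

```python
import json


def choose_approval_status(report, min_accuracy=0.8):
    """Return a Model Registry approval status based on an evaluation report dict.

    Assumes {"binary_classification_metrics": {"accuracy": {"value": ...}}};
    the 0.8 threshold is an arbitrary example.
    """
    try:
        accuracy = float(report["binary_classification_metrics"]["accuracy"]["value"])
    except (KeyError, TypeError, ValueError):
        # Missing or malformed metric: fall back to human review
        return "PendingManualApproval"
    return "Approved" if accuracy >= min_accuracy else "PendingManualApproval"


report = json.loads('{"binary_classification_metrics": {"accuracy": {"value": 0.91}}}')
print(choose_approval_status(report))
```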

In [17]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.functions import Join

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(on="/", values=[
            step_evaluate.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
            "evaluation.json"
        ]),
        content_type="application/json"
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.large"],
    model_package_group_name="customer-churn-models",
    approval_status="Approved",
    model_metrics=model_metrics,
    description="XGBoost model for customer churn prediction"
)
step_register_model = ModelStep(
    name="RegisterXGBoostModel",
    step_args=register_args
)

print("✅ Model registration step defined")

✅ Model registration step defined


## Create and Execute the Pipeline

In [None]:
# Create the pipeline with fixed name for versioning
# SageMaker Pipelines now support versioning - use fixed names instead of timestamps
pipeline_name = "customer-churn-pipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        train_test_split,
        training_instance_type,
        max_depth,
        num_round
    ],
    steps=[
        step_process,
        step_train,
        step_evaluate,
        step_create_model,
        step_register_model
    ],
    sagemaker_session=pipeline_session
)

print(f"🚀 Pipeline Name: {pipeline_name}")
print(f"📊 Pipeline Steps: {len(pipeline.steps)}")
print("ℹ️  Using fixed name - SageMaker will create versions automatically")


## Validate Pipeline Definition

In [None]:
import json

# Serialize the pipeline definition; this surfaces step-wiring errors before upsert
pipeline_definition = json.loads(pipeline.definition())
print("Pipeline definition generated successfully!")
print(f"\nPipeline has {len(pipeline_definition['Steps'])} steps:")
for step in pipeline_definition['Steps']:
    print(f"  - {step['Name']}: {step['Type']}")
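
The definition encodes step wiring as JSON expressions such as `{"Get": "Steps.TrainXGBoostModel.ModelArtifacts.S3ModelArtifacts"}` and `{"Get": "Parameters.MaxDepth"}`, and SageMaker derives the execution order from these references. The sketch below walks a definition dict, collects the step names each step references (plus any explicit `DependsOn` list), and orders them with `graphlib` (Python 3.9+). It illustrates the idea under the assumption that step references always use the `Get` form; it is not SageMaker's scheduler, and both helper names are hypothetical.

```python
from graphlib import TopologicalSorter


def collect_step_refs(node, found):
    """Recursively collect step names referenced via {"Get": "Steps.<name>..."}."""
    if isinstance(node, dict):
        ref = node.get("Get")
        if isinstance(ref, str) and ref.startswith("Steps."):
            found.add(ref.split(".")[1])
        for value in node.values():
            collect_step_refs(value, found)
    elif isinstance(node, list):
        for item in node:
            collect_step_refs(item, found)
    return found


def execution_order(definition):
    """Return step names in an order that respects data and DependsOn dependencies."""
    graph = {}
    for step in definition["Steps"]:
        deps = collect_step_refs(step, set())
        deps.update(step.get("DependsOn", []))
        deps.discard(step["Name"])
        graph[step["Name"]] = deps  # node -> set of predecessors
    return list(TopologicalSorter(graph).static_order())


sample = {
    "Steps": [
        {"Name": "Train", "Arguments": {"In": {"Get": "Steps.Prep.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri"}}},
        {"Name": "Prep", "Arguments": {"Split": {"Get": "Parameters.TrainTestSplit"}}},
        {"Name": "Evaluate", "Arguments": {"Model": {"Get": "Steps.Train.ModelArtifacts.S3ModelArtifacts"}}},
    ]
}
print(execution_order(sample))
# For this notebook: print(execution_order(pipeline_definition))
```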

## Create/Update and Execute Pipeline

In [None]:
# Create/update the pipeline (creates new version if pipeline exists)
response = pipeline.upsert(role_arn=lab_session.role)
print(f"✅ Pipeline '{pipeline_name}' created/updated successfully")

# Check if this created a new version
try:
    versions = pipeline.list_pipeline_versions()
    version_count = len(versions)
    latest_version = versions[0]['PipelineVersion'] if versions else 1
    print(f"📋 Pipeline now has {version_count} version(s), latest: v{latest_version}")
except:
    print("📋 Version information not available")

# Start pipeline execution
execution = pipeline.start(
    parameters={
        "ProcessingInstanceType": "ml.m5.large",
        "TrainingInstanceType": "ml.m5.large", 
        "TrainTestSplit": 0.33,
        "MaxDepth": "5",
        "NumRound": "100"
    }
)

print("🚀 Pipeline execution started")
print(f"📝 Execution ARN: {execution.arn}")
print(f"📊 Status: {execution.describe()['PipelineExecutionStatus']}")

## Monitor Pipeline Execution

In [None]:
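# Block until the execution finishes. wait() polls every `delay` seconds and raises
# if the execution fails or is still running after `max_attempts` polls (defaults: 30 s x 60)
execution.wait(delay=30, max_attempts=120)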
# while True:
#     status = execution.describe()['PipelineExecutionStatus']
#     print(f"Pipeline Status: {status}")
#
#     if status in ['Succeeded', 'Failed', 'Stopped']:
#         break
#
#     # Check step statuses
#     steps = execution.list_steps()
#     for step in steps:
#         print(f"  - {step['StepName']}: {step['StepStatus']}")
#
#     time.sleep(30)
#     print("---")
#
# print(f"\n✅ Pipeline execution completed with status: {status}")
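
`execution.wait()` relies on a boto3 waiter with a fixed attempt budget. For a status loop like the commented-out one above but with an explicit timeout, a small polling helper works. `poll_until_terminal` is hypothetical; its terminal states `Succeeded`, `Failed`, and `Stopped` match those used in the loop above. Injecting `sleep` and `clock` keeps it testable without AWS.

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def poll_until_terminal(get_status, interval=30, timeout=3600,
                        sleep=time.sleep, clock=time.monotonic):
    """Call get_status() every `interval` seconds until a terminal status or timeout."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        print(f"Pipeline Status: {status}")
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout} seconds")
        sleep(interval)


# Offline demo with a scripted sequence of statuses
fake_statuses = iter(["Executing", "Executing", "Succeeded"])
print(poll_until_terminal(lambda: next(fake_statuses), interval=0))

# Against the real execution:
# status = poll_until_terminal(lambda: execution.describe()["PipelineExecutionStatus"])
```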

## Retrieve Pipeline Outputs

In [None]:
# Each step's Metadata holds only the ARN of the job or resource it created
sm_client = pipeline_session.sagemaker_client
execution_steps = execution.list_steps()

for step in execution_steps:
    print(f"\nStep: {step['StepName']}")
    print(f"  Status: {step['StepStatus']}")
    if step['StepStatus'] != 'Succeeded':
        continue
    metadata = step.get('Metadata', {})

    if 'TrainingJob' in metadata:
        print(f"  Training Job ARN: {metadata['TrainingJob']['Arn']}")

    elif 'Model' in metadata:
        # ModelStep names this sub-step "CreateXGBoostModel-CreateModel", so match on metadata, not the name
        print(f"  Model ARN: {metadata['Model']['Arn']}")

    elif 'RegisterModel' in metadata:
        print(f"  Model Package ARN: {metadata['RegisterModel']['Arn']}")

    elif step['StepName'] == 'EvaluateModel' and 'ProcessingJob' in metadata:
        # Describe the processing job to find the S3 location of its 'evaluation' output
        job_name = metadata['ProcessingJob']['Arn'].split('/')[-1]
        job = sm_client.describe_processing_job(ProcessingJobName=job_name)
        outputs = job['ProcessingOutputConfig']['Outputs']
        eval_uri = next((o['S3Output']['S3Uri'] for o in outputs if o['OutputName'] == 'evaluation'), None)
        if eval_uri:
            print(f"  Evaluation report: {eval_uri}/evaluation.json")
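
To read the report rather than just print its location, download it from S3. The sketch uses boto3's `get_object`; `split_s3_uri` and `load_evaluation_report` are hypothetical helpers, and the S3 client is passed in so the parsing can be checked without AWS.

```python
import json
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key/path' into ('bucket', 'key/path')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def load_evaluation_report(s3_client, uri):
    """Download and parse a JSON report using a boto3-style S3 client."""
    bucket, key = split_s3_uri(uri)
    body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    return json.loads(body)


print(split_s3_uri("s3://my-bucket/jobs/evaluation/evaluation.json"))
# With the pipeline output (requires AWS credentials):
# s3 = lab_session.core_session.boto_session.client("s3")
# report = load_evaluation_report(s3, f"{eval_uri}/evaluation.json")
```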


## View Pipeline Execution in SageMaker Studio

You can also view and manage your pipeline execution in SageMaker Studio:
1. Open SageMaker Studio
2. Navigate to the Pipelines section
3. Select your pipeline to view execution details, logs, and metrics

## Pipeline Version Management (Optional)

With SageMaker Pipeline versioning, you can manage different versions of your pipeline:

In [23]:
# List all versions of the pipeline
try:
    # The SDK returns the raw ListPipelineVersions response (a dict), not a list
    response = pipeline.list_pipeline_versions()
    versions = sorted(
        response.get('PipelineVersionSummaries', []),
        key=lambda v: v['CreationTime'],
        reverse=True,
    )
    print(f"📋 Pipeline '{pipeline_name}' versions:")
    for version in versions[:5]:  # Show the 5 most recent versions
        print(f"  - Version {version['PipelineVersionId']}: Created {version['CreationTime']}")

    if len(versions) > 5:
        print(f"  ... and {len(versions) - 5} more versions")

    # Show how to execute a specific version
    print("\n💡 To execute a specific version:")
    print("   execution = pipeline.start(pipeline_version_id=1, parameters={...})")

except Exception as e:
    print(f"Could not retrieve version information: {e}")


## Clean Up Resources (Optional)

In [24]:
# # Delete the pipeline (uncomment to execute)
# try:
#     pipeline.delete()
#     print(f"✅ Pipeline '{pipeline_name}' deleted")
# except Exception as e:
#     print(f"Error deleting pipeline: {e}")