# Adapting Preprocessing Script for SageMaker

To build a pipeline, we need to start with individual steps. Our first step will be data preprocessing — the same preprocessing you've done locally in previous courses, but now adapted to run in SageMaker's managed environment.

We'll create a separate file called `data_processing.py` that contains our preprocessing logic. The logic itself is identical to what you've done before: we're still capping outliers, creating new features, and splitting the data. The key change is in how we handle file paths, which must follow the directory layout of SageMaker's processing environment.

These specific paths (`/opt/ml/processing/`) are SageMaker conventions that allow the service to handle data movement between S3 and your processing containers. When SageMaker runs this script, it copies your S3 data into the input directory and uploads whatever the script writes to the output directories back to S3.
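The script itself is not reproduced in this section, so here is a minimal sketch of the path handling only. It assumes standard-library CSV handling instead of the real preprocessing code, and the output file names `train.csv` and `test.csv`, the function name `split_dataset`, and the `base_dir` parameter are illustrative assumptions rather than part of the original script.

```python
import csv
import os
import random

# SageMaker's processing convention; kept as a parameter so the sketch can be tried locally.
DEFAULT_BASE_DIR = "/opt/ml/processing"


def split_dataset(base_dir=DEFAULT_BASE_DIR, test_fraction=0.2, seed=42):
    """Read the raw CSV from the input directory and write train/test CSVs."""
    input_path = os.path.join(base_dir, "input", "california_housing.csv")
    train_dir = os.path.join(base_dir, "train")
    test_dir = os.path.join(base_dir, "test")

    with open(input_path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)

    # Outlier capping and feature engineering would happen here (omitted in this sketch).

    random.Random(seed).shuffle(rows)
    n_test = int(len(rows) * test_fraction)
    test_rows, train_rows = rows[:n_test], rows[n_test:]

    # SageMaker uploads whatever ends up under these output directories.
    for directory, name, subset in [
        (train_dir, "train.csv", train_rows),  # file names are assumptions
        (test_dir, "test.csv", test_rows),
    ]:
        os.makedirs(directory, exist_ok=True)
        with open(os.path.join(directory, name), "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(subset)

    return len(train_rows), len(test_rows)
```

Inside the processing container the script would call `split_dataset()` with the default base directory. Pointing `base_dir` at a temporary folder is a convenient way to check the logic locally before running it in SageMaker.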



# Understanding SageMaker Sessions

Before building your pipeline, you need to understand that SageMaker Pipelines require two different types of sessions that serve distinct purposes:



In [None]:
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

# Create a SageMaker session for immediate operations
sagemaker_session = sagemaker.Session()

# Create a pipeline session for pipeline components
pipeline_session = PipelineSession()

The two sessions differ in what happens when you call an SDK operation through them:

- `sagemaker.Session()`: This is your direct connection to AWS services. When you use this session, you're telling SageMaker "execute this operation using real AWS resources right now." It handles immediate operations like uploading data to S3, registering pipeline definitions, starting executions, and checking status.

- `PipelineSession()`: This is a special "recording" session that captures operations instead of running them. When you use this session with processors or estimators, calls that would normally launch a SageMaker job are captured instead, and the captured configuration becomes part of your pipeline definition. These placeholders turn into real jobs only when the pipeline executes.

Without `PipelineSession()`, calling `.run()` on a processor or `.fit()` on an estimator that uses a regular session would make SageMaker immediately spin up compute instances and start the job before you've even finished defining your pipeline. `PipelineSession()` lets you define all your pipeline components as a complete workflow first, then execute everything in the proper order when you're ready.

Simple Rule:

- Use `PipelineSession()` for any processor, estimator, or transformer that should become a pipeline step
- Use `sagemaker.Session()` for immediate actions like managing the pipeline itself
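To make the "immediate" versus "recording" distinction concrete, here is a toy analogy in plain Python. The classes `ImmediateSession` and `RecordingSession` are invented for illustration only. They are not part of the SageMaker SDK and do not reflect how the real sessions are implemented.

```python
class ImmediateSession:
    """Toy stand-in: starts each requested job as soon as it is submitted."""

    def __init__(self):
        self.jobs_started = []

    def submit(self, job_name):
        self.jobs_started.append(job_name)  # pretend compute starts now
        return f"started {job_name}"


class RecordingSession:
    """Toy stand-in: captures job requests so they can run later, in order."""

    def __init__(self):
        self.recorded = []

    def submit(self, job_name):
        self.recorded.append(job_name)  # nothing is started yet
        return f"recorded {job_name}"

    def execute_all(self, runner):
        # Only now do the captured requests turn into real work.
        return [runner.submit(name) for name in self.recorded]


recording = RecordingSession()
recording.submit("ProcessData")
recording.submit("TrainModel")

runner = ImmediateSession()
started_before_execution = list(runner.jobs_started)  # still empty: defining steps ran nothing
results = recording.execute_all(runner)               # the analogue of a pipeline execution
print(results)
```

The point of the analogy is that defining the steps costs nothing and starts nothing. Work begins only when the recorded definition is handed over for execution.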

In [None]:
# Get the default SageMaker bucket name
default_bucket = sagemaker_session.default_bucket()

# Local file path
DATA_PATH = "data/california_housing.csv"

# S3 prefix (folder path within the bucket)
DATA_PREFIX = "datasets"

try:
    # Upload the dataset using the upload_data() method
    s3_uri = sagemaker_session.upload_data(
        path=DATA_PATH,
        bucket=default_bucket,
        key_prefix=DATA_PREFIX
    )
    
    print(f"Data uploaded successfully to: {s3_uri}")
    
except Exception as e:
    print(f"Error: {e}")

# Setting Up AWS Resources

Now that we understand sessions, we need to set up the AWS resources and permissions that our pipeline will use:



In [None]:
# Retrieve your AWS account ID (used for constructing resource ARNs)
account_id = sagemaker_session.boto_session.client('sts').get_caller_identity()['Account']

# Get the default S3 bucket for your SageMaker resources
default_bucket = sagemaker_session.default_bucket()

# Define the SageMaker execution role ARN
SAGEMAKER_ROLE = f"arn:aws:iam::{account_id}:role/SageMakerDefaultExecution"

# Creating the Processing Environment

To run our preprocessing script in SageMaker, we need to define the computing environment where our code will execute. We use an `SKLearnProcessor` because our preprocessing script uses scikit-learn libraries, and crucially, we use the `PipelineSession` to ensure the processor becomes a pipeline step rather than executing immediately:



In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Create a processor that will run our data preprocessing script
processor = SKLearnProcessor(
    framework_version="1.2-1",    # Specify scikit-learn version
    role=SAGEMAKER_ROLE,          # IAM role with necessary permissions
    instance_type="ml.m5.large",  # Compute instance type for processing
    instance_count=1,             # Number of instances to use
    sagemaker_session=pipeline_session  # Use pipeline session for deferred execution
)

# Building Our First Pipeline Step

With our processor configured, we can now create the actual processing step using `ProcessingStep`. This step defines what data goes in, what comes out, and what code runs in between.

Before proceeding, note that we assume you have already uploaded your raw dataset (`california_housing.csv`) to your S3 default bucket at the path `/datasets/california_housing.csv`. This is necessary because the pipeline will read the input data directly from S3.

In [None]:
from sagemaker.workflow.steps import ProcessingStep

# Define the processing step with inputs, outputs, and the script to run
processing_step = ProcessingStep(
    name="ProcessData",   # Unique name for this step in the pipeline
    processor=processor,  # The processor we defined above
    inputs=[
        # Define where the raw data comes from (S3 location)
        sagemaker.processing.ProcessingInput(
            source=f"s3://{default_bucket}/datasets/california_housing.csv",
            destination="/opt/ml/processing/input"  # Where data will be mounted in container
        )
    ],
    outputs=[
        # Define where processed training data will be saved
        sagemaker.processing.ProcessingOutput(
            output_name="train_data",               # Reference name for this output
            source="/opt/ml/processing/train"       # Container path where script saves data
        ),
        # Define where processed test data will be saved
        sagemaker.processing.ProcessingOutput(
            output_name="test_data",                # Reference name for this output
            source="/opt/ml/processing/test"        # Container path where script saves data
        )
    ],
    code="data_processing.py"  # The Python script that performs the processing
)

Notice how the paths in our `ProcessingInput` and `ProcessingOutput` definitions perfectly match the paths our script expects. The `inputs` parameter specifies where our raw data comes from (an S3 location) and where it will be mounted inside the processing container (`/opt/ml/processing/input`). The `outputs` parameter defines where our processed data will be saved, with separate outputs for training and test data. Each output has a name that we can reference later and a source path where our processing script will write the data. The `code` parameter points to the Python script that contains our preprocessing logic.

This connection between the step definition and the script paths is crucial — it's what allows SageMaker to automatically handle all the data movement for you.

# Creating the Pipeline

With our processing step defined, we can now create our first pipeline. A pipeline is a collection of steps that SageMaker runs according to their dependencies, and right now we have just one step. Note that we use the regular `sagemaker_session` for pipeline management:



In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Set a name for the SageMaker Pipeline
PIPELINE_NAME = "california-housing-preprocessing-pipeline"

# Create pipeline with our processing step
pipeline = Pipeline(
    name=PIPELINE_NAME,
    steps=[processing_step],  # For now, just one step
    sagemaker_session=sagemaker_session  # Use regular session for pipeline management
)

This creates a pipeline definition with our single processing step. In future lessons, we'll add more steps to this list to create more complex workflows with training, evaluation, and model registration.

At this point, we've only created the pipeline definition in memory — it doesn't exist in AWS yet. To make it available in SageMaker, we need to register it with the service using the `upsert` method:

In [None]:
# Create or update pipeline (upsert = update if exists, create if not)
pipeline.upsert(role_arn=SAGEMAKER_ROLE)

The `upsert` method is particularly useful because it handles both creation and updates intelligently. If this is the first time you're running this code, it will create a new pipeline in SageMaker. If you run the same code again after making changes to your pipeline definition, it will update the existing pipeline rather than throwing an error. This makes iterative development much smoother — you can modify your pipeline code, run it again, and SageMaker will automatically apply your changes.

Think of the pipeline definition as a blueprint or recipe. Once you've registered this blueprint with SageMaker using `upsert`, you can execute it multiple times. Each execution is a separate run of the same blueprint, potentially with different data or parameters.

# Executing the Pipeline

Now that our pipeline is registered with SageMaker, we can start an execution. This is where the actual work begins:

In [None]:
# Start pipeline execution and get execution object for monitoring
execution = pipeline.start()

# Get the execution ARN for tracking
print(f"Pipeline execution ARN: {execution.arn}")

When you call `pipeline.start()`, SageMaker immediately begins executing your pipeline in the background. This means your local Python script doesn't need to wait for the processing to complete — the heavy computational work is happening on AWS infrastructure while your script continues running or even after it finishes.

The execution object provides valuable information about the running pipeline. You'll see output similar to:

`
Pipeline execution ARN: arn:aws:sagemaker:us-east-1:123456789012:pipeline/california-housing-preprocessing-pipeline/execution/x1gc33lgj8v5
`

# Monitoring Your Pipeline


In [None]:
# Check the current status
execution_details = execution.describe()

# Display status
print(f"Status: {execution_details['PipelineExecutionStatus']}")
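A single `describe()` call only reports the status at that moment. To follow an execution until it finishes, one option is a small polling helper like the sketch below. The helper name `wait_for_terminal_status` and its parameters are assumptions made for this example. It relies only on the `PipelineExecutionStatus` field shown above, whose terminal values are `Succeeded`, `Failed`, and `Stopped`.

```python
import time

# Terminal values of PipelineExecutionStatus; "Executing" and "Stopping" mean still in progress.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_terminal_status(describe, poll_seconds=30, max_polls=120):
    """Call `describe()` repeatedly until the execution reaches a terminal status."""
    for _ in range(max_polls):
        status = describe()["PipelineExecutionStatus"]
        print(f"Status: {status}")
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError("Pipeline execution did not finish within the polling budget")
```

In the notebook this would be called as `wait_for_terminal_status(execution.describe)`. The execution object also offers a built-in `wait()` method that blocks until the run ends, which is usually simpler when you do not need custom logging between polls.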


# Integrating Model Training Steps

## Our Training Script

Before we add the training step to our pipeline, let's understand what our training script does. Our `train.py` script contains the actual machine learning code that will execute during the training step. This script follows SageMaker's training script conventions, which means it knows how to read input data from specific locations and save the trained model to the correct output directory.
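The full `train.py` is not shown here. As a hedged sketch of the convention it follows, the snippet below covers only how such a script typically locates its data and model directories through the `SM_CHANNEL_TRAIN` and `SM_MODEL_DIR` environment variables that SageMaker sets inside a training container. The function name `parse_args` and the argument names are assumptions for illustration.

```python
import argparse
import os


def parse_args(argv=None):
    """Resolve data and model locations from SageMaker's environment variables."""
    parser = argparse.ArgumentParser()
    # Directory where SageMaker places the data for the "train" channel.
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    # Directory whose contents SageMaker packages as the model artifact.
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    return parser.parse_args(argv)
```

The rest of the script would read the CSV files found under `args.train`, fit the model, and save it under `args.model_dir`. Keeping the locations as arguments with environment-variable defaults also makes it possible to run the same script locally by passing explicit paths.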


## Configuring a SKLearn Estimator

Now let's add the training components to our existing pipeline. First, we need to create an estimator that defines how our model training will be executed. Our `SKLearn` estimator will specify the training environment, including our entry point script, computational resources, and framework version:



In [None]:
from sagemaker.sklearn.estimator import SKLearn

# Create an estimator that defines how our model will be trained
estimator = SKLearn(
    entry_point="train.py",             # Our training script
    role=SAGEMAKER_ROLE,                # IAM role with necessary permissions
    instance_type="ml.m5.large",        # Compute instance type for training
    instance_count=1,                   # Number of instances to use
    framework_version="1.2-1",          # Specify scikit-learn version
    py_version="py3",                   # Python version
    sagemaker_session=pipeline_session  # Use pipeline session for deferred execution
)

## Creating the TrainingStep

With our estimator configured, we can now create the `TrainingStep` that orchestrates model training within our pipeline. A `TrainingStep` is a pipeline component that combines an estimator (which defines the training environment) with input data sources. <u>The key to building effective pipelines lies in properly connecting outputs from one step to inputs of another using SageMaker's step properties system.</u>



In [None]:
from sagemaker.workflow.steps import TrainingStep

# Define our training step that uses output from our processing step
training_step = TrainingStep(
    name="TrainModel",    # Unique name for this step in our pipeline
    estimator=estimator,  # Our estimator we defined above
    inputs={
        # Use the training data output from our processing step as input
        "train": sagemaker.inputs.TrainingInput(
            # Reference the S3 URI where our processing step saved the training data
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
        )
    }
)

The critical connection happens in the `inputs` parameter. The `"train"` key in our inputs dictionary corresponds to the `SM_CHANNEL_TRAIN` environment variable that our training script uses to locate input data. Remember in our training script where we had `args.train = os.environ.get('SM_CHANNEL_TRAIN')`?

That environment variable gets populated with the local path where SageMaker downloads our training data. When the training job runs, SageMaker will automatically download the data from the S3 location we specify and make it available to our script at the path stored in `SM_CHANNEL_TRAIN`.

Now let's break down how we specify which S3 location to use for the `s3_data` parameter:

- `processing_step` - References our preprocessing step that we defined earlier
- `.properties` - Accesses the runtime properties of our preprocessing step
- `.ProcessingOutputConfig` - References the output configuration of the processing job
- `.Outputs` - Accesses the collection of all outputs from the processing step
- `["train_data"]` - Selects the specific output we named `"train_data"` when we defined our processing step's outputs
- `.S3Output` - References the S3 output configuration for this specific output
- `.S3Uri` - Gets the actual S3 URI where this output was saved

This property chain essentially says: "Use the S3 location where the processing step saved its 'train_data' output as the input for this training step." The beauty of this approach is that SageMaker resolves these property references at runtime, so you don't need to know the exact S3 paths beforehand.
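One way to build intuition for the chain is to walk the same path through a plain dictionary shaped like the output section of a processing job description. The sketch below is only an illustration: the bucket name and URIs are made-up sample values, and `resolve_output_uri` is a hypothetical helper, not an SDK function. In a real pipeline SageMaker performs this resolution for you at runtime.

```python
# Hypothetical excerpt of what a finished processing job reports about its outputs.
sample_job_description = {
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": "train_data",
                "S3Output": {"S3Uri": "s3://example-bucket/ProcessData/output/train_data"},
            },
            {
                "OutputName": "test_data",
                "S3Output": {"S3Uri": "s3://example-bucket/ProcessData/output/test_data"},
            },
        ]
    }
}


def resolve_output_uri(job_description, output_name):
    """Follow ProcessingOutputConfig -> Outputs[name] -> S3Output -> S3Uri."""
    outputs = job_description["ProcessingOutputConfig"]["Outputs"]
    for output in outputs:
        if output["OutputName"] == output_name:
            return output["S3Output"]["S3Uri"]
    raise KeyError(f"No processing output named {output_name!r}")


train_uri = resolve_output_uri(sample_job_description, "train_data")
print(train_uri)
```

Each segment of the property chain corresponds to one lookup in this structure, which is why the output name used in the chain has to match the `output_name` given to `ProcessingOutput`.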

## Let's Update Our Pipeline

Finally, let's update our pipeline creation to include both our preprocessing and training steps:



In [None]:
# Create our pipeline by combining all steps
pipeline = Pipeline(
    name=PIPELINE_NAME,
    steps=[processing_step, training_step],  # Include both our steps
    sagemaker_session=sagemaker_session
)

try:
    # Create or update our pipeline (upsert = update if exists, create if not)
    pipeline.upsert(role_arn=SAGEMAKER_ROLE)
    
    # Start our pipeline execution and get execution object for monitoring
    execution = pipeline.start()

    # Print the unique ARN identifier for this execution
    print(f"Pipeline execution ARN: {execution.arn}")

    # Get detailed information about our execution
    execution_details = execution.describe()

    # Display the current status of our pipeline execution
    print(f"Status: {execution_details['PipelineExecutionStatus']}")
    
except Exception as e:
    print(f"Error: {e}")

## Understanding Pipeline Execution Order

Now that we have a multi-step pipeline, it's important to understand how SageMaker determines execution order. <u>Pipeline execution order is determined by dependencies, not by the order of steps in your steps list</u>.

When you create a pipeline with `steps=[processing_step, training_step]`, SageMaker doesn't execute steps based on list order. Instead, it analyzes dependencies between steps. In our pipeline, we created a dependency when we configured our training step to use the processing step's output:



In [None]:
# This creates a dependency: training_step depends on processing_step
s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri

This property reference tells SageMaker that the training step cannot begin until the processing step completes successfully. <u>Dependencies are explicit and created through step property references</u> — simply placing steps in a certain order in your list doesn't create dependencies.

<u>If steps have no dependencies between them, SageMaker executes them in parallel</u> to minimize total execution time. This explicit dependency model makes pipelines robust and predictable, as execution flow is clearly defined by data dependencies rather than list ordering.
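The idea of ordering by dependencies rather than by list position can be sketched with Python's standard `graphlib` module (Python 3.9+). This is only an illustration of dependency-based ordering in general, using our two step names. It says nothing about how SageMaker implements its scheduler internally.

```python
from graphlib import TopologicalSorter  # available from Python 3.9

# Each step maps to the set of steps whose outputs it references.
dependencies = {
    "TrainModel": {"ProcessData"},  # listed first on purpose
    "ProcessData": set(),
}

# The resulting order follows the dependencies, not the order of the entries above.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['ProcessData', 'TrainModel']
```

Even though `TrainModel` appears first in the dictionary, it is scheduled after `ProcessData` because it depends on that step's output.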




# Evaluating Models in Pipelines


## Exploring the Evaluation Script


Let's examine the evaluation script we'll call `evaluation.py` that will perform the same model assessment you've done locally in previous exercises, but now adapted to work within our SageMaker Pipeline. This script follows SageMaker's processing script conventions, reading the model and data from other pipeline steps and saving results for future pipeline components.
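Since `evaluation.py` is not reproduced here, the following is a minimal sketch of just its final stage: computing regression metrics and writing them to a JSON report in the step's output directory. Loading the model and generating predictions are omitted. The function names and the `{"regression_metrics": ...}` layout are assumptions for illustration. The real script defines its own report structure.

```python
import json
import math
import os


def regression_metrics(y_true, y_pred):
    """Compute MSE, RMSE, MAE and R² with plain Python (assumes y_true is not constant)."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    r2 = 1.0 - (mse * n) / ss_tot
    return {"mse": mse, "rmse": math.sqrt(mse), "mae": mae, "r2": r2}


def write_report(metrics, output_dir="/opt/ml/processing/evaluation"):
    """Save the metrics as evaluation.json in the step's output directory."""
    os.makedirs(output_dir, exist_ok=True)
    report_path = os.path.join(output_dir, "evaluation.json")
    with open(report_path, "w") as f:
        json.dump({"regression_metrics": metrics}, f, indent=2)  # layout is an assumption
    return report_path
```

Because the report is written under the directory that the step declares as an output, SageMaker uploads it to S3 when the job finishes, where later steps can pick it up.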


## Configuring a SKLearnProcessor for Evaluation

Now, let's integrate the evaluation functionality into our existing pipeline. While we could technically reuse the same processor instance from our preprocessing step, creating separate processors for each step is a best practice that improves pipeline clarity and maintainability - each step has its own dedicated processor that can be independently configured, scaled, or modified without affecting other pipeline components.





In [None]:
# Create a processor for running the evaluation script
evaluation_processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=SAGEMAKER_ROLE,
    instance_type="ml.m5.large",
    instance_count=1,
    sagemaker_session=pipeline_session
)

This separation allows you to optimize each step independently. For example, you might later decide that evaluation requires more memory or different compute resources than preprocessing, or you might want to experiment with different framework versions for specific steps. Having dedicated processors makes these modifications straightforward without risking unintended effects on other pipeline components.


## Understanding Property Files for Pipeline Integration

The evaluation step introduces a powerful new concept called property files (the SDK's `PropertyFile` class), which allow pipeline steps to expose structured data that subsequent steps can reference and use for decision-making.


In [None]:
from sagemaker.workflow.properties import PropertyFile

# Define property file for evaluation metrics
evaluation_report_property = PropertyFile(
    name="EvaluationReport",     # Unique identifier for this property file within the pipeline
    output_name="evaluation",    # Must match the output_name in the evaluation step's ProcessingOutput
    path="evaluation.json"       # Path to the JSON file within the evaluation output directory
)

Property files enable sophisticated pipeline orchestration by making the contents of output files accessible to other pipeline components. In our case, the evaluation metrics (MSE, RMSE, MAE, R²) stored in the JSON file become available for future pipeline steps to reference. This capability becomes crucial when building more advanced pipelines that might include conditional deployment based on performance thresholds, automated model comparison logic, or routing decisions based on evaluation results.

<u>The `output_name` parameter must exactly match the output name we'll specify in our evaluation step's `ProcessingOutput`, while the `path` parameter tells SageMaker where to find the JSON file within the output directory</u>. This connection ensures that SageMaker can locate and parse the evaluation metrics for use by downstream pipeline components.
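Downstream steps typically pick a single value out of the property file by giving a path into the JSON document (in the SDK this is done with the `JsonGet` function). The plain-Python sketch below mimics that lookup so you can check locally that a path points at the value you expect. The helper `get_by_path`, the report layout, and the numbers are all hypothetical placeholders, not results from a real run.

```python
import json


def get_by_path(document, dotted_path):
    """Walk a nested dictionary using a dot-separated path such as 'a.b.c'."""
    value = document
    for key in dotted_path.split("."):
        value = value[key]
    return value


# Hypothetical contents of evaluation.json; the real layout is whatever evaluation.py writes.
report = json.loads('{"regression_metrics": {"rmse": 0.71, "r2": 0.62}}')

rmse = get_by_path(report, "regression_metrics.rmse")
print(rmse)
```

If the path does not exist in the report, the lookup raises a `KeyError`, which is a quick way to catch a mismatch between the script's output and the path a later step expects.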




## Defining the Evaluation ProcessingStep

With our processor and property file configured, we can now define the evaluation step that orchestrates the entire evaluation process:



In [None]:
# Define the evaluation step that takes the trained model and test data as input
evaluation_step = ProcessingStep(
    name="EvaluateModel",            # Unique name for this step in the pipeline
    processor=evaluation_processor,  # The processor we defined above
    inputs=[
        # Model artifact from the training step
        sagemaker.processing.ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        # Test data from the processing step
        sagemaker.processing.ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        # Where the evaluation report will be saved
        sagemaker.processing.ProcessingOutput(
            output_name="evaluation",               # Reference name for this output
            source="/opt/ml/processing/evaluation"  # Container path where script saves report
        )
    ],
    code="evaluation.py",               # The Python script that performs the evaluation
    property_files=[evaluation_report_property]  # Enable access to evaluation metrics in subsequent pipeline steps
)

The evaluation step demonstrates sophisticated pipeline orchestration by connecting to outputs from two different previous steps. The first input references `training_step.properties.ModelArtifacts.S3ModelArtifacts`, which provides the S3 location where our training step saved the trained model artifact. The second input references the test data output from our preprocessing step using the same property system you learned in the previous lesson.

This dual-input configuration creates implicit dependencies that ensure the evaluation step won't execute until both the training and preprocessing steps complete successfully. SageMaker automatically manages these dependencies, guaranteeing that our evaluation script will have access to both the trained model and the test data when it runs.

The `property_files` parameter attaches the property file we defined earlier to this step, which is what lets downstream pipeline components read the evaluation metrics programmatically.

## Integrating the Evaluation Step into the Pipeline



In [None]:
# Create pipeline by combining all steps
pipeline = Pipeline(
    name=PIPELINE_NAME,
    steps=[processing_step, training_step, evaluation_step],
    sagemaker_session=sagemaker_session
)