In [8]:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

from sagemaker import get_execution_role

# role = "<sagemaker-execution-role>"

role=get_execution_role()
region = "us-east-1"
image_uri = "961807745392.dkr.ecr.ap-south-1.amazonaws.com/mlops-container:latest"

# Preprocessing Step
preprocess_processor = ScriptProcessor(
    image_uri=image_uri,
    role=role,
    command=["python3"],
    instance_count=1,
    instance_type="ml.t3.medium"
)

preprocess_step = ProcessingStep(
    name="preprocess_step",
    processor=preprocess_processor,
    code="preprocess.py",
    inputs=[
        ProcessingInput(
            source="s3://feature-engineering-bucket-989220949c9c/Dataset/bank-additional-full.csv",  # S3 input file
            destination="/opt/ml/processing/input/"  # Path inside the container
        )
        # {"source": "s3://feature-engineering-bucket/bank-additional-full.csv", "destination": "/opt/ml/processing/input"}
    ],
    outputs=[
        ProcessingOutput(
            output_name="processed_data",
            source="/opt/ml/processing/output",  # Path inside the container
            destination="s3://feature-engineering-bucket-989220949c9c/processed/"  # S3 output location
        )
    ]
)


# Define Estimator
estimator = Estimator(
    image_uri=image_uri,
    role=role,
    entry_point="train.py",             # ✅ Training script passed at runtime
    source_dir=".",                     # ✅ Folder where train.py is located
    instance_count=1,
    instance_type="ml.m5.large",
    base_job_name="mlops-train",
    output_path="s3://feature-engineering-bucket-989220949c9c/models/"  # ✅ where model.tar.gz will be saved
)

# Define TrainingStep in pipeline
training_step = TrainingStep(
    name="TrainingStep",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data="s3://feature-engineering-bucket-989220949c9c/processed/bank-additional-processed.csv",
            content_type="text/csv"
        )
    }
)


# Ensure preprocessing and training steps are defined
assert preprocess_step is not None, "PreprocessingStep is not defined."
assert training_step is not None, "TrainingStep is not defined."

# Define steps as a list
steps = [preprocess_step, training_step]

# Ensure steps are a valid list
assert isinstance(steps, list) and all(steps), "Pipeline steps must be a non-empty list."

# Create the pipeline
pipeline = Pipeline(name="MLOpsPipeline", steps=steps)

# Upsert and execute the pipeline
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()



sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.Subn

In [7]:
print(preprocess_step.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri.expr)

{'Get': "Steps.preprocess_step.ProcessingOutputConfig.Outputs['output'].S3Output.S3Uri"}


In [15]:
execution.list_steps()


[{'StepName': 'preprocess_step',
  'StartTime': datetime.datetime(2025, 12, 7, 22, 16, 57, 118000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 12, 7, 22, 22, 0, 723000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-south-1:961807745392:processing-job/pipelines-ulfi852vuyyh-preprocess-step-zfBq1PyoPm'}},
  'AttemptCount': 1},
 {'StepName': 'training_step',
  'StartTime': datetime.datetime(2025, 12, 7, 22, 16, 57, 118000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 12, 7, 22, 19, 38, 31000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': 'ClientError: AlgorithmError: , exit code: 1',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:ap-south-1:961807745392:training-job/pipelines-ulfi852vuyyh-training-step-E3zpYql2Pm'}},
  'AttemptCount': 1}]

In the above preprocessing and training job, you should notice that:

- ProcessingStep lets you specify custom input/output paths and a script to run.
- TrainingStep seems more rigid and does NOT let you specify inputs/outputs the same way.

1. Processing Jobs support arbitrary scripts + arbitrary input/output paths
2. Training Jobs must follow the built-in SageMaker training contract

Training jobs do NOT support arbitrary ProcessingInput/ProcessingOutput.\
They instead expect the Output model placed automatically in: ```/opt/ml/model```

3. Training jobs use a fixed input interface: ```TrainingInput(...)```\
You provide only the S3 URI, not the container path:
    
    ```bash
inputs={
"train": TrainingInput(
    s3_data="s3://bucket/path/train.csv"
)
}

```\
Inside the container, SageMaker always maps it automatically to: ```/opt/ml/input/data/train```

You cannot change this path.\
This is part of the SageMaker Training contract.
    

4. Training jobs also require a fixed output interface

Models MUST be written to the following directory in the docker container:
```
    /opt/ml/model
```
Anything you save here will be automatically uploaded to:
```
    s3://<bucket-name>/Models/<training-job-name>/output/model.tar.gz
```

    