In [1]:
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.workflow.parameters import ParameterString, ParameterInteger

# Session and execution role used throughout the rest of this script.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Pipeline parameters: each one carries a name and a default value.
input_data = ParameterString(
    name="InputDataUrl",
    default_value="s3://mlops-testing-bucket-kulsin/sample-data.csv",
)
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1,
)

# Processing step: a scikit-learn processor that runs processing_script.py.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type="ml.t3.medium",
    instance_count=processing_instance_count,
)

# Single input: the location given by the InputDataUrl pipeline parameter.
processing_inputs = [
    sagemaker.processing.ProcessingInput(
        source=input_data,
        destination="/opt/ml/processing/input",
    ),
]

# Two named outputs ("train" and "validation"), each read from its own
# directory under /opt/ml/processing/output.
processing_outputs = [
    sagemaker.processing.ProcessingOutput(
        output_name="train",
        source="/opt/ml/processing/output/train",
    ),
    sagemaker.processing.ProcessingOutput(
        output_name="validation",
        source="/opt/ml/processing/output/validation",
    ),
]

step_process = ProcessingStep(
    name="SampleProcessing",
    processor=sklearn_processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    # This script is not part of this file; you must create it to
    # clean/split the data.
    code="processing_script.py",
)

# Training step: built-in XGBoost trained on the processing step's outputs.
xgboost_image_uri = sagemaker.image_uris.retrieve(
    "xgboost",
    sagemaker_session.boto_region_name,
    version="1.2-1",
)

xgboost_estimator = Estimator(
    image_uri=xgboost_image_uri,
    role=role,
    instance_count=training_instance_count,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{sagemaker_session.default_bucket()}/output",
    # Use the same session that resolved the image region and output bucket
    # instead of letting the estimator create its own implicit one.
    sagemaker_session=sagemaker_session,
    # The built-in XGBoost algorithm requires `num_round`; without it the
    # training job is rejected during hyperparameter validation.
    # NOTE(review): 100 is a placeholder starting point -- tune it, and set
    # `objective` explicitly to match the task (regression/classification).
    hyperparameters={"num_round": 100},
)

step_train = TrainingStep(
    name="SampleTraining",
    estimator=xgboost_estimator,
    inputs={
        # Each channel reads the S3 URI of the matching named output of
        # the processing step.
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="csv",
        ),
    },
)

# Assemble the pipeline from its parameters and its two steps.
pipeline_parameters = [
    input_data,
    processing_instance_count,
    training_instance_count,
]
pipeline_steps = [step_process, step_train]

pipeline = Pipeline(
    name="SampleDataAnalysisPipeline",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

# Create the pipeline, or update it if it already exists.
pipeline.upsert(role_arn=role)

# (Optional) Start an execution, passing the input location explicitly.
start_parameters = {
    "InputDataUrl": "s3://mlops-testing-bucket-kulsin/sample-data.csv",
}
execution = pipeline.start(parameters=start_parameters)
print(f"Pipeline execution ARN: {execution.arn}")


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


Pipeline execution ARN: arn:aws:sagemaker:us-east-1:282698011778:pipeline/SampleDataAnalysisPipeline/execution/wyjw74imxzw9
