In [15]:
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.estimator import Estimator
from sagemaker.workflow.parameters import ParameterString
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor
import pandas as pd
import numpy as np
import os
import json
from sklearn.metrics import mean_absolute_error, mean_squared_error
import boto3

# Session shared by the estimator, processor and pipeline defined below.
pipeline_session = PipelineSession()

# IAM role used by the estimator, processor and pipeline cells below.
# Previously no cell defined it, so Restart & Run All failed with NameError.
# Reuse a value already set in this kernel; otherwise fall back to the
# notebook's SageMaker execution role (only resolvable when running inside
# SageMaker -- elsewhere, set role_arn to an explicit ARN before this cell).
role_arn = globals().get("role_arn") or sagemaker.get_execution_role()

# Pipeline parameter: S3 prefix of the processed training data
# (can be overridden per pipeline execution).
input_data_uri = ParameterString(
    name="InputData",
    default_value="s3://nexttrendco/processed/rossman_sagemaker/",
)

# Built-in XGBoost training image. Resolve it for the session's region rather
# than a hardcoded "us-east-1" so the image URI matches where the jobs run.
xgb_image = sagemaker.image_uris.retrieve(
    "xgboost",
    region=pipeline_session.boto_region_name,
    version="1.5-1",
)

In [23]:
# Superseded draft of the evaluation step, removed. It referenced `processor`
# before the SKLearnProcessor cell defined it (NameError on a fresh kernel).
# It was also immediately shadowed by the `step_eval` definition that follows
# the SKLearnProcessor cell, which is the one the pipeline actually uses.
# NOTE(review): the draft mounted inputs at /opt/ml/processing/groundtruth and
# /opt/ml/processing/predictions, and wrote to /opt/ml/processing/eval. Confirm
# that scripts/evaluate.py reads the paths used by the live definition
# (/opt/ml/processing/input/true, /opt/ml/processing/input/pred,
# /opt/ml/processing/evaluation).

In [28]:
# Estimator for the built-in XGBoost algorithm. It is bound to the pipeline
# session, so it is only used to build the pipeline's training step.
xgb_estimator = Estimator(
    image_uri=xgb_image,
    role=role_arn,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path="s3://nexttrendco/rossman/output/",
    sagemaker_session=pipeline_session,
)

# Regression objective with a fixed number of boosting rounds and tree depth.
xgb_estimator.set_hyperparameters(
    objective="reg:squarederror",
    num_round=100,
    max_depth=5,
)

# Training step fed by the InputData pipeline parameter.
# The built-in XGBoost container documents MIME-style content types.
# Parquet input is declared as "application/x-parquet"; the previous bare
# "parquet" is not one of the documented values.
train_step = TrainingStep(
    name="XGBoostTrain",
    step_args=xgb_estimator.fit(
        inputs={
            "train": TrainingInput(
                s3_data=input_data_uri,
                content_type="application/x-parquet",
            )
        }
    ),
)

In [29]:
# Scikit-learn processing container that runs the evaluation script.
# It shares the pipeline session created in the config cell.
processor = SKLearnProcessor(
    base_job_name="model-eval",
    framework_version="1.2-1",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    role=role_arn,
    sagemaker_session=pipeline_session,
)

In [31]:
# Evaluation step: runs scripts/evaluate.py in the SKLearnProcessor container,
# mounting ground truth and predictions from S3 and uploading whatever the
# script writes to /opt/ml/processing/evaluation.
# TODO(review): both inputs are fixed S3 objects, so this step does not consume
# the model produced by train_step (there is no data dependency between them).
# Consider adding a transform step that writes pred.csv from the trained model.
step_eval = ProcessingStep(
    name="EvaluateModel",
    step_args=processor.run(
        code="scripts/evaluate.py",
        inputs=[
            ProcessingInput(
                source="s3://nexttrendco/rossman/true.csv",
                destination="/opt/ml/processing/input/true",
            ),
            ProcessingInput(
                source="s3://nexttrendco/rossman/pred.csv",
                destination="/opt/ml/processing/input/pred",
            ),
        ],
        outputs=[
            # Named explicitly so later steps can reference it by name
            # instead of relying on an auto-generated output name.
            ProcessingOutput(
                output_name="evaluation",
                source="/opt/ml/processing/evaluation",
                destination="s3://nexttrendco/rossman/evaluation",
            )
        ],
    ),
)

In [32]:
# Assemble the training and evaluation steps, exposing InputData as the
# pipeline's only parameter.
pipeline = Pipeline(
    name="RossmanForecastPipeline",
    sagemaker_session=pipeline_session,
    parameters=[input_data_uri],
    steps=[train_step, step_eval],
)

# Create or update the pipeline definition, then start a run. No parameter
# overrides are passed, so InputData uses its default value.
pipeline.upsert(role_arn=role_arn)
execution = pipeline.start()