# SageMaker Pipeline via SDK


## Helpful Links

https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb  
https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps-types.html#step-type-training  
https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#pipeline-session  


In [None]:
import boto3 
import sagemaker
from sagemaker import image_uris

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.pipeline_context import PipelineSession


# Initialize session
# A pipeline session is used so that SDK calls made below (e.g. model.create())
# produce step arguments for the pipeline instead of running immediately.
### Local
# from sagemaker.workflow.pipeline_context import LocalPipelineSession
# local_pipeline_session = LocalPipelineSession()
# pipeline_session = local_pipeline_session
### Remote
pipeline_session = PipelineSession()

# Execution role and bucket used by every step below.
# Replace both values with your own role ARN and S3 bucket name.
role = "arn:aws:iam::146868985163:role/SageMaker-ExecutionRole"
s3_bucket = "adgu-datasets"


# Define Parameters
# Resolve the XGBoost container for the region the session is bound to, so the
# image URI always matches the region in which the pipeline steps will run.
xgb_image_uri = image_uris.retrieve(
    framework="xgboost",
    region=pipeline_session.boto_region_name,
    version="1.7-1",
)
# Training / validation CSVs and the prefix that receives the model artifacts.
input_train_path = f"s3://{s3_bucket}/pipeline-dataset/train.csv"
input_validate_path = f"s3://{s3_bucket}/pipeline-dataset/validate.csv"
model_path = f"s3://{s3_bucket}/pipeline-model/"

# Hyperparameters for the XGBoost regression job.
xgb_hyperparameters = {
    "objective": "reg:squarederror",
    "num_round": 50,
    "max_depth": 5,
    "eta": 0.2,
    "subsample": 0.7,
}

# Estimator bound to the pipeline session; artifacts are written to model_path.
xgb_train = Estimator(
    role=role,
    image_uri=xgb_image_uri,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=model_path,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(**xgb_hyperparameters)

# Training step: the estimator itself is handed to TrainingStep instead of
# calling fit(). Both channels are CSV files read from S3.
csv_channels = {
    channel: TrainingInput(s3_data=s3_uri, content_type="text/csv")
    for channel, s3_uri in (
        ("train", input_train_path),
        ("validation", input_validate_path),
    )
}
step_train = TrainingStep(
    name="Train",
    estimator=xgb_train,
    inputs=csv_channels,
)


# Wrap the artifacts produced by the "Train" step in a SageMaker Model.
# model_data is a step property, i.e. a reference to the training step's
# output rather than a literal S3 path.
model = Model(
    image_uri=xgb_image_uri,
    sagemaker_session=pipeline_session,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
)
# No accelerator_type is passed: Elastic Inference accelerators (ml.eia1.*)
# are a leftover from the example notebook and do not apply to the XGBoost
# container used here.
step_create_model = ModelStep(
    name="AbaloneCreateModel",
    step_args=model.create(instance_type="ml.m5.large"),
)

# Register the trained model in the "PipelineModelPackageGroup" model package
# group. The package is created with status "Approved".
trained_artifacts = step_train.properties.ModelArtifacts.S3ModelArtifacts
step_register = RegisterModel(
    name="RegisterModel",
    model_package_group_name="PipelineModelPackageGroup",
    approval_status="Approved",
    estimator=xgb_train,
    model_data=trained_artifacts,
    # Payload formats accepted and returned by the registered model.
    content_types=["text/csv"],
    response_types=["application/json"],
    # Instance types recorded for real-time inference and batch transform.
    inference_instances=["ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)

# Assemble the pipeline: train -> create model -> register model.
# The pipeline is bound to the same session as its steps, so switching the
# session at the top of the script (local vs. remote) also switches where the
# pipeline itself is created and executed.
pipeline_name = "ADGUPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_train, step_create_model, step_register],
    sagemaker_session=pipeline_session,
)

# Create the pipeline definition (or update it if it already exists), then
# start one execution and show its description.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.describe()

## Automate Pipeline Execution
- Monitor for file changes in S3 and trigger the Pipeline to execute
- Send event to SQS queue
- Trigger Lambda function
- Trigger pipeline execution

# Lambda Function 

```
import json
import boto3
import os

# AWS service clients, created once at module import (outside the handler).
sagemaker_client, sqs_client, sns_client = (
    boto3.client(service) for service in ("sagemaker", "sqs", "sns")
)

# Define constants
# Each value can be overridden with an environment variable of the same name;
# when the variable is not set, the previously hard-coded value is used.
SAGEMAKER_PIPELINE_NAME = os.environ.get("SAGEMAKER_PIPELINE_NAME", "ADGUPipeline")
SNS_TOPIC_ARN = os.environ.get(
    "SNS_TOPIC_ARN",
    "arn:aws:sns:us-east-1:146868985163:SageMakerPipelineTopic",  # Update with your SNS ARN
)

def lambda_handler(event, context):
    """
    AWS Lambda function to process messages from an SQS queue, trigger an AWS SageMaker Pipeline,
    and publish a notification to an SNS topic.

    Args:
        event: Lambda event; must contain a 'Records' list whose items each
            carry the SQS message text under 'body'.
        context: Lambda context object (not used).

    Returns:
        dict with 'statusCode' 200 and a JSON-encoded 'body' once every record
        has been processed.

    Raises:
        Exception: any error raised while processing a record is logged and
            re-raised. Returning a normal response (even one carrying
            statusCode 500) would count as a successful invocation, and the
            SQS messages of the batch would be deleted instead of retried.
    """
    try:
        for record in event['Records']:
            print(record)
            # Extract message body
            message_body = record['body']
            print(f"Received SQS Message: {message_body}")

            # Trigger SageMaker pipeline execution
            response = sagemaker_client.start_pipeline_execution(
                PipelineName=SAGEMAKER_PIPELINE_NAME,
                PipelineExecutionDisplayName="TriggeredFromLambda",
                PipelineParameters=[]
            )

            execution_arn = response['PipelineExecutionArn']
            print(f"SageMaker Pipeline Execution started: {execution_arn}")

            # Publish SNS Notification
            sns_message = {
                "message": "SageMaker Pipeline Execution Triggered",
                "pipeline_execution_arn": execution_arn,
                "sqs_message": message_body
            }

            sns_client.publish(
                TopicArn=SNS_TOPIC_ARN,
                Message=json.dumps(sns_message),
                Subject="SageMaker Pipeline Execution Notification"
            )

            print(f"SNS Notification sent to topic {SNS_TOPIC_ARN}")

    except Exception as e:
        # Log for CloudWatch, then let the invocation fail so the batch is
        # retried rather than silently dropped.
        print(f"Error: {str(e)}")
        raise

    return {
        'statusCode': 200,
        'body': json.dumps('SageMaker pipeline execution started and SNS notification sent successfully.')
    }
```

## IAM permission policy for Lambda function, attached to execution role

```
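{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sagemaker:StartPipelineExecution"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish"
      ],
      "Resource": "arn:aws:sns:us-east-1:146868985163:SageMakerPipelineTopic"
    }
  ]
}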
```