In [1]:
# %pip install -U numexpr  # prefer %pip over !pip so the install targets this kernel's environment

In [2]:
"""
Helper to upsert & start the AutoGluon Tabular SageMaker Pipeline.
Adjust the parameter values below or pass environment variables.
"""

import os

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.session import Session

from pipeline_autogluon_tabular import get_pipeline

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [3]:
# Build the boto3 session first and read the region back from it. When
# AWS_DEFAULT_REGION is unset, os.environ.get() returns None while boto3 can
# still resolve a region from the AWS config; reading it from the session
# keeps `region` (passed on to get_pipeline later) consistent with the
# region the SageMaker session actually uses.
boto_session = boto3.Session(region_name=os.environ.get("AWS_DEFAULT_REGION"))
region = boto_session.region_name

# SageMaker session wrapping that boto3 session; its default bucket is used
# for the pipeline's input/output locations.
sagemaker_session = Session(boto_session)
default_bucket = sagemaker_session.default_bucket()

# Execution role of the environment this notebook runs in.
role = sagemaker.get_execution_role()
print("Role:", role)

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


Role: arn:aws:iam::628182899181:role/service-role/AmazonSageMaker-ExecutionRole-20250728T162690


In [4]:
# Build the pipeline object with the project helper, passing the region and
# default bucket resolved in the session-setup cell.
pipeline = get_pipeline(region=region, default_bucket=default_bucket)
# Bare last expression: the notebook displays the returned Pipeline object's repr.
pipeline

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
Ignoring the following arguments when retrieving image uri for JumpStart model id 'autogluon-classification-ensemble': {'framework': 'autogluon'}
INFO:sagemaker.jumpstart:Ignoring the following arguments when retrieving image uri for JumpStart model id 'autogluon-classification-ensemble': {'framework': 'autogluon'}
Using model 'autogluon-classification-ensemble' with wildcard version identifier '*'. You can pin to version '2.0.16' for more stable results. Note that models may have different input/output signatures after a major version upgrade.
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
Ignoring the following arguments when retrieving image uri for JumpStart model id 'autogluon-classification-e

<sagemaker.workflow.pipeline.Pipeline at 0x7ff2ac2f7f40>

In [5]:
# Create the pipeline in SageMaker, or update the existing definition, and
# display the service response (PipelineArn, version id, HTTP metadata).
pipeline.upsert(role_arn=role)  # uses the attached execution role



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:628182899181:pipeline/Autogluon-Tabular-Pipeline',
 'PipelineVersionId': 11,
 'ResponseMetadata': {'RequestId': '9b83bfcc-4659-42c4-8a3a-f39589d175b5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '9b83bfcc-4659-42c4-8a3a-f39589d175b5',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '117',
   'date': 'Sat, 16 Aug 2025 23:54:08 GMT'},
  'RetryAttempts': 0}}

In [6]:
# ---- Set your parameters here ----
# Runtime parameter overrides passed to pipeline.start() below.
# Presumably each key must match a parameter declared by get_pipeline() in
# pipeline_autogluon_tabular — verify against that module.
# NOTE(review): the diagnostic output kept in the last cell of this notebook
# shows a training job stopped with MaxRuntimeExceeded at
# MaxRuntimeInSeconds=900, the same value as TimeLimitSeconds here. Confirm
# how the pipeline maps TimeLimitSeconds onto the training job's stopping
# condition and whether it leaves headroom for model upload.
params = {
    "InputDataS3Uri": f"s3://{default_bucket}/autogluon/input/covid.csv",
    "LabelColumn": "mortality",
    "ProblemType": "classification",
    "ProcessingInstanceType": "ml.m5.2xlarge",
    "TrainInstanceType": "ml.m5.4xlarge",  # "ml.g5.2xlarge",
    "TimeLimitSeconds": 900,  # 15 minutes
    "AutoStack": "true",
    "Presets": "medium_quality_faster_train",
    "ValFraction": 0.2,
    "AccuracyThreshold": 0.8,
    "RMSEThreshold": 1e9,
    "ModelPackageGroupName": "AutogluonTabular",
    "ModelApprovalStatus": "PendingManualApproval",
}

# Start a new execution with the overrides above and print its ARN so the
# run can be located later.
execution = pipeline.start(parameters=params)
print("Started pipeline execution:", execution.arn)

Started pipeline execution: arn:aws:sagemaker:us-east-1:628182899181:pipeline/Autogluon-Tabular-Pipeline/execution/vh4s3sj737bz


In [7]:
import time, datetime, boto3

def poll_pipeline(execution_arn: str, poll_sec: int = 20, sm_client=None):
    """Poll a SageMaker pipeline execution and print every status change once.

    On each poll the function prints (only when the value changed since the
    previous poll) the pipeline status, each step's status and failure
    reason, and the status of the Processing / Training job behind a step,
    including the latest secondary status message of training jobs.

    Args:
        execution_arn: ARN of the pipeline execution to follow.
        poll_sec: Seconds to sleep between polls.
        sm_client: Optional SageMaker client (anything exposing the
            ``describe_*`` / ``list_pipeline_execution_steps`` calls used
            here). Defaults to ``boto3.client("sagemaker")``; pass one to
            target a specific region/session or to inject a stub.

    Returns:
        None. The loop ends once the pipeline status is ``Succeeded``,
        ``Failed`` or ``Stopped``; the final status (and failure reason, if
        any) is printed.
    """
    sm = sm_client if sm_client is not None else boto3.client("sagemaker")

    # label -> last value printed for it; used to print only on change.
    seen = {}

    def _print_once(label, value):
        """Print ``label: value`` with a UTC timestamp unless already shown."""
        if seen.get(label) != value:
            # Timezone-aware "now" (datetime.utcnow() is deprecated); the
            # rendered format is unchanged: 2025-08-16T23:54:08Z
            stamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            print(f"[{stamp}] {label}: {value}")
            seen[label] = value

    def _list_steps():
        """Return all steps of the execution, following NextToken pagination."""
        collected, token = [], None
        while True:
            kwargs = {"PipelineExecutionArn": execution_arn}
            if token:
                kwargs["NextToken"] = token
            page = sm.list_pipeline_execution_steps(**kwargs)
            collected.extend(page.get("PipelineExecutionSteps", []))
            token = page.get("NextToken")
            if not token:
                return collected

    def _start_key(step):
        """Sort key: step start time as a float; steps without one sort first.

        Comparing timestamps avoids mixing timezone-aware StartTime values
        with a naive ``datetime.min`` fallback, which raises TypeError.
        """
        started = step.get("StartTime")
        return started.timestamp() if started is not None else float("-inf")

    while True:
        # 1) High-level pipeline status
        desc = sm.describe_pipeline_execution(PipelineExecutionArn=execution_arn)
        p_status = desc["PipelineExecutionStatus"]
        _print_once("Pipeline", p_status)

        # 2) Each step + child job details, sorted by start time for a
        #    stable printout
        for s in sorted(_list_steps(), key=_start_key):
            name = s["StepName"]
            status = s["StepStatus"]
            _print_once(f"Step/{name}", status)

            # Only print failure reason once if present
            if status in ("Failed", "Stopped") and s.get("FailureReason"):
                _print_once(f"Step/{name}/Failure", s["FailureReason"])

            md = s.get("Metadata") or {}
            # Surface Processing job status/progress
            if "ProcessingJob" in md:
                # The job name is the last path segment of the job ARN.
                pj_name = md["ProcessingJob"]["Arn"].split("/")[-1]
                pj = sm.describe_processing_job(ProcessingJobName=pj_name)
                _print_once(f"Processing/{pj_name}", pj["ProcessingJobStatus"])
                if pj.get("FailureReason"):
                    _print_once(f"Processing/{pj_name}/Failure", pj["FailureReason"])

            # Surface Training job status + secondary transitions (progress messages)
            if "TrainingJob" in md:
                tj_name = md["TrainingJob"]["Arn"].split("/")[-1]
                tj = sm.describe_training_job(TrainingJobName=tj_name)
                _print_once(f"Training/{tj_name}", tj["TrainingJobStatus"])
                if tj.get("SecondaryStatusTransitions"):
                    last = tj["SecondaryStatusTransitions"][-1]
                    msg = f"{last['Status']}: {last.get('StatusMessage','')}"
                    _print_once(f"Training/{tj_name}/Progress", msg.strip())
                if tj.get("FailureReason"):
                    _print_once(f"Training/{tj_name}/Failure", tj["FailureReason"])

        # Stop when pipeline reaches a terminal state
        if p_status in ("Succeeded", "Failed", "Stopped"):
            print("\nFinal pipeline status:", p_status)
            if desc.get("FailureReason"):
                print("Pipeline failure reason:", desc["FailureReason"])
            break

        time.sleep(poll_sec)


# Block this cell until the execution started above reaches a terminal
# state, checking every 30 seconds.
poll_pipeline(execution.arn, poll_sec=30)

[2025-08-16T23:54:08Z] Pipeline: Executing
[2025-08-16T23:54:38Z] Step/FeatureEngineer: Executing
[2025-08-16T23:54:38Z] Processing/pipelines-vh4s3sj737bz-FeatureEngineer-UUNlN2eAhh: InProgress
[2025-08-16T23:57:10Z] Step/FeatureEngineer: Succeeded
[2025-08-16T23:57:10Z] Processing/pipelines-vh4s3sj737bz-FeatureEngineer-UUNlN2eAhh: Completed
[2025-08-16T23:57:10Z] Step/PreprocessData: Executing
[2025-08-16T23:57:10Z] Processing/pipelines-vh4s3sj737bz-PreprocessData-s4XIH9qdTO: InProgress
[2025-08-16T23:59:41Z] Step/PreprocessData: Succeeded
[2025-08-16T23:59:41Z] Processing/pipelines-vh4s3sj737bz-PreprocessData-s4XIH9qdTO: Completed
[2025-08-16T23:59:41Z] Step/TrainAutogluon: Executing
[2025-08-16T23:59:41Z] Training/pipelines-vh4s3sj737bz-TrainAutogluon-MuQgmm5gbe: InProgress
[2025-08-16T23:59:41Z] Training/pipelines-vh4s3sj737bz-TrainAutogluon-MuQgmm5gbe/Progress: Starting: Preparing the instances for training
[2025-08-17T00:00:11Z] Training/pipelines-vh4s3sj737bz-TrainAutogluon-MuQg

In [8]:
# Optional wait (comment out if you don't want to block)

# execution.wait(delay=60, max_attempts=1000)

# print("Status:", execution.describe().get("PipelineExecutionStatus"))

In [9]:
# import boto3
# sm = boto3.client("sagemaker")

# # get the training job name from the execution
# steps = sm.list_pipeline_execution_steps(PipelineExecutionArn=execution.arn)["PipelineExecutionSteps"]
# tname = next(s["Metadata"]["TrainingJob"]["Arn"].split("/")[-1] for s in steps if s["StepName"]=="TrainAutogluon")

# desc = sm.describe_training_job(TrainingJobName=tname)
# print("Status:", desc["TrainingJobStatus"])
# print("FailureReason:", desc.get("FailureReason"))
# print("StoppingCondition:", desc.get("StoppingCondition"))
# for tr in desc.get("SecondaryStatusTransitions", []):
#     print(tr["Status"], tr["StatusMessage"])

# --- Output recorded from an earlier run (training hit the 900 s MaxRuntimeInSeconds limit) ---
# Status: Stopped
# FailureReason: None
# StoppingCondition: {'MaxRuntimeInSeconds': 900}
# Starting Starting the training job
# Pending Preparing the instances for training
# Downloading Downloading the training image
# Training Training image download completed. Training in progress.
# Stopping Stopping the training job
# Uploading Uploading generated training model
# MaxRuntimeExceeded Training job runtime exceeded MaxRuntimeInSeconds provided