In [1]:
# Importing necessary libraries
import os
import json
import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString, ParameterInteger, ParameterFloat
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.model import Model
from sagemaker.inputs import TrainingInput, TransformInput
from sagemaker.lineage.visualizer import LineageTableVisualizer
from sagemaker.estimator import Estimator
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import image_uris
from sagemaker.xgboost.model import XGBoostModel
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup
from sklearn.model_selection import train_test_split
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Creating session variables
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sm_session = sagemaker.Session()
pipeline_session = PipelineSession()
bucket = sm_session.default_bucket()
prefix = "student-anxiety-pipeline"

LOCAL_DIR = "local_artifacts"
os.makedirs(LOCAL_DIR, exist_ok=True)
print(f"Region: {region}")
print(f"Role: {role}")

Region: us-east-1
Role: arn:aws:iam::303848588930:role/LabRole


In [14]:
# Creating parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
model_package_group_name = f"AbaloneModelPackageGroupName"

In [4]:
def prepare_training_data(
    sm_session: Session,
    bucket: str,
    prefix: str,
    local_dir: str,
    region: str,
    target_col: str = "anxiety_level_encoded"
):
    """
    Loading feature groups from SageMaker Feature Store, preprocessing data,
    and uploading train/val/test/prod splits to S3 for use in training pipeline
    """

    s3 = boto3.client("s3", region_name=region)
    os.makedirs(local_dir, exist_ok=True)

    def load_feature_group(fg_name):
        """Load data from Feature Store & clean metadata"""
        fg = FeatureGroup(name=fg_name, sagemaker_session=sm_session)
        q = fg.athena_query()
        table = q.table_name

        q.run(
            query_string=f'SELECT * FROM "{table}"',
            output_location=f"s3://{bucket}/athena-results/"
        )
        q.wait()
        df = q.as_dataframe()

        # Keeping most recent record per student
        df = df.sort_values(["student_id", "event_time"])
        df = df.drop_duplicates(subset=["student_id"], keep="last")

        # Dropping metadata columns
        meta_cols = ["write_time", "is_deleted", "api_invocation_time", "event_time"]
        df = df.drop(columns=[c for c in meta_cols if c in df.columns])

        return df

    # Loading all feature groups
    demo_df = load_feature_group("student-demographics-ses-fg")
    performance_df = load_feature_group("student-performance-fg")
    wellbeing_df = load_feature_group("student-wellbeing-fg")
    target_df = load_feature_group("student-anxiety-target-fg")

    print("\nFeature Group shapes:")
    print(f"Demographics & SES: {demo_df.shape}")
    print(f"Academic Performance: {performance_df.shape}")
    print(f"Student Wellbeing: {wellbeing_df.shape}")
    print(f"Anxiety Target: {target_df.shape}")

    # Merging into one dataframe
    df = demo_df.merge(performance_df, on="student_id")
    df = df.merge(wellbeing_df, on="student_id")
    df = df.merge(target_df, on="student_id")

    print("Merged shape:", df.shape)

    # Preprocessing
    # Converting categorical columns to numeric codes
    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = df[col].fillna("missing")
            df[col] = df[col].astype("category").cat.codes

    # Defining feature columns (everything except target & ID)
    feature_cols = [c for c in df.columns if c not in [target_col, "student_id"]]

    print(f"Using {len(feature_cols)} features")

    # Splitting into train/val/test/prod
    df_main, df_prod = train_test_split(df, test_size=0.4, random_state=0, stratify=df[target_col])
    df_train, df_temp = train_test_split(df_main, test_size=0.3333, random_state=0, stratify=df_main[target_col])
    df_val, df_test = train_test_split(df_temp, test_size=0.5, random_state=0, stratify=df_temp[target_col])

    print("Dataset sizes:")
    print(f"Train: {len(df_train)} | Val: {len(df_val)} | Test: {len(df_test)} | Prod: {len(df_prod)}")

    # Helper: Uploading splits to S3
    def upload_to_s3(dataframe, split_name, use_xgb_format=False):
        """Save dataframe to S3 in the correct format"""
        if use_xgb_format:
            cols = [target_col] + feature_cols
            local_path = f"{local_dir}/{split_name}_xgb.csv"
            dataframe[cols].to_csv(local_path, header=False, index=False)
            s3_key = f"{prefix}/xgb/{split_name}/{split_name}.csv"
        else:
            cols = [target_col] + feature_cols
            local_path = f"{local_dir}/{split_name}_sklearn.csv"
            dataframe[cols].to_csv(local_path, index=False)
            s3_key = f"{prefix}/sklearn/{split_name}/{split_name}.csv"

        s3.upload_file(local_path, bucket, s3_key)
        return f"s3://{bucket}/{s3_key}"

    # Uploading all splits
    for name, data in [("train", df_train), ("val", df_val), ("test", df_test), ("prod", df_prod)]:
        upload_to_s3(data, name, use_xgb_format=False)
        upload_to_s3(data, name, use_xgb_format=True)

    print("All datasets uploaded to S3.")

    # Returning useful paths for pipeline steps
    return {
        "train_sklearn": f"s3://{bucket}/{prefix}/sklearn/train/",
        "val_sklearn": f"s3://{bucket}/{prefix}/sklearn/val/",
        "test_sklearn": f"s3://{bucket}/{prefix}/sklearn/test/",
        "train_xgb": f"s3://{bucket}/{prefix}/xgb/train/",
        "val_xgb": f"s3://{bucket}/{prefix}/xgb/val/",
        "test_xgb": f"s3://{bucket}/{prefix}/xgb/test/",
        "num_classes": df[target_col].nunique(),
        "feature_cols": feature_cols,
    }

In [5]:
# Preprocessing step
data_info = prepare_training_data(
    sm_session=sm_session,
    bucket=bucket,
    prefix=prefix,
    local_dir=LOCAL_DIR,
    region=region
)

INFO:sagemaker:Query 1d670c52-7489-4416-b8ff-8817d0f0b664 is being executed.
INFO:sagemaker:Query 1d670c52-7489-4416-b8ff-8817d0f0b664 successfully executed.
INFO:sagemaker:Query 3126a43d-98e2-4600-957f-b2b052bd4122 is being executed.
INFO:sagemaker:Query 3126a43d-98e2-4600-957f-b2b052bd4122 successfully executed.
INFO:sagemaker:Query beb0b405-2899-4f71-a226-d8e24edaa1a1 is being executed.
INFO:sagemaker:Query beb0b405-2899-4f71-a226-d8e24edaa1a1 successfully executed.
INFO:sagemaker:Query 854af3f2-3aa4-4926-a5e4-7ed0bdac7acc is being executed.
INFO:sagemaker:Query 854af3f2-3aa4-4926-a5e4-7ed0bdac7acc successfully executed.



Feature Group shapes:
Demographics & SES: (32293, 7)
Academic Performance: (36824, 2)
Student Wellbeing: (25141, 9)
Anxiety Target: (35565, 2)
Merged shape: (25112, 17)
Using 15 features
Dataset sizes:
Train: 10045 | Val: 2511 | Test: 2511 | Prod: 10045
All datasets uploaded to S3.


In [6]:
train_sklearn = data_info["train_sklearn"]
test_sklearn = data_info["test_sklearn"]
val_sklearn = data_info["val_sklearn"]
train_xgb = data_info["train_xgb"]
test_xgb = data_info["test_xgb"]
val_xgb = data_info["val_xgb"]
num_classes = data_info["num_classes"]


In [7]:
# Training step for baseline model
baseline_estimator = SKLearn(
    entry_point="train.py",
    source_dir="baseline_lr",
    role=role,
    framework_version="1.2-1",
    instance_type=processing_instance_type,
    instance_count=1,
    sagemaker_session=pipeline_session
)

baseline_train_input = TrainingInput(
    s3_data=train_sklearn,
    content_type="text/csv"
)
baseline_val_input = TrainingInput(
    s3_data=val_sklearn,
    content_type="text/csv"
)

train_step_sklearn = TrainingStep(
    name="TrainBaselineModel",
    estimator=baseline_estimator,
    inputs={"train": baseline_train_input, "validation": baseline_val_input}
)




In [8]:
# Training step for XGBoost model
num_classes = 3

xgb_container = image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1"
)

xgb_estimator = Estimator(
    image_uri=xgb_container,
    role=role,
    instance_count=1,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
    hyperparameters={
        "objective": "multi:softprob",
        "num_class": num_classes,
        "num_round": 300,
        "max_depth": 6,
        "eta": 0.1,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "eval_metric": "mlogloss"
    }
)

xgb_train_input = TrainingInput(
    s3_data=train_xgb,
    content_type="text/csv"
)
xgb_val_input = TrainingInput(
    s3_data=val_xgb,
    content_type="text/csv"
)

train_step_xgb = TrainingStep(
    name="TrainXGBoostModel",
    estimator=xgb_estimator,
    inputs={"train": xgb_train_input, "validation": xgb_val_input}
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


In [9]:
# Evaluation step

# Evaluation script path
evaluation_script_path = "code/evaluation.py"

evaluation_processor = ScriptProcessor(
    image_uri=xgb_container,
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="evaluate-student-models",
    sagemaker_session=pipeline_session,
    command=["python3"]
)

eval_args = evaluation_processor.run(
    inputs=[
        ProcessingInput(
            source=train_step_xgb.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=data_info["test_xgb"],
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation"
        ),
    ],
    code="code/evaluation-xgboost.py",
)

evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

evaluate_step = ProcessingStep(
    name="EvaluateModels",
    step_args=eval_args,
    property_files=[evaluation_report]
)




In [16]:
# Conditional Model Registration
f1_threshold = 0.5

# Fail step
step_fail = FailStep(
    name="XGBoostF1Fail",
    error_message=Join(on=" ", values=["Execution failed due to F1 <", f1_threshold]),
)

# Registering model
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluate_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

# Commenting out for pipeline demo
"""register_args = xgb_estimator.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name="StudentAnxiety-XGBoost",
    approval_status=model_approval_status,
    model_metrics=model_metrics
)

xgboost_register = ModelStep(name="XGBoostModel", step_args=register_args)
"""

condition_register_xgb = ConditionStep(
    name="RegisterXGBIfF1Good",
    conditions=[
        ConditionGreaterThan(
            left=JsonGet(
                step_name=evaluate_step.name,
                property_file=evaluation_report,
                json_path="XGBoost.F1"
            ),
            right=f1_threshold
        )
    ],
    if_steps=[],
    else_steps=[step_fail]
)





In [17]:
# Pipeline definition
pipeline = Pipeline(
    name="StudentAnxietyPipeline",
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
    ],
    steps=[
        # train_step_sklearn,  omitting for pipeline demo
        train_step_xgb,
        evaluate_step,
        # condition_register_sklearn, omitting for pipeline demo
        condition_register_xgb
    ],
    sagemaker_session=pipeline_session
)

In [18]:
definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'TrainXGBoostModel',
   'Type': 'Training',
   'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File',
     'TrainingImage': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3'},
    'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-303848588930/'},
    'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
    'ResourceConfig': {'VolumeSizeInGB': 30,
     'InstanceCount': 1,
     'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'}},
    'RoleArn'

In [19]:
# Executing pipeline
pipeline.upsert(role_arn=role)

execution = pipeline.start()
print(f"Pipeline execution started: {execution.arn}")



Pipeline execution started: arn:aws:sagemaker:us-east-1:303848588930:pipeline/StudentAnxietyPipeline/execution/ybronayikskm


In [None]:
execution.describe()["PipelineExecutionStatus"]

In [None]:
for step in execution.list_steps():
    print(step['StepName'], step['StepStatus'])