# Manual Setup

We perform a manual setup first to validate our cloud infrastructure before we create the pipeline.

## Dependencies

In [13]:
%pip install -q "botocore==1.38.23" boto3 sagemaker mlflow s3fs fsspec "scikit-learn>=1.0" "pandas>=1.2"

Note: you may need to restart the kernel to use updated packages.


##  Load Data Set

In [14]:
import pandas as pd

# Load the raw AI4I 2020 dataset from the notebook's working directory.
df = pd.read_csv("ai4i2020.csv")
print(f"Dataset shape: {df.shape}")
# DataFrame.info() prints its report itself and returns None, so it must not
# be interpolated into an f-string (that appends a stray "Columns: None").
df.info()
print(f"Columns: {df.columns.tolist()}")
print(f"First 3 rows:\n{df.head(3)}")

Dataset shape: (10000, 14)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 14 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   UDI                      10000 non-null  int64  
 1   Product ID               10000 non-null  object 
 2   Type                     10000 non-null  object 
 3   Air temperature [K]      10000 non-null  float64
 4   Process temperature [K]  10000 non-null  float64
 5   Rotational speed [rpm]   10000 non-null  int64  
 6   Torque [Nm]              10000 non-null  float64
 7   Tool wear [min]          10000 non-null  int64  
 8   Machine failure          10000 non-null  int64  
 9   TWF                      10000 non-null  int64  
 10  HDF                      10000 non-null  int64  
 11  PWF                      10000 non-null  int64  
 12  OSF                      10000 non-null  int64  
 13  RNF                      10000 non-null  int64  
d

In [15]:
%%writefile preprocess.py
## This file is created once during manual setup 
import os
import argparse
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
def unknown_fail_check(row):
    """Flag records marked as a machine failure although no failure mode is set.

    `row` may be a whole DataFrame (the result is then a boolean mask, one
    entry per record) or any mapping-like object exposing the same columns.
    """
    no_mode_flagged = (
        (row['RNF'] == 0)
        & (row['HDF'] == 0)
        & (row['TWF'] == 0)
        & (row['PWF'] == 0)
        & (row['OSF'] == 0)
    )
    return (row['Machine failure'] == 1) & no_mode_flagged

def pass_yet_fail_check(row):
    """Flag records marked as "no failure" although a failure mode is set.

    `row` may be a whole DataFrame (the result is then a boolean mask, one
    entry per record) or any mapping-like object exposing the same columns.
    """
    any_mode_flagged = (
        (row['RNF'] == 1)
        | (row['HDF'] == 1)
        | (row['TWF'] == 1)
        | (row['PWF'] == 1)
        | (row['OSF'] == 1)
    )
    return (row['Machine failure'] == 0) & any_mode_flagged
def preprocessing(df):
    """Clean the raw dataset, add engineered features and split it 80/20.

    The caller's frame is NOT modified: all work happens on a private copy, so
    the function can be called repeatedly on the same raw DataFrame and the raw
    data stays available for later steps.

    Steps, in order: encode 'Type' (L/M/H -> 0/1/2), drop 'UDI' and
    'Product ID', replace square brackets in column names with parentheses,
    drop duplicate rows and rows with nulls, reconcile 'Machine failure' with
    the individual failure-mode flags, add the derived features, then split.

    Args:
        df: Raw frame containing at least 'UDI', 'Product ID', 'Type',
            'Air temperature [K]', 'Process temperature [K]',
            'Rotational speed [rpm]', 'Torque [Nm]', 'Tool wear [min]',
            'Machine failure', 'TWF', 'HDF', 'PWF', 'OSF' and 'RNF'.

    Returns:
        (train, test): two DataFrames with the feature columns followed by the
        six label columns. The split uses test_size=0.2, random_state=42 and is
        stratified on 'Machine failure'.
    """
    print("# Preprocessing")
    # Work on a copy: every step below mutates `df`, and the caller's frame
    # must stay raw (it is reused by other notebook cells).
    df = df.copy()
    df['Type'] = df['Type'].astype('category')
    type_mapping = {'L': 0, 'M': 1, 'H': 2}
    df['Type'] = df['Type'].map(type_mapping).astype('int')
    print(" Type  Unique Values after encoding: ", df['Type'].unique())
    df.drop(columns=['UDI', 'Product ID'], inplace=True)
    print(f"shape of data after dropping columns {df.shape}")
    df.columns = [col.replace("[","(").replace("]",")") for col in df.columns.values]
    print("DF columns after clean up", df.columns)
    print("## Handle Duplicates")
    # our original dataset does not have duplicates
    # However, there is no guarantee that production/new data is free of duplicates
    duplicated_row_count = df.duplicated().sum()
    total_row_count = df.shape[0]
    duplicated_row_percentage = (duplicated_row_count/total_row_count*100)
    print(f"Total rows count: {total_row_count}")
    print(f"Duplicated rows count: {duplicated_row_count}")
    print(f"Duplicated rows percentage: {duplicated_row_percentage}")
    df.drop_duplicates(inplace=True)
    print("After removing duplicates rows count:", df.shape[0])
    print("## Handle NULL")
    print("number of null values : ", df.isnull().sum().sum())
    df.dropna(inplace=True)
    print("After removing null rows count:", df.shape[0])

    # Records with a failure-mode flag set but 'Machine failure' == 0 are
    # relabelled as failures.
    passed_although_failed = df[pass_yet_fail_check(df)]
    print(
        f"Number of samples that passed although failed: {len(passed_although_failed)}")
    df['Machine failure'] = np.where(
        pass_yet_fail_check(df), 1, df['Machine failure'])
    passed_although_failed = df[pass_yet_fail_check(df)]
    print(
        f"Number of samples that passed although failed after fix: {len(passed_although_failed)}")

    # Records marked as failed without any failure-mode flag are relabelled as
    # non-failures.
    print(f"Number of machine failures: {df['Machine failure'].sum()}")
    unknown_failures = df[unknown_fail_check(df)]
    print(
        f"Number of failures due to unknown reasons: {len(unknown_failures)}")
    df['Machine failure'] = np.where(
        unknown_fail_check(df), 0, df['Machine failure'])
    unknown_failures = df[unknown_fail_check(df)]
    print(
        f"Number of failures due to unknown reasons after fix: {len(unknown_failures)}")
    print("## Add Features")
    df['Strain (minNm)'] = df['Tool wear (min)'] * df['Torque (Nm)']
    # rpm * torque * 2*pi/60 converts rotational speed to rad/s before multiplying.
    df['Power (W)'] = df['Rotational speed (rpm)'] * df['Torque (Nm)'] * 2 * np.pi / 60
    df['Temperature Difference (K)'] = df['Process temperature (K)'] - df['Air temperature (K)']
    labels = ['Machine failure', 'TWF', 'HDF', 'PWF', 'OSF', 'RNF']
    print("# Splitting into train/test...")
    X = df.drop(columns=labels)
    y = df[labels]
    X_train, X_test, y_train, y_test = train_test_split(X,y, test_size=0.2, random_state=42,  stratify=y['Machine failure'])
    train=pd.concat([X_train, y_train], axis=1)
    test=pd.concat([X_test, y_test], axis=1)
    return train, test

if __name__ == "__main__":
    # Script entry point: read <input>/data.csv, preprocess it and write
    # train.csv / test.csv into their respective output directories.
    # The pipeline passes the locations as command-line arguments.
    cli = argparse.ArgumentParser()
    cli.add_argument("--input-path", type=str, help="path containing data.csv")
    cli.add_argument("--output-train-path", type=str, help="Output directory for train.csv")
    cli.add_argument("--output-test-path", type=str, help="Output directory for test.csv")
    args = cli.parse_args()

    # A missing (or empty) argument falls back to the default location.
    input_path = args.input_path if args.input_path else "/opt/ml/processing/input"
    output_train_path = args.output_train_path if args.output_train_path else "/opt/ml/processing/train"
    output_test_path = args.output_test_path if args.output_test_path else "/opt/ml/processing/test"

    print("--- Starting Processing Job ---")
    print(f"Input path: {input_path}")
    print(f"Output train path: {output_train_path}")
    print(f"Output test path: {output_test_path}")

    # Locate and validate the input file before reading it.
    data_path = os.path.join(input_path, "data.csv")
    print(f"Loading data from {input_path}/data.csv")
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input path {input_path} does not exist.")
    if not os.path.exists(data_path):
        raise FileNotFoundError(f"Data file not found in {input_path}. Please check the path.")
    df = pd.read_csv(data_path)

    train, test = preprocessing(df)

    # Create both output directories first, then write the two splits.
    for out_dir in (output_train_path, output_test_path):
        os.makedirs(out_dir, exist_ok=True)
    print(f"Saving train data to {output_train_path}/train.csv")
    train.to_csv(os.path.join(output_train_path, "train.csv"), index=False)
    print(f"Saving test data to {output_test_path}/test.csv")
    test.to_csv(os.path.join(output_test_path, "test.csv"), index=False)
    print("--- Processing Job Completed ---")


Overwriting preprocess.py


In [16]:
from preprocess import preprocessing
# since we write the preprocess script we can now import it into our notebook
# Pass a copy so the raw `df` stays intact and this cell can be re-run.
train, test = preprocessing(df.copy())

# The experiment and endpoint-test cells read X_train / y_train / X_test /
# y_test, which no other cell defines. Derive them from the splits using the
# same label columns that preprocessing() separates from the features.
label_columns = ['Machine failure', 'TWF', 'HDF', 'PWF', 'OSF', 'RNF']
# NOTE(review): 'Machine failure' is assumed to be the training target (it is
# the column the split is stratified on) — confirm.
target_column = 'Machine failure'
X_train = train.drop(columns=label_columns)
y_train = train[target_column]
X_test = test.drop(columns=label_columns)
y_test = test[target_column]

# Preprocessing
 Type  Unique Values after encoding:  [1 0 2]
shape of data after dropping columns (10000, 12)
DF columns after clean up Index(['Type', 'Air temperature (K)', 'Process temperature (K)',
       'Rotational speed (rpm)', 'Torque (Nm)', 'Tool wear (min)',
       'Machine failure', 'TWF', 'HDF', 'PWF', 'OSF', 'RNF'],
      dtype='object')
## Handle Duplicates
Total rows count: 10000
Duplicated rows count: 0
Duplicated rows percentage: 0.0
After removing duplicates rows count: 10000
## Handle NULL
number of null values :  0
After removing null rows count: 10000
Number of samples that passed although failed: 18
Number of samples that passed although failed after fix: 0
Number of machine failures: 357
Number of failures due to unknown reasons: 9
Number of failures due to unknown reasons after fix: 0
## Add Features
# Splitting into train/test...


##  Configuration

In [17]:
# ----------------------
# Names shared by the manual-setup cells below.
base_folder = "ai4i"  # e.g., 'users/my-name'
experiment_name = "ai4i-Experiment"  # e.g., 'my-experiment'
model_name = "ai4i-model"  # e.g., 'my-model'
tracking_server_name = "Team16"
bucket_name = "iti113-team16-bucket"  # s3://iti113-team16-bucket

## Create SageMaker and S3 Clients

In [18]:
import sagemaker
import boto3
import mlflow
from sklearn.metrics import classification_report

# Create the SageMaker session plus the boto3 SageMaker/S3 clients and build
# the S3 locations of the dataset from the configuration values.
# Both clients start as None so the check after the try block can tell whether
# initialization got far enough.
sagemaker_client = None
s3_client = None
try:
    sagemaker_session = sagemaker.Session()
    sagemaker_client = boto3.client("sagemaker")
    # NOTE(review): s3_bucket is not read by any visible cell; the data paths
    # below use the configured bucket_name instead.
    s3_bucket = sagemaker_session.default_bucket()
    s3_client = boto3.client('s3')
    s3_data_key=f"{base_folder}/data/v1/data.csv"
    s3_data_path = f"s3://{bucket_name}/{s3_data_key}"
    s3_data_dir_uri = f"s3://{bucket_name}/{base_folder}/data/v1"
    print(f"Your datasets will be versioned inside: {s3_data_path}")
except Exception as e:
    # Only the message is printed here; the failure is raised below, outside
    # the except block, so the output does not carry a chained traceback.
    print(f"Error initializing SageMaker session or S3 client: {e}")
    s3_data_path = None
# minimize traceback in the output as we are not interested in the details
if not sagemaker_client or not s3_client:
    raise Exception("Failed to initialize SageMaker session or S3 client.")


Error initializing SageMaker session or S3 client: Could not connect to the endpoint URL: "https://sts.ap-souhteast-1.amazonaws.com/"


## Connect to Tracking Server


In [None]:
# Resolve the ARN of the MLflow tracking server named in the configuration,
# then point the MLflow client at it and select the experiment.
mlflow_tracking_server_arn = None
try:
    response = sagemaker_client.describe_mlflow_tracking_server(
        TrackingServerName=tracking_server_name
    )
    # ARN of MLflow Tracking Server
    mlflow_tracking_server_arn = response['TrackingServerArn']
    print(f"Found MLflow Tracking Server ARN: {mlflow_tracking_server_arn}")
except Exception as e:
    # Any failure (API error or missing key) is reported as "not found".
    print(f"Could not find tracking server: {e}")
    mlflow_tracking_server_arn = None

# minimize traceback in the output as we are not interested in the details
if not mlflow_tracking_server_arn:
    raise Exception("Failed to find MLflow Tracking Server.")

# IAM role for SageMaker execution
role = sagemaker.get_execution_role()

# The "S3 Bucket" line prints the dataset path built in the session-setup cell.
print(f"S3 Bucket: {s3_data_path}")
print(f"SageMaker Role ARN: {role}")
print(f"MLflow Tracking Server ARN: {mlflow_tracking_server_arn}")

# Connect to the MLflow Tracking Server
# Set the MLflow tracking URI to managed server
# NOTE(review): this guard is always true here, because a missing ARN already
# raised above.
if mlflow_tracking_server_arn:
    mlflow.set_tracking_uri(mlflow_tracking_server_arn)
    print("MLflow tracking URI set successfully.")

# Define an experiment name. If it doesn't exist, MLflow creates it.
mlflow.set_experiment(experiment_name)

print(f"MLflow tracking URI set to: {mlflow.get_tracking_uri()}")
print(f"MLflow experiment set to: '{experiment_name}'")

Found MLflow Tracking Server ARN: arn:aws:sagemaker:ap-southeast-1:287730026636:mlflow-tracking-server/mlflow-server-1234567a
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
S3 Bucket: s3://sagemaker-iti112-common/9002963k@myaccount.nyp.edu.sg/assignment/data/v1/iris_data.csv
SageMaker Role ARN: arn:aws:iam::287730026636:role/sagemaker-lab-user-9002963k@myaccount.nyp.edu.sg
MLflow Tracking Server ARN: arn:aws:sagemaker:ap-southeast-1:287730026636:mlflow-tracking-server/mlflow-server-1234567a
MLflow tracking URI set successfully.
MLflow tracking URI set to: arn:aws:sagemaker:ap-southeast-1:287730026636:mlflow-tracking-server/mlflow-server-1234567a
MLflow experiment set to: '9002963k_Iris-Experiment'


## Run Experiments and Track
Let us set up our experiments. To keep things simple, we create a function that can be called to run a single experiment.

In [None]:
def run_experiment(experiment_name, run_name, C_param):
    """Train and evaluate one logistic-regression model inside an MLflow run.

    The function reads the notebook globals X_train, y_train, X_test, y_test
    (data) and model_name (artifact path of the logged model).

    Args:
        experiment_name: Only used in the progress messages; the active MLflow
            experiment is whatever was selected with mlflow.set_experiment.
        run_name: Name given to the MLflow run.
        C_param: Value passed as `C` to LogisticRegression.

    Returns:
        (run_id, accuracy): the MLflow run id and the accuracy taken from the
        classification report on X_test / y_test.
    """
    # Imported locally so this cell works on a fresh kernel: the notebook-level
    # import of LogisticRegression only happens in a much later cell.
    from sklearn.linear_model import LogisticRegression

    print("Starting experiment ", experiment_name, " with run name ", run_name)
    run_id = None
    accuracy = None
    # Start an MLflow run
    # Use the experiment name and run name to organize runs
    with mlflow.start_run(run_name=run_name) as run:
        run_id = run.info.run_id
        print(f"\tMLflow Run ID : {run_id}")
        print(f"\tRunning experiment: {experiment_name}, Run Name: {run_name}")
        # Train the model
        max_iter = 100
        solver = 'liblinear'
        model = LogisticRegression(C=C_param, random_state=42,
                                   max_iter=max_iter, solver=solver)
        model.fit(X_train, y_train)
        # Evaluate and log metrics
        y_pred = model.predict(X_test)
        cr = classification_report(y_test, y_pred,
                                   output_dict=True, zero_division=0)
        # "accuracy" is the only scalar entry of the report; removing it leaves
        # only the per-class / per-average dictionaries for the loop below.
        accuracy = cr.pop("accuracy")
        print(f"\tModel accuracy: {accuracy:.4f}")
        # Log parameters and metrics in one batch each instead of one call per value.
        mlflow.log_params({"C": C_param, "max_iter": max_iter, "solver": solver})
        metrics = {"accuracy": accuracy}
        for class_or_avg, metrics_dict in cr.items():
            for metric, value in metrics_dict.items():
                metrics[f"{class_or_avg}_{metric}"] = value
        mlflow.log_metrics(metrics)
        # Log the trained model as an artifact
        # Provide the first 5 rows of the training data as an example
        input_example = X_train.head(5)
        mlflow.sklearn.log_model(sk_model=model,artifact_path=model_name,input_example=input_example)
        print("\tFinished: experiment ", experiment_name,
              " with run name ", run_name)
    return run_id, accuracy

Let's run our experiments while varying the hyperparameter

In [None]:
# Run experiments with different C parameters and keep track of the best run.
# A run wins on strictly higher accuracy; on equal accuracy the smaller C wins.
# NOTE(review): `experiments` (run name -> C value) is not defined in any
# visible cell — confirm where it comes from.
results = {}
best_run_id = best_run_name = best_c_param = None
best_accuracy = 0.0
for run_name, C_param in experiments.items():
    run_id, accuracy_score = run_experiment(experiment_name, run_name, C_param)
    results[run_name] = {'run_id': run_id, 'accuracy': accuracy_score}

    if best_accuracy < accuracy_score:
        # Strictly better accuracy: this run becomes the new best.
        best_accuracy, best_run_id, best_run_name, best_c_param = (
            accuracy_score, run_id, run_name, C_param)
        continue
    if not (best_accuracy == accuracy_score):
        # Strictly worse: nothing to do.
        continue

    # Tie on accuracy: prefer the run with the smaller C.
    print(f"Found another run with same accuracy: {accuracy_score:.4f}")
    print(f"run {best_run_name} vs run {run_name}")
    if best_c_param is not None and not (C_param < best_c_param):
        print(f"\t Keeping best run {best_run_name} with smaller C to have simpler model")
        continue
    best_c_param, best_run_id, best_run_name = C_param, run_id, run_name
    print(f"\t Updating best run to {run_name} with C={C_param} and accuracy={accuracy_score:.4f}")

print(f"Best run: {best_run_name} id: {best_run_id} with accuracy: {best_accuracy:.4f}")

Starting experiment  9002963k_Iris-Experiment  with run name  C-0-0-1
	MLflow Run ID : d874dbb1fb244edc86842bd30ecf2474
	Running experiment: 9002963k_Iris-Experiment, Run Name: C-0-0-1
	Model accuracy: 0.6667
	Finished: experiment  9002963k_Iris-Experiment  with run name  C-0-0-1
🏃 View run C-0-0-1 at: https://ap-southeast-1.experiments.sagemaker.aws/#/experiments/70/runs/d874dbb1fb244edc86842bd30ecf2474
🧪 View experiment at: https://ap-southeast-1.experiments.sagemaker.aws/#/experiments/70
Starting experiment  9002963k_Iris-Experiment  with run name  C-0-1-0
	MLflow Run ID : 754689b0eab84ff193db8fc280a2393f
	Running experiment: 9002963k_Iris-Experiment, Run Name: C-0-1-0
	Model accuracy: 0.8333
	Finished: experiment  9002963k_Iris-Experiment  with run name  C-0-1-0
🏃 View run C-0-1-0 at: https://ap-southeast-1.experiments.sagemaker.aws/#/experiments/70/runs/754689b0eab84ff193db8fc280a2393f
🧪 View experiment at: https://ap-southeast-1.experiments.sagemaker.aws/#/experiments/70
Starting

The best model is the one with the highest accuracy which we store in `best_run_id`.

The best model is from the run 'C-1-0-0' with accuracy '0.9667' 

Another run reached the same accuracy, but when accuracies tie we prefer the run with the smaller C,

as it means stronger regularization ( C is inverse regularization )

This will help us to get simpler model and less chance of overfitting

### 1.7:  Model Registration

The best model is the one whose run ID is stored in `best_run_id`.
Let us register it in the MLflow Model Registry; it is packaged and uploaded to S3 in a later step.

In [None]:
def mlflow_register_model(run_id):
    """Register the model logged by run `run_id` in the MLflow Model Registry.

    The model is looked up under the artifact path `model_name` of the run and
    registered under the same name. Returns the object produced by
    mlflow.register_model (its `.version` is printed).
    """
    source_uri = "runs:/{}/{}".format(run_id, model_name)
    print(f"Saving model with run ID: {run_id} to S3")
    print(f"\tRegistering model from URI: {source_uri}")
    reg_model = mlflow.register_model(model_uri=source_uri, name=model_name)
    print(f"\tModel '{model_name}' registered with version: {reg_model.version}")
    return reg_model


registered_model = mlflow_register_model(best_run_id)

Saving model with run ID: 2b3a5ec25a6e447f95075b8c929ae7bb to S3
	Registering model from URI: runs:/2b3a5ec25a6e447f95075b8c929ae7bb/iris-classifier-model


Registered model 'iris-classifier-model' already exists. Creating a new version of this model...
2025/07/24 03:15:18 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: iris-classifier-model, version 2


	Model 'iris-classifier-model' registered with version: 2


Created version '2' of model 'iris-classifier-model'.


Check Model has been Registered properly

In [None]:
from mlflow.tracking import MlflowClient
# Read the registered version back from the registry and show where its
# artifacts are stored.
client = MlflowClient()
model_version = client.get_model_version(model_name, registered_model.version)
model_artifact_s3, model_version_no, model_version_name = (
    model_version.source,
    model_version.version,
    model_version.name,
)
for caption, shown in (
    ("Model S3 Artifact URI:", model_artifact_s3),
    ("Model Version No     :", model_version_no),
    ("Model Version Name   :", model_version_name),
):
    print(caption, shown)


Model S3 Artifact URI: s3://sagemaker-iti112-common/mlflow-1/70/2b3a5ec25a6e447f95075b8c929ae7bb/artifacts/iris-classifier-model
Model Version No     : 2
Model Version Name   : iris-classifier-model


In [None]:
import tarfile
import os
model_s3_uri = None
def download_model_artifact(model_version_name,model_version_no, model_folder="/tmp/model"):
    """Download a registered model version into a local folder.

    The artifact is addressed as models:/<model_version_name>/<model_version_no>;
    `model_folder` is created when it does not exist yet.
    """
    registry_uri = "models:/{}/{}".format(model_version_name, model_version_no)
    os.makedirs(model_folder, exist_ok=True)
    mlflow.artifacts.download_artifacts(artifact_uri=registry_uri, dst_path=model_folder)
    print(f"Model artifact {registry_uri} downloaded to: {model_folder}")

# Download the model artifact

def create_model_archive(model_folder="/tmp/model", model_tgz_path="/tmp/model.tar.gz"):
    """Pack `model_folder` into a gzip-compressed tar file at `model_tgz_path`.

    The folder's content is stored relative to the archive root (arcname "."),
    i.e. without the folder's own path as a prefix.
    """
    with tarfile.open(name=model_tgz_path, mode="w:gz") as archive:
        archive.add(model_folder, arcname=".")
    print(f"Model archive created at: {model_tgz_path}")

def upload_to_s3(local_file, model_version_name, model_version_no):
    """Upload `local_file` to the session's default bucket and return its S3 URI.

    The object key is
    <base_folder>/models/<model_version_name>-v<model_version_no>/model.tar.gz.
    An upload error is printed; the function then raises a plain Exception
    outside the except block so the output carries no chained traceback.
    """
    s3_key = f"{base_folder}/models/{model_version_name}-v{model_version_no}/model.tar.gz"
    print(f"Uploading {local_file} to s3 with key {s3_key}")

    model_s3_uri = None  # stays None when anything in the try block fails
    try:
        bucket = sagemaker_session.default_bucket()
        s3_client.upload_file(local_file, bucket, s3_key)
        model_s3_uri = f"s3://{bucket}/{s3_key}"
    except Exception as e:
        print(f"Error uploading to S3: {e}")

    # minimize traceback in the output as we are not interested in the details
    if model_s3_uri is None:
        raise Exception("Failed to upload model to S3.")
    print(f"File {local_file} uploaded to {model_s3_uri}")
    return model_s3_uri

# Local working paths for the downloaded model and its archive
model_folder="/tmp/model"
model_tgz_path="/tmp/model.tar.gz"
# Download the model artifact
download_model_artifact(model_version_name, model_version_no, model_folder)
# Create a compressed archive of the model folder
create_model_archive(model_folder, model_tgz_path)
# Upload to S3
model_s3_uri = upload_to_s3(model_tgz_path, model_version_name, model_version_no)
print("✅ Compressed model uploaded to:", model_s3_uri)


Downloading artifacts:   0%|          | 0/7 [00:00<?, ?it/s]

Model artifact models:/iris-classifier-model/2 downloaded to: /tmp/model
Model archive created at: /tmp/model.tar.gz
Uploading /tmp/model.tar.gz to s3 with key 9002963k@myaccount.nyp.edu.sg/models/iris-classifier-model-v2/model.tar.gz
File /tmp/model.tar.gz uploaded to s3://sagemaker-iti112-common/9002963k@myaccount.nyp.edu.sg/models/iris-classifier-model-v2/model.tar.gz
✅ Compressed model uploaded to: s3://sagemaker-iti112-common/9002963k@myaccount.nyp.edu.sg/models/iris-classifier-model-v2/model.tar.gz


# Part 2: Model Deployment With SageMaker Endpoints

We will use `model_s3_uri` from part 1 to load our best model from `best_run_id`

In [None]:
print(f" best_run_id: {best_run_id} model_s3_uri: {model_s3_uri}")

 best_run_id: c0a1cd2b43ce4cf69946e0573fc536d2 model_s3_uri: s3://sagemaker-iti112-common/9002963k@myaccount.nyp.edu.sg/models/iris-classifier-model-v1/model.tar.gz


Next, we create the entry-point script that will be hosted behind the SageMaker endpoint.

In [None]:
%%writefile inference.py
# inference.py
import joblib
import os
import pandas as pd

def model_fn(model_dir):
    """Load and return the model stored as `model.pkl` inside `model_dir`."""
    model_path = os.path.join(model_dir, "model.pkl")
    return joblib.load(model_path)

def input_fn(request_body, content_type):
    """Turn a JSON request body into a DataFrame for prediction.

    Args:
        request_body: JSON text (str or bytes), e.g. {"col": [v1, v2, ...]}.
        content_type: Must be "application/json".

    Returns:
        pandas.DataFrame built from the decoded JSON object.

    Raises:
        ValueError: for an unsupported content type, or when the body is not
            valid JSON (json.JSONDecodeError is a ValueError subclass).
    """
    # Local import keeps the generated script self-contained.
    import json

    if content_type == "application/json":
        # The body comes from the network and is untrusted: parse it as JSON
        # instead of eval(), which would execute arbitrary Python expressions.
        return pd.DataFrame.from_dict(json.loads(request_body))
    raise ValueError(f"Unsupported content type: {content_type}")

def predict_fn(input_data, model):
    """Return the model's predictions for `input_data`."""
    predictions = model.predict(input_data)
    return predictions

def output_fn(prediction, accept):
    """Serialize the prediction as the text form of a Python list.

    `accept` is ignored; the same representation is returned for every
    requested content type.
    """
    as_list = prediction.tolist()
    return str(as_list)

Overwriting inference.py


Deploy our model to SageMaker endpoint

In [None]:
from sagemaker.sklearn.model import SKLearnModel

# The endpoint name carries the registered model version.
endpoint_name = "sklearn-iris-classifier-endpoint-v{}".format(model_version_no)

# Model object: packaged artifact from S3 served through inference.py.
sklearn_model = SKLearnModel(
    model_data=model_s3_uri,
    role=role,
    entry_point="inference.py",
    framework_version="1.2-1",
    sagemaker_session=sagemaker_session,
)

# Deploy to SageMaker on a single instance.
deployment_settings = {
    "instance_type": "ml.t2.medium",
    "initial_instance_count": 1,
    "endpoint_name": endpoint_name,
}
predictor = sklearn_model.deploy(**deployment_settings)

print(f"Model deployed at endpoint: {endpoint_name}")

INFO:sagemaker:Creating model with name: sagemaker-scikit-learn-2025-07-24-04-06-05-128
INFO:sagemaker:Creating endpoint-config with name sklearn-iris-classifier-endpoint-v2
INFO:sagemaker:Creating endpoint with name sklearn-iris-classifier-endpoint-v2


-------!Model deployed at endpoint: sklearn-iris-classifier-endpoint-v2


The following SageMaker API call shows the status of our endpoint and its ARN.

In [None]:
def endpoint_info(endpoint_name):
    """Print the status of a SageMaker endpoint, plus its ARN and name if known.

    A "Failed" status is extended with the reported failure reason. Any error
    while describing the endpoint is shown as the status text instead of being
    raised.
    """
    arn = name = None
    try:
        response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
        status = response["EndpointStatus"]
        arn = response["EndpointArn"]
        name = response["EndpointName"]
        if status == "Failed":
            status = status + "-" + response["FailureReason"]
    except Exception as e:
        status = f"Exception {e}"

    print(f"Endpoint {endpoint_name} Status {status}")
    # Only print the details that were actually retrieved.
    for caption, detail in (("ARN", arn), ("Name", name)):
        if detail:
            print(f"\t - {caption} {detail}")

endpoint_info(endpoint_name)

Endpoint sklearn-iris-classifier-endpoint-v2 Status InService
	 - ARN arn:aws:sagemaker:ap-southeast-1:287730026636:endpoint/sklearn-iris-classifier-endpoint-v2
	 - Name sklearn-iris-classifier-endpoint-v2


Wait until `endpoint_info(endpoint_name)` shows the status `InService`.

It also prints the ARN of the endpoint, e.g. `arn:aws:sagemaker:ap-southeast-1:287730026636:endpoint/sklearn-iris-classifier-endpoint-v2`

Once the endpoint is `InService`, let's test it and then clean up our resources to avoid incurring costs.

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import StringDeserializer

# Create the predictor with JSON handling: requests are serialized as JSON,
# responses are returned as plain strings
predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=JSONSerializer(),
    deserializer=StringDeserializer()
)

# Select 5 rows from X_test
test_sample = X_test.iloc[:5]

# Convert to a dict in "list" orientation: {col_name: [v1, v2, ...]}
payload = test_sample.to_dict(orient="list")
print("Sending payload:\n", payload)

# Make prediction
response = predictor.predict(payload)

# Print result
print("\nReceived prediction:", response)

Sending payload:
 {'sepal length (cm)': [4.4, 6.1, 4.9, 5.0, 4.4], 'sepal width (cm)': [3.0, 3.0, 2.4, 2.3, 3.2], 'petal length (cm)': [1.3, 4.9, 3.3, 3.3, 1.3], 'petal width (cm)': [0.2, 1.8, 1.0, 1.0, 0.2]}

Received prediction: [0.0, 2.0, 1.0, 1.0, 0.0]


In [None]:
# Clean Up Resources
# Delete the endpoint through the predictor created in the test cell.
print(f"Deleting SageMaker endpoint: {endpoint_name}...")
predictor.delete_endpoint()
print("Endpoint deleted successfully.")

# The MLflow Tracking Server is not deleted by this cell.
# To delete the MLflow Tracking Server, go to the SageMaker console,
# find your server, and delete it from there.

INFO:sagemaker:Deleting endpoint configuration with name: sklearn-iris-classifier-endpoint-v2


Deleting SageMaker endpoint: sklearn-iris-classifier-endpoint-v2...


INFO:sagemaker:Deleting endpoint with name: sklearn-iris-classifier-endpoint-v2


Endpoint deleted successfully.


## Configuration

In [None]:
# ----------------------
# Names used by the pipeline cells below (these override the manual-setup values).
bucket_name = "sagemaker-iti112-common"  # e.g., 'my-company-sagemaker-bucket'
base_folder = "9002963k@myaccount.nyp.edu.sg"  # e.g., 'users/my-name'
experiment_name = "Iris-Pipeline-Experiment"  # e.g., 'my-experiment'
model_name = "iris-classifier-pipeline-model"  # e.g., 'my-model'
model_package_group_name = "IrisClassifierPipelineModels"
pipeline_name = "IrisClassifierPipeline"
# ----------------------
tracking_server_name = "mlflow-server-1234567a"

## Install dependencies and Setup

In [None]:
# The SageMaker Studio environment comes with most of these pre-installed.
# This cell ensures all dependencies are present.
%pip install -q boto3 sagemaker mlflow "scikit-learn>=1.0" "pandas>=1.2" "sagemaker_mlflow==0.1.0" 

Note: you may need to restart the kernel to use updated packages.


## Session setup

Setup sagemaker and s3 session clients

In [None]:
import io
import os
import sagemaker
import boto3

import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.datasets import make_classification

# Create the SageMaker session plus the boto3 SageMaker/S3 clients and build
# the S3 locations of the dataset from the configuration values.
# Both clients start as None so the check after the try block can tell whether
# initialization got far enough.
sagemaker_client = None
s3_client = None
try:
    sagemaker_session = sagemaker.Session()
    sagemaker_client = boto3.client("sagemaker")
    # NOTE(review): s3_bucket is not read by any visible cell; the data paths
    # below use the configured bucket_name instead.
    s3_bucket = sagemaker_session.default_bucket()
    s3_client = boto3.client('s3')
    s3_data_key=f"{base_folder}/data/v1/data.csv"
    s3_data_path = f"s3://{bucket_name}/{s3_data_key}"
    s3_data_dir_uri = f"s3://{bucket_name}/{base_folder}/data/v1"
    print(f"DataSet will be stored inside: {s3_data_path}")
except Exception as e:
    # Only the message is printed here; the failure is raised below, outside
    # the except block, so the output does not carry a chained traceback.
    print(f"Error initializing SageMaker session or S3 client: {e}")
    s3_data_path = None
# minimize traceback in the output as we are not interested in the details
if not sagemaker_client or not s3_client:
    raise Exception("Failed to initialize SageMaker session or S3 client.")


sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/Ray/Library/Application Support/sagemaker/config.yaml
Error initializing SageMaker session or S3 client: Could not connect to the endpoint URL: "https://sts.ap-souhteast-1.amazonaws.com/"


Setup mlflow client

In [None]:
import mlflow
# Resolve the ARN of the MLflow tracking server named in the configuration,
# then point the MLflow client at it and select the experiment.
mlflow_tracking_server_arn = None
try:
    response = sagemaker_client.describe_mlflow_tracking_server(
        TrackingServerName=tracking_server_name
    )
    # ARN of MLflow Tracking Server
    mlflow_tracking_server_arn = response['TrackingServerArn']
    print(f"Found MLflow Tracking Server ARN: {mlflow_tracking_server_arn}")
except Exception as e:
    # Any failure (API error or missing key) is reported as "not found".
    print(f"Could not find tracking server: {e}")
    mlflow_tracking_server_arn = None

# minimize traceback in the output as we are not interested in the details
if not mlflow_tracking_server_arn:
    raise Exception("Failed to find MLflow Tracking Server.")

# IAM role for SageMaker execution
role = sagemaker.get_execution_role()

print(f"SageMaker Role ARN: {role}")
print(f"MLflow Tracking Server ARN: {mlflow_tracking_server_arn}")

# Connect to the MLflow Tracking Server
# Set the MLflow tracking URI to managed server
# NOTE(review): this guard is always true here, because a missing ARN already
# raised above.
if mlflow_tracking_server_arn:
    mlflow.set_tracking_uri(mlflow_tracking_server_arn)
    print("MLflow tracking URI set successfully.")

# Define an experiment name. If it doesn't exist, MLflow creates it.
mlflow.set_experiment(experiment_name)

print(f"MLflow tracking URI set to: {mlflow.get_tracking_uri()}")
print(f"MLflow experiment set to: '{experiment_name}'")

Found MLflow Tracking Server ARN: arn:aws:sagemaker:ap-southeast-1:287730026636:mlflow-tracking-server/mlflow-server-1234567a
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
SageMaker Role ARN: arn:aws:iam::287730026636:role/sagemaker-lab-user-9002963k@myaccount.nyp.edu.sg
MLflow Tracking Server ARN: arn:aws:sagemaker:ap-southeast-1:287730026636:mlflow-tracking-server/mlflow-server-1234567a
MLflow tracking URI set successfully.
MLflow tracking URI set to: arn:aws:sagemaker:ap-southeast-1:287730026636:mlflow-tracking-server/mlflow-server-1234567a
MLflow experiment set to: '9002963k_Iris-Pipeline-Experiment'


## Upload DataSet To S3

In [None]:
if s3_data_path is None:
    raise Exception("S3 data path is not set. Cannot proceed with dataset creation.")
# Upload the RAW dataset. It is re-read from disk so the upload does not depend
# on what earlier cells did to the in-memory `df`; preprocess.py expects the
# raw columns (e.g. 'UDI', 'Product ID', letter-coded 'Type').
raw_df = pd.read_csv("ai4i2020.csv")
raw_df.to_csv(s3_data_path, index=False)
print(f"Dataset v1.0 created and uploaded to: {s3_data_path}")

Dataset v1.0 created and uploaded to: s3://sagemaker-iti112-common/9002963k@myaccount.nyp.edu.sg/assignment/data/v1/data.csv


Let us test whether we can load the dataset

In [None]:
# Read the uploaded CSV back from S3 as a smoke test. A failure is only
# reported, not raised, so the notebook keeps running.
try:
    df_loaded = pd.read_csv(s3_data_path)
    print("Successfully loaded dataset v1.0:")
    print(df_loaded.head(1))
except Exception as e:
    print(f"An error occurred: {e}")
    print("\nPlease double-check that your bucket and folder names are correct in Step 1.")

Successfully loaded dataset v1.0:
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  \
0                5.1               3.5                1.4               0.2   

   target  
0       0  


## Create the SageMaker Pipeline

### Requirements file

In [None]:
%%writefile requirements.txt
boto3==1.28.57
botocore==1.31.85

mlflow
sagemaker-mlflow
scikit-learn
pandas
joblib

Overwriting requirements.txt


### Preprocessing script

We already created our preprocessing script (`preprocess.py`) during the manual setup, so we reuse it here.

### Training script

In [None]:
%%writefile train.py

import sys
import subprocess

# Ensure mlflow and sagemaker_mlflow are importable. If either import fails,
# install pinned versions with pip and retry the imports.
try:
    import mlflow
    import sagemaker_mlflow
except ImportError:
    print("Installing MLflow...")
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "boto3==1.37.1", "botocore==1.37.1", "s3transfer",
        "mlflow==2.22.0", "sagemaker-mlflow==0.1.0",
    ])
    import mlflow
    import sagemaker_mlflow

import mlflow.sklearn
import os
import argparse
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import joblib
import glob

# Pattern matching the CSV file(s) delivered on the "train" input channel.
TRAIN_CSV_PATTERN = "/opt/ml/input/data/train/*.csv"

parser = argparse.ArgumentParser()
parser.add_argument("--tracking_server_arn", type=str, required=True)
parser.add_argument("--experiment_name", type=str, default="Default")
parser.add_argument("--model_output_path", type=str, default="/opt/ml/model")
parser.add_argument("-C", "--C", type=float, default=0.5)
# parse_known_args tolerates any extra arguments this script does not declare.
args, _ = parser.parse_known_args()

# Load training data. Sorting makes the choice deterministic when several CSVs
# are present, and an empty channel fails with a clear message instead of an
# IndexError.
train_files = sorted(glob.glob(TRAIN_CSV_PATTERN))
if not train_files:
    raise FileNotFoundError(f"No training CSV found matching: {TRAIN_CSV_PATTERN}")
train_path = train_files[0]
df = pd.read_csv(train_path)
X = df.drop("target", axis=1)
y = df["target"]

# Set up MLflow
mlflow.set_tracking_uri(args.tracking_server_arn)
mlflow.set_experiment(args.experiment_name)

with mlflow.start_run() as run:
    mlflow.log_param("C", args.C)
    model = LogisticRegression(C=args.C, random_state=42, max_iter=100, solver='liblinear')
    model.fit(X, y)
    # NOTE: this accuracy is measured on the same data the model was fitted on;
    # the held-out score is computed separately by evaluate.py.
    acc = accuracy_score(y, model.predict(X))
    mlflow.log_metric("accuracy", acc)

    mlflow.sklearn.log_model(sk_model=model, artifact_path="model")

    # Persist the model and the MLflow run id side by side in the output directory.
    os.makedirs(args.model_output_path, exist_ok=True)
    joblib.dump(model, os.path.join(args.model_output_path, "model.joblib"))
    with open(os.path.join(args.model_output_path, "run_id.txt"), "w") as f:
        f.write(run.info.run_id)

    print(f"Training complete. Accuracy: {acc:.4f}")
    print(f"MLflow Run ID: {run.info.run_id}")

Overwriting train.py


### Evaluation script

Here we create the evaluation script.

In [None]:
%%writefile evaluate.py
import argparse
import pandas as pd
from sklearn.metrics import accuracy_score
import joblib
import os
import json
import boto3
import tarfile

def _load_model(model_dir):
    """Extract model.tar.gz inside `model_dir` and return the model loaded from model.joblib.

    Raises:
        FileNotFoundError: if model.joblib is not present after extraction.
    """
    # SageMaker packages models in a .tar.gz file. We need to extract it first.
    model_archive_path = os.path.join(model_dir, 'model.tar.gz')
    print(f"Extracting model from archive: {model_archive_path}")
    with tarfile.open(model_archive_path, "r:gz") as tar:
        tar.extractall(path=model_dir)

    model_file_path = os.path.join(model_dir, "model.joblib")
    if not os.path.exists(model_file_path):
        raise FileNotFoundError(f"Model file 'model.joblib' not found after extraction in: {model_dir}")

    print(f"Loading model from: {model_file_path}")
    return joblib.load(model_file_path)


def _compute_accuracy(model, test_dir):
    """Return the accuracy of `model` on test.csv found in `test_dir`.

    The "target" column is used as the label; every other column is a feature.

    Raises:
        FileNotFoundError: if test.csv does not exist in `test_dir`.
    """
    test_file_path = os.path.join(test_dir, "test.csv")
    if not os.path.exists(test_file_path):
        raise FileNotFoundError(f"Test data not found: {test_file_path}")

    test_df = pd.read_csv(test_file_path)
    X_test = test_df.drop("target", axis=1)
    y_test = test_df["target"]

    print("Running predictions on the test dataset.")
    predictions = model.predict(X_test)
    return accuracy_score(y_test, predictions)


def _baseline_exists(model_package_group_name, region):
    """Return True when the model package group holds at least one Approved model package.

    A ResourceNotFound error from the API is treated as "no baseline"; any other
    client error is re-raised.
    """
    print(f"Checking for baseline model in region: {region}")
    sagemaker_client = boto3.client("sagemaker", region_name=region)
    try:
        response = sagemaker_client.list_model_packages(
            ModelPackageGroupName=model_package_group_name,
            ModelApprovalStatus="Approved",
            SortBy="CreationTime",
            SortOrder="Descending",
            MaxResults=1,
        )
    except sagemaker_client.exceptions.ClientError as e:
        # Classify on the structured error code rather than on the rendered
        # message, so unrelated errors whose text mentions the phrase are not
        # mistaken for a missing group.
        error_code = e.response.get("Error", {}).get("Code", "")
        if "ResourceNotFound" in error_code:
            print(f"Model Package Group '{model_package_group_name}' not found. Assuming no baseline exists.")
            return False
        raise

    # If the list is not empty, an approved model already exists
    exists = len(response["ModelPackageSummaryList"]) > 0
    if exists:
        print(f"An approved baseline model was found in '{model_package_group_name}'.")
    else:
        print(f"No approved baseline model was found in '{model_package_group_name}'.")
    return exists


def _write_report(report, output_dir):
    """Write `report` as evaluation.json inside `output_dir` and echo it to stdout."""
    os.makedirs(output_dir, exist_ok=True)
    report_path = os.path.join(output_dir, "evaluation.json")
    with open(report_path, "w") as f:
        json.dump(report, f, indent=4)

    print(f"✅ Evaluation complete. Report written to: {report_path}")
    print("Evaluation Report:")
    print(json.dumps(report, indent=4))


if __name__ == "__main__":
    # --- Parse Arguments ---
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-path", type=str, required=True, help="Path to the directory containing the model.tar.gz file.")
    parser.add_argument("--test-path", type=str, required=True, help="Path to the directory containing test.csv.")
    parser.add_argument("--output-path", type=str, required=True, help="Path to save the evaluation.json report.")
    parser.add_argument("--model-package-group-name", type=str, required=True, help="Name of the SageMaker Model Package Group.")
    parser.add_argument("--region", type=str, required=True, help="The AWS region for creating the boto3 client.")
    args = parser.parse_args()

    # --- Extract and Load Model ---
    model = _load_model(args.model_path)

    # --- Prepare Data and Evaluate ---
    accuracy = _compute_accuracy(model, args.test_path)
    report = {"accuracy": accuracy}
    print(f"Calculated accuracy: {accuracy:.4f}")

    # --- Check for Existing Baseline Model in SageMaker Model Registry ---
    report["baseline_exists"] = _baseline_exists(args.model_package_group_name, args.region)

    # --- Write Final Report ---
    _write_report(report, args.output_path)

Overwriting evaluate.py


## Pipeline


In [None]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TrainingInput
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.conditions import ConditionNot
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionEquals
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterFloat, ParameterString
from sagemaker.model_metrics import ModelMetrics, FileSource
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

In [None]:
# Values that can be overridden per pipeline execution.
experiment_name_param = ParameterString(
    name="ExperimentName",
    default_value=experiment_name,
)
# Minimum accuracy used by the accuracy condition in the registration logic.
accuracy_threshold_param = ParameterFloat(
    name="AccuracyThreshold",
    default_value=0.85,
)
pipeline_parameters = [
    experiment_name_param,
    accuracy_threshold_param,
]

### Processing Step Definition

In [None]:
processing_instance_type = "ml.t3.medium" # cheapest $0.063/hour

# Container image for the scikit-learn 1.2-1 framework in the session's region.
preprocess_image_uri = sagemaker.image_uris.retrieve(
    "sklearn", sagemaker_session.boto_region_name, "1.2-1"
)

# Processor that runs a Python script inside that image on a single instance.
preprocessor = ScriptProcessor(
    image_uri=preprocess_image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="preprocess-data",
    role=role,
)

# The raw dataset directory is mounted as input; the "train" and "test"
# outputs are collected from the two container directories listed below.
preprocess_inputs = [
    ProcessingInput(source=s3_data_dir_uri, destination="/opt/ml/processing/input"),
]
preprocess_outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
]

step_preprocess = ProcessingStep(
    name="PreprocessData",
    processor=preprocessor,
    inputs=preprocess_inputs,
    outputs=preprocess_outputs,
    code="preprocess.py",
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket


### Training Step Definition

In [None]:
training_instance_type = "ml.t3.large" # second cheapest $0.127/hour

# Estimator that runs train.py in the scikit-learn 1.2-1 framework container.
# The hyperparameters are delivered to the script as command-line arguments.
sklearn_estimator = SKLearn(
    entry_point="train.py",
    framework_version="1.2-1",
    instance_type=training_instance_type,
    role=role,
    hyperparameters={
        "tracking_server_arn": mlflow_tracking_server_arn,
        "experiment_name": experiment_name_param,
        "C": 1.0,
        "model_output_path": "/opt/ml/model",
    },
    py_version="py3",
    # NOTE(review): `requirements` does not appear to be a documented SKLearn
    # estimator argument; train.py carries its own pip fallback. Confirm whether
    # this setting has any effect before relying on it.
    requirements="requirements.txt",
)

# Step ordering is a property of the pipeline step, not of the estimator, so
# the explicit dependency on preprocessing is declared on the TrainingStep.
step_train = TrainingStep(
    name="TrainModel",
    estimator=sklearn_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        )
    },
    depends_on=[step_preprocess],  # Explicitly depends on the preprocess
)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket


### Evaluation Step Definition

In [None]:
# Processor that runs evaluate.py in the scikit-learn 1.2-1 container.
evaluation_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve("sklearn", sagemaker_session.boto_region_name, "1.2-1"),
    command=['python3'],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="evaluate-model",
    role=role,
)

# Exposes evaluation.json (written to the "evaluation" output) so that later
# steps can read individual values from it with JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

step_eval = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    inputs=[
        # Trained model archive produced by the training step.
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        # Held-out test split produced by the preprocessing step.
        ProcessingInput(
            source=step_preprocess.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
    code="evaluate.py",  # SageMaker will handle uploading and running this script
    job_arguments=[  # Pass arguments here instead of in command
        "--model-path", "/opt/ml/processing/model",
        "--test-path", "/opt/ml/processing/test",
        "--output-path", "/opt/ml/processing/evaluation",
        "--model-package-group-name", model_package_group_name,
        # Use the session's region (as for the image URI above) instead of a
        # hard-coded literal, so the registry lookup follows the notebook's region.
        "--region", sagemaker_session.boto_region_name,
    ],
    property_files=[evaluation_report],
    depends_on=[step_train]  # Explicitly depends on the train process
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket


### Model Registration Step

Model registration follows this logic:
```bash
if cond_no_registered 
   step_register_new
else if cond_accuracy 
   step_register_better_model
end if
```
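where:
- `cond_no_registered` is true when no approved baseline model exists in the model package group
- `cond_accuracy` is true when the new model's accuracy is greater than or equal to the `AccuracyThreshold` pipeline parameter (it is not compared against the baseline model's accuracy)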

In [None]:
# Metrics attached to the registered model package. The evaluation step writes
# evaluation.json under its "evaluation" output prefix, so the metrics source
# points at that file rather than at the prefix itself.
model_metrics_report = ModelMetrics(
    model_statistics=FileSource(
        s3_uri=Join(
            on="/",
            values=[
                step_eval.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json"
    )
)


def _build_register_step(step_name):
    """Return a RegisterModel step called `step_name` for the trained model.

    Both registration branches use identical settings and differ only in the
    step name, so they are built from this single definition.
    """
    return RegisterModel(
        name=step_name,
        estimator=sklearn_estimator,
        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.t2.medium"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics_report,
        approval_status="PendingManualApproval",
    )


# RegisterModel steps (always defined, but executed conditionally)
step_register_new = _build_register_step("RegisterNewModel")
step_register_better_model = _build_register_step("RegisterBetterModel")


# Condition: accuracy from the evaluation report >= the AccuracyThreshold parameter
cond_accuracy = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="accuracy"
    ),
    right=accuracy_threshold_param
)

# Condition: the evaluation report says no approved baseline exists
cond_no_registered = ConditionEquals(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="baseline_exists" # Check the key added to the report
    ),
    right=False # Condition is TRUE if baseline_exists is False
)

# Inner step: only reached when a baseline already exists
step_cond_accuracy = ConditionStep(
    name="CheckAccuracy",
    conditions=[cond_accuracy],
    if_steps=[step_register_better_model], # Register model if accuracy meets the threshold
    else_steps=[], # Do nothing if a baseline exists and accuracy is below the threshold
)

# Outer step: checks for the existence of a registered baseline first
step_cond_no_registered = ConditionStep(
    name="CheckIfModelExists",
    conditions=[cond_no_registered],
    if_steps=[step_register_new], # Register model if no baseline exists
    else_steps=[step_cond_accuracy], # Otherwise fall through to the accuracy check
)

### Pipeline Definition And Run

In [None]:
# Top-level steps in execution order. The registration steps are reached
# through the condition step, so they are not listed here.
pipeline_steps = [
    step_preprocess,
    step_train,
    step_eval,
    step_cond_no_registered,
]

# Create or update the pipeline definition, then start one execution.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)
pipeline.upsert(role_arn=role)
execution = pipeline.start()

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket




### Pipeline Status or Cleanup

In [None]:
def get_pipeline_status()->str:
    """Return the status of the pipeline named `pipeline_name`.

    If the lookup raises a ClientError, a descriptive message is returned
    instead: a "not found" message for the ResourceNotFound error code, or an
    "Unknown error" message for any other code.
    """
    try:
        description = sagemaker_client.describe_pipeline(PipelineName=pipeline_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFound':
            return f"Pipeline {pipeline_name} not found {e}"
        return f"Unknown error {e.response} {type(e)}"
    return description["PipelineStatus"]

print(get_pipeline_status())

Active


In [None]:
# Uncomment the next line to delete the pipeline.
#pipeline.delete()
# Re-check the status; get_pipeline_status() returns a "not found" message when
# the pipeline does not exist.
print(get_pipeline_status())

INFO:sagemaker.workflow.pipeline:If triggers have been setup for this target, they will become orphaned.You will need to clean them up manually via the CLI or EventBridge console.


Pipeline IrisClassifierPipeline not found An error occurred (ResourceNotFound) when calling the DescribePipeline operation: Pipeline 'arn:aws:sagemaker:ap-southeast-1:287730026636:pipeline/IrisClassifierPipeline' does not exist.
