# Setup Environment

In [None]:
import boto3
import sagemaker
import sagemaker.session

# AWS context for this notebook: region, SageMaker session, execution role
# and the session's default S3 bucket.
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

# Model Package Group that the RegisterModel step adds model versions to.
model_package_group_name = "AbaloneModelPackageGroupName"

# Step 1: Download the Dataset

In [None]:
!mkdir -p data
local_path = "data/abalone-dataset.csv"

# Fetch the abalone CSV from this region's public seed-code bucket ...
s3 = boto3.resource("s3")
seedcode_bucket = s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}")
seedcode_bucket.download_file("dataset/abalone-dataset.csv", local_path)

# ... then stage it in the default bucket under the "abalone" prefix.
base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=base_uri)
print(input_data_uri)

In [None]:
local_path = "data/abalone-dataset-batch"

# Fetch the batch-inference data from the same seed-code bucket ...
s3 = boto3.resource("s3")
seedcode_bucket = s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}")
seedcode_bucket.download_file("dataset/abalone-dataset-batch", local_path)

# ... and upload it next to the training data.
base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=base_uri)
print(batch_data_uri)

# Step 2: Define Pipeline Parameters

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString
)

# Pipeline parameters: defaults below, overridable per pipeline execution.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# S3 locations of the training CSV and the batch-transform input uploaded above.
input_data = ParameterString(name="InputData", default_value=input_data_uri)
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)

## TruEra Params

In [None]:
# TruEra deployment URL. Replace the placeholder below with your deployment URL
# written as a quoted Python string; the cell is not valid Python until you do.
CONNECTION_STRING = <INSERT YOUR DEPLOYMENT URL>

In [None]:
from datetime import date, datetime
from random import randrange

# Random suffix (0-999) so repeated runs on the same day usually get distinct
# TruEra project names.
version = randrange(1000)

PROJECT_NAME = f"SM TruEra Pipeline {date.today()} {version}"
DATA_COLLECTION_NAME = "Abalone"

In [None]:
# json is imported here because this cell runs before the later cell that
# imports it; without this line json.load raises NameError.
import json

# Load the TruEra service-account credentials from a local JSON file that has
# "client ID" and "client secret" keys. A named variable is used instead of `_`,
# which IPython reserves for the last cell output.
with open("truera_credentials.json", "r", encoding="utf-8") as f:
    truera_credentials = json.load(f)
TRUERA_CLIENT_ID = truera_credentials["client ID"]
TRUERA_SECRET = truera_credentials["client secret"]

# TruEra settings as pipeline parameters, forwarded to the processing scripts
# through environment variables.
# NOTE(review): parameter default values are embedded in the pipeline definition
# (see pipeline.definition()), so the client secret is stored there in plain
# text. Consider a secrets store for anything beyond a demo.
client_id = ParameterString(name="TrueraClientID", default_value=TRUERA_CLIENT_ID)
client_secret = ParameterString(name="TrueraSecret", default_value=TRUERA_SECRET)
connection_string = ParameterString(name="TrueraConnectionString", default_value=CONNECTION_STRING)
project_name = ParameterString(name="TrueraProjectName", default_value=PROJECT_NAME)
data_collection_name = ParameterString(name="TrueraDataCollectionName", default_value=DATA_COLLECTION_NAME)

Create the environment variables that the processing scripts read (via `os.environ`)

In [None]:
# Environment variables injected into the preprocessing and evaluation
# containers; both scripts read them with os.environ[...].
env = dict(
    TRUERA_CLIENT_ID=client_id,
    TRUERA_SECRET=client_secret,
    TRUERA_CONNECTION_STRING=connection_string,
    TRUERA_PROJECT_NAME=project_name,
    TRUERA_DATA_COLLECTION_NAME=data_collection_name,
)

## Step 2.1: Configure TruEra SDK
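* The TruEra settings were also defined as pipeline parameters (in the TruEra Params cells) for later use.
* NOTE: the code authenticates with a TruEra service account (client ID and client secret). Token authentication is not used because the token is longer than the maximum length allowed for a SageMaker Pipeline parameter. The token and basic (username/password) authentication classes are still imported, so you can switch methods if needed, e.g. on a development server that does not have service accounts enabled.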

In [None]:
import json
import os

from truera.client.truera_workspace import TrueraWorkspace
from truera.client.truera_authentication import ServiceAccountAuthentication
from truera.client.truera_authentication import TokenAuthentication
from truera.client.truera_authentication import BasicAuthentication

from truera.client.ingestion import ColumnSpec, ModelOutputContext
from truera.client.public.communicator.http_communicator import AlreadyExistsError

## Step 2.2: Create TruEra project and data collection

In [None]:
# Authenticate with the service-account credentials read from
# truera_credentials.json (stored in TRUERA_CLIENT_ID / TRUERA_SECRET).
# The original referenced the undefined name TRUERA_CLIENT_SECRET (NameError).
auth = ServiceAccountAuthentication(
    client_id=TRUERA_CLIENT_ID,
    client_secret=TRUERA_SECRET,
)

In [None]:
# Connect to TruEra, (re)create the regression project, then add the data
# collection that the pipeline's preprocessing step ingests into.
tru = TrueraWorkspace(CONNECTION_STRING, auth)
try:
    tru.add_project(PROJECT_NAME, score_type="regression")
except AlreadyExistsError:
    # A project with this name already exists: drop it and start fresh.
    tru.delete_project(PROJECT_NAME)
    tru.add_project(PROJECT_NAME, score_type="regression")

tru.add_data_collection(DATA_COLLECTION_NAME)

In [None]:
# Choose the feature-influence algorithm for this TruEra workspace (presumably
# used when influences are computed in the evaluation step).
# Change this param to "truera-qii" if it is available in your deployment.
tru.set_influence_type("shap")

# Step 3: Define a Processing Step for Feature Engineering
* This processing step also loads the processed data into your TruEra project.

In [None]:
# Create the directory that the %%writefile cells below write the pipeline
# scripts into; %%writefile does not create missing parent directories.
# exist_ok=True keeps this cell safe to re-run.
import os

os.makedirs("abalone_truera", exist_ok=True)

In [None]:
%%writefile abalone_truera/preprocessing.py
"""SageMaker Processing script: preprocess the abalone data and ingest it into TruEra.

Reads the headerless abalone CSV from /opt/ml/processing/input. It median-imputes
and standardizes the numeric features, and constant-imputes and one-hot encodes
``sex``. It then shuffles the rows and splits them 70/15/15 into
train/validation/test. Each split is written as a headerless CSV with the label
in the first column, and the same splits are uploaded to the TruEra project and
data collection named by the TRUERA_* environment variables.
"""
import argparse
import os
import requests
import subprocess
import sys
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


def install(package):
    """pip-install *package* into the running interpreter's environment."""
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])


# The TruEra SDK is installed at runtime, before it is imported below.
# If dependency conflicts appear, pins such as "numpy==1.24.1" or
# "protobuf==3.20.2" have been used here previously.
install("truera")

from truera.client.truera_workspace import TrueraWorkspace
from truera.client.truera_authentication import ServiceAccountAuthentication
from truera.client.ingestion import ColumnSpec, ModelOutputContext

# Because this is a headerless CSV file, specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64
}
label_column_dtype = {"rings": np.float64}

# Name of the ID column that DataFrame.reset_index() creates before ingestion.
ID_COLUMN = "index"

# Split names in the order np.split returns them. They are also the output
# sub-directory names, the CSV file stems and the TruEra data split names.
SPLIT_NAMES = ("train", "validation", "test")


def merge_two_dicts(x, y):
    """Return a new dict with the items of *x* updated by those of *y*."""
    z = x.copy()
    z.update(y)
    return z


def _build_preprocessor(numeric_features, categorical_features):
    """Return an unfitted ColumnTransformer for the abalone features.

    Numeric columns are median-imputed and standardized. Categorical columns are
    constant-imputed ("missing") and one-hot encoded, ignoring unseen categories.
    Output columns are the numeric ones first, then the one-hot ones.
    """
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )
    return ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )


def _ingest_split(tru, split_name, split_df, input_columns):
    """Upload one split to the active TruEra data collection.

    The DataFrame index becomes the ID column and input_columns[0] is the label.
    No feature map is used, so the remaining model-readable columns are sent as
    "pre" data.
    """
    print(split_df.shape)
    tru.add_data(
        data=split_df.reset_index(),
        data_split_name=split_name,
        column_spec=ColumnSpec(
            id_col_name=ID_COLUMN,
            pre_data_col_names=input_columns[1:],
            label_col_names=input_columns[0],
        ),
    )


## Pre-processing + TruEra ingestion
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )

    categorical_features = ["sex"]
    numeric_features = [c for c in feature_columns_names if c not in categorical_features]
    preprocess = _build_preprocessor(numeric_features, categorical_features)

    y = df.pop(label_column)
    X_pre = preprocess.fit_transform(df)

    # Post-transform column names: label, scaled numerics, then one-hot columns.
    onehot = preprocess.named_transformers_["cat"]["onehot"]
    cat_post_list = onehot.get_feature_names_out().tolist()
    input_columns = [label_column] + numeric_features + cat_post_list
    column_names = [str(c) for c in input_columns]

    # Label goes in the first column; then shuffle the rows and split 70/15/15.
    X = np.concatenate((y.to_numpy().reshape(-1, 1), X_pre), axis=1)
    np.random.shuffle(X)
    split_arrays = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])

    split_dfs = {}
    for split_name, split_array in zip(SPLIT_NAMES, split_arrays):
        split_df = pd.DataFrame(split_array, columns=column_names)
        split_df.to_csv(f"{base_dir}/{split_name}/{split_name}.csv", header=False, index=False)
        split_dfs[split_name] = split_df

    ##########################
    ## Truera code ##
    ##########################

    auth = ServiceAccountAuthentication(
        client_id=os.environ["TRUERA_CLIENT_ID"],
        client_secret=os.environ["TRUERA_SECRET"],
    )
    # NOTE(review): verify_cert=False presumably disables TLS certificate
    # verification on the connection that carries the client secret; confirm,
    # and enable it for production deployments.
    tru = TrueraWorkspace(os.environ["TRUERA_CONNECTION_STRING"], auth, verify_cert=False)
    tru.set_project(os.environ["TRUERA_PROJECT_NAME"])
    tru.set_data_collection(os.environ["TRUERA_DATA_COLLECTION_NAME"])

    for split_name, split_df in split_dfs.items():
        _ingest_split(tru, split_name, split_df, input_columns)

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

# scikit-learn processing container version (was "0.23-1" in an earlier revision).
framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    env=env,  # TruEra settings read by preprocessing.py
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Run preprocessing.py: it writes the train/validation/test CSVs to the three
# outputs below and also ingests those splits into TruEra.
step_process = ProcessingStep(
    name="TrueraProcessAndIngestData",
    processor=sklearn_processor,
    code="abalone_truera/preprocessing.py",
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
)

# Step 4: Define a Training step

In [None]:
# S3 prefix used as the training estimator's output_path (model artifacts).
model_path = f"s3://{default_bucket}/AbaloneTrain"

In [None]:
from sagemaker.estimator import Estimator

# Built-in SageMaker XGBoost container (1.7-1; an earlier revision used 1.0-1).
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.7-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=model_path,
    role=role,
)

xgb_train.set_hyperparameters(
    # "reg:linear" is a deprecated alias of "reg:squarederror" in XGBoost; the
    # current name selects the same squared-error objective without the
    # deprecation warning.
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Feed the "train" and "validation" CSV outputs of the processing step into the
# matching training channels.
processing_outputs = step_process.properties.ProcessingOutputConfig.Outputs
step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs={
        channel: TrainingInput(
            s3_data=processing_outputs[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)

# Step 5: Define a Processing Step for Model Evaluation
## This evaluation step uses the TruEra SDK to:
1. Add the Python model object to the project.
2. For each split added to the data collection in the processing step:
   a. compute and upload predictions;
   b. compute and upload feature value influences;
   c. compute and upload feature residual (error) influences.
3. Run TruEra performance tests (MSE, MAPE, MAE) and write a summary to `evaluation.json` for the pipeline's condition step.

In [None]:
%%writefile abalone_truera/evaluation.py

"""SageMaker Processing script: evaluate the trained XGBoost model with TruEra.

Loads the model from /opt/ml/processing/model/model.tar.gz and adds it to the
TruEra project. For every data split it uploads predictions and computes feature
and error influences. It then runs TruEra performance tests and writes a summary
to /opt/ml/processing/evaluation/evaluation.json for the pipeline.
"""
import subprocess
import sys


def install(package):
    """pip-install *package* into the running interpreter's environment."""
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])


# Runtime dependencies are installed before the TruEra SDK is imported below.
# If conflicts appear, pins such as "numpy==1.24.1" or "protobuf==3.20.2" have
# been used here previously.
install("numpy>=1.21")
install("truera")

from truera.client.truera_workspace import TrueraWorkspace
from truera.client.truera_authentication import TokenAuthentication
from truera.client.truera_authentication import BasicAuthentication
from truera.client.truera_authentication import ServiceAccountAuthentication
from truera.client.ingestion import ColumnSpec, ModelOutputContext

import json
import os
import pathlib
import tarfile

import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error

# Name under which the model is registered in TruEra and referenced for outputs.
MODEL_NAME = "xgb_abalone_regression"
MODEL_ARCHIVE = "/opt/ml/processing/model/model.tar.gz"
OUTPUT_DIR = "/opt/ml/processing/evaluation"

# (test name, metric, warn_if_greater_than, fail_if_greater_than)
PERFORMANCE_TESTS = (
    ("MSE test", "MSE", 3, 6),
    ("MAPE test", "MAPE", 8, 16),
    ("MAE test", "MAE", 2, 4),
)


def _load_model(archive_path):
    """Extract the training job's model archive into the CWD and load the Booster."""
    with tarfile.open(archive_path) as tar:
        # The "data" extraction filter (where this Python supports it) rejects
        # unsafe member paths such as absolute paths or "..".
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=".", filter="data")
        else:
            tar.extractall(path=".")
    subprocess.run(["ls", "-lar"], check=False)  # debug listing of the extracted files
    model = xgboost.Booster()
    model.load_model("xgboost-model")
    return model


def _connect_to_truera():
    """Return a TrueraWorkspace set to the project/data collection in the TRUERA_* env vars."""
    auth = ServiceAccountAuthentication(
        client_id=os.environ["TRUERA_CLIENT_ID"],
        client_secret=os.environ["TRUERA_SECRET"],
    )
    # NOTE(review): verify_cert=False presumably disables TLS certificate
    # verification on the connection that carries the client secret; confirm,
    # and enable it for production deployments.
    tru = TrueraWorkspace(os.environ["TRUERA_CONNECTION_STRING"], auth, verify_cert=False)
    tru.set_project(os.environ["TRUERA_PROJECT_NAME"])
    tru.set_data_collection(os.environ["TRUERA_DATA_COLLECTION_NAME"])
    tru.set_model_execution("local")
    return tru


def _upload_model_outputs(tru, model, split_names):
    """For each split: upload predictions, then compute feature and error influences."""
    for split in split_names:
        print("loading data split {}".format(split))
        tru.set_data_split(split)
        xs = tru.get_xs()

        print("generating predictions for split {}".format(split))
        preds = model.predict(xgboost.DMatrix(xs))
        preds_df = pd.DataFrame(preds, index=xs.index, columns=["rings"]).reset_index()

        print("adding predictions to data split {}".format(split))
        tru.add_data(
            data=preds_df,
            data_split_name=split,
            column_spec=ColumnSpec(id_col_name="index", prediction_col_names="rings"),
            model_output_context=ModelOutputContext(model_name=MODEL_NAME, score_type="regression"),
        )

        print("generating and uploading feature influences for split {}".format(split))
        tru.compute_feature_influences()
        print("generating and uploading error influences for split {}".format(split))
        tru.compute_error_influences()


def _add_performance_tests(tru, split_names):
    """Register every entry of PERFORMANCE_TESTS on all given splits."""
    for test_name, metric, warn_above, fail_above in PERFORMANCE_TESTS:
        tru.tester.add_performance_test(
            test_name=test_name,
            data_split_names=split_names,
            metric=metric,
            warn_if_greater_than=warn_above,
            fail_if_greater_than=fail_above,
        )


def _build_report(metric_dict):
    """Summarize the TruEra performance-test table into the evaluation.json dict.

    For each metric, reads the score and outcome of the row for the "test" split.
    Counts PASSED outcomes and also embeds the raw table.
    """
    metric_df = pd.DataFrame(metric_dict["Rows"], columns=metric_dict["Column Names"])

    regression_metrics = {}
    num_passed = 0
    for metric in metric_df["Metric"].unique():
        row = metric_df[(metric_df["Metric"] == metric) & (metric_df["Split"] == "test")].iloc[0]
        score = row["Score"]
        print(f"Metric: {metric} \t Value: {score} \t Outcome: {row['Outcome']}")
        if row["Outcome"] == "PASSED":
            num_passed += 1
        # float() so numpy scalars serialize to JSON and JsonGet compares numbers.
        regression_metrics[metric] = {"value": float(score)}

    num_tests = len(regression_metrics)
    print(f"Tests passing: {num_passed}/{num_tests}")
    return {
        "regression_metrics": regression_metrics,
        "test_metrics": {
            "total_tests": num_tests,
            "num_passing": num_passed,
            # An empty result table reports 0.0 instead of dividing by zero.
            "passing_percentage": num_passed / num_tests if num_tests else 0.0,
        },
        "test_results": metric_dict,
    }


if __name__ == "__main__":
    model = _load_model(MODEL_ARCHIVE)

    ##########################
    ## Truera code ##
    ##########################
    tru = _connect_to_truera()
    tru.add_python_model(MODEL_NAME, model)

    split_names = tru.get_data_splits()
    _upload_model_outputs(tru, model, split_names)
    _add_performance_tests(tru, split_names)

    metric_dict = tru.tester.get_model_test_results().as_dict()["Performance Tests"]
    report_dict = _build_report(metric_dict)
    ##############################
    ## END Truera-specific code ##
    ##############################

    pathlib.Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{OUTPUT_DIR}/evaluation.json"
    with open(evaluation_path, "w") as f:
        json.dump(report_dict, f)

In [None]:
# Image for the evaluation job: the same XGBoost container version as training
# (1.7-1), resolved for the larger ml.m5.2xlarge evaluation instance.
processor_image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.7-1",
    py_version="py3",
    instance_type="ml.m5.2xlarge",
)

In [None]:
from sagemaker.processing import ScriptProcessor

# Runs evaluation.py with python3 inside the XGBoost image.
script_eval = ScriptProcessor(
    role=role,
    image_uri=processor_image_uri,
    command=["python3"],
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    base_job_name="script-abalone-eval",
    env=env,  # TruEra settings read by evaluation.py
)

In [None]:
# Debug/inspection cell. Only the last expression (the RoleArn property) is shown
# as cell output; the result of dir(...) is discarded. Safe to delete.
#print(step_train.inputs['train'])
dir(step_train.properties.RoleArn)
step_train.properties.RoleArn
#print(dir(step_train.properties.ModelArtifacts.S3ModelArtifacts))
#step_train.properties.ModelArtifacts.S3ModelArtifacts
#step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri

In [None]:
from sagemaker.workflow.properties import PropertyFile

# evaluation.py writes evaluation.json into its "evaluation" output; registering
# it as a property file lets later steps read values from it with JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="TrueraEval",
    processor=script_eval,
    code="abalone_truera/evaluation.py",
    inputs=[
        # Trained model archive produced by the training step.
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        # NOTE(review): evaluation.py reads its data from TruEra and never opens
        # /opt/ml/processing/test, so this input looks unused; confirm.
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    property_files=[evaluation_report],
)

# Step 6: Define a CreateModelStep for Batch Transformation

In [None]:
from sagemaker.model import Model

# SageMaker model built from the training image and the trained artifacts.
model = Model(
    role=role,
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
)

In [None]:
from sagemaker.inputs import CreateModelInput

# Instance type the SageMaker model is created for. The earlier version also
# requested an Amazon Elastic Inference accelerator ("ml.eia1.medium"). Elastic
# Inference has been retired, so that argument is dropped.
inputs = CreateModelInput(
    instance_type="ml.m5.large",
)

In [None]:
from sagemaker.workflow.steps import CreateModelStep

# Pipeline step that creates the SageMaker model defined above.
step_create_model = CreateModelStep(name="AbaloneCreateModel", model=model, inputs=inputs)

# Step 7: Define a TransformStep to Perform Batch Transformation

In [None]:
from sagemaker.transformer import Transformer

# Batch-transform job that uses the model created by step_create_model and writes
# results under the AbaloneTransform prefix.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{default_bucket}/AbaloneTransform",
)

In [None]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

# Score the BatchData parameter's S3 location with the transformer above.
batch_input = TransformInput(data=batch_data)
step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=batch_input,
)

# Step 8: Define a RegisterModel Step to Create a Model Package

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

# Attach evaluation.json (from the evaluation step's first output) to the model
# package as its model statistics.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(evaluation_output_uri),
        content_type="application/json",
    )
)

# Register the trained model in the Model Package Group with the chosen approval status.
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)

# Step 9: Define a Condition Step to Verify Model Accuracy

In [None]:
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo, ConditionLessThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

# Gate 1: the MSE reported in evaluation.json (test split) must be at most 6.0.
reported_mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.MSE.value",
)
cond_lte = ConditionLessThanOrEqualTo(left=reported_mse, right=6.0)


In [None]:
#################
## Truera Code ##
#################

# Gate 2: every TruEra performance test must pass on the test split. Comparing
# the passing fraction to 1.0 is equivalent to the previous
# "num_passing >= 3" while evaluation.py defines three tests. Unlike the
# hard-coded 3, it stays correct if tests are added or removed.
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="test_metrics.passing_percentage",
    ),
    right=1.0,
)

In [None]:
# Register, create and batch-score the model only when both quality gates hold;
# otherwise run nothing further.
quality_gates = [cond_lte, cond_gte]
step_cond = ConditionStep(
    name="TrueraQualityCheck",
    conditions=quality_gates,
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[],
)

# Step 10: Create a pipeline

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "AbaloneWithTrueraPipeline-v2"

# Parameters that can be overridden per execution: SageMaker settings first,
# then the TruEra settings forwarded to the processing scripts.
sagemaker_parameters = [processing_instance_count, model_approval_status, input_data, batch_data]
truera_parameters = [client_id, client_secret, connection_string, project_name, data_collection_name]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=sagemaker_parameters + truera_parameters,
    steps=[step_process, step_train, step_eval, step_cond],
)

In [None]:
import json

# Parse the generated pipeline definition (a JSON string) so the notebook
# displays it as a dict for inspection.
json.loads(pipeline.definition())

# Final Step: Executing the Pipeline

In [None]:
# Create the pipeline in SageMaker, or update it if it already exists.
pipeline.upsert(role_arn=role)

In [None]:
# Start an execution; no parameters are passed, so the defaults declared above apply.
execution = pipeline.start()

# Cleanup