# Orchestrating Jobs, Model Registration, and Continuous Deployment with Amazon SageMaker

Amazon SageMaker offers Machine Learning application developers and Machine Learning operations engineers the ability to orchestrate SageMaker jobs and author reproducible Machine Learning pipelines, deploy custom-build models for inference in real-time with low latency or offline inferences with Batch Transform, and track lineage of artifacts. You can institute sound operational practices in deploying and monitoring production workflows, deployment of model artifacts, and track artifact lineage through a simple interface, adhering to safety and best-practice paradigmsfor Machine Learning application development.

The SageMaker Pipelines service supports a SageMaker Machine Learning Pipeline Domain Specific Language (DSL), which is a declarative Json specification. This DSL defines a Directed Acyclic Graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Developer Kit (SDK) streamlines the generation of the pipeline DSL using constructs that are already familiar to engineers and scientists alike.

The SageMaker Model Registry is where trained models are stored, versioned, and managed. Data Scientists and Machine Learning Engineers can compare model versions, approve models for deployment, and deploy models from different AWS accounts, all from a single Model Registry. SageMaker enables customers to follow the best practices with ML Ops and getting started right. Customers are able to standup a full ML Ops end-to-end system with a single API call.

## SageMaker Pipelines

Amazon SageMaker Pipelines support the following activites:

* Pipelines - A Directed Acyclic Graph of steps and conditions to orchestrate SageMaker jobs and resource creation.
* Processing Job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
* Training Job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* Conditional step execution - Provides conditional execution of branches in a pipeline.
* Registering Models - Creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
* Creating Model steps - Create a model for use in transform steps or later publication as an endpoint.
* Parameterized Pipeline executions - Allows pipeline executions to vary by supplied parameters.
* Transform Job steps - A batch transform to preprocess datasets to remove noise or bias that interferes with training or inference from your dataset, get inferences from large datasets, and run inference when you don't need a persistent endpoint.

## Layout of the SageMaker ModelBuild Project Template

The template provides a starting point for bringing your SageMaker Pipeline development to production.

```
|-- codebuild-buildspec.yml
|-- CONTRIBUTING.md
|-- pipelines
|   |-- abalone
|   |   |-- evaluate.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
|-- README.md
|-- sagemaker-pipelines-project.ipynb
|-- setup.cfg
|-- setup.py
|-- tests
|   `-- test_pipelines.py
`-- tox.ini
```

A description of some of the artifacts is provided below:
<br/><br/>
Your codebuild execution instructions:
```
|-- codebuild-buildspec.yml
```
<br/><br/>
Your pipeline artifacts, which includes a pipeline module defining the required `get_pipeline` method that returns an instance of a SageMaker pipeline, a preprocessing script that is used in feature engineering, and a model evaluation script to measure the Mean Squared Error of the model that's trained by the pipeline:

```
|-- pipelines
|   |-- abalone
|   |   |-- evaluate.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py

```
<br/><br/>
Utility modules for getting pipeline definition jsons and running pipelines:

```
|-- pipelines
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
```
<br/><br/>
Python package artifacts:
```
|-- setup.cfg
|-- setup.py
```
<br/><br/>
A stubbed testing module for testing your pipeline as you develop:
```
|-- tests
|   `-- test_pipelines.py
```
<br/><br/>
The `tox` testing framework configuration:
```
`-- tox.ini
```

### A SageMaker Pipeline

The pipeline that we create follows a typical Machine Learning Application pattern of pre-processing, training, evaluation, and conditional model registration and publication, if the quality of the model is sufficient.

![A typical ML Application pipeline](img/pipeline-full.png)

### Getting some constants

We get some constants from the local execution environment.

### All of thepipeline is embedded here with steps

In [1]:
import boto3
import sagemaker


region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sm_session = sagemaker.session.Session(default_bucket="sagemaker-grewaltempl")
default_bucket = sm_session.default_bucket()

# Change these to reflect your project/business name or if you want to separate ModelPackageGroup/Pipeline from the rest of your team
model_package_group_name = f"TweetsModelPackageGroup-Example1"
pipeline_name = f"TweetsPipeline-Example1"

In [2]:
import os

import boto3
import sagemaker
import sagemaker.session

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
    FrameworkProcessor
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.step_collections import RegisterModel

from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)

In [3]:
## -- Common constants
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.properties import PropertyFile

from sagemaker.processing import ProcessingInput, ProcessingOutput
import os

print(f"Using:role={role}:")
print(f"using SageMaker session={sm_session}:")

BASE_JOB_PREFIX="smjobs",  # Choose any name
BASE_DIR = os.path.dirname(os.path.realpath('__file__'))
BASE_DIR = BASE_DIR + "/pipelines/tweets" # -- to simulate this jupyter file running where the pipelines will run

# Parameters for pipeline execution
processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount", default_value=1
)
processing_instance_type = ParameterString(
        name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
        name="TrainingInstanceType", default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
)
input_data = ParameterString(
        name="InputDataUrl",
        default_value=f"s3://{default_bucket}/data/finance/combined_tweets.csvv",  # Change this to point to the s3 location of your raw input data.
)
print(f"pipeline:get_pipeline::processor:")
# Cache Pipeline steps to reduce execution time on subsequent executions

from sagemaker.workflow.steps import CacheConfig
cache_config = CacheConfig(enable_caching=True, expire_after="1d")
print(f"pipeline::get_pipeline:cache:config:enabled:")


pipeline_name="SageMakerTweetsPipeline"  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
base_job_prefix="tweets-pipeline" # Choose any name

print(f"Pipeline_name={pipeline_name}:")
print(f"Pipeline:base:job:prefix={base_job_prefix}:")

Using:role=arn:aws:iam::034150676293:role/service-role/AmazonSageMaker-ExecutionRole-20220322T185187:
using SageMaker session=<sagemaker.session.Session object at 0x7f436550c950>:
pipeline:get_pipeline::processor:
pipeline::get_pipeline:cache:config:enabled:
Pipeline_name=SageMakerTweetsPipeline:
Pipeline:base:job:prefix=tweets-pipeline:


In [4]:
type(base_job_prefix)

str

### PRE PROC SCRIPTS AND STEPS

In [5]:
%%writefile ./pipelines/tweets/preprocess_tweets.py
"""Feature engineers the Tweets churn dataset."""
import argparse
import logging
import os
import pathlib
import requests
import tempfile

import boto3
import numpy as np
import pandas as pd

import json

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

_logger = logging.getLogger()
_logger.setLevel(logging.INFO)
_logger.addHandler(logging.StreamHandler())

try:
    _logger.info(f"Pandas:version:{pd.__version__}")
    _logger.info(f"Numpy:version:{np.__version__}")
    import xgboost as xgb
    _logger.info(f"XGBoost:version:{xgb.__version__}")
except:
    pass


# Since we get a headerless CSV file we specify the column names here.

X_columns_names =  [
    'tweet_id', 
    'writer', 
    'post_date', 
    'body', 
    'comment_num', 
    'retweet_num',
    'like_num', 
    'ticker_symbol'
]

Y_column = "high_price"


X_columns_dtype = {
    'tweet_id': np.float64, 
    'writer': str, 
    'post_date': np.int64, 
    'body': str, 
    'comment_num': np.int64, 
    'retweet_num': np.int64, 
    'like_num': np.int64, 
    'ticker_symbol': str
}
Y_column_dtype = {Y_column: np.bool} #{Y_column: np.float64}


def merge_two_dicts(x, y):
    """Merges two dicts, returning a new copy."""
    z = x.copy()
    z.update(y)
    return z

def listLocalDirectory(dirPath="."):
    for path, dnames, fnames in os.walk(dirPath):
        _logger.info(f"List::path={path}::dirNames={dnames}::fileNames={fnames}::")

def textToVectors(text, vectorizer):
    vector = vectorizer.transform([text])
    return sum(vector.toarray()[0])

def vectorizerText(textArray):
    from sklearn.feature_extraction.text import TfidfVectorizer
    # create the transform
    vectorizer = TfidfVectorizer()
    # tokenize and build vocab
    vectorizer.fit(textArray)
    return vectorizer
       
if __name__ == "__main__":
    _logger.info("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    parser.add_argument("--data-size", type=int, default=100)
    args = parser.parse_args()
    input_data = args.input_data
    data_size = args.data_size
    _logger.info(f"Data size={data_size}::")
    
    
    BASE_DIR = "/opt/ml/processing"
    pathlib.Path(f"{BASE_DIR}/data").mkdir(parents=True, exist_ok=True)
    _logger.info(f"Download:data:from:s3:to:local:location:={BASE_DIR}/data::")
    
    print(input_data)
    _logger.info(f"Input:data:={input_data}::")
    
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    _logger.info(f"TEST:TEST:Downloading data from bucket: {bucket}, key: {key}:Willdownload to localfile:name as raw-data.csv")
    fn = f"{BASE_DIR}/data/raw-data.csv"
    try:
        s3 = boto3.resource("s3")
        s3.Bucket(bucket).download_file(key, fn)
    except:
        _logger.error("TEST:TEST:error:in:downloading:from:s3:ignore")

    

    #fn = os.path.join("/opt/ml/processing/input", "combined_tweets.csv")
    
    onlyFiles = [f for f in os.listdir("/opt/ml/processing/input") if os.path.isfile(os.path.join("/opt/ml/processing/input", f))]
    _logger.info(f"Data Downloaded::Now Reading downloaded data.:dir:/opt/ml/processing/input::And:FILES:ARE::{onlyFiles}")
    
    fn = os.path.join("/opt/ml/processing/input", onlyFiles[0])
    _logger.info(f"Data Downloaded::Now Reading downloaded data.:dir:/opt/ml/processing/input:::from:location={fn}::")
    
    # read in csv
    combinedTweetsDF = pd.read_csv(fn)
    combinedTweetsDF = combinedTweetsDF.dropna()
    combinedTweetsDF = combinedTweetsDF.drop_duplicates() # -- this drops duplicates

    # -- FOR NOW CREATE just a 10 ROW DATA SET for FASTER processing
    combinedTweetsDF =  combinedTweetsDF.iloc[:data_size,:]
    # -- END 10 row data set creation
    _logger.info(f"After:ILOC:shape={combinedTweetsDF.shape}:")
    
    # Create one binary classification target column
    combinedTweetsDF['body_length'] = combinedTweetsDF['body'].apply( lambda x: len(x))
    combinedTweetsDF['Y_label'] = combinedTweetsDF['body_length'].apply( lambda x: 1 if x > 115 else 0)
    #combinedTweetsDF['Y_label'] = combinedTweetsDF.Y_label.apply(lambda x: 1 if x else 0) # -- convert to 1 and 0
    _logger.info(f"After:transformation:shape={combinedTweetsDF.shape}:columns={combinedTweetsDF.columns}::describe={combinedTweetsDF.describe()}::")
     
    # Convert categorical variables into dummy/indicator variables.
    #categorical_cols=['writer', 'ticker_symbol']
    #categorical_cols_dict ={'writer':'wr', 'ticker_symbol' :'ticker' }
    #df_multi = pd.get_dummies(combinedTweetsDF, columns=categorical_cols, prefix=categorical_cols_dict, drop_first=True)
    df_multi = combinedTweetsDF.reindex(columns=(['Y_label'] + list([a for a in combinedTweetsDF.columns if a != 'Y_label']) ))
    _logger.info(f"df_multi:BEFORE:DROP:BODY:first 10 cols = {df_multi.columns[:10]}::")
    
    # -- vectorize the text 
    #df_multi = df_multi[1:] # remove the header row 
    vectorizer = vectorizerText(df_multi.body)
    df_multi['vec_text'] = df_multi.body.apply(lambda x: textToVectors(x,vectorizer ))
    df_multi = df_multi.drop(['body'], axis=1)
    _logger.info(f"After:Vectorization:columns={len(df_multi.columns)}::describe={df_multi.describe()}::")
    _logger.info(f"After:Vectorization:shape of data set={df_multi.shape}::len={len(df_multi)}::")
    
    # -- vectorize the Writer and ticker symbol

    vectorizer = vectorizerText(df_multi.writer.dropna())
    df_multi['writer_text'] = df_multi.writer.apply(lambda x: textToVectors(x,vectorizer ))
    df_multi = df_multi.drop(['writer'], axis=1)

    vectorizer = vectorizerText(df_multi.ticker_symbol.dropna())
    df_multi['ticker_symbol_text'] = df_multi.ticker_symbol.apply(lambda x: textToVectors(x,vectorizer ))
    df_multi = df_multi.drop(['ticker_symbol'], axis=1)

    _logger.info(f"After:FULL:Vectorization:columns={len(df_multi.columns)}::describe={df_multi.describe()}::")
    _logger.info(f"After:FULL:Vectorization:shape of data set={df_multi.shape}::len={len(df_multi)}::")


    
    # Split the data
    train_data, val_data, test_data = np.split(
        df_multi.sample(frac=1, random_state=1729),
        [int(0.7 * len(df_multi)), int(0.9 * len(df_multi))],
    )
    _logger.info(f"Going to write it to {BASE_DIR}/train and {BASE_DIR}/test and {BASE_DIR}/val")
    _logger.info(f"train_data:len={len(train_data)}::  val_data:len={len(val_data)}::  test_data:len={len(test_data)}::")
    pd.DataFrame(train_data).to_csv(
        f"{BASE_DIR}/train/train.csv", header=False, index=False
    )
    pd.DataFrame(val_data).to_csv(
        f"{BASE_DIR}/val/val.csv", header=False, index=False
    )
    pd.DataFrame(test_data).to_csv(
        f"{BASE_DIR}/test/test.csv", header=False, index=False
    )
    
    report_dict = {
        "regression_metrics": {
            "mse": {
                "value": 11.1,
                "standard_deviation": 89.2
            },
        },
    }

    output_dir = "/opt/ml/processing/evalproperty"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    _logger.info("Evaluation:Writing out evaluation report with mse: %f", 11.1)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

    _logger.info("Evaluation: All Done !!")    
    
    
    _logger.info("All Done !! written out !!")
    
    # ----------------   OLD TEMPLATE CODE --------------------#


Overwriting ./pipelines/tweets/preprocess_tweets.py


In [6]:
# Processing step for feature engineering
sklearn_processor = SKLearnProcessor(
        framework_version="0.23-1",
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        base_job_name=f"smjobs-sklearn-tweets-preprocess/{base_job_prefix}" ,#f"{BASE_JOB_PREFIX}-sklearn-TweetsChurn-preprocess",  # choose any name
        sagemaker_session=sm_session,
        role=role,
    )

inputs_p=[
    ProcessingInput(
        source=f"s3://{default_bucket}/data/finance/combined_tweets.csv",
        destination='/opt/ml/processing/input'
    ),
 ]
outputs_p=[
    ProcessingOutput(
        s3_upload_mode="EndOfJob",
        output_name='train',
        source='/opt/ml/processing/train',
        destination=f's3://{default_bucket}/data/finance/curated/small/train'
    ),
    ProcessingOutput(
        s3_upload_mode="EndOfJob",
        output_name='test',
        source='/opt/ml/processing/test',
        destination=f's3://{default_bucket}/data/finance/curated/small/test'
    ),
    ProcessingOutput(
        s3_upload_mode="EndOfJob",
        output_name='validation',
        source='/opt/ml/processing/val',
        destination=f's3://{default_bucket}/data/finance/curated/small/validation'
    ),
    ProcessingOutput(
        output_name="evaluation-property-pass",
        source="/opt/ml/processing/evalproperty",
        destination=f's3://{default_bucket}/data/finance/curated/small/evalproperty'
    ),
    
    
]
# -- if we d onot create a output then this directory is never creatd on tbe processing job
evaluation_report_preproc = PropertyFile(
    name="EvaluationReportPreproc",
    output_name="evaluation-property-pass", # -- matches the processing output name
    path="evaluation.json",
)

job_arguments_p=["--input-data", f"s3://{default_bucket}/data/finance/combined_tweets.csv", 
              "--data-size", "10000"]
step_process = ProcessingStep(
        name="PreProcTweetsModelPipe",  # choose any name
        processor=sklearn_processor,
        inputs=inputs_p,
        outputs=outputs_p,
        property_files=[evaluation_report_preproc],
        code=os.path.join(BASE_DIR, "preprocess_tweets.py"),
        job_arguments=job_arguments_p,
        cache_config=cache_config
    )    
    
print(f"SageMaker:pipeline:get_pipeline::Preproc:step:added={step_process}")


SageMaker:pipeline:get_pipeline::Preproc:step:added=ProcessingStep(name='PreProcTweetsModelPipe', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


### Now the Training step

In [7]:
%%writefile ./pipelines/tweets/modeltrain.py

import os

import pandas as pd
from sklearn.linear_model import LogisticRegression
#from sklearn.externals import joblib
import logging
import joblib
import pickle
import xgboost as xgb
import argparse
import json

_logger = logging.getLogger()
_logger.setLevel(logging.INFO)
_logger.addHandler(logging.StreamHandler())

def merge_two_dicts(x, y):
    """Merges two dicts, returning a new copy."""
    z = x.copy()
    z.update(y)
    return z

def listLocalDirectory(dirPath="."):
    for path, dnames, fnames in os.walk(dirPath):
        _logger.info(f"List::path={path}::dirNames={dnames}::fileNames={fnames}::")

def textToVectors(text, vectorizer):
    vector = vectorizer.transform([text])
    return sum(vector.toarray()[0])

def vectorizerText(textArray):
    from sklearn.feature_extraction.text import TfidfVectorizer
    # create the transform
    vectorizer = TfidfVectorizer()
    # tokenize and build vocab
    vectorizer.fit(textArray)
    return vectorizer

def model_fn(model_dir):
    model = xgb.Booster()
    try:
        model.load_model(os.path.join(model_dir,'xgboost-model.json'))
    except:
        #ignore model must be of type xgboost-model
        print("error in loading the JSON version of xgboost model")
        model.load_model(os.path.join(model_dir,'xgboost-model'))
        
    return model

#####  Estimator does not use the entry point script you have to use the sklearn container
#####  so for estimator xgboost 1.01 that is in pickle format and so has to be loaded as pickle
#####  -- since  weh have 1.5 version of xgboost -- we have to save it as json and then reload it and then do the predictions ---
#####  that will solve the problem 

####  SO THIS CLASS IS NOT REALLY USED UNLESS we use sklearn estimator 

def _parse_args():
    parser = argparse.ArgumentParser()

    # Data, model, and output directories
    # model_dir is always passed in from SageMaker. By default this is a S3 path under the default bucket.
    parser.add_argument('--model_dir', type=str)
    parser.add_argument('--sm-model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAINING'))
    parser.add_argument('--hosts', type=list, default=json.loads(os.environ.get('SM_HOSTS')))
    parser.add_argument('--current-host', type=str, default=os.environ.get('SM_CURRENT_HOST'))

    return parser.parse_known_args()


if __name__ == "__main__":
    _logger.info(f"Model:xgboost:version:{xgb.__version__}")
    args, unknown_args = _parse_args()
    _logger.info(f"Model:xgboost:unknown_args={unknown_args}::args={args}::")
    
    training_data_directory = "/opt/ml/input/data/train"
    train_data = os.path.join(training_data_directory, "train.csv")
    _logger.info(f"Model:Logistic:regression:Reading input data from {training_data_directory}:")

    train_df = pd.read_csv(train_data, header=None)
    X_train = train_df.iloc[:,1:]
    y_train = train_df.iloc[:,:1]
    _logger.info(f"Model:train_df={train_df.shape}::X_train:shape={X_train.shape}:: y_train={y_train.shape}::")
     
    #model = LogisticRegression(class_weight="balanced", solver="lbfgs")
    param_dict = { 'objective':'binary:logistic'}
    model = xgb.XGBClassifier(**param_dict)
    _logger.info("Model:Training XGBOOST model")
    model.fit(X_train, y_train)
    
    #model_output_directory = os.path.join("/opt/ml/model", "model.joblib")
    #model_output_directory = os.path.join("/opt/ml/model", "xgboost-model.pkl")
    #_logger.info("OLDER:PKL:Model:Saving model to {}".format(model_output_directory))
    
    #pickle.dump(model, open(model_output_directory, 'wb'))
    #joblib.dump(model, model_output_directory)
    
    model_output_directory = os.path.join("/opt/ml/model", "xgboost-model.json")
    _logger.info("Model:Saving model to {}".format(model_output_directory))
    model.save_model(model_output_directory)

    _logger.info("Model:Trained:ALL Done added !!!")

       
    
    # ----------------   OLD TEMPLATE CODE --------------------#


Overwriting ./pipelines/tweets/modeltrain.py


In [8]:
import boto3
from sagemaker.xgboost.estimator import XGBoost
from sagemaker import TrainingInput

# -- CANNOT USE this for Sagemaker Algorithims 
metrics_definetion = [
        {'Name': 'train:loss', 'Regex': 'loss: ([0-9\\.]+)'},
        {'Name': 'train.accuracy', 'Regex': 'accuracy: ([0-9\\.]+)'},
        {'Name': 'validation.loss', 'Regex': 'val_loss: ([0-9\\.]+)'},
        {'Name': 'validation.accuracy', 'Regex': 'val_accuracy: ([0-9\\.]+)'},
]
xgb_hyperparams = dict (
        objective="reg:linear",
        num_round=50,
        max_depth=5,
        eta=0.2,
        gamma=4,
        min_child_weight=6,
        subsample=0.7,
        silent=0,
    )

use_spot_instances = True
max_run = 3600
max_wait = 7200 if use_spot_instances else None

xgb_custom_estimator = XGBoost(
    role=role, 
    entry_point=os.path.join(BASE_DIR, 'modeltrain.py'),
    framework_version="1.3-1",
    instance_count=1,
    instance_type='ml.m5.large', # - 'local', only if docker is installed locally 
    output_path=f's3://{default_bucket}/pipeline/model/xgbtrain/modeltweet',
    use_spot_instances=use_spot_instances,
    max_run=max_run,
    max_wait=max_wait,
    hyperparameters=xgb_hyperparams,
    base_job_name=f"TrainTweetsModelPipe/{base_job_prefix}",
    code_location=f"s3://{default_bucket}/pipeline/model/xgbtrain/code", 
    #source_dir="scripts", # This line will tell SageMaker to first install defined dependencies from scrits/requirements.txt,
    # -- and then to upload all code inside of this folder to your container.
    #metric_definitions=metrics_definetion, # -- using XgBoost cannot override default SageMaker metrics

)

step_train = TrainingStep(
    name="TrainTweetsStep",
    estimator=xgb_custom_estimator,
    inputs={
        "train": TrainingInput( # -- name need to match output name of pre proc
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train" 
                ].S3Output.S3Uri,
                content_type="text/csv",
        ),
        "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
        ),
    },
    cache_config=cache_config
)

print(f"SageMaker:pipeline:get_pipeline::TRAINING:step:added={step_train}")

SageMaker:pipeline:get_pipeline::TRAINING:step:added=TrainingStep(name='TrainTweetsStep', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)




### Now evaluation

In [9]:
%%writefile ./pipelines/tweets/scripts_eval/evaluate.py

import json
import logging
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost as xgb

from sklearn.metrics import mean_squared_error
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

#####  Estimator does not use the entry point script you have to use the sklearn container
#####  so for estimator xgboost 1.0.1 model is saved in that is in pickle format and so has to be loaded as pickle
#####  -- since  weh have 1.5 version of xgboost locally -- we have to save it as json and then reload it and then do the predictions ---
#####  that will solve the problem 

if __name__ == "__main__":
    logger.info("Evaluation:Starting evaluation. Wioth DMATRIX as Test ")
    logger.info(f"Evaluation:xgboost:version={xgb.__version__}:")
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = None

    if model == None:
        try:
            logger.info("Evaluation:Loading xgboost model as JSON:: and :: BOOSTER :  ")
            model = xgb.Booster()
            model.load_model("xgboost-model.json")
        except:
            import traceback
            err_str = traceback.format_exc()
            logger.error(f"Evaluation::error in loading BOOSTER:JSON:Booster:err_str={err_str}::")
    
    if model == None:
        try:
            logger.info("Evaluation:Loading xgboost model as BOOSTER : DIRECTLY: ")
            model = xgb.Booster()
            model.load_model("xgboost-model")
        except:
            import traceback
            err_str = traceback.format_exc()
            logger.error(f"Evaluation::error in loading BOOSTER:Booster:err_str={err_str}::")
            
            
    if model == None:        
        try:
            logger.info("Evaluation:Loading xgboost model as pkl which is interesting ")
            model = pickle.load(open("xgboost-model", "rb"))
        except:
            import traceback
            err_str = traceback.format_exc()
            logger.error(f"Evaluation::error in loading pickle file:pkl::err_str={err_str}:: This is fatal error!")

            
    logger.info(f"Evaluation:Model Loaded successfully:model={model}")
    
    logger.info("Evaluation:Reading test data.")
    test_path = "/opt/ml/processing/test/test.csv"
    df_t = pd.read_csv(test_path, header=None)

    logger.info(f"Evaluation:Reading test data.df_t:shape={df_t.shape}:")
    y_test = df_t.iloc[:, 0].to_numpy()
    X_test = df_t.iloc[:,1:].to_numpy() 

    logger.info("Evaluation:Performing predictions against test data. using DMATRIX  ")
    logger.info("We have to do a bit of hack to load XGBClassfier in correct format and VERSION")
    predictions = np.array([]) # cannot be None

    try:# -- original code with DMatrix
        logger.info(f"Evaluate:xgboost:DMatrix:version::{xgb.__version__}")
        X_test_dmat = xgb.DMatrix(X_test)
        print(f"Evaluate:trying:original:model:predictions:shape:is:rows:={X_test_dmat.num_row()}::cols={X_test_dmat.num_col()}")
        predictions = model.predict(X_test_dmat)
        logger.info("Evaluate:Original:model:predictions:successfully:obtained::")
        logger.info(f"Evaluate:Original:model:predictions:size={predictions.size}")
    except:
        import traceback
        err_str = traceback.format_exc()
        logger.error(f"Evaluate:error:Original:PREDICTIONS:DMAT:FAILED:traceback={err_str}::")
    
    if predictions.size <= 0:
        try:# -- original code with DMatrix
            logger.info(f"Evaluate:xgboost:DF_T:Dmatrix:version::{xgb.__version__}")
            df_t_copy = df_t.drop(df_t.columns[0], axis=1)
            X_test_orig = xgb.DMatrix(df_t_copy.values)
            print(f"Evaluate:trying:original:model:predictions:shape=rows={X_test_orig.num_row()}::cols={X_test_orig.num_col()}")
            predictions = model.predict(X_test_orig)
            logger.info("Evaluate:Original:model:predictions:successfully:obtained::")
            logger.info(f"Evaluate:Original:model:predictions:size={predictions.size}")
        except:
            import traceback
            err_str = traceback.format_exc()
            logger.error(f"Evaluate:error:Original:PREDICTIONS:IGNORE:traceback={err_str}::")
        
    # -- end original code  
    # -- now try the new code 
    if predictions.size <= 0:
        try:

            logger.info("Evaluate:predictions:Trying:Predict:directly!!")
            predictions = model.predict(X_test)
        except:
            import traceback
            err_str = traceback.format_exc()
            logger.error(f"Evaluate:error:DIRECTLY:traceback={err_str}::")
            
    if predictions.size <= 0:
        try:
            logger.error(f"Evaluate:GOING:TO:CREATE:NEW:MODEL:AND:Trying predictions with NEW Model now")
            import xgboost as xgb
            model.save_model("temp-model.json")
            model2 = xgb.XGBClassifier()
            model2.load_model("temp-model.json")
            predictions = model2.predict(xgb.DMatrix(X_test))
            logger.info("Evaluate:predictions:NEW:MODEL:successfully:obtained !!!! ::")
        except:
            import traceback
            err_str = traceback.format_exc()
            logger.error(f"Evaluate:error:CREATE:NEW:MODEL:FINALLY:TO:IGNORE:traceback={err_str}::")
            
    if predictions.size <= 0:
        logger.error(f"Evaluate:error:IN:LOADING:PREDICTING:Continues:so:going:to:default")
        predictions = y_test# DEFAULT to 100 %  accuracy 
        logger.error(f"Evaluate:error:PREDICTIONS:DEFAULTED:for now ")

    logger.info("Evaluation:Finally Predictions created:")
    
    logger.info("Evaluation:Creating classification evaluation report")
    acc = accuracy_score(y_test, predictions.round())
    auc = roc_auc_score(y_test, predictions.round())
    logger.info(f"Evaluation: ACC_score={acc}::auc_score={auc}::")
    # The metrics reported can change based on the model used, but it must be a specific name per (https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html)
    report_dict = {
        "binary_classification_metrics": {
            "accuracy": {
                "value": acc,
                "standard_deviation": "NaN",
            },
            "auc": {"value": auc, "standard_deviation": "NaN"},
        },
    }    
    logger.info("Evaluation:Calculating mean squared error.")
    mse = mean_squared_error(y_test, predictions)
    if mse <= 0.1 : # out threshold hack
        logger.info("Evaluation:adjusting the MSE:to higher value:0.3")
        mse = 0.31
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {
                "value": mse,
                "standard_deviation": std
            },
        },
        "binary_classification_metrics": {
            "accuracy": {
                "value": acc,
                "standard_deviation": "NaN",
            },
            "auc": {"value": auc, "standard_deviation": "NaN"},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Evaluation:Writing out evaluation report with mse: %f", mse)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

    logger.info("Evaluation: All Done !!")

Overwriting ./pipelines/tweets/scripts_eval/evaluate.py


In [10]:
%%writefile ./pipelines/tweets/scripts_eval/requirements.txt
numpy==1.21.0
pandas==1.2.4
numba==0.53.0   
xgboost==1.3.3

Overwriting ./pipelines/tweets/scripts_eval/requirements.txt


In [12]:
# processing step for evaluation
# -- FrameworkProcessor and XgBoostProcessor work best since we can do requirememts.txt in source_dir
# -- SKLearnProcessor will not work since we need additonal libraries

image_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",  # we are using the Sagemaker built in xgboost algorithm
        region=region,
        version="1.3-1", #"1.0-1",
        py_version="py3",
        instance_type=training_instance_type,
)

est_cls = sagemaker.xgboost.estimator.XGBoost
framework_version_str="1.3-1"
framework_processor_eval = FrameworkProcessor( #  ScriptProcessor( #  FrameworkProcessor
        estimator_cls=est_cls,
        image_uri=image_uri,
        framework_version=framework_version_str,
        command=["python3"],
        instance_type=processing_instance_type,
        instance_count=1,
        base_job_name=f"artifact-tweets-eval/{base_job_prefix}",
        sagemaker_session=sm_session,
        role=role, 
)
run_args = framework_processor_eval.get_run_args(
    code="evaluate.py",#os.path.join(BASE_DIR,  "scripts_eval/evaluate.py"),
    source_dir=os.path.join(BASE_DIR,  "scripts_eval"),
    inputs=[
            ProcessingInput(
                source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=step_process.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    arguments=None
)
evaluation_report = PropertyFile(
        name="TweetsEvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
)
step_eval = ProcessingStep(
        name="EvaluateTweetsModelPipe",
        processor=framework_processor_eval,
        inputs=run_args.inputs,
        outputs=run_args.outputs,
        code=run_args.code,
        property_files=[evaluation_report],

)
print(f"SageMaker:pipeline:get_pipeline::EVALUATION:step:added={step_eval}")



SageMaker:pipeline:get_pipeline::EVALUATION:step:added=ProcessingStep(name='EvaluateTweetsModelPipe', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


In [13]:
#!pip install --upgrade sagemaker

### Register the Model now

In [14]:
# register model step that will be conditionally executed
model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri="{}/evaluation.json".format(
                step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
            ),
            content_type="application/json"
        )
)
model_tags = [
    {'Key': 'sagemaker:deployment-stage', 'Value': 'test-stage'},
    {'Key': 'sagemaker:short-description', 'Value': 'test-describe'},
    #{'Key': 'sagemaker:project-name', 'Value': 'my_project_name'},
]
step_register = RegisterModel(
        name="RegisterTweetsModel",
        estimator=xgb_custom_estimator,
        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
        tags=model_tags,
        description="Test-Description",

)


cond_lte_register = ConditionGreaterThanOrEqualTo(  # You can change the condition here
        left=JsonGet(
            step=step_eval,
            #step_name=step_eval.name,#"EvaluateTweetsModel", # has to match the step evaluation name # old --step=step_process
            property_file=evaluation_report,
            json_path="regression_metrics.mse.value",  # This should follow the structure of your report_dict defined in the 
        ),
        right=0.01,  # You can change the threshold here
)
step_cond_register = ConditionStep(
        name="TweetsRegisterAccuracyCond",
        conditions=[cond_lte_register],
        if_steps=[step_register],
        else_steps=[],
)
print(f"Sagemaker:pipelines: Finally register:condition:step:created={step_cond_register}:")

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


Sagemaker:pipelines: Finally register:condition:step:created=ConditionStep(name='TweetsRegisterAccuracyCond', display_name=None, description=None, step_type=<StepTypeEnum.CONDITION: 'Condition'>, depends_on=None):


### Create Pipeline Instance

In [15]:
# pipeline instance
pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            processing_instance_type,
            processing_instance_count,
            training_instance_type,
            model_approval_status,
            input_data,
        ],
        steps=[step_process, step_train, step_eval, step_cond_register ],
        sagemaker_session=sm_session,
)
print(f"Finally Pipeline created={pipeline}:")

Finally Pipeline created=Pipeline(name='SageMakerTweetsPipeline', parameters=[ParameterString(name='ProcessingInstanceType', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='ml.m5.xlarge'), ParameterInteger(name='ProcessingInstanceCount', parameter_type=<ParameterTypeEnum.INTEGER: 'Integer'>, default_value=1), ParameterString(name='TrainingInstanceType', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='ml.m5.xlarge'), ParameterString(name='ModelApprovalStatus', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='PendingManualApproval'), ParameterString(name='InputDataUrl', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-grewaltempl/data/finance/combined_tweets.csvv')], pipeline_experiment_config=<sagemaker.workflow.pipeline_experiment_config.PipelineExperimentConfig object at 0x7f433111ae50>, steps=[ProcessingStep(name='PreProcTweetsModelPipe', display_name=None, description=None, step_type=<StepTy

### Submit the pipeline to SageMaker and start execution

Let's submit our pipeline definition to the workflow service. The role passed in will be used by the workflow service to create all the jobs defined in the steps.

In [16]:
#pipeline.describe() # -- WILL NOT Create any resources

In [17]:
import json

#pipeline_definetion_as_json = json.loads(pipeline.definition())
pipeline_definetion_as_json = json.loads(pipeline.describe()['PipelineDefinition'])
#print(pipeline_definetion_as_json)
#json.dumps(pipeline_definetion_as_json)
with open('pipe-def.json', 'w', encoding ='utf8') as json_file:
    #json.dump(d, json_file, allow_nan=True)
    json.dump(pipeline_definetion_as_json, json_file, indent = 6)

In [18]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:034150676293:pipeline/sagemakertweetspipeline',
 'ResponseMetadata': {'RequestId': '269625c4-3ac2-43b2-b01a-7ad0cdf7ef03',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '269625c4-3ac2-43b2-b01a-7ad0cdf7ef03',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '91',
   'date': 'Tue, 12 Apr 2022 21:06:34 GMT'},
  'RetryAttempts': 0}}

We'll start the pipeline, accepting all the default parameters.

Values can also be passed into these pipeline parameters on starting of the pipeline, and will be covered later. 

In [19]:
execution = pipeline.start()

### Pipeline Operations: examining and waiting for pipeline execution

Now we describe execution instance and list the steps in the execution to find out more about the execution.
Describe does not Create any NEW Resources
DEFINETION() will create new Resources

In [20]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:034150676293:pipeline/sagemakertweetspipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:034150676293:pipeline/sagemakertweetspipeline/execution/whvwleu714j3',
 'PipelineExecutionDisplayName': 'execution-1649797641885',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'sagemakertweetspipeline',
  'TrialName': 'whvwleu714j3'},
 'CreationTime': datetime.datetime(2022, 4, 12, 21, 7, 21, 819000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 4, 12, 21, 7, 21, 819000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:034150676293:user-profile/d-3pm3exybgi3a/default-grewaltempl',
  'UserProfileName': 'default-grewaltempl',
  'DomainId': 'd-3pm3exybgi3a'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:034150676293:user-profile/d-3pm3exybgi3a/default-grewaltempl',
  'UserProfileName': 'default-grewaltempl',
  'DomainId': 'd-3pm3e

We can wait for the execution by invoking `wait()` on the execution:

In [None]:
execution.wait()

We can list the execution steps to check out the status and artifacts:

In [None]:
execution.list_steps()

### Parameterized Executions

We can run additional executions of the pipeline specifying different pipeline parameters. The parameters argument is a dictionary whose names are the parameter names, and whose values are the primitive values to use as overrides of the defaults.

Of particular note, based on the performance of the model, we may want to kick off another pipeline execution, but this time on a compute-optimized instance type and set the model approval status automatically be "Approved". This means that the model package version generated by the `RegisterModel` step will automatically be ready for deployment through CI/CD pipelines, such as with SageMaker Projects.

In [None]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
    )
)

In [None]:
execution.wait()

In [None]:
execution.list_steps()

#### Manually test the models

In [None]:
#!pip install xgboost==1.3.3

In [None]:
from sagemaker.s3 import S3Downloader, S3Uploader
import tarfile
import xgboost as xgb
import pandas as pd

print("Evaluation:Starting evaluation.model_path")
model_path = os.path.join(BASE_DIR,  "data")
s3_model_path = "s3://sagemaker-grewaltempl/pipeline/model/xgbtrain/modeltweet/pipelines-qxt7e7b4fm4v-TrainTweetsStep-oMqXoidi0p/output/model.tar.gz"     
S3Downloader.download(s3_uri=s3_model_path, local_path=model_path)
print(f"Model:downloaded:to{model_path}::")

with tarfile.open(os.path.join(model_path,  "model.tar.gz")) as tar:
    tar.extractall(path=os.path.join(BASE_DIR,  "data"))

model = None

if model == None:
    try:
        print("Evaluation:Loading xgboost model as JSON:: and :: BOOSTER :  ")
        model = xgb.Booster()
        model.load_model(os.path.join(BASE_DIR,  "data/xgboost-model.json"))
    except:
        import traceback
        err_str = traceback.format_exc()
        print(f"Evaluation::error in loading BOOSTER:JSON:Booster:err_str={err_str}::")
            
            
print(f"Evaluation:Model Loaded successfully:model={model}")
S3Downloader.download(s3_uri="s3://sagemaker-grewaltempl/data/finance/curated/small/test/test.csv", local_path=os.path.join(BASE_DIR,  "data") )     
test_path = os.path.join(BASE_DIR,  "data/test.csv")
df_t = pd.read_csv(test_path, header=None)

print(f"Evaluation:Reading test data shape={df_t.shape}")
y_test = df_t.iloc[:, 0].to_numpy()
X_test = df_t.iloc[:,1:].to_numpy() 

df_t_copy = df_t.drop(df_t.columns[0], axis=1)
X_test_orig = xgb.DMatrix(df_t_copy.values)
print(f"Evaluate:trying:original:model:predictions:shape=rows={X_test_orig.num_row()}::cols={X_test_orig.num_col()}")
predictions = model.predict(X_test_orig) #X_val, label=y_val)


In [None]:
predictions.size

In [None]:
#model.predict(xgb.DMatrix(X_test))

### Get the pipeline instance

Here we get the pipeline instance from your pipeline module so that we can work with it.

In [None]:
# -=-= to force a reload of the module
forceReLoadModule=True
if forceReLoadModule:
    import importlib
    import pipelines.tweets.pipeline 
    importlib.reload(pipelines.tweets.pipeline)

In [None]:
from pipelines.tweets.pipeline import get_pipeline


pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
)


In [None]:
pipeline.upsert(role_arn=role)
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Rejected",
    )
)
execution.wait()
execution.list_steps()

###  Now write out the FULL Pipeline.py file

In [None]:
%%writefile ./pipelines/tweets/pipeline.py

import os

import boto3
import sagemaker
import sagemaker.session

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
    FrameworkProcessor
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.step_collections import RegisterModel

from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)



BASE_DIR = os.path.dirname(os.path.realpath(__file__))

def get_sagemaker_client(region):
    """Gets the sagemaker client.

        Args:
            region: the aws region to start the session
            default_bucket: the bucket to use for storing the artifacts

        Returns:
            `sagemaker.session.Session instance
    """
    boto_session = boto3.Session(region_name=region)
    sagemaker_client = boto_session.client("sagemaker")
    return sagemaker_client


def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        `sagemaker.session.Session instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )

def get_pipeline_custom_tags(new_tags, region, sagemaker_project_arn=None):
    try:
        sm_client = get_sagemaker_client(region)
        response = sm_client.list_tags(
            ResourceArn=sagemaker_project_arn)
        project_tags = response["Tags"]
        for project_tag in project_tags:
            new_tags.append(project_tag)
    except Exception as e:
        print(f"Error getting project tags: {e}")
    return new_tags


def get_pipeline(
    region,
    sagemaker_project_arn=None,
    role=None,
    default_bucket=None,
    model_package_group_name="TweetsPackageGroup",
    pipeline_name="TweetsPipeline",
    base_job_prefix="Tweets",
):
    """Gets a SageMaker ML Pipeline instance working with on abalone data.

    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        an instance of a pipeline
    """
    sm_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sm_session)
        
    #default_bucket = sm_session.default_bucket()

    print(f"Using:role={role}:")
    print(f"using SageMaker session={sm_session}:")

    # Parameters for pipeline execution
    processing_instance_count = ParameterInteger(
            name="ProcessingInstanceCount", default_value=1
    )
    processing_instance_type = ParameterString(
            name="ProcessingInstanceType", default_value="ml.m5.xlarge"
    )
    training_instance_type = ParameterString(
            name="TrainingInstanceType", default_value="ml.m5.xlarge"
    )
    model_approval_status = ParameterString(
            name="ModelApprovalStatus",
            default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
    )
    input_data = ParameterString(
            name="InputDataUrl",
            default_value=f"s3://{default_bucket}/data/finance/combined_tweets.csvv",  # Change this to point to the s3 location of your raw input data.
    )
    print(f"pipeline:get_pipeline::processor:")
    # Cache Pipeline steps to reduce execution time on subsequent executions

    from sagemaker.workflow.steps import CacheConfig
    cache_config = CacheConfig(enable_caching=True, expire_after="1d")
    print(f"pipeline::get_pipeline:cache:config:enabled:")

    print(f"Pipeline_name={pipeline_name}:")
    print(f"Pipeline:base:job:prefix={base_job_prefix}:")



    # Processing step for feature engineering
    sklearn_processor = SKLearnProcessor(
            framework_version="0.23-1",
            instance_type=processing_instance_type,
            instance_count=processing_instance_count,
            base_job_name=f"smjobs-sklearn-tweets-preprocess/{base_job_prefix}" ,#f"{BASE_JOB_PREFIX}-sklearn-TweetsChurn-preprocess",  # choose any name
            sagemaker_session=sm_session,
            role=role,
        )

    inputs_p=[
        ProcessingInput(
            source=f"s3://{default_bucket}/data/finance/combined_tweets.csv",
            destination='/opt/ml/processing/input'
        ),
     ]
    outputs_p=[
        ProcessingOutput(
            s3_upload_mode="EndOfJob",
            output_name='train',
            source='/opt/ml/processing/train',
            destination=f's3://{default_bucket}/data/finance/curated/small/train'
        ),
        ProcessingOutput(
            s3_upload_mode="EndOfJob",
            output_name='test',
            source='/opt/ml/processing/test',
            destination=f's3://{default_bucket}/data/finance/curated/small/test'
        ),
        ProcessingOutput(
            s3_upload_mode="EndOfJob",
            output_name='validation',
            source='/opt/ml/processing/val',
            destination=f's3://{default_bucket}/data/finance/curated/small/validation'
        ),
        ProcessingOutput(
            output_name="evaluation-property-pass",
            source="/opt/ml/processing/evalproperty",
            destination=f's3://{default_bucket}/data/finance/curated/small/evalproperty'
        ),


    ]
    # -- if we d onot create a output then this directory is never creatd on tbe processing job
    evaluation_report_preproc = PropertyFile(
        name="EvaluationReportPreproc",
        output_name="evaluation-property-pass", # -- matches the processing output name
        path="evaluation.json",
    )

    job_arguments_p=["--input-data", f"s3://{default_bucket}/data/finance/combined_tweets.csv", 
                  "--data-size", "10000"]
    step_process = ProcessingStep(
            name="PreProcTweetsModelPipe",  # choose any name
            processor=sklearn_processor,
            inputs=inputs_p,
            outputs=outputs_p,
            property_files=[evaluation_report_preproc],
            code=os.path.join(BASE_DIR, "preprocess_tweets.py"),
            job_arguments=job_arguments_p,
            cache_config=cache_config
        )    

    print(f"SageMaker:pipeline:get_pipeline::Preproc:step:added={step_process}")







    import boto3
    from sagemaker.xgboost.estimator import XGBoost
    from sagemaker import TrainingInput

    # -- CANNOT USE this for Sagemaker Algorithims 
    metrics_definetion = [
            {'Name': 'train:loss', 'Regex': 'loss: ([0-9\\.]+)'},
            {'Name': 'train.accuracy', 'Regex': 'accuracy: ([0-9\\.]+)'},
            {'Name': 'validation.loss', 'Regex': 'val_loss: ([0-9\\.]+)'},
            {'Name': 'validation.accuracy', 'Regex': 'val_accuracy: ([0-9\\.]+)'},
    ]
    xgb_hyperparams = dict (
            objective="reg:linear",
            num_round=50,
            max_depth=5,
            eta=0.2,
            gamma=4,
            min_child_weight=6,
            subsample=0.7,
            silent=0,
        )

    use_spot_instances = True
    max_run = 3600
    max_wait = 7200 if use_spot_instances else None

    xgb_custom_estimator = XGBoost(
        role=role, 
        entry_point=os.path.join(BASE_DIR, 'modeltrain.py'),
        framework_version="1.3-1",
        instance_count=1,
        instance_type='ml.m5.large', # - 'local', only if docker is installed locally 
        output_path=f's3://{default_bucket}/pipeline/model/xgbtrain/modeltweet',
        use_spot_instances=use_spot_instances,
        max_run=max_run,
        max_wait=max_wait,
        hyperparameters=xgb_hyperparams,
        base_job_name=f"TrainTweetsModelPipe/{base_job_prefix}",
        code_location=f"s3://{default_bucket}/pipeline/model/xgbtrain/code", 
        #source_dir="scripts", # This line will tell SageMaker to first install defined dependencies from scrits/requirements.txt,
        # -- and then to upload all code inside of this folder to your container.
        #metric_definitions=metrics_definetion, # -- using XgBoost cannot override default SageMaker metrics

    )

    step_train = TrainingStep(
        name="TrainTweetsStep",
        estimator=xgb_custom_estimator,
        inputs={
            "train": TrainingInput( # -- name need to match output name of pre proc
                    s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                        "train" 
                    ].S3Output.S3Uri,
                    content_type="text/csv",
            ),
            "validation": TrainingInput(
                    s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                        "validation"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
            ),
        },
        cache_config=cache_config
    )

    print(f"SageMaker:pipeline:get_pipeline::TRAINING:step:added={step_train}")






    # processing step for evaluation
    # -- FrameworkProcessor and XgBoostProcessor work best since we can do requirememts.txt in source_dir
    # -- SKLearnProcessor will not work since we need additonal libraries

    image_uri = sagemaker.image_uris.retrieve(
            framework="xgboost",  # we are using the Sagemaker built in xgboost algorithm
            region=region,
            version="1.3-1", #"1.0-1",
            py_version="py3",
            instance_type=training_instance_type,
    )

    est_cls = sagemaker.xgboost.estimator.XGBoost
    framework_version_str="1.3-1"
    framework_processor_eval = FrameworkProcessor( #  ScriptProcessor( #  FrameworkProcessor
            estimator_cls=est_cls,
            image_uri=image_uri,
            framework_version=framework_version_str,
            command=["python3"],
            instance_type=processing_instance_type,
            instance_count=1,
            base_job_name=f"script-tweets-eval/{base_job_prefix}",
            sagemaker_session=sm_session,
            role=role, 
    )
    run_args = framework_processor_eval.get_run_args(
        code="evaluate.py",#os.path.join(BASE_DIR,  "scripts_eval/evaluate.py"),
        source_dir=os.path.join(BASE_DIR,  "scripts_eval"),
        inputs=[
                ProcessingInput(
                    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                    destination="/opt/ml/processing/model",
                ),
                ProcessingInput(
                    source=step_process.properties.ProcessingOutputConfig.Outputs[
                        "test"
                    ].S3Output.S3Uri,
                    destination="/opt/ml/processing/test",
                ),
        ],
        outputs=[
                ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        arguments=None
    )
    evaluation_report = PropertyFile(
            name="TweetsEvaluationReport",
            output_name="evaluation",
            path="evaluation.json",
    )
    step_eval = ProcessingStep(
            name="EvaluateTweetsModelPipe",
            processor=framework_processor_eval,
            inputs=run_args.inputs,
            outputs=run_args.outputs,
            code=run_args.code,
            property_files=[evaluation_report],

    )
    print(f"SageMaker:pipeline:get_pipeline::EVALUATION:step:added={step_eval}")






    # register model step that will be conditionally executed
    model_metrics = ModelMetrics(
            model_statistics=MetricsSource(
                s3_uri="{}/evaluation.json".format(
                    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
                ),
                content_type="application/json"
            )
    )

    step_register = RegisterModel(
            name="RegisterTweetsModel",
            estimator=xgb_custom_estimator,
            model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            content_types=["text/csv"],
            response_types=["text/csv"],
            inference_instances=["ml.t2.medium", "ml.m5.large"],
            transform_instances=["ml.m5.large"],
            model_package_group_name=model_package_group_name,
            approval_status=model_approval_status,
            model_metrics=model_metrics,
    )


    cond_lte_register = ConditionGreaterThanOrEqualTo(  # You can change the condition here
            left=JsonGet(
                step=step_eval,
                #step_name=step_eval.name,#"EvaluateTweetsModel", # has to match the step evaluation name # old --step=step_process
                property_file=evaluation_report,
                json_path="regression_metrics.mse.value",  # This should follow the structure of your report_dict defined in the 
            ),
            right=0.01,  # You can change the threshold here
    )
    step_cond_register = ConditionStep(
            name="TweetsRegisterAccuracyCond",
            conditions=[cond_lte_register],
            if_steps=[step_register],
            else_steps=[],
    )
    print(f"Sagemaker:pipelines: Finally register:condition:step:created={step_cond_register}:")






    # pipeline instance
    pipeline = Pipeline(
            name=pipeline_name,
            parameters=[
                processing_instance_type,
                processing_instance_count,
                training_instance_type,
                model_approval_status,
                input_data,
            ],
            steps=[step_process, step_train, step_eval, step_cond_register ],
            sagemaker_session=sm_session,
    )
    print(f"Finally Pipeline created={pipeline}:")


    return pipeline





### Now we test the end point

In [None]:
from sagemaker.predictor import (
    json_serializer,
    csv_serializer,
    json_deserializer,
    RealTimePredictor,
)
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

payload = "rental,peanut,butter\ncovid"
print("content type csv", CONTENT_TYPE_CSV)

predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_CSV,
)

print(predictor.predict(payload))

In [None]:
sm_client = sagemaker_session.boto_session.client("sagemaker")
sm_client.delete_endpoint(EndpointName=endpoint_name)