# Imports and Common

In [113]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession

In [114]:
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
region = boto3.Session().region_name

pipeline_session = PipelineSession()

model_package_group_name = 'SpamModelPackageGroup'
# bucket_name = 'spamdetectionstack-spamdetectionbucket571a4b89-15qlkoyxizyds'
bucket_name = sagemaker_session.default_bucket()
bucket_key_prefix = 'sms-spam-classifier'
pipeline_name = 'sms-spam-classifier-pipeline'

vocabulary_length = 9013
print(role)

Couldn't call 'get_role' to get Role ARN from role name admin-tmm to get Role path.


arn:aws:iam::756059218166:role/service-role/AmazonSageMaker-ExecutionRole-20221118T152873


# Download Data


In [115]:
!mkdir -p dataset
!curl https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip -o dataset/smsspamcollection.zip
!unzip -o dataset/smsspamcollection.zip -d dataset
!head -10 dataset/SMSSpamCollection

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  198k  100  198k    0     0   352k      0 --:--:-- --:--:-- --:--:--  352k
Archive:  dataset/smsspamcollection.zip
  inflating: dataset/SMSSpamCollection  
  inflating: dataset/readme          
ham	Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...
ham	Ok lar... Joking wif u oni...
spam	Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's
ham	U dun say so early hor... U c already then say...
ham	Nah I don't think he goes to usf, he lives around here though
spam	FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv
ham	Even my brother is not like to speak with me. They treat

In [116]:
raw_s3 = sagemaker_session.upload_data(
    path='./dataset/SMSSpamCollection',
    bucket=bucket_name,
    key_prefix=f'{bucket_key_prefix}/data/raw'
)

# Pipeline Parameters

In [117]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# raw input data
input_data = ParameterString(
    name="InputData", default_value=raw_s3
)

# status of newly trained model in registry
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="Approved"
)

# processing step parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount", default_value=1
)

# training step parameters
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.c5.2xlarge"
)


# Preprocessing Step

In [118]:
!mkdir -p code

In [119]:
%%writefile code/preprocess.py

import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "pyyaml"])
subprocess.check_call([sys.executable, "-m", "pip", "install", "pandas"])

import pandas as pd
import numpy as np
import string
import sys

from hashlib import md5

import glob
import numpy as np
import pandas as pd
import os
import json
import joblib
import yaml
from io import StringIO
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tarfile

base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"
vocabulary_length = 9013


def text_to_word_sequence(text,
                          filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',
                          lower=True, split=" "):
    """Converts a text to a sequence of words (or tokens).
    # Arguments
        text: Input text (string).
        filters: list (or concatenation) of characters to filter out, such as
            punctuation. Default: `!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n`,
            includes basic punctuation, tabs, and newlines.
        lower: boolean. Whether to convert the input to lowercase.
        split: str. Separator for word splitting.
    # Returns
        A list of words (or tokens).
    """
    if lower:
        text = text.lower()

    translate_dict = dict((c, split) for c in filters)
    translate_map = str.maketrans(translate_dict)
    text = text.translate(translate_map)

    seq = text.split(split)
    return [i for i in seq if i]


def hashing_trick(text, n,
                  hash_function=None,
                  filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',
                  lower=True,
                  split=' '):
    """Converts a text to a sequence of indexes in a fixed-size hashing space.
    # Arguments
        text: Input text (string).
        n: Dimension of the hashing space.
        hash_function: defaults to python `hash` function, can be 'md5' or
            any function that takes in input a string and returns a int.
            Note that 'hash' is not a stable hashing function, so
            it is not consistent across different runs, while 'md5'
            is a stable hashing function.
        filters: list (or concatenation) of characters to filter out, such as
            punctuation. Default: `!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n`,
            includes basic punctuation, tabs, and newlines.
        lower: boolean. Whether to set the text to lowercase.
        split: str. Separator for word splitting.
    # Returns
        A list of integer word indices (unicity non-guaranteed).
    `0` is a reserved index that won't be assigned to any word.
    Two or more words may be assigned to the same index, due to possible
    collisions by the hashing function.
    The [probability](
        https://en.wikipedia.org/wiki/Birthday_problem#Probability_table)
    of a collision is in relation to the dimension of the hashing space and
    the number of distinct objects.
    """
    if hash_function is None:
        hash_function = hash
    elif hash_function == 'md5':
        hash_function = lambda w: int(md5(w.encode()).hexdigest(), 16)

    seq = text_to_word_sequence(text,
                                filters=filters,
                                lower=lower,
                                split=split)
    return [int(hash_function(w) % (n - 1) + 1) for w in seq]

def one_hot(text, n,
            filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',
            lower=True,
            split=' '):
    """One-hot encodes a text into a list of word indexes of size n.
    This is a wrapper to the `hashing_trick` function using `hash` as the
    hashing function; unicity of word to index mapping non-guaranteed.
    # Arguments
        text: Input text (string).
        n: int. Size of vocabulary.
        filters: list (or concatenation) of characters to filter out, such as
            punctuation. Default: `!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n`,
            includes basic punctuation, tabs, and newlines.
        lower: boolean. Whether to set the text to lowercase.
        split: str. Separator for word splitting.
    # Returns
        List of integers in [1, n]. Each integer encodes a word
        (unicity non-guaranteed).
    """
    return hashing_trick(text, n,
                         hash_function='md5',
                         filters=filters,
                         lower=lower,
                         split=split)

def vectorize_sequences(sequences, vocabulary_length):
    results = np.zeros((len(sequences), vocabulary_length))
    for i, sequence in enumerate(sequences):
       results[i, sequence] = 1. 
    return results

def one_hot_encode(messages, vocabulary_length):
    data = []
    for msg in messages:
        temp = one_hot(msg, vocabulary_length)
        data.append(temp)
    return data

def transform(messages, vocabulary_length):
    one_hot_data = one_hot_encode(messages, vocabulary_length)
    return vectorize_sequences(one_hot_data, vocabulary_length)


if __name__ == '__main__':
    df = pd.read_csv(f'{base_dir}/input/SMSSpamCollection', sep='\t', header=None)
    df[df.columns[0]] = df[df.columns[0]].map({'ham': 0, 'spam': 1})

    targets = df[df.columns[0]].values
    messages = df[df.columns[1]].values

    # one hot encoding for each SMS message
    encoded_messages = transform(messages, vocabulary_length)
    df2 = pd.DataFrame(encoded_messages)
    df2.insert(0, 'spam', targets)

    # Split into training and validation sets (80%/20% split)
    split_index = int(np.ceil(df.shape[0] * 0.8))
    train_set = df2[:split_index]
    val_set = df2[split_index:]

    train_set.to_csv(f'{base_dir}/train/train.gz', header=True, index=False, compression='gzip')
    val_set.to_csv(f'{base_dir}/val/val.gz', header=True, index=False, compression='gzip')


Overwriting code/preprocess.py


In [120]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-sms-spam-classifier-process",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="val", source="/opt/ml/processing/val"),
    ],
    code="code/preprocess.py",
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [121]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
    name="PreprocessData",
    step_args=processor_args,
)

# Training Step

In [122]:
%%writefile code/train.py

from __future__ import print_function

import pip

try:
    from pip import main as pipmain
except:
    from pip._internal import main as pipmain

pipmain(['install', 'pandas'])


import logging
import mxnet as mx
from mxnet import gluon, autograd
from mxnet.gluon import nn
import numpy as np
import json
import time
import pandas

#logging.basicConfig(level=logging.DEBUG)

# ------------------------------------------------------------ #
# Training methods                                             #
# ------------------------------------------------------------ #


def train(hyperparameters, input_data_config, channel_input_dirs, output_data_dir,
          num_gpus, num_cpus, hosts, current_host, **kwargs):
    # SageMaker passes num_cpus, num_gpus and other args we can use to tailor training to
    # the current container environment, but here we just use simple cpu context.
    ctx = mx.cpu()

    # retrieve the hyperparameters and apply some defaults in case they are not provided.
    batch_size = hyperparameters.get('batch_size', 100)
    epochs = hyperparameters.get('epochs', 10)
    learning_rate = hyperparameters.get('learning_rate', 0.01)
    momentum = hyperparameters.get('momentum', 0.9)
    log_interval = hyperparameters.get('log_interval', 200)

    train_data_path = channel_input_dirs['train']
    val_data_path = channel_input_dirs['val']
    train_data = get_train_data(train_data_path, batch_size)
    val_data = get_val_data(val_data_path, batch_size)

    # define the network
    net = define_network()

    # Collect all parameters from net and its children, then initialize them.
    net.initialize(mx.init.Normal(sigma=1.), ctx=ctx)
    
    # Trainer is for updating parameters with gradient.
    if len(hosts) == 1:
        kvstore = 'device' if num_gpus > 0 else 'local'
    else:
        kvstore = 'dist_device_sync' if num_gpus > 0 else 'dist_sync'

    trainer = gluon.Trainer(net.collect_params(), 'sgd',
                            {'learning_rate': learning_rate, 'momentum': momentum},
                            kvstore=kvstore)
    
    metric = mx.metric.Accuracy()
    loss = gluon.loss.SigmoidBinaryCrossEntropyLoss()

    for epoch in range(epochs):
        
        # reset data iterator and metric at begining of epoch.
        metric.reset()
        btic = time.time()
        for i, (data, label) in enumerate(train_data):
            # Copy data to ctx if necessary
            data = data.as_in_context(ctx)
            label = label.as_in_context(ctx)
            
            # Start recording computation graph with record() section.
            # Recorded graphs can then be differentiated with backward.
            with autograd.record():
                output = net(data)
                L = loss(output, label)
            L.backward()

            # take a gradient step with batch_size equal to data.shape[0]
            trainer.step(data.shape[0])

            # update metric at last.
            sigmoid_output = output.sigmoid() 
            prediction = mx.nd.abs(mx.nd.ceil(sigmoid_output - 0.5))
            metric.update([label], [prediction])

            if i % log_interval == 0 and i > 0:
                name, acc = metric.get()
                print('[Epoch %d Batch %d] Training: %s=%f, %f samples/s' %
                      (epoch, i, name, acc, batch_size / (time.time() - btic)))

            btic = time.time()

        name, acc = metric.get()
        print('[Epoch %d] Training: %s=%f' % (epoch, name, acc))

        name, val_acc = test(ctx, net, val_data)
        print('[Epoch %d] Validation: %s=%f' % (epoch, name, val_acc))

    return net

def save(net, model_dir):
    y = net(mx.sym.var('data'))
    y.save('{}/model.json'.format(model_dir))
    net.collect_params().save('{}/model.params'.format(model_dir))

def define_network():
    net = nn.Sequential()
    with net.name_scope():
        net.add(nn.Dense(64, activation="relu"))
        net.add(nn.Dense(1))
    return net

def get_train_data(data_path, batch_size):
    print('Train data path: ' + data_path)
    df = pandas.read_csv('{}/train.gz'.format(data_path))
    features = df[df.columns[1:]].values.astype(dtype=np.float32)
    labels = df[df.columns[0]].values.reshape((-1, 1)).astype(dtype=np.float32)
    
    return gluon.data.DataLoader(gluon.data.ArrayDataset(features, labels), batch_size=batch_size, shuffle=True)

def get_val_data(data_path, batch_size):
    print('Validation data path: ' + data_path)
    df = pandas.read_csv('{}/val.gz'.format(data_path))
    features = df[df.columns[1:]].values.astype(dtype=np.float32)
    labels = df[df.columns[0]].values.reshape((-1, 1)).astype(dtype=np.float32)
    
    return gluon.data.DataLoader(gluon.data.ArrayDataset(features, labels), batch_size=batch_size, shuffle=False)

def test(ctx, net, val_data):
    metric = mx.metric.Accuracy()
    for data, label in val_data:
        data = data.as_in_context(ctx)
        label = label.as_in_context(ctx)
        
        output = net(data)
        sigmoid_output = output.sigmoid() 
        prediction = mx.nd.abs(mx.nd.ceil(sigmoid_output - 0.5))
        
        metric.update([label], [prediction])
    return metric.get()


# ------------------------------------------------------------ #
# Hosting methods                                              #
# ------------------------------------------------------------ #


def model_fn(model_dir):
    net = gluon.nn.SymbolBlock(
        outputs=mx.sym.load('{}/model.json'.format(model_dir)),
        inputs=mx.sym.var('data'))
    
    net.load_params('{}/model.params'.format(model_dir), ctx=mx.cpu())

    return net

def transform_fn(net, data, input_content_type, output_content_type):
    try:
        parsed = json.loads(data)
        nda = mx.nd.array(parsed)
        
        output = net(nda)
        sigmoid_output = output.sigmoid()
        prediction = mx.nd.abs(mx.nd.ceil(sigmoid_output - 0.5))
        
        output_obj = {}
        output_obj['predicted_label'] = prediction.asnumpy().tolist()
        output_obj['predicted_probability'] = sigmoid_output.asnumpy().tolist()

        response_body = json.dumps(output_obj)
        return response_body, output_content_type
    except Exception as ex:
        response_body = '{error: }' + str(ex)
        return response_body, output_content_type

Overwriting code/train.py


In [123]:
from sagemaker.mxnet import MXNet
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.model_step import ModelStep
import time

output_path = f's3://{bucket_name}/{bucket_key_prefix}/model'
code_location = f's3://{bucket_name}/{bucket_key_prefix}/code'

hyperparams = { 
    'batch_size': 100,
    'epochs': 20,
    'learning_rate': 0.01
}

mxnet = MXNet('code/train.py',
    role=role,
    instance_count=1,
    instance_type=training_instance_type,
    output_path=output_path,
    base_job_name='sms-spam-classifier-mxnet',
    framework_version='1.2',
    py_version='py3',
    code_location=code_location,
    hyperparameters=hyperparams,
    sagemaker_session=pipeline_session,
)

train_args = mxnet.fit(
    inputs={
        'train': TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        'val': TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["val"].S3Output.S3Uri,
            content_type="text/csv",
        )
    }
)

step_train_model = TrainingStep(name="TrainMXNetModel", step_args=train_args)

# Model Evaluation

In [124]:
%%writefile code/evaluate.py

import os
import json
import sys
import numpy as np
import pandas as pd
import pathlib
import tarfile
import mxnet as mx
from mxnet import gluon, autograd
from mxnet.gluon import nn
import mxnet.metric

def get_val_data(data_path):
    print('Validation data path: ' + data_path)
    df = pandas.read_csv(f'{data_path}/val.gz')
    features = df[df.columns[1:]].values.astype(dtype=np.float32)
    labels = df[df.columns[0]].values.reshape((-1, 1)).astype(dtype=np.float32)
    return features, labels

def model_fn(model_dir):
    net = gluon.nn.SymbolBlock(
        outputs=mx.sym.load(f'{model_dir}/model.json'),
        inputs=mx.sym.var('data'))
    net.load_params(f'{model_dir}/model.params', ctx=mx.cpu())
    return net

if __name__ == "__main__":
    model_dir = "./model"
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    val_data_path = f"/opt/ml/processing/val"
    with tarfile.open(model_path, "r:gz") as tar:
        tar.extractall(model_dir)

    model = model_fn(model_dir)
    features, labels = get_val_data(val_data_path)
    predictions = model.output(features)
    
    f1_metric = mxnet.labels.F1()
    f1_metric.update(y_test, predictions)
    scores = f1_metric.get()
    print("\nTest F1 Scores:", scores)

    # Available metrics to add to model: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    report_dict = {
        "binary_classification_metrics": {
            "f1": {"value": scores, "standard_deviation": "NaN"},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))


Overwriting code/evaluate.py


In [125]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import ScriptProcessor

mxnet_eval_image_uri = sagemaker.image_uris.retrieve(
    framework="mxnet",
    region=region,
    version='1.2',
    py_version='py3',
    image_scope="training",
    instance_type=training_instance_type,
)

evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=mxnet_eval_image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

eval_args = evaluate_model_processor.run(
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["val"].S3Output.S3Uri,
            destination="/opt/ml/processing/val",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluate.py",
)

step_evaluate_model = ProcessingStep(
    name="EvaluateModelPerformance",
    step_args=eval_args,
    property_files=[evaluation_report],
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


# Define a Register Model Step to Create a Model Package

In [126]:
from sagemaker.model import Model
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import PipelineModel

mxnet_image_uri = sagemaker.image_uris.retrieve(
    framework="mxnet",
    region=region,
    version='1.2',
    py_version='py3',
    image_scope="inference",
    instance_type=training_instance_type,
)

mxnet_model = Model(
    image_uri=mxnet_image_uri,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

pipeline_model = PipelineModel(
    models=[mxnet_model],
    role=role,
    sagemaker_session=pipeline_session
)


The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [127]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


evaluation_s3_uri = "{}/evaluation.json".format(
    step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

register_args = pipeline_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
)

step_register_pipeline_model = ModelStep(
    name="PipelineModel",
    step_args=register_args,
)


Job Name:  sagemaker-mxnet-2022-12-08-04-06-00-085
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fe936c23130>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fe93892a440>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-756059218166/sagemaker-mxnet-2022-12-08-04-06-00-085/input/code/evaluate.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'

# Define a Pipeline

In [128]:
from sagemaker.workflow.pipeline import Pipeline

# Create a Sagemaker Pipeline.
# Each parameter for the pipeline must be set as a parameter explicitly when the pipeline is created.
# Also pass in each of the steps created above.
# Note that the order of execution is determined from each step's dependencies on other steps,
# not on the order they are passed in below.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        training_instance_type,
        processing_instance_type,
        processing_instance_count,
        input_data,
        model_approval_status,
    ],
    steps=[
        step_process,
        step_train_model,
        step_evaluate_model,
        step_register_pipeline_model
    ],
)

In [129]:
import json

definition = json.loads(pipeline.definition())
definition

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.



Job Name:  sklearn-sms-spam-classifier-process-2022-12-08-04-06-01-728
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://spamdetectionstack-spamdetectionbucket571a4b89-15qlkoyxizyds/sms-spam-classifier/data/raw/SMSSpamCollection'), 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-756059218166/sms-spam-classifier-pipeline/code/7587b805a495d34e231e9485881e57d0/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sa

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.2xlarge'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://spamdetectionstack-spamdetectionbucket571a4b89-15qlkoyxizyds/sms-spam-classifier/data/raw/SMSSpamCollection'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessData',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSiz

# Submit the pipeline to SageMaker and start execution¶

In [130]:
pipeline.upsert(role_arn=role)


Job Name:  sklearn-sms-spam-classifier-process-2022-12-08-04-06-02-341
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://spamdetectionstack-spamdetectionbucket571a4b89-15qlkoyxizyds/sms-spam-classifier/data/raw/SMSSpamCollection'), 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-756059218166/sms-spam-classifier-pipeline/code/7587b805a495d34e231e9485881e57d0/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sa

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.



Job Name:  sagemaker-mxnet-2022-12-08-04-06-02-779
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fe93a7998d0>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fe938307340>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-756059218166/sms-spam-classifier-pipeline/code/7588cc9984b560f59069d4b7aa6c64db/evaluate.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3Compress

{'PipelineArn': 'sms-spam-classifier-pipeline'}

In [131]:
execution = pipeline.start()


Job Name:  sklearn-sms-spam-classifier-process-2022-12-08-04-06-03-124
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://spamdetectionstack-spamdetectionbucket571a4b89-15qlkoyxizyds/sms-spam-classifier/data/raw/SMSSpamCollection'), 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-756059218166/sklearn-sms-spam-classifier-process-2022-12-08-04-06-03-124/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/',

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.



Job Name:  sagemaker-mxnet-2022-12-08-04-06-04-123
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fe936c23130>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fe93892a440>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-756059218166/sagemaker-mxnet-2022-12-08-04-06-04-123/input/code/evaluate.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.



Job Name:  sagemaker-mxnet-2022-12-08-04-06-04-666
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fe936c23130>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fe93892a440>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-756059218166/sagemaker-mxnet-2022-12-08-04-06-04-666/input/code/evaluate.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'

In [132]:
try:
    execution.wait()
except:
    pass

execution.list_steps()


{'PipelineExecutionSteps': [{'EndTime': 1670472494.744308,
   'FailureReason': "RuntimeError: Failed to run: ['docker-compose', '-f', '/tmp/tmp34l2pdlg/docker-compose.yaml', 'up', '--build', '--abort-on-container-exit']",
   'StartTime': 1670472364.830035,
   'StepName': 'PreprocessData',
   'StepStatus': 'Failed'}]}

# Deploy Model

In [None]:
%%writefile utils.py

import argparse
import boto3
import logging
import os
from botocore.exceptions import ClientError
import tarfile
import zipfile

logger = logging.getLogger(__name__)
sm_client = boto3.client("sagemaker")


def get_approved_package(model_package_group_name):
    """Gets the latest approved model package for a model package group.

    Args:
        model_package_group_name: The model package group name.

    Returns:
        The SageMaker Model Package ARN.
    """
    try:
        # Get the latest approved model package
        response = sm_client.list_model_packages(
            ModelPackageGroupName=model_package_group_name,
            ModelApprovalStatus="Approved",
            SortBy="CreationTime",
            MaxResults=100,
        )
        approved_packages = response["ModelPackageSummaryList"]

        # Fetch more packages if none returned with continuation token
        while len(approved_packages) == 0 and "NextToken" in response:
            logger.debug("Getting more packages for token: {}".format(response["NextToken"]))
            response = sm_client.list_model_packages(
                ModelPackageGroupName=model_package_group_name,
                ModelApprovalStatus="Approved",
                SortBy="CreationTime",
                MaxResults=100,
                NextToken=response["NextToken"],
            )
            approved_packages.extend(response["ModelPackageSummaryList"])

        # Return error if no packages found
        if len(approved_packages) == 0:
            error_message = (
                f"No approved ModelPackage found for ModelPackageGroup: {model_package_group_name}"
            )
            logger.error(error_message)
            raise Exception(error_message)

        # Return the pmodel package arn
        model_package_arn = approved_packages[0]["ModelPackageArn"]
        logger.info(f"Identified the latest approved model package: {model_package_arn}")
        return approved_packages[0]
        # return model_package_arn
    except ClientError as e:
        error_message = e.response["Error"]["Message"]
        logger.error(error_message)
        raise Exception(error_message)

In [None]:
from utils import get_approved_package

sm_client = boto3.client("sagemaker")

pck = get_approved_package(model_package_group_name)
model_description = sm_client.describe_model_package(ModelPackageName=pck["ModelPackageArn"])

model_description

In [None]:
from sagemaker import ModelPackage

model_package_arn = model_description["ModelPackageArn"]
model = ModelPackage(
    role=role,
    model_package_arn=model_package_arn,
    sagemaker_session=sagemaker_session
)

endpoint_name = "sms-spam-classifier-mxnet2"

print(f"EndpointName= {endpoint_name}")
model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    endpoint_name=endpoint_name
)

In [None]:
from sagemaker.predictor import Predictor

predictor = Predictor(endpoint_name=endpoint_name)


In [None]:
import pandas as pd

data = pd.read_csv('dataset/SMSSpamCollection')

pred_count = 10
payload = data.iloc[:pred_count].to_csv(header=False, index=False)
p = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
print(p.decode("utf-8"))