# Spam Email Classifier with AWS SageMaker Pipelines

The purpose of this notebook is to create and train a model that can predict, with high accuracy, whether a given email is a spam or not (ham). This is a binary classification problem that involves Natural Language Processing (NLP).

I will be using the <a href='http://www2.aueb.gr/users/ion/data/enron-spam/'>enron spam email dataset</a>.

Contents:

* 1. [Installing and Importing Dependencies](#libraries)
* 2. [Setup SageMaker Environment](#libraries)
* 3. [SageMaker Data Pipeline](#pipeline)
    * 3.1. [Define Parameters](#pipeline_parameters)
    * 3.2  [Loading the Dataset](#pipeline_dataset)
    * 3.3. [Data Pre-Processing and Train/Test Split](#pipeline_preprocessing)
    * 3.4. [Train Model](#pipeline_train)
    * 3.5. [Create Model](#pipeline_create)
    * 3.6. [Deploy Model](#pipeline_deploy)
    * 3.7. [Register Model](#pipeline_register)
    * 3.8. [Pipeline Creation and Execution](#pipeline_execution)
* 4. [Model Predictions](#pipeline_predictions)

## <a class="anchor" id="libraries">Installing and Importing Dependencies</a>

This notebook makes use of the following dependencies:
* <a href='https://www.nltk.org/'>NLTK</a> - Natural Language Toolkit for Python
* <a href='https://scikit-learn.org/'>Scikit-learn</a> - Preprocessing and Machine Learning
* <a href='https://pandas.pydata.org/'>Pandas</a> - Data analysis and visualization
* <a href='https://numpy.org/'>Numpy</a> - Scientific computing
* <a href='https://seaborn.pydata.org/'>Seaborn</a> - Statistical data visualization

**This notebook is meant to work on an AWS SageMaker environment (JupyterLab or SageMaker Studio).**

In [2]:
# Install and download required modules for NLTK
# ONLY RUN THIS ONCE
# !conda install nltk #This installs nltk in a conda environment
# %pip install nltk #This installs nltk using pip
# %pip install pandas --upgrade
# %pip install numpy==1.19.5
# %pip install seaborn --upgrade

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Note: you may need to restart the kernel to use updated packages.
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Note: you may need to restart the kernel to use updated packages.
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Note: you may need to restart the kernel to use updated packages.
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Collecting seaborn
  Using cached seaborn-0.11.2-py3-none-any.whl (292 kB)
Installing collected packages: seaborn
  Attempting uninstall: seaborn
    Found existing installation: seaborn 0.10.0
    Uninstalling seaborn-0.10.0:
      Successfully uninstalled seaborn-0.10.0
Successfully installed seaborn-0.11.2
Note: you may need to restart the kernel to use updated packages.


In [1]:
# Load the dataset
import os
from pathlib import Path
import urllib.request
import tarfile
from sklearn.datasets import load_files
#Exploratory Data Analysis
import pandas as pd
import numpy as np
# Data Visualization
import seaborn as sns
from matplotlib import pyplot as plt
sns.color_palette("Spectral")
sns.set()
%matplotlib inline
#Feature Engineering - Text pre-processing
import string
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.stem import WordNetLemmatizer
#Feature Engineering - Vectorization with Bag of words + TF-IDF
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
# Model Training
from sklearn import set_config
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.naive_bayes import MultinomialNB # Naive Bayes
from sklearn.linear_model import SGDClassifier # SVM Linear classifier with SGD
from sklearn.neighbors import KNeighborsClassifier # K-nearest neighbors
from sklearn.ensemble import RandomForestClassifier # Random Forest
from sklearn.pipeline import Pipeline
# AWS Libraries
import boto3
import sagemaker
nltk.download('stopwords') #Download the necessary datasets
nltk.download('wordnet')
nltk.download('omw-1.4')
print(f'SageMaker version: {sagemaker.__version__}')

SageMaker version: 2.70.0


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


## <a class="anchor" id="libraries">Setup SageMaker Environment</a>

### Execution role and default parameters

In [2]:
#import necessary execution role so that you can read from S3 buckets
role = sagemaker.get_execution_role()

#source default session parameters (region, default S3 bucket etc)
region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()
s3_client = boto3.client('s3', region_name=region)
sagemaker_client = boto3.client("sagemaker-runtime")
default_bucket = 'enron-spam-classifier'
print(sagemaker_session.default_bucket())
prefix = 'spam-classifier-sagemaker-pipeline'

sagemaker-us-east-1-926549670619


## <a class="anchor" id="pipeline">SageMaker Data Pipeline</a>

### <a class="anchor" id="pipeline_parameters">Define Parameters</a>

In [181]:
from sagemaker.workflow.parameters import (ParameterInteger, ParameterString)

#specify location of input data
input_data_uri = f"s3://{default_bucket}/{prefix}/data/emails.csv"
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)

#specify default number of instances for processing step
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

#specify default instance type for processing step
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m4.xlarge"
)

#specify default instance type for training step
train_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m4.xlarge",
)

#specify default model approval mode
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved"
)
print(f"done!")

done!


### <a class="anchor" id="pipeline_dataset">Loading the dataset</a>

In [182]:
# Create folder structure where code will be stored
code_folder_path = "../code"
code_folder = Path(code_folder_path)
try:
    code_folder.mkdir(parents=True, exist_ok=False)
except FileExistsError as e:
    print(f"Directory already exists: {code_folder_path}")
print(f"done!")

Directory already exists: ../code
done!


In [184]:
%%writefile ../code/dataset_loading.py
import sys
import subprocess
import os
import pandas as pd
import numpy as np
import urllib.request
import tarfile
from sklearn.datasets import load_files
from pathlib import Path
import boto3

print(f"Importing dependencies from requirements.txt...")
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "sagemaker",
])

import sagemaker

print(f"Setting AWS_DEFAULT_REGION environment variable...")
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

# Function to create an S3 Bucket
def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """

    # Create bucket
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError as e:
        logging.error(e)
        return False
    return True

#source default session parameters (region, default S3 bucket etc)
region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()
s3_client = boto3.client('s3', region_name=region)
sagemaker_client = boto3.client("sagemaker-runtime")
default_bucket = 'enron-spam-classifier'
prefix = 'spam-classifier-sagemaker-pipeline'

# Create folder structure where the dataset will be stored
dataset_folder_path = '/opt/ml/processing/input/spam_dataset'
dataset_folder = Path(dataset_folder_path)
upload_files = False
try:
    dataset_folder.mkdir(parents=True, exist_ok=False)
    upload_files = True
except FileExistsError as e:
    print(f"Directory already exists: {dataset_folder_path}")
    
# Download the dataset from the source location
print(f"Downloading the dataset from the source location...")
if upload_files:
    for i in range(1, 7):
        urllib.request.urlretrieve("http://nlp.cs.aueb.gr/software_and_datasets/Enron-Spam/preprocessed/enron"+str(i)+".tar.gz", dataset_folder_path+"/enron"+str(i)+".tar.gz")
        print(f"Extracting enron"+str(i)+".tar.gz...")
        file = tarfile.open(str(Path(dataset_folder_path+'/enron'+str(i)+'.tar.gz').absolute()))
        file.extractall(dataset_folder_path)
        # Remove Tar file
        file_to_rem = Path(dataset_folder_path+'/enron'+str(i)+'.tar.gz')
        file_to_rem.unlink()

print(f"Loading dataset...")
X, y = [], []
for subdir in os.listdir(dataset_folder_path):
    if os.path.isdir and subdir != '.DS_Store' and subdir != '.ipynb_checkpoints':
        #and subdir  == 'enron1'
        print(f"Loading files from path {os.path.join(dataset_folder_path, subdir)}")
        email_dataset = load_files(os.path.join(dataset_folder_path, subdir))
        X = np.append(X, email_dataset.data)
        y = np.append(y, email_dataset.target)

# Create pandas dataframe for further analysis and manipulation
print("Creating pandas dataframe for further analysis and manipulation...")
emails = pd.DataFrame(columns=['label', 'label_name', 'text'])
emails['label'] = [y for y in y]
emails['label_name'] = emails.apply (lambda row: 'ham' if row.label == 0 else 'spam', axis=1)
emails['text'] = [x for x in X]

#Save dataframe as CSV
print(f"Saving pandas dataframe to local csv file...")
dataset_folder_path = '/opt/ml/processing/input/spam_dataset/pandas'
dataset_folder = Path(dataset_folder_path)
try:
    dataset_folder.mkdir(parents=True, exist_ok=False)
    upload_files = True
except FileExistsError as e:
    print(f"Directory already exists: {dataset_folder_path}")
emails.to_csv(str(Path(dataset_folder_path+'/emails.csv').absolute()),index=False)

#create bucket
print(f"Creating bucket {default_bucket}...")
create_bucket(default_bucket)

#upload the data to your default S3 bucket or another S3 bucket of your choosing
print(f"Uploading csv to S3...")
base_uri = f"s3://{default_bucket}/{prefix}/data"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=dataset_folder_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

print(f"done!")
        


Overwriting ../code/dataset_loading.py


In [186]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "0.23-1"

sklearn_processor_dataset_loading = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-text-datasetloading",
    role=role,
)
print(f"done!")

done!


### <a class="anchor" id="pipeline_preprocessing">Data Pre-Processing and Train/Test Split</a>

In [183]:
%%writefile ../code/requirements.txt
nltk
pandas == 1.3.5
numpy == 1.19.5

Overwriting ../code/requirements.txt


In [185]:
%%writefile ../code/text_preprocessing.py
import sys
import subprocess
import os
import pandas as pd
import numpy as np
from pathlib import Path

print(f"Importing dependencies from requirements.txt...")
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "nltk",
])
# subprocess.check_call([
#     sys.executable, "-m", "pip", "install", "-r",
#     "/opt/ml/processing/input/code/requirements.txt",
# ])

#Feature Engineering - Text pre-processing
import string
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.stem import WordNetLemmatizer
#Feature Engineering - Vectorization with Bag of words + TF-IDF
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer

print(f"Downloading nltk corpora...")
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')

#Text Pre-Processing Function
def email_text_preprocessing(document_text,text_normalization='none'):
    '''
    :param str document: document to be pre-processed in text format.
    :param str text_normalization: It defines the type of normalization to be applied, possible values are: 'none', 'stemming', 'lemmatization'
    1. Remove b' characters that appear at the beggining of each email
    2. Remove special characters
    3. Remove punctuation marks
    4. Remove stopwords
    5. Stem, Lemmatize text, or do nothing else.
    '''
    alt_text = re.sub(r'^b\'', '', document_text)
    alt_text = re.sub(r'\\r\\n', ' ', alt_text)
    alt_text = re.sub(r'\s+',' ',alt_text)
    no_punct = [char for char in alt_text if char not in string.punctuation]
    no_punct = ''.join(no_punct)
    no_stopwords = [word for word in no_punct.split() if word.lower() not in stopwords.words('english')]
    res_text = None
    if text_normalization == 'stemming':
        stemmer = PorterStemmer()
        stemmed = [stemmer.stem(word) for word in no_stopwords]
        stemmed = ' '.join(stemmed)
        res_text = stemmed
    elif text_normalization == 'lemmatization':
        lemmatizer = WordNetLemmatizer()
        lemmatized = [lemmatizer.lemmatize(word) for word in no_stopwords]
        lemmatized = ' '.join(lemmatized)
        res_text = lemmatized
    else:
        res_text = ' '.join(no_stopwords)

    return res_text

#Load dataframe from csv
print(f"Creating pandas dataframe from local csv file...")
dataset_folder_path = '/opt/ml/processing/input'
dataset_folder = Path(dataset_folder_path)
emails = pd.read_csv(str(Path(dataset_folder_path+'/emails.csv').absolute()))

#Create new column in data frame with pre-proccesed text (lemmatized)
print(f"Creating new column in data frame with pre-proccesed text (lemmatized)...")
emails['preprocessed_text_lemmatized'] = ''
emails.astype({'preprocessed_text_lemmatized': 'string'}).dtypes
emails['preprocessed_text_lemmatized'] = emails['text']
emails['preprocessed_text_lemmatized'] = emails['preprocessed_text_lemmatized'].apply(lambda x: email_text_preprocessing(str(x), text_normalization='lemmatization'))

#Create column in format expected by the BlazingText Algorithm
print(f"Creating column in format expected by the BlazingText Algorithm...")
emails['blazingtext_lemmatized'] = ''
emails.astype({'blazingtext_lemmatized': 'string'}).dtypes
emails['blazingtext_lemmatized'] = '__label__' + emails['label_name'].astype(str) + ' ' + emails['preprocessed_text_lemmatized']

#Create train:test split (train: 70%, validation: 25%, test: 5%)
print(f"Creating train:test split (train: 70%, validation: 25%, test: 5%)...")
train, validation, test = np.split(emails, [int(0.7 * len(emails)), int(0.85 * len(emails))])
                     
#Create Series datasets for BlazingText format
train = train['blazingtext_lemmatized']
validation = validation['blazingtext_lemmatized']
test = test['blazingtext_lemmatized']
                     
#Save datasets
print(f"Saving datasets...")
base_dir = "/opt/ml/processing"
pd.DataFrame(train).to_csv(str(Path(base_dir+'/train/train.csv').absolute()), header=False, index=False)
pd.DataFrame(validation).to_csv(str(Path(base_dir+'/validation/validation.csv').absolute()), header=False, index=False)
pd.DataFrame(test).to_csv(str(Path(base_dir+'/test/test.csv').absolute()), header=False, index=False)

print(f"Number of emails in the training dataset: {train.shape[0]}")
print(f"Number of emails in the validation set: {validation.shape[0]}")
print(f"Number of emails in the test set: {test.shape[0]}")



Overwriting ../code/text_preprocessing.py


In [187]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-text-preprocessing",
    role=role,
)
print(f"done!")

done!


#### Setup data loading and text pre-processing steps

In [188]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

default_bucket = 'enron-spam-classifier'
prefix = 'spam-classifier-sagemaker-pipeline'
base_job_name="sklearn-text-preprocessing"
output_destination_base = "s3://"+default_bucket+"/"+prefix+"/"+base_job_name+"/output/"

s3_client.upload_file(Filename='../code/dataset_loading.py', Bucket=default_bucket, Key=f'{prefix}/code/dataset_loading.py')
dataset_loading_script_uri = f's3://{default_bucket}/{prefix}/code/dataset_loading.py'

s3_client.upload_file(Filename='../code/requirements.txt', Bucket=default_bucket, Key=f'{prefix}/code/requirements.txt')
requirements_script_uri = f's3://{default_bucket}/{prefix}/code/requirements.txt'

s3_client.upload_file(Filename='../code/text_preprocessing.py', Bucket=default_bucket, Key=f'{prefix}/code/text_preprocessing.py')
preprocess_script_uri = f's3://{default_bucket}/{prefix}/code/text_preprocessing.py'

dataload_step = ProcessingStep(
    name="BlazingTextDatasetLoadingStep",
    processor=sklearn_processor_dataset_loading,
    inputs=[],
    outputs=[
        #ProcessingOutput(output_name="dataset", source="/opt/ml/processing/input/spam_dataset/pandas", destination=output_destination_base+"pandas")
    ],
    code=dataset_loading_script_uri,
)

process_step = ProcessingStep(
    name="BlazingTextPreProcessingStep",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input")
        # Send requirements.txt
#         ProcessingInput(source=requirements_script_uri, destination="/opt/ml/processing/input/code", input_name="requirements.txt")
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=output_destination_base+"train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=output_destination_base+"validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=output_destination_base+"test"),
    ],
    code=preprocess_script_uri,
)
print(f"done!")

done!


### <a class="anchor" id="pipeline_train">Train Model</a>

In [228]:
# set up estimator:

from sagemaker.estimator import Estimator

default_bucket = 'enron-spam-classifier'
prefix = 'spam-classifier-sagemaker-pipeline'
base_job_name="sklearn-text-preprocessing"

bt_estimator = Estimator(
    role=role,
    instance_type=train_instance_type,
    instance_count=1,
    image_uri=sagemaker.image_uris.retrieve("blazingtext", region),
    output_path=f's3://{default_bucket}/{prefix}/training_jobs',
    base_job_name='bt-model-estimator',
    input_mode = 'File'
)

#for more info on hyperparameters, see: https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html
bt_estimator.set_hyperparameters(mode="supervised",
                                 epochs=50,
                                 learning_rate=0.02,
                                 min_count=2,
                                 early_stopping=True,
                                 patience=5,
                                 min_epochs=20,
                                 word_ngrams=4
                                )
print(f"done!")

done!


#### Setup model training step

In [229]:
# set up model training step
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

s3_data_base_uri=f"s3://{default_bucket}/{prefix}/{base_job_name}/output/"

train_step = TrainingStep(
    name='BlazingTextTrainingStep',
    estimator=bt_estimator,
    inputs={
        'train': sagemaker.inputs.TrainingInput(
            #s3_data=process_step.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri,
            s3_data=str(s3_data_base_uri)+"train/train.csv",
            content_type="text/csv"
        ),
        'validation': sagemaker.inputs.TrainingInput(
            #s3_data=process_step.properties.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri,
            s3_data=str(s3_data_base_uri)+"validation/validation.csv",
            content_type="text/csv"
        )
    }
)
print(f"done!")

done!


### <a class="anchor" id="pipeline_create">Create Model</a>

#### Setup Create Model Step

In [230]:
from sagemaker.workflow.steps import CreateModelStep

model = sagemaker.model.Model(
    name='spam-classifier-blazingtext-model',
    image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role
)

inputs = sagemaker.inputs.CreateModelInput(
    instance_type="ml.m4.xlarge"
)

create_model_step = CreateModelStep(
    name="BlazingTextModelCreationStep",
    model=model,
    inputs=inputs
)
print(f"done!")

done!


### <a class="anchor" id="pipeline_deploy">Deploy Model</a>

In [231]:
%%writefile ../code/deploy_model.py

import time
from datetime import datetime
import boto3
import argparse


# Parse argument variables passed via the DeployModel processing step
parser = argparse.ArgumentParser()
parser.add_argument('--model-name', type=str)
parser.add_argument('--region', type=str)
parser.add_argument('--endpoint-instance-type', type=str)
parser.add_argument('--endpoint-name', type=str)
args = parser.parse_args()

region = args.region
boto3.setup_default_session(region_name=region)
sagemaker_boto_client = boto3.client('sagemaker')

# truncate name per sagameker length requirememnts (63 char max) if necessary
endpoint_config_name = f'{args.endpoint_name}-config-{datetime.now().strftime("%Y%m%d-%H%M%S")}'

# create new endpoint config file
create_ep_config_response = sagemaker_boto_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        'InstanceType': args.endpoint_instance_type,
        'InitialVariantWeight': 1,
        'InitialInstanceCount': 1,
        'ModelName': args.model_name,
        'VariantName': 'AllTraffic'
        }])

print("ModelName: {}".format(args.model_name))

# create endpoint if model endpoint does not already exist, otherwise update the endpoint
try:
    create_endpoint_response = sagemaker_boto_client.create_endpoint(
        EndpointName=args.endpoint_name,
        EndpointConfigName=endpoint_config_name
    )
except:
    create_endpoint_response = sagemaker_boto_client.update_endpoint(
        EndpointName=args.endpoint_name,
        EndpointConfigName=endpoint_config_name
    )

endpoint_info = sagemaker_boto_client.describe_endpoint(EndpointName=args.endpoint_name)
endpoint_status = endpoint_info['EndpointStatus']

while endpoint_status != 'InService':
    endpoint_info = sagemaker_boto_client.describe_endpoint(EndpointName=args.endpoint_name)
    endpoint_status = endpoint_info['EndpointStatus']
    print('Endpoint status:', endpoint_status)
    if endpoint_status != 'InService':
        time.sleep(30)

Overwriting ../code/deploy_model.py


#### Setup deploy model step

In [232]:
s3_client.upload_file(Filename='../code/deploy_model.py', Bucket=default_bucket, Key=f'{prefix}/code/deploy_model.py')
deploy_model_script_uri = f's3://{default_bucket}/{prefix}/code/deploy_model.py'
pipeline_endpoint_name = 'spam-classifier-btext'

deployment_instance_type = "ml.m4.xlarge"

deploy_model_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    volume_size_in_gb=60,
    base_job_name='sklearn-deploy-model',
    sagemaker_session=sagemaker_session)

deploy_step = ProcessingStep(
    name='BlazingTextModelDeploymentStep',
    processor=deploy_model_processor,
    job_arguments=[
        "--model-name", create_model_step.properties.ModelName,
        "--region", region,
        "--endpoint-instance-type", deployment_instance_type,
        "--endpoint-name", pipeline_endpoint_name
    ],
    code=deploy_model_script_uri)
print(f"done!")

done!


### <a class="anchor" id="pipeline_register">Register Model</a>

#### Setup Register Model Step

In [233]:
from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="BlazingTextModelRegistrationStep",
    estimator=bt_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=prefix,
    approval_status=model_approval_status,
)
print(f"done!")

done!


### <a class="anchor" id="pipeline_execution">Pipeline Creation and Execution</a>

In [234]:
from sagemaker.workflow.pipeline import Pipeline

#run full pipeline
steps_full = [
                dataload_step,
                process_step,
                train_step,
                create_model_step,
                deploy_step,
                register_step]

#run data loading step
steps_dataset_loading = [dataload_step]

#run data processing step
steps_preprocessing = [process_step]

#run full pipeline
steps_train_deployment = [
                train_step,
                create_model_step,
                deploy_step,
                register_step]

pipeline_name = 'BlazingTextPipeline'

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        train_instance_type,
        model_approval_status,
        input_data
    ],
    steps=steps_train_deployment, #switch to steps_preprocessing if you would like to run only the data processing step
)
print(f"done!")

done!


In [235]:
import json

definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m4.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m4.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://enron-spam-classifier/spam-classifier-sagemaker-pipeline/data/emails.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'BlazingTextTrainingStep',
   'Type': 'Training',
   'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File',
     'TrainingImage': '811284229777.dkr.ecr.us-east-1.amazonaws.com/blazingtext:1'},
    'OutputDataConfig': {'S3OutputPath': 's3://enron-spam-classifier/spam-cla

In [236]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:926549670619:pipeline/blazingtextpipeline',
 'ResponseMetadata': {'RequestId': '13b55bad-fcbd-4c1f-914a-b4b349396bf9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '13b55bad-fcbd-4c1f-914a-b4b349396bf9',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '87',
   'date': 'Sun, 02 Jan 2022 03:00:51 GMT'},
  'RetryAttempts': 0}}

In [237]:
execution = pipeline.start()

In [238]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:926549670619:pipeline/blazingtextpipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:926549670619:pipeline/blazingtextpipeline/execution/cuzv6t71qsvj',
 'PipelineExecutionDisplayName': 'execution-1641092456228',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'blazingtextpipeline',
  'TrialName': 'cuzv6t71qsvj'},
 'CreationTime': datetime.datetime(2022, 1, 2, 3, 0, 56, 76000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 1, 2, 3, 0, 56, 76000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:926549670619:user-profile/d-aim3ywjn7ssh/samcaso',
  'UserProfileName': 'samcaso',
  'DomainId': 'd-aim3ywjn7ssh'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:926549670619:user-profile/d-aim3ywjn7ssh/samcaso',
  'UserProfileName': 'samcaso',
  'DomainId': 'd-aim3ywjn7ssh'},
 'ResponseMetadata': {'RequestId': '9e83e3f2-704f-44d5-

In [239]:
execution.wait()

In [240]:
execution.list_steps()

[{'StepName': 'BlazingTextModelDeploymentStep',
  'StartTime': datetime.datetime(2022, 1, 2, 3, 8, 37, 220000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 1, 2, 3, 16, 15, 936000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:926549670619:processing-job/pipelines-cuzv6t71qsvj-blazingtextmodeldepl-yguizugwm9'}}},
 {'StepName': 'BlazingTextModelRegistrationStep',
  'StartTime': datetime.datetime(2022, 1, 2, 3, 8, 35, 964000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 1, 2, 3, 8, 36, 777000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:926549670619:model-package/spam-classifier-sagemaker-pipeline/3'}}},
 {'StepName': 'BlazingTextModelCreationStep',
  'StartTime': datetime.datetime(2022, 1, 2, 3, 8, 35, 964000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 1, 2, 3, 8, 36, 9

## <a class="anchor" id="pipeline_predictions">Model Predictions</a>

#### Define function that removes the '__ label __\<label name\>' string required by the BlazingText algorithm to use the test dataset emails for predictions.

In [247]:
def remove_label(document_text):
    alt_text = re.sub(r'^__label__[a-z]+', '', document_text)
    return alt_text

print("done!")


done!


#### Grab sample emails from the test dataset

In [295]:
#Read test dataset from S3
response = s3_client.get_object(Bucket=default_bucket, Key="spam-classifier-sagemaker-pipeline/sklearn-text-preprocessing/output/test/test.csv")
status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

if status == 200:
    print(f"Successful S3 get_object response. Status - {status}")
    test_emails_df = pd.read_csv(response.get("Body"))
    #print(test_emails_df)
else:
    print(f"Unsuccessful S3 get_object response. Status - {status}")

print('-----')
print(test_emails_df.iloc[10][0])
print('-----')
print(test_emails_df.iloc[20][0])
print('-----')
print(test_emails_df.iloc[30][0])
#emails['blazingtext_lemmatized'] = '__label__'
test_emails = [remove_label(str(test_emails_df.iloc[10][0])),
              remove_label(str(test_emails_df.iloc[20][0])),
              remove_label(str(test_emails_df.iloc[30][0]))]

payload = {"instances" : test_emails}

Successful S3 get_object response. Status - 200
-----
__label__ham bSubject total transfer capability attached total transfer capability ttc july 4 2001 attached outage information reliable time posting attached outage information subject change without notice myrna neeley administrative assistant outage coordination transmission maintenance california independent system operator voice 916 351 2171 fax 916 351 2367 e mail mneeley caiso com ttc 07 04 01 pdf
-----
__label__spam Subject creative dc cam 3200 z 154 00 creative dc cam 3200 z 154 00 stylishly designed cool chrome silver color ultra compact body creative dc cam 3200 z power packed great feature small scale true effective 3 2 megapixels advanced ccd imaging technology 12 time maximum precision zoom dc cam 3200 z delivers crisp clear image rich vibrant color best digital photo result feature 3 2 megapixels ccd sensor 3 x optical plus 4 x digital zoom mpeg 4 video capture playback audio built speakerintelligent powerful strobe fl

#### Define function to be used to return model predicitons using the previosly generated SageMaker Endpoint

In [296]:
def get_predictions(payload, endpoint_name, client):
    response = client.invoke_endpoint(EndpointName=endpoint_name,
        Body=json.dumps(payload),
        ContentType='application/json')
    predictions = json.loads(response['Body'].read().decode('utf-8'))
    return list(zip(payload['instances'], predictions))

#### Return Predictions from the SageMaker Endpoint

In [297]:
# return predictions
pipeline_endpoint_name="spam-classifier-btext"
#source default session parameters (region, default S3 bucket etc)
# region = boto3.Session().region_name
# sagemaker_session = sagemaker.Session()
# s3_client = boto3.client('s3', region_name=region)
# sagemaker_client = boto3.client("sagemaker-runtime")
get_predictions(payload, pipeline_endpoint_name, sagemaker_client)

[(' bSubject total transfer capability attached total transfer capability ttc july 4 2001 attached outage information reliable time posting attached outage information subject change without notice myrna neeley administrative assistant outage coordination transmission maintenance california independent system operator voice 916 351 2171 fax 916 351 2367 e mail mneeley caiso com ttc 07 04 01 pdf',
  {'label': ['__label__ham'], 'prob': [0.993827760219574]}),
 (' Subject creative dc cam 3200 z 154 00 creative dc cam 3200 z 154 00 stylishly designed cool chrome silver color ultra compact body creative dc cam 3200 z power packed great feature small scale true effective 3 2 megapixels advanced ccd imaging technology 12 time maximum precision zoom dc cam 3200 z delivers crisp clear image rich vibrant color best digital photo result feature 3 2 megapixels ccd sensor 3 x optical plus 4 x digital zoom mpeg 4 video capture playback audio built speakerintelligent powerful strobe flash 16 mb integr