In [None]:
# Notebook-wide settings, exported as environment variables and read back from the
# process environment by the constants cell below.
# Replace the {{...}} placeholders with your own S3 bucket name and SageMaker role first.
%env S3_DATASET_BUCKET={{YOUR_S3_BUCKET}}
%env S3_DATASET_TRAIN=knn/input/iris_train.csv
%env S3_DATASET_TEST=knn/input/iris_test.csv
%env S3_TRAIN_OUTPUT=knn/output
%env SAGEMAKER_ROLE={{YOUR_SAGEMAKER_ROLE}}

In [5]:
import os
import random
import string

import boto3
import matplotlib.pyplot as plt
import pandas as pd
import sagemaker
from IPython.display import display
from sagemaker import image_uris
from sagemaker.deserializers import JSONDeserializer
from sagemaker.estimator import Estimator, Predictor
from sagemaker.inputs import TrainingInput
from sagemaker.serializers import CSVSerializer
from sklearn.model_selection import train_test_split

In [6]:
# ----------------------------------------------------------
# Constants
# ----------------------------------------------------------
# Local path of the downloaded sample dataset
CSV_PATH = './tmp/iris.csv'

# Settings supplied through environment variables (each is None when unset)
S3_DATASET_BUCKET = os.environ.get('S3_DATASET_BUCKET')
S3_DATASET_TRAIN = os.environ.get('S3_DATASET_TRAIN')
S3_DATASET_TEST = os.environ.get('S3_DATASET_TEST')
S3_TRAIN_OUTPUT = os.environ.get('S3_TRAIN_OUTPUT')
SAGEMAKER_ROLE = os.environ.get('SAGEMAKER_ROLE')

# Instances used for training and for hosting the inference endpoint
ESTIMATOR_INSTANCE_COUNT = 1
ESTIMATOR_INSTANCE_TYPE = 'ml.m5.large'
PREDICTOR_INSTANCE_TYPE = 'ml.t2.medium'
# Endpoint name derived from the hosting instance type, with every dot turned into a hyphen
PREDICTOR_ENDPOINT_NAME = '-'.join(f'sagemaker-knn-{PREDICTOR_INSTANCE_TYPE}'.split('.'))

# ----------------------------------------------------------
# State shared between the cells of this notebook
# ----------------------------------------------------------
bucket = boto3.resource('s3').Bucket(S3_DATASET_BUCKET)
# Placeholders that the cells below fill in as the notebook runs
train_df = test_df = None
train_object_path = test_object_path = None
knn = predictor = None

In [None]:
# Download a sample csv
# The iris dataset is saved as ./tmp/iris.csv, which is the file CSV_PATH points at.
# NOTE(review): curl runs without --fail, so an HTTP error page would be saved as
# iris.csv instead of stopping here - consider adding the flag.
!mkdir -p tmp
!curl -o "$(pwd)/tmp/iris.csv" -L https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/master/hyperparameter_tuning/r_bring_your_own/iris.csv

In [None]:
############################################################
# Data preparation
############################################################

def load_csv(path: str) -> pd.DataFrame:
    """ Load the iris csv file and shape it for SageMaker training

    The label column is moved to the first position and its string values
    are encoded as integers (setosa=0, versicolor=1, virginica=2).

    Args:
        path (str): Path to a csv file

    Returns:
        pd.DataFrame: DataFrame to be trained; the integer ``Species`` label
            comes first, followed by the four feature columns

    Raises:
        KeyError: If an expected column is missing from the csv
        ValueError: If ``Species`` holds anything other than the three known labels
    """
    label_codes = {'setosa': 0, 'versicolor': 1, 'virginica': 2}
    ordered_columns = ['Species', 'Sepal.Length', 'Sepal.Width', 'Petal.Length', 'Petal.Width']

    raw = pd.read_csv(path)
    # Move the last label column to the first
    # See https://docs.aws.amazon.com/sagemaker/latest/dg/cdf-training.html#cdf-csv-format
    # An explicit copy keeps the label assignment below from writing into a
    # slice of `raw` (which would trigger SettingWithCopyWarning).
    df = raw[ordered_columns].copy()

    # Convert target string to int
    labels = df['Species'].map(label_codes)
    unmapped = labels.isna()
    if unmapped.any():
        # Without this check an unknown label silently becomes NaN, which turns
        # the column into floats and leaves empty label cells in the uploaded csv.
        unknown = sorted(df.loc[unmapped, 'Species'].astype(str).unique())
        raise ValueError(f'Unknown Species value(s): {unknown}')
    df['Species'] = labels.astype('int64')
    return df


def plot(df: pd.DataFrame) -> None:
    """ Show a scatter matrix of the given DataFrame

    Every point is colored by the value of its ``Species`` column.

    Args:
        df (pd.DataFrame): DataFrame to visualize; it must contain a ``Species`` column
    """
    species_colors = df['Species']
    pd.plotting.scatter_matrix(df, c=species_colors, figsize=(15, 15))
    plt.show()


def upload_csv_to_s3(df: pd.DataFrame, object_path: str) -> str:
    """ Upload a DataFrame as a headerless csv file to be trained by SageMaker

    The csv is staged in a temporary directory that is removed as soon as the
    upload finishes, so nothing is left behind on local disk and no
    pre-existing working directory is required.

    Args:
        df (pd.DataFrame): DataFrame which is saved as csv format (no header, no index)
        object_path (str): An S3 object path under your specified bucket

    Returns:
        str: An S3 object uri
    """
    # Imported locally so this function does not depend on edits to the import cell
    import tempfile

    with tempfile.TemporaryDirectory() as staging_dir:
        path = os.path.join(staging_dir, 'dataset.csv')
        df.to_csv(path, header=False, index=False)
        # Change content-type because the default is binary/octet-stream
        bucket.upload_file(path, object_path, ExtraArgs={'ContentType': 'text/csv'})
    return f's3://{bucket.name}/{object_path}'


if __name__ == '__main__':
    # Prepare data
    df = load_csv(CSV_PATH)
    display(df)
    plot(df)
    # Shuffle and split with a fixed seed so that the partition is the same on every run;
    # no test_size is passed, so scikit-learn's default split ratio applies
    train_df, test_df = train_test_split(df, shuffle=True, random_state=0)  # type: (pd.DataFrame, pd.DataFrame)

    # Upload both splits; the returned S3 uris are passed to train() in the model build cell
    train_object_path = upload_csv_to_s3(train_df, S3_DATASET_TRAIN)
    test_object_path = upload_csv_to_s3(test_df, S3_DATASET_TEST)

In [None]:
############################################################
# Model build
############################################################

def get_estimator(**hyperparams) -> Estimator:
    """ Build a SageMaker estimator for the 'knn' algorithm image

    Args:
        **hyperparams: Hyperparameters; ``predictor_type`` is always set to ``classifier``

    Returns:
        Estimator: A SageMaker estimator with the container image, role, instances,
            output location and hyperparameters already set
    """
    # AWS provided container in ECR, resolved for the region of the default boto3 session
    region = boto3.Session().region_name
    knn_image_uri = image_uris.retrieve('knn', region)
    output_location = f's3://{S3_DATASET_BUCKET}/{S3_TRAIN_OUTPUT}'

    knn_estimator = Estimator(
        image_uri=knn_image_uri,
        role=SAGEMAKER_ROLE,
        instance_count=ESTIMATOR_INSTANCE_COUNT,
        instance_type=ESTIMATOR_INSTANCE_TYPE,
        input_mode='Pipe',
        output_path=output_location,
        sagemaker_session=sagemaker.Session(),
    )
    # The classifier setting wins over any predictor_type passed by the caller
    knn_estimator.set_hyperparameters(**{**hyperparams, 'predictor_type': 'classifier'})
    return knn_estimator


def train(estimator: Estimator, train_object_path: str, test_object_path: str) -> None:
    """ Fit a SageMaker estimator on the train and test channels synchronously

    Args:
        estimator (Estimator): A SageMaker estimator to be trained
        train_object_path (str): S3 location of the csv used as train data
        test_object_path (str): S3 location of the csv used as test data
    """
    channel_sources = (('train', train_object_path), ('test', test_object_path))
    # Specify content-type because the default is application/x-recordio-protobuf
    channels = {
        channel: TrainingInput(location, content_type='text/csv', input_mode='Pipe')
        for channel, location in channel_sources
    }
    estimator.fit(channels)


if __name__ == '__main__':
    # k and sample_size are forwarded as hyperparameters; get_estimator adds predictor_type itself
    knn = get_estimator(k=1, sample_size=1000)
    # Uses the S3 uris stored by the data preparation cell, so that cell must have run first
    train(knn, train_object_path, test_object_path)

In [None]:
############################################################
# Model deploy
############################################################

def deploy(estimator: Estimator) -> Predictor:
    """ Host a trained SageMaker estimator behind an inference endpoint

    Requests are serialized as csv and responses are deserialized from JSON.

    Args:
        estimator (Estimator): A SageMaker estimator to be deployed

    Returns:
        Predictor: A SageMaker predictor which you use for inference
    """
    endpoint_options = dict(
        initial_instance_count=1,
        instance_type=PREDICTOR_INSTANCE_TYPE,
        serializer=CSVSerializer(),
        deserializer=JSONDeserializer(),
        endpoint_name=PREDICTOR_ENDPOINT_NAME,
    )
    endpoint_predictor = estimator.deploy(**endpoint_options)
    return endpoint_predictor


def validate(predictor: Predictor, test_df: pd.DataFrame) -> pd.DataFrame:
    """ Run every test row through the predictor and tabulate the outcome

    One request is sent per row. Scores such as accuracy or precision are
    not computed here.

    Args:
        predictor (Predictor): A SageMaker predictor
        test_df (pd.DataFrame): Test data holding ``Species`` and the four feature columns

    Returns:
        pd.DataFrame: The test rows extended with a ``Prediction`` column and a
            ``Result`` column telling whether the prediction equals ``Species``
    """
    output_columns = ('Species', 'Sepal.Length', 'Sepal.Width', 'Petal.Length', 'Petal.Width', 'Prediction', 'Result')
    records = []

    for _, sample in test_df.iterrows():
        # Only the features are sent; the label stays behind for the comparison
        features = sample.drop('Species')
        payload = pd.DataFrame([features]).to_csv(header=False, index=False)
        response = predictor.predict(payload, initial_args={'ContentType': 'text/csv'})
        label = response['predictions'][0]['predicted_label']

        records.append([*sample.tolist(), label, sample['Species'] == label])

    return pd.DataFrame(records, columns=output_columns)


if __name__ == '__main__':
    # Needs the estimator trained in the model build cell and the test split from data preparation
    predictor = deploy(knn)
    # One prediction request is sent per test row; the table is only displayed, not scored
    predictions = validate(predictor, test_df)
    display(predictions)

In [None]:
############################################################
# Delete a model and an inference endpoint
############################################################

def delete_model(predictor: Predictor) -> None:
    """ Delete a SageMaker model

    Deletion is best-effort: an error is printed instead of raised so that the
    rest of the cleanup can still run. KeyboardInterrupt and SystemExit are
    not swallowed, so interrupting the kernel still works.

    Args:
        predictor (Predictor): A SageMaker predictor
    """
    try:
        predictor.delete_model()
        print('Deleted a model')
    except Exception as e:
        # Report and carry on; only ordinary errors are handled here
        print(e)


def delete_endpoint(predictor: Predictor) -> None:
    """ Delete a SageMaker endpoint including a SageMaker endpoint config

    Deletion is best-effort: an error is printed instead of raised.
    KeyboardInterrupt and SystemExit are not swallowed, so interrupting the
    kernel still works.

    Args:
        predictor (Predictor): A SageMaker predictor
    """
    try:
        predictor.delete_endpoint(delete_endpoint_config=True)
        print(f'Deleted {predictor.endpoint_name}')
    except Exception as e:
        # Report and carry on; only ordinary errors are handled here
        print(e)


if __name__ == '__main__':
    # Both helpers print errors instead of raising, so the second call runs even if the first fails.
    # NOTE(review): the model is deleted before the endpoint and its config - presumably the
    # model lookup relies on the endpoint config still existing; confirm before reordering.
    delete_model(predictor)
    delete_endpoint(predictor)