## Build end-to-end ML/AI workflows with the AWS Step Functions Data Science SDK
### Sample Use Case: Data preparation and XGBoost regression for sales price prediction workflow

Imagine you own an ecommerce store and want to create an ML pipeline that prepares your daily sales transaction data for training a regression model, for example to predict the sales price of some items. This exercise uses the Online Retail dataset from the UCI Machine Learning Repository, which contains sales transactions for gift products, including their prices.

You will use an Amazon SageMaker notebook with Python 3 and the AWS Step Functions Data Science SDK installed. For instructions on how to create a notebook and install the Data Science SDK on it, see the documentation [here](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/readmelink.html#getting-started-with-sample-jupyter-notebooks).

*Note that the AWS Step Functions Data Science SDK should already be installed if you are using an Amazon SageMaker Studio notebook with a "Python 3 Data Science" instance.*

In [1]:
#import sys
#!{sys.executable} -m pip install --upgrade stepfunctions

### Preparation
Start by loading some libraries and setting up the roles on the notebook.

In [2]:
import boto3, sagemaker, time, random, uuid, logging, stepfunctions, io

from sagemaker.amazon.amazon_estimator import get_image_uri
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep, TransformStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath

# Retrieve the SageMaker Execution Role from the notebook
sagemaker_execution_role = sagemaker.get_execution_role()

# REPLACE with your Step Functions WorkflowExecutionRole ARN
# For instructions on how to configure permissions and getting this role check the Step Functions Data Science SDK documentation
workflow_execution_role = "arn:aws:iam::[Account ID]:role/StepFunctionsWorkflowExecutionRole" #REPLACE with your role

session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

bucket = session.default_bucket() #REPLACE with your S3 bucket name, or use the session default with 'session.default_bucket()'
prefix = 'ml-pipelines/sample-price-estimation'

This sample exercise uses the Online Retail dataset from the UCI Machine Learning Repository, which contains sales transactions for gift products, including their prices.

- *Repository: Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.*

- *Dataset: Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197-208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17). [https://archive.ics.uci.edu/ml/datasets/Online+Retail].*

Retrieve the sample file from the UCI Online Retail dataset site:

In [3]:
from urllib.request import urlretrieve 
urlretrieve("https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx", "Online%20Retail.xlsx")

('Online%20Retail.xlsx', <http.client.HTTPMessage at 0x7f4dd19a12b0>)

The original dataset file is in 'xlsx' format. Convert this to 'csv' format, considering the datetime attribute.
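Excel stores dates as serial numbers (days since an epoch, with the time of day as the fractional part), which is why the datetime column needs special handling during the conversion. As a minimal sketch of what that handling amounts to, assuming the workbook uses the default 1900 date system and only contains dates after 1 March 1900, the helper below turns a serial number into the `MM/DD/YYYY HH:MM` text used in this exercise. The name `excel_serial_to_text` is hypothetical and is not part of `xlrd`.

```python
from datetime import datetime, timedelta

# Day zero of Excel's 1900 date system (valid for serials after 1 March 1900).
EXCEL_EPOCH = datetime(1899, 12, 30)

def excel_serial_to_text(serial):
    """Convert an Excel serial date to 'MM/DD/YYYY HH:MM' text."""
    # Work in whole seconds so float rounding cannot shift the minute.
    seconds = round(serial * 86400)
    moment = EXCEL_EPOCH + timedelta(seconds=seconds)
    return moment.strftime("%m/%d/%Y %H:%M")

# Serial 40513 is 1 December 2010; 506 minutes past midnight is 08:26.
print(excel_serial_to_text(40513 + 506 / 1440))  # → 12/01/2010 08:26
```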

In [4]:
import xlrd
import csv
from datetime import datetime

# Name of the converted file; the Lambda function created later reads this same name from S3
FILE_DATA = 'retail.csv'

def csv_from_excel():
    # Note: xlrd 2.0 and later no longer read .xlsx files, so this cell assumes xlrd < 2.0
    wb = xlrd.open_workbook('Online%20Retail.xlsx', on_demand=True)
    sh = wb.sheet_by_name('Online Retail')
    with open(FILE_DATA, 'w', newline='') as your_csv_file:
        wr = csv.writer(your_csv_file, quoting=csv.QUOTE_MINIMAL)

        for rownum in range(sh.nrows):
            row = sh.row_values(rownum)
            date = row[4]
            if isinstance(date, (float, int)):
                # Excel stores dates as serial numbers; convert them to MM/DD/YYYY HH:MM text
                year, month, day, hour, minute, sec = xlrd.xldate_as_tuple(date, wb.datemode)
                py_date = "%02d/%02d/%02d %02d:%02d" % (month, day, year, hour, minute)
                wr.writerow(row[0:4] + [py_date] + row[5:8])
            else:
                # The header row (or any row without a numeric date) is written unchanged
                wr.writerow(row)

csv_from_excel()

Have a look at the resulting 'csv' dataset file...

In [5]:
import pandas
import os

# Also upload to our S3 bucket for preparing for the upcoming steps...
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, FILE_DATA)).upload_file(FILE_DATA)

df = pandas.read_csv(FILE_DATA)
df.head(5)

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country |
|---|---|---|---|---|---|---|---|---|
| 0 | 536365.0 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6.0 | 12/01/2010 08:26 | 2.55 | 17850.0 | United Kingdom |
| 1 | 536365.0 | 71053.0 | WHITE METAL LANTERN | 6.0 | 12/01/2010 08:26 | 3.39 | 17850.0 | United Kingdom |
| 2 | 536365.0 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8.0 | 12/01/2010 08:26 | 2.75 | 17850.0 | United Kingdom |
| 3 | 536365.0 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6.0 | 12/01/2010 08:26 | 3.39 | 17850.0 | United Kingdom |
| 4 | 536365.0 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6.0 | 12/01/2010 08:26 | 3.39 | 17850.0 | United Kingdom |


### Data transformation

Now you will create a first task that performs a simple data transformation. Note that this step could also call, for example, an AWS Glue ETL job or a similar service. In this case, you will first explore the data preparation in the notebook and later automate it with an AWS Lambda function created from the same notebook.

For the purpose of this sample exercise, drop the fields that are not needed for the dummy regression that predicts each item's 'UnitPrice'. Also encode the categorical variable 'StockCode' as numerical values.
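To make the encoding step concrete, here is a minimal pure-Python sketch of what category codes amount to: each distinct value is mapped to its position in the sorted list of distinct values, which is how pandas assigns codes when the values are sortable. The function name `category_codes` and the sample stock codes are illustrative assumptions; in the notebook, pandas does the equivalent work through `astype('category')` and `.cat.codes`.

```python
def category_codes(values):
    """Map each value to the index of its category in sorted order."""
    categories = sorted(set(values))
    index = {category: code for code, category in enumerate(categories)}
    return [index[value] for value in values], categories

# Illustrative stock codes only; repeated values share the same code
codes, categories = category_codes(["85123A", "71053", "84406B", "71053"])
print(categories)
print(codes)
```

Note that the codes depend on the full set of values present, so a code learned on one day's data is only comparable to another day's if the categories are kept fixed.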

In [6]:
df['StockCode'] = df['StockCode'].astype('category')
df['StockCodeEnc'] = df['StockCode'].cat.codes

df = df.drop(['InvoiceNo', 'Description', 'InvoiceDate', 'Country', 'StockCode'], axis=1)
df = df[['UnitPrice', 'StockCodeEnc', 'Quantity', 'CustomerID']]

df.head(5)

| | UnitPrice | StockCodeEnc | Quantity | CustomerID |
|---|---|---|---|---|
| 0 | 2.55 | 3536 | 6.0 | 17850.0 |
| 1 | 3.39 | 2794 | 6.0 | 17850.0 |
| 2 | 2.75 | 3044 | 8.0 | 17850.0 |
| 3 | 3.39 | 2985 | 6.0 | 17850.0 |
| 4 | 3.39 | 2984 | 6.0 | 17850.0 |


Now split the data into training, validation, and testing datasets for the SageMaker training job, and upload the files to your Amazon S3 bucket.

In [7]:
import numpy as np
import os

train_data, validation_data, test_data = np.split(df.sample(frac=1, random_state=1729), [int(0.7 * len(df)), int(0.9 * len(df))])
train_data.to_csv('train.csv', header=False, index=False)
validation_data.to_csv('validation.csv', header=False, index=False)
test_data.to_csv('test_real.csv', header=False, index=False)
test_data.drop(['UnitPrice'], axis=1).to_csv('test.csv', header=False, index=False)

boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'test/test.csv')).upload_file('test.csv')
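
The split above shuffles all rows once and then cuts the shuffled sequence at 70% and 90% of its length, which gives a 70/20/10 partition. A minimal standard-library sketch of the same idea follows. `three_way_split` is a hypothetical helper, and its shuffle will not reproduce the row order that pandas produces with the same seed.

```python
import random

def three_way_split(rows, cuts=(0.7, 0.9), seed=1729):
    """Shuffle rows, then cut them into train, validation, and test lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    first_cut = int(cuts[0] * len(shuffled))
    second_cut = int(cuts[1] * len(shuffled))
    return shuffled[:first_cut], shuffled[first_cut:second_cut], shuffled[second_cut:]

train, validation, test = three_way_split(range(10))
print(len(train), len(validation), len(test))
```

Shuffling before cutting matters here because the source file is ordered by invoice date, so an unshuffled split would train on the oldest transactions only.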

#### Automating data preparation with AWS Lambda

Prepare a script for running this same data transformation, but now from an AWS Lambda function... we want to automate, automate, and automate!

In [None]:
# Create data transformation lambda:

# First download pandas and numpy for using in our AWS Lambda package, as these do not come in AWS Lambda's base...
!mkdir lambda
urlretrieve("https://files.pythonhosted.org/packages/7b/fd/41698f20fd297cef2dc43a72a8ca42d149eaf7d954f1fb2bd3fc366a658d/pandas-0.25.3-cp38-cp38-manylinux1_x86_64.whl", "lambda/pandas-0.25.3-cp38-cp38-manylinux1_x86_64.whl")
urlretrieve("https://files.pythonhosted.org/packages/d7/6a/3fed132c846d1e47963f30376cc041e9dd586d286d931055ad06ff65c6c7/numpy-1.17.4-cp38-cp38-manylinux1_x86_64.whl", "lambda/numpy-1.17.4-cp38-cp38-manylinux1_x86_64.whl")
!unzip -o lambda/pandas-0.25.3-cp38-cp38-manylinux1_x86_64.whl -d lambda
!unzip -o lambda/numpy-1.17.4-cp38-cp38-manylinux1_x86_64.whl -d lambda

# then install the pytz dependency locally...
!pip install -t lambda pytz

# and remove the files no longer needed...
!rm -rf lambda/*.whl lambda/*.dist-info lambda/__pycache__

# prepare the lambda function code...
file_name = 'lambda/lambda_function.py'
def MakeFile(file_name):
    with open(file_name, 'w') as f:
        f.write('''\
import json
import boto3
import pandas
import numpy as np
import os
bucket = '[Your S3 bucket name]' #REPLACE with your S3 bucket name (for example, the value of session.default_bucket() in the notebook); the Lambda function cannot call session.default_bucket() itself
prefix = 'ml-pipelines/sample-price-estimation'
filename = 'retail.csv'
def lambda_handler(event, context):
    s3 = boto3.resource('s3')
    s3.Bucket(bucket).download_file(prefix + "/" + filename, '/tmp/retail.csv')
    df = pandas.read_csv("/tmp/retail.csv")
    df['StockCode'] = df['StockCode'].astype('category')
    df['StockCodeEnc'] = df['StockCode'].cat.codes
    df = df.drop(['InvoiceNo', 'Description', 'InvoiceDate', 'Country', 'StockCode'], axis=1)
    df = df[['UnitPrice', 'StockCodeEnc', 'Quantity', 'CustomerID']]
    train_data, validation_data, test_data = np.split(df.sample(frac=1, random_state=1729), [int(0.7 * len(df)), int(0.9 * len(df))])
    train_data.to_csv('/tmp/train.csv', header=False, index=False)
    validation_data.to_csv('/tmp/validation.csv', header=False, index=False)
    test_data.to_csv('/tmp/test_real.csv', header=False, index=False)
    test_data.drop(['UnitPrice'], axis=1).to_csv('/tmp/test.csv', header=False, index=False)
    s3.Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('/tmp/train.csv')
    s3.Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('/tmp/validation.csv')
    s3.Bucket(bucket).Object(os.path.join(prefix, 'test/test.csv')).upload_file('/tmp/test.csv')
    return {
        'statusCode': 200,
        'body': ('Date transformation complete for retail.csv - ' + bucket + '/' + prefix + '/' + filename)
    }
        ''')
MakeFile(file_name)

# finally, create the runtime file with Pandas and Numpy...
import zipfile
def zipFilesInDir(dirName, zipFileName):
   # create the zip file...
   with zipfile.ZipFile(zipFileName, 'w', zipfile.ZIP_DEFLATED) as zipObj:
       # iterate over all the files in the directory
       for folderName, subfolders, filenames in os.walk(dirName):
           for filename in filenames:
               # create complete filepath of file in directory
               filePath = os.path.join(folderName, filename)
               # add file to zip, relative to dirName so lambda_function.py sits at the archive root
               zipObj.write(filePath, arcname=os.path.relpath(filePath, dirName))
zipFilesInDir('lambda', 'lambda_function.zip')
!rm -fr lambda

# Verify that the deployment package exists before creating the function
if os.path.isfile("lambda_function.zip"):
    print("Lambda file created: lambda_function.zip")
else:
    print("Error - Lambda file not created")

#REPLACE with your proper Lambda role
!aws lambda create-function --function-name 'ml-pipelines-data-transformation-lambda' \
    --runtime python3.8 --role 'arn:aws:iam::[Account ID]:role/LambdaDynamo' \
    --handler lambda_function.lambda_handler \
    --zip-file 'fileb://lambda_function.zip' \
    --description 'Sample ML pipeline data transformation lambda'  \
    --timeout 600  \
    --memory-size 256  \
    --publish
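
AWS Lambda resolves the handler `lambda_function.lambda_handler` relative to the root of the deployment archive, so the module file and its dependencies need to sit at the top level of the zip rather than under a parent folder. The sketch below uses a throwaway temporary directory with placeholder files and a hypothetical helper named `zip_dir_at_root` to show one way of storing paths relative to the source directory through the `arcname` argument of `ZipFile.write`.

```python
import os
import tempfile
import zipfile

def zip_dir_at_root(source_dir, zip_path):
    """Zip source_dir so that its contents sit at the archive root."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for folder, _subfolders, filenames in os.walk(source_dir):
            for filename in filenames:
                file_path = os.path.join(folder, filename)
                # Store the path relative to source_dir, not the full path
                archive.write(file_path, arcname=os.path.relpath(file_path, source_dir))

# Build a tiny stand-in package in a temporary directory and inspect the archive
with tempfile.TemporaryDirectory() as workdir:
    package_dir = os.path.join(workdir, "lambda")
    os.makedirs(os.path.join(package_dir, "pytz"))
    for relative in ("lambda_function.py", os.path.join("pytz", "__init__.py")):
        with open(os.path.join(package_dir, relative), "w") as handle:
            handle.write("# placeholder\n")
    zip_path = os.path.join(workdir, "lambda_function.zip")
    zip_dir_at_root(package_dir, zip_path)
    with zipfile.ZipFile(zip_path) as archive:
        archive_names = sorted(archive.namelist())

print(archive_names)
```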

### Building our ML pipeline with the AWS Step Functions Data Science SDK

You are now ready to create your actual ML pipeline steps. Start by preparing the training job for Amazon SageMaker and the step for the data transformation function in AWS Lambda that you created before.

In [11]:
xgb = sagemaker.estimator.Estimator(
    get_image_uri(boto3.Session().region_name, 'xgboost'),
    sagemaker_execution_role, 
    train_instance_count = 1, 
    train_instance_type = 'ml.m5.large',
    train_volume_size = 5,
    output_path = 's3://{}/{}/output'.format(bucket, prefix),
    sagemaker_session = session
)

xgb.set_hyperparameters(
    objective = 'reg:linear',
    num_round = 50,
    max_depth = 5,
    eta = 0.2,
    gamma = 4,
    min_child_weight = 6,
    subsample = 0.7,
    silent = 0
)

In [12]:
# SageMaker expects unique names for jobs/models/endpoints. Pass these for each execution via placeholders:
execution_input = ExecutionInput(schema={
    'JobName': str, 
    'ModelName': str
})
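
Because each execution needs fresh names, it can help to generate and check them in one place. The sketch below assumes the SageMaker naming rule of at most 63 characters made of alphanumerics and hyphens, starting and ending with an alphanumeric character. `unique_name` is a hypothetical helper that belongs to neither SDK, and it uses `uuid.uuid4()` for the random suffix.

```python
import re
import uuid

# Assumed SageMaker constraint: alphanumerics and hyphens only,
# no leading or trailing hyphen, at most 63 characters in total.
NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9])*$")
MAX_NAME_LENGTH = 63

def unique_name(prefix):
    """Return prefix plus a random 32-character hex suffix, validated."""
    name = "{}-{}".format(prefix, uuid.uuid4().hex)
    if len(name) > MAX_NAME_LENGTH or not NAME_PATTERN.match(name):
        raise ValueError("Invalid SageMaker resource name: " + name)
    return name

job_name = unique_name("regression")
model_name = unique_name("regression")
print(job_name)
print(model_name)
```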

In [13]:
preparation_step = steps.LambdaStep(
    'Preparing data (Lambda)',
    parameters={  
        "FunctionName": "ml-pipelines-data-transformation-lambda",
        "Payload": {  
           "JobName": execution_input['JobName']
        }
    }
)

preparation_step.add_retry(steps.Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=15,
    max_attempts=2,
    backoff_rate=4.0
))
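
For orientation, the step and retry policy above correspond roughly to an Amazon States Language (ASL) task state like the fragment sketched below. This is a hand-written approximation, not the SDK's actual output: the field names follow the ASL specification, but the exact JSON that the SDK generates, including how it renders the `execution_input` placeholder, may differ. The hypothetical `retry_delays` helper shows how `IntervalSeconds` and `BackoffRate` translate into the wait before each retry.

```python
import json

# Hand-written approximation of the ASL state for the Lambda step (fragment only).
preparation_state = {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke",
    "Parameters": {
        "FunctionName": "ml-pipelines-data-transformation-lambda",
        # A key ending in ".$" is resolved as a path at run time;
        # "$$" refers to the context object, which holds the execution input.
        "Payload": {"JobName.$": "$$.Execution.Input.JobName"},
    },
    "Retry": [{
        "ErrorEquals": ["States.TaskFailed"],
        "IntervalSeconds": 15,
        "MaxAttempts": 2,
        "BackoffRate": 4.0,
    }],
}

def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Seconds waited before each retry attempt under exponential backoff."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]

retrier = preparation_state["Retry"][0]
delays = retry_delays(retrier["IntervalSeconds"], retrier["MaxAttempts"], retrier["BackoffRate"])
print(json.dumps(preparation_state, indent=2))
print(delays)
```

With these settings the first retry waits 15 seconds and the second waits 60 seconds, after which the failure is propagated.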

In [14]:
training_step = steps.TrainingStep(
    'Training (SageMaker)', 
    estimator=xgb,
    data={
        'train': sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv'),
        'validation': sagemaker.s3_input(s3_data='s3://{}/{}/validation'.format(bucket, prefix), content_type='csv')
    },
    job_name=execution_input['JobName']  
)

In [15]:
model_step = steps.ModelStep(
    'Save model (SageMaker)',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName']  
)

You will now add another AWS Lambda function for validating the accuracy of your trained model, once the training job completes. In this case, use the metric for the Root Mean Squared Error (RMSE) provided by the Amazon SageMaker XGBoost model by default.
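
As a reminder of what the metric measures, RMSE is the square root of the mean squared difference between actual and predicted values. Lower values indicate a better fit, and the result is expressed in the same units as the target (here, the unit price). A minimal sketch with made-up numbers:

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("Inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Illustrative prices only, not taken from the dataset:
# errors are 1.0 and -2.0, so the mean squared error is 2.5
print(rmse([2.0, 4.0], [1.0, 6.0]))
```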

In [None]:
# Create validation lambda:
file_name = 'lambda_function.py'
def MakeFile(file_name):
    with open(file_name, 'w') as f:
        f.write('''\
import json
import boto3
def lambda_handler(event, context):
    sm = boto3.client('sagemaker')
    rmse = sm.describe_training_job(TrainingJobName=event['JobName'])['FinalMetricDataList'][0]['Value']
    print(rmse)
    return {
        'statusCode': 200,
        'rmse': json.dumps(rmse)
    }
        ''')
MakeFile(file_name)

# create the zip file...
with zipfile.ZipFile('lambda_function.zip', 'w', zipfile.ZIP_DEFLATED) as zipObj:
    zipObj.write('lambda_function.py')
!rm -f lambda_function.py

# Verify that the deployment package exists before creating the function
if os.path.isfile("lambda_function.zip"):
    print("Lambda file created: lambda_function.zip")
else:
    print("Error - Lambda file not created")

#REPLACE with your proper Lambda role
!aws lambda create-function --function-name 'ml-pipelines-validation-lambda' \
    --runtime python3.8 --role 'arn:aws:iam::[Account ID]:role/LambdaDynamo' \
    --handler lambda_function.lambda_handler \
    --zip-file 'fileb://lambda_function.zip' \
    --description 'Sample ML pipeline validation metric lambda'  \
    --timeout 60  \
    --memory-size 128  \
    --publish
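
The Lambda function above reads the first entry of `FinalMetricDataList`. Since a training job can report several metrics, a slightly more defensive variant looks the metric up by name. The sketch below works on a hand-made dictionary shaped like a `describe_training_job` response. The metric names `train:rmse` and `validation:rmse` and all values are assumptions for illustration, and `final_metric` is a hypothetical helper.

```python
def final_metric(training_job, metric_name):
    """Return the final value of the named metric from a training job description."""
    for metric in training_job.get("FinalMetricDataList", []):
        if metric["MetricName"] == metric_name:
            return metric["Value"]
    raise KeyError("Metric not reported: " + metric_name)

# Hand-made stand-in for a describe_training_job response (values are invented)
sample_response = {
    "TrainingJobName": "regression-example",
    "FinalMetricDataList": [
        {"MetricName": "train:rmse", "Value": 61.2},
        {"MetricName": "validation:rmse", "Value": 64.8},
    ],
}

print(final_metric(sample_response, "validation:rmse"))
```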

In [19]:
validation_lambda_step = steps.LambdaStep(
    'Validating RMSE (Lambda)',
    parameters={  
        "FunctionName": "ml-pipelines-validation-lambda",
        "Payload": {  
           "JobName": execution_input['JobName']
        }
    }
)

validation_lambda_step.add_retry(steps.Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=15,
    max_attempts=2,
    backoff_rate=4.0
))

To illustrate decision logic that is common in ML workflows, consider automating the deployment of new models to production whenever new data is available and re-training is performed, for example by comparing the new model's performance against a given threshold.

In this example, you decide, based on the RMSE, whether to continue the ML pipeline and run inferences with the new model in production, or to stop the pipeline if the new model does not meet the threshold (set at 50 just to illustrate the concept). This mechanism helps automate re-training with new data. For example, you could trigger this ML pipeline every day with the new data from your customers' recent transactions.

In [20]:
transform_step = steps.TransformStep(
    'Batch inference (SageMaker)',
    transformer=xgb.transformer(
        instance_count=1,
        instance_type='ml.m5.large'
    ),
    job_name=execution_input['JobName'],     
    model_name=execution_input['ModelName'], 
    data='s3://{}/{}/test'.format(bucket, prefix),
    content_type='text/csv'
)

In [21]:
worse_step = steps.Pass(
    'Worse model',
    parameters={
        "Error": ("The new model is not accurate enough. RMSE:" + str(validation_lambda_step.output()["Payload"]["rmse"]))
    }
)

In [22]:
choice_state = steps.Choice(
    state_id='RMSE >=50% ?' #REPLACE with your desired threshold for RMSE
)

In [23]:
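#REPLACE the rmse values with your desired threshold
# Lower RMSE means a better fit, so continue to batch inference only when it is below the threshold.
# The Lambda returns rmse as a string, so these rules compare strings rather than numbers.
choice_state.add_choice(
    rule=steps.ChoiceRule.StringLessThan(variable=validation_lambda_step.output()["Payload"]["rmse"], value="50"),
    next_step=transform_step
)
choice_state.add_choice(
    rule=steps.ChoiceRule.StringGreaterThanEquals(variable=validation_lambda_step.output()["Payload"]["rmse"], value="50"),
    next_step=worse_step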
)
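
One detail worth keeping in mind: the validation Lambda returns the RMSE as a string, and string choice rules compare text rather than numeric value. The plain-Python sketch below involves no Step Functions calls and uses invented sample values. It assumes the string rules behave like an ordinary lexicographic comparison and shows how that can disagree with a numeric comparison of the same values.

```python
def string_less_than(value_text, threshold_text="50"):
    """Lexicographic comparison, character by character."""
    return value_text < threshold_text

def numeric_less_than(value_text, threshold=50.0):
    """Parse the value first, then compare numerically."""
    return float(value_text) < threshold

for rmse_text in ("12.3", "9.5", "100.7"):
    print(rmse_text, string_less_than(rmse_text), numeric_less_than(rmse_text))
```

The two comparisons agree for "12.3" but disagree for "9.5" and "100.7". If numeric semantics are needed, one option is to have the Lambda return the metric as a number and use one of the numeric comparison operators that the Amazon States Language defines, such as `NumericLessThan`.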

Assuming the RMSE is good enough, you now perform an Amazon SageMaker Batch Transformation for running inferences on all of your testing dataset.

*Note that in this case you are not creating an 'Endpoint Configuration' and an 'Endpoint'. Should your use case need them to serve real-time inferences, you can follow the steps in the AWS Step Functions Data Science SDK examples or uncomment the following lines:*

In [24]:
#endpoint_config_step = steps.EndpointConfigStep(
#    "Create Endpoint Config",
#    endpoint_config_name=execution_input['ModelName'],
#    model_name=execution_input['ModelName'],
#    initial_instance_count=1,
#    instance_type='ml.m5.large'
#)

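# Note: 'EndpointName' is not in the ExecutionInput schema defined earlier; add it there before using this step
#endpoint_step = steps.EndpointStep(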
#    "Create Endpoint",
#    endpoint_name=execution_input['EndpointName'],
#    endpoint_config_name=execution_input['ModelName']
#)

You are now ready to chain the steps of your ML pipeline with the AWS Step Functions Data Science SDK and to set up the workflow with the create command.

In [33]:
workflow_definition = steps.Chain([
    preparation_step,
    training_step,
    model_step,
    validation_lambda_step,
    choice_state
])
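
Conceptually, chaining links each state to the one that follows it. The sketch below imitates that with plain dictionaries: every state except the last gets a `Next` field pointing to its successor, and the first state becomes `StartAt`. It illustrates the idea under simplified assumptions (states are bare stubs and `chain_states` is a hypothetical helper) and is not the SDK's implementation.

```python
import json

def chain_states(state_names, stub_types):
    """Link named states in order, ASL style, using Next pointers."""
    states = {}
    for position, name in enumerate(state_names):
        state = {"Type": stub_types.get(name, "Task")}
        if position + 1 < len(state_names):
            state["Next"] = state_names[position + 1]
        states[name] = state
    return {"StartAt": state_names[0], "States": states}

names = [
    "Preparing data (Lambda)",
    "Training (SageMaker)",
    "Save model (SageMaker)",
    "Validating RMSE (Lambda)",
    "RMSE >=50% ?",
]
# The last state is a Choice, which routes through its own rules instead of Next
definition = chain_states(names, {"RMSE >=50% ?": "Choice"})
print(json.dumps(definition, indent=2))
```

Because the chain ends in the choice state, the branches added with `add_choice` determine what runs afterwards.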

In [34]:
workflow = Workflow(
    name='ml-pipelines-sample-price-estimation_v1',
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input
)

In [35]:
workflow.render_graph(portrait=False)

Keep in mind that when you use the SDK in Jupyter notebooks, you define the workflows locally in the notebook instance but **they do not actually exist on AWS Step Functions until the “create” command is called**. Similarly, **these are not executed until the “execute” command is called**, after which you can track its progress on the notebook.

In [None]:
workflow.create()

Now you are ready to test this pipeline by calling the execute command, and to debug its execution logs if necessary.

In [37]:
execution = workflow.execute(
    inputs={
        'JobName': 'regression-{}'.format(uuid.uuid1().hex), # Each Sagemaker Job requires a unique name
        'ModelName': 'regression-{}'.format(uuid.uuid1().hex), # Each Model requires a unique name
    }
)

[INFO] Workflow execution started successfully on AWS Step Functions.


In [54]:
execution.render_progress()

*Note that you can re-run the previous cell until you have verified that the execution has completed. You can also check the execution details directly in the AWS Step Functions console.*

You can also list the events with their details.

In [55]:
execution.list_events(html=True)

| ID | Type | Step | Resource | Elapsed Time (ms) | Timestamp |
|---|---|---|---|---|---|
| 1 | ExecutionStarted | | - | 0.0 | Jan 23, 2020 03:59:54.548 PM |
| 2 | TaskStateEntered | Preparing data (Lambda) | - | 49.0 | Jan 23, 2020 03:59:54.597 PM |
| 3 | TaskScheduled | Preparing data (Lambda) | Step Functions execution | 49.0 | Jan 23, 2020 03:59:54.597 PM |
| 4 | TaskStarted | Preparing data (Lambda) | Step Functions execution | 122.0 | Jan 23, 2020 03:59:54.670 PM |
| 5 | TaskSucceeded | Preparing data (Lambda) | Step Functions execution | 556.0 | Jan 23, 2020 04:00:26.104 PM |

Event details:

- Event 1: { "input": { "JobName": "regression-655d56783df911ea9b0d0df2a83ecc9a", "ModelName": "regression-655d57e03df911ea9b0d0df2a83ecc9a" }, "roleArn": "arn:aws:iam::889960878219:role/StepFunctionsWorkflowExecutionRole" }
- Event 2: { "name": "Preparing data (Lambda)", "input": { "JobName": "regression-655d56783df911ea9b0d0df2a83ecc9a", "ModelName": "regression-655d57e03df911ea9b0d0df2a83ecc9a" } }
- Event 3: { "resourceType": "lambda", "resource": "invoke", "region": "eu-west-1", "parameters": { "FunctionName": "ml-pipelines-data-transformation-lambda", "Payload": { "JobName": "regression-655d56783df911ea9b0d0df2a83ecc9a" } } }
- Event 4: { "resourceType": "lambda", "resource": "invoke" }
- Event 5: { "resourceType": "lambda", "resource": "invoke", "output": { "ExecutedVersion": "$LATEST", "Payload": { "statusCode": 200, "body": "Date transformation complete for retail.csv - rodzanto2019ml/ml-pipelines/sample-price-estimation/retail.csv" }, "SdkHttpMetadata": { "HttpHeaders": { "Connection": "keep-alive", "Content-Length": "139", "Content-Type": "application/json", "Date": "Thu, 23 Jan 2020 16:00:26 GMT", "X-Amz-Executed-Version": "$LATEST", "x-amzn-Remapped-Content-Length": "0", "x-amzn-RequestId": "22328216-b5fe-47b9-9382-3b518ad7dd59", "X-Amzn-Trace-Id": "root=1-5e29c2fa-7a42e8748895a3d0751dc1fd;sampled=0" }, "HttpStatusCode": 200 }, "SdkResponseMetadata": { "RequestId": "22328216-b5fe-47b9-9382-3b518ad7dd59" }, "StatusCode": 200 } }


In [56]:
workflow.list_executions(html=True)

| Name | Status | Started | End Time |
|---|---|---|---|
| 42169118-aa5d-49a4-a7ae-4caafaf38f04 | SUCCEEDED | Jan 23, 2020 03:59:54.548 PM | Jan 23, 2020 04:06:24.138 PM |


In [57]:
workflow.list_workflows(html=True)

| Name | Creation Date |
|---|---|
| WorkshopMgmtManageAccounts | Oct 14, 2019 02:05:27.801 PM |
| ml-pipelines-sample-price-estimation_v1 | Jan 23, 2020 03:59:52.115 PM |
| training-pipeline-2019-11-15-13-57-27 | Nov 15, 2019 01:58:50.548 PM |


You can even export the AWS CloudFormation template for the pipeline you have just built, in order to deploy it later on as infrastructure as code if required.

In [None]:
print(Workflow.get_cloudformation_template(workflow))