In [87]:
%%sh
pip -q install --upgrade stepfunctions

# Setup
Add a policy to your SageMaker role in IAM
If you are running this notebook on an Amazon SageMaker notebook instance, the IAM role assumed by your notebook instance needs permission to create and run workflows in AWS Step Functions. To provide this permission to the role, do the following.

Open the Amazon SageMaker console.
Select Notebook instances and choose the name of your notebook instance.
Under Permissions and encryption, select the role ARN to view the role on the IAM console.
Choose Attach policies and search for AWSStepFunctionsFullAccess.
Select the check box next to AWSStepFunctionsFullAccess and choose Attach policy.
If you are running this notebook in a local environment, the SDK will use your configured AWS CLI configuration. For more information, see Configuring the AWS CLI.

Next, create an execution role in IAM for Step Functions.

Create an execution role for Step Functions
You need an execution role so that you can create and execute workflows in Step Functions.

Go to the IAM console.
Select Roles and then Create role.
Under Choose the service that will use this role, select Step Functions.
Choose Next until you can enter a Role name.
Enter a name such as StepFunctionsWorkflowExecutionRole and then select Create role.
Attach a policy to the role you created. The following steps attach a policy that provides full access to Step Functions; however, as a good practice, you should only provide access to the resources you need.

Under the Permissions tab, click Add inline policy
Enter the following in the JSON tab
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateTransformJob",
                "sagemaker:DescribeTransformJob",
                "sagemaker:StopTransformJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:CreateHyperParameterTuningJob",
                "sagemaker:DescribeHyperParameterTuningJob",
                "sagemaker:StopHyperParameterTuningJob",
                "sagemaker:CreateModel",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateEndpoint",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DeleteEndpoint",
                "sagemaker:UpdateEndpoint",
                "sagemaker:ListTags",
                "lambda:InvokeFunction",
                "sqs:SendMessage",
                "sns:Publish",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:DescribeTasks",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem",
                "batch:SubmitJob",
                "batch:DescribeJobs",
                "batch:TerminateJob",
                "glue:StartJobRun",
                "glue:GetJobRun",
                "glue:GetJobRuns",
                "glue:BatchStopJobRun"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule"
            ]
        }
    ]
}
Choose Review policy and give the policy a name such as StepFunctionsWorkflowExecutionPolicy.
Choose Create policy. You will be redirected to the details page for the role.
Copy the Role ARN at the top of the Summary.

In [3]:
import sagemaker
# Role ARN returned by the SageMaker SDK for this environment; it is passed
# as `role` when the estimator is built later in the notebook.
sagemaker_execution_role = sagemaker.get_execution_role()
# Bare last expression so the notebook displays the resolved ARN.
sagemaker_execution_role

'arn:aws:iam::515654810248:role/service-role/AmazonSageMaker-ExecutionRole-20210517T165208'

In [4]:

# Paste the Role ARN of the StepFunctionsWorkflowExecutionRole created in the
# Setup section above. It is passed as `role` when the Workflow is constructed.
workflow_execution_role = 'arn:aws:iam::515654810248:role/StepFunctionsWorkflowExecutionRole'

In [7]:
import stepfunctions
import logging

In [9]:
# SageMaker session reused below for the default bucket, the region name and
# the estimator's `sagemaker_session`.
session = sagemaker.Session()
# Configure the Step Functions SDK stream logger at INFO level.
stepfunctions.set_stream_logger(level=logging.INFO)
# Bare expression: displays the imported module, including the path it was loaded from.
stepfunctions

<module 'stepfunctions' from '/home/ec2-user/anaconda3/envs/pytorch_p36/lib/python3.6/site-packages/stepfunctions/__init__.py'>

('us-east-1', 'sagemaker-us-east-1-515654810248')

In [17]:
import numpy as np
import pandas as pd
# Read the dataset from a path relative to the notebook's working directory.
df = pd.read_csv('diabetes.csv')
# Preview the first rows to sanity-check the column names and values.
df.head()

Unnamed: 0,preg_count,glucose_concentration,diastolic_bp,triceps_skin_fold_thickness,two_hr_serum_insulin,bmi,diabetes_pedi,age,diabetes_class
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


In [18]:
from sklearn.model_selection import train_test_split

In [20]:
# Split the data 80/20 (fixed seed) and write two headerless CSV files:
#   train.csv -> label column first, followed by the feature columns
#   test.csv  -> feature columns only (label removed)
label_col = 'diabetes_class'

# Select the label by name rather than by position, so a CSV whose columns
# are ordered differently cannot silently turn a feature into the target.
X = df.drop(columns=[label_col])
y = df[label_col]
xTrain, xTest, yTrain, yTest = train_test_split(X, y, test_size=0.2, random_state=0)

# Re-attach the label (aligned on the row index) before reordering columns.
trainDF = xTrain.join(yTrain)
testDF = xTest.join(yTest)

# Output column order; the first entry is the label.
column = [
    label_col,
    'preg_count',
    'glucose_concentration',
    'diastolic_bp',
    'triceps_skin_fold_thickness',
    'two_hr_serum_insulin',
    'bmi',
    'diabetes_pedi',
    'age',
]
trainDF = trainDF[column]
# column[1:] skips the label, leaving only the features in the test file.
testDF = testDF[column[1:]]

# index=False already omits the index, so index_label had no effect, and
# trainDF is already restricted to `column`, so columns= was redundant.
trainDF.to_csv('train.csv', index=False, header=False)
testDF.to_csv('test.csv', index=False, header=False)

In [22]:
import re
import boto3

region = boto3.Session().region_name
bucketNM = session.default_bucket()

# Object keys (paths inside the bucket) for each dataset and for model output.
TrainFile = r'LogisticR/Train/Train.csv'
TestFile = r'LogisticR/Test/Test.csv'
Valfile = r'LogisticR/Val/Val.csv'
ModelFolder = r'LogisticR/model/'

# One shared template turns a (bucket, key) pair into an s3:// URI.
_s3_uri = r's3://{0}/{1}'.format
s3ModelOutput = _s3_uri(bucketNM, ModelFolder)
s3Train = _s3_uri(bucketNM, TrainFile)
s3Test = _s3_uri(bucketNM, TestFile)
s3Val = _s3_uri(bucketNM, Valfile)

# Display the model output location as the cell result.
s3ModelOutput

's3://sagemaker-us-east-1-515654810248/LogisticR/model/'

In [23]:
# Upload each local CSV to its object key in the bucket named by `bucketNM`.
upload_bucket = boto3.Session().resource('s3').Bucket(bucketNM)
for local_path, object_key in (('train.csv', TrainFile), ('test.csv', TestFile)):
    with open(local_path, 'rb') as f:
        upload_bucket.Object(object_key).upload_fileobj(f)

In [32]:
# Look up the Linear Learner container image for the session's region.
# `sagemaker.image_uris.retrieve` is the SDK v2 replacement for the deprecated
# `sagemaker.amazon.amazon_estimator.get_image_uri`.
ECRdockercontainer = sagemaker.image_uris.retrieve('linear-learner', session.boto_region_name)

# Use the SDK v2 argument names (`instance_count` / `instance_type`) instead of
# the deprecated `train_instance_*` spellings; `image_uri` was already v2.
LogisticModel = sagemaker.estimator.Estimator(
    image_uri=ECRdockercontainer,
    role=sagemaker_execution_role,
    instance_count=1,
    instance_type='ml.m4.xlarge',
    output_path=s3ModelOutput,   # model artifacts are written under this S3 prefix
    sagemaker_session=session,
    base_job_name='Logistic-Demo-v1',
)

# Binary classification with mini-batches of 100 rows.
LogisticModel.set_hyperparameters(predictor_type='binary_classifier', mini_batch_size=100)



In [74]:
# step function
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep, TransformStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath


In [75]:
"""
execution_input = ExecutionInput(schema={
    'JobName': str, 
    'ModelName': str,
    'EndpointName': str
})
"""

"\nexecution_input = ExecutionInput(schema={\n    'JobName': str, \n    'ModelName': str,\n    'EndpointName': str\n})\n"

In [76]:
# Imported here so the cell no longer depends on the deprecated
# `sagemaker.session.s3_input` alias (renamed to TrainingInput in SDK v2).
from sagemaker.inputs import TrainingInput

# First state of the workflow: trains `LogisticModel` on the CSV at `s3Train`.
# NOTE(review): `job_name` is a fixed literal. SageMaker job names generally
# have to be unique, so re-executing this workflow may fail on a name clash;
# consider ExecutionInput placeholders to supply a fresh name per execution.
training_step = steps.TrainingStep(
    'Train Step',
    estimator=LogisticModel,
    data={
        'train': TrainingInput(s3Train, content_type='text/csv')
    },
    job_name="trainingjob"
)

In [77]:
# Second state: saves the model the training step is expected to produce,
# under a fixed model name.
expected_model = training_step.get_expected_model()
model_step = steps.ModelStep('Save model', model=expected_model, model_name="logisticmodel")


In [78]:
# Transformer derived from the estimator: one ml.m5.large instance.
batch_transformer = LogisticModel.transformer(instance_count=1, instance_type='ml.m5.large')

# Third state: runs the transformer over the CSV at `s3Test`, using the model
# name given to the ModelStep.
# NOTE(review): `job_name` is "trainingjob" here as well, which looks
# copy-pasted from the training step; confirm the intended transform job name.
transform_step = steps.TransformStep(
    'Transform Input Dataset',
    transformer=batch_transformer,
    job_name="trainingjob",
    model_name="logisticmodel",
    data=s3Test,
    content_type='text/csv',
)

In [79]:
"""
endpoint_config_step = steps.EndpointConfigStep(
    "Create Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m5.large'
)
"""

'\nendpoint_config_step = steps.EndpointConfigStep(\n    "Create Endpoint Config",\n    endpoint_config_name=execution_input[\'ModelName\'],\n    model_name=execution_input[\'ModelName\'],\n    initial_instance_count=1,\n    instance_type=\'ml.m5.large\'\n)\n'

In [80]:
"""
endpoint_step = steps.EndpointStep(
    "Create Endpoint",
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName']
)
"""

'\nendpoint_step = steps.EndpointStep(\n    "Create Endpoint",\n    endpoint_name=execution_input[\'EndpointName\'],\n    endpoint_config_name=execution_input[\'ModelName\']\n)\n'

In [81]:
# Chain the states in execution order: train, then save the model, then transform.
pipeline_states = [training_step, model_step, transform_step]
workflow_definition = steps.Chain(pipeline_states)

In [82]:
# Display the Step Functions role ARN that will be passed to the Workflow.
workflow_execution_role

'arn:aws:iam::515654810248:role/StepFunctionsWorkflowExecutionRole'

In [83]:
# Display the SageMaker role ARN that was given to the estimator.
sagemaker_execution_role 

'arn:aws:iam::515654810248:role/service-role/AmazonSageMaker-ExecutionRole-20210517T165208'

In [84]:
from time import strftime, gmtime

# Suffix the workflow name with the current UTC day-hour-minute-second so that
# re-running this cell yields a different name.
timestamp = strftime('%d-%H-%M-%S', gmtime())
workflow_name = '{}-{}'.format('MyTrainTransformDeploy_v1', timestamp)

# Bind the chained states to the Step Functions execution role.
workflow = Workflow(
    name=workflow_name,
    definition=workflow_definition,
    role=workflow_execution_role,
)

In [85]:
# Render the workflow as a graph in the cell output (going by the method name)
# so it can be inspected before it is created.
workflow.render_graph()

In [86]:
# Create the workflow on AWS Step Functions; the cell result is the ARN of the
# new state machine.
workflow.create()


[32m[INFO] Workflow created successfully on AWS Step Functions.[0m


'arn:aws:states:us-east-1:515654810248:stateMachine:MyTrainTransformDeploy_v1-23-05-27-15'