In [121]:
# Disabled install step: the commands are wrapped in a string literal, so nothing is
# installed and the cell only echoes the string. To re-enable, prefer
# `%pip install stepfunctions==<version>` (targets this kernel's environment and pins
# the version) over `!pip install --upgrade`.
'''
import sys
!{sys.executable} -m pip install --upgrade stepfunctions
'''

'\nimport sys\n!{sys.executable} -m pip install --upgrade stepfunctions\n'

In [122]:
# import libraries
import boto3
from botocore.client import Config
import logging
import sagemaker
import stepfunctions

from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.estimator import Estimator
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.s3 import S3Uploader
from sagemaker.transformer import Transformer

from stepfunctions.inputs import ExecutionInput
from stepfunctions import steps
from stepfunctions.steps import Parallel
from stepfunctions.steps.sagemaker import TrainingStep, ModelStep, TransformStep
from stepfunctions.steps.compute import LambdaStep
from stepfunctions.workflow import Workflow

import os
import zipfile

In [123]:
# AWS clients shared by the helper cells below.
s3_client = boto3.client('s3', config=Config(signature_version='s3v4'))
lambda_client = boto3.client('lambda')
resource_client = boto3.client('resourcegroupstaggingapi')

# Catch-all IAM role: used for the Lambda functions, the SageMaker estimators and the
# Step Functions state machine created in this notebook.
hbomax_datascience_service_role = 'arn:aws:iam::613630599026:role/hbomax-datascience-service-role'

# Stream Step Functions SDK log messages at INFO level.
stepfunctions.set_stream_logger(level=logging.INFO)

# Session info for the active credentials.
region = boto3.Session().region_name
account_id = boto3.client('sts').get_caller_identity().get('Account')
acount_id = account_id  # backward-compatible alias for the original misspelled name

# S3 bucket holding the data prefixes, model output and SageMaker session defaults.
resources_bucket = 'hbomax-datascience-development-dev'

# SageMaker session whose default bucket is the resources bucket.
sagemaker_session = sagemaker.Session(default_bucket=resources_bucket)

# Name of the Step Functions state machine.
pipeline_name = 'FTInferenceRoutine'

# Custom XGBoost container image in ECR.
# NOTE(review): this URI pins account 613630599026 and us-east-1 independently of
# `region` / `account_id` above — confirm before running in another account/region.
#xgboost_image = get_image_uri(region, 'xgboost', repo_version='latest')
xgboost_image = '613630599026.dkr.ecr.us-east-1.amazonaws.com/temp-backgrounddata/hbomax-xgboost-shap:1.0-1-cpu-py3'

In [124]:
# Data locations. Every SageMaker channel below is CSV.
# NOTE(review): each "transformed" prefix (e.g. .../train/transformed) lives *under* the
# matching raw prefix (.../train). SageMaker reads S3Prefix inputs recursively, so the
# featurizer fit and the train/test transform jobs that read the raw prefixes will also
# pick up outputs written there by earlier runs — confirm, or move the transformed
# outputs outside the raw prefixes.
_staging_root = f's3://{resources_bucket}/free_trial_model/snowflake-hbomax-staging'

# raw data (new/inference data sits outside the staging tree)
s3_train_raw = sagemaker.s3_input(s3_data=f'{_staging_root}/train', content_type='text/csv')
s3_test_raw = sagemaker.s3_input(s3_data=f'{_staging_root}/test', content_type='text/csv')
s3_new_raw = sagemaker.s3_input(s3_data=f's3://{resources_bucket}/lifecycle/free-trial-propensity-model/new', content_type='text/csv')

# transformed data (featurizer output; train/test feed XGBoost training)
s3_train_transformed = sagemaker.s3_input(s3_data=f'{_staging_root}/train/transformed', content_type='text/csv')
s3_test_transformed = sagemaker.s3_input(s3_data=f'{_staging_root}/test/transformed', content_type='text/csv')
s3_new_transformed = sagemaker.s3_input(s3_data=f'{_staging_root}/new/transformed', content_type='text/csv')

# model artifact output for the XGBoost estimator
s3_out_data = f'{_staging_root}/output'

#s3_schema_dir = f's3://{resources_bucket}/free_trial_model/snowflake-hbomax-staging/schema'

del _staging_root  # helper only; keep the notebook namespace unchanged

In [125]:
def writeLambda(function_name, role, description,
                code_bucket='datascience-hbo-users', code_prefix='lambda_code',
                runtime='python3.7'):
    """Package a local handler file and (re)deploy it as an AWS Lambda function.

    Steps:
      1. Zip ``lambda_code/<function_name>.py`` into ``<function_name>.zip`` in the
         current working directory, with the .py file at the archive root.
      2. Upload the zip under ``s3://<code_bucket>/<code_prefix>``.
      3. Delete a previously deployed Lambda tagged ``function_name=<function_name>``.
      4. Create the function from ``<code_prefix>/<function_name>.zip``, tagged the same way.
    The local zip is removed at the end, including when a step fails.

    Uses the module-level ``lambda_client``, ``resource_client`` and ``sagemaker_session``.

    Args:
        function_name: Lambda function name. Also the handler module name
            (``<function_name>.lambda_handler``) and the ``function_name`` tag value.
        role: ARN of the IAM execution role assigned to the function.
        description: Description stored on the function.
        code_bucket: S3 bucket for the deployment package.
        code_prefix: Key prefix inside ``code_bucket`` for the deployment package.
        runtime: Lambda runtime identifier.
            NOTE(review): AWS has deprecated python3.7 for Lambda; pass a supported
            runtime once the handlers are verified on it.

    Returns:
        The response dict of ``lambda_client.create_function``.
    """
    zip_name = f'{function_name}.zip'
    lambda_source_code = f'lambda_code/{function_name}.py'

    try:
        # Flat archive: the handler module must sit at the zip root so that
        # '<function_name>.lambda_handler' resolves.
        with zipfile.ZipFile(zip_name, mode='w') as zf:
            zf.write(lambda_source_code, arcname=os.path.basename(lambda_source_code))

        S3Uploader.upload(local_path=zip_name,
                          desired_s3_uri=f's3://{code_bucket}/{code_prefix}',
                          session=sagemaker_session)

        # Delete the previously deployed copy (found through its tag) so that
        # create_function below does not collide with an existing name. Restricting the
        # lookup to Lambda functions keeps unrelated resources that carry a
        # 'function_name' tag from triggering a delete.
        tagged = resource_client.get_resources(
            TagFilters=[{'Key': 'function_name', 'Values': [function_name]}],
            ResourceTypeFilters=['lambda:function'],
        )
        if tagged['ResourceTagMappingList']:
            try:
                lambda_client.delete_function(FunctionName=function_name)
            except lambda_client.exceptions.ResourceNotFoundException:
                # The tagging index may lag behind deletions (eventual consistency) and
                # still list a function that no longer exists; nothing to delete then.
                pass

        return lambda_client.create_function(
            FunctionName=function_name,
            Runtime=runtime,
            Role=role,
            Handler=f'{function_name}.lambda_handler',
            Code={
                'S3Bucket': code_bucket,
                'S3Key': f'{code_prefix}/{zip_name}',
            },
            Description=description,
            Timeout=15,
            MemorySize=128,
            Tags={'function_name': function_name},
        )
    finally:
        # Remove the local build artifact whether or not deployment succeeded.
        if os.path.exists(zip_name):
            os.remove(zip_name)

In [126]:
# Deploy every Lambda the state machine invokes. Each 'name' needs a matching handler
# file lambda_code/<name>.py (packaged by writeLambda).
lambda_specs = [
    {'name': 'free_trial_transform_new_data',
     'description': 'Collect the most recent model and transform new data for inference.'},
    # Used to poll both training ('Train') and transform ('Transform') jobs below.
    {'name': 'free_trial_query_sagemaker_job',
     'description': 'Query the status of a SageMaker training or transform job.'},
    # Invoked by the 'Infer New Data' step; previously carried a copy-pasted description.
    {'name': 'free_trial_xgboost_transform',
     'description': 'Run XGBoost batch inference on the transformed new data.'},
    {'name': 'free_trial_collect_dayofweek',
     'description': 'Collect the current day of the week.'},
]

for spec in lambda_specs:
    writeLambda(function_name=spec['name'],
                role=hbomax_datascience_service_role,
                description=spec['description'])

In [127]:
# Per-execution input schema (all strings). SageMaker requires unique names for every
# job, model and endpoint, so each execution supplies the names used by the steps
# below, plus a timestamp prefix.
execution_input = ExecutionInput(schema=dict.fromkeys(
    [
        'SKLearnFeaturizerJobName',
        'TransformTrainJobName',
        'TransformTestJobName',
        'FeaturizerModelName',
        'XGBModelName',
        'TrainXGBoostJobName',
        'PipelineModelName',
        'TransformNewJobName',
        'TransformXGBoostJobName',
        'TimestampPrefix',
    ],
    str,
))

In [128]:
# Lambda task that collects the current day of the week; its Payload carries the
# "retrain" flag that the 'Retrain?' choice branches on.
day_of_week_step = LambdaStep(
    state_id='Collect Day of Week',
    parameters=dict(FunctionName='free_trial_collect_dayofweek'),
)

In [129]:
# SKLearn estimator running sklearn_featurizer/featurizer.py. It is fitted by the
# 'Fit Featurizer' step, and its model backs the train/test featurizer transforms.
# The resources bucket is forwarded to the script as a hyperparameter.
sklearn_featurizer = SKLearn(
    entry_point='featurizer.py',
    source_dir='sklearn_featurizer',
    hyperparameters={'resources_bucket': resources_bucket},
    role=hbomax_datascience_service_role,
    train_instance_type="ml.c4.xlarge",
    output_kms_key='alias/aws/s3',
    sagemaker_session=sagemaker_session,
)

In [130]:
# Training job that fits the SKLearn featurizer on the raw training prefix.
fit_featurizer_step = TrainingStep(
    'Fit Featurizer',
    estimator=sklearn_featurizer,
    job_name=execution_input['SKLearnFeaturizerJobName'],
    data=dict(train=s3_train_raw.config['DataSource']['S3DataSource']['S3Uri']),
    tags={'model': 'free_trial_sklearn_featurizer'},
)

In [131]:
# Register the fitted featurizer as a SageMaker model under the per-execution name.
create_featurizer_model_step = ModelStep(
    'Create Featurizer Model',
    model_name=execution_input['FeaturizerModelName'],
    model=fit_featurizer_step.get_expected_model(),
    tags={'model': 'free_trial_sklearn_featurizer'},
)

In [132]:
# Batch-transform the raw training data through the featurizer model. Output goes to
# the train/transformed prefix, which is the XGBoost 'train' channel.
train_transformer = Transformer(
    model_name=execution_input['FeaturizerModelName'],
    instance_type='ml.m4.2xlarge',
    instance_count=3,
    accept='text/csv',
    strategy='MultiRecord',
    assemble_with='Line',
    output_path=s3_train_transformed.config['DataSource']['S3DataSource']['S3Uri'],
    output_kms_key='alias/aws/s3',
)

transform_train_step = TransformStep(
    'Transform Training Data',
    transformer=train_transformer,
    model_name=execution_input['FeaturizerModelName'],
    job_name=execution_input['TransformTrainJobName'],
    data=s3_train_raw.config['DataSource']['S3DataSource']['S3Uri'],
    content_type='text/csv',
    split_type='Line',
    wait_for_completion=True,
)

In [133]:
# Batch-transform the raw test data through the featurizer model. Output goes to the
# test/transformed prefix, which is the XGBoost 'validation' channel.
test_transformer = Transformer(
    model_name=execution_input['FeaturizerModelName'],
    instance_type='ml.m4.2xlarge',
    instance_count=3,
    accept='text/csv',
    strategy='MultiRecord',
    assemble_with='Line',
    output_path=s3_test_transformed.config['DataSource']['S3DataSource']['S3Uri'],
    output_kms_key='alias/aws/s3',
)

transform_test_step = TransformStep(
    'Transform Test Data',
    transformer=test_transformer,
    model_name=execution_input['FeaturizerModelName'],
    job_name=execution_input['TransformTestJobName'],
    data=s3_test_raw.config['DataSource']['S3DataSource']['S3Uri'],
    content_type='text/csv',
    split_type='Line',
    wait_for_completion=True,
)

In [134]:
# Run the test and train featurizer transforms as concurrent branches (test first).
parallel_transform_step = Parallel("Branch Transformations")
for _branch in (transform_test_step, transform_train_step):
    parallel_transform_step.add_branch(_branch)

In [135]:
# define the XGBoost Model Estimator
xgboost_estimator = Estimator(image_name = xgboost_image,
                          role = hbomax_datascience_service_role, 
                          train_instance_count = 1, 
                          train_instance_type='ml.m4.4xlarge',
                          output_path = s3_out_data,
                          output_kms_key = 'alias/aws/s3',
                          hyperparameters = {
                                             'eval_metric':'auc'
                                            , 'alpha':1.218487609
                                            , 'eta':0.225242353
                                            , 'max_depth':10
                                            , 'min_child_weight':2.284773815
                                            , 'num_round':2
                                            , 'objective':'binary:logistic'
                                            , 'rate_drop':0.3
                                            , 'tweedie_variance_power':1.4
                                          },
                          sagemaker_session=sagemaker_session)

In [136]:
# Train XGBoost on the featurized train data, validating on the featurized test data.
train_xgboost_step = TrainingStep(
    'Train XGBoost',
    estimator=xgboost_estimator,
    job_name=execution_input['TrainXGBoostJobName'],
    data=dict(train=s3_train_transformed, validation=s3_test_transformed),
    tags={'model': 'free_trial_xgboost'},
)

In [137]:
# Lambda task that queries the status of the XGBoost training job.
# NOTE(review): not referenced by any Chain/Choice in the cells shown here.
query_train_xgboost_step = LambdaStep(
    state_id='Query Train New Data',
    parameters=dict(
        FunctionName='free_trial_query_sagemaker_job',
        Payload=dict(
            job_type='Train',
            job_name=execution_input['TrainXGBoostJobName'],
        ),
    ),
)

In [138]:
# Register the trained XGBoost artifacts as a SageMaker model.
create_xgboost_model_step = ModelStep(
    'Create XGBoost Model',
    tags={'model': 'free_trial_xgboost'},
    model_name=execution_input['XGBModelName'],
    model=train_xgboost_step.get_expected_model(),
)

In [139]:
# Lambda task that collects the most recent model and starts the transform job for
# new data, named by TransformNewJobName.
transform_new_data_step = LambdaStep(
    state_id='Transform New Data',
    parameters=dict(
        FunctionName='free_trial_transform_new_data',
        Payload=dict(
            bucket=resources_bucket,
            TransformJobName=execution_input['TransformNewJobName'],
        ),
    ),
)

In [140]:
# Lambda task that reports the status of the new-data transform job; its
# Payload.JobStatus drives the 'Transform new data Complete?' choice.
query_transform_new_data_step = LambdaStep(
    state_id='Query Transform New Data',
    parameters=dict(
        FunctionName='free_trial_query_sagemaker_job',
        Payload=dict(
            job_type='Transform',
            job_name=execution_input['TransformNewJobName'],
        ),
    ),
)

In [141]:
# Choice on the new-data transform job status (rules are added further down).
transform_new_data_complete_step = steps.states.Choice(state_id='Transform new data Complete?')

In [142]:
# Terminal states: overall success, and failure of the new-data transform.
succeed_state = steps.Succeed("Success")
transform_new_data_fail_state = steps.Fail("Transform New Data Fail")

In [143]:
# Lambda task that starts the XGBoost transform (inference) job over the new data,
# named by TransformXGBoostJobName.
infer_new_data_step = LambdaStep(
    state_id='Infer New Data',
    parameters=dict(
        FunctionName='free_trial_xgboost_transform',
        Payload=dict(
            bucket=resources_bucket,
            TransformJobName=execution_input['TransformXGBoostJobName'],
        ),
    ),
)

In [144]:
# Polling loop for the new-data transform job: query status -> choice -> wait 60 s ->
# query again, until the job completes (go on to inference) or fails.
transform_new_data_wait_state = steps.Wait(
    state_id="Wait for transform new data",
    seconds=60
)

transform_new_data_complete_step.add_choice(
    rule=steps.choice_rule.ChoiceRule.StringEquals(variable=query_transform_new_data_step.output()['Payload']["JobStatus"], value='Completed'),
    next_step=infer_new_data_step
)

transform_new_data_complete_step.add_choice(
    rule=steps.choice_rule.ChoiceRule.StringEquals(variable=query_transform_new_data_step.output()['Payload']["JobStatus"], value='Failed'),
    next_step=transform_new_data_fail_state
)

transform_new_data_complete_step.add_choice(
    rule=steps.choice_rule.ChoiceRule.StringEquals(variable=query_transform_new_data_step.output()['Payload']["JobStatus"], value='InProgress'),
    next_step=transform_new_data_wait_state
)

# Any other JobStatus (SageMaker also reports e.g. 'Stopping'/'Stopped', presumably
# relayed by the query Lambda) would match no rule and abort the execution with
# States.NoChoiceMatched; route it to the explicit Fail state instead.
transform_new_data_complete_step.default_choice(transform_new_data_fail_state)

# After waiting, re-query the job status.
transform_new_data_wait_state.next(query_transform_new_data_step)

Query Transform New Data LambdaStep(parameters={'FunctionName': 'free_trial_query_sagemaker_job', 'Payload': {'job_type': 'Transform', 'job_name': <stepfunctions.inputs.placeholders.ExecutionInput object at 0x7f9792137780>}}, resource='arn:aws:states:::lambda:invoke', type='Task')

In [145]:
# Lambda task that reports the status of the XGBoost inference transform job; its
# Payload.JobStatus drives the 'Infer new data Complete?' choice.
query_infer_new_data_step = LambdaStep(
    state_id='Query Infer New Data',
    parameters=dict(
        FunctionName='free_trial_query_sagemaker_job',
        Payload=dict(
            job_type='Transform',
            job_name=execution_input['TransformXGBoostJobName'],
        ),
    ),
)

In [146]:
# Choice on the inference transform job status (rules are added further down).
infer_new_data_complete_step = steps.states.Choice(state_id='Infer new data Complete?')

In [147]:
# Terminal failure state for the inference transform job.
infer_new_data_fail_state = steps.Fail("Infer New Data Fail")

In [148]:
# Polling loop for the inference transform job: query status -> choice -> wait 60 s ->
# query again, until the job completes (Success) or fails.
infer_new_data_wait_state = steps.Wait(
    state_id="Wait for infer new data",
    seconds=60
)

infer_new_data_complete_step.add_choice(
    rule=steps.choice_rule.ChoiceRule.StringEquals(variable=query_infer_new_data_step.output()['Payload']["JobStatus"], value='Completed'),
    next_step=succeed_state
)

infer_new_data_complete_step.add_choice(
    rule=steps.choice_rule.ChoiceRule.StringEquals(variable=query_infer_new_data_step.output()['Payload']["JobStatus"], value='Failed'),
    next_step=infer_new_data_fail_state
)

infer_new_data_complete_step.add_choice(
    rule=steps.choice_rule.ChoiceRule.StringEquals(variable=query_infer_new_data_step.output()['Payload']["JobStatus"], value='InProgress'),
    next_step=infer_new_data_wait_state
)

# Any other JobStatus (SageMaker also reports e.g. 'Stopping'/'Stopped', presumably
# relayed by the query Lambda) would match no rule and abort the execution with
# States.NoChoiceMatched; route it to the explicit Fail state instead.
infer_new_data_complete_step.default_choice(infer_new_data_fail_state)

# After waiting, re-query the job status.
infer_new_data_wait_state.next(query_infer_new_data_step)

Query Infer New Data LambdaStep(parameters={'FunctionName': 'free_trial_query_sagemaker_job', 'Payload': {'job_type': 'Transform', 'job_name': <stepfunctions.inputs.placeholders.ExecutionInput object at 0x7f9792137898>}}, resource='arn:aws:states:::lambda:invoke', type='Task')

In [149]:
# Branch on the "retrain" flag from the day-of-week Lambda, compared as a string.
# NOTE(review): there is no default; a value other than 'True'/'False' (e.g. a JSON
# boolean) matches no rule and the execution fails — confirm the Lambda returns strings.
complete_retrain_step = steps.states.Choice(state_id='Retrain?')

# 'True' -> start retraining at Fit Featurizer.
complete_retrain_step.add_choice(
    next_step=fit_featurizer_step,
    rule=steps.choice_rule.ChoiceRule.StringEquals(
        value='True',
        variable=day_of_week_step.output()['Payload']["retrain"],
    ),
)

# 'False' -> skip retraining and go straight to Transform New Data.
complete_retrain_step.add_choice(
    next_step=transform_new_data_step,
    rule=steps.choice_rule.ChoiceRule.StringEquals(
        value='False',
        variable=day_of_week_step.output()['Payload']["retrain"],
    ),
)

In [150]:
# Assemble and deploy the state machine:
#   Collect Day of Week -> Retrain?
#     'True'  -> Fit Featurizer -> Create Featurizer Model -> Branch Transformations
#                -> Train XGBoost -> Create XGBoost Model -> Transform New Data
#     'False' -> Transform New Data
#   Transform New Data -> Query Transform New Data -> Transform new data Complete?
#     'Completed' -> Infer New Data -> Query Infer New Data -> Infer new data Complete?
workflow_definition = steps.Chain([day_of_week_step, complete_retrain_step])

# Retraining branch. Chain sets each step's `next`, so this also links Transform New Data
# into its polling loop, which the 'False' branch reuses. Keep it complete: stopping the
# chain early leaves training, the new-data polling loop and inference unreachable.
steps.Chain([fit_featurizer_step, create_featurizer_model_step, parallel_transform_step,
             train_xgboost_step, create_xgboost_model_step, transform_new_data_step,
             query_transform_new_data_step, transform_new_data_complete_step])

# Inference tail, entered from the 'Completed' choice of the transform polling loop.
steps.Chain([infer_new_data_step, query_infer_new_data_step, infer_new_data_complete_step])

workflow = Workflow(
    name=pipeline_name,
    definition=workflow_definition,
    role=hbomax_datascience_service_role,
    execution_input=execution_input
)

# create() logs an error instead of raising when a state machine with this name already
# exists; update() then pushes the current definition in either case.
workflow.create()
workflow.update(workflow_definition)

[31m[ERROR] A workflow with the same name already exists on AWS Step Functions. To update a workflow, use Workflow.update().[0m
[32m[INFO] Workflow updated successfully on AWS Step Functions. All execute() calls will use the updated definition and role within a few seconds. [0m


'arn:aws:states:us-east-1:613630599026:stateMachine:FTInferenceRoutine'