# Building an Automated Pipeline for Amazon Forecast
When a file is put in S3 with data for training, build a pipeline that automatically performs data import, training, forecasting, and exporting of predictions in Amazon Forecast.
We use Step Functions and Lambda.

# Configure SageMaker role
Attach policies for the services you use and also set up trust relationships, as shown in the figure below.

![IAMroles_Permissions](https://user-images.githubusercontent.com/27226946/89102049-7d18e380-d440-11ea-91a6-6ab2c7e63870.png)

![IAMroles_TrustRelationships](https://user-images.githubusercontent.com/27226946/89102054-84d88800-d440-11ea-9199-0583be09aa1c.png)

Edit trust relationship should be written as follows.

In [None]:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "sagemaker.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "forecast.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "states.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

## Getting SageMaker role

In [1]:
from sagemaker import get_execution_role

role_sm = get_execution_role()

In [2]:
role_sm

'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970'

# 1.make Lambda function
We will create a Lambda function. The files we will use are located in lambdas/.

In [3]:
import boto3

In [4]:
lambda_ = boto3.client('lambda')

In [5]:
!rm -f lambdas/createdatasetimport/datasetimport.zip
!cd lambdas/createdatasetimport; zip -r datasetimport .

zip_file = open("lambdas/createdatasetimport/datasetimport.zip", "rb").read()


lambda_.create_function(
    FunctionName="datasetimport",
    Runtime="python3.7",
    Role=role_sm,
    Handler="datasetimport.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: datasetimport.py (deflated 54%)


{'ResponseMetadata': {'RequestId': 'b1616ceb-ed4b-4960-9053-74c7b6014efd',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 04:58:21 GMT',
   'content-type': 'application/json',
   'content-length': '893',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'b1616ceb-ed4b-4960-9053-74c7b6014efd'},
  'RetryAttempts': 0},
 'FunctionName': 'datasetimport',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:datasetimport',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'datasetimport.lambda_handler',
 'CodeSize': 563,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T04:58:21.089+0000',
 'CodeSha256': '30T3uMu47wgeOsG6npZy2utPjmZuV4pGj3VUCVXtCgY=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '2d3b8aef-8d87-4af0-9f15-6802ebc49bd3',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [44]:
!rm -f lambdas/GetStatusImport/getstatusimport.zip
!cd lambdas/GetStatusImport; zip -r getstatusimport .

zip_file = open("lambdas/GetStatusImport/getstatusimport.zip", "rb").read()

lambda_.create_function(
    FunctionName="getstatusimport",
    Runtime="python3.7",
    Role=role_sm,
    Handler="getstatusimport.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: .ipynb_checkpoints/ (stored 0%)
  adding: .ipynb_checkpoints/getstatusimport-checkpoint.py (deflated 46%)
  adding: getstatusimport.py (deflated 46%)


{'ResponseMetadata': {'RequestId': 'e1675de0-4efb-4423-a459-70c911ed836b',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 05:05:50 GMT',
   'content-type': 'application/json',
   'content-length': '899',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'e1675de0-4efb-4423-a459-70c911ed836b'},
  'RetryAttempts': 1},
 'FunctionName': 'getstatusimport',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:getstatusimport',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'getstatusimport.lambda_handler',
 'CodeSize': 976,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T05:05:50.737+0000',
 'CodeSha256': '9mvwQunGCFr8xb7i0Ig/NhV72+CkMG+Aif5qoliSQNU=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '433429d4-8a98-4a61-867e-cb2efab5ea09',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [7]:
!rm -f lambdas/createpredictor/predictor.zip
!cd lambdas/createpredictor; zip -r predictor .

zip_file = open("lambdas/createpredictor/predictor.zip", "rb").read()

lambda_.create_function(
    FunctionName="predictor",
    Runtime="python3.7",
    Role=role_sm,
    Handler="predictor.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: predictor.py (deflated 56%)


{'ResponseMetadata': {'RequestId': '32c4dbda-4c9d-4952-bf78-f95d7f095021',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 04:58:23 GMT',
   'content-type': 'application/json',
   'content-length': '881',
   'connection': 'keep-alive',
   'x-amzn-requestid': '32c4dbda-4c9d-4952-bf78-f95d7f095021'},
  'RetryAttempts': 0},
 'FunctionName': 'predictor',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:predictor',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'predictor.lambda_handler',
 'CodeSize': 645,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T04:58:23.829+0000',
 'CodeSha256': 'ozekRr7hwZFxQxduOnRc7zUB8yRzFMFFIkSxbDr80cg=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '9a224cb0-0f08-46d5-9421-56c1328dc488',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [8]:
!rm -f lambdas/GetStatusPredictor/getstatuspredictor.zip
!cd lambdas/GetStatusPredictor; zip -r getstatuspredictor .

zip_file = open("lambdas/GetStatusPredictor/getstatuspredictor.zip", "rb").read()

lambda_.create_function(
    FunctionName="getstatuspredictor",
    Runtime="python3.7",
    Role=role_sm,
    Handler="getstatuspredictor.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: getstatuspredictor.py (deflated 46%)


{'ResponseMetadata': {'RequestId': '3d726efa-df40-48f3-a1b7-5c0c2e7475b1',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 04:58:24 GMT',
   'content-type': 'application/json',
   'content-length': '908',
   'connection': 'keep-alive',
   'x-amzn-requestid': '3d726efa-df40-48f3-a1b7-5c0c2e7475b1'},
  'RetryAttempts': 0},
 'FunctionName': 'getstatuspredictor',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:getstatuspredictor',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'getstatuspredictor.lambda_handler',
 'CodeSize': 382,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T04:58:24.873+0000',
 'CodeSha256': 'U6OA69b4ByECTijrgQPYBZWZt0zq1em5fs/aX19mcJg=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '00d21d71-ef98-44f0-bad0-422681a7c422',
 'State': 'Active',
 'LastUpdateStatus': 'Succe

In [9]:
!rm -f lambdas/createforecast/forecast.zip
!cd lambdas/createforecast; zip -r forecast .

zip_file = open("lambdas/createforecast/forecast.zip", "rb").read()

lambda_.create_function(
    FunctionName="forecast",
    Runtime="python3.7",
    Role=role_sm,
    Handler="forecast.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: forecast.py (deflated 45%)


{'ResponseMetadata': {'RequestId': 'ae7b4859-fcd1-4c04-bcc2-60f8cfaa1fdd',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 04:58:26 GMT',
   'content-type': 'application/json',
   'content-length': '878',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'ae7b4859-fcd1-4c04-bcc2-60f8cfaa1fdd'},
  'RetryAttempts': 0},
 'FunctionName': 'forecast',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:forecast',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'forecast.lambda_handler',
 'CodeSize': 374,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T04:58:25.958+0000',
 'CodeSha256': 'aozu0Z3fO0iTdTir/My4WCE9/wQ3/Tg58r7FTq5wytc=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '1d2e5f1c-2106-432a-8648-4c01047d1c28',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [10]:
!rm -f lambdas/GetStatusForecast/getstatusforecast.zip
!cd lambdas/GetStatusForecast; zip -r getstatusforecast .

zip_file = open("lambdas/GetStatusForecast/getstatusforecast.zip", "rb").read()

lambda_.create_function(
    FunctionName="getstatusforecast",
    Runtime="python3.7",
    Role=role_sm,
    Handler="getstatusforecast.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: getstatusforecast.py (deflated 47%)


{'ResponseMetadata': {'RequestId': '246416ac-47d5-4ff2-aeff-1b004df9bc2b',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 04:58:27 GMT',
   'content-type': 'application/json',
   'content-length': '905',
   'connection': 'keep-alive',
   'x-amzn-requestid': '246416ac-47d5-4ff2-aeff-1b004df9bc2b'},
  'RetryAttempts': 0},
 'FunctionName': 'getstatusforecast',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:getstatusforecast',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'getstatusforecast.lambda_handler',
 'CodeSize': 374,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T04:58:26.956+0000',
 'CodeSha256': 'm93syuTvFZ9cc4CyWa7RDnw5uCl3z6UxqnxpGa4AVp8=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '8a3845b2-4ca1-4ec2-8459-c5af0d1ae8f8',
 'State': 'Active',
 'LastUpdateStatus': 'Successf

In [11]:
!rm -f lambdas/createforecastexportjob/forecastexportjob.zip
!cd lambdas/createforecastexportjob; zip -r forecastexportjob .

zip_file = open("lambdas/createforecastexportjob/forecastexportjob.zip", "rb").read()

lambda_.create_function(
    FunctionName="forecastexportjob",
    Runtime="python3.7",
    Role=role_sm,
    Handler="forecastexportjob.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: forecastexportjob.py (deflated 53%)


{'ResponseMetadata': {'RequestId': '7c5bd9c9-9c71-483f-8017-26d8e9ea3b00',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 04:58:30 GMT',
   'content-type': 'application/json',
   'content-length': '905',
   'connection': 'keep-alive',
   'x-amzn-requestid': '7c5bd9c9-9c71-483f-8017-26d8e9ea3b00'},
  'RetryAttempts': 0},
 'FunctionName': 'forecastexportjob',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:forecastexportjob',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'forecastexportjob.lambda_handler',
 'CodeSize': 508,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T04:58:30.275+0000',
 'CodeSha256': '30oXiVATLm2ex6B2FQiMagL8mYdEUPh2cr7rTDJnFSQ=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'bea316ae-5d16-4644-afd4-7f546dc77fed',
 'State': 'Active',
 'LastUpdateStatus': 'Successf

In [12]:
!rm -f lambdas/GetStatusForecastExportJob/getstatusforecastexportjob.zip
!cd lambdas/GetStatusForecastExportJob; zip -r getstatusforecastexportjob .

zip_file = open("lambdas/GetStatusForecastExportJob/getstatusforecastexportjob.zip", "rb").read()

lambda_.create_function(
    FunctionName="getstatusforecastexportjob",
    Runtime="python3.7",
    Role=role_sm,
    Handler="getstatusforecastexportjob.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: getstatusforecastexportjob.py (deflated 47%)


{'ResponseMetadata': {'RequestId': '8f3d50b5-3251-46c8-908b-50e14d9769a5',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 04:58:31 GMT',
   'content-type': 'application/json',
   'content-length': '932',
   'connection': 'keep-alive',
   'x-amzn-requestid': '8f3d50b5-3251-46c8-908b-50e14d9769a5'},
  'RetryAttempts': 0},
 'FunctionName': 'getstatusforecastexportjob',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:getstatusforecastexportjob',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'getstatusforecastexportjob.lambda_handler',
 'CodeSize': 406,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T04:58:31.369+0000',
 'CodeSha256': '7gfhIm0E6+FmQlVZCsXHR08X42ddNQ7ioaYPIRIG2OU=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'bda98b75-5294-48fc-88e9-083e659a9f22',
 'State': 'Active',
 'L

In [13]:
!rm -f lambdas/NotifyUser/notifyuser.zip
!cd lambdas/NotifyUser; zip -r notifyuser .

zip_file = open("lambdas/NotifyUser/notifyuser.zip", "rb").read()

lambda_.create_function(
    FunctionName="notifyuser",
    Runtime="python3.7",
    Role=role_sm,
    Handler="notifyuser.lambda_handler",
    Code={"ZipFile": zip_file},
    Timeout=60*15,
    MemorySize=3008
)

  adding: notifyuser.py (deflated 12%)


{'ResponseMetadata': {'RequestId': 'd51dc13e-2839-470b-b479-115769c18c38',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sat, 01 Aug 2020 04:58:32 GMT',
   'content-type': 'application/json',
   'content-length': '884',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'd51dc13e-2839-470b-b479-115769c18c38'},
  'RetryAttempts': 0},
 'FunctionName': 'notifyuser',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:notifyuser',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'notifyuser.lambda_handler',
 'CodeSize': 263,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-01T04:58:32.342+0000',
 'CodeSha256': 'KIb96AGobdF9EmRtk+NH6jWCOH7e/DZSChpqr2FgFrU=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'c089fd53-b937-40b7-9ab7-ba71a8ac2582',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

# 2. Step Functins: Create state machine
The definition of the state machine is created, and the creation of the state machine is performed based on the definition.

In [14]:
import sagemaker

In [15]:
sagemaker_session = sagemaker.Session()

In [16]:
sagemaker_session.boto_region_name

'us-east-1'

In [17]:
sts = boto3.client('sts')
id_info = sts.get_caller_identity()

In [18]:
id_info['Account']

'805433377179'

In [19]:
import json

In [20]:
def_sfn={
  "Comment": "Amazon Forecast example of the Amazon States Language using an AWS Lambda Function",
  "StartAt": "datasetimport",
  "States": {
    "datasetimport": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:datasetimport",
      "ResultPath":"$",
      "Next": "GetStatusImport"
    },
    "GetStatusImport": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusimport",
      "ResultPath":"$",
      "Next": "CheckStatusImport"
    },
    "CheckStatusImport": {
        "Type": "Choice",
        "InputPath":"$",
        "Choices": [
        {
        "Variable": "$.is_active_import",
        "BooleanEquals": True,
        "Next": "predictor"
        }
        ],
        "Default": "SleepCheckStatusImport"
        },
    "SleepCheckStatusImport": {
        "Type": "Wait",
        "Seconds": 300,
        "Next": "GetStatusImport"
    },
    "predictor": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:predictor",
      "ResultPath":"$",
      "Next": "GetStatusPredictor"
    },
    "GetStatusPredictor": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatuspredictor",
      "ResultPath":"$",
      "Next": "CheckStatusPredictor"
    },
    "CheckStatusPredictor": {
        "Type": "Choice",
        "InputPath":"$",
        "Choices": [
        {
        "Variable": "$.is_active_predictor",
        "BooleanEquals": True,
        "Next": "forecast"
        }
        ],
        "Default": "SleepCheckStatusPredictor"
        },
    "SleepCheckStatusPredictor": {
        "Type": "Wait",
        "Seconds": 300,
        "Next": "GetStatusPredictor"
    },
    "forecast": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:forecast",
      "ResultPath":"$",
      "Next": "GetStatusForecast"
    },
    "GetStatusForecast": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusforecast",
      "ResultPath":"$",
      "Next": "CheckStatusForecast"
    },  
    "CheckStatusForecast": {
        "Type": "Choice",
        "InputPath":"$",
        "Choices": [
        {
        "Variable": "$.is_active_forecast",
        "BooleanEquals": True,
        "Next": "forecastexportjob"
        }
        ],
        "Default": "SleepCheckStatusForecast"
        },
    "SleepCheckStatusForecast": {
        "Type": "Wait",
        "Seconds": 300,
        "Next": "GetStatusForecast"
    },
    "forecastexportjob": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:forecastexportjob",
      "ResultPath":"$",
      "Next": "GetStatusForecastExportjob"
    },
    "GetStatusForecastExportjob": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusforecastexportjob",
      "ResultPath":"$",
      "Next": "CheckStatusExport"
    },  
    "CheckStatusExport": {
        "Type": "Choice",
        "InputPath":"$",
        "Choices": [
        {
        "Variable": "$.is_active_export",
        "BooleanEquals": True,
        "Next": "NotifyUser"
        }
        ],
        "Default": "SleepCheckStatusExport"
        },
    "SleepCheckStatusExport": {
        "Type": "Wait",
        "Seconds": 300,
        "Next": "GetStatusForecastExportjob"
    },
    "NotifyUser": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:notifyuser",
      "ResultPath":"$",
      "End": True
    }
  }
}


In [21]:
with open('./definition.json', 'w') as f:
    json.dump(def_sfn, f, indent=2, ensure_ascii=False)

In [22]:
import boto3
sfn = boto3.client('stepfunctions')

In [23]:
sfn.create_state_machine(
        name="demo-forecast",
        definition=open("definition.json").read(),
        roleArn=role_sm
)

{'stateMachineArn': 'arn:aws:states:us-east-1:805433377179:stateMachine:demo-forecast',
 'creationDate': datetime.datetime(2020, 8, 1, 4, 58, 40, 544000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': '5ed46cbd-0054-4f74-af7d-83a6898c3f2e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5ed46cbd-0054-4f74-af7d-83a6898c3f2e',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '118'},
  'RetryAttempts': 0}}

# 3.AWS CloudTrail :create trail  
https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-cloudwatch-events-s3.html

Configure CloudTrail and CloudWatch Events to run Step Functions by triggering S3 object placement as described in this guide.

https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/cloudtrail.html#CloudTrail.Client.create_trail

In [24]:
! pip freeze | grep boto3

boto3==1.14.16


In [25]:
cloudtrail = boto3.client('cloudtrail')

In [26]:
sts = boto3.client('sts')
id_info = sts.get_caller_identity()
print(id_info['Account'])

805433377179


In [27]:
bucket_name = 'demo-forecast-' + id_info['Account']

In [28]:
bucket_name

'demo-forecast-805433377179'

In [29]:
output_trail_bucket = bucket_name + '-trail'

In [30]:
s3 = boto3.client('s3')
s3.create_bucket(Bucket=output_trail_bucket)

{'ResponseMetadata': {'RequestId': '2D5570296F99AFFF',
  'HostId': 'kaUj1h9DjNswQ0fDhW9nABvashdgLplGhcjvxWcM6pN9VrCNw2FDdxTPZ40+vawzTYurnlwgcvo=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'kaUj1h9DjNswQ0fDhW9nABvashdgLplGhcjvxWcM6pN9VrCNw2FDdxTPZ40+vawzTYurnlwgcvo=',
   'x-amz-request-id': '2D5570296F99AFFF',
   'date': 'Sat, 01 Aug 2020 04:58:47 GMT',
   'location': '/demo-forecast-805433377179-trail',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Location': '/demo-forecast-805433377179-trail'}

## set bucket policy
setting policy with boto3  
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-bucket-policies.html

In [31]:
import json

In [32]:
bucket_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AWSCloudTrailAclCheck20150319",
            "Effect": "Allow",
            "Principal": {
                "Service": "cloudtrail.amazonaws.com"
            },
            "Action": "s3:GetBucketAcl",
            "Resource": "arn:aws:s3:::" + bucket_name + "-trail"
        },
        {
            "Sid": "AWSCloudTrailWrite20150319",
            "Effect": "Allow",
            "Principal": {
                "Service": "cloudtrail.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::" + bucket_name + "-trail/AWSLogs/" + id_info['Account'] + "/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control"
                }
            }
        }
    ]
}

# Convert the policy from JSON dict to string
bucket_policy = json.dumps(bucket_policy)

In [33]:
# Set the new policy
s3 = boto3.client('s3')
s3.put_bucket_policy(Bucket=output_trail_bucket, Policy=bucket_policy)

{'ResponseMetadata': {'RequestId': '21FC2A7FFFF16A31',
  'HostId': '4sDzvdhiylXFme5a3AD/j/GRNM/Dr2ZfWlTlPqpOU9t48JSGuNwxO4LwapyKivrA9SNUwekQuJc=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': '4sDzvdhiylXFme5a3AD/j/GRNM/Dr2ZfWlTlPqpOU9t48JSGuNwxO4LwapyKivrA9SNUwekQuJc=',
   'x-amz-request-id': '21FC2A7FFFF16A31',
   'date': 'Sat, 01 Aug 2020 04:58:51 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}

In [34]:
cloudtrail.create_trail(
    Name='forecast-trail',
    S3BucketName=output_trail_bucket,
    EnableLogFileValidation=True
)

{'Name': 'forecast-trail',
 'S3BucketName': 'demo-forecast-805433377179-trail',
 'IncludeGlobalServiceEvents': True,
 'IsMultiRegionTrail': False,
 'TrailARN': 'arn:aws:cloudtrail:us-east-1:805433377179:trail/forecast-trail',
 'LogFileValidationEnabled': True,
 'IsOrganizationTrail': False,
 'ResponseMetadata': {'RequestId': '544c195e-3128-4793-8bdd-141631859504',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '544c195e-3128-4793-8bdd-141631859504',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '272',
   'date': 'Sat, 01 Aug 2020 04:58:51 GMT'},
  'RetryAttempts': 0}}

In [35]:
cloudtrail.put_event_selectors(
    TrailName='forecast-trail',
    EventSelectors=[
        {
            'ReadWriteType': 'All',
            'IncludeManagementEvents': True,
            'DataResources': [
                {
                    'Type': 'AWS::S3::Object',
                    'Values': [
                        f'arn:aws:s3:::{bucket_name}/',
                    ]
                },
            ]
        },
    ]
)

{'TrailARN': 'arn:aws:cloudtrail:us-east-1:805433377179:trail/forecast-trail',
 'EventSelectors': [{'ReadWriteType': 'All',
   'IncludeManagementEvents': True,
   'DataResources': [{'Type': 'AWS::S3::Object',
     'Values': ['arn:aws:s3:::demo-forecast-805433377179/']}],
   'ExcludeManagementEventSources': []}],
 'ResponseMetadata': {'RequestId': '97dc03ae-d64b-49de-a8da-4b943fdff200',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '97dc03ae-d64b-49de-a8da-4b943fdff200',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '285',
   'date': 'Sat, 01 Aug 2020 04:58:51 GMT'},
  'RetryAttempts': 0}}

### enable logging
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudtrail.html#CloudTrail.Client.start_logging

In [36]:
cloudtrail.start_logging(Name='forecast-trail')

{'ResponseMetadata': {'RequestId': 'b5759dbe-7ce3-4803-8aef-75655c4ec646',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b5759dbe-7ce3-4803-8aef-75655c4ec646',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'date': 'Sat, 01 Aug 2020 04:58:53 GMT'},
  'RetryAttempts': 0}}

# 4.CloudWatch Event: build a rule
https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/events.html


put_rule(): create rule  
https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/events.html#CloudWatchEvents.Client.put_rule


In [37]:
bucket_name

'demo-forecast-805433377179'

In [38]:
cwe = boto3.client('events')

In [39]:
ep_str ='{"source":["aws.s3"], \
        "detail-type":["AWS API Call via CloudTrail"], \
        "detail":{"eventSource":["s3.amazonaws.com"], \
        "eventName":["PutObject", "CompleteMultipartUpload"], \
        "requestParameters":{"bucketName":["'+ bucket_name + '"]}}}'

In [40]:
cwe.put_rule(
    Name='demo-forecast',
    EventPattern=ep_str,
    State='ENABLED'
)

{'RuleArn': 'arn:aws:events:us-east-1:805433377179:rule/demo-forecast',
 'ResponseMetadata': {'RequestId': '078b3d71-b2ef-4256-8a44-2ccbf1f4a16b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '078b3d71-b2ef-4256-8a44-2ccbf1f4a16b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '70',
   'date': 'Sat, 01 Aug 2020 04:58:56 GMT'},
  'RetryAttempts': 0}}

In [41]:
cwe.put_targets(
    Rule='demo-forecast',
    Targets=[
        {
            'Id': 'forecast',
            'Arn': "arn:aws:states:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":stateMachine:demo-forecast",
            'RoleArn': role_sm
        }
    ]
)

{'FailedEntryCount': 0,
 'FailedEntries': [],
 'ResponseMetadata': {'RequestId': '5b7e176e-049b-48ec-a3b9-a1ca5831cbcb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5b7e176e-049b-48ec-a3b9-a1ca5831cbcb',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '41',
   'date': 'Sat, 01 Aug 2020 04:58:56 GMT'},
  'RetryAttempts': 0}}

# 5.Put additional file into S3
When upload a file to S3, Step Functions runs.

It may not work if you run it all at once. wait about 30 seconds. the S3 file can be uploaded repeatedly.

In [46]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)

bucket.upload_file('./output/tr_target_add_20091201_20101209.csv', 'input/tr_target_add_20091201_20101209.csv')

# 6.Next
When Step Functions is completed, the prediction results are output to S3, import the data source in QuickSight using the same procedure as before and visualize it.