Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
nitial Step Functions Execute and Poll sample.
  • Loading branch information
mwarman committed Apr 4, 2019
1 parent b6a39cd commit 5a03f43
Show file tree
Hide file tree
Showing 9 changed files with 3,084 additions and 0 deletions.
62 changes: 62 additions & 0 deletions step-functions-api-sync/README.md
@@ -0,0 +1,62 @@
# Sample: Step Functions - Execute and Poll

This sample serverless application illustrates how to implement a simple Step Functions State Machine using Serverless Framework plugins and poll for the result of the state machine execution.

Step functions state machine executions are asynchronous. This sample project demonstrates how to invoke a step function state machine programmatically and how to poll the execution history events to determine when execution has completed and extract the resulting output.

## How to Use

### Install Dependencies

Install the project dependencies with the Yarn package manager. Navigate to the application base directory and issue the following command:

```
yarn install
```

### Run the Application

Run the serverless application on AWS. Navigate to the application base directory and issue the following command:

```
sls deploy
```

The Serverless Framework packages the application and deploys it to AWS. The application creates an API Gateway API, Lambda Functions, and a Step Functions State Machine. The State Machine is invoked by the `createMessage` Lambda function.

To test the deployed application, you may use cURL, or better yet, a REST client such as Postman.

Send a POST request to the `/messages` endpoint. You may leave the request body empty or include JSON in the following format:

```
{
"audience": "Step Functions"
}
```

An invocation of a Step Function State Machine is asynchronous. Therefore, when the `createMessage` Lambda function invokes the State machine, it returns immediately. The response body contains an ARN referencing the specific execution of the State Machine. The Lambda function contains logic to poll for the outcome of the State Machine execution. A sample response format is:

```
{
"message": "HELLO STEP FUNCTIONS!",
"createdAt": 1554380384990
}
```

When the final state of the execution is detected, the ouput retrieved from the State Machine execution and returned to the caller via the API Gateway.

### Remove the Application

To remove the serverless application from AWS, navigate to the application base directory and issue the following command:

```
sls remove
```

## See Also
[How to manage your AWS Step Functions with Serverless][sls-stepfunctions]
[serverless-step-functions](https://www.npmjs.com/package/serverless-step-functions "serverless-step-functions | NPM")
[serverless-pseudo-parameters](https://www.npmjs.com/package/serverless-pseudo-parameters "serverless-pseudo-parameters | NPM")

[sls-stepfunctions]: https://serverless.com/blog/how-to-manage-your-aws-step-functions-with-serverless/ "How to manage your AWS Step Functions with Serverless | Serverless Blog"
[aws-stepf-gateway]: https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-api-gateway.html "Creating a Step Functions API Using API Gateway | AWS Docs"
33 changes: 33 additions & 0 deletions step-functions-api-sync/handler/createMessage.js
@@ -0,0 +1,33 @@
const serverless = require('serverless-http');
const bodyParser = require('body-parser');
const express = require('express');
const app = express();
app.use(bodyParser.json({ strict: false }));

const { startExecutionAndWait } = require('../stepf/execution');

/**
* The create message endpoint.
*/
app.post('/messages', function (req, res) {

// validate request
const { audience } = req.body;
if (typeof audience !== 'string') {
res.status(400).json({ message: '"audience" must be a string' });
}

// perform business logic, i.e. invoke the step function state machine
startExecutionAndWait(req.body)
.then((output) => {
console.log(`Execution result.\n${JSON.stringify(output, null, 2)}`);
res.json(output);
})
.catch((err) => {
console.log(`Error occurred. Detail:\n${JSON.stringify(err, null, 2)}`);
res.status(500).json(err);
});

});

module.exports.handle = serverless(app);
21 changes: 21 additions & 0 deletions step-functions-api-sync/handler/formatMessage.js
@@ -0,0 +1,21 @@
/**
* Function: Format Message
* Description: Create a simple message using supplied 'audience' attribute if present.
* Stage: 1
*/
exports.handle = async (event, context) => {
console.log(`event:\n${JSON.stringify(event, null, 2)}`);
console.log(`context:\n${JSON.stringify(context, null, 2)}`);

const audience = event.audience || 'World';

const createdAt = new Date().getTime();
const message = `Hello ${audience}!`;

const result = {
message,
createdAt
};

return result;
}
21 changes: 21 additions & 0 deletions step-functions-api-sync/handler/toUpperCase.js
@@ -0,0 +1,21 @@

/**
* Function: To Uppercase
* Description: Convert the message to UPPER CASE.
* Stage: 2
*/
exports.handle = async (event, context) => {
console.log(`event:\n${JSON.stringify(event, null, 2)}`);
console.log(`context:\n${JSON.stringify(context, null, 2)}`);

/* The result from the previous Step Function Stage is the event, i.e. the input, for this Stage. */
const result = {
...event
};

if (!!result.message) {
result.message = result.message.toUpperCase();
}

return result;
}
19 changes: 19 additions & 0 deletions step-functions-api-sync/package.json
@@ -0,0 +1,19 @@
{
"name": "serverless-samples-stepf-api-sync",
"version": "1.0.0",
"main": "index.js",
"repository": "https://github.com/mwarman/serverless-samples",
"author": "Matt Warman",
"license": "Apache-2.0",
"private": false,
"dependencies": {
"aws-sdk": "^2.429.0",
"body-parser": "^1.18.3",
"express": "^4.16.4",
"serverless-http": "^1.9.1"
},
"devDependencies": {
"serverless-pseudo-parameters": "^2.4.0",
"serverless-step-functions": "^1.16.0"
}
}
64 changes: 64 additions & 0 deletions step-functions-api-sync/serverless.yml
@@ -0,0 +1,64 @@
service: samples-stepf-api-sync

plugins:
- serverless-step-functions
- serverless-pseudo-parameters

provider:
name: aws
runtime: nodejs8.10
stage: ${opt:stage, 'dev'}
region: us-east-1
memorySize: 128
iamRoleStatements:
- Effect: Allow
Action:
- states:StartExecution
Resource:
- ${self:resources.Outputs.CreateMessageStateMachineArn.Value}
- Effect: Allow
Action:
- states:GetExecutionHistory
Resource:
- "*"

functions:
createMessage:
handler: handler/createMessage.handle
environment:
STATE_MACHINE_ARN: ${self:resources.Outputs.CreateMessageStateMachineArn.Value}
events:
- http:
path: /messages
method: POST
formatMessage:
handler: handler/formatMessage.handle
toUppercase:
handler: handler/toUpperCase.handle

stepFunctions:
stateMachines:
createMessage:
name: ${self:service}-${self:provider.stage}-createMessage
definition:
Comment: A sample state machine that creates a Hello World message and converts it to uppercase.
StartAt: FormatMessage
States:
FormatMessage:
Type: Task
Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${self:provider.stage}-formatMessage"
Next: ToUppercase
ToUppercase:
Type: Task
Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${self:provider.stage}-toUppercase"
Next: EndState
EndState:
Type: Pass
End: true

resources:
Outputs:
CreateMessageStateMachineArn:
Description: ARN of the CreateMessage Step Functions State Machine
Value:
Ref: SamplesDashstepfDashapiDashsyncDashdevDashcreateMessage
42 changes: 42 additions & 0 deletions step-functions-api-sync/stepf/execution.js
@@ -0,0 +1,42 @@
const AWS = require('aws-sdk');
const { pollExecutionHistory } = require('./history');

// AWS SDK - StepFunctions
const stepfunctions = new AWS.StepFunctions();

// Environment Variable - ARN of the Step Functions State Machine
const STATE_MACHINE_ARN = process.env.STATE_MACHINE_ARN || 'ARN';

/**
* Starts an execution of an AWS Step Functions State Machine. Passes the
* inputObj parameter to the execution.
* @param {*} inputObj Input to the state machine execution.
*/
const startExecution = async (inputObj = {}) => {
console.log(`startExecution`);
const input = JSON.stringify(inputObj);
const params = {
stateMachineArn: STATE_MACHINE_ARN,
input
};
const execution = await stepfunctions.startExecution(params).promise();
console.log(`execution:\n${JSON.stringify(execution, null, 2)}`);
return execution;
};
exports.startExecution = startExecution;

/**
* Starts an execution of an AWS Step Functions State Machine. Passes the
* inputObj parameter to the execution. Polls the event history of the
* execution for completion. Returns the output from the final event in
* the execution.
* @param {*} inputObj Input to the state machine execution.
* @returns Output from the state machine execution as JSON.
*/
const startExecutionAndWait = async (inputObj = {}) => {
console.log(`startExecutionAndWait`);
const execution = await startExecution(inputObj);
const event = await pollExecutionHistory(execution.executionArn);
return JSON.parse(event.stateExitedEventDetails.output);
};
exports.startExecutionAndWait = startExecutionAndWait;
58 changes: 58 additions & 0 deletions step-functions-api-sync/stepf/history.js
@@ -0,0 +1,58 @@
const AWS = require('aws-sdk');
const stepfunctions = new AWS.StepFunctions();

/**
* Uses the AWS SDK to obtain the event history for a Step Functions State Machine
* Execution. The response is an array of JSON event objects sorted from newest to oldest.
* @param {string} executionArn ARN of a Step Function State Machine Execution.
* @returns An array of event history objects describing the state machine execution status.
*/
const getExecutionHistory = async (executionArn = '') => {
console.log(`getExecutionHistory for executionArn: ${executionArn}`);
const params = {
executionArn,
maxResults: 0,
reverseOrder: true
};
const events = await stepfunctions.getExecutionHistory(params).promise();
console.log(`Execution events:\n${JSON.stringify(events, null, 2)}`);
return events;
};
exports.getExecutionHistory = getExecutionHistory;

/**
* Returns a Promise which waits for the supplied number of milliseconds and then resolves.
* Utility for delaying activities.
* @param {number} millis The number of milliseconds to delay before resolving the Promise.
* @returns A Promise.
*/
const timeout = (millis = 0) => {
return new Promise(resolve => setTimeout(resolve, millis));
};

/**
* Periodically retrieves the event history for a Step Functions State Machine Execution.
* Searches the array of events for one matching the supplied type and name, typically the final
* event in a state machine. Once found, polling ceases and the matching event object is returned.
* @param {string} executionArn ARN of a Step Function State Machine Execution.
* @param {string} eventType The event type to poll for.
* @param {string} eventName The event name to poll for.
* @param {number} interval The interval, in milliseconds, to poll for events.
* @returns An event history object matching the supplied event type and name.
*/
const pollExecutionHistory = async (executionArn, eventType = 'PassStateExited', eventName = 'EndState', interval = 250) => {
console.log(`pollExecutionHistory for eventType:${eventType} and eventName:${eventName} at interval:${interval}ms`);
let event;
do {
// Using the timeout function with Promise.all slows the loop, ensuring getExecutionHistory
// is invoked just once per interval (e.g. 250ms)
const [timeoutResult, executionHistory] = await Promise.all([
timeout(interval),
getExecutionHistory(executionArn)
]);
event = executionHistory.events.find((event) => event.type === eventType && event.stateExitedEventDetails.name === eventName);
console.log(`Event:\n${JSON.stringify(event, null, 2)}`);
} while(!event) // exit the loop if a matching event is found, otherwise poll the event history again
return event;
};
exports.pollExecutionHistory = pollExecutionHistory;

0 comments on commit 5a03f43

Please sign in to comment.