# pystepfunction

- Create AWS Step Functions asl.json files (state machine definition files) using Python.
- Pre-made tasks that are easy to use. 
- Test dataflow through the stepfunction using the same python code.
- Visualise the stepfunction using pyvis for easier debugging without the use of the AWS console.


## Installation

In [None]:
pip install pystepfunction

# Tasks
pystepfunction has Task implementations for various AWS resources.

In [1]:
from pystepfunction.tasks import LambdaTask, GlueTask, SucceedTask
from pystepfunction.branch import Branch


# create a simple chain of tasks
lambda_task = (
    LambdaTask(name="LambdaTaskName", function_arn="my-lambda-arn")
    .and_then(GlueTask(name="GlueTaskName", job_name="my-glue-job-name"))
    .and_then(SucceedTask(name="SucceedTaskName"))
)
# or
lambda_task = (
    LambdaTask(name="LambdaTaskName", function_arn="my-lambda-arn") 
    >> GlueTask(name="GlueTaskName", job_name="my-glue-job-name")
    >> SucceedTask(name="SucceedTaskName")
)

# create a branch
easy_branch = Branch(comment="This is an easy branch", start_task=lambda_task)
# view the asl as a dict
asl = easy_branch.to_asl()
print(asl)

{'Comment': 'This is an easy branch', 'StartAt': 'LambdaTaskName', 'States': {'LambdaTaskName': {'Type': 'Task', 'Resource': 'arn:aws:states:::lambda:invoke', 'End': False, 'Next': 'GlueTaskName', 'Parameters': {'FunctionName': 'my-lambda-arn'}}, 'GlueTaskName': {'Type': 'Task', 'Resource': 'arn:aws:states:::glue:startJobRun.sync', 'End': False, 'Next': 'SucceedTaskName', 'Parameters': {'JobName': 'my-glue-job-name'}}, 'SucceedTaskName': {'Type': 'Succeed', 'Resource': '', 'End': False}}}


## Add error handling to a task



In [3]:
from pystepfunction.errors import ERROR_STATE_ALL
from pystepfunction.tasks import LambdaTask, SucceedTask, FailTask, Retry
from pystepfunction.branch import Branch

succeed = SucceedTask(name="Succeed")
fail = FailTask(name="Fail")
task = (
    LambdaTask(name="LambdaTaskName", function_arn="my-lambda-arn")
    .with_retries([Retry(error_equals=[ERROR_STATE_ALL], interval_seconds=1, max_attempts=2)])
    .with_catcher(catcher={ERROR_STATE_ALL: fail})
    >> succeed
)
branch = Branch(start_task=task)
branch.to_asl()

{'Comment': '',
 'StartAt': 'LambdaTaskName',
 'States': {'LambdaTaskName': {'Type': 'Task',
   'Resource': 'arn:aws:states:::lambda:invoke',
   'End': False,
   'Next': 'Succeed',
   'Parameters': {'FunctionName': 'my-lambda-arn'},
   'Retry': [{'ErrorEquals': ['States.ALL'],
     'IntervalSeconds': 1,
     'MaxAttempts': 2,
     'BackoffRate': 1.0}],
   'Catch': [{'ErrorEquals': 'States.ALL', 'Next': 'Fail'}]},
  'Succeed': {'Type': 'Succeed', 'Resource': '', 'End': False},
  'Fail': {'Type': 'Fail', 'Cause': '', 'Error': ''}}}

# Manipulate input and output state of a Task

In [5]:
from pystepfunction.state import StateMachine
from pystepfunction.tasks import LambdaTask, TaskInputState, TaskOutputState
# Inputs use parameters and input paths
# Both use jsonpath syntax

parameters = {"Input1.$": "$.Input1", "Input2.$": "$.Input2"}
input_state = TaskInputState(parameters=parameters, input_path="$.Inputs")

# Outputs use result selectors and result paths
# Both use jsonpath syntax
output_state = TaskOutputState(
    result_path="$.TaskResult",
    result_selector={"Output1.$": "$.Output1", "Output2.$": "$.Output2"},
)

lambda_task = LambdaTask(name="LambdaTaskName", function_arn="my-lambda-arn").with_input(input_state).with_output(output_state)
lambda_task.to_asl()

# Test the dataflow of a task
# define what is expected back from the lambda resource for testing purposes


{'LambdaTaskName': {'Type': 'Task',
  'Resource': 'arn:aws:states:::lambda:invoke',
  'End': False,
  'InputPath': '$.Inputs',
  'Parameters': {'FunctionName': 'my-lambda-arn',
   'Input1.$': '$.Input1',
   'Input2.$': '$.Input2'},
  'ResultPath': '$.TaskResult',
  'ResultSelector': {'Output1.$': '$.Output1', 'Output2.$': '$.Output2'}}}

In [6]:
resource_result = {"Output1": "val1", "Output2": "val2", "Output3": "val3"}
lambda_task.with_resource_result(resource_result)

# create a new state and set its value
state = StateMachine()
state.update_state(state={"Inputs": {"Input1": "val1", "Input2": "val2"}}, msg="Input state")

# view the value log
state.state_log


[{'Inputs': {'Input1': 'val1', 'Input2': 'val2'}, '__msg__': 'Input state'}]

In [1]:
state.apply_task(lambda_task)
state.state_log


NameError: name 'state' is not defined

# Visualise with pyvis

In [11]:
from pystepfunction.viz import BranchViz

BranchViz(easy_branch).show("easy_branch.html") 

easy_branch.html


# Visualise with networkx and matplotlib
TODO

# Add error handling

# Create choice and parallel flows

# Test data flows