# Migrating workflows from KNIX to AWS Step Functions

KNIX is compatible with AWS Lambda and AWS Step Functions, and adds expanded support for sophisticated parallel executions.

## Goal for this Notebook:
Show a simple example of migrating a sample workflow in Python, using the SDKs provided by both AWS and KNIX. This is aimed at newcomers to the field as well as practitioners who want to see an example of how to move existing workflows between AWS and KNIX.

### This Notebook will show basic examples of:
* Importing SDKs
* Generating and using SDK objects
* Converting KNIX names to valid ARNs
* Importing and exporting workflow and function definitions

### Required Libraries:
* [json](http://www.json.org/)
* [zipfile](https://docs.python.org/3/library/zipfile)

### Things to remember:
* In Step Functions workflow definitions, each 'Resource' field must be changed into a real Lambda ARN. This can be achieved by prepending a fixed, user-specific prefix of the form "arn:aws:lambda:eu-central-1:123456789012:function:". Note that this step does not require any change in user code.

* In Lambda, the function handler needs to be configured when the function is created, whereas in KNIX the function handler must always be named "handle". Note that this step does not require changing user code.

* KNIX users should put the libraries that they would like to be part of LD_LIBRARY_PATH in a ./lib/ folder inside their deployment zip, next to their function code (i.e. the .py file that contains the 'handle' method).

* In KNIX, if a user's deployment zip contains ELF executable binaries that are invoked from the Python code (e.g. using the subprocess module), these binaries should be invoked using their complete path, not via symbolic links to them.

* User code in Lambda is only allowed to create files in /tmp, whereas in KNIX the entire filesystem is writable.
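The handler rule above can also be applied mechanically when a KNIX function is redeployed to Lambda. The following is a minimal sketch that is not part of either SDK: the hypothetical helper `lambda_function_kwargs` looks for the top-level .py file in a KNIX deployment zip that defines `handle`, and builds keyword arguments for boto3's Lambda `create_function` call with the handler set to `<module>.handle`. The default runtime and the role ARN are assumptions to adapt; the role has to be a Lambda execution role, not the Step Functions role created later.

```python
import io
import re
import zipfile

# Matches a top-level "def handle(" at the start of a line
HANDLE_DEF = re.compile(r"^def\s+handle\s*\(", re.MULTILINE)


def find_handler_module(zip_bytes):
    """Return the module name of the top-level .py file that defines handle()."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            # Only consider .py files at the root of the archive (not lib/ etc.)
            if name.endswith(".py") and "/" not in name:
                source = zf.read(name).decode("utf-8", errors="replace")
                if HANDLE_DEF.search(source):
                    return name[:-3]
    raise ValueError("no top-level .py file defines a 'handle' function")


def lambda_function_kwargs(function_name, zip_bytes, role_arn, runtime="python3.12"):
    """Build create_function() arguments for a KNIX deployment zip.

    KNIX always calls 'handle', so the Lambda handler is '<module>.handle'.
    The default runtime is an assumption; use one that Lambda currently supports.
    """
    module = find_handler_module(zip_bytes)
    return {
        "FunctionName": function_name,
        "Runtime": runtime,
        "Role": role_arn,
        "Handler": "%s.handle" % module,
        # The unchanged zip, including its ./lib/ folder, becomes the Lambda package
        "Code": {"ZipFile": zip_bytes},
    }
```

For example, `boto3.client('lambda').create_function(**lambda_function_kwargs('OpenCaseFunction', zip_bytes, lambda_role_arn))`, where `zip_bytes` and `lambda_role_arn` are placeholders for the contents of your deployment zip and a Lambda execution role.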


## Now let's start to migrate a workflow from KNIX to AWS. 

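First, install the required AWS SDK (boto3). Note that you need to configure your AWS credentials before using this SDK, e.g. by adding them to ~/.aws/credentials or ~/.aws/config. Since all ARNs in this notebook refer to the eu-central-1 region, the SDK's default region should be set to eu-central-1 as well: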


In [1]:
pip install boto3


Now import the required libraries:

In [2]:
import json
from zipfile import ZipFile
from mfn_sdk import MfnClient
import boto3
from botocore.exceptions import ClientError
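With boto3 imported, it can be worth confirming that it actually finds credentials and a default region before any client is created. The helper `missing_aws_settings` is a hypothetical sketch; it relies only on `Session.get_credentials()`, which returns `None` when no credentials are found, and on `Session.region_name`. The expected region eu-central-1 is taken from the ARNs used in this notebook.

```python
def missing_aws_settings(session, expected_region="eu-central-1"):
    """Return a list of problems with the local AWS configuration.

    `session` is expected to behave like boto3.Session(); the expected
    region matches the eu-central-1 ARNs used in this notebook.
    """
    problems = []
    # get_credentials() returns None when no credential source is found
    if session.get_credentials() is None:
        problems.append("no AWS credentials found")
    region = session.region_name
    if region is None:
        problems.append("no default region configured")
    elif region != expected_region:
        problems.append("default region is %s, expected %s" % (region, expected_region))
    return problems
```

In the notebook it could be called as `missing_aws_settings(boto3.Session())`; an empty list means both settings were found.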

Get a boto3 client object for the Step Functions service (pinned to eu-central-1, the region used in all ARNs below):

In [3]:
client_sf = boto3.client('stepfunctions', region_name='eu-central-1')

Get a boto3 client object for the Identity and Access Management (IAM) service:

In [4]:
# Create IAM client
iam = boto3.client('iam')

Create an execution role for the AWS Step Functions service, or reuse it if it already exists:

In [32]:
# Reuse the execution role if it already exists; otherwise create it with a
# trust policy that lets the Step Functions service assume the role.
try:
    response = iam.get_role(RoleName='tutorialStepFunctionsPolicy')
except ClientError as e:
    # Only a missing role is expected here; re-raise anything else
    if e.response['Error']['Code'] != 'NoSuchEntity':
        raise
    response = iam.create_role(
        Path='/service-role/',
        RoleName='tutorialStepFunctionsPolicy',
        AssumeRolePolicyDocument=json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "states.eu-central-1.amazonaws.com"},
                "Action": "sts:AssumeRole"}]}),
        Description='KNIX migration tutorial',
        MaxSessionDuration=3600)
awsSfRoleArn = response['Role']['Arn']
print(awsSfRoleArn)

arn:aws:iam::218181671562:role/service-role/tutorialStepFunctionsPolicy

In [33]:
print(str(response))
awsSfRoleArn = response['Role']['Arn']

{'Role': {'Path': '/service-role/', 'RoleName': 'tutorialStepFunctionsPolicy', 'RoleId': 'AROATFTFEDKFHBSEQUJ64', 'Arn': 'arn:aws:iam::218181671562:role/service-role/tutorialStepFunctionsPolicy', 'CreateDate': datetime.datetime(2020, 6, 12, 11, 18, 12, tzinfo=tzutc()), 'AssumeRolePolicyDocument': {'Version': '2012-10-17', 'Statement': [{'Effect': 'Allow', 'Principal': {'Service': 'states.eu-central-1.amazonaws.com'}, 'Action': 'sts:AssumeRole'}]}, 'Description': 'KNIX migration tutorial', 'MaxSessionDuration': 3600, 'RoleLastUsed': {}}, 'ResponseMetadata': {'RequestId': '0886a3d2-5fb2-490a-a8ed-25be171a4806', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '0886a3d2-5fb2-490a-a8ed-25be171a4806', 'content-type': 'text/xml', 'content-length': '939', 'date': 'Fri, 12 Jun 2020 11:54:40 GMT'}, 'RetryAttempts': 2}}


In [34]:
print(awsSfRoleArn)

arn:aws:iam::218181671562:role/service-role/tutorialStepFunctionsPolicy
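A role that only carries a trust policy can be assumed by Step Functions but does not yet allow it to call Lambda. One way to grant that, sketched here under the assumption that an inline role policy is acceptable in your account, is boto3's `iam.put_role_policy` with a `lambda:InvokeFunction` statement restricted to the functions under the Lambda ARN prefix. The helper names and the policy name `KnixMigrationInvokeLambda` are hypothetical.

```python
import json


def lambda_invoke_policy(lambda_prefix):
    """Policy document allowing invocation of every function under lambda_prefix.

    lambda_prefix looks like "arn:aws:lambda:eu-central-1:123456789012:function:".
    """
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "lambda:InvokeFunction",
            # Any function whose ARN starts with the prefix
            "Resource": lambda_prefix + "*",
        }],
    }


def allow_lambda_invoke(iam, role_name, lambda_prefix,
                        policy_name="KnixMigrationInvokeLambda"):
    """Attach (or overwrite) an inline policy on the Step Functions role."""
    document = json.dumps(lambda_invoke_policy(lambda_prefix))
    # put_role_policy replaces an existing inline policy of the same name,
    # so running this cell twice is harmless
    iam.put_role_policy(RoleName=role_name, PolicyName=policy_name,
                        PolicyDocument=document)
    return document
```

Once `lambdaPrefix` is defined, this could be called as `allow_lambda_invoke(iam, 'tutorialStepFunctionsPolicy', lambdaPrefix)`. IAM changes can take a short while to propagate, so an execution started immediately afterwards may still be denied.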


Now use your KNIX credentials to get a KNIX client object:

In [35]:
client_mfn = MfnClient(
    #mfn_url="http://knix.io/mfn",
    mfn_url="http://localhost:8080",
    mfn_user="mfn@mfn",
    mfn_password="mfn",
    mfn_name="KS",
    proxies={"http_proxy": "None", "https_proxy": "None"}
    )

We need a few parameters to prepare access to AWS services such as Lambda and Step Functions:
* user_id: the 12-digit AWS account ID that appears in all ARNs
* lambdaPrefix: the ARN prefix for AWS Lambda functions
* awsSfRoleName: the ARN of the execution role that AWS Step Functions assumes in order to invoke the Lambda functions
* knixWfName: the name of the source workflow on KNIX
* sfWFName: the name of the target workflow on AWS Step Functions

In [36]:
user_id = boto3.client('sts').get_caller_identity()['Account']  # AWS account ID; also works for assumed roles
lambdaPrefix = "arn:aws:lambda:eu-central-1:%s:function:" % user_id 
awsSfRoleName = awsSfRoleArn # "arn:aws:iam::218181671562:role/service-role/StatesExecutionRole-eu-central-1" 
sfWFName = "CallCenterStateMachine"
knixWfName = "test_wf_knix" # "workflow_task_chain_test" 

In [37]:
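# Search all KNIX workflows; only fail if none of them has the expected name
knixWf = None
for w in client_mfn.workflows:
    if w.name == knixWfName:
        knixWf = w.json
        break
if knixWf is None:
    raise Exception("Error: KNIX workflow %s not found!" % knixWfName)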

Now knixWf contains the source workflow definition. Its Task states refer to KNIX functions by name, and these names need to be translated into valid Lambda ARNs, using the name prefix, before the definition can be transferred to AWS Step Functions.

In [38]:
knixWf = json.loads(knixWf)
print("Processing %s ... " % knixWfName)

# Turn the KNIX function name of every Task state into a Lambda ARN
for stateName, state in knixWf['States'].items():
    if "Resource" in state:
        state['Resource'] = lambdaPrefix + state['Resource']
        print(state['Resource'])
    else:
        print("processing Non-Task state")

Processing test_wf_knix ... 
arn:aws:lambda:eu-central-1:218181671562:function:AssignCaseFunction
arn:aws:lambda:eu-central-1:218181671562:function:CloseCaseFunction
arn:aws:lambda:eu-central-1:218181671562:function:EscalateCaseFunction
processing Non-Task state
processing Non-Task state
arn:aws:lambda:eu-central-1:218181671562:function:OpenCaseFunction
arn:aws:lambda:eu-central-1:218181671562:function:WorkOnCaseFunction
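The loop above only visits top-level states. Since KNIX emphasizes parallel executions, a workflow may also contain `Parallel` states, whose `Branches` each hold their own `States`, or `Map` states with an `Iterator` (or, in newer definitions, an `ItemProcessor`), and their Task states need the same prefix. The following is a minimal recursive sketch assuming the standard Amazon States Language layout; `prefix_resources` is a hypothetical helper that skips resources which are already ARNs and rejects names that do not match Lambda's function-name rule (letters, digits, hyphens and underscores, at most 64 characters).

```python
import re

# Lambda function names: letters, digits, hyphens, underscores, max 64 chars
LAMBDA_NAME = re.compile(r"^[A-Za-z0-9_-]{1,64}$")


def prefix_resources(machine, lambda_prefix):
    """Prefix every Task 'Resource' in an ASL (sub)machine, in place.

    Recurses into Parallel 'Branches' and Map 'Iterator'/'ItemProcessor'.
    Returns the list of ARNs that were produced.
    """
    produced = []
    for state in machine.get("States", {}).values():
        resource = state.get("Resource")
        # Leave existing ARNs (already prefixed, or service integrations) alone
        if resource is not None and not resource.startswith("arn:"):
            if not LAMBDA_NAME.match(resource):
                raise ValueError("not a valid Lambda function name: %r" % resource)
            state["Resource"] = lambda_prefix + resource
            produced.append(state["Resource"])
        # Parallel states: each branch is a nested state machine
        for branch in state.get("Branches", []):
            produced += prefix_resources(branch, lambda_prefix)
        # Map states: the iteration body is a nested state machine
        for key in ("Iterator", "ItemProcessor"):
            if key in state:
                produced += prefix_resources(state[key], lambda_prefix)
    return produced
```

Because already-prefixed resources are skipped, `prefix_resources(knixWf, lambdaPrefix)` can safely be run after the loop above to cover any nested Task states.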


Let's check the resulting Step Functions workflow definition JSON before uploading it to AWS:

In [39]:
sf_def = json.dumps(knixWf, indent = 6)
print(sf_def)

{
      "Comment": "A simple AWS Step Functions state machine that automates a call center support session.",
      "StartAt": "OpenCaseFunction",
      "States": {
            "AssignCaseFunction": {
                  "Next": "WorkOnCaseFunction",
                  "Resource": "arn:aws:lambda:eu-central-1:218181671562:function:AssignCaseFunction",
                  "Type": "Task"
            },
            "CloseCaseFunction": {
                  "End": true,
                  "Resource": "arn:aws:lambda:eu-central-1:218181671562:function:CloseCaseFunction",
                  "Type": "Task"
            },
            "EscalateCaseFunction": {
                  "Next": "Fail",
                  "Resource": "arn:aws:lambda:eu-central-1:218181671562:function:EscalateCaseFunction",
                  "Type": "Task"
            },
            "Fail": {
                  "Cause": "Engage Tier 2 Support.",
                  "Type": "Fail"
            },
            "IsCaseResolved": {
       

Now let's upload the definition with the boto3 client, passing the role ARN defined previously as the roleArn parameter. If a state machine with this name already exists, an error message is printed instead.
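Optionally, a quick local check can catch broken transitions in the definition before it is uploaded. This is a minimal sketch, assuming the top-level ASL fields shown in the printed definition; the hypothetical `dangling_transitions` verifies that `StartAt`, every `Next`, every Choice `Default`, every `Choices[].Next` and every `Catch[].Next` point to an existing top-level state. It does not replace the validation that AWS Step Functions performs itself.

```python
import json


def dangling_transitions(definition):
    """Return (source, target) pairs whose target state does not exist.

    `definition` may be a JSON string or an already parsed dict; only the
    top-level States are checked.
    """
    if isinstance(definition, str):
        definition = json.loads(definition)
    states = definition.get("States", {})
    problems = []
    if definition.get("StartAt") not in states:
        problems.append(("StartAt", definition.get("StartAt")))
    for name, state in states.items():
        # Plain transitions and the Choice default
        targets = [state.get("Next"), state.get("Default")]
        # Choice rules and Catch handlers carry their own Next fields
        targets += [rule.get("Next") for rule in state.get("Choices", [])]
        targets += [c.get("Next") for c in state.get("Catch", [])]
        for target in targets:
            if target is not None and target not in states:
                problems.append((name, target))
    return problems
```

Running `assert not dangling_transitions(sf_def)` before the upload cell would stop the notebook early if a transition points nowhere.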

In [40]:
try:
    client_sf.create_state_machine(
        name=sfWFName,
        roleArn=awsSfRoleName,
        definition=sf_def)
except ClientError as e:
    if e.response['Error']['Code'] == 'StateMachineAlreadyExists':
        print("Error: State Machine already exists at AWS Step Functions")
    else:
        print("Unexpected error: %s" % e)
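When a workflow is migrated repeatedly, it can be more convenient to update an existing state machine than to stop at the error. A sketch, assuming a boto3 Step Functions client: it catches the modeled exception `client.exceptions.StateMachineAlreadyExists`, then calls `update_state_machine` on the ARN rebuilt from region, account ID and name, the same ARN format used when starting the execution. `create_or_update_state_machine` and `state_machine_arn` are hypothetical helpers.

```python
def state_machine_arn(region, account_id, name):
    """ARN of a state machine, e.g. for update_state_machine or start_execution."""
    return "arn:aws:states:%s:%s:stateMachine:%s" % (region, account_id, name)


def create_or_update_state_machine(sf, name, definition, role_arn,
                                   region, account_id):
    """Create the state machine, or update it if the name is already taken.

    `sf` is expected to be boto3.client('stepfunctions'). Returns the ARN.
    """
    try:
        response = sf.create_state_machine(
            name=name, definition=definition, roleArn=role_arn)
        return response["stateMachineArn"]
    except sf.exceptions.StateMachineAlreadyExists:
        arn = state_machine_arn(region, account_id, name)
        # Replace definition and role of the existing state machine
        sf.update_state_machine(
            stateMachineArn=arn, definition=definition, roleArn=role_arn)
        return arn
```

In this notebook the call would be `create_or_update_state_machine(client_sf, sfWFName, sf_def, awsSfRoleName, 'eu-central-1', user_id)`.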

Now list all state machines for this account:

In [41]:
stateMachines = client_sf.list_state_machines()
print("all published machines: ")
for sm in stateMachines['stateMachines']:
    print(sm['name'])

all published machines: 
CallCenterStateMachine
ChainStateMachine
ChoiceStateWithComplexCondition
Helloworld
Imported_from_SAND
Iterate_over_array
Komplex_und_verschachtelt
Motivational_Chain
MyExpressStateMachine
MyStateMachine
MyStateMachine_EquivalentParallel
MyTestChain1
RO-Test1
SAND_Motivational_Test
SandTest
SandTest2
complex_with_not
hh_test
inputpath_outputpath_resultpath_test
knixWorkflowName
knix_workflow_task_chain_test
looptest
myIteration
myMotivationalSFN
myMotivationalSFN1
myMotivationalSFN2
myMotivationalSFN3
myMotivationalSFN4
myMotivationalSFN5
nestedstatemachine
parallel-states-fail
parallel-states-success
parallel_state_test
parallel_timers
sand2sf_pipeline
state_machine
state_machine_definition
succeedtest
test
test_ASL
test_parallel_workflow
test_sand_migration
test_sand_migration_sdk
testcatchretry
twochoices
wait_test
wf_ASL
wf_echo
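`list_state_machines` returns results page by page, and with many state machines the remainder is only available via `nextToken`. A sketch using boto3's built-in paginator for this operation, `client.get_paginator('list_state_machines')`; `all_state_machine_names` and `state_machine_exists` are hypothetical helpers.

```python
def all_state_machine_names(sf):
    """Collect state machine names across all result pages.

    `sf` is expected to be boto3.client('stepfunctions').
    """
    names = []
    paginator = sf.get_paginator("list_state_machines")
    for page in paginator.paginate():
        names.extend(sm["name"] for sm in page["stateMachines"])
    return names


def state_machine_exists(sf, name):
    """True if a state machine with this name exists in the client's region."""
    return name in all_state_machine_names(sf)
```

For example, `state_machine_exists(client_sf, sfWFName)` confirms that the upload above took effect.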


Now execute the state machine:

In [47]:
response = client_sf.start_execution(
    stateMachineArn="arn:aws:states:eu-central-1:%s:stateMachine:%s" % (user_id, sfWFName),
    input='{"inputCaseID" : "001"}'
)

In [48]:
print(response)

{'executionArn': 'arn:aws:states:eu-central-1:218181671562:execution:CallCenterStateMachine:4f603756-a666-4c67-931d-cd9123e18eed', 'startDate': datetime.datetime(2020, 6, 12, 13, 55, 13, 123000, tzinfo=tzlocal()), 'ResponseMetadata': {'RequestId': 'c3574f24-ab9d-40ac-916b-b7267f7cbe79', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c3574f24-ab9d-40ac-916b-b7267f7cbe79', 'content-type': 'application/x-amz-json-1.0', 'content-length': '158'}, 'RetryAttempts': 0}}


In [49]:
executionArn = response['executionArn']
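The execution runs asynchronously, so reading the history immediately after `start_execution` may show only the first few events. A minimal polling sketch, assuming a boto3 Step Functions client: `describe_execution` reports a `status` that stays `RUNNING` until the execution finishes. `wait_for_execution`, its poll interval and its timeout are hypothetical choices; `sleep` and `clock` are injectable only to make the function easy to test.

```python
import time


def wait_for_execution(sf, execution_arn, poll_seconds=2.0, timeout_seconds=300.0,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll describe_execution until the execution leaves RUNNING.

    Returns the final describe_execution response; raises TimeoutError if
    the execution is still running after timeout_seconds.
    """
    deadline = clock() + timeout_seconds
    while True:
        description = sf.describe_execution(executionArn=execution_arn)
        if description["status"] != "RUNNING":
            return description
        if clock() >= deadline:
            raise TimeoutError("execution still running: %s" % execution_arn)
        sleep(poll_seconds)
```

In the notebook, `wait_for_execution(client_sf, executionArn)['status']` could be checked before fetching the execution history.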

In [52]:
response = client_sf.get_execution_history(executionArn=executionArn)

In [57]:
print(response)
# Print the output of every state when it exits
for ev in response['events']:
    if "stateExitedEventDetails" in ev:
        print(ev["stateExitedEventDetails"]["output"])

{'events': [{'timestamp': datetime.datetime(2020, 6, 12, 13, 55, 13, 123000, tzinfo=tzlocal()), 'type': 'ExecutionStarted', 'id': 1, 'previousEventId': 0, 'executionStartedEventDetails': {'input': '{"inputCaseID" : "001"}', 'roleArn': 'arn:aws:iam::218181671562:role/service-role/StatesExecutionRole-eu-central-1'}}, {'timestamp': datetime.datetime(2020, 6, 12, 13, 55, 13, 146000, tzinfo=tzlocal()), 'type': 'TaskStateEntered', 'id': 2, 'previousEventId': 0, 'stateEnteredEventDetails': {'name': 'OpenCaseFunction', 'input': '{"inputCaseID" : "001"}'}}, {'timestamp': datetime.datetime(2020, 6, 12, 13, 55, 13, 146000, tzinfo=tzlocal()), 'type': 'LambdaFunctionScheduled', 'id': 3, 'previousEventId': 2, 'lambdaFunctionScheduledEventDetails': {'resource': 'arn:aws:lambda:eu-central-1:218181671562:function:OpenCaseFunction', 'input': '{"inputCaseID" : "001"}'}}, {'timestamp': datetime.datetime(2020, 6, 12, 13, 55, 13, 172000, tzinfo=tzlocal()), 'type': 'LambdaFunctionStarted', 'id': 4, 'previous
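`get_execution_history` is paginated as well, so the events of a long execution may not fit into a single response. A sketch using boto3's `get_execution_history` paginator to collect the output of every exited state in order; `state_outputs` is a hypothetical helper that parses each output as JSON and skips exit events without an `output` field.

```python
import json


def state_outputs(sf, execution_arn):
    """Return (state name, parsed output) for every exited state, in order.

    `sf` is expected to be boto3.client('stepfunctions'). A state that runs
    several times (e.g. in a loop) appears once per exit.
    """
    results = []
    paginator = sf.get_paginator("get_execution_history")
    for page in paginator.paginate(executionArn=execution_arn):
        for event in page["events"]:
            details = event.get("stateExitedEventDetails")
            if details and "output" in details:
                # State outputs are JSON strings
                results.append((details["name"], json.loads(details["output"])))
    return results
```

For example, `for name, output in state_outputs(client_sf, executionArn): print(name, output)`.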