In [23]:
import json
import logging
import os
import uuid

import boto3
import stepfunctions
from stepfunctions import steps
from stepfunctions.workflow import Workflow

In [24]:
# Surface the SDK's INFO-level log messages in the notebook output.
stepfunctions.set_stream_logger(level=logging.INFO)

# IAM role ARN handed to the Workflow as its execution role. Set the
# WORKFLOW_EXECUTION_ROLE_ARN environment variable to run this notebook against
# a different account or role; when it is unset or empty, the original ARN is
# used so existing behaviour is unchanged.
workflow_execution_role = (
    os.environ.get('WORKFLOW_EXECUTION_ROLE_ARN')
    or 'arn:aws:iam::829044821271:role/StepFunctionsWorkflowExecutionRole'
)

In [25]:
# Each batch_names.json supplies the 'jobDefinition', 'job' and 'jobQueue'
# names that the Batch steps below read.
#
# The files are opened in binary mode on purpose: json.load then detects the
# JSON encoding (UTF-8/16/32, including a BOM) itself instead of decoding with
# the platform's locale default, which can corrupt or reject non-ASCII content.
with open('./face_clip/aws_batch/batch_names.json', 'rb') as f:
    face_clip_name = json.load(f)

with open('./tag_extraction/aws_batch/batch_names.json', 'rb') as f:
    tag_extraction_name = json.load(f)

In [26]:
# First workflow state: a BatchSubmitJobStep configured with the face-clip
# job definition, job name and job queue read from face_clip_name.
face_clip_step = steps.BatchSubmitJobStep(
    state_id='Face Clip Step',
    parameters={
        'JobDefinition': face_clip_name['jobDefinition'],
        'JobName': face_clip_name['job'],
        'JobQueue': face_clip_name['jobQueue'],
    },
)

In [27]:
# Second workflow state: a BatchSubmitJobStep configured with the
# tag-extraction job definition, job name and job queue read from
# tag_extraction_name.
tag_extraction_step = steps.BatchSubmitJobStep(
    state_id='Tag Extraction Step',
    parameters={
        'JobDefinition': tag_extraction_name['jobDefinition'],
        'JobName': tag_extraction_name['job'],
        'JobQueue': tag_extraction_name['jobQueue'],
    },
)

In [28]:
# Steps in the order they are chained: face clipping first, then tag
# extraction.
chain_list = [
    face_clip_step,
    tag_extraction_step,
]

# The chain of both steps is the workflow definition.
workflow_definition = steps.Chain(chain_list)

In [29]:
# Build the Workflow object from the chained definition and the execution
# role. The name gets a fresh random UUID suffix every time this cell runs,
# so each run defines a differently named workflow.
workflow = Workflow(
    role=workflow_execution_role,
    definition=workflow_definition,
    name='preprocessing_workflow_{0}'.format(uuid.uuid4()),
)

In [30]:
# Create the workflow on AWS Step Functions. The value shown as the cell
# output is what create() returns: the ARN of the new state machine.
workflow.create()

[INFO] Workflow created successfully on AWS Step Functions.


'arn:aws:states:ap-northeast-1:829044821271:stateMachine:preprocessing_workflow_9be8ad65-9f8a-429e-a343-e6ce2b00076f'

In [31]:
# Render the workflow's graph as this cell's output, as a visual check of the
# step order before executing it.
workflow.render_graph()

In [32]:
# Start an execution of the workflow and keep the object execute() returns,
# so later cells can inspect this run by name instead of relying on the
# fragile `_` / Out[N] history. The bare expression keeps the value displayed
# as the cell output, as before.
execution = workflow.execute()
execution

[INFO] Workflow execution started successfully on AWS Step Functions.
