# UDACITY Designing Your First Workflow - Step Functions

## Step Functions & SageMaker

In the prior exercises, we worked with many small services, which can be overwhelming for a data scientist who wants a consistent methodology for handling data. Step Functions is an orchestration service that lets us use SageMaker in a methodical and consistent way. It also integrates with Lambda, which could let us automate an entire machine learning pipeline end to end. Let's get a handle on what a 'step' in a step function looks like.

In this exercise, you will create a preprocessing step and a training step. Then you will create a step function to chain the two steps.
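
To make the idea of a 'step' concrete: a Step Functions workflow is ultimately a JSON document written in the Amazon States Language, and each step is one entry under `States`. The sketch below writes a minimal two-state definition by hand, roughly the shape that chaining a processing step and a training step produces. The state names and the empty `Parameters` are placeholders for illustration, not the definition generated later in this exercise.

```python
import json

# Hand-written Amazon States Language sketch of a two-step chain.
# State names and the empty Parameters are illustrative placeholders.
definition = {
    "StartAt": "Preprocess",
    "States": {
        "Preprocess": {
            "Type": "Task",
            # The ".sync" suffix makes the state wait for the SageMaker job to finish.
            "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync",
            "Parameters": {},
            "Next": "Train",
        },
        "Train": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
            "Parameters": {},
            "End": True,
        },
    },
}


def execution_order(defn):
    """Follow the Next pointers from StartAt until a state has no Next."""
    order = []
    name = defn["StartAt"]
    while name is not None:
        order.append(name)
        name = defn["States"][name].get("Next")
    return order


print(execution_order(definition))
print(json.dumps(definition, indent=2))
```

A state only runs after the previous one succeeds, which is what lets the training job depend on the preprocessing job.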

## Exercise: Grant Permissions and Install Packages

Attach the IAMFullAccess and AWSStepFunctionsFullAccess policies to your SageMaker execution role.
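
The policies are usually attached in the IAM console, but the same thing can be scripted. The following is a minimal sketch, assuming credentials that are allowed to call `iam:AttachRolePolicy` (a fresh execution role often is not) and assuming the AWS managed policy names `IAMFullAccess` and `AWSStepFunctionsFullAccess`. The helper names are hypothetical; only `attach_role_policy` is a boto3 call.

```python
def role_name_from_arn(role_arn):
    """Return the role name, i.e. the last path segment of a role ARN."""
    return role_arn.split("/")[-1]


def managed_policy_arn(policy_name):
    """Build the ARN of an AWS managed policy from its name."""
    return f"arn:aws:iam::aws:policy/{policy_name}"


def attach_managed_policies(role_arn, policy_names, iam_client=None):
    """Attach each named AWS managed policy to the role behind role_arn."""
    if iam_client is None:
        # Imported lazily so the helpers above work without boto3 installed.
        import boto3
        iam_client = boto3.client("iam")
    role_name = role_name_from_arn(role_arn)
    for policy_name in policy_names:
        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn=managed_policy_arn(policy_name),
        )
    return role_name


# Example call (requires AWS credentials with IAM permissions):
# from sagemaker import get_execution_role
# attach_managed_policies(
#     get_execution_role(),
#     ["IAMFullAccess", "AWSStepFunctionsFullAccess"],
# )
```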

In [1]:
%%bash
pip install stepfunctions -q

## Exercise: Fill Out Preprocessing Step

The 'step' interface is designed to be similar to the Preprocessing Job in lesson 2. The main difference is that a 'step' can interface with other steps: when a step succeeds, the next step specified in the workflow starts automatically. In our case, a training step will launch once the preprocessing step succeeds. The preprocessing step has been encoded for you. Upload the preprocessing code 'HelloBlazePreprocess.py' and the zipped dataset 'reviews_Musical_Instruments_5.json.zip' to S3, and fill in the constants in the code below. Note that the code below is filled in for a different dataset, 'Toys_and_Games_5.json.zip'; adjust the file names to match whichever dataset you uploaded.

The first cell uploads the preprocessing script; the second defines the preprocessing step.

In [2]:
import boto3
s3 = boto3.client("s3")
s3.upload_file(
    "HelloBlazePreprocess.py",
    Bucket="udacity-landingzone",
    Key="lesson3-stepfunction/script/HelloBlazePreprocess.py")

In [3]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from stepfunctions.steps.sagemaker import ProcessingStep
import sagemaker
import time

role = get_execution_role()

timestamp = int(time.time())
PREPROCESSING_JOB_NAME = f"PreprocessingJob-{timestamp}"
TRAINING_JOB_NAME = f"TrainingJob-{timestamp}"

input_data = 's3://udacity-landingzone/lesson3-stepfunction/input/Toys_and_Games_5.json.zip'
input_preprocessing_code = 's3://udacity-landingzone/lesson3-stepfunction/script/HelloBlazePreprocess.py'
sess = sagemaker.Session()

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.large',
                                     instance_count=1)


training_bucket = 'udacity-landingzone'
suffix = "lesson3-stepfunction/output/"
processed_data_train = f"s3://{training_bucket}/{suffix}Toys_and_Games_5.json.zip_train"
processed_data_test = f"s3://{training_bucket}/{suffix}Toys_and_Games_5.json.zip_test"

inputs=[
    ProcessingInput(
        source=input_data, 
        destination='/opt/ml/processing/input', 
        input_name = 'input-1'),  
    ProcessingInput(
        source=input_preprocessing_code , 
        destination='/opt/ml/processing/input/script', 
        input_name = 'code')]


outputs=[
    ProcessingOutput(
        source='/opt/ml/processing/output/train', 
        destination=processed_data_train, 
        output_name = 'train_data'), 
    ProcessingOutput(
        source='/opt/ml/processing/output/test', 
        destination=processed_data_test, 
        output_name = 'test_data')
]


processing_step = ProcessingStep(
    "SageMaker pre-processing step 4",
    processor=sklearn_processor,
    job_name=PREPROCESSING_JOB_NAME,
    inputs=inputs,
    outputs=outputs,
    container_entrypoint=["python3", "/opt/ml/processing/input/script/HelloBlazePreprocess.py"]
)


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
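
One detail worth checking in the cell above: the `code` input is downloaded into its `destination` directory under the S3 object's file name, so the path given in `container_entrypoint` has to be that directory plus the script's base name. The sketch below derives the entrypoint from the two values instead of typing the path twice; `entrypoint_for` is a hypothetical helper, not part of the SageMaker or Step Functions SDKs.

```python
import posixpath


def entrypoint_for(script_s3_uri, destination_dir, interpreter="python3"):
    """Build a container entrypoint for a script mounted from S3.

    The script lands at <destination_dir>/<file name of the S3 object>.
    """
    script_name = posixpath.basename(script_s3_uri)
    return [interpreter, posixpath.join(destination_dir, script_name)]


print(entrypoint_for(
    "s3://udacity-landingzone/lesson3-stepfunction/script/HelloBlazePreprocess.py",
    "/opt/ml/processing/input/script",
))
# ['python3', '/opt/ml/processing/input/script/HelloBlazePreprocess.py']
```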


## Exercise: Fill out Training Step

Upon the success of the preprocessing step, we wish to execute a training step. A training step is defined below. Fill in the constants in the code.

In [4]:
from stepfunctions.steps.sagemaker import TrainingStep
import boto3

WORKFLOW_OUTPUT = "s3://udacity-landingzone/lesson3-stepfunction/workflow/"

region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve(
    region=region_name, framework="blazingtext", version="latest"
)

helloBlazeEstimator = sagemaker.estimator.Estimator(
    container,
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size=30,
    max_run=360000,
    input_mode="File",
    output_path=WORKFLOW_OUTPUT,
    sagemaker_session=sess,
)

helloBlazeEstimator.set_hyperparameters(mode='supervised')

training_step = TrainingStep(
    "SageMaker Training Step",
    estimator=helloBlazeEstimator,
    data={
        "train": sagemaker.TrainingInput(processed_data_train, content_type="text/plain"), 
        "validation": sagemaker.TrainingInput(processed_data_test, content_type="text/plain")},
    job_name=TRAINING_JOB_NAME,
    wait_for_completion=True,
)
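
The training step passes both channels as `text/plain`, and in `supervised` mode BlazingText expects each line of those files to be one example: a `__label__<label>` prefix followed by the space-separated tokens. The preprocessing script itself is not shown in this notebook, so the following is only a sketch of how such lines could be produced; the function name, the lowercasing, and the sample label are assumptions for illustration.

```python
def to_blazingtext_line(label, text):
    """Format one example the way BlazingText supervised mode reads it.

    Whitespace (including newlines) is collapsed so each example stays
    on a single line.
    """
    tokens = text.lower().split()
    return f"__label__{label} " + " ".join(tokens)


print(to_blazingtext_line(1, "Great   strings,\nstays in tune"))
# __label__1 great strings, stays in tune
```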

## Exercise: Create Workflow & Execute It

To link the steps, you'll need a role that Step Functions can assume. Go to IAM, create a Step Functions role, and attach the CloudWatchEventsFullAccess and AmazonSageMakerFullAccess policies. Once done, use the steps above to create a workflow. Quick debugging tip: job names must be unique, so you'll need to rename jobs each time you rerun while debugging. Consider creating a method that dynamically generates unique job names!
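
As one possible take on that tip, the sketch below generates names from a prefix, the current time, and a short random suffix. It assumes the SageMaker naming rules for processing and training jobs (at most 63 characters, alphanumerics and hyphens, starting and ending with an alphanumeric); `unique_job_name` is a hypothetical helper, not something this notebook already defines.

```python
import re
import time
import uuid

MAX_JOB_NAME_LEN = 63
JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")


def unique_job_name(prefix):
    """Return a job name that is unique per call and valid for SageMaker."""
    # Timestamp keeps names sortable; the random part avoids collisions
    # when two names are generated within the same second.
    suffix = f"{int(time.time())}-{uuid.uuid4().hex[:8]}"
    # Replace disallowed characters and drop leading/trailing hyphens.
    clean = re.sub(r"[^a-zA-Z0-9-]", "-", prefix).strip("-") or "job"
    # Leave room for the hyphen and the suffix.
    clean = clean[: MAX_JOB_NAME_LEN - len(suffix) - 1].rstrip("-")
    return f"{clean}-{suffix}"


print(unique_job_name("PreprocessingJob"))
print(unique_job_name("TrainingJob"))
```

Calling the helper right before each workflow update would give every rerun fresh names without editing the constants by hand.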

In [5]:
print(PREPROCESSING_JOB_NAME)
print(TRAINING_JOB_NAME)

PreprocessingJob-1728097683
TrainingJob-1728097683


In [6]:
import boto3
from stepfunctions.steps import Chain
from stepfunctions.workflow import Workflow
import time

workflow_role = "arn:aws:iam::002427974286:role/UdacitySageMakerStepFunctionExecutionRole"
workflow_name = "workflow-stepfunction-processing"  # This is the state machine name

# CloudWatch Logs configuration
log_group_arn = "arn:aws:logs:us-east-1:002427974286:log-group:/aws/vendedlogs/states/workflow-stepfunction-processing-Logs:*"  # Replace with your log group ARN

logging_configuration = {
    "level": "ALL",  # Log levels: ALL, ERROR, FATAL, OFF
    "includeExecutionData": True,
    "destinations": [
        {
            "cloudWatchLogsLogGroup": {
                "logGroupArn": log_group_arn
            }
        }
    ]
}

# Create boto3 client for Step Functions
sfn_client = boto3.client('stepfunctions')

# Check if the workflow (state machine) already exists.
# list_state_machines is paginated, so walk every page rather than only the first.
def get_existing_workflow_arn(name):
    paginator = sfn_client.get_paginator("list_state_machines")
    for page in paginator.paginate():
        for sm in page["stateMachines"]:
            if sm["name"] == name:
                return sm["stateMachineArn"]
    return None

# Define the workflow graph
workflow_graph = Chain([processing_step, training_step])

# Check if workflow exists, get its ARN if it exists
workflow_arn = get_existing_workflow_arn(workflow_name)

# Create or update the workflow
if workflow_arn:
    print(f"Attaching to existing workflow: {workflow_name}")
    # Attach to the existing workflow using its ARN
    workflow = Workflow.attach(workflow_arn)
    # Update the workflow definition using the Workflow object
    workflow.update(definition=workflow_graph, role=workflow_role)
    
    # Update logging configuration via boto3 client
    response = sfn_client.update_state_machine(
        stateMachineArn=workflow_arn,
        loggingConfiguration=logging_configuration
    )
    print(f"Updated workflow with logging: {response}")
else:
    print(f"Creating new workflow: {workflow_name}")
    # Create a new workflow
    workflow = Workflow(
        name=workflow_name,
        definition=workflow_graph,
        role=workflow_role,
    )
    workflow.create()
    
    # Add logging configuration after creating the workflow via boto3 client
    workflow_arn = workflow.arn
    response = sfn_client.update_state_machine(
        stateMachineArn=workflow_arn,
        loggingConfiguration=logging_configuration
    )
    print(f"Created workflow with logging: {response}")

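# Execute the workflow. The job names are already fixed in the step definitions
# above (job_name=...), so these inputs are recorded with the execution for
# reference only; the steps do not read them.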
execution = workflow.execute(
    inputs={
        "PreprocessingJobName": PREPROCESSING_JOB_NAME,
        "TrainingJobName": TRAINING_JOB_NAME
    }
)

execution_output = execution.get_output(wait=True)


Attaching to existing workflow: workflow-stepfunction-processing
Updated workflow with logging: {'updateDate': datetime.datetime(2024, 10, 5, 3, 8, 4, 812000, tzinfo=tzlocal()), 'revisionId': 'c4627de5-8453-4245-89cc-c584c3618618', 'ResponseMetadata': {'RequestId': '56a22035-e0de-4c2e-9803-b6bee7b3809e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '56a22035-e0de-4c2e-9803-b6bee7b3809e', 'date': 'Sat, 05 Oct 2024 03:08:04 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '83', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}


You can track the progress and outcome of this workflow through a UI that is rendered directly in the notebook. Check it out!

In [7]:
execution.render_progress()
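
When the rendered graph is not available (for example, outside a notebook), the same information can be read from the execution's event history. The sketch below assumes a list of event dictionaries in the shape returned by boto3's `get_execution_history`: each has a `type`, and state-entry events carry the state name under `stateEnteredEventDetails`. The sample events are made up for illustration and are not output from the run above.

```python
FAILURE_TYPES = {"ExecutionFailed", "ExecutionAborted", "ExecutionTimedOut"}


def summarize_events(events):
    """Return (names of states entered in order, whether the execution failed)."""
    entered = [
        event["stateEnteredEventDetails"]["name"]
        for event in events
        if event["type"].endswith("StateEntered")
    ]
    failed = any(event["type"] in FAILURE_TYPES for event in events)
    return entered, failed


# Made-up events, trimmed to the fields the helper reads.
sample_events = [
    {"type": "ExecutionStarted"},
    {"type": "TaskStateEntered",
     "stateEnteredEventDetails": {"name": "SageMaker pre-processing step 4"}},
    {"type": "TaskStateExited"},
    {"type": "TaskStateEntered",
     "stateEnteredEventDetails": {"name": "SageMaker Training Step"}},
    {"type": "ExecutionFailed"},
]

states, failed = summarize_events(sample_events)
print(states, failed)
```

The last entry in the returned list is the state that was running when the execution ended, which is usually the first place to look in the CloudWatch logs configured earlier.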