* changed by nov05 on 2024-11-23
* check [the Google Docs tutorial](https://docs.google.com/document/d/1Um47l8guJbz3r_OnQyV1aTgI_93fkWcyiUI3xig-cmQ)   

# UDACITY Designing Your First Workflow - Step Functions

## Step Functions & SageMaker

In the prior exercises, we worked with many small services. Juggling them can be overwhelming for a data scientist who wants a consistent methodology for handling data. Step Functions is an orchestration service that lets us use SageMaker in a methodical, repeatable way. It also integrates with Lambda, which makes it possible to automate an entire machine learning pipeline end to end. First, let's get a handle on what a 'step' in a Step Functions workflow looks like.

In this exercise, you will create a preprocessing step and a training step. Then you will create a step function to chain the two steps.
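Under the hood, a Step Functions workflow is a state machine written in the Amazon States Language (ASL). Each 'step' becomes a `Task` state, and chaining means each state names its successor in `Next` while the last state is marked `End`. The sketch below builds a simplified ASL-style definition for a two-step chain as a plain Python dict. It is an illustration, not the exact JSON the `stepfunctions` SDK generates (which also carries the full job parameters). The `chain_definition` helper is hypothetical, and the `Resource` ARNs assume the SageMaker `.sync` service integrations, which make Step Functions wait for each job to finish before moving on.

```python
import json


def chain_definition(steps):
    """Build a minimal ASL-style state machine that runs `steps` in order.

    `steps` is a list of (state_name, resource_arn) pairs. Job parameters
    are omitted; a real definition also passes the job configuration.
    """
    if not steps:
        raise ValueError("a chain needs at least one step")
    states = {}
    for i, (name, resource) in enumerate(steps):
        state = {"Type": "Task", "Resource": resource}
        if i + 1 < len(steps):
            # On success, control passes to the next state in the chain.
            state["Next"] = steps[i + 1][0]
        else:
            # The last state ends the execution.
            state["End"] = True
        states[name] = state
    return {"StartAt": steps[0][0], "States": states}


definition = chain_definition([
    ("SageMaker Preprocessing Step", "arn:aws:states:::sagemaker:createProcessingJob.sync"),
    ("SageMaker Training Step", "arn:aws:states:::sagemaker:createTrainingJob.sync"),
])
print(json.dumps(definition, indent=2))
```

If a `Task` state fails and defines no `Retry` or `Catch`, the whole execution fails and its `Next` state never starts. That is what guarantees training only runs on successfully preprocessed data.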

## Exercise: Grant Permissions and Install Packages

Attach the **IAMFullAccess** and **AWSStepFunctionsFullAccess** policies to your SageMaker execution role in `AWS IAM`.

In [1]:
## Verify the role ARN 
from sagemaker import get_execution_role
role = get_execution_role()
print(role)
## arn:aws:iam::807711953667:role/service-role/AmazonSageMaker-ExecutionRole-20241121T213663

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
arn:aws:iam::807711953667:role/service-role/AmazonSageMaker-ExecutionRole-20241121T213663
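The policies can also be attached programmatically. The sketch below assumes `boto3` is available and that the notebook's current credentials are themselves allowed to call `iam:AttachRolePolicy` (often not the case for a fresh execution role; if so, use the IAM console as described above). It derives the role *name* from the ARN printed above, since `attach_role_policy` expects a name rather than an ARN, and refers to the AWS managed policies by their `arn:aws:iam::aws:policy/<name>` ARNs. `role_name_from_arn` and `attach_managed_policies` are hypothetical helpers.

```python
def role_name_from_arn(role_arn):
    """Return the role name from an IAM role ARN, dropping any path.

    e.g. arn:aws:iam::123456789012:role/service-role/MyRole -> MyRole
    """
    resource = role_arn.split(":", 5)[5]  # "role/service-role/MyRole"
    if not resource.startswith("role/"):
        raise ValueError(f"not an IAM role ARN: {role_arn}")
    return resource.rsplit("/", 1)[-1]


def attach_managed_policies(role_arn, policy_names, iam_client=None):
    """Attach AWS managed policies (given by name) to a role; return their ARNs."""
    if iam_client is None:
        import boto3  # imported lazily so role_name_from_arn works without boto3
        iam_client = boto3.client("iam")
    role_name = role_name_from_arn(role_arn)
    policy_arns = [f"arn:aws:iam::aws:policy/{name}" for name in policy_names]
    for arn in policy_arns:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    return policy_arns


# Usage in this notebook (requires IAM permissions):
# attach_managed_policies(role, ["IAMFullAccess", "AWSStepFunctionsFullAccess"])
```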


In [2]:
%%bash
pip install stepfunctions
## Successfully installed stepfunctions-2.3.0



## Exercise: Fill Out the Preprocessing Step

The 'step' interface is designed to be quite similar to the processing job from Lesson 2. The main difference is that a step can connect to other steps: when one step succeeds, the workflow automatically starts the next step specified in it. In our case, the training step launches only after the preprocessing step succeeds. The preprocessing step has been written for you. Upload the preprocessing script `HelloBlazePreprocess.py` and the zipped dataset `reviews_Musical_Instruments_5.json.zip` to S3, then fill out the constants in the code below.

The code below defines the preprocessing step; `input_data` and `input_preprocessing_code` are the S3 URIs of the two files you uploaded.
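If the two files are in the notebook's working directory, one way to upload them is `boto3`'s `upload_file(Filename, Bucket, Key)`. The sketch below assumes local file names matching the ones mentioned above and reuses the `step_upload/` URIs from the next cell; `parse_s3_uri` and `upload_to_s3` are hypothetical helpers, not part of the SageMaker SDK.

```python
from urllib.parse import urlparse


def parse_s3_uri(uri):
    """Split 's3://bucket/key/path' into ('bucket', 'key/path')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def upload_to_s3(local_path, s3_uri, s3_client=None):
    """Upload one local file to the exact S3 object named by s3_uri."""
    if s3_client is None:
        import boto3  # lazy import so parse_s3_uri works without boto3
        s3_client = boto3.client("s3")
    bucket, key = parse_s3_uri(s3_uri)
    s3_client.upload_file(local_path, bucket, key)
    return s3_uri


# Usage (replace the bucket with your own):
# prefix = "s3://sagemaker-studio-807711953667-mmx0am1bt28/step_upload"
# upload_to_s3("reviews_Musical_Instruments_5.json.zip", f"{prefix}/reviews_Musical_Instruments_5.json.zip")
# upload_to_s3("HelloBlazePreprocess.py", f"{prefix}/HelloBlazePreprocess.py")
```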

In [3]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from stepfunctions.steps.sagemaker import ProcessingStep
from datetime import datetime
import random


role = get_execution_role()
session = sagemaker.Session()

input_data = 's3://sagemaker-studio-807711953667-mmx0am1bt28/step_upload/reviews_Musical_Instruments_5.json.zip'
input_preprocessing_code = 's3://sagemaker-studio-807711953667-mmx0am1bt28/step_upload/HelloBlazePreprocess.py'
processed_data_train = f"s3://{session.default_bucket()}/hello_blaze_train_scikit"
processed_data_test = f"s3://{session.default_bucket()}/hello_blaze_test_scikit"
print("Train data:", processed_data_train)
print("Test data:", processed_data_test)

inputs = [
    ProcessingInput(source=input_data, 
                    destination='/opt/ml/processing/input', ## don't change this path
                    input_name='input_data'),  
    ProcessingInput(source=input_preprocessing_code, 
                    destination='/opt/ml/processing/input/code',  ## don't change this path
                    input_name='input_code')
]
outputs = [
    ProcessingOutput(source='/opt/ml/processing/output/train', ## don't change this path
                     destination=processed_data_train, 
                     output_name='train_data'), 
    ProcessingOutput(source='/opt/ml/processing/output/test', ## don't change this path
                     destination=processed_data_test, 
                     output_name='test_data')
]
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.large',
                                     instance_count=1)
preprocessing_job_name = (
    f"udacity-step-preprocess-"
    f"{datetime.now().strftime('%Y%m%d%H%M%S')}-" ## attach datetime to make it unique
    f"{random.randint(100, 999)}" ## attach 3 random digits
)
print("Preprocessing job:", preprocessing_job_name)

###################################
## define preprocessing step
###################################
processing_step = ProcessingStep(
    "SageMaker Preprocessing Step",
    processor=sklearn_processor,
    job_name=preprocessing_job_name,
    inputs=inputs,
    outputs=outputs,
    container_entrypoint=["python3", 
                          "/opt/ml/processing/input/code/HelloBlazePreprocess.py"],
)
## s3://sagemaker-us-east-1-807711953667/hello_blaze_train_scikit
## s3://sagemaker-us-east-1-807711953667/hello_blaze_test_scikit

Train data: s3://sagemaker-us-east-1-807711953667/hello_blaze_train_scikit
Test data: s3://sagemaker-us-east-1-807711953667/hello_blaze_test_scikit
Preprocessing job: udacity-step-preprocess-20241124035109-152
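`HelloBlazePreprocess.py` itself is not shown in this notebook. For orientation only, here is a hypothetical sketch of what a script like it might do. It assumes the archive holds one JSON review per line with `overall` (star rating) and `reviewText` fields, and that the goal is BlazingText's supervised format of one `__label__<label> <text>` line per example. It reads from and writes to the container paths fixed in the `ProcessingInput`/`ProcessingOutput` definitions above; the 80/20 split, the `data.txt` file name, and the lowercase-only cleaning are assumptions.

```python
import glob
import json
import os
import random
import zipfile

INPUT_DIR = "/opt/ml/processing/input"
TRAIN_DIR = "/opt/ml/processing/output/train"
TEST_DIR = "/opt/ml/processing/output/test"


def to_blazingtext_line(review):
    """Format one review dict as '__label__<rating> <lowercased text>'."""
    label = int(review["overall"])
    text = " ".join(review.get("reviewText", "").lower().split())
    return f"__label__{label} {text}"


def split_lines(lines, test_fraction=0.2, seed=42):
    """Shuffle deterministically and split into (train, test) lists."""
    lines = list(lines)
    random.Random(seed).shuffle(lines)
    n_test = int(len(lines) * test_fraction)
    return lines[n_test:], lines[:n_test]


def find_input_zip(input_dir=INPUT_DIR):
    """Return the first .zip file in the input directory, or None."""
    matches = sorted(glob.glob(os.path.join(input_dir, "*.zip")))
    return matches[0] if matches else None


def main(zip_path):
    with zipfile.ZipFile(zip_path) as zf:
        # Assumes the archive contains a single JSON-lines file.
        with zf.open(zf.namelist()[0]) as f:
            reviews = [json.loads(line) for line in f if line.strip()]
    train, test = split_lines(to_blazingtext_line(r) for r in reviews)
    for out_dir, rows in ((TRAIN_DIR, train), (TEST_DIR, test)):
        os.makedirs(out_dir, exist_ok=True)
        with open(os.path.join(out_dir, "data.txt"), "w") as out:
            out.write("\n".join(rows) + "\n")


if __name__ == "__main__":
    zip_path = find_input_zip()
    if zip_path:  # the archive only exists inside the processing container
        main(zip_path)
```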


## Exercise: Fill Out the Training Step

When the preprocessing step succeeds, we want to run a training step. The training step is defined below; fill in the constants in the code.

In [4]:
from stepfunctions.steps.sagemaker import TrainingStep
import boto3

workflow_output = "s3://sagemaker-studio-807711953667-mmx0am1bt28/step_output/"
region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve(
    region=region_name, 
    framework="blazingtext", 
    version="latest"
)
helloBlazeEstimator = sagemaker.estimator.Estimator(
    container,
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size=30,
    max_run=360000,
    input_mode="File",
    output_path=workflow_output,
    sagemaker_session=session,
)
helloBlazeEstimator.set_hyperparameters(mode='supervised')
training_job_name = (
    f"udacity-step-train-"
    f"{datetime.now().strftime('%Y%m%d%H%M%S')}-" ## attach datetime to make it unique
    f"{random.randint(100, 999)}" ## attach 3 random digits
)
print("Training job:", training_job_name)
###################################
## define training step
###################################
training_step = TrainingStep(
    "SageMaker Training Step",
    estimator=helloBlazeEstimator,
    data={
        "train": sagemaker.TrainingInput(processed_data_train, 
                                         content_type="text/plain"), 
        "validation": sagemaker.TrainingInput(processed_data_test, 
                                              content_type="text/plain")
    },
    job_name=training_job_name,
    wait_for_completion=True,
)

Training job: udacity-step-train-20241124035109-354


## Exercise: Create and Execute the Workflow

To link the steps, you need an IAM role that Step Functions can assume. In `IAM`, create a role for the Step Functions service and attach the **CloudWatchEventsFullAccess** and **AmazonSageMakerFullAccess** policies; its ARN is the `workflow_role` below. Then use the steps defined above to create a workflow. Quick debugging tip: SageMaker job names must be unique within an AWS account and Region, so you need fresh job names every time you re-run the workflow while debugging. Consider writing a function that generates unique job names dynamically!
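One way to follow that tip is a small helper that generalizes the timestamp-plus-random-digits pattern already used for `preprocessing_job_name` and `training_job_name`. The sketch assumes SageMaker's job-name rules (at most 63 characters; letters, digits, and hyphens; starting and ending with a letter or digit), encoded in the regex below. `unique_job_name` and `JOB_NAME_PATTERN` are hypothetical names.

```python
import random
import re
from datetime import datetime

# Assumed SageMaker job-name constraint: alphanumerics and hyphens,
# starting and ending with an alphanumeric character (length checked below).
JOB_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")
MAX_JOB_NAME_LEN = 63


def unique_job_name(prefix, now=None, rng=random):
    """Return '<prefix>-<YYYYmmddHHMMSS>-<3 random digits>', at most 63 chars."""
    if now is None:
        now = datetime.now()
    suffix = f"{now.strftime('%Y%m%d%H%M%S')}-{rng.randint(100, 999)}"
    # Keep the suffix intact and trim the prefix if the full name is too long.
    max_prefix = MAX_JOB_NAME_LEN - len(suffix) - 1
    name = f"{prefix.strip('-')[:max_prefix].rstrip('-')}-{suffix}"
    if len(name) > MAX_JOB_NAME_LEN or not JOB_NAME_PATTERN.match(name):
        raise ValueError(f"invalid SageMaker job name: {name}")
    return name


# Usage, replacing the inline expressions in the cells above:
# preprocessing_job_name = unique_job_name("udacity-step-preprocess")
# training_job_name = unique_job_name("udacity-step-train")
```

Because `ProcessingStep` and `TrainingStep` above receive `job_name` as a plain string, the name is captured when each step is defined. After generating new names, re-run those two cells to rebuild the steps before executing the workflow again.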

In [5]:
%%time
from stepfunctions.steps import Chain
from stepfunctions.workflow import Workflow

workflow_role = "arn:aws:iam::807711953667:role/udacity_step_20241123"
workflow_graph = Chain([processing_step, training_step])
workflow = Workflow(
    name="udacity_step_handson",
    definition=workflow_graph,
    role=workflow_role,
)
workflow.create()
###################################
## execute workflow
###################################
execution = workflow.execute(
    inputs={
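        ## Each preprocessing job (SageMaker Processing job) requires a unique name.
        ## Note: these inputs only take effect if the steps read them through a
        ## stepfunctions.inputs.ExecutionInput placeholder; here the job names
        ## are already fixed as strings in the step definitions above.
        "PreprocessingJobName": preprocessing_job_name, 
        ## Each SageMaker training job requires a unique name
        "TrainingJobName": training_job_name,        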
    }
)
execution_output = execution.get_output(wait=True)
## CPU times: user 627 ms, sys: 79.8 ms, total: 707 ms
## Wall time: 5min 19s

CPU times: user 743 ms, sys: 63.1 ms, total: 806 ms
Wall time: 5min 47s


You can track the progress of this workflow in an automatically generated graph view. Check it out!

⚠️ [vaib-amz commented on Feb 11, 2020](https://github.com/aws/aws-step-functions-data-science-sdk-python/issues/29#issuecomment-584783445):

> The graph visualizations provided by the Step Functions Data Science SDK are supported only on the standard Jupyter environment at present.

In [6]:
execution.render_progress()