# UDACITY Designing Your First Workflow - Tying it All Together

AWS is comprised of many services, and one of the main skills you'll develop as an ML Engineer working in AWS is in chaining these services together to accomplish specific data engineering goals. With Lambda, you've learned how to launch serverless jobs, and with Step Functions, you've learned how to create a workflow that chains jobs together. Now, you'll learn how to launch a Step Function using a Lambda job. 

Before starting this, it's important to highlight that this is not the only way to accomplish something like this. Multiple services integrate with Step Functions, and so it follows that there are multiple ways to launch Step Functions. These services, among others, include API Gateway, EventBridge, and even other Step Functions. 

Your task is to create a new lambda function that will launch the state machine you created in the **last exercise**. You'll then launch this lambda function from the command line. To find the definition of the step function you've made, click into the step function and look for the definition under the 'Definition' tab. 

First, create a new Lambda role. Attach to this role the StepFunctionsFullAccess policy. Then create a new lambda function under the default template, and attach this new role to it. Use the starter code below to help you modify the lambda handler to accomplish your task. 

As Step Function cannot execute more than once with the same name, you must update the definition with a new name. You can find the existing definition of a Step Function in the AWS Console under 'Step Functions'. In the lambda function code below, update the 'definition' with the step function definition from your last exercise, with the only difference being the, step fucntion name, processing-job name and the training-job name. 

## Exercise: Create the Lambda Function

In [1]:
import json
import boto3
import time

client = boto3.client('stepfunctions')

# todo, copy the definition from the last exercise and paste it below. 
# Also change the names of step function, training job and processing job. 
definition = """{
  "StartAt": "SageMaker pre-processing step",
  "States": {
    "SageMaker pre-processing step": {
      "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync",
      "Parameters": {
        "ProcessingJobName": "preprocessing-step-2022",
        "ProcessingInputs": [
          {
            "InputName": "input-1",
            "AppManaged": false,
            "S3Input": {
              "S3Uri": "s3://udacity-ml-workflow/lambda/reviews_Musical_Instruments_5.json.zip",
              "LocalPath": "/opt/ml/processing/input",
              "S3DataType": "S3Prefix",
              "S3InputMode": "File",
              "S3DataDistributionType": "FullyReplicated",
              "S3CompressionType": "None"
            }
          },
          {
            "InputName": "code",
            "AppManaged": false,
            "S3Input": {
              "S3Uri": "s3://udacity-ml-workflow/lambda/HelloBlazePreprocess.py",
              "LocalPath": "/opt/ml/processing/input/code",
              "S3DataType": "S3Prefix",
              "S3InputMode": "File",
              "S3DataDistributionType": "FullyReplicated",
              "S3CompressionType": "None"
            }
          }
        ],
        "ProcessingOutputConfig": {
          "Outputs": [
            {
              "OutputName": "train_data",
              "AppManaged": false,
              "S3Output": {
                "S3Uri": "s3://sagemaker-us-west-2-565094796913/hello_blaze_train_scikit",
                "LocalPath": "/opt/ml/processing/output/train",
                "S3UploadMode": "EndOfJob"
              }
            },
            {
              "OutputName": "test_data",
              "AppManaged": false,
              "S3Output": {
                "S3Uri": "s3://sagemaker-us-west-2-565094796913/hello_blaze_test_scikit",
                "LocalPath": "/opt/ml/processing/output/test",
                "S3UploadMode": "EndOfJob"
              }
            }
          ]
        },
        "AppSpecification": {
          "ImageUri": "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3",
          "ContainerEntrypoint": [
            "python3",
            "/opt/ml/processing/input/code/HelloBlazePreprocess.py"
          ]
        },
        "RoleArn": "arn:aws:iam::565094796913:role/execution_role",
        "ProcessingResources": {
          "ClusterConfig": {
            "InstanceCount": 1,
            "InstanceType": "ml.m5.large",
            "VolumeSizeInGB": 30
          }
        }
      },
      "Type": "Task",
      "Next": "SageMaker Training Step"
    },
    "SageMaker Training Step": {
      "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
      "Parameters": {
        "AlgorithmSpecification": {
          "TrainingImage": "433757028032.dkr.ecr.us-west-2.amazonaws.com/blazingtext:1",
          "TrainingInputMode": "File"
        },
        "OutputDataConfig": {
          "S3OutputPath": "s3://udacity-sagemaker-solutiondata2021/l3e3/workflow_output"
        },
        "StoppingCondition": {
          "MaxRuntimeInSeconds": 360000
        },
        "ResourceConfig": {
          "InstanceCount": 1,
          "InstanceType": "ml.m5.large",
          "VolumeSizeInGB": 30
        },
        "RoleArn": "arn:aws:iam::934110659707:role/service-role/AmazonSageMaker-ExecutionRole-20220111T121707",
        "InputDataConfig": [
          {
            "DataSource": {
              "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": "s3://sagemaker-us-west-2-565094796913/hello_blaze_train_scikit",
                "S3DataDistributionType": "FullyReplicated"
              }
            },
            "ContentType": "text/plain",
            "ChannelName": "train"
          },
          {
            "DataSource": {
              "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": "s3://sagemaker-us-west-2-565094796913/hello_blaze_test_scikit",
                "S3DataDistributionType": "FullyReplicated"
              }
            },
            "ContentType": "text/plain",
            "ChannelName": "validation"
          }
        ],
        "HyperParameters": {
          "mode": "supervised"
        },
        "TrainingJobName": "training"
      },
      "Type": "Task",
      "End": true
    }
  }
}

"""

def lambda_handler(event, context):
    #todo 
    client.update_state_machine(definition=definition, stateMachineArn='arn:aws:iam::934110659707:role/service-role/AmazonSageMaker-ExecutionRole-20220111T121707') 
    # Give AWS time to register the defintion
    time.sleep(5)
    #todo
    client.start_execution(input='{}', name='lambda-123', stateMachineArn='arn:aws:iam::934110659707:role/service-role/AmazonSageMaker-ExecutionRole-20220111T121707') 
    
    return {
        'statusCode': 200,
        'body': 'The step function has successfully launched!'
    }


## Exercise: Launch the Lambda Function


Launch the lambda function and confirm the step function is created successfully.

## Conceptual Exercise: What are next steps? 

Right now, the Step Function that we made in the prior exercise has a hard-coded location of the dataset we input, as well as all of the locations of the intermediary steps. What are ways that you could modify the Step Function to make it more generalizable? If you could input an S3 location, how could you integrate it with Lambda so that it could asychronously be called? 