# SageMaker Pipelines EMR Step With Cluster Lifecycle Management


---
This notebook is based on the AWS example notebook [SageMaker Pipelines EMR Step With Cluster Lifecycle Management](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/emr-step/sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb).

---

This notebook illustrates how an [EMR step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-emr) can be run from a SageMaker Pipeline. This example requires a cluster config. The EMR step uses the cluster config to create an EMR cluster, runs the required job, and finally terminates the cluster.

The steps in this pipeline include:
* Preprocess the Abalone dataset with PySpark on EMR

## Contents

1. [Prerequisites](#Prerequisites)
1. [Configuration Setup](#Configuration-Setup)
1. [Parameters](#Parameters)
1. [Data Preparation with Spark on EMR](#Data-Preparation)
1. [Cleanup](#Cleanup)


## Prerequisites

To run this notebook, you first need to run the AWS CDK app that sets up the following IAM entities:
1. Service role for Amazon EMR (EMR role) - this is passed as the `ServiceRole` parameter. The role is called `f"arn:aws:iam::{account}:role/service-role/AmazonEMR-ServiceRole-ForSageMakerPipelines"` and is referenced below in the notebook.
2. An instance profile for cluster EC2 instances (EC2 instance profile) - this is passed as the `JobFlowRole` parameter. The instance profile is called `f"arn:aws:iam::{account}:instance-profile/service-role/AmazonEMR-InstanceProfile-ForSageMakerPipelines"` and is referenced below in the notebook.
3. A managed policy that you need to attach to the SageMaker execution role used by the SageMaker Pipeline.

To learn more about the service role and instance profile, see ['EMR IAM roles'](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-iam-roles.html).

Next, follow the instructions below to attach the required IAM policies to the SageMaker execution role used by this notebook and the SageMaker Pipeline.

Run the following cell to identify the SageMaker execution role:

In [1]:
import sagemaker


sagemaker.get_execution_role()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


'arn:aws:iam::<AWS_ACCOUNT_ID>:role/service-role/AmazonSageMaker-ExecutionRole-20240712T150567'

Next,

1. Go to the IAM console, search for the role name printed above (`AmazonSageMaker-ExecutionRole-XXXXXX`) in the search bar, and click on it.
2. Click add policies on the right, search for the policy `EMRSageMakerPipelinesIntegration` created by the CDK app, select it, then click add permissions.
3. Follow the same process to add the AWS managed policy `AmazonSageMakerPipelinesIntegrations` to the role.
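
The console steps above can also be scripted. The following is a minimal sketch, not part of the original notebook. It assumes the policy created by the CDK app lives at the default IAM path and that the standard `aws` partition is used. The helper names `role_name_from_arn` and `attach_pipeline_policies` are hypothetical; the only AWS call used is the boto3 IAM client's `attach_role_policy`.

```python
def role_name_from_arn(role_arn: str) -> str:
    """Return the role name, which is the last path segment of a role ARN."""
    return role_arn.split("/")[-1]


def attach_pipeline_policies(iam_client, role_arn: str, account: str) -> list:
    """Attach both policies to the execution role and return the policy ARNs."""
    policy_arns = [
        # Assumption: the CDK app created this policy at the default path.
        f"arn:aws:iam::{account}:policy/EMRSageMakerPipelinesIntegration",
        # AWS managed policy; verify the ARN in the IAM console before use.
        "arn:aws:iam::aws:policy/AmazonSageMakerPipelinesIntegrations",
    ]
    role_name = role_name_from_arn(role_arn)
    for policy_arn in policy_arns:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    return policy_arns
```

In a notebook this could be called as `attach_pipeline_policies(boto3.client("iam"), sagemaker.get_execution_role(), account)`, provided the caller is allowed to modify the role.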

## Setup 

### Setup Dependencies

In [2]:
%pip install --upgrade sagemaker

Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
import json

import boto3
import sagemaker
import sagemaker.session

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
)
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig
from sagemaker.workflow.pipeline_context import PipelineSession

## Configuration Setup

Let's now configure the setup we need, which includes the session object from the SageMaker Python SDK, and necessary configurations for the pipelines, such as object types, input and output buckets and so on.

In [4]:
# Create the SageMaker Session

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name
boto_session = boto3.Session(region_name=region)
sagemaker_client = sagemaker_session.sagemaker_client
default_bucket = sagemaker_session.default_bucket()


account = boto_session.client("sts").get_caller_identity()["Account"]


pipeline_session = PipelineSession(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    default_bucket=default_bucket,
)

## Parameters


In [5]:
pipeline_name = "EMRStepPipeline"
base_job_prefix = "emr-step-pipeline"
processing_instance_type = "ml.m5.xlarge"
training_instance_type = "ml.m5.xlarge"
BASE_DIR = "code"

job_flow_role = f"arn:aws:iam::{account}:instance-profile/service-role/AmazonEMR-InstanceProfile-ForSageMakerPipelines"
service_role = f"arn:aws:iam::{account}:role/service-role/AmazonEMR-ServiceRole-ForSageMakerPipelines"

In [6]:
# Define variables and parameters needed for the Pipeline steps
# parameters for pipeline execution

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://sagemaker-example-files-prod-{region}/datasets/tabular/uci_abalone/abalone.csv",
)

output_path = f"s3://{default_bucket}/{base_job_prefix}/prep"

## Data Preparation

A PySpark job on EMR is used to prepare the data for the training job. Using the script `preprocess.py`, the dataset is featurized and split into train, test, and validation datasets.

The output of this step can be used as the input to a training step. This pipeline contains only the EMR step.

The dataset you use is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1].  The aim for this task is to determine the age of an abalone snail from its physical measurements. At the core, this is a regression problem.

The dataset contains several features: length (the longest shell measurement), diameter (the diameter perpendicular to length), height (the height with meat in the shell), whole_weight (the weight of whole abalone), shucked_weight (the weight of meat), viscera_weight (the gut weight after bleeding), shell_weight (the weight after being dried), sex ('M', 'F', 'I' where 'I' is Infant), and rings (integer).
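
The categorical `sex` feature is turned into three indicator columns by the preprocessing script in this notebook (`StringIndexer` followed by `OneHotEncoder` with `dropLast=False`). The plain-Python sketch below only illustrates that idea and is not Spark's implementation. It assumes labels are indexed by descending frequency with ties broken alphabetically, and the function names are hypothetical.

```python
from collections import Counter


def index_labels(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: position for position, label in enumerate(ordered)}


def one_hot(value, mapping):
    """Return a vector with one slot per label and a 1.0 at the label's index."""
    vector = [0.0] * len(mapping)
    vector[mapping[value]] = 1.0
    return vector


# Toy sample, not taken from the Abalone dataset.
sample = ["M", "M", "M", "I", "I", "F"]
mapping = index_labels(sample)
encoded = [one_hot(value, mapping) for value in sample]
```

With three distinct labels, each encoded row has exactly three entries, which is why the script later selects three `feature_sex` columns.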

The number of rings turns out to be a good approximation for age (age is rings + 1.5). Obtaining this number, however, requires cutting the shell through the cone, staining the section, and counting the number of rings through a microscope, which is a time-consuming task. The other physical measurements are easier to determine, so you use the dataset to build a predictive model of the variable rings from these other physical measurements.
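
As a small worked example of the approximation above, the sketch below converts ring counts to estimated ages. The function name `estimated_age` and the sample ring counts are illustrative only.

```python
def estimated_age(rings: float) -> float:
    """Approximate abalone age in years as rings + 1.5."""
    return rings + 1.5


# Illustrative ring counts, not taken from the dataset.
ages = [estimated_age(rings) for rings in (7, 10, 15)]
print(ages)  # [8.5, 11.5, 16.5]
```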

Before you upload the data to an S3 bucket, install the SageMaker Python SDK and gather some constants you can use later in this notebook.

> [1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science.

In [7]:
!mkdir -p code

In [8]:
%%writefile code/preprocess.py

from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.ml import Pipeline
from argparse import ArgumentParser


def process(args):
    prefix = args.output
    print("Starting spark session")
    spark = (SparkSession.builder.appName("EMRPySparkWithIceberg")
             .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
             .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog")
             .config("spark.sql.catalog.dev.type", "hadoop")
             .config("spark.sql.catalog.dev.warehouse", f"{prefix}/iceberg")
             .getOrCreate()
            )
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )

    print("Reading source data")
    df = spark.read.csv(args.input, schema=schema).fillna("missing", subset=["sex"])

    from pyspark.ml.feature import (
        Imputer,
        OneHotEncoder,
        StandardScaler,
        VectorAssembler,
    )

    numerical_features = [
        "length",
        "diameter",
        "height",
        "whole_weight",
        "shucked_weight",
        "viscera_weight",
        "shell_weight",
    ]
    print("Performing feature engineering")
    pipeline = Pipeline(
        stages=[
            StringIndexer(inputCol="sex", outputCol="cat_sex"),
            OneHotEncoder(
                inputCols=["cat_sex"],
                outputCols=["feature_sex"],
                dropLast=False,
            ),
            Imputer(
                inputCols=numerical_features,
                outputCols=[f"impute_{c}" for c in numerical_features],
                strategy="median",
            ),
            VectorAssembler(
                inputCols=[f"impute_{c}" for c in numerical_features], outputCol="vector"
            ),
            StandardScaler(inputCol="vector", outputCol="features"),
        ]
    )
    print("Fitting transformers")
    model = pipeline.fit(df)
    print("Transforming source data")
    df_out = (
        model.transform(df)
        .select(
            "rings",
            vector_to_array(F.col("features")).alias("features"),
            vector_to_array(F.col("feature_sex")).alias("feature_sex"),
        )
        .select(
            [F.col("rings")]
            + [F.col("features")[idx] for idx in range(len(numerical_features))]
            + [F.col("feature_sex")[idx] for idx in range(3)]
        )
    )

    # shuffle
    # split train/test/valid
    # write out to csvs without headers or indices
    print("Writing train/valid/test splits")
    train, valid, test = df_out.orderBy(F.rand()).randomSplit([0.7, 0.15, 0.15])
    train.repartition(1).write.mode("overwrite").csv(f"{prefix}/train")
    valid.repartition(1).write.mode("overwrite").csv(f"{prefix}/valid")
    test.repartition(1).write.mode("overwrite").csv(f"{prefix}/test")

    # Create a small example DataFrame for the Iceberg table.
    data = spark.createDataFrame([
     ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
     ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
     ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
     ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
    ],["id", "creation_date", "last_update_time"])
    
    # Write the DataFrame as an Iceberg table to the Amazon S3 location.
    spark.sql(f"""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
    creation_date string,
    last_update_time string)
    USING iceberg
    location '{prefix}/iceberg/db/iceberg_table'""")
    
    data.writeTo("dev.db.iceberg_table").append()

    spark.stop()


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--input")
    parser.add_argument("--output")
    args, _ = parser.parse_known_args()
    process(args)

Overwriting code/preprocess.py


In [9]:
script = sagemaker_session.upload_data("code/preprocess.py", key_prefix=f"{base_job_prefix}/app")

In [10]:
output_path

's3://sagemaker-us-west-2-<AWS_ACCOUNT_ID>/emr-step-pipeline/prep'

In [12]:
# Preprocess the training data with a PySpark script.
# Split the data set into train, test, and validation datasets.
# Run the script as a step in a job flow on EMR.
emr_config = EMRStepConfig(
    jar="command-runner.jar",
    args=[
        "spark-submit",
        "--deploy-mode",
        "cluster",
        script,
        "--input",
        input_data,
        "--output",
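        output_path,
        # Note: spark-submit passes everything after the script path to the script itself,
        # so this --conf pair reaches preprocess.py, which ignores it via parse_known_args().
        "--conf",
        '"spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"'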
    ]
)

# The following is based on https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-use-spark-cluster.html
Configurations = [
    {
        "Classification": "iceberg-defaults",
        "Properties": {
            "iceberg.enabled": "true"
        }
    }
]

step_emr = EMRStep(
    name="EMRStep",
    cluster_id=None,
    step_config=emr_config,
    display_name="Preprocess",
    description="preprocess data for XGBoost",
    # The list of cluster_config parameters that can be included is available in
    # https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
    cluster_config={
        "Applications": [
            {
                "Name": "Spark",
            }
        ],
        "Instances": {
            "InstanceGroups": [
                {"InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge"},
                {"InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge"},
            ]
        },
        "BootstrapActions": [],
        "Configurations": Configurations,
        "ReleaseLabel": "emr-6.6.0",
        "JobFlowRole": job_flow_role,
        "ServiceRole": service_role,
        "LogUri": f"s3://aws-logs-{account}-{region}/elasticmapreduce/",
        "Tags": [
              { 
                 "Key": "for-use-with-amazon-emr-managed-policies",
                 "Value": "true"
              }
           ]
    },
)
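
`spark-submit` treats everything that follows the application file as arguments to the application, so Spark options such as `--conf` only take effect when they come before the script path. The helper below is a hypothetical sketch (the name `build_spark_submit_args`, the bucket, and the example values are assumptions, not part of the notebook) that builds an argument list in that order.

```python
def build_spark_submit_args(script_uri, app_args, spark_conf=None, deploy_mode="cluster"):
    """Build spark-submit arguments with Spark options placed before the script."""
    args = ["spark-submit", "--deploy-mode", deploy_mode]
    # Spark options go first so that spark-submit itself consumes them.
    for key, value in (spark_conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    # The script path marks the start of the application's own arguments.
    args.append(script_uri)
    args += list(app_args)
    return args


# Hypothetical values for illustration only.
example_args = build_spark_submit_args(
    "s3://my-bucket/app/preprocess.py",
    ["--input", "s3://my-bucket/in.csv", "--output", "s3://my-bucket/prep"],
    spark_conf={"spark.executor.memory": "4g"},
)
```

The resulting list could be passed as `args` to `EMRStepConfig`. Entries in `app_args` are copied through unchanged, so pipeline parameters such as `input_data` should work as well.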

In [13]:
# Use the same pipeline name across executions for cache usage.

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        # Only Parameter objects belong here; processing_instance_type is a plain string.
        processing_instance_count,
        input_data,
    ],
    steps=[step_emr],
    sagemaker_session=pipeline_session,
)

## Execute the pipeline

In [14]:
definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'InputDataUrl',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-example-files-prod-us-west-2/datasets/tabular/uci_abalone/abalone.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'EMRStep',
   'Type': 'EMR',
   'Arguments': {'StepConfig': {'HadoopJarStep': {'Jar': 'command-runner.jar',
      'Args': ['spark-submit',
       '--deploy-mode',
       'cluster',
       's3://sagemaker-us-west-2-<AWS_ACCOUNT_ID>/emr-step-pipeline/app/preprocess.py',
       '--input',
       {'Get': 'Parameters.InputDataUrl'},
       '--output',
       's3://sagemaker-us-west-2-<AWS_ACCOUNT_ID>/emr-step-pipeline/prep',
       '--conf',
       '"spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"']}},
    'ClusterConfig': 
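
To check what the EMR step will actually run, the argument list can be pulled out of the definition dictionary. The sketch below follows the key layout shown in the output above (`Steps` → `Arguments` → `StepConfig` → `HadoopJarStep` → `Args`). The helper name `emr_step_args` and the shortened sample definition are illustrative assumptions.

```python
def emr_step_args(definition: dict, step_name: str) -> list:
    """Return the HadoopJarStep argument list of the named EMR step."""
    for step in definition["Steps"]:
        if step["Name"] == step_name and step["Type"] == "EMR":
            return step["Arguments"]["StepConfig"]["HadoopJarStep"]["Args"]
    raise KeyError(f"No EMR step named {step_name!r} in the pipeline definition")


# Shortened, hypothetical definition with the same shape as the output above.
sample_definition = {
    "Steps": [
        {
            "Name": "EMRStep",
            "Type": "EMR",
            "Arguments": {
                "StepConfig": {
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": ["spark-submit", "--deploy-mode", "cluster"],
                    }
                }
            },
        }
    ]
}
step_args = emr_step_args(sample_definition, "EMRStep")
```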

In [15]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:<AWS_ACCOUNT_ID>:pipeline/EMRStepPipeline',
 'ResponseMetadata': {'RequestId': 'd3e5eb5c-0903-4eed-92c5-e544ef59319e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd3e5eb5c-0903-4eed-92c5-e544ef59319e',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Fri, 12 Jul 2024 14:32:47 GMT'},
  'RetryAttempts': 0}}

In [16]:
execution = pipeline.start()

In [29]:
execution.list_steps()

[{'StepName': 'EMRStep',
  'StartTime': datetime.datetime(2024, 7, 12, 14, 32, 49, 321000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 7, 12, 14, 39, 28, 220000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'EMR': {'ClusterId': 'j-2KENAHBVW670Q',
    'StepId': 's-01209892SZIQADW5X59B',
    'StepName': 'pipelines-execution-u70drfvfns2z-step-EMRStep-EMRStep-oA94wpWjRX'}},
  'AttemptCount': 1}]
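
The `StartTime` and `EndTime` fields in this output can be used to see how long each step took. The following is a minimal sketch with a hypothetical helper name `step_durations`. The sample record copies the timestamps above but assumes UTC instead of the local time zone shown in the output.

```python
from datetime import datetime, timezone


def step_durations(steps):
    """Return {step name: duration in seconds} for steps that have finished."""
    durations = {}
    for step in steps:
        # Steps that are still running have no EndTime yet.
        if "StartTime" in step and "EndTime" in step:
            elapsed = step["EndTime"] - step["StartTime"]
            durations[step["StepName"]] = elapsed.total_seconds()
    return durations


sample_steps = [
    {
        "StepName": "EMRStep",
        "StartTime": datetime(2024, 7, 12, 14, 32, 49, 321000, tzinfo=timezone.utc),
        "EndTime": datetime(2024, 7, 12, 14, 39, 28, 220000, tzinfo=timezone.utc),
        "StepStatus": "Succeeded",
    }
]
durations = step_durations(sample_steps)  # EMRStep took 398.899 seconds
```

In the notebook, `step_durations(execution.list_steps())` would apply the same calculation to the live execution.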

In [19]:
execution.wait()

In [20]:
!aws s3 ls {output_path}/iceberg/db/iceberg_table/

                           PRE data/
                           PRE metadata/
2024-07-09 13:30:10          0 metadata_$folder$
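
The same listing can be done from Python instead of the AWS CLI. As a first step, the S3 URI has to be split into a bucket and a key prefix. The sketch below shows one way to do that; the helper name `split_s3_uri` and the bucket `my-bucket` are hypothetical.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str):
    """Split an s3://bucket/prefix URI into (bucket, prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, prefix = split_s3_uri(
    "s3://my-bucket/emr-step-pipeline/prep/iceberg/db/iceberg_table/"
)
```

The resulting pair can then be passed to the boto3 S3 client, for example `boto3.client("s3").list_objects_v2(Bucket=bucket, Prefix=prefix)`.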


## Cleanup
Running the following cell deletes the following resource created in this notebook:
* SageMaker Pipeline


In [21]:
# Delete the Pipeline
sagemaker_client.delete_pipeline(PipelineName=pipeline_name)