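# XGBoost SageMaker Pipeline with EMR

This notebook defines and starts a SageMaker Pipeline that orchestrates Spark jobs on an Amazon EMR application. The pipeline runs in this order:

1. Create the application.
2. Submit a preprocessing job and wait for it to finish.
3. Submit a training job and wait for it to finish.

Each pipeline step is a lightweight `ScriptProcessor` job that runs a helper script from `./code/`.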

In [1]:
import sagemaker
from sagemaker import image_uris
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TrainingStep
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Session objects shared by every cell below.
sagemaker_role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
pipeline_session = PipelineSession()  # passed to processors so steps are captured into the pipeline
aws_region = sagemaker_session.boto_session.region_name

# Build the EMR execution-role ARN from the caller's account instead of
# hardcoding the account ID, so the notebook runs unchanged in other accounts.
# Assumes the role lives in the same account as the notebook — TODO confirm.
emr_role_name = "EMRServerlessExecutionRole-01"
emr_role = f"arn:aws:iam::{sagemaker_session.account_id()}:role/{emr_role_name}"

In [3]:
# S3 layout: every artifact of this pipeline lives under s3://<s3_bucket>/<s3_prefix>/...
s3_bucket = "aamlops2024"
s3_prefix = "xgboost-pipeline-emr"
script_uri = "s3://" + s3_bucket + "/" + s3_prefix

# Object key (relative to s3_bucket) of the app-info JSON. It is handed to every
# step's processor (as S3_KEY for the create step, APP_INFO_KEY for the others).
app_info_key = "/".join((s3_prefix, "emr-tracking", "app_info.json"))

In [4]:
# Container used by every ScriptProcessor below to run the helper scripts in ./code/.
# The region comes from the active session (not a hardcoded "us-east-1"), so
# the ECR image URI matches the region the pipeline runs in.
image_uri = image_uris.retrieve(
    framework="sklearn",
    region=aws_region,
    version="1.2-1",
    instance_type="ml.m5.large",
)
image_uri

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


'683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3'

In [5]:
# Step caching, shared by the create-app and preprocessing steps.
# Expiry uses an ISO 8601 duration: "P30D" means 30 days.
# NOTE(review): the Spark entry points are referenced only through a fixed S3 URI
# (the ENTRY_POINT env var). Editing and re-uploading a Spark script therefore
# leaves the step arguments unchanged, so a cached submit/wait step may be
# skipped. Confirm before relying on the cache after script changes.
cache_config = CacheConfig(expire_after="P30D", enable_caching=True)

## 0. Create EMR Application step

In [6]:
# Processor for Step 0; it runs code/create_emr_app.py (see the next cell).
# The script receives the app-info JSON location as S3_BUCKET + S3_KEY.
# NOTE(review): later steps read the same key as APP_INFO_KEY. The naming
# differs between scripts.
create_app_env = {
    "S3_BUCKET": s3_bucket,
    "S3_KEY": app_info_key,
}
script_processor = ScriptProcessor(
    role=sagemaker_role,
    image_uri=image_uri,
    command=["python3"],
    instance_count=1,
    instance_type="ml.m5.large",
    base_job_name="xgb_pipeline_emr",
    sagemaker_session=pipeline_session,
    env=create_app_env,
)

In [7]:
# Step 0: create the EMR application by running create_emr_app.py.
# This binds whichever `script_processor` was assigned last. The name is reused
# for every step, so run the previous cell immediately before this one.
create_emr_app_step = ProcessingStep(
    name="Create_EMR_Application",
    code="./code/create_emr_app.py",
    processor=script_processor,
    cache_config=cache_config,
)

## 1. Preprocessing

### 1.1 Submit Preprocessing Job

In [8]:
# Upload the Spark preprocessing script to <script_uri>/code.
# The returned S3 URI is passed to the submit step as ENTRY_POINT.
local_path = "./code/spark_preprocessing2.py"
destination_path = f"{script_uri}/code"  # == s3://<s3_bucket>/<s3_prefix>/code
preprocessing_entry_point_s3uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, desired_s3_uri=destination_path
)
preprocessing_entry_point_s3uri

's3://aamlops2024/xgboost-pipeline-emr/code/spark_preprocessing2.py'

In [9]:
# Processor for Step 1.1; it runs code/submit_emr_job_preprocessing.py.
# All job parameters (app-info location, Spark entry point, EMR role, and
# input/output prefixes) are passed to that script as environment variables.
preprocessing_input_uri = f"s3://{s3_bucket}/{s3_prefix}/data/"
script_processor = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.large",
    instance_count=1,
    role=sagemaker_role,
    sagemaker_session=pipeline_session,
    base_job_name="xgb_pipeline_emr_submit_job",
    env={
        "APP_INFO_KEY": app_info_key,
        "S3_BUCKET": s3_bucket,
        "S3_PREFIX": s3_prefix,
        "ENTRY_POINT": preprocessing_entry_point_s3uri,
        "EMR_ROLE": emr_role,
        "JOB_TYPE": "preprocessing",
        # "IMPUT" is a misspelling of "INPUT". It is kept because the submit
        # script (not visible here) may read it. The correctly spelled key is
        # also set so the script can migrate.
        # TODO: switch the script to "INPUT", then drop "IMPUT".
        "IMPUT": preprocessing_input_uri,
        "INPUT": preprocessing_input_uri,
        "OUTPUT": f"s3://{s3_bucket}/{s3_prefix}/processing/",
    },
)

In [10]:
# Step 1.1: submit the preprocessing job.
# This uses the `script_processor` from the previous cell; run cells in order,
# because the name is reassigned for every step.
submit_preprocess_step = ProcessingStep(
    name="xgb-emr-Submit_Preprocessing",
    processor=script_processor,
    code="code/submit_emr_job_preprocessing.py",
    # Use the session's region rather than a hardcoded "us-east-1", so the
    # script targets the same region the pipeline runs in.
    job_arguments=["--region", aws_region],
    cache_config=cache_config,
)
# No step properties are passed between these steps, so ordering must be declared explicitly.
submit_preprocess_step.add_depends_on([create_emr_app_step])

### 1.2 Wait for Preprocessing Completion

In [11]:
# Processor for Step 1.2; it runs code/wait_for_emr_job.py. The same wait script
# is reused for training, and JOB_TYPE distinguishes the two runs.
# NOTE(review): base_job_name says "submit_job" even though this is a wait step.
wait_preprocess_env = {
    "APP_INFO_KEY": app_info_key,
    "S3_BUCKET": s3_bucket,
    "S3_PREFIX": s3_prefix,
    "JOB_TYPE": "preprocessing",
}
script_processor = ScriptProcessor(
    image_uri=image_uri,
    role=sagemaker_role,
    sagemaker_session=pipeline_session,
    command=["python3"],
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="xgb_pipeline_emr_submit_job",
    env=wait_preprocess_env,
)

In [12]:
# Step 1.2: wait for the preprocessing job submitted in Step 1.1
# (presumably wait_for_emr_job.py polls EMR until the job ends — confirm).
# This uses the `script_processor` from the previous cell; run cells in order.
wait_preprocess_step = ProcessingStep(
    name="Wait_Preprocessing_Job",
    processor=script_processor,
    code="code/wait_for_emr_job.py",
    # Use the session's region rather than a hardcoded "us-east-1".
    job_arguments=["--region", aws_region],
    cache_config=cache_config,
)
wait_preprocess_step.add_depends_on([submit_preprocess_step])

## 2. Model Training

### 2.1 Submit Training Job

In [13]:
# Upload the Spark training script (spark_training_mllib.py) to <script_uri>/code.
# The returned S3 URI is passed to the training submit step as ENTRY_POINT.
local_path = "./code/spark_training_mllib.py"
destination_path = f"{script_uri}/code"  # == s3://<s3_bucket>/<s3_prefix>/code
training_entry_point_s3uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, desired_s3_uri=destination_path
)
training_entry_point_s3uri

's3://aamlops2024/xgboost-pipeline-emr/code/spark_training_mllib.py'

In [14]:
# Processor for Step 2.1; it runs code/submit_emr_job_training.py.
# TRAIN and VALIDATION point under the prefix given to preprocessing as OUTPUT.
# OUTPUT here is the archives/ prefix.
emr_root = f"s3://{s3_bucket}/{s3_prefix}"
training_env = {
    "APP_INFO_KEY": app_info_key,
    "EMR_ROLE": emr_role,
    "ENTRY_POINT": training_entry_point_s3uri,
    "S3_BUCKET": s3_bucket,
    "S3_PREFIX": s3_prefix,
    "JOB_TYPE": "training",
    "TRAIN": f"{emr_root}/processing/train/",
    "VALIDATION": f"{emr_root}/processing/validation/",
    "OUTPUT": f"{emr_root}/archives",
}
script_processor = ScriptProcessor(
    role=sagemaker_role,
    image_uri=image_uri,
    command=["python3"],
    instance_count=1,
    instance_type="ml.m5.large",
    sagemaker_session=pipeline_session,
    base_job_name="xgb_pipeline_emr_submit_job",
    env=training_env,
)


In [15]:
# Step 2.1: submit the training job once preprocessing has finished.
# This uses the `script_processor` from the previous cell; run cells in order.
submit_train_step = ProcessingStep(
    name="Submit_Training_Job",
    processor=script_processor,
    code="code/submit_emr_job_training.py",
    # Use the session's region rather than a hardcoded "us-east-1".
    job_arguments=["--region", aws_region],
    # Unlike the preprocessing steps, caching is not enabled here, presumably
    # so that training always re-runs. Confirm before enabling it.
    # cache_config=cache_config,
)
submit_train_step.add_depends_on([wait_preprocess_step])

### 2.2 Wait for Training Completion

In [16]:
# Processor for Step 2.2. It runs the same code/wait_for_emr_job.py used for
# preprocessing, with JOB_TYPE="training" selecting this run.
# NOTE(review): base_job_name says "submit_job" even though this is a wait step.
wait_train_env = {
    "APP_INFO_KEY": app_info_key,
    "S3_BUCKET": s3_bucket,
    "S3_PREFIX": s3_prefix,
    "JOB_TYPE": "training",
}
script_processor = ScriptProcessor(
    image_uri=image_uri,
    role=sagemaker_role,
    sagemaker_session=pipeline_session,
    command=["python3"],
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="xgb_pipeline_emr_submit_job",
    env=wait_train_env,
)

In [17]:
# Step 2.2: wait for the training job submitted in Step 2.1.
# This uses the `script_processor` from the previous cell; run cells in order.
wait_train_step = ProcessingStep(
    name="Wait_Training_Job",
    processor=script_processor,
    code="./code/wait_for_emr_job.py",
    # Use the session's region rather than a hardcoded "us-east-1".
    job_arguments=["--region", aws_region],
    # Caching is not enabled for the training steps. Confirm before enabling it.
    # cache_config=cache_config
)
wait_train_step.add_depends_on([submit_train_step])

## 3. Pipeline

In [18]:
# The list order is only for readability. Execution order comes from the
# add_depends_on calls made on each step above.
pipeline_steps = [
    create_emr_app_step,
    submit_preprocess_step,
    wait_preprocess_step,
    submit_train_step,
    wait_train_step,
]
pipeline = Pipeline(name="xgboost-pipeline-emr-v1-0", steps=pipeline_steps)

In [19]:
# Create the pipeline definition in SageMaker, or update it if it already exists.
upsert_response = pipeline.upsert(role_arn=sagemaker_role)
upsert_response



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:047922237497:pipeline/xgboost-pipeline-emr-v1-0',
 'ResponseMetadata': {'RequestId': 'e48a31d0-d2a8-44fc-8fac-e6175e8a5eef',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e48a31d0-d2a8-44fc-8fac-e6175e8a5eef',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '93',
   'date': 'Tue, 08 Apr 2025 13:11:25 GMT'},
  'RetryAttempts': 0}}

In [20]:
# Start an execution and keep its handle, so later cells can inspect it
# (e.g. execution.describe(), execution.wait()).
execution = pipeline.start()
execution

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:047922237497:pipeline/xgboost-pipeline-emr-v1-0/execution/n8py8kx3yr7h', sagemaker_session=<sagemaker.session.Session object at 0x7f1dafe21ad0>)

In [21]:
# Scratch data prep (disabled): download the raw training CSV from the pipeline's data/ prefix.
# TODO: move these one-off cells to a separate data-prep notebook, or delete them.
# !aws s3 cp s3://aamlops2024/xgboost-pipeline-emr/data/train1.csv .

In [22]:
# Scratch data prep (disabled): load the downloaded CSV with pandas.
# import pandas as pd
# data = pd.read_csv("./train1.csv")


In [23]:
# Scratch data prep (disabled): write the loaded CSV data out as a local Parquet file.
# data.to_parquet("train.parquet")

In [24]:
# Scratch data prep (disabled): read the local Parquet file back into `data`.
# data = pd.read_parquet("./train.parquet")

In [25]:
# Scratch data prep (disabled): display the frame. Prefer data.head() so the whole table is not dumped.
# data

In [26]:
# Scratch data prep (disabled): upload the Parquet file to the data/ prefix, which
# the preprocessing submit step receives through its IMPUT env var.
# !aws s3 cp ./train.parquet s3://aamlops2024/xgboost-pipeline-emr/data/train.parquet