In [None]:
pip install sagemaker -U

In [62]:
# Standard library
import json
import os
import re
from time import gmtime, strftime

# Third-party
import boto3
import numpy as np
import pandas as pd
import sagemaker
from IPython.display import display
from sagemaker.inputs import TrainingInput
from sagemaker.serializers import CSVSerializer
from sagemaker.session import Session
from sagemaker.xgboost.estimator import XGBoost
from sklearn import preprocessing

# One SageMaker session is created and reused for the region and bucket lookups
# instead of constructing a new Session for each of them.
session = Session()
role = sagemaker.get_execution_role()
region = session.boto_region_name
bucket = session.default_bucket()

# S3 key prefix under which this notebook stores its datasets and training output.
prefix = "distributed-xgb-training"

## Sample Data Preparation and Visualization

Mobile operators have historical records on which customers ultimately ended up churning and which continued using the service. We can use this historical information to construct an ML model of one mobile operator’s churn using a process called training. After training the model, we can pass the profile information of an arbitrary customer (the same profile information that we used to train the model) to the model, and have the model predict whether this customer is going to churn. Of course, we expect the model to make mistakes. After all, predicting the future is tricky business! But we’ll learn how to deal with prediction errors.

The dataset we use is publicly available and was mentioned in the book Discovering Knowledge in Data by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets. Let’s download and read that dataset in now:

In [25]:
# Download the sample churn dataset from the region-specific example bucket
# into the notebook's working directory as churn.txt.
s3 = boto3.client("s3")
s3.download_file(
    Bucket=f"sagemaker-example-files-prod-{region}",
    Key="datasets/tabular/synthetic/churn.txt",
    Filename="churn.txt",
)

By modern standards, it’s a relatively small dataset, with only 5,000 records, where each record uses 21 attributes to describe the profile of a customer of an unknown US mobile operator. The attributes are:

State: the US state in which the customer resides, indicated by a two-letter abbreviation; for example, OH or NJ

Account Length: the number of days that this account has been active

Area Code: the three-digit area code of the corresponding customer’s phone number

Phone: the remaining seven-digit phone number

Int’l Plan: whether the customer has an international calling plan: yes/no

VMail Plan: whether the customer has a voice mail feature: yes/no

VMail Message: the average number of voice mail messages per month

Day Mins: the total number of calling minutes used during the day

Day Calls: the total number of calls placed during the day

Day Charge: the billed cost of daytime calls

Eve Mins, Eve Calls, Eve Charge: the minutes used, number of calls placed, and billed cost for calls made during the evening

Night Mins, Night Calls, Night Charge: the minutes used, number of calls placed, and billed cost for calls made during nighttime

Intl Mins, Intl Calls, Intl Charge: the minutes used, number of calls placed, and billed cost for international calls

CustServ Calls: the number of calls placed to Customer Service

Churn?: whether the customer left the service: true/false

The last attribute, Churn?, is known as the target attribute: the attribute that we want the ML model to predict. Because the target attribute is binary, our model will be performing binary prediction, also known as binary classification.

In [27]:
import pandas as pd

# Raise the column display limit so the preview below shows every column.
pd.set_option("display.max_columns", 500)

churn = pd.read_csv("./churn.txt")
churn.head(5)

Unnamed: 0,State,Account Length,Area Code,Phone,Int'l Plan,VMail Plan,VMail Message,Day Mins,Day Calls,Day Charge,Eve Mins,Eve Calls,Eve Charge,Night Mins,Night Calls,Night Charge,Intl Mins,Intl Calls,Intl Charge,CustServ Calls,Churn?
0,PA,163,806,403-2562,no,yes,300,8.162204,3,7.579174,3.933035,4,6.508639,4.065759,100,5.111624,4.92816,6,5.673203,3,True.
1,SC,15,836,158-8416,yes,no,0,10.018993,4,4.226289,2.325005,0,9.972592,7.14104,200,6.436188,3.221748,6,2.559749,8,False.
2,MO,131,777,896-6253,no,yes,300,4.70849,3,4.76816,4.537466,3,4.566715,5.363235,100,5.142451,7.139023,2,6.254157,4,False.
3,WY,75,878,817-5729,yes,yes,700,1.268734,3,2.567642,2.528748,5,2.333624,3.773586,450,3.814413,2.245779,6,1.080692,6,False.
4,WY,146,878,450-4942,yes,no,0,2.696177,3,5.908916,6.015337,3,3.670408,3.751673,250,2.796812,6.905545,4,7.134343,6,True.


In [28]:
# Remove the Phone column and store Area Code with object dtype
# (rather than as a number) before the categorical encoding step.
churn = churn.drop(columns=["Phone"])
churn["Area Code"] = churn["Area Code"].astype("object")

In [29]:
# Replace the textual "Churn?" column with a numeric "target" column
# ("True." -> 1, "False." -> 0); pop() removes the source column in the same step.
churn["target"] = churn.pop("Churn?").map({"True.": 1, "False.": 0})

In [30]:
# Move the label to the first column. Columns are selected by name rather than
# by position so that re-running this cell leaves the frame unchanged instead of
# duplicating "target" and dropping the last feature column.
churn = churn[["target"] + [col for col in churn.columns if col != "target"]]

In [31]:
# One-hot encode the categorical variables; indicator columns are emitted as ints.
churn = pd.get_dummies(data=churn, dtype=int)
churn

Unnamed: 0,target,Account Length,VMail Message,Day Mins,Day Calls,Day Charge,Eve Mins,Eve Calls,Eve Charge,Night Mins,Night Calls,Night Charge,Intl Mins,Intl Calls,Intl Charge,CustServ Calls,State_AK,State_AL,State_AR,State_AZ,State_CA,State_CO,State_CT,State_DC,State_DE,State_FL,State_GA,State_HI,State_IA,State_ID,State_IL,State_IN,State_KS,State_KY,State_LA,State_MA,State_MD,State_ME,State_MI,State_MN,State_MO,State_MS,State_MT,State_NC,State_ND,State_NE,State_NH,State_NJ,State_NM,State_NV,State_NY,State_OH,State_OK,State_OR,State_PA,State_RI,State_SC,State_SD,State_TN,State_TX,State_UT,State_VA,State_VT,State_WA,State_WI,State_WV,State_WY,Area Code_657,Area Code_658,Area Code_659,Area Code_676,Area Code_677,Area Code_678,Area Code_686,Area Code_707,Area Code_716,Area Code_727,Area Code_736,Area Code_737,Area Code_758,Area Code_766,Area Code_776,Area Code_777,Area Code_778,Area Code_786,Area Code_787,Area Code_788,Area Code_797,Area Code_798,Area Code_806,Area Code_827,Area Code_836,Area Code_847,Area Code_848,Area Code_858,Area Code_866,Area Code_868,Area Code_876,Area Code_877,Area Code_878,Int'l Plan_no,Int'l Plan_yes,VMail Plan_no,VMail Plan_yes
0,1,163,300,8.162204,3,7.579174,3.933035,4,6.508639,4.065759,100,5.111624,4.928160,6,5.673203,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,1
1,0,15,0,10.018993,4,4.226289,2.325005,0,9.972592,7.141040,200,6.436188,3.221748,6,2.559749,8,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,1,0
2,0,131,300,4.708490,3,4.768160,4.537466,3,4.566715,5.363235,100,5.142451,7.139023,2,6.254157,4,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1
3,0,75,700,1.268734,3,2.567642,2.528748,5,2.333624,3.773586,450,3.814413,2.245779,6,1.080692,6,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1
4,1,146,0,2.696177,3,5.908916,6.015337,3,3.670408,3.751673,250,2.796812,6.905545,4,7.134343,6,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4995,0,4,800,10.862632,5,7.250969,6.936164,1,8.026482,4.921314,350,6.748489,4.872570,8,2.122530,9,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1
4996,0,140,0,1.581127,8,3.758307,7.377591,7,1.328827,0.939932,300,4.522661,6.938571,2,4.600473,4,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,1,0
4997,0,32,700,0.163836,5,4.243980,5.841852,3,2.340554,0.939469,450,5.157898,4.388328,7,1.060340,6,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,1
4998,1,142,600,2.034454,5,3.014859,4.140554,3,3.470372,6.076043,150,4.362780,7.173376,3,4.871900,7,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1


In [32]:
from sklearn.model_selection import train_test_split

# Both splits hold out 30% with the same seed and are stratified on the label.
split_kwargs = dict(test_size=0.3, random_state=42)

# First split: 70% train, 30% held out.
train, val_n_test = train_test_split(churn, stratify=churn["target"], **split_kwargs)

# Second split: the held-out part is divided again into validation (70%) and test (30%).
val, test = train_test_split(val_n_test, stratify=val_n_test["target"], **split_kwargs)

In [33]:
# Persist each split as a CSV file without header row and without the index column.
for _csv_name, _split in (("train.csv", train), ("validation.csv", val), ("test.csv", test)):
    _split.to_csv(_csv_name, header=False, index=False)

It is recommended to split the data into multiple chunks for distributed training when using the SageMaker XGBoost algorithm mode.
When using `Script Mode`, you decide in your own logic how to load the data onto the workers, so you may not need to split the data into multiple chunks.
For this demonstration, we simulate a chunked dataset by uploading the training file to S3 under multiple object keys.

In [34]:
from tqdm import tqdm

# Create the S3 bucket handle once instead of building a new boto3 session and
# resource on every iteration.
train_bucket = boto3.Session().resource("s3").Bucket(bucket)

# The same local train.csv is uploaded under 200 different object keys.
for i in tqdm(range(200)):
    # S3 object keys always use "/" as separator, so the key is built with an
    # f-string rather than os.path.join (which would use "\\" on Windows).
    train_bucket.Object(f"{prefix}/train_xgb_csv/data_{i}.csv").upload_file("train.csv")

100%|██████████| 200/200 [01:01<00:00,  3.24it/s]


In [12]:
# Upload the validation split as a single object. S3 keys always use "/" as
# separator, so the key is built with an f-string rather than os.path.join.
boto3.Session().resource("s3").Bucket(bucket).Object(
    f"{prefix}/validation_xgb_csv/data.csv"
).upload_file("validation.csv")

In [13]:
# Upload the test split as a single object. S3 keys always use "/" as
# separator, so the key is built with an f-string rather than os.path.join.
boto3.Session().resource("s3").Bucket(bucket).Object(
    f"{prefix}/test_xgb_csv/data.csv"
).upload_file("test.csv")

In [2]:
# Location for training output.
output_prefix = "xgboost-perf-training"
s3_output_location = f"s3://{bucket}/{output_prefix}/output_xgb"

# S3 prefixes holding the uploaded training and validation CSV objects.
training_dataset_s3_path = f"s3://{bucket}/{prefix}/train_xgb_csv/"
validation_dataset_s3_path = f"s3://{bucket}/{prefix}/validation_xgb_csv/"

(training_dataset_s3_path, validation_dataset_s3_path)

('s3://sagemaker-us-east-1-715253196401/sagemaker/DEMO-xgboost-dist-algo/train_xgb_csv/',
 's3://sagemaker-us-east-1-715253196401/sagemaker/DEMO-xgboost-dist-algo/validation_xgb_csv/')

# SageMaker Training  Mode

There are 3 modes of training models on SageMaker:
1. Built-In Algorithms (Algorithm Mode):
    - SageMaker provides pre-built, optimized algorithms.
    - Users only need to provide the dataset and set hyperparameters.
    - Ideal for common machine learning tasks with minimal code.
    - Examples: XGBoost, Linear Learner, DeepAR. More supported frameworks can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/algorithms-tabular.html)

2. Bring Your Own Script (Script Mode):
    - Users write custom training scripts in frameworks like TensorFlow or PyTorch.
    - SageMaker manages the underlying infrastructure and dependencies.
    - Offers more flexibility than built-in algorithms.
    - Suitable for custom models or when built-in algorithms don't fit the use case.

3. Bring Your Own Container (Container Mode):
    - Users provide a complete Docker container with all necessary dependencies.
    - Offers maximum flexibility and control over the training environment.
    - Ideal for complex environments or proprietary algorithms.
    - Requires more setup and maintenance than the other modes.

Each mode offers a different balance between ease of use and customization, allowing users to choose the best approach for their specific machine learning needs.

# Xgboost Algorithm Training Mode

With SageMaker XGBoost version 1.2-2 or later, you can use one or more single-GPU instances for training. The hyperparameter tree_method needs to be set to gpu_hist. When using more than one instance (distributed setup), the data needs to be divided among instances in the same way as for the non-GPU distributed training steps mentioned in XGBoost Algorithm. Although this option is performant and can be used in various training setups, it doesn’t extend to using all GPUs when choosing multi-GPU instances such as g5.12xlarge.

With SageMaker XGBoost version 1.5-1 and above, you can now use all GPUs on each instance when using multi-GPU instances. The ability to use all GPUs in multi-GPU instance is offered by integrating the Dask framework.

You can use this setup to complete training quickly. Apart from saving time, this option will also be useful to work around blockers such as maximum usable instance (soft) limits, or if the training job is unable to provision a large number of single-GPU instances for some reason.

The configurations to use this option are the same as the previous option, except for the following differences:

- Add the new hyperparameter `use_dask_gpu_training` with the string value `true`.
- When creating `TrainingInput`, set the `distribution` parameter to `FullyReplicated`, whether using single or multiple instances. The underlying Dask framework will carry out the data load and split the data among Dask workers. This is different from the data distribution setting for all other distributed training with SageMaker XGBoost.
Note that splitting data into smaller files still applies for Parquet, where Dask will read each file as a partition. Because you’ll have a Dask worker per GPU, the number of files should be greater than instance count * GPU count per instance. Also, making each file too small and having a very large number of files can degrade performance. For more information, see Avoid Very Large Graphs. For CSV, we still recommend splitting up large files into smaller ones to reduce data download time and enable quicker reads. However, it’s not a requirement.

Currently, the supported input formats with this option for multi-gpu instance training (algorithm mode) are:

- text/csv
- application/x-parquet

#### Distributed Training using Multi-GPU Instances 

To enable distributed training using multi-GPU instances, we need to pass the following hyperparameters:
```
"tree_method": "gpu_hist"
"use_dask_gpu_training": "true"
```
in the training estimator

In [None]:
import os
import boto3
import re
import sagemaker
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost

session = Session()
role = sagemaker.get_execution_role()
region = sagemaker.Session().boto_region_name

content_type = "text/csv"
instance_type = "ml.g5.12xlarge"  # ml.g5.12xlarge has 4 GPUs
output_path = f"s3://{bucket}/{prefix}/algo-dist-xgb/output"

# GPU histogram tree construction plus Dask-based training across the GPUs.
hyperparams = dict(
    objective="binary:logistic",
    num_round="500",
    verbosity="3",
    tree_method="gpu_hist",
    eval_metric="auc",
    use_dask_gpu_training="true",
)

xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.7-1")
xgb_algo_mode_estimator = sagemaker.estimator.Estimator(
    image_uri=xgboost_container,
    role=role,
    hyperparameters=hyperparams,
    instance_type=instance_type,
    instance_count=1,
    output_path=output_path,
    max_run=7200,
    # Keeps the training pool warm for the given number of seconds (max 3600).
    keep_alive_period_in_seconds=1000,
    # Optional settings, left disabled:
    #   volume_size=200                          (default is 30 GB)
    #   security_group_ids=[...], subnets=[...]  (train in VPC mode)
)

# Both channels are fully replicated for this Dask GPU training setup.
train_input, validation_input = (
    TrainingInput(s3_uri, content_type=content_type, distribution="FullyReplicated")
    for s3_uri in (training_dataset_s3_path, validation_dataset_s3_path)
)

# wait=True blocks until the job completes; set it to False to continue execution
# immediately and check the job status later.
xgb_algo_mode_estimator.fit(
    {"train": train_input, "validation": validation_input},
    wait=True,
)

##### Deploy Model to real-time endpoint

In [None]:
# Deploy the model trained by xgb_algo_mode_estimator to a real-time endpoint
# backed by one ml.m5.xlarge instance; `predictor` is the client used to invoke it.
predictor = xgb_algo_mode_estimator.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge"
)

In [None]:
# Test Endpoint
# test.csv is stored without a header row, so header=None is required; otherwise
# pandas would consume the first record as column names and it would never be scored.
# Column 0 holds the label and is excluded from the request payload.
features = pd.read_csv("test.csv", header=None).iloc[:, 1:]
num_examples = len(features)

content_type = "text/csv"

predictor.serializer = sagemaker.serializers.CSVSerializer()
batch_size = 1500
predict_prob = []
for i in np.arange(0, num_examples, step=batch_size):
    payload = features.iloc[i : (i + batch_size), :].to_csv(header=False, index=False).strip()
    response = predictor.predict(payload)
    # Only a serializer is configured above, so the response may arrive as raw
    # bytes. Extending a list with bytes would append integer byte values, so
    # decode and parse the scores explicitly.
    if isinstance(response, (bytes, bytearray)):
        response = response.decode("utf-8")
    if isinstance(response, str):
        # Accept both comma- and newline-separated score lists.
        predict_prob.extend(
            float(score) for score in response.replace("\n", ",").split(",") if score.strip()
        )
    else:
        # Already deserialized (e.g. a list of values): keep as returned.
        predict_prob.extend(response)

predict_prob

In [None]:
# Clean up: delete the model and then the real-time endpoint created above.
# NOTE(review): the model is deleted before the endpoint; presumably the model
# lookup relies on the endpoint still existing - confirm before reordering.
predictor.delete_model()
predictor.delete_endpoint()

#### Distributed Training using single GPU Instances

To enable distributed training using single-GPU instances, we need to pass the following hyperparameter:
```
"tree_method": "gpu_hist"
```
to the training estimator and pass s3 data distribution config as 'ShardedByS3Key' which splits data across instances during data loading. More info can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) 


In [None]:
import os
import boto3
import re
import sagemaker
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost

session = Session()
role = sagemaker.get_execution_role()
region = sagemaker.Session().boto_region_name

content_type = "text/csv"
instance_type = "ml.g5.2xlarge"  # ml.g5.2xlarge has 1 GPU
output_path = f"s3://{bucket}/{prefix}/algo-dist-xgb/output"

# GPU histogram tree construction; "use_dask_gpu_training" is intentionally not set here.
hyperparams = dict(
    objective="binary:logistic",
    num_round="500",
    verbosity="3",
    tree_method="gpu_hist",
    eval_metric="auc",
)

xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.7-1")
xgb_algo_mode_estimator2 = sagemaker.estimator.Estimator(
    image_uri=xgboost_container,
    role=role,
    hyperparameters=hyperparams,
    instance_type=instance_type,
    instance_count=2,
    output_path=output_path,
    max_run=7200,
    keep_alive_period_in_seconds=1000,
    # Optional settings, left disabled:
    #   volume_size=200
    #   security_group_ids=[...], subnets=[...]  (train in VPC mode)
)

# Both channels are sharded by S3 key across the two instances.
# NOTE(review): the validation prefix holds a single object, so with
# ShardedByS3Key presumably only one instance receives it - confirm this is intended.
train_input, validation_input = (
    TrainingInput(s3_uri, content_type=content_type, distribution="ShardedByS3Key")
    for s3_uri in (training_dataset_s3_path, validation_dataset_s3_path)
)

# wait=True blocks until the job completes; set it to False to continue execution
# immediately and check the job status later.
xgb_algo_mode_estimator2.fit(
    {"train": train_input, "validation": validation_input},
    wait=True,
)

###### Deploy to Realtime endpoint

In [None]:
# Deploy the model trained by xgb_algo_mode_estimator2 to a real-time endpoint
# backed by one ml.m5.xlarge instance; `predictor` is the client used to invoke it.
predictor = xgb_algo_mode_estimator2.deploy(
    initial_instance_count = 1, 
    instance_type = "ml.m5.xlarge"
)

In [None]:
# Test Endpoint
# test.csv is stored without a header row, so header=None is required; otherwise
# pandas would consume the first record as column names and it would never be scored.
# Column 0 holds the label and is excluded from the request payload.
features = pd.read_csv("test.csv", header=None).iloc[:, 1:]
num_examples = len(features)

content_type = "text/csv"

predictor.serializer = sagemaker.serializers.CSVSerializer()
batch_size = 1500
predict_prob = []
for i in np.arange(0, num_examples, step=batch_size):
    payload = features.iloc[i : (i + batch_size), :].to_csv(header=False, index=False).strip()
    response = predictor.predict(payload)
    # Only a serializer is configured above, so the response may arrive as raw
    # bytes. Extending a list with bytes would append integer byte values, so
    # decode and parse the scores explicitly.
    if isinstance(response, (bytes, bytearray)):
        response = response.decode("utf-8")
    if isinstance(response, str):
        # Accept both comma- and newline-separated score lists.
        predict_prob.extend(
            float(score) for score in response.replace("\n", ",").split(",") if score.strip()
        )
    else:
        # Already deserialized (e.g. a list of values): keep as returned.
        predict_prob.extend(response)

predict_prob

In [None]:
# Clean up: delete the model and then the real-time endpoint created above.
# NOTE(review): the model is deleted before the endpoint; presumably the model
# lookup relies on the endpoint still existing - confirm before reordering.
predictor.delete_model()
predictor.delete_endpoint()

#### Distributed Training using CPU Instances

To enable distributed Training using CPU instances, we need to pass s3 data distribution config as 'ShardedByS3Key' which splits data across instances during data loading. More info can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) 


In [None]:
import os
import boto3
import re
import sagemaker
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost

session = Session()
role = sagemaker.get_execution_role()
region = sagemaker.Session().boto_region_name

content_type = "text/csv"
instance_type = "ml.m5.2xlarge"
output_path = f"s3://{bucket}/{prefix}/algo-dist-xgb/output"

# Neither "tree_method": "gpu_hist" nor "use_dask_gpu_training" is set for this CPU run.
hyperparams = dict(
    objective="binary:logistic",
    num_round="500",
    verbosity="3",
    eval_metric="auc",
)

xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.7-1")
xgb_algo_mode_estimator_cpu = sagemaker.estimator.Estimator(
    image_uri=xgboost_container,
    role=role,
    hyperparameters=hyperparams,
    instance_type=instance_type,
    instance_count=2,
    output_path=output_path,
    max_run=7200,
    keep_alive_period_in_seconds=1000,
    # Optional settings, left disabled:
    #   volume_size=200
    #   security_group_ids=[...], subnets=[...]  (train in VPC mode)
)

# Both channels are sharded by S3 key across the two instances.
# NOTE(review): the validation prefix holds a single object, so with
# ShardedByS3Key presumably only one instance receives it - confirm this is intended.
train_input, validation_input = (
    TrainingInput(s3_uri, content_type=content_type, distribution="ShardedByS3Key")
    for s3_uri in (training_dataset_s3_path, validation_dataset_s3_path)
)

# wait=True blocks until the job completes; set it to False to continue execution
# immediately and check the job status later.
xgb_algo_mode_estimator_cpu.fit(
    {"train": train_input, "validation": validation_input},
    wait=True,
)

###### Deploy to Realtime endpoint

In [None]:
# Deploy the model trained by xgb_algo_mode_estimator_cpu to a real-time endpoint
# backed by one ml.m5.xlarge instance; `predictor` is the client used to invoke it.
predictor = xgb_algo_mode_estimator_cpu.deploy(
    initial_instance_count = 1, 
    instance_type = "ml.m5.xlarge"
)

In [None]:
# Test Endpoint
# test.csv is stored without a header row, so header=None is required; otherwise
# pandas would consume the first record as column names and it would never be scored.
# Column 0 holds the label and is excluded from the request payload.
features = pd.read_csv("test.csv", header=None).iloc[:, 1:]
num_examples = len(features)

content_type = "text/csv"

predictor.serializer = sagemaker.serializers.CSVSerializer()
batch_size = 1500
predict_prob = []
for i in np.arange(0, num_examples, step=batch_size):
    payload = features.iloc[i : (i + batch_size), :].to_csv(header=False, index=False).strip()
    response = predictor.predict(payload)
    # Only a serializer is configured above, so the response may arrive as raw
    # bytes. Extending a list with bytes would append integer byte values, so
    # decode and parse the scores explicitly.
    if isinstance(response, (bytes, bytearray)):
        response = response.decode("utf-8")
    if isinstance(response, str):
        # Accept both comma- and newline-separated score lists.
        predict_prob.extend(
            float(score) for score in response.replace("\n", ",").split(",") if score.strip()
        )
    else:
        # Already deserialized (e.g. a list of values): keep as returned.
        predict_prob.extend(response)

predict_prob

In [None]:
# Clean up: delete the model and then the real-time endpoint created above.
# NOTE(review): the model is deleted before the endpoint; presumably the model
# lookup relies on the endpoint still existing - confirm before reordering.
predictor.delete_model()
predictor.delete_endpoint()

# Xgboost Distributed Training Script Mode

##### Create an XGBoost training script

SageMaker can now run an XGBoost script using the XGBoost estimator. When run on SageMaker, a number of helpful environment variables are available to access properties of the training environment, such as:

- `SM_MODEL_DIR`: A string representing the path to the directory to write model artifacts to. Any artifacts saved in this folder are uploaded to S3 for model hosting after the training job completes.
- `SM_OUTPUT_DIR`: A string representing the filesystem path to write output artifacts to. Output artifacts may include checkpoints, graphs, and other files to save, not including model artifacts. These artifacts are compressed and uploaded to S3 to the same S3 prefix as the model artifacts.

When two input channels, `train` and `validation`, are used in the call to the XGBoost estimator's `fit()` method, the following environment variables are set, following the format `SM_CHANNEL_[channel_name]`:

- `SM_CHANNEL_TRAIN`: A string representing the path to the directory containing data in the 'train' channel.
- `SM_CHANNEL_VALIDATION`: Same as above, but for the 'validation' channel.

A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to the `model_dir` so that it can be hosted later. Hyperparameters are passed to your script as arguments and can be retrieved with an `argparse.ArgumentParser` instance. For example, the script that we run in this notebook is written to `train_code/train.py` by the cells below:

In [40]:
import os

# Directory that receives the training and inference scripts written by the
# following cells; an already existing directory is not treated as an error.
TRAIN_CODE_DIR = "train_code"
os.makedirs(TRAIN_CODE_DIR, exist_ok=True)

In [66]:
%%writefile train_code/train.py
import argparse
import json
import logging
import os
import pandas as pd
import pickle as pkl

from sagemaker_containers import entry_point
from sagemaker_xgboost_container.data_utils import get_dmatrix
from sagemaker_xgboost_container import distributed

import xgboost as xgb


def _xgb_train(params, dtrain, evals, num_boost_round, model_dir, is_master):
    """Train a booster and, on the master host only, save it under ``model_dir``.

    This is the function executed by ``distributed.rabit_run`` in the multi-host
    case and called directly in the single-host case.

    :param params: Hyperparameter dictionary forwarded to ``xgb.train``.
    :param dtrain: Training data passed to ``xgb.train``.
    :param evals: Watchlist of ``(data, name)`` pairs evaluated during training.
    :param num_boost_round: Number of boosting rounds.
    :param model_dir: Directory in which the file ``xgboost-model`` is written.
    :param is_master: True if current node is master host in distributed training,
                        or is running single node training job.
                        Note that rabit_run includes this argument.
    """
    booster = xgb.train(
        params=params, dtrain=dtrain, evals=evals, num_boost_round=num_boost_round
    )

    # Only the master host writes the model artifact.
    if not is_master:
        return

    model_location = os.path.join(model_dir, 'xgboost-model')
    booster.save_model(model_location)
    logging.info("Stored trained model at {}".format(model_location))


if __name__ == '__main__':
    # Entry point of the training script: parse hyperparameters and SageMaker
    # environment settings, load the data channels, then train either through
    # rabit (several hosts) or directly (single host).
    parser = argparse.ArgumentParser()

    # Hyperparameters are described here.
    parser.add_argument('--max_depth', type=int)
    parser.add_argument('--eta', type=float)
    # gamma and min_child_weight are real-valued XGBoost parameters; parsing them
    # as float still accepts integer strings such as "4" but no longer rejects "0.5".
    parser.add_argument('--gamma', type=float)
    parser.add_argument('--min_child_weight', type=float)
    parser.add_argument('--subsample', type=float)
    parser.add_argument('--verbosity', type=int)
    parser.add_argument('--objective', type=str)
    parser.add_argument('--num_round', type=int)
    parser.add_argument('--tree_method', type=str, default="auto")
    parser.add_argument('--predictor', type=str, default="auto")

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output_data_dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR'))
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    parser.add_argument('--sm_hosts', type=str, default=os.environ.get('SM_HOSTS'))
    parser.add_argument('--sm_current_host', type=str, default=os.environ.get('SM_CURRENT_HOST'))
    parser.add_argument("--content_type", type=str, default="csv")
    # Unknown arguments are ignored rather than treated as errors.
    args, _ = parser.parse_known_args()

    # Get SageMaker host information from runtime environment variables
    sm_hosts = json.loads(args.sm_hosts)
    sm_current_host = args.sm_current_host

    # Accept both "csv" and a MIME type such as "text/csv".
    content_type = args.content_type.split('/')[-1]
    dtrain = get_dmatrix(args.train, content_type)
    dval = get_dmatrix(args.validation, content_type)
    watchlist = [(dtrain, 'train'), (dval, 'validation')] if dval is not None else [(dtrain, 'train')]

    candidate_hp = {
        'max_depth': args.max_depth,
        'eta': args.eta,
        'gamma': args.gamma,
        'min_child_weight': args.min_child_weight,
        'subsample': args.subsample,
        'verbosity': args.verbosity,
        'objective': args.objective,
        'tree_method': args.tree_method,
        'predictor': args.predictor,
    }
    # Hyperparameters that were not supplied are omitted so that XGBoost applies
    # its own defaults instead of receiving an explicit None.
    train_hp = {name: value for name, value in candidate_hp.items() if value is not None}

    xgb_train_args = dict(
        params=train_hp,
        dtrain=dtrain,
        evals=watchlist,
        num_boost_round=args.num_round,
        model_dir=args.model_dir)

    if len(sm_hosts) > 1:
        # Wait until all hosts are able to find each other
        entry_point._wait_hostname_resolution()

        # Execute training function after initializing rabit.
        distributed.rabit_run(
            exec_fun=_xgb_train,
            args=xgb_train_args,
            include_in_training=(dtrain is not None),
            hosts=sm_hosts,
            current_host=sm_current_host,
            update_rabit_args=True
        )
    else:
        # If single node training, call training method directly.
        # Explicit None check, consistent with the distributed branch above.
        if dtrain is not None:
            xgb_train_args['is_master'] = True
            _xgb_train(**xgb_train_args)
        else:
            raise ValueError("Training channel must have data to train model.")

Overwriting train_code/train.py


Because the container imports your training script, always put your training code in a main guard `(if __name__=='__main__':)` so that the container does not inadvertently run your training code at the wrong point in execution.

For more information about training environment variables, please visit the [SageMaker Training Toolkit](https://github.com/aws/sagemaker-training-toolkit).

---
##### Train the XGBoost model

After setting training parameters, we kick off training, and poll for status until training is complete.

To run our training script on SageMaker, we construct a sagemaker.xgboost.estimator.XGBoost estimator, which accepts several constructor arguments:

* __entry_point__: The path to the Python script that SageMaker runs for training and prediction.
* __role__: Role ARN
* __instance_type__ *(optional)*: The type of SageMaker instances for training.
* __sagemaker_session__ *(optional)*: The session used to train on SageMaker.
* __hyperparameters__ *(optional)*: A dictionary passed to the train function as hyperparameters.

SageMaker Training Directory Setup for Script Mode:

- Create a root project directory.
- Place main training script (e.g., train.py) in root.
- Add other Python modules/scripts to root or subdirectories.
- Include requirements.txt for dependencies. Sagemaker automatically installs all libs listed in this text file

Example structure:
```
project/
    ├── train.py
    ├── requirements.txt
    ├── utils.py
```

SageMaker estimator setup:
```
estimator = Estimator(
    entry_point='train.py',
    source_dir='path/to/project',
    ...
)
```
Key points:

    Include all necessary code files.
    List dependencies in requirements.txt.
    SageMaker packages entire directory content.    

---

Once training is done, SageMaker packages your model artifacts along with any dependencies used for training, including `inference.py`, which we will use as our inference logic. You can also supply a separate inference script before model deployment if you prefer.
After training, we deploy the model using the Estimator object and point to our inference script for serving the model. The script defines the SageMaker-specific functions `model_fn`, `input_fn`, and `predict_fn` to load the model, deserialize the request, and make predictions.

In [67]:
%%writefile train_code/inference.py
import os
import json
import pickle
import xgboost as xgb
import numpy as np
import io
import pandas as pd

def model_fn(model_dir):
    """
    Load the XGBoost booster stored as ``xgboost-model`` inside ``model_dir``.

    Args:
        model_dir (str): The directory where the model file is saved.

    Returns:
        xgb.Booster: The loaded XGBoost model.
    """
    booster = xgb.Booster()
    booster.load_model(os.path.join(model_dir, "xgboost-model"))
    return booster



def input_fn(request_body, request_content_type):
    """
    Deserialize and prepare the prediction input.

    Supported content types are ``application/json``, ``text/csv`` and
    ``application/octet-stream`` (assumed to contain CSV data). Parameters
    appended to the content type, such as ``; charset=utf-8``, are ignored, and
    CSV payloads are accepted both as ``str`` and as ``bytes``.

    Args:
        request_body (str or bytes): The request payload.
        request_content_type (str): The content type of the request.

    Returns:
        numpy.ndarray: The input data as a numpy array.

    Raises:
        ValueError: If the content type is not one of the supported types.
    """
    # Keep only the media type: drop parameters and normalize case.
    media_type = (request_content_type or "").split(";")[0].strip().lower()

    if media_type == "application/json":
        return np.array(json.loads(request_body))

    if media_type in ("text/csv", "application/octet-stream"):
        # Select the buffer type from the payload type, so that a bytes payload
        # labelled text/csv (or a str labelled octet-stream) is parsed rather
        # than failing with a TypeError.
        if isinstance(request_body, str):
            buffer = io.StringIO(request_body)
        else:
            buffer = io.BytesIO(request_body)
        # The payload carries no header row.
        return pd.read_csv(buffer, header=None).values

    raise ValueError(f"Unsupported content type: {request_content_type}")
def predict_fn(input_data, model):
    """
    Score the deserialized request with the loaded booster.

    Args:
        input_data (numpy.ndarray): Feature rows produced by ``input_fn``.
        model (xgb.Booster): Booster returned by ``model_fn``.

    Returns:
        numpy.ndarray: The model's predictions.
    """
    # The booster predicts on a DMatrix, so wrap the input first.
    return model.predict(xgb.DMatrix(input_data))

Overwriting train_code/inference.py


In [None]:
# Open Source distributed script mode
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost

session = Session()

# --- Job configuration -------------------------------------------------------
instance_count = 2  # for distributed training, set instance count greater than 1
instance_type = "ml.m5.xlarge"
output_path = f"s3://{bucket}/{prefix}/algo-dist-xgb/output"
content_type = "text/csv"  # dataset format

# Hyperparameters handed to the training script (all values are strings).
hyperparams = dict(
    max_depth="5",
    eta="0.2",
    gamma="4",
    min_child_weight="6",
    subsample="0.7",
    objective="binary:logistic",
    num_round="50",
    verbosity="3",
    content_type=content_type,
)

# --- Estimator ---------------------------------------------------------------
xgb_script_mode_estimator = XGBoost(
    # Training code
    entry_point="train.py",  # training logic, relative to source_dir
    source_dir="train_code",  # parent folder of training logic and dependencies
    framework_version="1.7-1",  # Note: framework_version is mandatory
    hyperparameters=hyperparams,
    # Infrastructure
    role=role,
    instance_count=instance_count,  # for distributed training, set greater than 1
    instance_type=instance_type,
    volume_size=50,
    output_path=output_path,
    # Keep instances warm between jobs for faster experimentation;
    # warm instances are billed.
    keep_alive_period_in_seconds=1000,
    # To train in VPC mode, additionally pass your own IDs:
    # security_group_ids=["sg-xxxxxxxxxxxxxxxxx"],
    # subnets=["subnet-xxxxxxxxxxxxxxxxx", "subnet-yyyyyyyyyyyyyyyyy"],
)

# --- Data channels -----------------------------------------------------------
# Shard the training data across instances only when there is more than one.
train_distribution = "ShardedByS3Key" if instance_count > 1 else "FullyReplicated"

train_input = TrainingInput(
    training_dataset_s3_path,
    content_type=content_type,
    distribution=train_distribution,
)
validation_input = TrainingInput(
    validation_dataset_s3_path,
    content_type=content_type,
)

xgb_script_mode_estimator.fit(
    {
        "train": train_input,
        "validation": validation_input,
    }
)

##### Deploy Model to real-time endpoint

In [None]:
# Create a real-time endpoint from the estimator trained above. The returned
# `predictor` is the handle the following cells use to invoke the endpoint
# and, afterwards, to delete it.
predictor = xgb_script_mode_estimator.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    entry_point="inference.py", # path to inference script within the model package
)

In [None]:
# Test Endpoint
# Send the rows of test.csv (first column dropped) to the endpoint as
# header-less CSV, one batch at a time, and collect the responses.
features = pd.read_csv("test.csv").iloc[:, 1:]
num_examples = len(features)

content_type = "text/csv"
batch_size = 1500

predictor.serializer = sagemaker.serializers.CSVSerializer()

predict_prob = []
for i in range(0, num_examples, batch_size):
    batch = features.iloc[i : i + batch_size, :]
    payload = batch.to_csv(header=False, index=False).strip()
    predict_prob.extend(predictor.predict(payload))

predict_prob

In [None]:
# Clean up: remove the model and the endpoint created by the deploy cell above.
# NOTE(review): the model is deleted before the endpoint; presumably the SDK
# resolves the model through the still-existing endpoint configuration, so
# keep this order - confirm against the Predictor documentation.
predictor.delete_model()
predictor.delete_endpoint()

# Automatic model Tuning

Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. It then chooses the hyperparameter values that result in a model that performs the best, as measured by a metric that you choose. For example, suppose that you want to solve a binary classification problem on this customer churn dataset. Your goal is to maximize the area under the curve (AUC) metric of the algorithm by training an XGBoost model. You don't know which values of the eta, min_child_weight, and max_depth hyperparameters to use to train the best model. To find the best values for these hyperparameters, you can specify ranges of values that Amazon SageMaker hyperparameter tuning searches to find the combination of values that results in the training job that performs the best as measured by the objective metric that you chose. Hyperparameter tuning launches training jobs that use hyperparameter values in the ranges that you specified, and returns the training job with the highest AUC.

In [77]:
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

# Search space for the tuning job: one range per tuned hyperparameter.
hyperparameter_ranges = dict(
    eta=ContinuousParameter(0, 1),
    min_child_weight=IntegerParameter(1, 10),
    max_depth=IntegerParameter(1, 10),
)

# Metric used to compare the training jobs launched by the tuner.
objective_metric_name = "validation:auc"

In [78]:
# Build the tuning job around the estimator created above
# (the tuner is agnostic to framework or training mode).
tuner = HyperparameterTuner(
    estimator=xgb_algo_mode_estimator,
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=hyperparameter_ranges,
    max_jobs=6,  # maximum number of candidate jobs
    max_parallel_jobs=3,  # number of jobs to execute in parallel
    early_stopping_type="Auto",  # enable early stopping
)

# Launch the search on the same train/validation channels used for training.
tuner.fit(
    {
        "train": train_input,
        "validation": validation_input,
    }
)

........................................................................................................!


In [None]:
# Instance type that will host the tuned model's endpoint.
inference_instance_type = "ml.m5.xlarge"

# Deploy the model from the best-performing training job to a real-time
# endpoint. This rebinds `predictor`, replacing the handle from the earlier
# deployment.
# NOTE(review): the tuner was built from `xgb_algo_mode_estimator`; confirm
# that estimator's model can resolve "inference.py" as an entry point.
predictor = tuner.deploy(
    initial_instance_count=1,
    instance_type=inference_instance_type,
    entry_point="inference.py",
)

In [None]:
# Clean up: remove the model and the endpoint created from the tuning job.
# NOTE(review): the model is deleted before the endpoint; presumably the SDK
# resolves the model through the still-existing endpoint configuration, so
# keep this order - confirm against the Predictor documentation.
predictor.delete_model()
predictor.delete_endpoint()