In [15]:
%%writefile abalone.py
from __future__ import print_function

import argparse
import json
import logging
import os
import pickle as pkl

import pandas as pd
import xgboost as xgb
from sagemaker_containers import entry_point
from sagemaker_xgboost_container import distributed
from sagemaker_xgboost_container.data_utils import get_dmatrix


def _xgb_train(params, dtrain, evals, num_boost_round, model_dir, is_master):
    """Run xgb.train on the given arguments and, on the master host, pickle the booster.

    This is our rabit execution function: it is handed to ``distributed.rabit_run``
    for multi-host jobs and called directly for single-host jobs.

    :param params: Booster parameter dictionary passed to ``xgb.train()``.
    :param dtrain: Training data passed to ``xgb.train()``.
    :param evals: List of ``(data, name)`` pairs passed to ``xgb.train()`` as ``evals``.
    :param num_boost_round: Number of boosting rounds passed to ``xgb.train()``.
    :param model_dir: Directory in which the trained model file is written.
    :param is_master: True if current node is master host in distributed training,
                        or is running single node training job.
                        Note that rabit_run will include this argument.
    """
    booster = xgb.train(params=params, dtrain=dtrain, evals=evals, num_boost_round=num_boost_round)

    if is_master:
        # The file name must match the one that model_fn reads back.
        model_location = os.path.join(model_dir, "xgboost-model")
        # Use a context manager so the handle is flushed and closed before the job ends.
        with open(model_location, "wb") as model_file:
            pkl.dump(booster, model_file)
        logging.info("Stored trained model at {}".format(model_location))


if __name__ == "__main__":
    # Training entry point: parse hyperparameters and SageMaker settings, load the
    # train/validation channels, then train either through rabit (multi-host) or
    # directly (single host).
    parser = argparse.ArgumentParser()

    # Hyperparameters are described here.
    parser.add_argument(
        "--max_depth",
        type=int,
    )
    parser.add_argument("--eta", type=float)
    # gamma and min_child_weight are real-valued in XGBoost, so parse them as floats;
    # integer-looking values such as "4" are still accepted.
    parser.add_argument("--gamma", type=float)
    parser.add_argument("--min_child_weight", type=float)
    parser.add_argument("--subsample", type=float)
    parser.add_argument("--verbosity", type=int)
    parser.add_argument("--objective", type=str)
    parser.add_argument("--num_round", type=int)
    parser.add_argument("--tree_method", type=str, default="auto")
    parser.add_argument("--predictor", type=str, default="auto")

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument("--output_data_dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
    parser.add_argument("--sm_hosts", type=str, default=os.environ.get("SM_HOSTS"))
    parser.add_argument("--sm_current_host", type=str, default=os.environ.get("SM_CURRENT_HOST"))

    # parse_known_args: arguments this script does not declare are ignored, not fatal.
    args, _ = parser.parse_known_args()

    # Get SageMaker host information from runtime environment variables
    sm_hosts = json.loads(args.sm_hosts)
    sm_current_host = args.sm_current_host

    dtrain = get_dmatrix(args.train, "libsvm")
    dval = get_dmatrix(args.validation, "libsvm")
    # Only report validation metrics when the validation channel produced data.
    watchlist = (
        [(dtrain, "train"), (dval, "validation")] if dval is not None else [(dtrain, "train")]
    )

    train_hp = {
        "max_depth": args.max_depth,
        "eta": args.eta,
        "gamma": args.gamma,
        "min_child_weight": args.min_child_weight,
        "subsample": args.subsample,
        "verbosity": args.verbosity,
        "objective": args.objective,
        "tree_method": args.tree_method,
        "predictor": args.predictor,
    }
    # Drop hyperparameters that were not supplied so that None is never forwarded
    # to xgb.train(); XGBoost then applies its own default for those settings.
    train_hp = {name: value for name, value in train_hp.items() if value is not None}

    xgb_train_args = dict(
        params=train_hp,
        dtrain=dtrain,
        evals=watchlist,
        num_boost_round=args.num_round,
        model_dir=args.model_dir,
    )

    if len(sm_hosts) > 1:
        # Wait until all hosts are able to find each other
        entry_point._wait_hostname_resolution()

        # Execute training function after initializing rabit.
        distributed.rabit_run(
            exec_fun=_xgb_train,
            args=xgb_train_args,
            include_in_training=(dtrain is not None),
            hosts=sm_hosts,
            current_host=sm_current_host,
            update_rabit_args=True,
        )
    else:
        # If single node training, call training method directly.
        # Test against None explicitly, matching the checks used for dval and rabit above.
        if dtrain is not None:
            xgb_train_args["is_master"] = True
            _xgb_train(**xgb_train_args)
        else:
            raise ValueError("Training channel must have data to train model.")


def model_fn(model_dir):
    """Deserialize and return fitted model.

    Note that this should have the same name as the serialized model in the _xgb_train method

    :param model_dir: Directory containing the pickled ``xgboost-model`` file.
    :return: The object unpickled from that file.
    """
    model_file = "xgboost-model"
    # NOTE: pickle.load can execute arbitrary code; only load trusted model artifacts.
    # The context manager closes the handle even if unpickling raises.
    with open(os.path.join(model_dir, model_file), "rb") as model_handle:
        booster = pkl.load(model_handle)
    return booster

Writing abalone.py


In [33]:
import boto3
import sagemaker
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.xgboost import XGBoost, XGBoostModel
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput

In [21]:
# Hyperparameters for the training script; each key matches an argparse option
# declared in abalone.py, and every value is given as a string.
hyperparameters = dict(
    max_depth="5",
    eta="0.2",
    gamma="4",
    min_child_weight="6",
    subsample="0.7",
    objective="reg:squarederror",
    num_round="50",
    verbosity="2",
)

# S3 location under which the trained model will be saved.
bucket = "yudong-data"
prefix = "DEMO-xgboost-as-a-framework"
output_path = f"s3://{bucket}/{prefix}/abalone-xgb-framework/output"

In [26]:
%%time
# Fetch the abalone libsvm file from the sagemaker-sample-files bucket into the
# working directory, then stage it in our own bucket under "<prefix>/train".
s3 = boto3.client("s3")
FILE_DATA = "abalone"
s3.download_file(
    Bucket="sagemaker-sample-files",
    Key="datasets/tabular/uci_abalone/abalone.libsvm",
    Filename=FILE_DATA,
)
# Last expression of the cell, so the S3 URI returned by upload_data is displayed.
sagemaker.Session().upload_data(path=FILE_DATA, bucket=bucket, key_prefix=f"{prefix}/train")

CPU times: user 203 ms, sys: 15.6 ms, total: 218 ms
Wall time: 1.97 s


's3://yudong-data/DEMO-xgboost-as-a-framework/train/abalone'

In [18]:
# Script-mode XGBoost estimator that runs abalone.py as its entry point on a
# single "local" instance and writes its output under output_path.
estimator = XGBoost(
    entry_point="abalone.py",
    framework_version="1.2-2",
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="local",
    output_path=output_path,
)

In [27]:
# Input channel pointing at the libsvm file uploaded to S3 earlier in the notebook.
train_input = TrainingInput(
    s3_data="s3://yudong-data/DEMO-xgboost-as-a-framework/train/abalone",
    content_type="text/libsvm",
)

In [28]:
# Display the TrainingInput object to confirm it was created.
train_input

<sagemaker.inputs.TrainingInput at 0x7ff41db26320>

In [29]:
# The same input feeds both channels, so the validation metric reported during
# training is computed on the training data itself.
estimator.fit(
    inputs={
        "train": train_input,
        "validation": train_input,
    }
)

Creating cpjv7h6pwe-algo-1-l8fao ... 
Creating cpjv7h6pwe-algo-1-l8fao ... done
Attaching to cpjv7h6pwe-algo-1-l8fao
[36mcpjv7h6pwe-algo-1-l8fao |[0m [2021-07-21 15:29:45.249 48645729713d:1 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None
[36mcpjv7h6pwe-algo-1-l8fao |[0m [2021-07-21:15:29:45:INFO] Imported framework sagemaker_xgboost_container.training
[36mcpjv7h6pwe-algo-1-l8fao |[0m [2021-07-21:15:29:45:INFO] No GPUs detected (normal if no gpus installed)
[36mcpjv7h6pwe-algo-1-l8fao |[0m [2021-07-21:15:29:45:INFO] Invoking user training script.
[36mcpjv7h6pwe-algo-1-l8fao |[0m [2021-07-21:15:29:45:INFO] Module abalone does not provide a setup.py. 
[36mcpjv7h6pwe-algo-1-l8fao |[0m Generating setup.py
[36mcpjv7h6pwe-algo-1-l8fao |[0m [2021-07-21:15:29:45:INFO] Generating setup.cfg
[36mcpjv7h6pwe-algo-1-l8fao |[0m [2021-07-21:15:29:45:INFO] Generating MANIFEST.in
[36mcpjv7h6pwe-algo-1-l8fao |[0m [2021-07-21:15:29:45:INFO] Installing module with the following comma

In [30]:
# Location of the model artifact recorded on the estimator after fit().
model_data = estimator.model_data

In [31]:
# Display the model artifact location.
model_data

's3://yudong-data/DEMO-xgboost-as-a-framework/abalone-xgb-framework/output/sagemaker-xgboost-2021-07-21-15-29-42-361/model.tar.gz'

In [32]:
%%writefile inference-abalone.py
import json
import os
import pickle as pkl

import numpy as np
import sagemaker_xgboost_container.encoder as xgb_encoders


def model_fn(model_dir):
    """
    Deserialize and return fitted model.

    :param model_dir: Directory containing the pickled ``xgboost-model`` file.
    :return: The object unpickled from that file.
    """
    model_file = "xgboost-model"
    # NOTE: pickle.load can execute arbitrary code; only load trusted model artifacts.
    # The context manager closes the handle even if unpickling raises.
    with open(os.path.join(model_dir, model_file), "rb") as model_handle:
        booster = pkl.load(model_handle)
    return booster


def input_fn(request_body, request_content_type):
    """
    Turn the raw request into the object that is handed to `predict_fn`.

    The SageMaker XGBoost model server calls this with the request body and its
    content type. Only "text/libsvm" is accepted; the body is converted with
    `xgb_encoders.libsvm_to_dmatrix`. Any other content type raises ValueError.
    """
    if request_content_type != "text/libsvm":
        raise ValueError("Content type {} is not supported.".format(request_content_type))
    return xgb_encoders.libsvm_to_dmatrix(request_body)


def predict_fn(input_data, model):
    """
    Score `input_data` (the return value of `input_fn`) with `model`.

    Calls `model.predict` twice: once for the predictions and once with
    `pred_contribs=True` for the per-feature contributions. Returns a
    two-dimensional NumPy array whose first column is the prediction and whose
    remaining columns are the contribution values for that row.
    """
    scores = model.predict(input_data)
    contributions = model.predict(input_data, pred_contribs=True, validate_features=False)
    # Lift the 1-D predictions into a column so they can be joined column-wise.
    return np.concatenate([scores[:, np.newaxis], contributions], axis=1)


def output_fn(predictions, content_type):
    """
    After invoking predict_fn, the model server invokes `output_fn`.

    Serializes the prediction rows as CSV text: values within a row are separated
    by commas and rows are separated by newlines, so a single-row result is one
    line with no trailing newline.

    :param predictions: Two-dimensional sequence of rows (the return value of `predict_fn`).
    :param content_type: Requested response content type; only "text/csv" is supported.
    :return: The CSV text.
    :raises ValueError: If `content_type` is not "text/csv".
    """
    if content_type != "text/csv":
        raise ValueError("Content type {} is not supported.".format(content_type))
    # Emit every row so that a multi-row request does not silently lose predictions.
    return "\n".join(",".join(str(value) for value in row) for row in predictions)

Writing inference-abalone.py


In [34]:
# Serve with the same framework version the estimator trained with ("1.2-2"), so
# the pickled booster is loaded by a container matching the one that wrote it.
xgb_inference_model = XGBoostModel(
    model_data=model_data,
    role=sagemaker.get_execution_role(),
    entry_point="inference-abalone.py",
    framework_version="1.2-2",
)

In [35]:
# Deploy the inference model to a single "local" instance and keep the returned predictor.
predictor = xgb_inference_model.deploy(initial_instance_count=1, instance_type="local")

Attaching to 51968qvjvf-algo-1-deh8j
[36m51968qvjvf-algo-1-deh8j |[0m [2021-07-21:15:40:45:INFO] No GPUs detected (normal if no gpus installed)
[36m51968qvjvf-algo-1-deh8j |[0m [2021-07-21:15:40:45:INFO] No GPUs detected (normal if no gpus installed)
[36m51968qvjvf-algo-1-deh8j |[0m [2021-07-21:15:40:45:INFO] nginx config: 
[36m51968qvjvf-algo-1-deh8j |[0m worker_processes auto;
[36m51968qvjvf-algo-1-deh8j |[0m daemon off;
[36m51968qvjvf-algo-1-deh8j |[0m pid /tmp/nginx.pid;
[36m51968qvjvf-algo-1-deh8j |[0m error_log  /dev/stderr;
[36m51968qvjvf-algo-1-deh8j |[0m 
[36m51968qvjvf-algo-1-deh8j |[0m worker_rlimit_nofile 4096;
[36m51968qvjvf-algo-1-deh8j |[0m 
[36m51968qvjvf-algo-1-deh8j |[0m events {
[36m51968qvjvf-algo-1-deh8j |[0m   worker_connections 2048;
[36m51968qvjvf-algo-1-deh8j |[0m }
[36m51968qvjvf-algo-1-deh8j |[0m 
[36m51968qvjvf-algo-1-deh8j |[0m http {
[36m51968qvjvf-algo-1-deh8j |[0m   include /etc/nginx/mime.types;
[36m51968qvjvf-algo-1-deh

In [36]:
def do_inference_on_local_endpoint(predictor, libsvm_str):
    """Send one libsvm-formatted sample to `predictor` and print the response.

    The first whitespace-separated token of `libsvm_str` (the label) is replaced
    by the dummy label -99 before the request is sent; the remaining tokens are
    forwarded unchanged.
    """
    _true_label, *feature_tokens = libsvm_str.split()
    request_line = " ".join(["-99", *feature_tokens])  # use dummy label -99
    response = predictor.predict(request_line)
    print(f"Prediction: {response}")

In [38]:
# Two libsvm-formatted samples; the leading number is the label (6 and 15).
a_young_abalone = "6 1:3 2:0.37 3:0.29 4:0.095 5:0.249 6:0.1045 7:0.058 8:0.067"
an_old_abalone = "15 1:1 2:0.655 3:0.53 4:0.175 5:1.2635 6:0.486 7:0.2635 8:0.415"

# Query the local endpoint with each sample, young first.
do_inference_on_local_endpoint(predictor, a_young_abalone)
do_inference_on_local_endpoint(predictor, an_old_abalone)

Prediction: [['6.8532515', '0.0', '-0.3545924', '-0.12613766', '-0.36350462', '-0.37387854', '-0.83996755', '1.5954899', '0.38270319', '-2.9971762', '9.930313']]
[36m51968qvjvf-algo-1-deh8j |[0m 172.18.0.1 - - [21/Jul/2021:15:41:11 +0000] "POST /invocations HTTP/1.1" 200 113 "-" "python-urllib3/1.26.5"
Prediction: [['14.508704', '0.0', '-0.0045526065', '-0.07738679', '0.023501989', '0.35198748', '0.9640153', '0.92003435', '0.040878277', '2.3599126', '9.930313']]
[36m51968qvjvf-algo-1-deh8j |[0m 172.18.0.1 - - [21/Jul/2021:15:41:11 +0000] "POST /invocations HTTP/1.1" 200 114 "-" "python-urllib3/1.26.5"
