# Train and deploy thousands of SageMaker models on the cheap
A major reason to use the cloud for developing, training, and deploying machine learning models is the ability to scale both quickly and cheaply. Here, we'll demonstrate the ability to train and deploy literally thousands of XGBoost models using Amazon SageMaker. 

We're going to bring our own XGBoost script here, but you should be able to modify this to use either a 1P algorithm or another model of your choice.

Note, we are actually generating arbitrary data in the training cluster itself. The SageMaker API is going to expect a real CSV file in S3, so let's just post a dummy set there.

In [32]:
%%writefile train.csv
286050,1995,2052,3,2.5,1.05,1

Overwriting train.csv


In [33]:
%%writefile validation.csv
286050,1995,2052,3,2.5,1.05,1

Overwriting validation.csv


In [103]:
!pip install -qU awscli boto3 sagemaker

In [104]:
import sagemaker
import numpy as np
import pandas as pd
import time
import boto3
import os


sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = '1K'
full_s3_path = 's3://{}/{}/'.format(bucket, prefix)
print ('Writing all data to {}'.format(full_s3_path))
os.system('''aws s3 cp train.csv {}'''.format(full_s3_path))
os.system('''aws s3 cp validation.csv {}'''.format(full_s3_path))

Writing all data to s3://sagemaker-us-east-1-181880743555/1K/


0

Next, we will loop through each of those cities and put them into batches. Each batch can be some arbitrary number, why not pick 100? That means we'll grab 100 cities at a time, package up the feature generation and model training into a single script, and run that on a dedicated SageMaker cluster.

So, each batch will have 100 cities. For simplicity, let's assume the amount of data we need to train a single model is quite small, less than 1 GB. That means we don't need to distribute the jobs, we can run them on single nodes.

In addition, to make the logging easier, we should actually train these models linearly, that is, one after the next. For 2000 models, that means we'll have 20 training jobs, and each training job will actually produce 100 models. 

## Loop Through Batches and Run SageMaker Jobs on Spot
Now, we're going to step through our 20 batches. For each batch, we'll do the following steps:
1. Package up a script that loops through 100 cities, generates data, and trains models
2. Run that script on a spot training job using the XGBoost estimator
3. Ensure that the XGBoost estimator is called with`wait=False` so that the jobs start roughly at the same time.

The big picture here is that we are running 20 training jobs on SageMaker spot instances, and each job is looping through our 100 models, massaging the training data, training a model, and writing it to S3.

In [1]:
%%writefile xgboost.py

import argparse
import json
import logging
import os
import pandas as pd
import pickle as pkl
import xgboost as xgb
import numpy as np
from sklearn.model_selection import train_test_split

from sagemaker_containers import entry_point
from sagemaker_xgboost_container.data_utils import get_dmatrix

DATA_PREFIX = '1K'
MULTI_MODEL_ARTIFACTS = 'multi_model_artifacts'
MAX_YEAR = 2019

def gen_price(house):
    base_price = int(house['SQUARE_FEET'] * 150)
    price = int(base_price + (10000 * house['NUM_BEDROOMS']) + \
                               (15000 * house['NUM_BATHROOMS']) + \
                               (15000 * house['LOT_ACRES']) + \
                               (15000 * house['GARAGE_SPACES']) - \
                               (5000 * (MAX_YEAR - house['YEAR_BUILT'])))
    return price

def gen_random_house():
    house = {'SQUARE_FEET':   int(np.random.normal(3000, 750)),
              'NUM_BEDROOMS':  np.random.randint(2, 7),
              'NUM_BATHROOMS': np.random.randint(2, 7) / 2,
              'LOT_ACRES':     round(np.random.normal(1.0, 0.25), 2),
              'GARAGE_SPACES': np.random.randint(0, 4),
              'YEAR_BUILT':    min(MAX_YEAR, int(np.random.normal(1995, 10)))}

    price = gen_price(house)
    
    return [price, house['YEAR_BUILT'],   house['SQUARE_FEET'], 
                    house['NUM_BEDROOMS'], house['NUM_BATHROOMS'], 
                    house['LOT_ACRES'],    house['GARAGE_SPACES']]

def gen_houses(num_houses):
    house_list = []
    for i in range(num_houses):
        house_list.append(gen_random_house())
    df = pd.DataFrame(house_list, 
                       columns=['PRICE',        'YEAR_BUILT',    'SQUARE_FEET',  'NUM_BEDROOMS',
                                'NUM_BATHROOMS','LOT_ACRES',     'GARAGE_SPACES'])
    return df

def split_data(df):
    splits = [0.6, 0.3, 0.1]
    
    # split data into train and test sets
    seed      =   7
    val_size  = splits[1]
    test_size = splits[2]
    
    num_samples = df.shape[0]
    X1 = df.values[:num_samples, 1:] # keep only the features, skip the target, all rows
    Y1 = df.values[:num_samples, :1] # keep only the target, all rows

    # Use split ratios to divide up into train/val/test
    X_train, X_val, y_train, y_val = \
        train_test_split(X1, Y1, test_size=(test_size + val_size), random_state=seed)
    
    # Of the remaining non-training samples, give proper ratio to validation and to test
    X_test, X_test, y_test, y_test = \
        train_test_split(X_val, y_val, test_size=(test_size / (test_size + val_size)), 
                         random_state=seed)
    
    # reassemble the datasets with target in first column and features after that
    train = np.concatenate([y_train, X_train], axis=1)
    val   = np.concatenate([y_val,   X_val],   axis=1)
    test  = np.concatenate([y_test,  X_test],  axis=1)

    return train, val, test

def save_data_locally(location, train, val, test):
    os.makedirs(f'data/{location}/train')
    np.savetxt( f'data/{location}/train/{location}_train.csv', train, delimiter=',', fmt='%.2f')
    
    os.makedirs(f'data/{location}/val')
    np.savetxt(f'data/{location}/val/{location}_val.csv', val, delimiter=',', fmt='%.2f')
    
    os.makedirs(f'data/{location}/test')
    np.savetxt(f'data/{location}/test/{location}_test.csv', test, delimiter=',', fmt='%.2f')

def model_fn(model_dir):
    """Deserialize and return fitted model.

    Note that this should have the same name as the serialized model in the _xgb_train method
    """
    model_file = 'xgboost-model'
    booster = pkl.load(open(os.path.join(model_dir, model_file), 'rb'))
    return booster
        
def parse_args():
    
    parser = argparse.ArgumentParser()
    
#       Hyperparameters are described here.
    parser.add_argument('--max_depth', type=int,)
    parser.add_argument('--eta', type=float)
    parser.add_argument('--gamma', type=int)
    parser.add_argument('--min_child_weight', type=int)
    parser.add_argument('--subsample', type=float)
    parser.add_argument('--objective', type=str)
    parser.add_argument('--num_round', type=int)

#     Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output_data_dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR'))
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    parser.add_argument('--sm_hosts', type=str, default=os.environ.get('SM_HOSTS'))
    parser.add_argument('--sm_current_host', type=str, default=os.environ.get('SM_CURRENT_HOST'))

    # pass in an argument for the models we need to generate data for
    parser.add_argument('--n_models_to_train', type=str)
    parser.add_argument('--job_id', type=str)
    parser.add_argument('--n_jobs_to_run', type=str)

    args, _ = parser.parse_known_args()
    
    return args

def get_cities(args):
    '''
    Takes the argparser, computes the set of models to train for this job id. For example
        if job_id is 1, then compute
            CITY_1 ... CITY_99
        if job_id is 2, then compute
            CITY_100 ... CITY_199
    '''
    rt = []
    
    job_id = int(args.job_id)
        
    total_models = int(args.n_models_to_train)
    
    models_per_job = round(total_models / int(args.n_jobs_to_run))

    # compute the upper and lower bounds
    upper_bound = job_id * models_per_job
    lower_bound = upper_bound - models_per_job
        
    for model_i in range(lower_bound, upper_bound):
        model_name = 'CITY_{}'.format(model_i)
        rt.append(model_name)
        
    return rt


def xgb_train(args, train, city):
    """Run xgb train on arguments given with rabit initialized.

    This is our rabit execution function.

    :param args_dict: Argument dictionary used to run xgb.train().
    :param is_master: True if current node is master host in distributed training,
                        or is running single node training job.
                        Note that rabit_run will include this argument.
    """
    train_hp = {
        'max_depth': args.max_depth,
        'eta': args.eta,
        'gamma': args.gamma,
        'min_child_weight': args.min_child_weight,
        'subsample': args.subsample,
        'objective': args.objective
        }
    
    dtrain = get_dmatrix('data/{}/train/train.csv'.format(city), 'text/csv')
    dval = get_dmatrix(args.validation, 'libsvm')
    watchlist = [(dtrain, 'train'), (dval, 'validation')] if dval is not None else [(dtrain, 'train')]
    
    # why it doesn't see xgb.train here ... :( 
    booster = xgb.train(params=train_hp,
                        dtrain=dtrain,
                        evals=watchlist)

    model_dir = args.model_dir
    
    model_location = model_dir + '/{}-model'.format(city)
    pkl.dump(booster, open(model_location, 'wb'))
    logging.info("Stored trained model at {}".format(model_location))

if __name__ == '__main__':

    args = parse_args()
    
    os.mkdir('data')
    
    # loop through all of the models we need to train on this job
    for city in get_cities(args):
        
        print ('about to generate data and train models for {}'.format(city))
        
        # generate the data we need
        houses = gen_houses(num_houses = 1000)
        train, val, test = split_data(houses)
        
        save_data_locally(city, train, val, test)
        
        print ('made it to saving data locally!!!')
        
        # train the model and store in model directory
        xgb_train(args, train, city)

UsageError: Line magic function `%writefile` not found (But cell magic `%%writefile` exists, did you mean that instead?).


In [105]:
from sagemaker.session import s3_input, Session
from sagemaker.xgboost.estimator import XGBoost

def get_estimator(full_s3_path, job_id, n_jobs_to_run, n_models_to_train):

    hyperparams = {
        "max_depth":"5",
        "eta":"0.2",
        "gamma":"4",
        "min_child_weight":"6",
        "subsample":"0.7",
        "verbose":"1",
        "objective":"reg:linear",
        "num_round":"50",
        "job_id": "{}".format(job_id),
        "n_models_to_train": "{}".format(n_models_to_train),
        "n_jobs_to_run": "{}".format(n_jobs_to_run)}

    # need to add spot instances here
    est = XGBoost(entry_point='xgboost.py',
                    framework_version='1.0-1', # Note: framework_version is mandatory
                    hyperparameters=hyperparams,
                    role=sagemaker.get_execution_role(),
                    train_instance_count=1, 
                    train_instance_type='ml.c5.xlarge',
                    output_path=full_s3_path,
                    train_use_spot_instances=1,
                    train_max_run=(60 * 60 * 6),
                    train_max_wait= (60 * 60 * 12))
    
    return est

def run_job(est, full_s3_path, wait):
    train_input = s3_input(full_s3_path, content_type='text/csv')
    validation_input = s3_input(full_s3_path.format(bucket, prefix), content_type='text/csv')

    est.fit({'train': train_input, 'validation': validation_input}, wait=wait)
    

In [109]:
n_jobs_to_run = 20
n_models_to_train = 2000

for job_id in range(1, (n_jobs_to_run+1)):
        
    est = get_estimator(full_s3_path, job_id, n_jobs_to_run, n_models_to_train)
    
    run_job(est, full_s3_path, wait=False)
    
    break

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


2020-07-12 02:01:37 Starting - Starting the training job...
2020-07-12 02:01:39 Starting - Launching requested ML instances......
2020-07-12 02:02:56 Starting - Preparing the instances for training......
2020-07-12 02:03:58 Downloading - Downloading input data
2020-07-12 02:03:58 Training - Downloading the training image..[34mINFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training[0m
[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mINFO:sagemaker_xgboost_container.training:Invoking user training script.[0m
[34mINFO:sagemaker-containers:Module xgboost does not provide a setup.py. [0m
[34mGenerating setup.py[0m
[34mINFO:sagemaker-containers:Generating setup.cfg[0m
[34mINFO:sagemaker-containers:Generating MANIFEST.in[0m
[34mINFO:sagemaker-containers:Installing module with the following command:[0m
[34m/miniconda3/bin/python -m pip install . [0m
[34mProcessing /opt/ml/code[0m
[34mBuilding wheels for collected

UnexpectedStatusException: Error for Training job sagemaker-xgboost-2020-07-12-02-01-36-910: Failed. Reason: AlgorithmError: ExecuteUserScriptError:
Command "/miniconda3/bin/python -m xgboost --eta 0.2 --gamma 4 --job_id 1 --max_depth 5 --min_child_weight 6 --n_jobs_to_run 20 --n_models_to_train 2000 --num_round 50 --objective reg:linear --subsample 0.7 --verbose 1"

In [None]:
def launch_training_job(location, bucket):
    # clear out old versions of the data
    
#     s3 = boto3.resource('s3')

#     s3_bucket = s3.Bucket(bucket)
    
    full_input_prefix = f'{DATA_PREFIX}/model_prep/{location}'
    
    s3_bucket.objects.filter(Prefix=full_input_prefix + '/').delete()

    # upload the entire set of data for all three channels
#     local_folder = f'data/{location}'
#     inputs = sagemaker_session.upload_data(path=local_folder, key_prefix=full_input_prefix)
#     print(f'Training data uploaded: {inputs}')
    
#     _job = 'xgb-{}'.format(location.replace('_', '-'))
    full_output_prefix = f'{DATA_PREFIX}/model_artifacts/{location}'
    s3_output_path = f's3://{BUCKET}/{full_output_prefix}'

    
#     xgb = sagemaker.estimator.Estimator(XGBOOST_IMAGE, role, 
#                                         train_instance_count=1, train_instance_type=TRAIN_INSTANCE_TYPE,
#                                         output_path=s3_output_path, base_job_name=_job,
#                                         sagemaker_session=sagemaker_session)
    
#     xgb.set_hyperparameters(max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.8, silent=0, 
#                             early_stopping_rounds=5, objective='reg:linear', num_round=25) 
    
#     DISTRIBUTION_MODE = 'FullyReplicated'
    
#     train_input = sagemaker.s3_input(s3_data=inputs+'/train', 
#                                      distribution=DISTRIBUTION_MODE, content_type='csv')
    
#     val_input   = sagemaker.s3_input(s3_data=inputs+'/val', 
#                                      distribution=DISTRIBUTION_MODE, content_type='csv')
    
#     remote_inputs = {'train': train_input, 'validation': val_input}

#     xgb.fit(remote_inputs, wait=False)
    
    # Return the estimator object
    return xgb

In [98]:
!aws s3 cp "thousands_of_models.ipynb" s3://my-bucket-for-fridays/notebooks/xgboost-by-the-thousands/
!aws s3 cp "xgboost.py" s3://my-bucket-for-fridays/notebooks/xgboost-by-the-thousands/

upload: ./thousands_of_models.ipynb to s3://my-bucket-for-fridays/notebooks/xgboost-by-the-thousands/thousands_of_models.ipynb
upload: ./xgboost.py to s3://my-bucket-for-fridays/notebooks/xgboost-by-the-thousands/xgboost.py
