# Queue Prediction

Parsl (Parallel Scripting in Python) lets Python functions or external applications run in parallel, but it does not really have an intelligent way to manage resources: provisioning new resources, deciding when to scale up or down, and releasing resources that are no longer needed. Acquiring new resources from an HPC platform is expensive because these systems are heavily used, and a request can sit in the queue for hours, which is why we want to add intelligence to Parsl's resource management strategies. This notebook aims to predict the queue time of a request sent to the supercomputer using two different methods, Amazon SageMaker and KFold, both tested on 2019 time series data. We hope the results will be comparable to the QBETS model by Rich Wolski, which is already in use and gives you a confidence interval for getting a certain number of nodes within a certain number of minutes.

### Amazon Sagemaker

##### Importing Libraries and Creating IAM Role

In [1]:
# import libraries
import boto3, re, sys, math, json, os, sagemaker, urllib.request
from sagemaker import get_execution_role
import numpy as np                                
import pandas as pd                               
import matplotlib.pyplot as plt                   
from IPython.display import Image                 
from IPython.display import display               
from time import gmtime, strftime                 
from sagemaker.predictor import csv_serializer   
from sklearn.model_selection import KFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import SVR
from sklearn.model_selection import cross_val_score, cross_val_predict

role = get_execution_role()
prefix = 'sagemaker/DEMO-xgboost-dm'
containers = {'us-west-2': '433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest',
              'us-east-1': '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest',
              'us-east-2': '825641698319.dkr.ecr.us-east-2.amazonaws.com/xgboost:latest',
              'eu-west-1': '685385470294.dkr.ecr.eu-west-1.amazonaws.com/xgboost:latest'}
my_region = boto3.session.Session().region_name
print("Success - the MySageMakerInstance is in the " + my_region + " region. You will use the " + containers[my_region] + " container for your SageMaker endpoint.")

Success - the MySageMakerInstance is in the us-east-1 region. You will use the 811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest container for your SageMaker endpoint.


##### Creating a S3 Bucket to Store Data

In [2]:
bucket_name = 'alcfsagemakerbucket'
s3 = boto3.resource('s3')
try:
    if my_region == 'us-east-1':
        # us-east-1 is the default region and must not be passed as a LocationConstraint
        s3.create_bucket(Bucket=bucket_name)
    else:
        s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': my_region})
    print('S3 bucket created successfully')
except Exception as e:
    print('S3 error: ', e)

S3 bucket created successfully


##### Downloading Data to Sagemaker Instance

In [3]:
try:
    model_data = pd.read_csv('./ANL-ALCF-DJC-THETA_20190101_20190630.csv', header=0)
    print('Success: Data loaded into dataframe.')
except Exception as e:
    print('Data load error: ', e)

Success: Data loaded into dataframe.


##### Pre-Splitting the Data

To run this model, the dataset must be split into training and test data. For this example, I used a 70-30 split. Instead of using pandas to split the CSV file randomly, I pre-split it, because time series data should not be randomized. The first 70% of the data is the training set and the last 30% is the test set.

In [4]:
train_data = pd.read_csv('70_ANL-ALCF-DJC-THETA_20190101_20190630-2-2-2-2.csv', header=0)
test_data = pd.read_csv('30_ANL-ALCF-DJC-THETA_20190101_20190630.csv', header=0)
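
The pre-split files above were prepared outside the notebook. As a minimal sketch of the same idea, a chronological 70-30 split can also be done in pandas by slicing rows in order rather than sampling them. The helper name `chronological_split` and the tiny synthetic frame are illustrative assumptions, not part of the original workflow, and the sketch assumes the rows are already sorted by submission time.

```python
import pandas as pd


def chronological_split(df, train_fraction=0.7):
    """Split a time-ordered DataFrame into leading and trailing parts without shuffling."""
    cutoff = int(len(df) * train_fraction)
    # Rows before the cutoff are the training set; the remainder is the test set.
    return df.iloc[:cutoff], df.iloc[cutoff:]


# Synthetic stand-in for the job log (values are made up for illustration).
jobs = pd.DataFrame({
    "QUEUED_WAIT_SECONDS": [30, 1200, 45, 5000, 10, 900, 2500, 60, 15, 7000],
    "NODES_REQUESTED": [8, 128, 8, 512, 1, 64, 256, 8, 2, 1024],
})

train_part, test_part = chronological_split(jobs)
print(len(train_part), "training rows,", len(test_part), "test rows")
```

Because the split is a plain slice, every training row precedes every test row, so the model never sees jobs submitted after the ones it is evaluated on.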

##### Reformatting Data to use XGBoost model

Using the SageMaker XGBoost container has some restrictions on CSV input: the dependent variable must be the first column, and the file must have no header row. In addition, every column must be an integer or a float. All string-typed columns were therefore removed; the data remains useful because the important features (run time, nodes requested, nodes used, etc.) are all floats. Finally, with the `binary:logistic` objective used here, the dependent variable (queue time) must be in [0, 1]; otherwise XGBoost does not run. Queue times below 1000 seconds were therefore assigned 0 and times above were assigned 1.

In [5]:
pd.concat([train_data['QUEUED_WAIT_SECONDS'], train_data.drop(['QUEUED_WAIT_SECONDS'], axis=1)], axis=1).to_csv('train.csv', index=False, header=False)
boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(bucket_name, prefix), content_type='csv')
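
The reformatting described above (dropping string columns, converting the queue time to a 0/1 label, and moving the label to the first column) happened before the pre-split CSV files were loaded, so it does not appear in the cells. A minimal sketch of those steps might look like the following. The helper `to_xgboost_frame`, the constant `WAIT_THRESHOLD_SECONDS`, and the `JOB_NAME` column are hypothetical names used only for illustration, and treating a wait of exactly 1000 as class 0 is an assumption, since the text only covers values below and above the threshold.

```python
import pandas as pd

WAIT_THRESHOLD_SECONDS = 1000  # threshold taken from the text; the name is an assumption


def to_xgboost_frame(df, target="QUEUED_WAIT_SECONDS"):
    """Keep numeric columns, make the target a 0/1 label, and place it first."""
    numeric = df.select_dtypes(include="number").copy()
    # 1 for waits above the threshold, 0 otherwise (assumption for the boundary value).
    label = (numeric[target] > WAIT_THRESHOLD_SECONDS).astype(int)
    features = numeric.drop(columns=[target])
    return pd.concat([label, features], axis=1)


# Synthetic rows standing in for the job log.
frame = pd.DataFrame({
    "JOB_NAME": ["a", "b", "c"],
    "QUEUED_WAIT_SECONDS": [10, 1000, 4000],
    "NODES_USED": [8.0, 128.0, 512.0],
})
prepared = to_xgboost_frame(frame)
print(prepared)
```

Writing `prepared` with `to_csv(..., index=False, header=False)` would then produce the header-free, label-first layout that the training cell uploads.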

In [6]:
sess = sagemaker.Session()
xgb = sagemaker.estimator.Estimator(containers[my_region],role, train_instance_count=1, train_instance_type='ml.m4.xlarge',output_path='s3://{}/{}/output'.format(bucket_name, prefix),sagemaker_session=sess)
xgb.set_hyperparameters(max_depth=5,eta=0.2,gamma=4,min_child_weight=6,subsample=0.8,silent=0,objective='binary:logistic',num_round=100)

##### Train the Model using Gradient Optimization

In [7]:
xgb.fit({'train': s3_input_train})

2019-08-08 20:48:27 Starting - Starting the training job...
2019-08-08 20:48:29 Starting - Launching requested ML instances......
2019-08-08 20:49:33 Starting - Preparing the instances for training......
2019-08-08 20:50:55 Downloading - Downloading input data
2019-08-08 20:50:55 Training - Downloading the training image...
2019-08-08 20:51:25 Uploading - Uploading generated training model
2019-08-08 20:51:25 Completed - Training job completed

Arguments: train
[2019-08-08:20:51:13:INFO] Running standalone xgboost training.
[2019-08-08:20:51:13:INFO] Path /opt/ml/input/data/validation does not exist!
[2019-08-08:20:51:13:INFO] File size need to be processed in the node: 6.88mb. Available memory size in the node: 8587.93mb
[2019-08-08:20:51:13:INFO] Determined delimiter of CSV input is ','
[20:51:13] S3DistributionType set as FullyReplicated
[20:51:13] 34049x43 matrix with 1464107 entries loaded from /opt/ml/input/data/train?for

Billable seconds: 51


##### Deploy the Model

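Deploying the model requires an endpoint instance, and the account-level limit for the ml.m4.xlarge instance type is currently 0 instances, so the request exceeds it. After I contacted AWS Support, they began processing my limit increase request and are working with the service team to get it approved. Until then, the deployment cell fails with the error shown, and the prediction, evaluation, and cleanup cells that follow have not been run.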

In [8]:
xgb_predictor = xgb.deploy(initial_instance_count=1,instance_type='ml.m4.xlarge')

ResourceLimitExceeded: An error occurred (ResourceLimitExceeded) when calling the CreateEndpoint operation: The account-level service limit 'ml.m4.xlarge for endpoint usage' is 0 Instances, with current utilization of 0 Instances and a request delta of 1 Instances. Please contact AWS support to request an increase for this limit.

##### Predicting the Queue Time

In [None]:
# .values replaces DataFrame.as_matrix(), which was removed in pandas 1.0
test_data_array = test_data.drop(['QUEUED_WAIT_SECONDS'], axis=1).values
xgb_predictor.content_type = 'text/csv'
xgb_predictor.serializer = csv_serializer
predictions = xgb_predictor.predict(test_data_array).decode('utf-8')
predictions_array = np.fromstring(predictions[1:], sep=',')
print(predictions_array.shape)

##### Evaluate Model Performance

In [None]:
cm = pd.crosstab(index=test_data['QUEUED_WAIT_SECONDS'], columns=np.round(predictions_array), rownames=['Observed'], colnames=['Predicted'])
tn = cm.iloc[0,0]; fn = cm.iloc[1,0]; tp = cm.iloc[1,1]; fp = cm.iloc[0,1]; p = (tp+tn)/(tp+tn+fp+fn)*100
print("\n{0:<20}{1:<4.1f}%\n".format("Overall Classification Rate: ", p))
# Class 0 is a short wait (queue time below 1000) and class 1 is a long wait
print("{0:<15}{1:<15}{2:>9}".format("Predicted", "Short Wait", "Long Wait"))
print("Observed")
print("{0:<15}{1:<2.0f}% ({2:<}){3:>6.0f}% ({4:<})".format("Short Wait", tn/(tn+fn)*100,tn, fp/(tp+fp)*100, fp))
print("{0:<16}{1:<1.0f}% ({2:<}){3:>7.0f}% ({4:<}) \n".format("Long Wait", fn/(tn+fn)*100,fn, tp/(tp+fp)*100, tp))
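
Since the evaluation cell has not been run yet, the following is a minimal, self-contained sketch of the same confusion-matrix arithmetic on made-up labels and probabilities. The function name `confusion_counts` and the sample values are assumptions for illustration. The sketch thresholds probabilities at 0.5 with `>=`, which differs slightly from `np.round` for a value of exactly 0.5.

```python
import numpy as np


def confusion_counts(observed, predicted_prob, cutoff=0.5):
    """Count true/false positives and negatives for a binary classifier."""
    observed = np.asarray(observed).astype(int)
    predicted = (np.asarray(predicted_prob) >= cutoff).astype(int)
    tn = int(np.sum((observed == 0) & (predicted == 0)))
    fp = int(np.sum((observed == 0) & (predicted == 1)))
    fn = int(np.sum((observed == 1) & (predicted == 0)))
    tp = int(np.sum((observed == 1) & (predicted == 1)))
    accuracy = (tp + tn) / len(observed)
    return {"tn": tn, "fp": fp, "fn": fn, "tp": tp, "accuracy": accuracy}


# Made-up observations (0 = short wait, 1 = long wait) and predicted probabilities.
observed = [0, 0, 1, 1, 0, 1]
predicted_prob = [0.1, 0.7, 0.8, 0.4, 0.2, 0.9]

counts = confusion_counts(observed, predicted_prob)
print(counts)
```

For these six made-up jobs, four of the six predictions match the observed class, which is the overall classification rate that the cell above reports as a percentage.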

##### Terminate Resources

Terminating resources that are not actively being used reduces costs and is good practice. The cell below deletes the SageMaker endpoint and empties the S3 bucket.

In [None]:
sagemaker.Session().delete_endpoint(xgb_predictor.endpoint)
bucket_to_delete = boto3.resource('s3').Bucket(bucket_name)
bucket_to_delete.objects.all().delete()

### KFold Prediction

Cross-validation is a resampling procedure used to evaluate machine learning models on a data sample. It is simple to understand and generally gives a less biased estimate of model skill than a single train/test split. The procedure has a single parameter, k, the number of groups that the data sample is split into. After the dataset is split into k folds, the first fold is used to test the model and the remaining folds are used to train it. In the second iteration, the second fold is the test set and the rest serve as the training set. This process repeats until each of the k folds has been used as the test set. You must also decide which numeric (float or int) features heavily affect the dependent variable (queue time). I chose run time, nodes requested, and nodes used.
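
The fold rotation described above can be seen on a toy example. This is a small sketch using six placeholder samples and three folds (both numbers are arbitrary choices for illustration), with scikit-learn's `KFold` and no shuffling.

```python
import numpy as np
from sklearn.model_selection import KFold

samples = np.arange(6)  # six placeholder samples, indexed 0..5
cv = KFold(n_splits=3, shuffle=False)

# Collect (train indices, test indices) for each of the three iterations.
folds = [(train.tolist(), test.tolist()) for train, test in cv.split(samples)]
for train, test in folds:
    print("train:", train, "test:", test)
# train: [2, 3, 4, 5] test: [0, 1]
# train: [0, 1, 4, 5] test: [2, 3]
# train: [0, 1, 2, 3] test: [4, 5]
```

Each sample appears in exactly one test fold, and without shuffling the test folds are contiguous blocks in the original row order.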

##### KFold (First 100 lines)

To check that the KFold procedure was splitting the data into training and test sets correctly, I tested the method on a small section of the data: the first 100 lines only. This confirmed that the data was being split correctly and showed how the number of splits affected the model's score, measured as R squared. R squared appeared to be highest at 25 splits, with a mean value of -0.63.

In [28]:
model_data = pd.read_csv('./SECTION_ANL-ALCF-DJC-THETA_20190101_20190630.csv', header=0)

x = model_data.iloc[:,[13,14,15]]
y = model_data.iloc[:,12]
scaler = MinMaxScaler(feature_range=(0, 1))
x = scaler.fit_transform(x)

scores = []
best_svr = SVR(kernel='rbf', gamma = 'scale')
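# random_state only applies when shuffle=True, so it is omitted here
cv = KFold(n_splits=25, shuffle=False)
for train_index, test_index in cv.split(x):
    print("Train Index: ", train_index, "\n")
    print("Test Index: ", test_index)
    # x is a NumPy array after scaling; y is still a pandas Series, so index it by position
    x_train, x_test = x[train_index], x[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]
    best_svr.fit(x_train, y_train)
    scores.append(best_svr.score(x_test, y_test))  # R^2 on the held-out fold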

Train Index:  [ 4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98] 

Test Index:  [0 1 2 3]
Train Index:  [ 0  1  2  3  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98] 

Test Index:  [4 5 6 7]
Train Index:  [ 0  1  2  3  4  5  6  7 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98] 

Test Index:  [ 8  9 10 11]
Train Ind

In [29]:
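# Note: this repeats the fit and score for the last fold, so that fold is counted twice in the mean
best_svr.fit(x_train, y_train)
scores.append(best_svr.score(x_test, y_test))
print(np.mean(scores))  # mean R^2 across folds (not a classification accuracy)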

-0.6275891208497313
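
The value printed here is a mean of R squared scores, because `SVR.score` returns the coefficient of determination, $R^2 = 1 - SS_{res}/SS_{tot}$. A negative value means the predictions fit the held-out data worse than a constant prediction of the mean. The following worked sketch illustrates this on made-up numbers; the helper `r_squared` and the sample values are assumptions for illustration only.

```python
import numpy as np


def r_squared(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    ss_res = np.sum((y_true - y_pred) ** 2)          # squared error of the predictions
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)   # squared error of predicting the mean
    return 1.0 - ss_res / ss_tot


waits = [10, 20, 30, 40]             # made-up queue times
mean_baseline = [25, 25, 25, 25]     # always predict the mean
reversed_guess = [40, 30, 20, 10]    # predictions that get the ordering backwards

print(r_squared(waits, mean_baseline))   # 0.0
print(r_squared(waits, reversed_guess))  # -3.0
```

A score of 0 corresponds to the mean baseline, so a mean of about -0.63 indicates that the chosen features and model do not yet explain the queue time on held-out folds.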


##### KFold Prediction (All Data)

Although the method works on the small sample, it is much less accurate when run on all of the data. This may be because the number of splits chosen is not optimal, and also because queue time is difficult to predict from the given data.

In [None]:
try:
    model_data = pd.read_csv('./ANL-ALCF-DJC-THETA_20190101_20190630.csv', header=0)
    print('Success: Data loaded into dataframe.')
except Exception as e:
    print('Data load error: ', e)
x = model_data.iloc[:,[13,14,15]]
y = model_data.iloc[:,12]
scaler = MinMaxScaler(feature_range=(0, 1))
x = scaler.fit_transform(x)

scores = []
best_svr = SVR(kernel='rbf', gamma='scale')
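# random_state only applies when shuffle=True, so it is omitted here
cv = KFold(n_splits=50, shuffle=False)
for train_index, test_index in cv.split(x):
    print("Train Index: ", train_index, "\n")
    print("Test Index: ", test_index)
    # x is a NumPy array after scaling; y is still a pandas Series, so index it by position
    x_train, x_test = x[train_index], x[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]
    best_svr.fit(x_train, y_train)
    scores.append(best_svr.score(x_test, y_test))  # R^2 on the held-out fold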

In [None]:
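# Note: this repeats the fit and score for the last fold, so that fold is counted twice in the mean
best_svr.fit(x_train, y_train)
scores.append(best_svr.score(x_test, y_test))
print(np.mean(scores))  # mean R^2 across folds (not a classification accuracy)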

In [None]:
cross_val_score(best_svr, x, y, cv=50) #list of r^2 scores

In [None]:
cross_val_predict(best_svr, x, y, cv=50) #list of predictions
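
One way to examine whether the number of splits matters is to compare the mean R squared for several values of k. The sketch below does this on synthetic data, not on the queue log; the arrays `x_demo` and `y_demo`, the candidate values of k, and the linear relationship used to generate the target are all assumptions chosen only to keep the example self-contained.

```python
import numpy as np
from sklearn.model_selection import KFold, cross_val_score
from sklearn.svm import SVR

# Synthetic features in [0, 1] and a noisy target (made up for illustration).
rng = np.random.default_rng(0)
x_demo = rng.uniform(0, 1, size=(60, 3))
y_demo = 100 * x_demo[:, 0] + rng.normal(0, 5, size=60)

mean_scores = {}
for k in (3, 5, 10):
    cv = KFold(n_splits=k, shuffle=False)
    # cross_val_score returns one R^2 value per fold for a regressor.
    fold_scores = cross_val_score(SVR(kernel="rbf", gamma="scale"), x_demo, y_demo, cv=cv)
    mean_scores[k] = float(np.mean(fold_scores))

for k, score in mean_scores.items():
    print("k =", k, "mean R^2 =", round(score, 3))
```

Running the same loop over the scaled queue features and `QUEUED_WAIT_SECONDS` would show whether the score changes much with k or stays poor regardless, which would point to the features rather than the split count.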