In [None]:
!rm -rfv *.csv *.log

In [None]:
!pip install -qU awscli boto3 sagemaker torch torchvision locust

In [None]:
import numpy as np 
import datetime
import math
import time
import boto3   
import matplotlib.pyplot as plt
import os
import pandas as pd

### Set environment variables for Locust file to pick up

In [None]:
# UTC timestamp that makes the endpoint name and the result-file prefix unique per run.
time_stamp_suffix = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
endpoint_name = 'triton-tf-' + time_stamp_suffix
print(endpoint_name)

# Description of the endpoint under test; 'endpoint_cores' is used later to
# normalise the CPUUtilization metric.
endpoint_details = {
    'endpoint_name': endpoint_name,
    'endpoint_type': 'ml.g4dn.4xlarge',
    'endpoint_cores': 16,
    'endpoint_notes': 'SageMaker-Container'
}

# Prefix shared by the Locust CSV output and log file.
file_name_string = endpoint_details['endpoint_name'] + '-' + endpoint_details['endpoint_type'] + '-' + endpoint_details['endpoint_notes'] + '-D-' + time_stamp_suffix

# Runtime endpoints per region: https://docs.aws.amazon.com/general/latest/gr/sagemaker.html
# Take the region from the default boto3 session so the host Locust calls is in the
# same region the endpoint is created in; fall back to us-east-2 when none is configured.
endpoint_region = boto3.Session().region_name or 'us-east-2'
host_string = 'https://runtime.sagemaker.' + endpoint_region + '.amazonaws.com/endpoints/' + endpoint_details['endpoint_name'] + '/invocations'

test_details  = {
    'users': 10, # -u NUM_USERS, --users NUM_USERS Number of concurrent Locust users. Primarily used together with --headless
    'spawn_rate': 2, # -r SPAWN_RATE, --spawn-rate SPAWN_RATE The rate per second in which users are spawned. Primarily used together with --headless
    'log_level' : 'WARNING', # -L LOGLEVEL Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL. Default is INFO.
    'run_time': '10m' # -t RUN_TIME, --run-time RUN_TIME Stop after the specified amount of time, e.g. (300s, 20m, 3h, 1h30m, etc.). Only used together with --headless
}
print(host_string)

In [None]:
import boto3
import time
import json
from sagemaker import get_execution_role

# A single boto3 session provides the SageMaker client and the region name.
sess = boto3.Session()
sm = sess.client('sagemaker')
region = sess.region_name

# Account id of the calling identity, looked up through STS.
caller_identity = boto3.client('sts').get_caller_identity()
account = caller_identity.get('Account')
print(account)

In [None]:
## Variables

# TAR file containing saved_model.pb file in Triton directory format
model_data = 's3://lniniga-mars/ensemble-bert-large.tar.gz'
sm_model_name = 'ensemble-bert-large'

# Execution role attached to this notebook. To pin an explicit role instead:
#role_name = 'arn:aws:iam::{}:role/service-role/{}'.format(account, 'AmazonSageMaker-ExecutionRole-20210201T205137')
role_name = get_execution_role()

# ECR image URI, built from the account id, the session region and the repository name.
ecr_repository = 'triton-with-tf'
image = '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account, region, ecr_repository)

print(role_name)
print(image)

In [None]:
# Render the nginx configuration from its template by substituting the model
# name into the template's positional placeholder.
with open('container/triton/ngnix.conf.template', 'r') as template:
    with open('container/triton/ngnix.conf', 'w') as conf:
        conf_str = template.read()
        rendered_conf = conf_str.format(sm_model_name)
        conf.write(rendered_conf)

In [None]:
%%sh
# Build the Triton serving image from container/triton and push it to this account's ECR registry.

# Stop at the first failing command so a failed build is never tagged or pushed.
set -e

# The name of our algorithm (also used as the ECR repository name)
algorithm_name=triton-with-tf

cd container/triton

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to us-east-2 if none defined).
# `aws configure get` exits non-zero when the value is unset, hence the `|| true` under `set -e`.
region=$(aws configure get region || true)
region=${region:-us-east-2}

registry="${account}.dkr.ecr.${region}.amazonaws.com"
fullname="${registry}/${algorithm_name}:latest"

# If the repository doesn't exist in ECR, create it.
if ! aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1
then
    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi

# Log Docker in to ECR. `aws ecr get-login` is deprecated and absent from AWS CLI v2;
# piping `get-login-password` into `docker login` is the supported replacement.
aws ecr get-login-password --region "${region}" | docker login --username AWS --password-stdin "${registry}"

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build -q -t "${algorithm_name}" .
docker tag "${algorithm_name}" "${fullname}"

docker push "${fullname}"

In [None]:
import sagemaker
# Execution role resolved through the SageMaker SDK (the same lookup that produced role_name).
role = sagemaker.get_execution_role()
# SageMaker SDK session wrapping the boto3 session created earlier (sess).
sagemaker_session = sagemaker.Session(boto_session=sess)

In [None]:
# Primary container definition: the Triton image in ECR plus the model archive on S3.
container = dict(Image=image, ModelDataUrl=model_data)

# Register the SageMaker model and show its ARN.
create_model_response = sm.create_model(
    ModelName=sm_model_name,
    ExecutionRoleArn=role_name,
    PrimaryContainer=container,
)
model_arn = create_model_response['ModelArn']
print(model_arn)

In [None]:
import time

# Timestamped name so every run creates a fresh endpoint configuration.
endpoint_config_name = 'triton-tf-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
print(endpoint_config_name)

# Single production variant: one instance of the configured type receiving all traffic.
all_traffic_variant = {
    'VariantName': 'AllTraffic',
    'ModelName': sm_model_name,
    'InstanceType': endpoint_details['endpoint_type'],
    'InitialInstanceCount': 1,
    'InitialVariantWeight': 1,
}

create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[all_traffic_variant],
)

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])

In [None]:
# Start creating the endpoint from the configuration above and show its ARN.
create_endpoint_response = sm.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)
endpoint_arn = create_endpoint_response['EndpointArn']
print(endpoint_arn)

In [None]:
# Poll the endpoint once a minute until it leaves the 'Creating' state.
resp = sm.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

while status == 'Creating':
    time.sleep(60)
    resp = sm.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Status: " + status)

print("Arn: " + resp['EndpointArn'])
print("Status: " + status)

# Anything other than 'InService' (e.g. 'Failed') means there is nothing to load test;
# stop here instead of letting the following cells run against a broken endpoint.
if status != 'InService':
    raise RuntimeError('Endpoint {} ended in status {}: {}'.format(
        endpoint_name, status, resp.get('FailureReason', 'no failure reason reported')))

In [None]:
# Show the invocation URL that is passed to Locust as --host.
!echo {host_string}

### Run locust. See this link for more options - https://docs.locust.io/en/stable/running-locust-without-web-ui.html. Also, see this blog for more info - https://medium.com/@linh22jan/load-test-with-locust-37c4f85ee2fb

In [None]:
# cw_start / cw_end bracket the load test (UTC); they are used afterwards as the
# CloudWatch query window.
cw_start = datetime.datetime.utcnow()
# Persist cw_start with IPython's %store magic.
%store cw_start
# Run Locust without the web UI, using the settings in test_details; the CSV and
# log outputs are named with the file_name_string prefix.
!locust -f loadtest/stress-optimized-protobuf.py -u {test_details['users']} -r {test_details['spawn_rate']} -L {test_details['log_level']} -t{test_details['run_time']} --csv={file_name_string} --logfile {file_name_string}.log --headless --host={host_string}
cw_end = datetime.datetime.utcnow()

### Load testing results from Locust

In [None]:
# List the CSV files present in the working directory.
!ls *.csv

In [None]:
# Load the per-request statistics CSV produced by the Locust run.
stats_csv_path = file_name_string + '_stats.csv'
data = pd.read_csv(stats_csv_path)

# Print the first two rows, one (index, row) pair at a time.
first_rows = data.head(2)
for index, row in first_rows.iterrows():
    print(index, row)

### Get model latency from CloudWatch

In [None]:
# Report endpoint-side CloudWatch metrics (latency, invocations, CPU/GPU utilisation)
# for the window [cw_start, cw_end] covered by the load test.
time_stamp_suffix = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
#total_runs = data['Request Count'][0]
# Divisor applied to the latency metrics, which are queried in their default unit of
# microseconds: dividing by 1000 yields milliseconds. (Name kept for compatibility.)
seconds_conversion = 1000
print('Total runs - {}\n'.format(data['Request Count'][0]))
print('Timestamp - {}\n'.format(time_stamp_suffix))
print('Getting Cloudwatch:\n')
cloudwatch = boto3.client('cloudwatch')
# NOTE(review): the GetMetricStatistics reference says to pass either Statistics or
# ExtendedStatistics, not both - confirm the service keeps accepting the combination.
statistics = ['SampleCount', 'Average', 'Minimum', 'Maximum', 'Sum']
extended = ['p10', 'p50', 'p90', 'p95', 'p100']

#ml.c5.9xlarge	36
#ml.c5.4xlarge	16

# Give 5 minute buffer to end
#cw_end += datetime.timedelta(minutes=5)

# Period must be 1, 5, 10, 30, or multiple of 60
# Use the smallest multiple of 60 seconds covering the elapsed time, and never less
# than 60 so that a zero-length window cannot produce an invalid period of 0.
elapsed_seconds = (cw_end - cw_start).total_seconds()
factor = max(1, math.ceil(elapsed_seconds / 60))
period = int(factor * 60)
#period = 1

print('Time elapsed: {} seconds\n'.format(elapsed_seconds))
print('Using period of {} seconds\n'.format(period))

# Every query targets the single 'AllTraffic' variant of the endpoint under test.
endpoint_dimensions = [{'Name': 'EndpointName',
                        'Value': endpoint_details['endpoint_name']},
                       {'Name': 'VariantName',
                        'Value': "AllTraffic"}]

# (metric name, namespace, heading template, divisor applied to the statistics).
# Invocations is reported through its Sum only, so it has no heading/divisor.
metric_reports = [
    ('ModelLatency', 'AWS/SageMaker', 'Model Latency: {} data points', seconds_conversion),
    ('OverheadLatency', 'AWS/SageMaker', 'OverheadLatency: {} data points', seconds_conversion),
    ('Invocations', 'AWS/SageMaker', None, None),
    ('CPUUtilization', '/aws/sagemaker/Endpoints',
     'CPUUtilization: {} data points (adjusted for cores)', endpoint_details['endpoint_cores']),
    # GPU utilisation is reported as-is, so the heading does not claim a per-core adjustment.
    ('GPUUtilization', '/aws/sagemaker/Endpoints', 'GPUUtilization: {} data points', 1),
]

poll_interval_seconds = 30
max_polls = 20  # stop waiting after roughly ten minutes

cloudwatch_ready = False
datapoints = {}
# Keep polling CloudWatch until every metric has a datapoint, or the poll budget runs out.
for _ in range(max_polls):
    print('Waiting 30 seconds ...\n')
    time.sleep(poll_interval_seconds)

    datapoints = {}
    for metric_name, namespace, _heading, _divisor in metric_reports:
        # Must use default units of microseconds
        model_latency_metrics = cloudwatch.get_metric_statistics(MetricName=metric_name,
                                                                 Dimensions=endpoint_dimensions,
                                                                 Namespace=namespace,
                                                                 StartTime=cw_start,
                                                                 EndTime=cw_end,
                                                                 Period=period,
                                                                 Statistics=statistics,
                                                                 ExtendedStatistics=extended
                                                                 )
        if len(model_latency_metrics['Datapoints']) > 0:
            # NOTE(review): only the first returned datapoint is reported - confirm the
            # chosen period always collapses the window into a single datapoint.
            datapoints[metric_name] = model_latency_metrics['Datapoints'][0]

    # Ready only when *all* metrics are available, not just ModelLatency.
    cloudwatch_ready = len(datapoints) == len(metric_reports)
    if cloudwatch_ready:
        break

if not cloudwatch_ready:
    print('Gave up waiting for CloudWatch; reporting the metrics that are available.\n')

for metric_name, namespace, heading, divisor in metric_reports:
    datapoint = datapoints.get(metric_name)
    if datapoint is None:
        print('{}: no datapoints returned by CloudWatch\n'.format(metric_name))
        continue

    if metric_name == 'Invocations':
        print('Invocations: {} \n'.format(datapoint['Sum']))
        continue

    print(heading.format(datapoint['SampleCount']))
    percentiles = datapoint['ExtendedStatistics']
    side_avg = datapoint['Average'] / divisor
    side_p50 = percentiles['p50'] / divisor
    side_p90 = percentiles['p90'] / divisor
    side_p95 = percentiles['p95'] / divisor
    side_p100 = percentiles['p100'] / divisor
    print('Avg | P50 | P90 | P95 | P100')
    # Five placeholders so that P100 is printed along with the other four columns.
    print('{:.4f} | {:.4f} | {:.4f} | {:.4f} | {:.4f}\n'.format(side_avg, side_p50, side_p90, side_p95, side_p100))