# Introduction

In this workshop, we will train and deploy a model, then train a possible replacement model and test it against production traffic using the SageMaker Shadow Test feature. We'll do this entirely in code, making use of the <a href="https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/create_inference_experiment.html">SageMaker API</a>. The models analyze and classify network traffic.

## Contents

1) [Setup](#setup)
2) [Basic Training](#basic_training)
3) [Register the Models](#register)
4) [Create Endpoint Config](#create_endpoint)
5) [Deploy and Predict](#deploy)
6) [Create a Shadow Test](#shadow)
7) [Evaluate](#eval)

To train our models we will use the <a href="https://registry.opendata.aws/cse-cic-ids2018/">CSE-CIC-IDS2018</a> dataset by CIC and ISCX, which is used for security testing and malware prevention.
The dataset includes a large volume of raw network traffic logs, plus pre-processed data in which network connections have been reconstructed and relevant features extracted with CICFlowMeter, a tool that outputs network connection features as CSV files. Each record is labeled either as benign traffic or as one of several types of malicious traffic, for a total of 15 classes.

Starting from this featurized dataset, we have executed additional pre-processing for the purpose of this lab:
<ul>
    <li>Encoded class labels</li>
    <li>Replaced invalid string attribute values generated by CICFlowMeter (e.g. inf and Infinity)</li>
    <li>Executed one hot encoding of discrete attributes</li>
    <li>Removed invalid headers logged multiple times in the same CSV file</li>
    <li>Reduced the size of the featurized dataset to ~1.3GB (from ~6.3GB) to speed-up training, while making sure that all classes are well represented</li>
    <li>Executed stratified random split of the dataset into training (80%) and validation (20%) sets</li>
</ul>
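
The clean-up steps listed above could look roughly like the following sketch. The column names (`Label`, `Protocol`, `Flow Byts/s`), the two-class label map and the toy frame are illustrative assumptions, not the actual workshop pipeline, and the stratified split is done here with plain pandas.

```python
import numpy as np
import pandas as pd

# Toy stand-in for a CICFlowMeter CSV; column names and values are hypothetical.
raw = pd.DataFrame({
    "Label": ["Benign"] * 10 + ["Bot"] * 5 + ["Label"],        # last row mimics a repeated header
    "Protocol": ["6"] * 8 + ["17"] * 7 + ["Protocol"],
    "Flow Byts/s": ["1.5"] * 7 + ["Infinity"] * 8 + ["Flow Byts/s"],
})

# Drop header rows that were logged again in the middle of the file.
df = raw[raw["Label"] != "Label"].copy()

# Replace the invalid string values with NaN, then cast the column to float.
df["Flow Byts/s"] = df["Flow Byts/s"].replace(["inf", "Infinity"], np.nan).astype(float)

# Encode class labels as integers and one-hot encode the discrete attribute.
label_map = {"Benign": 0, "Bot": 1}
df["Target"] = df["Label"].map(label_map)
df = pd.get_dummies(df.drop(columns="Label"), columns=["Protocol"])

# Stratified 80/20 split: sample 80% of each class for training, keep the rest for validation.
train = df.groupby("Target", group_keys=False).sample(frac=0.8, random_state=0)
valid = df.drop(train.index)
```

Sampling within each class keeps the class proportions of the full data in both splits, which matters for the rare attack classes.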

Classes are represented and encoded as follows (train + validation):


| Label                    | Encoded | N. records |
|:-------------------------|:-------:|-----------:|
| Benign                   |    0    |    1000000 |
| Bot                      |    1    |     200000 |
| DoS attacks-GoldenEye    |    2    |      40000 |
| DoS attacks-Slowloris    |    3    |      10000 |
| DDoS attacks-LOIC-HTTP   |    4    |     300000 |
| Infilteration            |    5    |     150000 |
| DDOS attack-LOIC-UDP     |    6    |       1730 |
| DDOS attack-HOIC         |    7    |     300000 |
| Brute Force -Web         |    8    |        611 |
| Brute Force -XSS         |    9    |        230 |
| SQL Injection            |   10    |         87 |
| DoS attacks-SlowHTTPTest |   11    |     100000 |
| DoS attacks-Hulk         |   12    |     250000 |
| FTP-BruteForce           |   13    |     150000 |
| SSH-Bruteforce           |   14    |     150000 |       

The final pre-processed dataset has been saved to a public Amazon S3 bucket for your convenience, and will represent the inputs to the training processes.
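
The record counts in the table above are far from balanced, which is worth keeping in mind when reading the accuracy and weighted F1 figures later in the notebook. As a small illustration, the snippet below recomputes the class shares from the counts copied out of the table; it does not touch the actual dataset.

```python
# Record counts copied from the table above (train + validation), keyed by encoded label.
counts = {
    0: 1000000, 1: 200000, 2: 40000, 3: 10000, 4: 300000,
    5: 150000, 6: 1730, 7: 300000, 8: 611, 9: 230,
    10: 87, 11: 100000, 12: 250000, 13: 150000, 14: 150000,
}

total = sum(counts.values())
shares = {label: n / total for label, n in counts.items()}
rarest = min(counts, key=counts.get)

print(f"total records: {total}")
print(f"benign share:  {shares[0]:.1%}")
print(f"rarest class:  {rarest} ({counts[rarest]} records)")
```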
<a id='setup'></a>
### Let's get started!

First, we set some variables, including the AWS region we are working in, the IAM (Identity and Access Management) execution role of the notebook instance, and the Amazon S3 bucket where we will store data, models, outputs, etc. We will use the Amazon SageMaker default bucket for the selected AWS region, and define a key prefix so that all objects share the same prefix for easier discoverability.
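
As a small sketch of how a shared key prefix keeps everything under one location, the helper below joins a bucket, a prefix and sub-paths into an S3 URI. The function name `s3_uri` and the bucket name are hypothetical and not part of the SageMaker SDK.

```python
def s3_uri(bucket: str, prefix: str, *parts: str) -> str:
    """Join a bucket, the shared key prefix and any sub-paths into one S3 URI."""
    key = "/".join(p.strip("/") for p in (prefix, *parts) if p)
    return f"s3://{bucket}/{key}"

# 'example-bucket' is a placeholder; the notebook uses the SageMaker default bucket.
model_uri = s3_uri("example-bucket", "shadow-test", "models", "model.tar.gz")
capture_uri = s3_uri("example-bucket", "shadow-test", "datacapture_test")
```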

In [1]:
%pip install jsonlines --quiet
%pip install sagemaker --upgrade --quiet

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import time
import glob
import json
import jsonlines
import base64
import io

import boto3
import sagemaker
from sagemaker.model_monitor import DataCaptureConfig

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from IPython.display import display, clear_output

pd.options.display.max_columns = 100

region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
bucket_name = sagemaker_session.default_bucket()
prefix = 'shadow-test'
os.environ["AWS_REGION"] = region
sm_client = boto3.Session().client('sagemaker')

print(f'REGION:  {region}')
print(f'ROLE:    {role}')
print(f'BUCKET:  {bucket_name}')

# These are the classifications that have been encoded as ints; we'll use them for analysis
class_list = ['Benign','Bot','DoS attacks-GoldenEye','DoS attacks-Slowloris','DDoS attacks-LOIC-HTTP','Infilteration','DDOS attack-LOIC-UDP','DDOS attack-HOIC','Brute Force-Web','Brute Force-XSS','SQL Injection','DoS attacks-SlowHTTPTest','DoS attacks-Hulk','FTP-BruteForce','SSH-Bruteforce']

REGION:  us-east-1
ROLE:    arn:aws:iam::278578987671:role/SageMaker-IoTRole
BUCKET:  sagemaker-us-east-1-278578987671


In [107]:
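# Optional clean-up of resources left over from a previous run.
# The names used here are defined in the cells below, so run those cells first.
sm_client.delete_model(ModelName=model_name1)
sm_client.delete_model(ModelName=model_name2)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config)
sm_client.delete_endpoint(EndpointName=endpoint_name)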

{'ResponseMetadata': {'RequestId': 'f49454bb-6bf0-445c-9e47-102c17c9e069',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f49454bb-6bf0-445c-9e47-102c17c9e069',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Fri, 31 Mar 2023 18:49:48 GMT'},
  'RetryAttempts': 0}}

In [108]:
from sagemaker import image_uris
model1_uri = 's3://sagemaker-us-east-1-278578987671/xgboost-webtraffic/output/hgbc-scikit-2023-03-31-15-00-34-254/output/model.tar.gz'
model2_uri = 's3://sagemaker-us-east-1-278578987671/xgboost-webtraffic/output/rf-scikit-2023-03-31-16-09-48-329/output/model.tar.gz'
image_uri = '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3'
#image_uris.retrieve(framework='sklearn',region='us-east-1',version='0.23-1',image_scope='inference')
data_capture_s3 = f's3://{bucket_name}/{prefix}/datacapture_test/'


model_name1 = "PROD-HGB"
model_name2 = "SHADOW-RF"

print(f"Prod model name: {model_name1}")
print(f"Shadow model name: {model_name2}")

resp = sm_client.create_model(
    ModelName=model_name1,
    ExecutionRoleArn=role,
    PrimaryContainer={
                      "Image": image_uri,
                      "Mode": "SingleModel",
                      "ModelDataUrl": model1_uri,
                      "Environment": {
                          "SAGEMAKER_CONTAINER_LOG_LEVEL": "20",
                          "SAGEMAKER_SUBMIT_DIRECTORY":"s3://sagemaker-us-east-1-278578987671/hgbc-scikit-2023-03-31-15-00-34-254/source/sourcedir.tar.gz",
                          "SAGEMAKER_PROGRAM":"histgradientboost.py",
                      },
                     }
)

resp = sm_client.create_model(
    ModelName=model_name2,
    ExecutionRoleArn=role,
    PrimaryContainer={
                      "Image": image_uri,
                      "Mode": "SingleModel",
                      "ModelDataUrl": model2_uri,
                      "Environment": {
                          "SAGEMAKER_CONTAINER_LOG_LEVEL": "20",
                          "SAGEMAKER_SUBMIT_DIRECTORY": "s3://sagemaker-us-east-1-278578987671/rf-scikit-2023-03-31-16-09-48-329/source/sourcedir.tar.gz",
                          "SAGEMAKER_PROGRAM": "randomforest.py",
                      },
                     }
)

Prod model name: PROD-HGB
Shadow model name: SHADOW-RF


In [109]:
# Deploy the endpoint

endpoint_name = 'shadowtest-endpoint'

# `resp` holds the response of the most recent create_model call above
print("create_model API response", resp)
print()

endpoint_config='shadowtest-endpoint-config'

# create sagemaker endpoint config
create_endpoint_config_api_response = sm_client.create_endpoint_config(
                                            EndpointConfigName=endpoint_config,
                                            ProductionVariants=[
                                                {
                                                    'VariantName': 'base-variant',
                                                    'ModelName': model_name1,
                                                    'InitialInstanceCount': 3,
                                                    'InstanceType': 'ml.m5.2xlarge',
                                                },
                                            ],
                                            DataCaptureConfig={
                                                'EnableCapture': True,
                                                'InitialSamplingPercentage': 100,
                                                "CaptureOptions": [ 
                                                     { 
                                                        "CaptureMode": "Output"
                                                     }
                                                  ],
                                                'DestinationS3Uri': data_capture_s3
                                            },
                                       )

print ("create_endpoint_config API response", create_endpoint_config_api_response)
print()

# create sagemaker endpoint
create_endpoint_api_response = sm_client.create_endpoint(
                                    EndpointName=endpoint_name,
                                    EndpointConfigName=endpoint_config,
                                )

print ("create_endpoint API response", create_endpoint_api_response)          

create_model API response {'ModelArn': 'arn:aws:sagemaker:us-east-1:278578987671:model/shadow-rf', 'ResponseMetadata': {'RequestId': 'eb1896da-f0a6-44d8-8160-8c63dfff2f54', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'eb1896da-f0a6-44d8-8160-8c63dfff2f54', 'content-type': 'application/x-amz-json-1.1', 'content-length': '71', 'date': 'Wed, 29 Mar 2023 23:17:51 GMT'}, 'RetryAttempts': 0}}

create_endpoint_config API response {'EndpointConfigArn': 'arn:aws:sagemaker:us-east-1:278578987671:endpoint-config/shadowtest-endpoint-config', 'ResponseMetadata': {'RequestId': '1f983672-ef83-4627-9f06-1399ccf85d7c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '1f983672-ef83-4627-9f06-1399ccf85d7c', 'content-type': 'application/x-amz-json-1.1', 'content-length': '107', 'date': 'Fri, 31 Mar 2023 18:49:57 GMT'}, 'RetryAttempts': 0}}

create_endpoint API response {'EndpointArn': 'arn:aws:sagemaker:us-east-1:278578987671:endpoint/shadowtest-endpoint', 'ResponseMetadata': {'R

In [110]:
sm_client.describe_endpoint(EndpointName = endpoint_name)

{'EndpointName': 'shadowtest-endpoint',
 'EndpointArn': 'arn:aws:sagemaker:us-east-1:278578987671:endpoint/shadowtest-endpoint',
 'EndpointConfigName': 'shadowtest-endpoint-config',
 'DataCaptureConfig': {'EnableCapture': True,
  'CaptureStatus': 'Started',
  'CurrentSamplingPercentage': 100,
  'DestinationS3Uri': 's3://sagemaker-us-east-1-278578987671/shadow-test/datacapture_test/'},
 'EndpointStatus': 'Creating',
 'CreationTime': datetime.datetime(2023, 3, 31, 18, 49, 58, 452000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 3, 31, 18, 49, 58, 904000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': '63c0bb70-49be-4c58-8252-07d8d73e436f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '63c0bb70-49be-4c58-8252-07d8d73e436f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '460',
   'date': 'Fri, 31 Mar 2023 18:50:01 GMT'},
  'RetryAttempts': 0}}

In [113]:
from time import sleep

def wait_until_endpoint_started(endpoint_name):
    """Poll the endpoint once a minute until it is in service or has failed."""
    print(f'Waiting on endpoint: {endpoint_name}')
    while True:
        description = sm_client.describe_endpoint(EndpointName=endpoint_name)
        status = description["EndpointStatus"].lower()
        print(f'Status: {status}')
        if status in ('failed', 'cancelled'):
            print("Failure detected. Exiting Loop.")
            print(description)
            return
        elif status == 'inservice':
            print("endpoint is running! Exiting Loop.")
            return
        sleep(60)

In [114]:
wait_until_endpoint_started(endpoint_name)

Waiting on endpoint: shadowtest-endpoint
Status: inservice
endpoint is running! Exiting Loop.
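
The same poll-and-sleep pattern is used again for the shadow test later in this notebook. A generic version might look like the sketch below; `poll_status` and its parameters are hypothetical names, and the status source is faked so the example runs without AWS access.

```python
import time

def poll_status(get_status, success, failure, delay=60, max_attempts=60, sleep=time.sleep):
    """Call get_status() until it returns a success or failure state, or attempts run out."""
    for _ in range(max_attempts):
        status = get_status().lower()
        if status in success:
            return status
        if status in failure:
            raise RuntimeError(f"terminal status: {status}")
        sleep(delay)
    raise TimeoutError(f"no terminal status after {max_attempts} attempts")

# Stand-in for: lambda: sm_client.describe_endpoint(EndpointName=...)["EndpointStatus"]
fake_statuses = iter(["Creating", "Creating", "InService"])
result = poll_status(lambda: next(fake_statuses), {"inservice"}, {"failed"}, sleep=lambda s: None)
print(result)  # inservice
```

Unlike the loop above, this variant gives up after a bounded number of attempts and raises on failure instead of returning silently. For endpoints specifically, boto3 also provides a built-in waiter via `sm_client.get_waiter('endpoint_in_service')`.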


<a id='shadow'></a>
# Create a Shadow Test 

## Create a Shadow Test using an Existing Endpoint

Now we will create a shadow test using the existing production endpoint. We will send the validation data we set aside earlier to the endpoint during this test, and stop the test through the API later in the notebook. Note that we could also specify start and end times when we create the inference experiment. If we don't provide them, the experiment starts immediately and concludes after 7 days. Because we are using an existing production endpoint, SageMaker will update that endpoint with the new model variants, and will also update the inference compute instance type of the production variant if needed.
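
If explicit start and end times are wanted, they go into the `Schedule` argument of `create_inference_experiment`, using the same `StartTime` and `EndTime` keys that appear in the `describe_inference_experiment` output further down. The sketch below only builds that dictionary; the helper name `build_schedule` and the chosen dates are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone

def build_schedule(start: datetime, days: int) -> dict:
    """Return a Schedule dict with an explicit start time and an end time `days` later."""
    return {"StartTime": start, "EndTime": start + timedelta(days=days)}

start = datetime(2023, 4, 1, 9, 0, tzinfo=timezone.utc)
schedule = build_schedule(start, days=2)

# Would be passed as: sm_client.create_inference_experiment(..., Schedule=schedule)
print(schedule["EndTime"].isoformat())  # 2023-04-03T09:00:00+00:00
```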


In [115]:
# Optional clean-up from a previous run; shadowtestname is defined in the next cell.
sm_client.delete_inference_experiment(Name=shadowtestname)

{'InferenceExperimentArn': 'arn:aws:sagemaker:us-east-1:278578987671:inference-experiment/shadowinferencetestexistingep',
 'ResponseMetadata': {'RequestId': '2dc848f4-aac6-4eb1-81bc-933915b10435',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '2dc848f4-aac6-4eb1-81bc-933915b10435',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '120',
   'date': 'Fri, 31 Mar 2023 18:56:44 GMT'},
  'RetryAttempts': 0}}

In [116]:
shadowtestname = 'ShadowInferenceTestExistingEP'
infexperimentarn = sm_client.create_inference_experiment(
    Name=shadowtestname,
    Type='ShadowMode',
    Description='Shadow inference test created via boto3 python API using an existing EP',
    RoleArn=role,
    EndpointName=endpoint_name,
    ModelVariants=[
        {
            'ModelName': model_name1,
            'VariantName': 'AllTraffic',
            'InfrastructureConfig': {
                'InfrastructureType':'RealTimeInference',
                'RealTimeInferenceConfig': {
                    'InstanceType': 'ml.m5.2xlarge',
                    'InstanceCount': 3 
                }
            }
        },
        
        {
            'ModelName': model_name2,
            'VariantName': 'Shadow-01',
            'InfrastructureConfig': {
                'InfrastructureType':'RealTimeInference',
                'RealTimeInferenceConfig': {
                    'InstanceType': 'ml.m5.2xlarge',
                    'InstanceCount': 3 
                }
            }
        },
    ],
    DataStorageConfig={
        'Destination':data_capture_s3,
    },
    ShadowModeConfig={
        'SourceModelVariantName': 'AllTraffic',
        'ShadowModelVariants': [
            {
                'ShadowModelVariantName': 'Shadow-01',
                'SamplingPercentage': 100
            },
        ]
    },
)   


In [117]:
shadowtestdescribe = sm_client.describe_inference_experiment(Name=shadowtestname)
shadowtestdescribe

{'Arn': 'arn:aws:sagemaker:us-east-1:278578987671:inference-experiment/shadowinferencetestexistingep',
 'Name': 'ShadowInferenceTestExistingEP',
 'Type': 'ShadowMode',
 'Schedule': {'StartTime': datetime.datetime(2023, 3, 31, 18, 56, 48, 731000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 4, 7, 18, 56, 48, 731000, tzinfo=tzlocal())},
 'Status': 'Creating',
 'Description': 'Shadow inference test created via boto3 python API using an existing EP',
 'CreationTime': datetime.datetime(2023, 3, 31, 18, 56, 48, 169000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 3, 31, 18, 56, 48, 599000, tzinfo=tzlocal()),
 'RoleArn': 'arn:aws:iam::278578987671:role/SageMaker-IoTRole',
 'EndpointMetadata': {'EndpointName': 'shadowtest-endpoint'},
 'ModelVariants': [{'ModelName': 'PROD-HGB',
   'VariantName': 'AllTraffic',
   'InfrastructureConfig': {'InfrastructureType': 'RealTimeInference',
    'RealTimeInferenceConfig': {'InstanceType': 'ml.m5.2xlarge',
     'InstanceCount':

In [118]:
from time import sleep

def wait_until_test_complete(test_name):
    """Poll the shadow test once a minute until it is running or has failed."""
    print(f'Waiting on shadow test: {test_name}')
    while True:
        description = sm_client.describe_inference_experiment(Name=test_name)
        status = description["Status"].lower()
        print(f'Status: {status}')
        if status in ('failed', 'cancelled'):
            print("Failure detected. Exiting Loop.")
            print(description)
            return
        elif status == 'running':
            print("Shadow test is running! Exiting Loop.")
            return
        sleep(60)

In [119]:
wait_until_test_complete(shadowtestname)

Waiting on shadow test: ShadowInferenceTestExistingEP
Status: creating
Status: creating
Status: creating
Status: running
Shadow test is running! Exiting Loop.


## Simulate Production Traffic

We will now simulate production traffic by looping over the holdout data. In a real production use case you won't need to do this, since actual production data will be flowing to the production endpoint. Because our shadow test is now active, both the production variant and the shadow variant will receive each inference input. Only the production output is returned in the response; however, since we have configured the test to capture data, both the production and shadow variant responses are recorded in S3.
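
To line captured predictions up with the true labels afterwards, each request can carry an inference ID that encodes the row index. The sketch below shows one way to build such a request and to recover the index and payload from a capture-style record. The helper names, the `row <index>` ID format and the hand-built record are assumptions for illustration; the record imitates only the fields read later in this notebook, and no endpoint is called.

```python
import base64
import io

import numpy as np

def build_request(index, features):
    """Serialize one feature row as .npy bytes and tag it with its row index."""
    buf = io.BytesIO()
    np.save(buf, np.asarray([features], dtype=float))
    return {
        "ContentType": "application/x-npy",
        "InferenceId": f"row {index}",
        "Body": buf.getvalue(),
    }

def decode_capture(record):
    """Recover (row index, payload array) from a capture-style record."""
    index = int(record["eventMetadata"]["inferenceId"].split(" ")[-1])
    raw = base64.b64decode(record["captureData"]["endpointInput"]["data"])
    return index, np.load(io.BytesIO(raw))

req = build_request(7, [0.1, 0.2, 0.3])

# Hand-built record standing in for one line of a capture .jsonl file.
record = {
    "eventMetadata": {"inferenceId": req["InferenceId"]},
    "captureData": {"endpointInput": {"data": base64.b64encode(req["Body"]).decode()}},
}
index, payload = decode_capture(record)
```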


In [122]:
holdout = pd.read_csv('./data/holdout.csv')

In [131]:
runtimeclient = boto3.client('sagemaker-runtime')

In [None]:
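%%time
# this should take ~ 2 minutes to complete
indexes = []
actuals = []
for i, (index, row) in enumerate(holdout.iterrows(), start=1):
    vals = row.to_numpy()
    # Assumption: the first column is the target and the container accepts .npy input,
    # matching the np.load decoding and inferenceId parsing in the cells below.
    payload = io.BytesIO()
    np.save(payload, vals[1:].astype(float).reshape(1, -1))
    prediction = runtimeclient.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/x-npy',
        InferenceId=f'holdout {index}',
        Body=payload.getvalue(),
    )
    actuals.append(vals[0])
    indexes.append(index)

    if i % 1000 == 0:
        print(i)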

In [None]:
storage = shadowtestdescribe['DataStorageConfig']['Destination'] + endpoint_name + '/'
storage

In [None]:
!aws s3 ls {storage}

In [None]:
!aws s3 cp {storage} ./data/datacapture/  --recursive

In [None]:
shadowfiles = glob.glob('./data/datacapture/Shadow-01/**/*.jsonl',recursive=True)
prodfiles = glob.glob('./data/datacapture/AllTraffic/**/*.jsonl',recursive=True)

In [None]:
shadowin = []
shadowout = []
shadowid = []

for f in shadowfiles:
    print(f)
    with jsonlines.open(f) as reader:
        for obj in reader:
            try:
                # the row index is the last token of the inference ID
                infid = int(obj['eventMetadata']['inferenceId'].split(' ')[-1])

                # input to model
                model_input = base64.b64decode(obj['captureData']['endpointInput']['data'])
                row_in = np.load(io.BytesIO(model_input))[0].tolist()

                # output from model
                model_output = base64.b64decode(obj['captureData']['endpointOutput']['data'])
                row_out = np.load(io.BytesIO(model_output))[0]
            except Exception:
                # skip records that are missing fields or cannot be decoded
                continue

            # append only after the whole record decoded, so the three lists stay aligned
            shadowid.append(infid)
            shadowin.append(row_in)
            shadowout.append(row_out)

In [None]:
shadowdf = pd.DataFrame(data=shadowout,index=shadowid,columns=['Shadow'])
shadowdf

In [None]:
shadowdf['Shadow'] = pd.to_numeric(shadowdf['Shadow'])
shadowdf['Shadow'] = shadowdf['Shadow'].astype(int)
shadowdf = pd.merge(shadowdf,holdout['Target'],left_index=True,right_index=True)
acc = accuracy_score(shadowdf['Target'],shadowdf['Shadow'])
wf1 = f1_score(shadowdf['Target'],shadowdf['Shadow'],average='weighted')
print(acc, wf1)

In [None]:
print(classification_report(shadowdf['Target'],shadowdf['Shadow']))

In [None]:
fig, ax = plt.subplots(figsize=(10,8))
cm = confusion_matrix(shadowdf['Target'],shadowdf['Shadow'])
normalized_cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
clist = [class_list[i] for i in np.sort(shadowdf['Target'].unique())]
sns.heatmap(normalized_cm, ax=ax, annot=cm, fmt='',xticklabels=clist,yticklabels=clist)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Shadow Endpoint Confusion Matrix')
plt.show()

In [None]:
%%time

prodin = []
prodout = []
prodid = []

for f in prodfiles:
    print(f)
    with jsonlines.open(f) as reader:
        for obj in reader:
            try:
                # the row index is the last token of the inference ID
                infid = int(obj['eventMetadata']['inferenceId'].split(' ')[-1])

                # input to model
                model_input = base64.b64decode(obj['captureData']['endpointInput']['data'])
                row_in = np.load(io.BytesIO(model_input))[0].tolist()

                # output from model
                model_output = base64.b64decode(obj['captureData']['endpointOutput']['data'])
                row_out = np.load(io.BytesIO(model_output))[0]
            except Exception:
                # skip records that are missing fields or cannot be decoded
                continue

            # append only after the whole record decoded, so the three lists stay aligned
            prodid.append(infid)
            prodin.append(row_in)
            prodout.append(row_out)

In [None]:
proddf = pd.DataFrame(data=prodout,index=prodid,columns=['Prod'])
proddf

In [None]:
# Line up our production model predictions with the true value based on the index
proddf = pd.merge(proddf,holdout['Target'],left_index=True,right_index=True)

In [None]:
acc = accuracy_score(proddf['Target'],proddf['Prod'])
wf1 = f1_score(proddf['Target'],proddf['Prod'],average='weighted')
print(acc, wf1)

In [None]:
print(classification_report(proddf['Target'],proddf['Prod']))

In [None]:
fig, ax = plt.subplots(figsize=(10,8))
cm = confusion_matrix(proddf['Target'],proddf['Prod'])
normalized_cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
# label only the classes that occur in the holdout data, as in the shadow plot
clist = [class_list[i] for i in np.sort(proddf['Target'].unique())]
sns.heatmap(normalized_cm, ax=ax, annot=cm, fmt='',xticklabels=clist,yticklabels=clist)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Production Endpoint Confusion Matrix')
plt.show()
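
Before deciding whether to promote the shadow variant, it can help to put both sets of predictions side by side on the rows they share. The sketch below does this on toy series standing in for `holdout['Target']`, `proddf['Prod']` and `shadowdf['Shadow']`; the values are invented for illustration and are not results from this workshop.

```python
import pandas as pd

# Toy stand-ins for the true labels and the two variants' predictions.
target = pd.Series([0, 1, 1, 4, 0, 7], name="Target")
prod = pd.Series([0, 1, 0, 4, 0, 4], name="Prod")
shadow = pd.Series([0, 1, 1, 4, 1, 7], name="Shadow")

# Inner join keeps only rows for which both variants have a captured prediction.
both = pd.concat([target, prod, shadow], axis=1, join="inner")

summary = {
    "prod_accuracy": (both["Prod"] == both["Target"]).mean(),
    "shadow_accuracy": (both["Shadow"] == both["Target"]).mean(),
    "disagreement_rate": (both["Prod"] != both["Shadow"]).mean(),
}
print(summary)
```

The disagreement rate shows how often promoting the shadow model would change the answer returned to callers, independently of which model is right.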

In [None]:
sm_client.stop_inference_experiment(
    Name=shadowtestname,
    ModelVariantActions={
        'Shadow-01': 'Promote',
        'AllTraffic': 'Remove'
    },
    DesiredState='Completed',
    Reason='Shadow variant performed better in validation'
)

In [None]:
# Here we show that the shadow model is now deployed to production
sm_client.describe_endpoint(EndpointName=endpoint_name)

## Clean Up

In [161]:
def wait_until_complete(test_name):
    """Poll the shadow test once a minute until it has completed."""
    print(f'Waiting on shadow test: {test_name}')
    while True:
        description = sm_client.describe_inference_experiment(Name=test_name)
        status = description["Status"].lower()
        print(f'Status: {status}')
        if status == "completed":
            print("Shadow test is stopped, ok to delete. Exiting Loop.")
            return
        sleep(60)

In [None]:
wait_until_complete(shadowtestname)

In [None]:
sm_client.delete_inference_experiment(
    Name=shadowtestname
)
sm_client.delete_endpoint(EndpointName=endpoint_name)

# References

* A Realistic Cyber Defense Dataset (CSE-CIC-IDS2018) - https://registry.opendata.aws/cse-cic-ids2018/
* AIM362 - Re:Invent 2019 SageMaker Debugger and Model Monitor - https://github.com/aws-samples/reinvent2019-aim362-sagemaker-debugger-model-monitor