# Clustering data from harvesting sample @ MLAI Training

This experiment explores transactional datasets looking for clusters of session behavior. It uses the same bag-of-transactions approach, but uses data with ordinary proportions of known harvesting activity, rather than "salting" the sample with harvesting activity to support classification learning.

## Training Data
The training dataset consists of several hours of raw transaction logs containing activity from all users. Each sample was chosen so that it contains at least one LiquidTension (LT) session, but is otherwise unmanipulated.
All LT activity is labeled as 'BadActor' = 1, while all other traffic is assumed to be innocent and labeled as 'BadActor' = 0. We use this label only to determine whether LT activity appears in a cluster by itself, appears together with other harvester sessions, or is not successfully isolated.

The training set includes the following files:

|File|Contents|Rows|
|----|--------|----|
|may1.tsv|raw transactions|119474|
|may2.tsv|raw transactions|43608|
|may3.tsv|raw transactions|30844|

In all, we have 193926 transactions from raw samples, of which 533 are known LT and the remaining 193393 are assumed innocent.


## Warm up
Import standard libraries and prepare the environment.

In [2]:
import io
import os
import matplotlib.pyplot as plt
import numpy as np 
import pandas as pd 

import boto3
import sagemaker
from sagemaker import get_execution_role

%matplotlib inline

!mkdir data

mkdir: cannot create directory ‘data’: File exists


In [3]:
# sagemaker session, role
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# S3 bucket name
bucket = sagemaker_session.default_bucket()



## Download
Retrieve the datafiles from the project's designated S3 bucket.

In [7]:
s3 = boto3.resource('s3')
b = s3.Bucket('sagemaker-mlai-harvesting')

# b.download_file( 'data/MLAI_ParsedDataSet.tsv', 'data/data.tsv')
b.download_file( "data/MinimalLogs/Minimal_May01.rpt", 'data/may1.tsv')
b.download_file( "data/MinimalLogs/Minimal_May02.rpt", 'data/may2.tsv')
b.download_file( "data/MinimalLogs/Minimal_May03.rpt", 'data/may3.tsv')


may1 = pd.read_csv('data/may1.tsv',sep='\t')
may2 = pd.read_csv('data/may2.tsv',sep='\t')
may3 = pd.read_csv('data/may3.tsv',sep='\t')

bad_col='BadActor'
sess_col='SessionNo'
txn_col='Act'

# DataFrame.append was removed in pandas 2.0; pd.concat gives the same stacked result
txn = pd.concat([may1, may2, may3])
len(txn[txn[bad_col]==1]), len(txn[txn[bad_col]==0])

(533, 193393)

## Data conversion and feature engineering
In real life, a session consists of a series of rows of transactions of different types, and each transaction type records a variable number of additional metadata attributes describing a logged event, for a total of over 30 columns of extracted data. In addition, our tagging process has given each row a BadActor label.

|sessionno|txn id|BadActor|parm1|parm2|...|
|---------|------|--------|-----|-----|---|
|1240|111|0|query string|...|...|
|1240|112|0|meta|...|...|
|2993|301|1|meta|...|...|


In [9]:
# Sorted list of the distinct transaction types ('Act' values) seen in the log
txns = pd.DataFrame(np.sort(txn[txn_col].unique()))




We drop most of this information, including the temporal sequence of the log entries, and convert each session into a single row of data. Almost all of the columns go away, replaced by counts of transaction types in the session.

|sessionno|BadActor|111|112|113|...|301|302|...|
|---------|--------|---|---|---|---|---|---|---|
|1240|0|1|1|0|...|0|0|...|
|2993|1|0|0|0|...|1|0|...|
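
The conversion described above can be sketched on a toy log. This is a minimal illustration rather than the notebook's own implementation: the rows are invented, and `pd.crosstab` is swapped in as one simple way to count transaction types per session.

```python
import pandas as pd

# Toy transaction log; the values are invented for illustration only.
log = pd.DataFrame({
    "SessionNo": [1240, 1240, 2993, 2993, 2993],
    "Act":       [111, 112, 301, 301, 111],
    "BadActor":  [0, 0, 1, 1, 1],
})

def bag_of_transactions(log):
    """Return one row per session with a count column per transaction type."""
    counts = pd.crosstab([log["SessionNo"], log["BadActor"]], log["Act"])
    counts.columns.name = None      # drop the 'Act' label on the column axis
    return counts.reset_index()     # SessionNo and BadActor become ordinary columns

flat_toy = bag_of_transactions(log)
print(flat_toy)
```

Each cell holds the number of log rows of that type in the session, so the temporal order of the entries is discarded, as the text describes.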

In [10]:
def flatten_txns( txn_log ):
    txn_narrow = txn_log[[sess_col, txn_col,bad_col]]
    txn_pivot = pd.pivot_table(txn_narrow, index=[sess_col,bad_col], columns = [txn_col],aggfunc=[np.size]).fillna(0)
    txn_pivot.columns = txn_pivot.columns.droplevel(0)           # the pivot table has a two-level index
    txn_flat = txn_pivot.rename_axis(None, axis=1).reset_index() # these two lines get rid of it so we have a simple table
    return txn_flat

In [19]:
flatten_txns( txn ).head(10).set_index('SessionNo')


|SessionNo|BadActor|111|112|114|115|116|117|118|119|121|...|402|403|406|407|410|411|511|513|601|607|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|-2147481927|0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2147317281|0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|9.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2147002735|0|3.0|0.0|0.0|6.0|3.0|0.0|0.0|0.0|6.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2146953899|0|0.0|3.0|0.0|60.0|0.0|0.0|0.0|0.0|48.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2146926264|0|3.0|0.0|0.0|3.0|0.0|3.0|0.0|0.0|0.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2146915841|0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|3.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2146723372|0|3.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|3.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2146089473|0|3.0|0.0|0.0|3.0|3.0|3.0|0.0|0.0|0.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2145757832|0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|-2145260428|0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|...|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|


In [21]:
flat = flatten_txns( txn )
flat.set_index('SessionNo')



## Producing pools of training and testing data

We will divide the combined good and bad data pools as follows:
- a training set that the model iterates over during the learning process
- a test set that is used to evaluate the model during training
- a validation set that is kept separate to test the model after training is complete. We need separate test and validation pools to make sure that we are not overfitting the model to a single set of test data.
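
As a sanity check on this scheme, the sketch below splits a toy frame per class and confirms that the three pools are disjoint and that each pool contains both labels. The helper `three_way_split` and the toy data are illustrative assumptions, not the notebook's own code; it uses integer arithmetic for the pool boundaries to avoid floating-point rounding at the edges.

```python
import pandas as pd

def three_way_split(df, train_frac):
    """Split df, in its existing row order, into train/test/validate parts."""
    n = len(df)
    n_train = int(n * train_frac)
    n_test = (n - n_train) // 2          # remainder is shared by test and validate
    train = df.iloc[:n_train]
    test = df.iloc[n_train:n_train + n_test]
    val = df.iloc[n_train + n_test:]
    return train, test, val

# Toy sessions: 10 labeled bad, 40 labeled good (invented proportions).
toy = pd.DataFrame({"BadActor": [1] * 10 + [0] * 40, "x": range(50)})

# Split each class separately so every pool receives both labels.
parts = {"train": [], "test": [], "validate": []}
for _, group in toy.groupby("BadActor"):
    for name, part in zip(parts, three_way_split(group, 0.8)):
        parts[name].append(part)
pools = {name: pd.concat(frames) for name, frames in parts.items()}

for name, pool in pools.items():
    print(name, len(pool), sorted(pool["BadActor"].unique()))
```

Splitting per class matters here because known harvesting sessions are rare; a single split over the combined frame could leave a pool with no positive examples at all.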

In [13]:
def split_frame( df, train_frac ):
    l = len(df)
    test_frac = (1-train_frac)/2
    tr = int(train_frac * l)
    te = int(tr + test_frac * l)
    
    train = df[:tr]
    test = df[tr:te]
    val = df[te:]
    return [train, test, val]

In [14]:
def train_split( flat, bad_split=.6 ):
    bad = flat[flat[bad_col]==1]
    good = flat[flat[bad_col]==0]
    
    bads = split_frame(bad, bad_split)
    goods = split_frame(good, bad_split)
    
    dfs = []
    for i in range(3):
        # Drop the session number because we don't want to train on it.
        # This also leaves our label - BadActor - in column 0, as XGBoost requires for CSV.
        # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
        df = pd.concat([bads[i], goods[i]]).drop(sess_col, axis=1).sample(frac=1)
        dfs.append( df )
    
    return dfs
    


# Split the data and upload to S3
Break the set into train, test, and validation collections and write each one out as a CSV file.
As SageMaker requires, leave out row indices and column headers.
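
The expected file layout can be illustrated on a two-row toy frame. This is only a sketch of the format (label in the first column, no header, no index); the column names and values are invented.

```python
import io
import pandas as pd

# Toy flattened sessions; column names and counts are invented.
toy = pd.DataFrame({"111": [3, 0], "BadActor": [0, 1], "301": [0, 2]})

# Move the label to the first column, then write without header or index.
cols = ["BadActor"] + [c for c in toy.columns if c != "BadActor"]
buf = io.StringIO()
toy[cols].to_csv(buf, header=False, index=False)

csv_lines = buf.getvalue().splitlines()
print(csv_lines)
```

Each output line starts with the label followed by the feature counts, which is the shape the training cell below assumes.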

In [18]:
dfs = train_split(flat, .8)

!mkdir cluster

data_path = "cluster"

s3_client = boto3.client('s3')
bucket = "sagemaker-mlai-harvesting"

for i, df in enumerate(dfs):
    files = ["train","test","validate"]
    file = "{}/{}.csv".format(data_path, files[i])
    df.to_csv(path_or_buf= file, header=False, index=False  )

    print("Uploading {} to {}".format(file, bucket))

    response = s3_client.upload_file(file, bucket, file)
    print(response)
    
    
    


mkdir: cannot create directory ‘cluster’: File exists
Uploading cluster/train.csv to sagemaker-mlai-harvesting
None
Uploading cluster/test.csv to sagemaker-mlai-harvesting
None
Uploading cluster/validate.csv to sagemaker-mlai-harvesting
None


# Prepare and train a model
Boilerplate code mostly copied from Amazon sample code at https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/xgboost_abalone/xgboost_abalone.ipynb, with ample room for improvement.

In [472]:
%%time
region = 'us-east-1'
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(region, 'xgboost')


from time import gmtime, strftime

job_name = 'harvesting-xgboost-binary-classification' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Training job", job_name)

#Ensure that the training and validation data folders generated above are reflected in the "InputDataConfig" parameter below.

create_training_params = \
{
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "OutputDataConfig": {
        "S3OutputPath": os.path.join("s3://",bucket, "out", "xgb-class") 
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.m4.4xlarge",
        "VolumeSizeInGB": 5
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        "max_depth":"5",
        "eta":"0.2",
        "gamma":"4",
        "min_child_weight":"6",
        "subsample":"0.7",
        "silent":"0",
        "objective":"binary:logistic",
        "num_round":"50"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 3600
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": os.path.join( "s3://", bucket, "out" ), 
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "text/csv",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": os.path.join( "s3://", bucket, "out" ),
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "text/csv",
            "CompressionType": "None"
        }
    ]
}


client = boto3.client('sagemaker', region_name=region)
client.create_training_job(**create_training_params)

import time

status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)
while status not in ('Completed', 'Failed', 'Stopped'):  # stop polling on any terminal state
    time.sleep(60)
    status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
    print(status)

Training job harvesting-xgboost-binary-classification2019-06-12-17-23-43
InProgress
InProgress
InProgress
Completed
CPU times: user 163 ms, sys: 0 ns, total: 163 ms
Wall time: 3min


In [474]:
%%time
import boto3
from time import gmtime, strftime

model_name="harvesting-xgboost-binary-cl-2019-06-12-17-23-43"+ '-model'
print(model_name)

info = client.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
print(model_data)

primary_container = {
    'Image': container,
    'ModelDataUrl': model_data
}

create_model_response = client.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

harvesting-xgboost-binary-cl-2019-06-12-17-23-43-model
s3://sagemaker-mlai-harvesting/out/xgb-class/harvesting-xgboost-binary-classification2019-06-12-17-23-43/output/model.tar.gz
arn:aws:sagemaker:us-east-1:872344130825:model/harvesting-xgboost-binary-cl-2019-06-12-17-23-43-model
CPU times: user 15 ms, sys: 4.03 ms, total: 19.1 ms
Wall time: 335 ms


In [475]:
from time import gmtime, strftime

endpoint_config_name = 'Harvest-XGBoostEndpointConfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m4.xlarge',
        'InitialVariantWeight':1,
        'InitialInstanceCount':1,
        'ModelName':model_name,
        'VariantName':'AllTraffic'}])

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])


Harvest-XGBoostEndpointConfig-2019-06-12-17-30-04
Endpoint Config Arn: arn:aws:sagemaker:us-east-1:872344130825:endpoint-config/harvest-xgboostendpointconfig-2019-06-12-17-30-04


# Launch an endpoint

In [476]:
%%time
import time

endpoint_name = 'Harvest-XGBoostEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

resp = client.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

while status=='Creating':
    time.sleep(60)
    resp = client.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Status: " + status)

print("Arn: " + resp['EndpointArn'])
print("Status: " + status)



Harvest-XGBoostEndpoint-2019-06-12-17-30-34
arn:aws:sagemaker:us-east-1:872344130825:endpoint/harvest-xgboostendpoint-2019-06-12-17-30-34
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: InService
Arn: arn:aws:sagemaker:us-east-1:872344130825:endpoint/harvest-xgboostendpoint-2019-06-12-17-30-34
Status: InService
CPU times: user 168 ms, sys: 5.78 ms, total: 174 ms
Wall time: 9min 1s


# Test the model
Currently, we launch an endpoint to test the model. This endpoint includes a simple web service that takes a POST request with rows of our model's X values (all columns other than BadActor) and returns a corresponding list of Y values (BadActor predictions).

The endpoint approach is most suitable to interactive use, such as possibly using the model to blacklist a harvesting session as soon as it is identified. For offline analysis, this should be reconfigured to run batch transform jobs instead, which are cheaper to run and more streamlined to invoke.
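
For the offline case, a batch transform request could be assembled roughly as follows. This is a sketch under stated assumptions: the job name, model name, and S3 locations are hypothetical placeholders, the instance type is simply carried over from the endpoint configuration, and the input file is assumed to contain feature columns only (no BadActor label). The helper only builds the request dictionary and does not call AWS.

```python
def build_transform_request(job_name, model_name, input_uri, output_uri):
    """Build keyword arguments for the SageMaker CreateTransformJob API."""
    return {
        "TransformJobName": job_name,
        "ModelName": model_name,
        "TransformInput": {
            "DataSource": {
                "S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": input_uri}
            },
            "ContentType": "text/csv",
            "SplitType": "Line",          # one CSV row per prediction
        },
        "TransformOutput": {"S3OutputPath": output_uri},
        "TransformResources": {"InstanceType": "ml.m4.xlarge", "InstanceCount": 1},
    }

# All four argument values below are hypothetical placeholders.
request = build_transform_request(
    job_name="harvest-batch-example",
    model_name="example-model",
    input_uri="s3://example-bucket/batch/input.csv",
    output_uri="s3://example-bucket/batch/output",
)
print(request["TransformInput"]["ContentType"])
```

With a SageMaker client such as the `client` created earlier, the request would be submitted with `client.create_transform_job(**request)`, and the predictions would be written under the output prefix instead of being returned over HTTP.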

In [488]:
runtime_client = boto3.client('runtime.sagemaker', region_name=region)

import json
from itertools import islice
import math
import struct

!head -10000 out/test.csv > out/single-test.csv

file_name = 'out/single-test.csv' #customize to your test file

csv = pd.read_csv(file_name, header=None)
csv.columns
label = csv[0]
csv = csv.drop(0,axis=1)

single = "out/single.csv"

csv.to_csv(path_or_buf=single, header=False, index=False)

with open(single, 'r') as f:
    payload = f.read().strip()

In [609]:
response = runtime_client.invoke_endpoint(EndpointName=endpoint_name, 
                                   ContentType='text/csv', 
                                   Body=payload)
result = response['Body'].read()
result = result.decode("utf-8")
result = result.split(',')
result = [round(float(i)) for i in result]


# Compute the confusion metrics

A confusion matrix records the counts of true and false positives and negatives, from which we derive metrics such as accuracy, precision, and recall.

In [613]:
comp = pd.concat( [label, pd.DataFrame(result)], axis = 1)
comp.columns =["label",'prediction']

label_positive = comp['label'] == 1
predict_positive = comp['prediction'] == 1

tp = len( comp[label_positive & predict_positive])
fp = len( comp[~label_positive & predict_positive])
tn = len( comp[~label_positive & ~predict_positive])
fn = len( comp[label_positive & ~predict_positive])

m = len(comp)

accuracy = (tp+tn)/m
precision = tp/(tp+fp)
recall = tp/(tp+fn)

print("accuracy: {} precision: {} recall {}".format(accuracy, precision,recall))

accuracy: 0.9668187474077147 precision: 1.0 recall 0.8409542743538767


In [612]:
tp,fp,tn,fn, len(comp)

(423, 0, 1908, 80, 2411)
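
The same counts can be fed through a small helper that also guards against empty denominators and adds two further derived metrics. This is an illustrative sketch; the function name `confusion_metrics` and the choice of extra metrics (F1, false positive rate) are additions, not part of the original run.

```python
def confusion_metrics(tp, fp, tn, fn):
    """Derive summary metrics from confusion-matrix counts."""
    def ratio(num, den):
        # Return NaN instead of raising when a class is absent.
        return num / den if den else float("nan")

    return {
        "accuracy": ratio(tp + tn, tp + fp + tn + fn),
        "precision": ratio(tp, tp + fp),
        "recall": ratio(tp, tp + fn),
        "f1": ratio(2 * tp, 2 * tp + fp + fn),
        "false_positive_rate": ratio(fp, fp + tn),
    }

# Counts taken from the cell output above.
metrics = confusion_metrics(tp=423, fp=0, tn=1908, fn=80)
for name, value in metrics.items():
    print("{}: {:.4f}".format(name, value))
```

The NaN guard is relevant for unsalted samples, where a small test slice may contain no positive labels or no positive predictions.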

The very first time we ran the model, we achieved strikingly successful rates of harvesting identification.
The most significant number here is the recall of 84%: the model identified 423 of the 503 harvesting sessions in the test sample by looking only at counts of transaction types, and it did so without a single false positive.

This approach appears promising!

# Next steps

## Further investigating the data 

We had additional ideas for modeling the data while staying with this bag-of-transactions technique.
1. Try some hyperparameter tuning to see if the success rates can be trivially improved.
1. Enrich the training data set in various ways - add columns to summarize total session time, average time/request, and so on.
1. Perform some clustering analysis to try to identify common patterns of behavior other than LT. This may reveal the presence of other kinds of harvesting.
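
As an illustration of the enrichment idea, the sketch below derives per-session timing columns from a toy log. The `Timestamp` column is an assumption made for this example (the real log's time field, if any, may be named or encoded differently), and the values are invented.

```python
import pandas as pd

# Toy log; 'Timestamp' is a hypothetical column, not a confirmed field of the real data.
log = pd.DataFrame({
    "SessionNo": [1, 1, 1, 2, 2],
    "Timestamp": pd.to_datetime([
        "2019-05-01 10:00:00", "2019-05-01 10:00:30", "2019-05-01 10:02:00",
        "2019-05-01 11:00:00", "2019-05-01 11:00:02",
    ]),
})

def session_timing(log):
    """Return request count, duration, and mean seconds per request for each session."""
    stamps = log.groupby("SessionNo")["Timestamp"]
    out = pd.DataFrame({
        "requests": stamps.size(),
        "duration_s": (stamps.max() - stamps.min()).dt.total_seconds(),
    })
    out["avg_s_per_request"] = out["duration_s"] / out["requests"]
    return out

timing = session_timing(log)
print(timing)
```

The resulting frame is indexed by session number, so it could be joined onto the flattened count table before the session column is dropped.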

## Qualifying the approach
Can we use this approach to identify and blacklist harvesting sessions as they occur? Some notes:
1. The approach must be resilient to easy efforts to evade. Does the accuracy of the identification drop if the attacker makes minor changes to his workflow?
1. How long does it take to identify an attacker in real time? 
    1. Do we gain certainty soon enough to stop an attacker before he's done what he came to do?
    2. Can we tag sessions accurately after the first N log entries, for instance?
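
One way to probe the "first N log entries" question offline is to rebuild the count features from truncated sessions and score those. The sketch below shows only the truncation step, on invented data, and assumes the log rows are already in time order; `first_n_counts` and `train_columns` are hypothetical names.

```python
import pandas as pd

# Toy interleaved log, assumed to be in chronological order.
log = pd.DataFrame({
    "SessionNo": [1, 2, 1, 1, 2, 1],
    "Act":       [111, 301, 112, 111, 301, 115],
})

def first_n_counts(log, n):
    """Count transaction types using only the first n entries of each session."""
    head = log.groupby("SessionNo").head(n)
    return pd.crosstab(head["SessionNo"], head["Act"])

partial = first_n_counts(log, 2)

# Align with the columns the model was trained on (hypothetical list here).
train_columns = [111, 112, 115, 301]
partial = partial.reindex(columns=train_columns, fill_value=0)
print(partial)
```

Repeating this for increasing values of N and scoring each truncated set would give a rough curve of recall against the number of entries observed.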
    
## Designing an implementation
Design an architecture for identifying and intercepting harvesting activity in real time. Confirm data sources, manage impact to usage latency, model costs and ROI.

At present, real-time analysis on AWS would be less effective, since all of our current content usage is on-prem. The algorithm used here, XGBoost, performs well on commodity hardware, so we may be able to run it on standard VMs.

In real-time analysis, we will face a stream of events from interleaved sessions. We will have to demultiplex these into individual event streams both for training and for prediction, implying some kind of windowing to capture and send sets of log entries as partial sessions. It's not clear how big the impact of this windowing will be on the accuracy of the models.
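
The demultiplexing and windowing described here might look roughly like the following. This is a sketch of one possible windowing policy (emit a cumulative count snapshot every `window` events per session), chosen for illustration; the event tuples and the function name are invented.

```python
from collections import Counter, defaultdict

def windowed_counts(events, window):
    """Demultiplex (session, act) events and yield cumulative per-session
    transaction counts each time a session has seen `window` more events."""
    counts = defaultdict(Counter)   # session -> Counter of transaction types
    seen = defaultdict(int)         # session -> number of events so far
    for session, act in events:
        counts[session][act] += 1
        seen[session] += 1
        if seen[session] % window == 0:
            yield session, dict(counts[session])

# Toy interleaved stream of (SessionNo, Act) pairs.
stream = [(1, 111), (2, 301), (1, 112), (2, 301), (2, 301), (1, 115), (2, 302)]
snapshots = list(windowed_counts(stream, window=2))
for session, partial in snapshots:
    print(session, partial)
```

Each snapshot is a partial session in the same bag-of-transactions form used for training, so it could be scored directly; how much accuracy is lost on such partial sessions remains the open question raised above.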

# Other analytical techniques
While this algorithm seems promising, we're throwing away a huge amount of intelligence before we start training, in the name of simplicity. We can evaluate what kind of gains we could achieve through more advanced techniques:
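- Sequence-aware models such as LSTMs or CNNs, which can use the ordering of log entries that the bag-of-transactions approach discards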
- more 
