# Saturn aliens - Heuristics to detect anomalous retrieval logs
### Maria Silva, August 2022

In this notebook, we design some heuristics to detect anomalous retrievals in Saturn logs using historical data from verified logs.

To compute the thresholds, we start by using scipy's Maximum Likelihood Estimation (MLE) to fit standard ditributions to the underlying data (the best fitting distribution is the one with the lowest sum of squared error, SSE) and we take a 99.9 percentile of the fitted distribution as the threshold.

The code to pick the best fitted distribution was based on [this Stack Overflow](https://stackoverflow.com/questions/6620471/fitting-empirical-distribution-to-theoretical-ones-with-scipy-python) question.


**Note on reproducibility:**

The data used to run this notebook was queried from Saturn's internal systems. We are not able to share externally due to privacy concerns and the sensitivity of the final tuning of the heuristics.

Even though we are not sharing the final thresholds, by sharing the code, we aim to demonstrate the process and reasoning behind Saturn's logs detection system.

***


In [None]:
import os
import numpy as np
import pandas as pd
import datetime
import numpy.random as random
import scipy.stats as st
import statsmodels.api as sm
from scipy.stats._continuous_distns import _distn_names
import seaborn as sns
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

## 1. Load data

In [None]:
file = os.path.abspath("../../../../Data/Saturn/bandwidth_logs_jul_05.csv")
raw_df = pd.read_csv(file)

df = raw_df.dropna()
df["start_time"] = pd.to_datetime(df["start_time"])
df["mb_sent"] = df["num_bytes_sent"]/(2**20)

df.info()

## 2. Fit distributions and set thresholds

### Auxiliary functions

In [None]:
def best_fit_distribution(raw_data, checkpoint=False):
    """Model data by finding best fit distribution to data"""
    # Compute optimal no. of bins (half the data, with a min of 30 and a max of 200)
    data_len = len(raw_data) 
    if data_len < 60:
        bins = 30
    elif data_len > 400:
        bins = 200
    else:
        bins = int(data_len / 2)
    # Sample data if it is too big
    if data_len > 10000:
        data = random.choice(raw_data, size=10000, replace=False)
    else: 
        data = raw_data
    # Get histogram of original data
    y, x = np.histogram(data, bins=bins, density=True)
    x = (x + np.roll(x, -1))[:-1] / 2.0
    # Best holders
    best_distributions = []
    # Estimate distribution parameters from data
    for ii, distribution in enumerate([d for d in _distn_names if not d in ['levy_stable', 'studentized_range']]):
        if checkpoint:
            print("{:>3} / {:<3}: {}".format(ii+1, len(_distn_names), distribution))
        distribution = getattr(st, distribution)
        # Try to fit the distribution
        try:
            # Ignore warnings from data that can't be fit
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')
                # fit dist to data
                params = distribution.fit(data, method="MLE")
                # Separate parts of parameters
                arg = params[:-2]
                loc = params[-2]
                scale = params[-1]
                # Calculate fitted PDF and error with fit in distribution
                pdf = distribution.pdf(x, loc=loc, scale=scale, *arg)
                sse = np.sum(np.power(y - pdf, 2.0))
                # identify if this distribution is better
                best_distributions.append((distribution, params, sse))
                
        except Exception:
            pass
    return sorted(best_distributions, key=lambda x:x[2])


def make_pdf(dist, params, size=1000):
    """Generate distributions's Probability Distribution Function """
    # Separate parts of parameters
    arg = params[:-2]
    loc = params[-2]
    scale = params[-1]
    # Get start and end points of distribution
    start = dist.ppf(0.01, *arg, loc=loc, scale=scale) if arg else dist.ppf(0.01, loc=loc, scale=scale)
    end = dist.ppf(0.99, *arg, loc=loc, scale=scale) if arg else dist.ppf(0.99, loc=loc, scale=scale)
    # Build PDF and turn into pandas Series
    x = np.linspace(start, end, size)
    y = dist.pdf(x, loc=loc, scale=scale, *arg)
    pdf = pd.Series(y, x)
    return pdf

def get_threshold(dist, params, percentile=0.999):
    """Get value for distribution percentile (inverse cumulative distribution function)"""
    # Separate parts of parameters
    arg = params[:-2]
    loc = params[-2]
    scale = params[-1]
    # Get percent point
    threshold = dist.ppf(percentile, *arg, loc=loc, scale=scale) if arg else dist.ppf(0.01, loc=loc, scale=scale)
    return threshold

### Heuristic 1: High request count

* **Level:** operator
* **Description:** Flag operators with a total number of requests higher than a threshold
* **Reasoning:** Operators may inject fake logs to bump their rewards. This rule catches operators that inject a massive number of requests

In [None]:
# Get variable
var_series = df.groupby("node_id")["request_id"].nunique()

# Find best fit distribution
best_distibutions = best_fit_distribution(var_series.values)
best_dist = best_distibutions[0]
print(f"SSE for best fit: {best_dist[2]}")
print(" ")

# Make PDF with best params 
pdf = make_pdf(best_dist[0], best_dist[1])

# Compute threshold
p = 0.99999
print(f"Threshold for percentile {p}: {np.round(get_threshold(best_dist[0], best_dist[1], p), 1)}")
print(" ")

# Display distribution
plt.figure(figsize=(8,5))
ax = pdf.plot(lw=2, label='PDF', legend=True)
var_series.plot(kind='hist', bins=50, density=True, alpha=0.5, label='Data', legend=True, ax=ax)
param_names = (best_dist[0].shapes + ', loc, scale').split(', ') if best_dist[0].shapes else ['loc', 'scale']
param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, best_dist[1])])
dist_str = '{}({})'.format(best_dist[0].name, param_str)
plt.title("Distribution: Requests per node\n"+ dist_str)
plt.show()

### Heuristic 2: High bandwidth

* **Level:** operator
* **Description:** Flag operators with a bandwidth higher than a threshold
* **Reasoning:** Operators may change their logs to bump their rewards. This rule catches operators where these changes add up to an impossibly high bandwidth

In [None]:
# Get variable
var_series = df.groupby("node_id")["mb_sent"].sum()

# Find best fit distribution
best_distibutions = best_fit_distribution(var_series.values)
best_dist = best_distibutions[0]
print(f"SSE for best fit: {best_dist[2]}")
print(" ")

# Make PDF with best params 
pdf = make_pdf(best_dist[0], best_dist[1])

# Compute threshold
p = 0.99999
print(f"Threshold for percentile {p}: {np.round(get_threshold(best_dist[0], best_dist[1], p), 1)}")
print(" ")

# Display distribution
plt.figure(figsize=(8,5))
ax = pdf.plot(lw=2, label='PDF', legend=True)
var_series.plot(kind='hist', bins=50, density=True, alpha=0.5, label='Data', legend=True, ax=ax)
param_names = (best_dist[0].shapes + ', loc, scale').split(', ') if best_dist[0].shapes else ['loc', 'scale']
param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, best_dist[1])])
dist_str = '{}({})'.format(best_dist[0].name, param_str)
plt.title("Distribution: Bandwidth per node\n"+ dist_str)
plt.show()

### Heuristic 3: Bot client (requests)

* **Level:** request
* **Description:** Flag the requests coming from a client whose total number of requests is higher than a threshold
* **Reasoning:** Operators may employ bots to bump the number of requests and gain more rewards. this rule catches clients that have too many requests for a real user

In [None]:
# Get variable
var_series = df.groupby("client_id")["request_id"].nunique()
var_series = var_series[var_series<var_series.quantile(0.99)]

# Find best fit distribution
best_distibutions = best_fit_distribution(var_series.values)
best_dist = best_distibutions[0]
print(f"SSE for best fit: {best_dist[2]}")
print(" ")

# Make PDF with best params 
pdf = make_pdf(best_dist[0], best_dist[1])

# Compute threshold
p = 0.99999
print(f"Threshold for percentile {p}: {np.round(get_threshold(best_dist[0], best_dist[1], p), 1)}")
print(" ")

# Display distribution
plt.figure(figsize=(8,5))
ax = pdf.plot(lw=2, label='PDF', legend=True)
var_series.plot(kind='hist', bins=50, density=True, alpha=0.5, label='Data', legend=True, ax=ax)
param_names = (best_dist[0].shapes + ', loc, scale').split(', ') if best_dist[0].shapes else ['loc', 'scale']
param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, best_dist[1])])
dist_str = '{}({})'.format(best_dist[0].name, param_str)
plt.title("Distribution: Requests per client\n"+ dist_str)
plt.show()

In [None]:
print(min(var_series))
print(np.median(var_series))
print(max(var_series))

### Heuristic 4: Bot client (bytes)

* **Level:** request
* **Description:** Flag the requests coming from a client whose total bytes transferred is higher than a threshold
* **Reasoning:** Operators may employ bots to bump their bytes transferred and gain more rewards. This rule catches clients that are requesting too much data for a real user

In [None]:
# Get variable
var_series = df.groupby("client_id")["mb_sent"].sum()
var_series = var_series[var_series<var_series.quantile(0.99)]

# Find best fit distribution
best_distibutions = best_fit_distribution(var_series.values)
best_dist = best_distibutions[0]
print(f"SSE for best fit: {best_dist[2]}")
print(" ")

# Make PDF with best params 
pdf = make_pdf(best_dist[0], best_dist[1])

# Compute threshold
p = 0.99999
print(f"Threshold for percentile {p}: {np.round(get_threshold(best_dist[0], best_dist[1], p), 1)}")
print(" ")

# Display distribution
plt.figure(figsize=(8,5))
ax = pdf.plot(lw=2, label='PDF', legend=True)
var_series.plot(kind='hist', bins=50, density=True, alpha=0.5, label='Data', legend=True, ax=ax)
param_names = (best_dist[0].shapes + ', loc, scale').split(', ') if best_dist[0].shapes else ['loc', 'scale']
param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, best_dist[1])])
dist_str = '{}({})'.format(best_dist[0].name, param_str)
plt.title("Distribution: Bandwidth per client\n"+ dist_str)
plt.show()

In [None]:
print(min(var_series))
print(np.median(var_series))
print(max(var_series))

### Heuristic 5: Single client operators (requests)

* **Level:** operator
* **Description:** Flag operators with an average request count per client higher than a threshold
* **Reasoning:** Operators may try to dodge detection by deploying bots on a small scale. If they use a small no. of bots, looking at their total no. of requests divided by the number of clients will uncover some of these cases.

In [None]:
# Get variable
temp = df.groupby(["node_id", "client_id"])["request_id"].nunique().reset_index()
var_series = temp.groupby("node_id")["request_id"].mean()

# Find best fit distribution
best_distibutions = best_fit_distribution(var_series.values)
best_dist = best_distibutions[0]
print(f"SSE for best fit: {best_dist[2]}")
print(" ")

# Make PDF with best params 
pdf = make_pdf(best_dist[0], best_dist[1])

# Compute threshold
p = 0.99999
print(f"Threshold for percentile {p}: {np.round(get_threshold(best_dist[0], best_dist[1], p), 1)}")
print(" ")

# Display distribution
plt.figure(figsize=(8,5))
ax = pdf.plot(lw=2, label='PDF', legend=True)
var_series.plot(kind='hist', bins=50, density=True, alpha=0.5, label='Data', legend=True, ax=ax)
param_names = (best_dist[0].shapes + ', loc, scale').split(', ') if best_dist[0].shapes else ['loc', 'scale']
param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, best_dist[1])])
dist_str = '{}({})'.format(best_dist[0].name, param_str)
plt.title("Distribution: Average client requests per node\n"+ dist_str)
plt.show()

### Heuristic 6: Single client operators (bytes)

* **Level:** operator
* **Description:** Flag operators with an average number of bytes transferred per client higher than a threshold
* **Reasoning:** Operators may try to dodge detection by deploying bots on a small scale. If they use a small no. of bots, looking at their total bytes divided by the number of clients will uncover some of these cases.

In [None]:
# Get variable
temp = df.groupby(["node_id", "client_id"])["mb_sent"].sum().reset_index()
var_series = temp.groupby("node_id")["mb_sent"].mean()

# Find best fit distribution
best_distibutions = best_fit_distribution(var_series.values)
best_dist = best_distibutions[0]
print(f"SSE for best fit: {best_dist[2]}")
print(" ")

# Make PDF with best params 
pdf = make_pdf(best_dist[0], best_dist[1])

# Compute threshold
p = 0.99999
print(f"Threshold for percentile {p}: {np.round(get_threshold(best_dist[0], best_dist[1], p), 1)}")
print(" ")

# Display distribution
plt.figure(figsize=(8,5))
ax = pdf.plot(lw=2, label='PDF', legend=True)
var_series.plot(kind='hist', bins=50, density=True, alpha=0.5, label='Data', legend=True, ax=ax)
param_names = (best_dist[0].shapes + ', loc, scale').split(', ') if best_dist[0].shapes else ['loc', 'scale']
param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, best_dist[1])])
dist_str = '{}({})'.format(best_dist[0].name, param_str)
plt.title("Distribution: Average client bandwidth per node\n"+ dist_str)
plt.show()

### Heuristic 7: Self-requests

* **Level:** request
* **Description:** Flag requests where the node ID and client ID are the same
* **Reasoning:** Operators may try to do requests to themselves to collect more rewards. If they do, we can catch this by comparing the node ID with the client ID.

No threshold to tune!!

### Heuristics 8 and 9: Doctored content

* **Level:** request
* **Description:** Flag requests where the total bytes are higher than a threshold for the CID/referer being requested
* **Reasoning:** Operators may change the total data being transferred in real requests to bump their rewards. This rule catches logs where the total bytes transferred is too high for the CID/website that is being requested

Note that for these 2 rules, we are not fitting classical distributions since we don't have enough data for each CID/website Instead we expect the size for each CID or website to follow a gaussian distirbution and, as such, we can use $\mu + 3 \sigma$ as the threshold. Note that this rule should only be applied after we get a large enough number of request for a given CID/website.

### Heuristic 10: fast requests

* **Level:** request
* **Description:** Flag requests where the duration is lower than a threshold (split by whether the cache is being used)
* **Reasoning:** Operators may change the duration of real requests to bump their rewards. This rule catches logs where the request duration is too low for a comparable request (i.e., whether it uses cache or not)

In [None]:
# Get variable
var_series = df[df["cache_hit"] == "t"]["request_duration_sec"]
var_series = var_series[var_series<var_series.quantile(0.99)]

# Find best fit distribution
best_distibutions = best_fit_distribution(var_series.values)
best_dist = best_distibutions[0]
print(f"SSE for best fit: {best_dist[2]}")
print(" ")

# Make PDF with best params 
pdf = make_pdf(best_dist[0], best_dist[1])

# Compute threshold
p = 0.00001
print(f"Threshold for percentile {p}: {get_threshold(best_dist[0], best_dist[1], p)}")
print(" ")

# Display distribution
plt.figure(figsize=(8,5))
ax = pdf.plot(lw=2, label='PDF', legend=True)
var_series.plot(kind='hist', bins=50, density=True, alpha=0.5, label='Data', legend=True, ax=ax)
param_names = (best_dist[0].shapes + ', loc, scale').split(', ') if best_dist[0].shapes else ['loc', 'scale']
param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, best_dist[1])])
dist_str = '{}({})'.format(best_dist[0].name, param_str)
plt.title("Distribution: Cache-hit request duration by Mb\n"+ dist_str)
plt.show()

In [None]:
# Get variable
var_series = df[df["cache_hit"] == "f"]["request_duration_sec"]
var_series = var_series[var_series<var_series.quantile(0.99)]

# Find best fit distribution
best_distibutions = best_fit_distribution(var_series.values)
best_dist = best_distibutions[0]
print(f"SSE for best fit: {best_dist[2]}")
print(" ")

# Make PDF with best params 
pdf = make_pdf(best_dist[0], best_dist[1])

# Compute threshold
p = 0.00001
print(f"Threshold for percentile {p}: {get_threshold(best_dist[0], best_dist[1], p)}")
print(" ")

# Display distribution
plt.figure(figsize=(8,5))
ax = pdf.plot(lw=2, label='PDF', legend=True)
var_series.plot(kind='hist', bins=50, density=True, alpha=0.5, label='Data', legend=True, ax=ax)
param_names = (best_dist[0].shapes + ', loc, scale').split(', ') if best_dist[0].shapes else ['loc', 'scale']
param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, best_dist[1])])
dist_str = '{}({})'.format(best_dist[0].name, param_str)
plt.title("Distribution: Cache-miss request duration by Mb\n"+ dist_str)
plt.show()