# Preparing the environment


## Libraries

In [1]:
#%pip install --upgrade pip
#%pip install pandas
#%pip install scipy 
#%pip install scikit-learn 
#%pip install tqdm 
#%pip install nbformat
#%pip install pyarrow

In [2]:
# requirements
import pandas as pd
import numpy as np
from scipy.stats import poisson
from scipy.stats import entropy
from tqdm import tqdm
import math

## Training phase: identifying typical traffic behavior

In [3]:
class TrafficLearner:
    """
    Learns traffic normal behavior
    """
    def __init__(self, window_sizes, path_normal_traffic_df):
        self.window_sizes = window_sizes
        self.path_normal_traffic_df = path_normal_traffic_df
        self.raw_normal_traffic_df = None

    def get_normal_traffic_df(self):
        """
        Loads, format timestamp and creates a cache for
        normal traffic dataset
        """
        if self.raw_normal_traffic_df is None:
            self.raw_normal_traffic_df = pd.read_csv(self.path_normal_traffic_df, low_memory=False)
            self.raw_normal_traffic_df['time_local'] = pd.to_datetime(self.raw_normal_traffic_df['time_local'])
        return self.raw_normal_traffic_df

    def index_windows(self, df):
        """
        Computes window size to index request to each
        corresponding window

        Args:
            df (DataFrame): traffic request frequency by second

        Returns:
            dataframe with window indexed dataset
        """
        windows_df = []

        for window_size in self.window_sizes:
            df['window_start'] = df['time_local'].dt.floor(window_size).copy()
            df['window_id'] = (
                    df['window_start']
                    .astype(int)
                    .rank(method='dense')
                    .astype(int) - 1
            )

            df['window_size'] = int(window_size.replace('s', ''))

            windows_df.append(
                df[[
                    'endpoint',
                    'window_size',
                    'window_id',
                    'window_start',
                    'time_local',
                    'total_requests',
                    'is_anomaly'
                ]]
            )

        return pd.concat(windows_df, ignore_index=True)

    def learn_traffic_information(self):
        """
        Estimates lambda using each endpoint
        window mean request frequency

        Returns:
            DataFrame: with computed lambda per window
        """
        df = self.get_normal_traffic_df()
        df = self.index_windows(df)

        window_stats = (
            df.groupby(["window_size", "endpoint", "window_id"])
            .agg(lam=("total_requests", "mean"))
            .reset_index()
        )

        return window_stats

    def label_test_windows(self, df):
        df = self.index_windows(df)

        window_stats = (
            df.groupby(["window_size","endpoint", "window_id"])
            .agg(has_anomaly=("is_anomaly", "max"))
            .reset_index()
        )

        return window_stats


## Monitoring phase: observes traffic for atypical behavior

In [4]:
class TrafficFeatureExtractor:
    def kl_divergence(self, p, q):
        ''' Calculates KL divergence between p and q'''
        SIG_EPS = 1e-10 # avoids division by zero
        p = np.asarray(p, dtype=float) + SIG_EPS
        q = np.asarray(q, dtype=float) + SIG_EPS
        return entropy(p, q)

    def js_divergence(self, p, q, eps=1e-12):
        p = np.asarray(p) + eps
        q = np.asarray(q) + eps
        p /= p.sum()
        q /= q.sum()
        m = 0.5 * (p + q)
        return 0.5 * entropy(p, m) + 0.5 * entropy(q, m)

    def calculate_D(self, pmf_y, dX):
        return self.js_divergence(pmf_y, dX)

    def calculate_Delta(self, Xbar, lambda_ep):
        return (Xbar - lambda_ep) / max(lambda_ep, 1)

    def calculate_ZScore(self, Xbar, lambda_ep, seconds_in_window):
        return (Xbar - lambda_ep) / math.sqrt(max(lambda_ep, 1) / seconds_in_window)

    def sample_expected_distribution(self, max_count, lambda_ep):
        bins = np.arange(0, max_count+1)
        dY = poisson.pmf(bins, mu=lambda_ep)
        dY = dY / dY.sum()
        return dY

    def get_observed_distribution(self, max_count, current_window):
        obs_counts, _ = np.histogram(
            current_window,
            bins=np.arange(0, max_count+2)
        )
        dX = obs_counts / obs_counts.sum()
        return dX

    def extract_traffic_changes(self, current_window, lambda_ep, seconds_in_window):
        max_count = max(
            int(current_window.max()),
            int(lambda_ep*3)+5
        )

        Xbar = current_window.mean()
        dX = self.get_observed_distribution(max_count, current_window)
        dY = self.sample_expected_distribution(max_count, lambda_ep)

        D = self.calculate_D(dX, dY)
        Delta = self.calculate_Delta(Xbar, lambda_ep)
        Z = self.calculate_ZScore(Xbar, lambda_ep, seconds_in_window)
        return D, Delta, Z


In [5]:
class EndpointAnomalySensor:
    def __init__(self, endpoint, lam):
        self.endpoint = endpoint
        self.lam = lam
        self.tfe = TrafficFeatureExtractor()

    def gaussian_membership(self, u, mu=0.0, sigma=1.0):
        """
        Calculates the feature gaussian membership for
        traffic normality

        Args:
            u: traffic feature
            mu: the normality reference for the feature
            sigma: deviation from normality

        Returns:
            float: value in [0.,1.] to assign no
                    anomaly (0.) or huge anomaly (1.)
        """
        return math.exp(-((u-mu)**2) / (2*(sigma**2)))

    def fuzzification(self, current_window, D, Delta, Z):
        """
        Traffic features are fuzzed using gaussian membership

        Args:
            current_window (np.array): request frequency per second array
            D (float): divergence between observed and expected traffic
            Delta (float): area difference between observed and expected traffic
            Z: z-score of mean deviation between observed and expected traffic
        Returns:
            (float, float, float): membership of each feature
        """
        sigma_u = max(1.0, np.std(current_window))  # adaptive width
        fD = self.gaussian_membership(D, mu=0.0, sigma=sigma_u)
        fDelta = self.gaussian_membership(Delta, mu=0.0, sigma=sigma_u)
        fZ = self.gaussian_membership(Z, mu=0.0, sigma=sigma_u)
        return fD, fDelta, fZ

    def anomaly_score(self,fD, fDelta, fZ):
        """
        Calculates anomaly score for traffic observations

        Args:
            fD (float): traffic divergence normality membership
            fDelta (float): traffic area difference normality membership
            fZ (float): z-score of mean deviation normality membership
        """
        fDprime = 1 - fD
        fDelprime = 1 - fDelta
        fZprime = 1 - fZ
        eta = fDelprime + fDprime + fZprime
        return eta/3

    def calculate_eta(self, current_window, seconds_in_window):
        """
        Calculates traffic anomaly score

        Args:
            current_window (np.array): request frequency per second array
            seconds_in_window (int): window size
        Returns:
            dict: traffic feature and anomaly measurements
        """
        D, Delta, Z = self.tfe.extract_traffic_changes(current_window, self.lam, seconds_in_window)
        fD, fDelta, fZ = self.fuzzification(current_window, D, Delta, Z)
        eta = self.anomaly_score(fD, fDelta, fZ)

        return {
            'eta': eta,
            'D': D,
            'Delta': Delta,
            'Z': Z,
            'fDp': 1-fD,
            'fDeltap': 1-fDelta,
            'fZp': 1-fZ
        }

    def calculate_C2(self, eta, beta=0.5):
        """
        Calculates frequency anomaly component

        Args:
            eta (float): anomaly score captured by sensor
            beta (float): anomaly thresholds to accept or reject penalization
        Returns:
            float: computed component
        """
        C2= -math.tanh((eta - beta)*2)

        return C2


In [6]:
class RaCalculator:
    def __init__(self, endpoint, initial_Ra=1.0, model="logistic", params=None):
        self.endpoint = endpoint
        self.Ra = float(initial_Ra)
        self.model = model
        self.params = params or {}
        self.P = self.params.get("P0", 0.1)
        self.history = []

    def update_ra(self, eta=None, C2=None):
        if self.model == "sigmoid":
            alpha = self.params.get("alpha", 3.063)
            beta = self.params.get("beta", 0.5)
            self.Ra = 1 / (1 + np.exp(-alpha * ((self.Ra + C2) - beta)))

        elif self.model == "logistic":
            gamma = self.params.get("gamma", 0.2)
            self.Ra += gamma * C2 * self.Ra * (1 - self.Ra)

        elif self.model == "exponential":
            k = self.params.get("k", 0.5)
            anomaly = max(0, eta - 0.5)
            self.Ra *= np.exp(-k * anomaly)

        elif self.model == "recovery":
            gamma = self.params.get("gamma", 0.02)
            delta = self.params.get("delta", 0.2)
            beta = self.params.get("beta", 0.2)

            anomaly = max(0, eta - beta)

            recovery = gamma * (1 - self.Ra)
            damage = delta * anomaly * self.Ra
            self.Ra += recovery - damage

        elif self.model == "kalman":
            self._update_kalman(eta)

        self.Ra = np.clip(self.Ra, 0, 1)

    def _update_kalman(self, eta):
        # parameters
        Q = self.params.get("Q", 0.005)   # process noise
        R = self.params.get("R", 0.05)    # observation noise
        k = self.params.get("k", 3.0)     # eta sensitivity

        # convert eta → health observation
        z = np.exp(-k * eta)

        # prediction step
        Ra_pred = self.Ra
        P_pred = self.P + Q

        # Kalman gain
        K = P_pred / (P_pred + R)

        # correction step
        self.Ra = Ra_pred + K * (z - Ra_pred)

        # update uncertainty
        self.P = (1 - K) * P_pred

    def record(self, window_id, info):
        row = {
            "endpoint": self.endpoint,
            "window_id": window_id,
            "Ra": self.Ra,
            "model": self.model
        }

        row.update(info)
        self.history.append(row)


## Experiment

In [7]:
def compute_lambda_series(train_df):
    return (
        train_df
        .sort_values("window_id")
        .groupby("endpoint")["lam"]
        .expanding()
        .mean()
        .reset_index(level=0, drop=True)
    )

def compute_anomaly_score_series(lambda_series, obs_by_window, train_obs_by_window, window_size, eas):
    window_ids = sorted(obs_by_window.keys())
    eta_series = []

    for window_id in window_ids:
        observed = obs_by_window.get(window_id)
        if observed is None:
            continue

        if window_size > 0:
            eas.lam = lambda_series.iloc[window_id-1]

        out = eas.calculate_eta(
            current_window=observed["total_requests"],
            seconds_in_window=window_size
        )

        out["lambda"] = eas.lam
        out["C2"] = eas.calculate_C2(out["eta"])
        out["expected"] = train_obs_by_window.get(window_id - 1, [])
        out["window_start"] = observed["window_start"]
        out["window_id"] = window_id
        eta_series.append(out)

    return pd.DataFrame(eta_series)

def compute_ra_series(eta_series, indicators_calculators):
    window_ids = sorted(eta_series.keys())

    for window_id in window_ids:
        for ind in indicators_calculators:
            ind.update_ra(eta=eta_series[window_id]['eta'])
            ind.record(window_id, eta_series[window_id])
    return indicators_calculators

def format_window_info(obs_df, train_obs_df):
    obs_by_window = {
        wid: {
            "total_requests":g["total_requests"].values,
            "window_start": g['window_start'].values[0],
        }
        for wid, g in obs_df.groupby("window_id")
    }

    train_obs_by_window = {
        wid: g["total_requests"].values
        for wid, g in train_obs_df.groupby("window_id")
    }

    return obs_by_window, train_obs_by_window

def format_eta_info(df_eta):
    eta_by_window = {
        wid: {
            "eta": g["eta"].values[0] if len(g["eta"].values) == 1 else np.nan,
            "C2":g["C2"].values[0] if len(g["C2"].values) == 1 else np.nan,
            "window_start": g["window_start"].values[0] if len(g["window_start"].values) == 1 else np.nan,
            "fDeltap":g["fDeltap"].values[0] if len(g["fDeltap"].values) == 1 else np.nan,
            "fDp":g["fDp"].values[0] if len(g["fDp"].values) == 1 else np.nan,
            "fZp":g["fZp"].values[0] if len(g["fZp"].values) == 1 else np.nan
        }
        for wid, g in df_eta.groupby("window_id")
    }

    return eta_by_window

def experiment_eta(train_df, obs_df, train_obs_df, window_size):
    results_anomaly_detection = []

    for endpoint in tqdm(train_df.endpoint.unique()):
        lambda_series = compute_lambda_series(train_df[train_df.endpoint == endpoint])

        obs_by_window, train_obs_by_window = format_window_info(
            obs_df[obs_df.endpoint == endpoint],
            train_obs_df[train_obs_df.endpoint == endpoint]
        )

        eas = EndpointAnomalySensor(
            endpoint=endpoint,
            lam=lambda_series.iloc[0]
        )

        df_anomaly_score_series = compute_anomaly_score_series(
            lambda_series,
            obs_by_window,
            train_obs_by_window,
            window_size,
            eas
        )

        df_anomaly_score_series["endpoint"] = endpoint

        results_anomaly_detection.append(df_anomaly_score_series)
    return pd.concat(results_anomaly_detection, ignore_index=True)

def experiment_ra(train_df, obs_df, train_obs_df, window_size, params):
    results = []
    for endpoint in tqdm(train_df.endpoint.unique()):
        lambda_series = compute_lambda_series(train_df[train_df.endpoint == endpoint])

        obs_by_window, train_obs_by_window = format_window_info(
            obs_df[obs_df.endpoint == endpoint],
            train_obs_df[train_obs_df.endpoint == endpoint]
        )

        ra_calculators = [
            #RaCalculator(endpoint, model="sigmoid", params=params),
            #RaCalculator(endpoint, model="exponential", params=params),
            #RaCalculator(endpoint, model="recovery", params=params),
            RaCalculator(endpoint, model="kalman", params=params),
        ]

        eas = EndpointAnomalySensor(
            endpoint=endpoint,
            lam=lambda_series.iloc[0]
        )

        df_anomaly_score_series = compute_anomaly_score_series(
            lambda_series,
            obs_by_window,
            train_obs_by_window,
            window_size,
            eas
        )

        eta_series = format_eta_info(df_anomaly_score_series)
        indicators_report = compute_ra_series(eta_series, ra_calculators)

        for ir in indicators_report:
            results.extend(ir.history)

    return pd.DataFrame(results)

# Evaluation

In [8]:
def add_gt(results_history, window_gt, window_sizes):
    """
        @description Add the corresponding ground truth for each endpoint-window,
                     so it can be compared later to generate perfomance metrics
    """

    labeled_data= []
    for window_size in window_sizes:
        size = int(window_size.replace('s', ''))
        labels = window_gt[window_gt['window_size']==size][['endpoint', 'window_id', 'has_anomaly']]
        result_df = results_history[results_history['window_size'] == size]
        df = pd.merge(result_df, labels, on=['endpoint','window_id'], how='left')
        labeled_data.append(df)
    return pd.concat(labeled_data, ignore_index=True)

## Full Pipeline

Dataset path

In [9]:
ton_iot_train_path = "../0-datasets/treated_dataset/ton_treated_train.csv"
ton_iot_test_path = "../0-datasets/treated_dataset/ton_treated_test.csv"

Experiment params

In [10]:
WINDOW_SIZES = ['30s', '40s', '50s']
BETA_VAR=[0.0, 0.3, 0.5, 0.7, 1.0]
k_VAR=[1., 2., 3., 4., 5.]
R_VAR = [0.01,0.05,0.1,0.2]
Q_VAR = [0.001,0.005,0.01,0.02]

Training

In [11]:
tl = TrafficLearner(
    window_sizes=WINDOW_SIZES,
    path_normal_traffic_df=ton_iot_train_path,
)

normal_traffic_lambda_df = tl.learn_traffic_information()
normal_traffic_wind_observations_df = tl.index_windows(tl.get_normal_traffic_df())

In [12]:
normal_traffic_wind_observations_df.groupby(['window_id', 'window_size']).count()

Unnamed: 0_level_0,Unnamed: 1_level_0,endpoint,window_start,time_local,total_requests,is_anomaly
window_id,window_size,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
0,30,160,160,160,160,160
0,40,160,160,160,160,160
0,50,260,260,260,260,260
1,30,300,300,300,300,300
1,40,400,400,400,400,400
...,...,...,...,...,...,...
3087,30,300,300,300,300,300
3088,30,300,300,300,300,300
3089,30,300,300,300,300,300
3090,30,300,300,300,300,300


Collecting observations

In [13]:
df_ton_test = pd.read_csv(ton_iot_test_path, low_memory=False)
df_ton_test['time_local'] = pd.to_datetime(df_ton_test['time_local'])
anomalous_traffic_label_df = tl.label_test_windows(df_ton_test.copy())
anomalous_traffic_win_observations_df = tl.index_windows(df_ton_test)

In [14]:
anomalous_traffic_win_observations_df.groupby('window_size')['window_id'].count()

window_size
30    927250
40    927250
50    927250
Name: window_id, dtype: int64

### Eta experiments

In [15]:
def window_size_experiment_eta(normal_windows_lambda_df, window_obs, window_normal, window_sizes):
    results_eta = []
    for window_size in window_sizes:
        print(f"Running experiment: window_size={window_size}")
        size = int(window_size.replace('s', ''))

        df_eta = experiment_eta(
            normal_windows_lambda_df[normal_windows_lambda_df['window_size'] == size],
            window_obs[window_obs['window_size'] == size],
            window_normal[window_normal['window_size'] == size],
            size,
        )

        df_eta["window_size"] = size
        results_eta.append(df_eta)

    return pd.concat(results_eta, ignore_index=True)

In [16]:
df_eta_results = window_size_experiment_eta(normal_traffic_lambda_df, anomalous_traffic_win_observations_df, normal_traffic_wind_observations_df, window_sizes=WINDOW_SIZES)
df_eta_results = add_gt(df_eta_results, anomalous_traffic_label_df, WINDOW_SIZES)

Running experiment: window_size=30s


100%|██████████| 10/10 [00:24<00:00,  2.49s/it]


Running experiment: window_size=40s


100%|██████████| 10/10 [00:20<00:00,  2.03s/it]


Running experiment: window_size=50s


100%|██████████| 10/10 [00:15<00:00,  1.56s/it]


### Ra experiments

k variation

In [17]:
k_results=[]
for k in k_VAR:
    params={
        "beta": 0.5,
        "k": k,
    }
    print(f"Running experiment: k={k}")
    results = experiment_ra(normal_traffic_lambda_df, anomalous_traffic_win_observations_df, normal_traffic_wind_observations_df, window_size=30, params=params)
    results["k"] = k
    results["window_size"] = 30
    k_results.append(results)

df_k_results = pd.concat(k_results, ignore_index=True)
df_k_l_results = add_gt(df_k_results, anomalous_traffic_label_df, WINDOW_SIZES)

Running experiment: k=1.0


100%|██████████| 10/10 [00:37<00:00,  3.75s/it]


Running experiment: k=2.0


100%|██████████| 10/10 [00:37<00:00,  3.75s/it]


Running experiment: k=3.0


100%|██████████| 10/10 [00:31<00:00,  3.16s/it]


Running experiment: k=4.0


100%|██████████| 10/10 [00:34<00:00,  3.44s/it]


Running experiment: k=5.0


100%|██████████| 10/10 [00:36<00:00,  3.63s/it]


q variation

In [18]:
q_results = []
for q in Q_VAR:
    params={
        "beta": 0.5,
        "Q": q,
    }
    print(f"Running experiment: Q={q}")
    results = experiment_ra(normal_traffic_lambda_df, anomalous_traffic_win_observations_df, normal_traffic_wind_observations_df, window_size=30, params=params)
    results["Q"] = q
    results["window_size"] = 30
    q_results.append(results)

df_q_results = pd.concat(q_results, ignore_index=True)
df_q_l_results = add_gt(df_q_results, anomalous_traffic_label_df, WINDOW_SIZES)

Running experiment: Q=0.001


100%|██████████| 10/10 [00:30<00:00,  3.09s/it]


Running experiment: Q=0.005


100%|██████████| 10/10 [00:30<00:00,  3.07s/it]


Running experiment: Q=0.01


100%|██████████| 10/10 [00:30<00:00,  3.02s/it]


Running experiment: Q=0.02


100%|██████████| 10/10 [00:30<00:00,  3.00s/it]


R variation

In [19]:
r_results = []
for r in R_VAR:
    params={
        "beta": 0.5,
        "R": r,
    }
    print(f"Running experiment: R={r}")
    results = experiment_ra(normal_traffic_lambda_df, anomalous_traffic_win_observations_df, normal_traffic_wind_observations_df, window_size=30, params=params)
    results["R"] = r
    results["window_size"] = 30
    r_results.append(results)

df_r_results = pd.concat(r_results, ignore_index=True)
df_r_l_results = add_gt(df_r_results, anomalous_traffic_label_df, WINDOW_SIZES)


Running experiment: R=0.01


100%|██████████| 10/10 [00:30<00:00,  3.04s/it]


Running experiment: R=0.05


100%|██████████| 10/10 [00:30<00:00,  3.08s/it]


Running experiment: R=0.1


100%|██████████| 10/10 [00:30<00:00,  3.03s/it]


Running experiment: R=0.2


100%|██████████| 10/10 [00:29<00:00,  3.00s/it]


Window variation

In [20]:
def window_size_experiment_ra(normal_windows_lambda_df, window_obs, window_normal, window_sizes, params):
    results_ra = []
    for window_size in window_sizes:
        print(f"Running experiment: window_size={window_size}")
        size = int(window_size.replace('s', ''))

        df_ra = experiment_ra(
            normal_windows_lambda_df[normal_windows_lambda_df['window_size'] == size],
            window_obs[window_obs['window_size'] == size],
            window_normal[window_normal['window_size'] == size],
            window_size=size,
            params=params
        )

        df_ra["window_size"] = size
        results_ra.append(df_ra)

    return pd.concat(results_ra, ignore_index=True)

In [21]:
df_ra_win_results = window_size_experiment_ra(normal_traffic_lambda_df, anomalous_traffic_win_observations_df, normal_traffic_wind_observations_df, window_sizes=WINDOW_SIZES, params={})
df_ra_win_results = add_gt(df_ra_win_results, anomalous_traffic_label_df, WINDOW_SIZES)

Running experiment: window_size=30s


100%|██████████| 10/10 [00:28<00:00,  2.80s/it]


Running experiment: window_size=40s


100%|██████████| 10/10 [00:21<00:00,  2.11s/it]


Running experiment: window_size=50s


100%|██████████| 10/10 [00:16<00:00,  1.69s/it]


In [22]:
df_eta_results.to_csv("outputs/kalman_eta_results.csv", index=False)
df_k_l_results.to_csv("outputs/kalman_k_results.csv", index=False)
df_q_l_results.to_csv("outputs/kalman_q_results.csv", index=False)
df_r_l_results.to_csv("outputs/kalman_r_results.csv", index=False)
df_ra_win_results.to_csv("outputs/kalman_ra_win_results.csv", index=False)
anomalous_traffic_win_observations_df.to_csv("../0-datasets/treated_dataset/anomalous_traffic_win_observations.csv")