# Classification Prefilter Training & Usage
The prefilter classifier is trained on a subset of the GEE train split, in either a binary or a multiclass manner, using various combinations of attacks.

## Download Extracted Features

In [None]:
!pip install gdown
import gdown

In [None]:
# Google Drive id of the archive with the pre-extracted features
file_id = "1pmOhZ3S2v7Fn-uEgI9BuSk9Zj-uxUaFg"
url = f'https://drive.google.com/uc?id={file_id}'
# local file name the archive is stored under
output = 'feature.tar.gz'
gdown.download(url=url, output=output, quiet=False)

In [None]:
!tar -xf feature.tar.gz

In [None]:
# psutil is imported here because this cell runs before the "Utils" cell
# that imports it; without it a top-to-bottom run fails with NameError.
import psutil

# currently available system memory, in whole GiB
psutil.virtual_memory().available // 1024 // 1024 // 1024

## Utils

In [None]:
import os
import sys
from pathlib import Path

import numpy as np
import psutil
import pyspark.sql.dataframe
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

# Per-feature (min, max) bounds, keyed by feature column name.
# FeatureComposer.feature_compose unpacks each pair into
# normalise(x, min_val, max_val) to scale the scalar features into [0, 1].
# NOTE(review): presumably measured on the training split — confirm before
# reusing these bounds on other data.
feature_min_max = {
    'mean_duration': (0.0, 2042.86),
    'mean_packet': (1.0, 109214.27272727272),
    'mean_num_of_bytes': (28.0, 163795638.0909091),
    'mean_packet_rate': (0.0, 17224.14377310265),
    'mean_byte_rate': (0.0, 13902452.340182647),
    'std_duration': (0.0, 562.7625560888366),
    'std_packet': (0.0, 370614.95468242496),
    'std_num_of_bytes': (0.0, 543247494.7844237),
    'std_packet_rate': (0.0, 15783.66319664221),
    'std_byte_rate': (0.0, 16441139.793386225),
    'entropy_protocol': (0.0, 2.260220915066596),
    'entropy_dst_ip': (0.0, 13.787687869067254),
    'entropy_src_port': (0.0, 14.206227931544092),
    'entropy_dst_port': (0.0, 14.027301292191831),
    'entropy_flags': (0.0, 4.631615665225586)
}


def read_csv(spark: SparkSession, path: str) -> pyspark.sql.DataFrame:
    """
    Read csv files as spark dataframe

    The files are parsed with a fixed 13-column schema (no inference) and the
    ``timestamp`` column is converted from a ``yyyy-MM-dd HH:mm:ss`` string
    to a unix timestamp.

    :param spark: spark session object
    :param path: path of dir containing csv files
    :type spark: SparkSession
    :type path: str
    :return: df
    :rtype: pyspark.sql.DataFrame
    """

    # define csv schema
    schema = StructType([
        StructField('timestamp', StringType(), True),
        StructField('duration', DoubleType(), True),
        StructField('src_ip', StringType(), True),
        StructField('dst_ip', StringType(), True),
        StructField('src_port', LongType(), True),
        StructField('dst_port', LongType(), True),
        StructField('protocol', StringType(), True),
        StructField('flags', StringType(), True),
        StructField('forwarding_status', LongType(), True),
        StructField('type_of_service', LongType(), True),
        StructField('packet', LongType(), True),
        StructField('num_of_bytes', LongType(), True),
        StructField('label', StringType(), True),
    ])

    df = (
        spark
            .read
            .schema(schema)
            .csv(path)
    )

    # convert datetime column from string to unix_timestamp
    df = (
        df
            .withColumn('timestamp', unix_timestamp(col('timestamp'), 'yyyy-MM-dd HH:mm:ss'))
    )

    return df


def patch_time_windows(df: pyspark.sql.DataFrame, window_seconds: int) -> pyspark.sql.DataFrame:
    """
    Generate time window by flooring each row's ``timestamp`` to the nearest
    lower multiple of ``window_seconds``.

    The floored value is passed through ``from_unixtime`` and stored in a new
    ``time_window`` column; all other columns are left untouched.

    :param df: pyspark dataframe with a numeric unix ``timestamp`` column
    :param window_seconds: window size in second
    :type df: pyspark.sql.DataFrame
    :type window_seconds: int
    :return: df with the additional ``time_window`` column
    :rtype: pyspark.sql.DataFrame
    """
    # timestamp - (timestamp mod window) == start of the window the row falls in
    time_window = from_unixtime(col('timestamp') - col('timestamp') % window_seconds)

    df = (
        df
            .withColumn('time_window', time_window)
    )

    return df


def init_local_spark(driver_memory: str = '2g'):
    """
    Create (or fetch the already running) local Spark session.

    The PySpark worker and driver interpreters are pinned to the interpreter
    running this code, so the same environment is used on both sides.

    :param driver_memory: value for ``spark.driver.memory`` (e.g. ``'2g'``).
        To use all currently free memory, pass
        ``f"{psutil.virtual_memory().available // 1024 ** 3}g"``.
    :type driver_memory: str
    :return: spark session running with master ``local[*]``
    :rtype: SparkSession
    """
    # initialise local spark
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    spark = (
        SparkSession
            .builder
            .master('local[*]')
            .config('spark.driver.memory', driver_memory)
            .config('spark.driver.host', '127.0.0.1')
            .getOrCreate()
    )
    return spark


def normalise(x: float, min_val: float, max_val: float) -> float:
    """
    Min-max scale ``x`` into ``[0, 1]`` and clamp values outside the range.

    :param x: raw value
    :param min_val: value mapped to 0.0
    :param max_val: value mapped to 1.0
    :return: ``(x - min_val) / (max_val - min_val)`` clamped to ``[0, 1]``;
        ``0.0`` when ``min_val == max_val``
    :rtype: float
    """
    span = max_val - min_val
    if span == 0:
        # degenerate range: there is nothing to scale by, and dividing would
        # raise ZeroDivisionError for plain Python floats
        return 0.0

    norm_x = (x - min_val) / span
    if norm_x < 0:
        return 0.0
    if norm_x > 1.0:
        return 1.0

    return norm_x


def row_generator(x):
    """
    Convert a ``(time_window, src_ip, feature, label)`` record into a dict.

    ``feature`` is turned into a float32 array with a leading axis of
    length 1; the other three values are passed through unchanged.
    """
    window, source_ip, raw_feature, tag = x
    feature_row = np.array(raw_feature, dtype=np.float32)[np.newaxis, ...]
    return dict(
        time_window=window,
        src_ip=source_ip,
        feature=feature_row,
        label=tag,
    )


def change_df_schema(spark: SparkSession, schema: Unischema, df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    Re-encode a dataframe so that it follows the given petastorm schema.

    Every row is first turned into a dict by ``row_generator`` and then
    encoded with ``dict_to_spark_row``; the resulting RDD is wrapped in a new
    dataframe that uses the spark schema derived from ``schema``.
    """
    def encode(record):
        # dict form first, then the petastorm-encoded spark row
        return dict_to_spark_row(schema, row_generator(record))

    encoded_rows = df.rdd.map(encode)
    spark_schema = schema.as_spark_schema()
    return spark.createDataFrame(encoded_rows, spark_schema)


def save_parquet_for_petastorm_parquet(spark: SparkSession, df: pyspark.sql.DataFrame, output_path: str,
                                       schema: Unischema):
    """
    Write ``df`` as a petastorm dataset (parquet plus petastorm metadata).

    ``output_path`` is made absolute and prefixed with ``file://``; any data
    already at that location is overwritten.
    """
    # NOTE(review): the original left a Path.as_uri() variant disabled in
    # favour of plain string concatenation; the reason is not recorded.
    target_url = f'file://{Path(output_path).absolute()}'
    with materialize_dataset(spark, target_url, schema, row_group_size_mb=256):
        writer = df.write.mode('overwrite')
        writer.parquet(target_url)

## Build Model Input

In [None]:
import logging

import click
import pyspark
import numpy as np
from petastorm.codecs import CompressedNdarrayCodec, ScalarCodec
from petastorm.unischema import Unischema, UnischemaField
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

#from utils import init_local_spark, feature_min_max, normalise, change_df_schema, save_parquet_for_petastorm_parquet


class FeatureComposer:
    """
    Build the model input feature vector from a dataframe of extracted features.

    The scalar statistics are min-max normalised with ``feature_min_max`` and
    concatenated with the two port-proportion arrays into one ``features``
    array column.
    """

    def __init__(self, spark: SparkSession, df: pyspark.sql.DataFrame):
        self.spark = spark
        self.df = df
        # column order must match the positional parameters of feature_compose
        self.feature_column = (
            'mean_duration', 'mean_packet', 'mean_num_of_bytes', 'mean_packet_rate', 'mean_byte_rate', 'std_duration',
            'std_packet', 'std_num_of_bytes', 'std_packet_rate', 'std_byte_rate', 'entropy_protocol', 'entropy_dst_ip',
            'entropy_src_port', 'entropy_dst_port', 'entropy_flags', 'proportion_src_port', 'proportion_dst_port',
        )

        self.feature_compose_udf = udf(self.feature_compose, 'array<double>')

    @staticmethod
    def feature_compose(
            mean_duration: float, mean_packet: float, mean_num_of_bytes: float, mean_packet_rate: float,
            mean_byte_rate: float, std_duration: float, std_packet: float, std_num_of_bytes: float,
            std_packet_rate: float, std_byte_rate: float, entropy_protocol: float, entropy_dst_ip: float,
            entropy_src_port: float, entropy_dst_port: float, entropy_flags: float, proportion_src_port: list,
            proportion_dst_port: list
    ) -> list:
        """
        Compose the feature array

        The 15 scalar features are normalised to ``[0, 1]`` with their bounds
        from ``feature_min_max`` and kept in parameter order; the two
        proportion lists are appended unchanged (source ports first).

        :param mean_duration: mean duration
        :param mean_packet: mean packet
        :param mean_num_of_bytes: mean number of bytes
        :param mean_packet_rate: mean packet rate
        :param mean_byte_rate: mean byte rate
        :param std_duration: std duration
        :param std_packet: std packet
        :param std_num_of_bytes: std number of bytes
        :param std_packet_rate: std packet rate
        :param std_byte_rate: std byte rate
        :param entropy_protocol: entropy of protocol
        :param entropy_dst_ip: entropy of dest ip
        :param entropy_src_port: entropy of src port
        :param entropy_dst_port: entropy of dest port
        :param entropy_flags: entropy of flags
        :param proportion_src_port: proportion of src common ports
        :param proportion_dst_port: proportion of dest common port
        :type mean_duration: float
        :type mean_packet: float
        :type mean_num_of_bytes: float
        :type mean_packet_rate: float
        :type mean_byte_rate: float
        :type std_duration: float
        :type std_packet: float
        :type std_num_of_bytes: float
        :type std_packet_rate: float
        :type std_byte_rate: float
        :type entropy_protocol: float
        :type entropy_dst_ip: float
        :type entropy_src_port: float
        :type entropy_dst_port: float
        :type entropy_flags: float
        :type proportion_src_port: list
        :type proportion_dst_port: list
        :return: feature array
        :rtype: list
        """
        # (name, value) pairs; the name selects the bounds in feature_min_max
        # and the order fixes the layout of the resulting vector
        scalar_features = (
            ('mean_duration', mean_duration),
            ('mean_packet', mean_packet),
            ('mean_num_of_bytes', mean_num_of_bytes),
            ('mean_packet_rate', mean_packet_rate),
            ('mean_byte_rate', mean_byte_rate),
            ('std_duration', std_duration),
            ('std_packet', std_packet),
            ('std_num_of_bytes', std_num_of_bytes),
            ('std_packet_rate', std_packet_rate),
            ('std_byte_rate', std_byte_rate),
            ('entropy_protocol', entropy_protocol),
            ('entropy_dst_ip', entropy_dst_ip),
            ('entropy_src_port', entropy_src_port),
            ('entropy_dst_port', entropy_dst_port),
            ('entropy_flags', entropy_flags),
        )

        # normalise
        feature_arr = [
            normalise(value, *feature_min_max[name])
            for name, value in scalar_features
        ]

        feature_arr.extend(proportion_src_port)
        feature_arr.extend(proportion_dst_port)

        return feature_arr

    def transform(self, remove_malicious=True, remove_null_label=True) -> pyspark.sql.DataFrame:
        """
        Add the composed ``features`` column and reduce the dataframe to the model input columns.

        :param remove_malicious: keep only rows whose label is ``'background'``
        :param remove_null_label: drop rows whose label is null
        :return: dataframe with columns ``time_window``, ``src_ip``, ``features``, ``label``
        :rtype: pyspark.sql.DataFrame
        """
        df = (
            self.df
                # compose feature
                .withColumn('features', self.feature_compose_udf(*self.feature_column))
        )

        if remove_null_label:
            df = df.filter(col('label').isNotNull())

        if remove_malicious:
            df = df.filter(col('label') == 'background')

        # select only time_window, src_ip, features and label columns
        df = df.select(
            'time_window', 'src_ip', 'features', 'label',
        )

        return df

In [None]:
#temporary to generate complete train dataset (including attacks)
# NOTE: despite their names, `test` is the *train* feature parquet that is read
# and `target_test` is where the model input built from it is written.
test = 'feature/train.feature.parquet'
target_test = 'model_input/train.model_input.parquet.complete'

In [None]:
%%time

spark = init_local_spark()

# petastorm schema
# `np.str` was only a deprecated alias of the builtin `str` and has been
# removed from NumPy, so the builtin is used directly (same object on the
# NumPy versions that still had the alias).
schema = Unischema(
    'data_schema', [
        UnischemaField('time_window', str, (), ScalarCodec(StringType()), False),
        UnischemaField('src_ip', str, (), ScalarCodec(StringType()), False),
        UnischemaField('feature', np.float32, (1, 69), CompressedNdarrayCodec(), False),
        UnischemaField('label', str, (), ScalarCodec(StringType()), True),
    ]
)

# processing the dataset configured above; attacks are kept
# (remove_malicious=False), only rows without a label are dropped
test_feature_df = spark.read.parquet(test)
test_input = FeatureComposer(spark, test_feature_df).transform(remove_malicious=False, remove_null_label=True)
test_input = change_df_schema(spark, schema, test_input)
save_parquet_for_petastorm_parquet(spark, test_input, target_test, schema)

## Generate dataset

In [None]:
#temporary change to load local data
# same location the "Build Model Input" section writes to (see target_test)
data_path = 'model_input/train.model_input.parquet.complete'

In [None]:
%%time
from petastorm import make_reader
from petastorm.pytorch import DataLoader

# number of reader worker processes (fixed; psutil.cpu_count(logical=True)
# would use every logical core instead)
num_cores = 2

x_list = []
label_list = []

reader = make_reader(
    'file://' + str(Path(data_path).absolute()), reader_pool_type='process', workers_count=num_cores,
    pyarrow_serialize=True, num_epochs=1
)

# the context manager stops and joins the reader workers once the single
# epoch has been consumed (or if iteration fails)
with DataLoader(reader, batch_size=64, shuffling_queue_capacity=256) as dataloader:
    for data in dataloader:
        # each stored feature has shape (1, 69), so a batch is (batch, 1, 69).
        # Only that middle axis is squeezed: an unqualified squeeze would also
        # drop the batch axis for a batch of one row and flatten that row
        # into x_list as 69 separate floats.
        x = np.squeeze(data['feature'].numpy(), axis=1)
        label = data['label']

        label_list.extend(label)
        x_list.extend(x.tolist())

In [None]:
#dataset creation for training classifier (balancing)
import pandas as pd

# one row per loaded sample: 'x' holds the feature list, 'label' its class
train_df = pd.DataFrame({'x': x_list, 'label': label_list})

In [None]:
# number of loaded rows per label
train_df['label'].value_counts()

In [None]:
%%time
def _sample_label(label, n, replace=False):
    """Draw n rows of train_df carrying the given label (fixed seed 42)."""
    rows = train_df[train_df['label'] == label]
    return rows.sample(n=n, random_state=42, ignore_index=True, replace=replace)


# background pool of 10000 rows, plus sub-samples of 1000..6000 rows drawn
# from that pool with the same seed
train_df_background = _sample_label("background", 10000)
(
    train_df_background1,
    train_df_background2,
    train_df_background3,
    train_df_background4,
    train_df_background5,
    train_df_background6,
) = (
    train_df_background.sample(n=thousands * 1000, random_state=42, ignore_index=True)
    for thousands in range(1, 7)
)

# 1000 rows per attack class; these classes are sampled with replacement
train_df_dos = _sample_label("dos", 1000, replace=True)
train_df_scan11 = _sample_label("scan11", 1000, replace=True)
train_df_scan44 = _sample_label("scan44", 1000, replace=True)
train_df_botnet = _sample_label("nerisbotnet", 1000, replace=True)
train_df_spam = _sample_label("anomaly-spam", 1000, replace=True)
# blacklist is the only attack class sampled without replacement
train_df_blacklist = _sample_label("blacklist", 1000)

In [None]:
from sklearn.utils import shuffle


def _combine_and_shuffle(*parts):
    """Concatenate the given frames and return them row-shuffled with a fresh 0..n-1 index."""
    # seeded like every sampling step, so the saved datasets are reproducible
    return shuffle(pd.concat(parts), random_state=42).reset_index(drop=True)


# multiclass set: 1000 background rows + 1000 rows of every attack class
train_df_all = _combine_and_shuffle(
    train_df_background1, train_df_dos, train_df_scan11, train_df_scan44,
    train_df_botnet, train_df_spam, train_df_blacklist,
)

# binary set: 6000 background rows against the six attack classes
train_df_all_binary_balanced = _combine_and_shuffle(
    train_df_background6, train_df_dos, train_df_scan11, train_df_scan44,
    train_df_botnet, train_df_spam, train_df_blacklist,
)

# leave-one-attack-out variants of the multiclass set
train_df_without_blacklist = _combine_and_shuffle(
    train_df_background1, train_df_dos, train_df_scan11, train_df_scan44,
    train_df_botnet, train_df_spam,
)

train_df_without_spam = _combine_and_shuffle(
    train_df_background1, train_df_dos, train_df_scan11, train_df_scan44,
    train_df_botnet, train_df_blacklist,
)

train_df_without_botnet = _combine_and_shuffle(
    train_df_background1, train_df_dos, train_df_scan11, train_df_scan44,
    train_df_spam, train_df_blacklist,
)

train_df_without_scan44 = _combine_and_shuffle(
    train_df_background1, train_df_dos, train_df_scan11, train_df_botnet,
    train_df_spam, train_df_blacklist,
)

train_df_without_scan11 = _combine_and_shuffle(
    train_df_background1, train_df_dos, train_df_scan44, train_df_botnet,
    train_df_spam, train_df_blacklist,
)

train_df_without_dos = _combine_and_shuffle(
    train_df_background1, train_df_scan11, train_df_scan44, train_df_botnet,
    train_df_spam, train_df_blacklist,
)

In [None]:
#save generated balanced datasets, one csv per dataset under model_input/
_datasets_to_save = {
    'train_df_all': train_df_all,
    'train_df_all_binary_balanced': train_df_all_binary_balanced,
    'train_df_without_blacklist': train_df_without_blacklist,
    'train_df_without_spam': train_df_without_spam,
    'train_df_without_botnet': train_df_without_botnet,
    'train_df_without_scan44': train_df_without_scan44,
    'train_df_without_scan11': train_df_without_scan11,
    'train_df_without_dos': train_df_without_dos,
}
for _dataset_name, _dataset in _datasets_to_save.items():
    _dataset.to_csv(f'model_input/{_dataset_name}.csv')

## Train Classifier

In [None]:
import pandas as pd
from ast import literal_eval
from sklearn.utils import shuffle
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics

In [None]:
binary = True
# the binary classifier is trained on the binary-balanced dataset,
# the multiclass one on the per-class balanced dataset
train_df_all = pd.read_csv(
    'model_input/train_df_all_binary_balanced.csv' if binary
    else 'model_input/train_df_all.csv'
)

In [None]:
#multiclass (each class 1000 samples), we can just not use some class (no need to rebalance)
if not binary:
    # classes left out of training; extend the list to drop further classes,
    # e.g. ['blacklist', 'dos'], ['blacklist', 'anomaly-spam'],
    # ['blacklist', 'nerisbotnet'] or ['blacklist', 'scan11', 'scan44'];
    # use [] to train on everything
    excluded_labels = ['blacklist']

    # .copy() so the column assignment below works on an independent frame
    # instead of a slice of train_df_all
    selected_df = train_df_all[~train_df_all['label'].isin(excluded_labels)].copy()
    # the csv stores each feature vector as the text of a list; parse it back
    selected_df['x'] = selected_df['x'].apply(literal_eval)
    X = pd.DataFrame(selected_df['x'].to_list())
    y = selected_df['label']
    # printed explicitly: a bare expression inside `if` is not displayed
    print(y.value_counts())

In [None]:
#binary, balanced background vs attacks in ratio of 1:1
if binary:
    # attack classes left out of training; extend the list to drop further
    # classes, e.g. ['blacklist', 'dos'] or ['blacklist', 'scan11', 'scan44'];
    # use [] to train on every attack class
    excluded_attacks = ['blacklist']

    is_background = train_df_all['label'] == "background"
    selected_df_anomalies = train_df_all[~is_background & ~train_df_all['label'].isin(excluded_attacks)]
    # as many background rows as attack rows are kept, so the 1:1 ratio
    # follows the exclusion list automatically (5000 for the default list)
    selected_df_background = train_df_all[is_background].sample(
        n=len(selected_df_anomalies), random_state=42, ignore_index=True
    )

    selected_df = pd.concat([selected_df_background, selected_df_anomalies])
    # seeded so the train/hold-out split of the next cell is reproducible
    selected_df = shuffle(selected_df, random_state=42).reset_index(drop=True)
    # the csv stores each feature vector as the text of a list; parse it back
    selected_df['x'] = selected_df['x'].apply(literal_eval)
    X = pd.DataFrame(selected_df['x'].to_list())
    y = selected_df['label']
    print(y.value_counts())

    #change to train binary classifier: '0' = background, '1' = any attack
    # (built as a new Series rather than by masked in-place assignment)
    y = (y != 'background').map({True: '1', False: '0'})
    print(y.value_counts())

In [None]:
%%time
#train on first 5000 samples and test on the rest, then retrain on the whole train dataset
n_fit = 5000
X_fit, y_fit = X[:n_fit], y[:n_fit]
X_holdout, y_holdout = X[n_fit:], y[n_fit:]

RFC = RandomForestClassifier(random_state=42, n_jobs=-1)

# hold-out evaluation
RFC.fit(X_fit, y_fit)
preds = RFC.predict(X_holdout)
print(metrics.classification_report(y_holdout, preds, digits=4))

# final model: refit on everything; this second report is computed on the
# data the model was just fitted on
RFC.fit(X, y)
preds = RFC.predict(X)
print(metrics.classification_report(y, preds, digits=4))

classifier_model = RFC

In [None]:
%%time
#temporarily try another classifier
from sklearn.tree import DecisionTreeClassifier

# same split as the random forest: fit on the first 5000 rows, report on the rest
DTC = DecisionTreeClassifier(random_state=42).fit(X[:5000], y[:5000])
preds = DTC.predict(X[5000:])
dtc_report = metrics.classification_report(y[5000:], preds, digits=4)
print(dtc_report)
#classifier_model = DTC

## Run Experiment
Generate prediction results for the classifier trained on the selected data

In [None]:
# test rows; the 'labels', 'mse' and 'features' columns are used below
test_data = pd.read_feather('results_ad_test.feather.with_mse')
labels, mse = test_data['labels'], test_data['mse']

In [None]:
%%time
test_features = test_data['features'].to_list()
predictions = classifier_model.predict(test_features)

# labels and mse are carried over next to the classifier output
results = pd.DataFrame({'labels': labels, 'mse': mse, 'predictions': predictions})

#rename the resulting file according to experiment (which classes were used / not used for training, which data were used for testing)
results.to_feather('results_bb_without_blacklist_test.feather')

In [None]:
#binarize labels: '0' = background, '1' = anything else
# The column is replaced in one assignment. The previous chained form
# (results.labels[mask] = ...) triggers SettingWithCopyWarning and does not
# modify `results` at all when pandas Copy-on-Write is active.
results['labels'] = (results['labels'] != 'background').map({True: '1', False: '0'})

In [None]:
#rough evaluation: classification report of the predictions against the labels
# (the labels must already be binarized by the previous cell to be comparable
# with the '0'/'1' output of a binary classifier)
print(metrics.classification_report(results.labels, results.predictions, digits=4))

In [None]:
# NOTE(review): 16.8 is an undocumented magic number — presumably a scale
# factor relating the number of test rows to some reference quantity;
# TODO confirm its meaning and give it a name.
len(test_data)/16.8