# Malware Detection in Network Traffic

## Random Forest Classification

Run the `quickstart_ml_eda.ipynb` notebook first to produce the preprocessed data used in this analysis.

This tutorial is based on the following work:

https://github.com/aruberts/tutorials/blob/main/pyspark/spark_feature_engineering.ipynb

https://www.youtube.com/watch?v=TlXqsL4ysB0&t=1322s

In [None]:
# Mount Google Drive at /content/drive; the preprocessed Parquet data is read
# from a path under this mount point later in the notebook.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install a pinned PySpark release (3.5.1) into the notebook runtime.
!pip install pyspark==3.5.1

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Reuse an already-running session if there is one; otherwise start a new
# session registered under the application name "iot".
session_builder = SparkSession.builder.appName("iot")
spark = session_builder.getOrCreate()

## Load preprocessed Parquet data

Load the data preprocessed in the previous tutorial and, while loading, create a new column `is_bad` with the value 1 for malicious traffic (any label other than "Benign") and 0 for benign traffic.

In [None]:
# Location of the Parquet output written by the preprocessing notebook.
processed_path = "/content/drive/MyDrive/NetworkMalwareData/processed.pq"

# Binary target: 1 when the label is anything other than "Benign", else 0.
is_malicious = F.col("label") != "Benign"
is_bad_flag = F.when(is_malicious, 1).otherwise(0)

df_fe = spark.read.parquet(processed_path).withColumn("is_bad", is_bad_flag)
df_fe.show(5)

## Preprocessing

In [None]:
# Categorical columns, which are string-indexed before being fed to the model.
categorical_features = ["proto", "service", "conn_state", "history"]

# Numeric columns, which are passed to the model unchanged.
numerical_features = [
    "duration",
    "orig_bytes",
    "resp_bytes",
    "orig_pkts",
    "orig_ip_bytes",
    "resp_pkts",
    "resp_ip_bytes",
]

# Each categorical column gets an "<name>_index" counterpart.
categorical_features_indexed = [f"{name}_index" for name in categorical_features]

# Final model inputs: numeric columns first, then the indexed categoricals.
input_features = [*numerical_features, *categorical_features_indexed]

### Remove rare categories

In [None]:
# Number of distinct values in each categorical column before rare values are merged.
distinct_counts = [F.count_distinct(name) for name in categorical_features]
df_fe.select(distinct_counts).show()

In [None]:
# A category must appear in MORE than this many rows to be kept as-is;
# everything else is folded into the single bucket "Other".
MIN_CATEGORY_COUNT = 100

# Maps each categorical column to the list of values that were kept.
categorical_valid_values = {}

for c in categorical_features:
    # Find frequent values. Only the (small) set of distinct frequent values
    # is brought to the driver, as plain Python objects.
    frequent_rows = (
        df_fe.groupby(c)
        .count()
        .filter(F.col("count") > MIN_CATEGORY_COUNT)
        .select(c)
        .collect()
    )
    categorical_valid_values[c] = [row[0] for row in frequent_rows]

    # Keep frequent values and replace the rest with "Other". withColumn
    # already names the result `c`, so no alias is needed on the literal.
    df_fe = df_fe.withColumn(
        c,
        F.when(F.col(c).isin(categorical_valid_values[c]), F.col(c)).otherwise(
            F.lit("Other")
        ),
    )

In [None]:
# Same distinct-value count as before, now that rare values are merged into "Other".
df_fe.select(*(F.count_distinct(name) for name in categorical_features)).show()

## Train/Test Split
The train/test split must be done by source IP address rather than by row; otherwise traffic from the same host can end up in both sets and leak information from training into testing. The simplest way to do this is to split the distinct IP addresses at random and then filter the data frame by IP address.

In [None]:
# Show the five source IPs with the most malicious rows (sum of is_bad per IP).
bad_per_ip = df_fe.groupby("source_ip").agg(
    F.sum(F.col("is_bad")).alias("bad_sum")
)
bad_per_ip.orderBy("bad_sum", ascending=False).show(5)

In [None]:
# IPs that are kept out of the random split and assigned to a side by hand
# (presumably the heaviest malicious sources from the cell above - confirm).
TRAIN_MALICIOUS_IP = "192.168.100.103"  # forced into the training set
TEST_MALICIOUS_IP = "192.168.2.5"  # never sampled, so it lands in the test set
manual_ips = [TRAIN_MALICIOUS_IP, TEST_MALICIOUS_IP, "192.168.2.1"]

# Fixed seed so the same sampling seed is used on every run of this cell.
SPLIT_SEED = 42

# Training non-malicious IPs (80%)
train_ips = (
    df_fe.where(~F.col("source_ip").isin(manual_ips))
    .select(F.col("source_ip"), F.lit(1).alias("is_train"))
    .dropDuplicates()
    .sample(fraction=0.8, seed=SPLIT_SEED)
)

# The left join leaves is_train NULL for every IP that was not sampled. A NULL
# never satisfies `is_train != 1`, so the flag is filled with 0 to make the
# test filter below actually select those rows. Dropping any previous
# is_train column first keeps the cell safe to re-run.
df_fe = (
    df_fe.drop("is_train")
    .join(train_ips, "source_ip", "left")
    .fillna(0, subset=["is_train"])
)

# Add 1 malicious IP to the training data; the other hand-picked IPs fall into
# the test data because they were never sampled. The training-only IP is
# excluded from the test set so that no IP appears on both sides.
is_train_malicious = F.col("source_ip") == TRAIN_MALICIOUS_IP
df_train = df_fe.where((F.col("is_train") == 1) | is_train_malicious)
df_test = df_fe.where((F.col("is_train") != 1) & ~is_train_malicious)

## Pipeline

In [None]:
# Stage 1: map each categorical string column to its "<name>_index" column.
# handleInvalid="skip" drops rows the indexer cannot encode instead of failing.
ind = StringIndexer(
    inputCols=categorical_features,
    outputCols=categorical_features_indexed,
    handleInvalid="skip",
)

# Stage 2: pack all model inputs into a single "features" vector column,
# again skipping rows with invalid values.
va = VectorAssembler(
    inputCols=input_features,
    outputCol="features",
    handleInvalid="skip",
)

# Stage 3: random forest with 100 trees predicting the binary is_bad target.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="is_bad",
    numTrees=100,
)

stages = [ind, va, rf]
pipeline = Pipeline(stages=stages)

## Fit and Predict

In [None]:
# Fit every pipeline stage on the training split, then score the test split.
# NOTE: `pipeline` is rebound here from the unfitted Pipeline to the fitted
# model; the feature-importance and export cells below read the fitted stages
# through this name. Re-run the Pipeline definition cell before re-running
# this one, since the fitted model cannot be fitted again.
pipeline = pipeline.fit(df_train)
test_preds = pipeline.transform(df_test)

## Evaluate

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# One evaluator per metric, both scored against the is_bad label.
roc = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="is_bad")
pr = BinaryClassificationEvaluator(metricName="areaUnderPR", labelCol="is_bad")

# Report area under the ROC curve first, then area under the PR curve.
for title, evaluator in (("ROC AUC", roc), ("PR AUC", pr)):
    print(title, evaluator.evaluate(test_preds))

In [None]:
import pandas as pd

# The last stage is the fitted forest; the one before it is the assembler,
# whose input column order matches the order of the importance vector.
fitted_forest = pipeline.stages[-1]
assembler_stage = pipeline.stages[-2]

importance_table = pd.DataFrame(
    {
        "importance": list(fitted_forest.featureImportances),
        "feature": assembler_stage.getInputCols(),
    }
)

# Most important features first.
importance_table.sort_values("importance", ascending=False)

## Export

In [None]:
# Save only the last pipeline stage (the fitted random forest) to "rf_basic",
# overwriting anything already stored at that path.
pipeline.stages[-1].write().overwrite().save("rf_basic")

In [None]:
# Save the whole fitted pipeline (indexer, assembler and forest) to
# "pipeline_basic", overwriting anything already stored at that path.
pipeline.write().overwrite().save("pipeline_basic")