In [1]:
import pyspark
import numpy as np
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, rand, monotonically_increasing_id, row_number
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.window import Window

In [2]:
from pyspark.sql.types import DoubleType
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sc = spark.sparkContext

In [3]:
mirai_devices = ['danmini_doorbell', 'ecobee_thermostat', 'philips_B120N10_baby_monitor', 
            'provision_PT_737E_security_camera', 'provision_PT_838_security_camera',
            'simplehome_XCS_1002_WHT_security_camera', 'simplehome_XCS_1003_WHT_security_camera']

benign = ['danmini_doorbell', 'ecobee_thermostat', 'ennio_doorbell', 'philips_B120N10_baby_monitor', 
            'provision_PT_737E_security_camera', 'provision_PT_838_security_camera', 'samsung_SNH_1011_N_webcam',
            'simplehome_XCS_1002_WHT_security_camera', 'simplehome_XCS_1003_WHT_security_camera']

mirai_attacks = ['ack', # automatic scan for vulnerable devices
                 'scan', # ack flood
                 'syn', # syn flood
                 'udp', # udp flood
                 'udpplain'] # optimized udp flood

In [4]:
first = True
mirai_data = None
for device in mirai_devices:
    for attack in mirai_attacks:
        if first:
            mirai_data = spark.read.option("inferSchema",True)\
                .option("header", True).csv(f'../data/n_balo_t/{device}/mirai_attacks/{attack}.csv')
            first = False
        else:
            to_add = spark.read.option("inferSchema",True)\
                .option("header", True).csv(f'../data/n_balo_t/{device}/mirai_attacks/{attack}.csv')
            mirai_data = mirai_data.union(to_add)
mirai_data = mirai_data.withColumn('label', lit(0))
malign_total = mirai_data.count()

first = True
benign_data = None
for device in benign:
    if first:
        benign_data = spark.read.option("inferSchema",True)\
            .option("header", True).csv(f'../data/n_balo_t/{device}/benign_traffic.csv')
        first = False
    else:
        to_add = spark.read.option("inferSchema",True)\
            .option("header", True).csv(f'../data/n_balo_t/{device}/benign_traffic.csv')
        benign_data = benign_data.union(to_add)
benign_data = benign_data.withColumn('label', lit(1))
benign_total = benign_data.count()
ratio = benign_total/malign_total
print(f'total malign data = {malign_total}')
print(f'total benign data = {benign_total}')
mirai_data = mirai_data.sample(False, fraction=ratio)
mirai_data = mirai_data.union(benign_data)
mirai_data = mirai_data.withColumn('label', mirai_data['label'].cast(DoubleType()))
print(f'total data for prediciton modeling = {mirai_data.count()}')

total malign data = 3668402
total benign data = 555932
total data for prediciton modeling = 1111565


In [5]:
cols = mirai_data.columns
new_cols = [str(i) for i in range(len(cols))]
mirai_data = mirai_data.toDF(*new_cols)
mirai_data = mirai_data.withColumnRenamed('115', 'label')

In [6]:
benign = mirai_data.filter(mirai_data['label']==1.0).count()
malicious = mirai_data.filter(mirai_data['label']==0.0).count()
print(benign, malicious)

555932 555633


In [7]:
def grid_rf(data, kfolds, threads, numTreeParams=None, maxDepthParams=None):
    if numTreeParams is None:
        numTreeParams = [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]
    if maxDepthParams is None:
        maxDepthParams = [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]
        
    feature_list = []
    for col in data.columns:
        if col == 'label':
            continue
        else:
            feature_list.append(col)
            
    # set up feature and labels as input and output
    asmblr = VectorAssembler(inputCols=feature_list, outputCol="features")
    
    # initialize classifier
    rf = RandomForestClassifier(labelCol="label", featuresCol="features")
    
    # pipeline the assembler and the random forest for cross validation
    pipe = Pipeline(stages=[asmblr, rf])
    
    # grid search to optimize tree depth as well a number of trees
    grid = ParamGridBuilder().addGrid(rf.numTrees, numTreeParams).addGrid(rf.maxDepth, maxDepthParams).build()
    
    evaluator = MulticlassClassificationEvaluator()
    
    cv = CrossValidator(estimator=pipe, 
                        estimatorParamMaps=grid, 
                        evaluator=evaluator, 
                        numFolds=kfolds, 
                        parallelism=threads)
        
    cvModel = cv.fit(data)
    print(cvModel)
    return cvModel
    

In [8]:
def reg_rf(data, trees, maxD):
    feature_list = []
    for col in data.columns:
        if col == 'label':
            continue
        else:
            feature_list.append(col)
            
    # set up feature and labels as input and output
    asmblr = VectorAssembler(inputCols=feature_list, outputCol="features")
    rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=trees, maxDepth=maxD)
    
    # pipeline the assembler and the random forest for cross validation
    pipe = Pipeline(stages=[asmblr, rf])
    (trainingData, testData) = data.randomSplit([0.7, 0.3])
    model = pipe.fit(trainingData)
    preds = model.transform(testData)
    return preds

In [9]:
def printout(m):
    print(f'Accuracy: {m.accuracy}')
    print(f'F1: {m.fMeasure()}')
    print(f'False Positive Rate for Benign: {m.falsePositiveRate(1.0)}')
    print(f'True Positive Rate for Benign: {m.truePositiveRate(1.0)}')
    print(f'False Positive Rate for Malicious: {m.falsePositiveRate(0.0)}')
    print(f'True Positive Rate for Malicious: {m.truePositiveRate(0.0)}')
    print(f'Precision: {m.precision()}')
    print(f'Recall: {m.recall()}')

In [12]:
big_sample, small_sample = mirai_data.randomSplit([0.85, 0.15])
small_sample = small_sample.withColumn('order', rand(seed=123)).orderBy('order').drop('order')
small_sample.select(['label']).show()

+-----+
|label|
+-----+
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  0.0|
|  1.0|
|  1.0|
|  0.0|
|  0.0|
|  0.0|
|  1.0|
|  1.0|
|  1.0|
|  0.0|
|  0.0|
|  0.0|
|  1.0|
|  0.0|
+-----+
only showing top 20 rows



In [13]:
predictions = reg_rf(small_sample, 5, 10)
predictions_rdd = predictions.select(['label', 'prediction']).rdd.map(tuple)
metrics = MulticlassMetrics(predictions_rdd)
printout(metrics)


Accuracy: 0.9999596513879923
F1: 0.9999596513879923
False Positive Rate for Benign: 0.0
True Positive Rate for Benign: 0.9999192669438501
False Positive Rate for Malicious: 8.073305614984056e-05
True Positive Rate for Malicious: 1.0
Precision: 0.9999596513879923
Recall: 0.9999596513879923
