In [1]:
pip install -q findspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
findspark.init('/home/bigdata/Documents/spark-3.0.0')

In [3]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ByteType
from pyspark.sql.types import FloatType
from pyspark.sql.types import BinaryType
from pyspark.sql.types import DataType
from pyspark.sql.types import StructField
import pandas as pd
import numpy as np
from pyspark.sql.session import SparkSession

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
schema = StructType([ StructField("srcip", StringType(), True),   
    StructField("sport", IntegerType(), True),  
    StructField("dstip", StringType(), True),   
    StructField("dsport", IntegerType(), True), 
    StructField("proto", StringType(), True),  
    StructField("state", StringType(), True),
    StructField("dur", FloatType(), True),       
    StructField("sbytes", ByteType(), True),    
    StructField("dbytes", ByteType(), True),    
    StructField("sttl", IntegerType(), True), 
    StructField("dttl", IntegerType(), True),   
    StructField("sloss", IntegerType(), True),
    StructField("dloss", IntegerType(), True),
    StructField("service", StringType(), True),    
    StructField("Sload", FloatType(), True),  
    StructField("Dload", FloatType(), True), 
    StructField("Spkts", IntegerType(), True),    
    StructField("Dpkts", IntegerType(), True),    
    StructField("swin", IntegerType(), True),
    StructField("dwin", IntegerType(), True), 
    StructField("stcpb", IntegerType(), True),  
    StructField("dtcpb", IntegerType(), True),    
    StructField("smeansz", IntegerType(), True),   
    StructField("dmeansz", IntegerType(), True),  
    StructField("trans_depth", IntegerType(), True),  
    StructField("res_bdy_len", IntegerType(), True),    
    StructField("Sjit", FloatType(), True),    
    StructField("Djit", FloatType(), True),     
    StructField("Stime", IntegerType(), True), 
    StructField("Ltime", IntegerType(), True),   
    StructField("Sintpkt", FloatType(), True),
    StructField("Dintpkt", FloatType(), True),    
    StructField("tcprtt", FloatType(), True),  
    StructField("synack", FloatType(), True),    
    StructField("ackdat", FloatType(), True),  
    StructField("is_sm_ips_ports", IntegerType(), True),    
    StructField("ct_state_ttl", IntegerType(), True), 
    StructField("ct_flw_http_mthd", IntegerType(), True), 
    StructField("is_ftp_login", IntegerType(), True),  
    StructField("ct_ftp_cmd", IntegerType(), True),
    StructField("ct_srv_src", IntegerType(), True),    
    StructField ("ct_srv_dst", IntegerType(), True),  
    StructField("ct_dst_ltm", IntegerType(), True),   
    StructField("ct_src_ltm", IntegerType(), True),    
    StructField("ct_src_dport_ltm", IntegerType(), True),    
    StructField("ct_dst_sport_ltm", IntegerType(), True),  
    StructField("ct_dst_src_ltm", IntegerType(), True),  
    StructField("attack_cat", StringType(), True),    
    StructField("Label", IntegerType(), True) 
    
])

In [6]:
df_schema = spark.read.format("csv").option("header", "True").schema(schema).load("/home/bigdata/UNSW-NB15.csv")

In [7]:
df_schema.show()

+----------+-----+-------------+------+-----+-----+--------+------+------+----+----+-----+-----+--------+---------+---------+-----+-----+----+----+----------+----------+-------+-------+-----------+-----------+---------+---------+----------+----------+---------+---------+-------+-------+-------+---------------+------------+----------------+------------+----------+----------+----------+----------+----------+----------------+----------------+--------------+----------+-----+
|     srcip|sport|        dstip|dsport|proto|state|     dur|sbytes|dbytes|sttl|dttl|sloss|dloss| service|    Sload|    Dload|Spkts|Dpkts|swin|dwin|     stcpb|     dtcpb|smeansz|dmeansz|trans_depth|res_bdy_len|     Sjit|     Djit|     Stime|     Ltime|  Sintpkt|  Dintpkt| tcprtt| synack| ackdat|is_sm_ips_ports|ct_state_ttl|ct_flw_http_mthd|is_ftp_login|ct_ftp_cmd|ct_srv_src|ct_srv_dst|ct_dst_ltm|ct_src_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|attack_cat|Label|
+----------+-----+-------------+------+-----+---

In [8]:
df_schema = df_schema.drop("srcip", "sport", "dstip", "dsport", "stime", "ltime", "Label")

In [9]:
from pyspark.sql.functions import col, Column, lit,when, regexp_replace

In [10]:
df_schema = df_schema.withColumn('attack_cat', regexp_replace(col('attack_cat'), " ", ""))

In [11]:
df_schema= df_schema.withColumn('attack_cat', regexp_replace(col('attack_cat'), "Backdoors", "Backdoor"))

In [12]:
df_schema.select('attack_cat').distinct().show()

+--------------+
|    attack_cat|
+--------------+
|         Worms|
|     Shellcode|
|          null|
|       Fuzzers|
|      Analysis|
|           DoS|
|Reconnaissance|
|      Backdoor|
|      Exploits|
|       Generic|
+--------------+



In [13]:
df_schemas = df_schema.withColumn('attack_cat', regexp_replace(col('attack_cat'), "Backdoor", "Backdoors"))

In [14]:
df_schemas.select('attack_cat').distinct().show()

+--------------+
|    attack_cat|
+--------------+
|         Worms|
|     Shellcode|
|          null|
|     Backdoors|
|       Fuzzers|
|      Analysis|
|           DoS|
|Reconnaissance|
|      Exploits|
|       Generic|
+--------------+



In [15]:
df_schemas = df_schema.withColumn('attack_cat', regexp_replace(col('attack_cat'), " " , "normal"))

In [16]:
df_schemas.select('attack_cat').distinct().show()

+--------------+
|    attack_cat|
+--------------+
|         Worms|
|     Shellcode|
|          null|
|       Fuzzers|
|      Analysis|
|           DoS|
|Reconnaissance|
|      Backdoor|
|      Exploits|
|       Generic|
+--------------+



In [17]:
df_schem = df_schemas.fillna({"attack_cat": 'Normal'})

In [18]:
df_schem.select('attack_cat').distinct().show()

+--------------+
|    attack_cat|
+--------------+
|         Worms|
|     Shellcode|
|       Fuzzers|
|      Analysis|
|           DoS|
|Reconnaissance|
|      Backdoor|
|      Exploits|
|        Normal|
|       Generic|
+--------------+



In [19]:
final_data=df_schem.na.drop()

In [20]:
final_data.show()

+-----+-----+---------+------+------+----+----+-----+-----+-------+--------+---------+-----+-----+----+----+-----+-----+-------+-------+-----------+-----------+--------+--------+---------+---------+------+------+------+---------------+------------+----------------+------------+----------+----------+----------+----------+----------+----------------+----------------+--------------+----------+
|proto|state|      dur|sbytes|dbytes|sttl|dttl|sloss|dloss|service|   Sload|    Dload|Spkts|Dpkts|swin|dwin|stcpb|dtcpb|smeansz|dmeansz|trans_depth|res_bdy_len|    Sjit|    Djit|  Sintpkt|  Dintpkt|tcprtt|synack|ackdat|is_sm_ips_ports|ct_state_ttl|ct_flw_http_mthd|is_ftp_login|ct_ftp_cmd|ct_srv_src|ct_srv_dst|ct_dst_ltm|ct_src_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|attack_cat|
+-----+-----+---------+------+------+----+----+-----+-----+-------+--------+---------+-----+-----+----+----+-----+-----+-------+-------+-----------+-----------+--------+--------+---------+---------+------+------+

In [21]:
final_data.summary().select("dur", "dbytes", "sttl", "dttl", "ct_dst_src_ltm").show()

+-----------------+-----------------+------------------+--------------------+------------------+
|              dur|           dbytes|              sttl|                dttl|    ct_dst_src_ltm|
+-----------------+-----------------+------------------+--------------------+------------------+
|           225994|           225994|            225994|              225994|            225994|
|0.407434248147434|1.112348115436693|245.32343779038382|0.009230333548678284|26.790277617989858|
|4.150830810875402|  9.9111587016314| 46.05972276367919|   0.952028783949515|12.612631599831502|
|              0.0|                0|                 0|                   0|                 1|
|           4.0E-6|                0|               254|                   0|                18|
|           8.0E-6|                0|               254|                   0|                28|
|           9.0E-6|                0|               254|                   0|                36|
|         59.99999|           

In [22]:
for t in df_schemas.dtypes:
    if t[1] =='string':
        print("column name", t[0])
        print(final_data.select(t[0]).distinct().show())

column name proto
+------+
| proto|
+------+
|  ospf|
|   arp|
|sun-nd|
|mobile|
|  sctp|
|   tcp|
|  ipv6|
|   sep|
| swipe|
|  igmp|
|   udp|
|  icmp|
+------+

None
column name state
+-----+
|state|
+-----+
|  URN|
|  ACC|
|  ECO|
|  CLO|
|  PAR|
|  REQ|
|  INT|
|  FIN|
|  MAS|
|   no|
|  CON|
|  TST|
+-----+

None
column name service
+--------+
| service|
+--------+
|ftp-data|
|     dns|
|  radius|
|       -|
|    snmp|
+--------+

None
column name attack_cat
+--------------+
|    attack_cat|
+--------------+
|         Worms|
|     Shellcode|
|       Fuzzers|
|      Analysis|
|           DoS|
|Reconnaissance|
|      Backdoor|
|      Exploits|
|        Normal|
|       Generic|
+--------------+

None


In [23]:
cols =final_data.columns

In [24]:
from pyspark.ml.feature import(VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)
from pyspark.ml import Pipeline

In [25]:
trafficColumns = ['proto', 'state', 'service', 'attack_cat']
stages = []
indexers=[StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"_index").fit(final_data) for categoricalCol in trafficColumns]

#display(indexers)
pipeline = Pipeline(stages=indexers)
df_schemas_r = pipeline.fit(df_schemas).transform(final_data)

In [26]:
final_data.show()

+-----+-----+---------+------+------+----+----+-----+-----+-------+--------+---------+-----+-----+----+----+-----+-----+-------+-------+-----------+-----------+--------+--------+---------+---------+------+------+------+---------------+------------+----------------+------------+----------+----------+----------+----------+----------+----------------+----------------+--------------+----------+
|proto|state|      dur|sbytes|dbytes|sttl|dttl|sloss|dloss|service|   Sload|    Dload|Spkts|Dpkts|swin|dwin|stcpb|dtcpb|smeansz|dmeansz|trans_depth|res_bdy_len|    Sjit|    Djit|  Sintpkt|  Dintpkt|tcprtt|synack|ackdat|is_sm_ips_ports|ct_state_ttl|ct_flw_http_mthd|is_ftp_login|ct_ftp_cmd|ct_srv_src|ct_srv_dst|ct_dst_ltm|ct_src_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|attack_cat|
+-----+-----+---------+------+------+----+----+-----+-----+-------+--------+---------+-----+-----+----+----+-----+-----+-------+-------+-----------+-----------+--------+--------+---------+---------+------+------+

In [27]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
trafficColumns = ['proto', 'state', 'service']
stages = []
for categoricalCol in trafficColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    label_stringIdx = StringIndexer(inputCol = 'attack_cat', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['sbytes', 'dbytes', 'sttl', 'dttl', 'Spkts', 'Dpkts', 'swin', 'dwin', 'stcpb', 'dtcpb', 'smeansz', 'dmeansz',
'Spkts','is_ftp_login', 'ct_dst_src_ltm']
assemblerInputs = [c + "classVec" for c in trafficColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
# pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(final_data)
df = pipelineModel.transform(final_data)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- proto: string (nullable = true)
 |-- state: string (nullable = true)
 |-- dur: float (nullable = true)
 |-- sbytes: byte (nullable = true)
 |-- dbytes: byte (nullable = true)
 |-- sttl: integer (nullable = true)
 |-- dttl: integer (nullable = true)
 |-- sloss: integer (nullable = true)
 |-- dloss: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- Sload: float (nullable = true)
 |-- Dload: float (nullable = true)
 |-- Spkts: integer (nullable = true)
 |-- Dpkts: integer (nullable = true)
 |-- swin: integer (nullable = true)
 |-- dwin: integer (nullable = true)
 |-- stcpb: integer (nullable = true)
 |-- dtcpb: integer (nullable = true)
 |-- smeansz: integer (nullable = true)
 |-- dmeansz: integer (nullable = true)
 |-- trans_depth: integer (nullable = true)
 |-- res_bdy_len: integer (nullable = true)
 |-- Sjit: float (nullable = true)
 |-- Djit: float (nullable = true)
 |-- Sintpkt

In [28]:
import pandas as pd

In [29]:
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
label,1,1,1,1,1
features,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
proto,arp,arp,arp,arp,arp
state,CON,CON,CON,CON,INT
dur,34.7197,34.7197,0,0,0
sbytes,56,56,56,56,46
dbytes,92,92,92,92,0
sttl,0,0,0,0,0
dttl,0,0,0,0,0
sloss,0,0,0,0,0


In [30]:
train, test = df.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 158462
Test Dataset Count: 67532


In [31]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
predictions.select('proto', 'dur', 'attack_cat','label', 'rawPrediction', 'prediction', 'probability').show(10)

+-----+------+----------+-----+--------------------+----------+--------------------+
|proto|   dur|attack_cat|label|       rawPrediction|prediction|         probability|
+-----+------+----------+-----+--------------------+----------+--------------------+
|  udp|1.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1.0,0.0,0.0,0.0,...|
|  udp|8.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1.0,0.0,0.0,0.0,...|
|  udp|9.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1.0,0.0,0.0,0.0,...|
|  udp|9.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1.0,0.0,0.0,0.0,...|
|  udp|9.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1.0,0.0,0.0,0.0,...|
|  udp|1.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1.0,0.0,0.0,0.0,...|
|  udp|3.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1.0,0.0,0.0,0.0,...|
|  udp|4.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1.0,0.0,0.0,0.0,...|
|  udp|6.0E-6|   Generic|  0.0|[147060.0,0.0,0.0...|       0.0|[1

In [33]:
print(dtModel.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_6661caf59a56, depth=3, numNodes=13, numClasses=10, numFeatures=41
  If (feature 26 <= 113.0)
   If (feature 36 <= 48.5)
    Predict: 1.0
   Else (feature 36 > 48.5)
    If (feature 0 in {0.0})
     Predict: 3.0
    Else (feature 0 not in {0.0})
     Predict: 1.0
  Else (feature 26 > 113.0)
   If (feature 26 <= 115.0)
    If (feature 22 in {0.0})
     Predict: 7.0
    Else (feature 22 not in {0.0})
     Predict: 0.0
   Else (feature 26 > 115.0)
    If (feature 23 in {0.0})
     Predict: 5.0
    Else (feature 23 not in {0.0})
     Predict: 1.0



In [34]:
#Evaluate our Decision Tree model.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator()
evaluator.evaluate(predictions)

0.9797349012423847

In [35]:
#RandomForest
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictionss = rfModel.transform(test)
predictionss.select('proto', 'dur', 'attack_cat','label', 'rawPrediction', 'prediction', 'probability').show(10)
evaluator = MulticlassClassificationEvaluator()
accuracy=evaluator.evaluate(predictionss)
accuracy

+-----+------+----------+-----+--------------------+----------+--------------------+
|proto|   dur|attack_cat|label|       rawPrediction|prediction|         probability|
+-----+------+----------+-----+--------------------+----------+--------------------+
|  udp|1.0E-6|   Generic|  0.0|[19.7141368306387...|       0.0|[0.98570684153193...|
|  udp|8.0E-6|   Generic|  0.0|[19.7141368306387...|       0.0|[0.98570684153193...|
|  udp|9.0E-6|   Generic|  0.0|[19.7141368306387...|       0.0|[0.98570684153193...|
|  udp|9.0E-6|   Generic|  0.0|[19.7141368306387...|       0.0|[0.98570684153193...|
|  udp|9.0E-6|   Generic|  0.0|[19.7141368306387...|       0.0|[0.98570684153193...|
|  udp|1.0E-6|   Generic|  0.0|[19.8879210045130...|       0.0|[0.99439605022565...|
|  udp|3.0E-6|   Generic|  0.0|[19.8879210045130...|       0.0|[0.99439605022565...|
|  udp|4.0E-6|   Generic|  0.0|[19.8879210045130...|       0.0|[0.99439605022565...|
|  udp|6.0E-6|   Generic|  0.0|[19.8879210045130...|       0.0|[0

0.9786035525184851