# Detecting anomalies in network traffic

Based on the implementation in **Advanced Analytics with Spark** (by _Uri Laserson, Sandy Ryza, Sean Owen, Josh Wills_)

**Initialize `spark` session**

In [1]:
import org.apache.spark.sql.SparkSession

// Obtain the shared SparkSession for this notebook, running locally on all
// available cores. The application name is what shows up in the Spark UI and
// logs, so it should describe this notebook (network-traffic anomaly
// detection) rather than an unrelated one.
// Note: `getOrCreate()` returns an already-running session when one exists.
val spark = SparkSession
  .builder()
  .appName("Detecting anomalies in network traffic")
  .master("local[*]")
  .getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://DESKTOP-JMD9350:4041
SparkContext available as 'sc' (version = 2.4.0, master = local[*], app id = local-1552152822798)
SparkSession available as 'spark'


2019-03-09 18:33:36 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-09 18:33:42 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2019-03-09 18:33:54 WARN  SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@36a1fd5e


**Load data**

In [2]:
// Read the data file as CSV with no extra reader options; the schema printed
// below shows every column arriving as a nullable string named _c0, _c1, ...
val data = spark.read.csv("../../kddcup.data.corrected")
// Print the column names and types of the loaded DataFrame.
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (nullable = tru

data: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 40 more fields]


In [6]:
// Preview the first five rows to get a feel for the raw columns.
data.show(numRows = 5)

+---+---+----+---+---+-----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|_c0|_c1| _c2|_c3|_c4|  _c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|   _c41|
+---+---+----+---+---+-----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|  0|tcp|http| SF|215|45076|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   1|0.00|0.00|0.00|0.00|1.00|0.00|0.00|   0|   0|0.00|0.00|0.00|0.00|0.00|0.00|0.00|0.00|normal.|
|  0|tcp|http| SF|162| 4528|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   2|   2|0.00|0.00|0.00|0.00|1.00|0.00|0.00|   1|   1|1.00|0.00

**Look at labels**

In [18]:
// Count how many rows carry each distinct value of the last column (`_c41`,
// the label) and print the (label, count) pairs from most to least frequent.
data.select("_c41")
    .rdd
    .countByValue()
    .toSeq
    .sortBy { case (_, occurrences) => occurrences }
    .reverseIterator
    .foreach(println)

([smurf.],2807886)
([neptune.],1072017)
([normal.],972781)
([satan.],15892)
([ipsweep.],12481)
([portsweep.],10413)
([nmap.],2316)
([back.],2203)
([warezclient.],1020)
([teardrop.],979)
([pod.],264)
([guess_passwd.],53)
([buffer_overflow.],30)
([land.],21)
([warezmaster.],20)
([imap.],12)
([rootkit.],10)
([loadmodule.],9)
([ftp_write.],8)
([multihop.],7)
([phf.],4)
([perl.],3)
([spy.],2)


In [33]:
import org.apache.spark.mllib.linalg._

import org.apache.spark.mllib.linalg._


Omit non-numeric values

In [37]:
// Load the same data file again, this time as an RDD of raw text lines so
// each record can be split and parsed by hand.
val rawData = spark.sparkContext.textFile(path = "../../kddcup.data.corrected")
// Restrict Spark's logging to ERROR level to keep the notebook output short.
spark.sparkContext.setLogLevel(logLevel = "ERROR")

rawData: org.apache.spark.rdd.RDD[String] = ../../kddcup.data.corrected MapPartitionsRDD[81] at textFile at <console>:34


In [38]:
// Turn every comma-separated line into a (label, feature vector) pair.
//
// Layout of a record, as seen in the preview of the data above:
//   column 0        numeric
//   columns 1 to 3  non-numeric (e.g. "tcp", "http", "SF"), dropped here
//   columns 4..n-2  numeric
//   column n-1      the label (e.g. "normal.")
//
// A line with fewer than five fields cannot follow this layout, so it is
// rejected with a descriptive message instead of failing later with an
// opaque index error or producing a vector of the wrong size.
val labelAndData = rawData.map { line =>
    val fields = line.split(",")
    require(
        fields.length >= 5,
        s"Malformed record with ${fields.length} field(s): $line"
    )
    val label = fields.last
    // Keep column 0 plus everything between the three dropped columns and the label.
    val numericFields = fields.head +: fields.slice(4, fields.length - 1)
    (label, Vectors.dense(numericFields.map(_.toDouble)))
}

// Only the feature vectors are needed for clustering; cache them because the
// clustering step reads this RDD.
val newData = labelAndData.values.cache()

labelAndData: org.apache.spark.rdd.RDD[(String, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[82] at map at <console>:36
newData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[83] at values at <console>:44


In [39]:
import org.apache.spark.mllib.clustering.KMeans

// Fit K-means on the cached feature vectors. No parameters are set on the
// estimator, so the library defaults apply.
val kmeans = new KMeans
val model = kmeans.run(newData)

// Print the coordinates of every cluster center the model found.
for (center <- model.clusterCenters) println(center)

[48.34019491959669,1834.6215497618625,826.2031900016945,5.7161172049003456E-6,6.487793027561892E-4,7.961734678254053E-6,0.012437658596734055,3.205108575604837E-5,0.14352904910348827,0.00808830584493399,6.818511237273984E-5,3.6746467745787934E-5,0.012934960793560386,0.0011887482315762398,7.430952366370449E-5,0.0010211435092468404,0.0,4.082940860643104E-7,8.351655530445469E-4,334.9735084506668,295.26714620807076,0.17797031701994304,0.17803698940272675,0.05766489875327384,0.05772990937912762,0.7898841322627527,0.021179610609915762,0.02826081009629794,232.98107822302248,189.21428335201279,0.753713389800417,0.030710978823818437,0.6050519309247937,0.006464107887632785,0.1780911843182427,0.17788589813471198,0.05792761150001037,0.05765922142400437]
[10999.0,0.0,1.309937401E9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,255.0,1.0,0.0,0.65,1.0,0.0,0.0,0.0,1.0,1.0]


import org.apache.spark.mllib.clustering.KMeans
kmeans: org.apache.spark.mllib.clustering.KMeans = org.apache.spark.mllib.clustering.KMeans@26498792
model: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@42dd4ff5
