In [1]:
// Reuse the SQLContext owned by the kernel-provided SparkSession (`spark`) instead of
// building a second one through the deprecated `new SQLContext(sc)` constructor.
// Its implicits provide the $"col" column syntax and the encoders used by later cells.
val sqlC = spark.sqlContext
import sqlC.implicits._

// Read the CSV without a header row, let Spark infer the column types, and
// attach the 42 column names explicitly; "label" is the final column.
val data = spark.read.
        option("inferSchema",true).
        option("header",false).
        csv("Data/kddcup.data_10_percent_corrected").
        toDF(
        "duration","protocol_type","service","flag",
        "src_bytes","dst_bytes","land","wrong_fragment","urgent",
        "hot","num_failed_logins","logged_in","num_compromised",
        "root_shell","su_attempted","num_root","num_file_creations",
        "num_shells","num_access_files","num_outbound_cmds",
        "is_host_login","is_guest_login","count","srv_count",
        "serror_rate","srv_serror_rate","rerror_rate","srv_rerror_rate",
        "same_srv_rate","diff_srv_rate","srv_diff_host_rate",
        "dst_host_count","dst_host_srv_count",
        "dst_host_same_srv_rate","dst_host_diff_srv_rate",
        "dst_host_same_src_port_rate","dst_host_srv_diff_host_rate",
        "dst_host_serror_rate","dst_host_srv_serror_rate",
        "dst_host_rerror_rate","dst_host_srv_rerror_rate",
        "label")

Intitializing Scala interpreter ...

Spark Web UI available at http://localhost:4040
SparkContext available as 'sc' (version = 2.4.6, master = local[*], app id = local-1602442574241)
SparkSession available as 'spark'


sqlC: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@47b4302c
import sqlC.implicits._
data: org.apache.spark.sql.DataFrame = [duration: int, protocol_type: string ... 40 more fields]


In [2]:
// Mark the full DataFrame for caching so the repeated actions in later cells can reuse it.
data.cache()

res0: data.type = [duration: int, protocol_type: string ... 40 more fields]


In [3]:
// Count rows per label and print up to 25 labels, most frequent first.
data.select("label").
    groupBy("label").
    count().
    orderBy($"count".desc).
    show(25)

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
|            phf.|     4|
|           perl.|     3|
|            spy.|     2|
+----------------+------+



In [4]:
// Remove the three categorical columns (a later cell one-hot encodes them) and
// cache the remaining columns for the clustering experiments.
val numericOnly = data.
    drop("protocol_type","service","flag").
    cache()

numericOnly: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [duration: int, src_bytes: int ... 37 more fields]


In [5]:
import org.apache.spark.ml.{PipelineModel, Pipeline}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.{OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random

import org.apache.spark.ml.{PipelineModel, Pipeline}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.{OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random


In [6]:
// Pack every column except "label" into a single vector column for the clusterer.
val assembler = new VectorAssembler().
    setOutputCol("featureVector").
    setInputCols(numericOnly.columns.filterNot(_ == "label"))

// k is not set here, so KMeans runs with its library default; the seed is drawn
// at random, so results vary from run to run.
val kmeans = new KMeans().
    setFeaturesCol("featureVector").
    setPredictionCol("cluster").
    setSeed(Random.nextLong())

val pipeline = new Pipeline().
    setStages(Array(assembler, kmeans))
val pipelineModel = pipeline.fit(numericOnly)
// The fitted KMeans stage is the last pipeline stage; the cast exposes its centroids.
val kmeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_8ab1076fa390
kmeans: org.apache.spark.ml.clustering.KMeans = kmeans_e996e7c986bb
pipeline: org.apache.spark.ml.Pipeline = pipeline_a9efab245575
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_a9efab245575
kmeansModel: org.apache.spark.ml.clustering.KMeansModel = kmeans_e996e7c986bb


In [7]:
// Print one centroid vector per line.
for (center <- kmeansModel.clusterCenters) println(center)

[47.979395571029514,1622.078830816566,868.5341828266062,4.453261001578883E-5,0.006432937937735314,1.4169466823205539E-5,0.03451682118132869,1.5181571596291647E-4,0.14824703453301485,0.01021213716043885,1.1133152503947209E-4,3.6435771831099954E-5,0.011351767134933808,0.0010829521072021374,1.0930731549329986E-4,0.0010080563539937655,0.0,0.0,0.0013865835391279706,332.2862475203433,292.9071434354884,0.17668541759442943,0.17660780940042914,0.05743309987449898,0.05771839196793656,0.7915488441762945,0.020981640419421355,0.028996862475203923,232.4707319541719,188.6660459090725,0.7537812031901686,0.030905611108870867,0.6019355289259973,0.006683514837454898,0.17675395732966057,0.1764416217966883,0.05811762681672766,0.057411116958826745]
[2.0,6.9337564E8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,57.0,3.0,0.79,0.67,0.21,0.33,0.05,0.39,0.0,255.0,3.0,0.01,0.09,0.22,0.0,0.18,0.67,0.05,0.33]


In [8]:
// Attach the assigned cluster id to every row of the numeric-only data.
val withCluster = pipelineModel.transform(numericOnly)

// Tabulate labels per cluster: clusters ascending, largest label groups first
// within each cluster, up to 25 rows.
withCluster.
    select("cluster","label").
    groupBy("cluster","label").
    count().
    orderBy($"cluster",$"count".desc).
    show(25)

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|          smurf.|280790|
|      0|        neptune.|107201|
|      0|         normal.| 97278|
|      0|           back.|  2203|
|      0|          satan.|  1589|
|      0|        ipsweep.|  1247|
|      0|      portsweep.|  1039|
|      0|    warezclient.|  1020|
|      0|       teardrop.|   979|
|      0|            pod.|   264|
|      0|           nmap.|   231|
|      0|   guess_passwd.|    53|
|      0|buffer_overflow.|    30|
|      0|           land.|    21|
|      0|    warezmaster.|    20|
|      0|           imap.|    12|
|      0|        rootkit.|    10|
|      0|     loadmodule.|     9|
|      0|      ftp_write.|     8|
|      0|       multihop.|     7|
|      0|            phf.|     4|
|      0|           perl.|     3|
|      0|            spy.|     2|
|      1|      portsweep.|     1|
+-------+----------------+------+



withCluster: org.apache.spark.sql.DataFrame = [duration: int, src_bytes: int ... 39 more fields]


In [9]:
/**
 * Fits k-means with `k` clusters on every column of `data` except "label" and
 * scores the result.
 *
 * The seed is drawn at random on each call, so repeated calls with the same
 * arguments can return different scores.
 *
 * @param data input rows; all columns other than "label" are fed to the assembler
 * @param k    number of clusters
 * @return the cost reported by `KMeansModel.computeCost` divided by the row count of `data`
 */
def clusteringScore0(data: DataFrame, k: Int): Double = {
    val featureCols = data.columns.filterNot(_ == "label")
    val vectorizer = new VectorAssembler().
        setInputCols(featureCols).
        setOutputCol("featureVector")

    val clusterer = new KMeans().
        setK(k).
        setSeed(Random.nextLong()).
        setFeaturesCol("featureVector").
        setPredictionCol("cluster")

    val fitted = new Pipeline().
        setStages(Array(vectorizer, clusterer)).
        fit(data)
    val model = fitted.stages.last.asInstanceOf[KMeansModel]

    // NOTE(review): computeCost is marked deprecated in Spark 2.4 in favour of
    // ClusteringEvaluator - confirm before moving to a newer Spark version.
    val totalCost = model.computeCost(vectorizer.transform(data))
    totalCost / data.count()
}

clusteringScore0: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [10]:
// Score k = 20, 40, ..., 100 on the numeric-only data; every score is computed
// before the (k, score) pairs are printed.
(20 to 100 by 20).
    map { k => k -> clusteringScore0(numericOnly, k) }.
    foreach(println)

(20,6.025019577677537E7)
(40,3.4160392678435594E7)
(60,1.0904643275915088E7)
(80,4706517.614093637)
(100,3.1359282839813825E7)


In [11]:
/**
 * Same scoring as a plain k-means fit on all non-"label" columns, but with the
 * iteration cap set to 40 and the convergence tolerance set to 1.0e-5.
 *
 * The seed is random per call, so scores are not reproducible between calls.
 *
 * @param data input rows; every column except "label" becomes a feature
 * @param k    number of clusters
 * @return `computeCost` of the fitted model on `data`, divided by `data.count()`
 */
def clusteringScore1(data: DataFrame, k: Int): Double = {
    val featureNames = data.columns.filterNot(_ == "label")
    val toVector = new VectorAssembler().
        setOutputCol("featureVector").
        setInputCols(featureNames)

    val clusterer = new KMeans().
        setK(k).
        setMaxIter(40).
        setTol(1.0e-5).
        setSeed(Random.nextLong()).
        setFeaturesCol("featureVector").
        setPredictionCol("cluster")

    val stages = Array(toVector, clusterer)
    val fittedStages = new Pipeline().setStages(stages).fit(data).stages

    val model = fittedStages.last.asInstanceOf[KMeansModel]
    // The model was trained on the assembled vectors, so the cost is evaluated on them too.
    val cost = model.computeCost(toVector.transform(data))
    cost / data.count()
}

clusteringScore1: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [12]:
// Repeat the k = 20..100 sweep with the tuned iteration/tolerance settings.
(20 to 100 by 20).
    map { k => k -> clusteringScore1(numericOnly, k) }.
    foreach(println)

(20,3.882886309009383E7)
(40,7065963.035438348)
(60,6368375.651097387)
(80,6882277.580070589)
(100,3024476.432966163)


In [13]:
/**
 * Scores a k-means fit on standard-deviation-scaled features.
 *
 * Pipeline: assemble all non-"label" columns -> scale (withStd = true,
 * withMean = false) -> k-means (max 40 iterations, tolerance 1.0e-5, random seed).
 *
 * @param data input rows; every column except "label" becomes a feature
 * @param k    number of clusters
 * @return `computeCost` of the fitted model on the transformed `data`, divided by the row count
 */
def clusteringScore2(data: DataFrame, k: Int): Double = {
    val inputCols = data.columns.filterNot(_ == "label")
    val toVector = new VectorAssembler().
        setInputCols(inputCols).
        setOutputCol("featureVector")

    // Scale by standard deviation only; the features are not mean-centred.
    val normalizer = new StandardScaler().
        setWithMean(false).
        setWithStd(true).
        setInputCol("featureVector").
        setOutputCol("scaledFeatureVector")

    val clusterer = new KMeans().
        setK(k).
        setMaxIter(40).
        setTol(1.0e-5).
        setSeed(Random.nextLong()).
        setFeaturesCol("scaledFeatureVector").
        setPredictionCol("cluster")

    val fitted = new Pipeline().
        setStages(Array(toVector, normalizer, clusterer)).
        fit(data)

    val model = fitted.stages.last.asInstanceOf[KMeansModel]
    val totalCost = model.computeCost(fitted.transform(data))
    totalCost / data.count()
}

clusteringScore2: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [14]:
// Sweep k = 60, 90, ..., 270 using the scaled-feature score.
(60 to 270 by 30).
    map { k => k -> clusteringScore2(numericOnly, k) }.
    foreach(println)

(60,1.0951688035838902)
(90,0.6711836985503741)
(120,0.46290733670281287)
(150,0.34974018415248276)
(180,0.30119968215943305)
(210,0.2588374801014279)
(240,0.22292602979307774)
(270,0.20750607898020548)


In [15]:
/**
 * Builds an unfitted two-stage pipeline that one-hot encodes a column:
 * StringIndexer writes `<inputCol>_indexed`, then OneHotEncoder turns that
 * into `<inputCol>_vec`.
 *
 * @param inputCol name of the column to encode
 * @return the pipeline together with the name of the vector column it produces
 */
def oneHotPipeline(inputCol: String): (Pipeline, String) = {
    val indexedCol = s"${inputCol}_indexed"
    val vectorCol = s"${inputCol}_vec"

    val toIndex = new StringIndexer().
        setInputCol(inputCol).
        setOutputCol(indexedCol)
    val toOneHot = new OneHotEncoder().
        setInputCol(indexedCol).
        setOutputCol(vectorCol)

    val encodingPipeline = new Pipeline().setStages(Array(toIndex, toOneHot))
    (encodingPipeline, vectorCol)
}

oneHotPipeline: (inputCol: String)(org.apache.spark.ml.Pipeline, String)


In [16]:
/**
 * Scores a k-means fit that also uses the categorical columns.
 *
 * "protocol_type", "service" and "flag" are one-hot encoded, combined with all
 * remaining non-"label" columns, scaled by standard deviation (no centring)
 * and clustered with k-means (max 40 iterations, tolerance 1.0e-5, random seed).
 *
 * @param data input rows including the three categorical columns
 * @param k    number of clusters
 * @return `computeCost` of the fitted model on the transformed `data`, divided by the row count
 */
def clusteringScore3(data: DataFrame, k: Int): Double = {
    val (protoStage, protoVec) = oneHotPipeline("protocol_type")
    val (serviceStage, serviceVec) = oneHotPipeline("service")
    val (flagStage, flagVec) = oneHotPipeline("flag")

    // Swap the raw categorical columns (and the label) for their one-hot vectors.
    val rawCols = Seq("label", "protocol_type", "service", "flag")
    val encodedCols = Seq(protoVec, serviceVec, flagVec)
    val candidateCols = Set(data.columns: _*)
    val assembleCols = (candidateCols -- rawCols) ++ encodedCols

    val toVector = new VectorAssembler().
        setInputCols(assembleCols.toArray).
        setOutputCol("featureVector")

    val normalizer = new StandardScaler().
        setWithMean(false).
        setWithStd(true).
        setInputCol("featureVector").
        setOutputCol("scaledFeatureVector")

    val clusterer = new KMeans().
        setK(k).
        setMaxIter(40).
        setTol(1.0e-5).
        setSeed(Random.nextLong()).
        setFeaturesCol("scaledFeatureVector").
        setPredictionCol("cluster")

    val fitted = new Pipeline().
        setStages(Array(protoStage, serviceStage, flagStage, toVector, normalizer, clusterer)).
        fit(data)

    val model = fitted.stages.last.asInstanceOf[KMeansModel]
    val totalCost = model.computeCost(fitted.transform(data))
    totalCost / data.count()
}

clusteringScore3: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [17]:
// Sweep k = 60, 90, ..., 270 on the full data, categorical columns included.
(60 to 270 by 30).
    map { k => k -> clusteringScore3(data, k) }.
    foreach(println)

(60,34.59853768509183)
(90,9.655109805623399)
(120,3.025799941534635)
(150,1.9387075439611636)
(180,1.4530838535393191)
(210,1.6127464886594087)
(240,0.9991314756117768)
(270,0.771802982870259)


In [18]:
/**
 * Builds and fits the full clustering pipeline: one-hot encoding of
 * "protocol_type", "service" and "flag", vector assembly, standard-deviation
 * scaling (no centring) and k-means (max 40 iterations, tolerance 1.0e-5,
 * random seed).
 *
 * Feature columns are assembled in schema order, followed by the three one-hot
 * vectors, instead of in the iteration order of a hash Set. The layout of
 * "featureVector" (and so of every centroid of the returned model) is then
 * predictable and can be mapped back to column names.
 *
 * @param data input rows; must contain the three categorical columns, and
 *             "label" (if present) is excluded from the features
 * @param k    number of clusters
 * @return the fitted pipeline; its last stage is the KMeansModel
 */
def fitPipeline4(data: DataFrame, k: Int): PipelineModel = {
    val (protoTypeEncoder, protoTypeVecCol) = oneHotPipeline("protocol_type")
    val (serviceEncoder, serviceVecCol) = oneHotPipeline("service")
    val (flagEncoder, flagVecCol) = oneHotPipeline("flag")

    // Raw categorical columns and the label never enter the feature vector directly.
    val excluded = Set("label", "protocol_type", "service", "flag")
    // `distinct` keeps the de-duplication the previous Set-based construction gave.
    val assembleCols = (data.columns.filterNot(excluded.contains) ++
        Seq(protoTypeVecCol, serviceVecCol, flagVecCol)).distinct
    val assembler = new VectorAssembler().
        setInputCols(assembleCols).
        setOutputCol("featureVector")

    val scaler = new StandardScaler().
        setInputCol("featureVector").
        setOutputCol("scaledFeatureVector").
        setWithStd(true).
        setWithMean(false)

    val kmeans = new KMeans().
        setSeed(Random.nextLong()).
        setK(k).
        setPredictionCol("cluster").
        setFeaturesCol("scaledFeatureVector").
        setMaxIter(40).
        setTol(1.0e-5)

    val pipeline = new Pipeline().setStages(
        Array(protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans))
    pipeline.fit(data)
}

fitPipeline4: (data: org.apache.spark.sql.DataFrame, k: Int)org.apache.spark.ml.PipelineModel


In [None]:
// Fit the full pipeline with 120 clusters.
val pipelineModel = fitPipeline4(data, k = 120)
// Row counts per (cluster, label) pair, sorted by cluster and then label.
val countByClusterLabel = pipelineModel.
    transform(data).
    select("cluster","label").
    groupBy("cluster","label").
    count().
    orderBy("cluster","label")
countByClusterLabel.show()

In [None]:
// The fitted KMeans stage is last in the pipeline; keep its centroids for distance checks.
val kMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
val centroids = kMeansModel.clusterCenters

val clustered = pipelineModel.transform(data)
// Squared distance from each row to its assigned centroid; the 100th largest
// distance becomes the anomaly cut-off.
val threshold = clustered.
    select("cluster","scaledFeatureVector").
    as[(Int, Vector)].
    map { case (clusterId, scaled) => Vectors.sqdist(centroids(clusterId), scaled) }.
    orderBy($"value".desc).
    take(100).last

val originalCols = data.columns
// Keep rows at or beyond the cut-off and project them back to the input columns.
val anomalies = clustered.filter { row =>
    val assignedCentroid = centroids(row.getAs[Int]("cluster"))
    val scaled = row.getAs[Vector]("scaledFeatureVector")
    Vectors.sqdist(assignedCentroid, scaled) >= threshold
}.select(originalCols.head, originalCols.tail:_*)

println(anomalies.first())