# Chapter 5: Anomaly Detection in Network Traffic with K-means Clustering

In [11]:
import org.apache.spark.ml.{PipelineModel, Pipeline}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.{OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random

## A First Take on Clustering

In [1]:
// Location of the raw kddcup.data file, relative to the notebook's working directory.
val data_path: String = "../data/kmeans/kddcup.data"

data_path = ../data/kmeans/kddcup.data


../data/kmeans/kddcup.data

In [2]:
// Column names for kddcup.data, in file order. The file has no header row,
// so these names are applied positionally after schema inference.
val kddColumnNames = Seq(
  "duration", "protocol_type", "service", "flag",
  "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
  "hot", "num_failed_logins", "logged_in", "num_compromised",
  "root_shell", "su_attempted", "num_root", "num_file_creations",
  "num_shells", "num_access_files", "num_outbound_cmds",
  "is_host_login", "is_guest_login", "count", "srv_count",
  "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
  "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
  "dst_host_count", "dst_host_srv_count",
  "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
  "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
  "dst_host_serror_rate", "dst_host_srv_serror_rate",
  "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
  "label")

// Let Spark infer column types; the data is reused by every later cell, so keep it cached.
val data = spark.read.
  option("header", false).
  option("inferSchema", true).
  csv(data_path).
  toDF(kddColumnNames: _*)

data.cache()

data = [duration: int, protocol_type: string ... 40 more fields]


[duration: int, protocol_type: string ... 40 more fields]

In [3]:
// How many records carry each label, most frequent first.
data.
  groupBy("label").
  count().
  orderBy($"count".desc).
  show(25)

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
|            phf.|     4|
|           perl.|     3|
|            spy.|     2|
+----------------+------+



In [6]:
// First attempt: cluster only the numeric columns. The three string-valued
// columns are dropped because VectorAssembler cannot consume strings.
val numericOnly = data.drop("protocol_type", "service", "flag").cache()

// Pack every numeric column except the label into one vector column, the
// input format KMeans expects.
val assembler = new VectorAssembler().
    setInputCols(numericOnly.columns.filter(_ != "label")).
    setOutputCol("featureVector")

// k is deliberately left unset, so KMeans uses its default (the two centers
// printed below). A fixed seed keeps the run reproducible.
val kmeans = new KMeans().
    setSeed(42).
    setPredictionCol("cluster").
    setFeaturesCol("featureVector")

val pipeline = new Pipeline().setStages(Array(assembler, kmeans))
val pipelineModel = pipeline.fit(numericOnly)
// The fitted KMeansModel is the final stage of the fitted pipeline.
val kmeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]

kmeansModel.clusterCenters.foreach(println)

[47.979395571029514,1622.078830816566,868.5341828266062,4.453261001578883E-5,0.006432937937735314,1.4169466823205539E-5,0.03451682118132869,1.5181571596291647E-4,0.14824703453301485,0.01021213716043885,1.1133152503947209E-4,3.6435771831099954E-5,0.011351767134933808,0.0010829521072021374,1.0930731549329986E-4,0.0010080563539937655,0.0,0.0,0.0013865835391279706,332.2862475203433,292.9071434354884,0.17668541759442963,0.17660780940042922,0.05743309987449894,0.05771839196793656,0.7915488441763401,0.020981640419415717,0.028996862475203753,232.4707319541719,188.6660459090725,0.7537812031901896,0.03090561110887087,0.6019355289259497,0.0066835148374549125,0.17675395732965926,0.17644162179668316,0.05811762681672753,0.05741111695882672]
[2.0,6.9337564E8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,57.0,3.0,0.79,0.67,0.21,0.33,0.05,0.39,0.0,255.0,3.0,0.01,0.09,0.22,0.0,0.18,0.67,0.05,0.33]


numericOnly = [duration: int, src_bytes: int ... 37 more fields]
assembler = vecAssembler_e76dc4db5687
kmeans = kmeans_204a3e5834f8
pipeline = pipeline_adc070594d48
pipelineModel = pipeline_adc070594d48
kmeansModel = kmeans_204a3e5834f8


kmeans_204a3e5834f8

In [7]:
// Attach each record's cluster assignment, then tally labels per cluster.
val withCluster = pipelineModel.transform(numericOnly)

withCluster.
    groupBy("cluster", "label").
    count().
    orderBy($"cluster", $"count".desc).
    show(25)

// The numeric-only view is no longer needed once the tally has been shown.
numericOnly.unpersist()

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|          smurf.|280790|
|      0|        neptune.|107201|
|      0|         normal.| 97278|
|      0|           back.|  2203|
|      0|          satan.|  1589|
|      0|        ipsweep.|  1247|
|      0|      portsweep.|  1039|
|      0|    warezclient.|  1020|
|      0|       teardrop.|   979|
|      0|            pod.|   264|
|      0|           nmap.|   231|
|      0|   guess_passwd.|    53|
|      0|buffer_overflow.|    30|
|      0|           land.|    21|
|      0|    warezmaster.|    20|
|      0|           imap.|    12|
|      0|        rootkit.|    10|
|      0|     loadmodule.|     9|
|      0|      ftp_write.|     8|
|      0|       multihop.|     7|
|      0|            phf.|     4|
|      0|           perl.|     3|
|      0|            spy.|     2|
|      1|      portsweep.|     1|
+-------+----------------+------+



withCluster = [duration: int, src_bytes: int ... 39 more fields]


[duration: int, src_bytes: int ... 37 more fields]

## Choosing k

In [12]:
/** Fits k-means with `k` clusters (fixed seed 42, default iteration settings)
  * on every non-label column of `data`, and returns the k-means cost divided by
  * the row count, i.e. the mean squared distance to the nearest centroid.
  */
def clusteringScore0(data: DataFrame, k: Int): Double = {
    val featureCols = data.columns.filter(_ != "label")
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("featureVector")

    val kmeans = new KMeans()
      .setSeed(42)
      .setK(k)
      .setPredictionCol("cluster")
      .setFeaturesCol("featureVector")

    val fitted = new Pipeline().setStages(Array(assembler, kmeans)).fit(data)
    val kmeansModel = fitted.stages.last.asInstanceOf[KMeansModel]

    // computeCost needs only the feature vectors, so the assembler alone suffices.
    val totalCost = kmeansModel.computeCost(assembler.transform(data))
    totalCost / data.count()
}

clusteringScore0: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [13]:
/** Fits k-means with `k` clusters on every non-label column of `data`, using
  * a random seed, at most 40 iterations and a convergence tolerance of 1e-5.
  * Returns the k-means cost divided by the row count (mean squared distance
  * to the nearest centroid).
  */
def clusteringScore1(data: DataFrame, k: Int): Double = {
    val assembler = new VectorAssembler()
      .setInputCols(data.columns.filter(_ != "label"))
      .setOutputCol("featureVector")

    // Seeded randomly on every call, so scores vary from run to run; more
    // iterations and a tighter tolerance than KMeans' defaults.
    val kmeans = new KMeans()
      .setSeed(Random.nextLong())
      .setK(k)
      .setPredictionCol("cluster")
      .setFeaturesCol("featureVector")
      .setMaxIter(40)
      .setTol(1.0e-5)

    val fitted = new Pipeline().setStages(Array(assembler, kmeans)).fit(data)
    val kmeansModel = fitted.stages.last.asInstanceOf[KMeansModel]

    val totalCost = kmeansModel.computeCost(assembler.transform(data))
    totalCost / data.count()
}

clusteringScore1: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [14]:
// Numeric columns only (string columns removed), cached for the k sweeps below.
val numericOnly = data.drop(Seq("protocol_type", "service", "flag"): _*).persist()

numericOnly = [duration: int, src_bytes: int ... 37 more fields]


[duration: int, src_bytes: int ... 37 more fields]

In [15]:
// Print (k, score) for k = 20, 40, ..., 100 with the fixed-seed scorer.
for (k <- 20 to 100 by 20) println((k, clusteringScore0(numericOnly, k)))

(20,6.988910071522829E7)
(40,6.98890952256968E7)
(60,3.223287575527558E7)
(80,3.155325456382279E7)
(100,2.6254419772138733E7)


In [16]:
// Same sweep with the random-seed, longer-running scorer.
for (k <- 20 to 100 by 20) println((k, clusteringScore1(numericOnly, k)))

(20,3.057847974935887E8)
(40,4.4047195387869634E7)
(60,3.4495793501899E7)
(80,1.1607611352200137E7)
(100,4644073.062761333)


In [17]:
// Release the cached numeric-only data (non-blocking, as with the no-arg form).
numericOnly.unpersist(blocking = false)

[duration: int, src_bytes: int ... 37 more fields]

## Feature Normalization

In [18]:
/** Like the unscaled scorers, but standardizes every feature before
  * clustering: assembles all non-label columns, divides each feature by its
  * standard deviation (no mean centering), then fits k-means with `k` clusters
  * (random seed, at most 40 iterations, tolerance 1e-5). Returns the k-means
  * cost on the scaled vectors divided by the row count.
  */
def clusteringScore2(data: DataFrame, k: Int): Double = {
    val assembler = new VectorAssembler()
      .setInputCols(data.columns.filter(_ != "label"))
      .setOutputCol("featureVector")

    val scaler = new StandardScaler()
      .setInputCol("featureVector")
      .setOutputCol("scaledFeatureVector")
      .setWithStd(true)
      .setWithMean(false)

    val kmeans = new KMeans()
      .setSeed(Random.nextLong())
      .setK(k)
      .setPredictionCol("cluster")
      .setFeaturesCol("scaledFeatureVector")
      .setMaxIter(40)
      .setTol(1.0e-5)

    val pipelineModel = new Pipeline()
      .setStages(Array(assembler, scaler, kmeans))
      .fit(data)
    val kmeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]

    // The full pipeline is needed here: the cost is measured on scaled vectors.
    val scaledCost = kmeansModel.computeCost(pipelineModel.transform(data))
    scaledCost / data.count()
}

clusteringScore2: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [19]:
// Rebuild and cache the numeric-only view for the scaled-feature sweep.
val numericOnly = data.drop(Seq("protocol_type", "service", "flag"): _*).persist()

[duration: int, src_bytes: int ... 37 more fields]

In [20]:
// Print (k, score) for k = 60, 90, ..., 270 using standardized features.
for (k <- 60 to 270 by 30) println((k, clusteringScore2(numericOnly, k)))

(60,1.2239030219540843)
(90,0.7792502727047937)
(120,0.49406744648386625)
(150,0.37671225612994114)
(180,0.3093477205019793)
(210,0.26749170699958735)
(240,0.22614663308865912)
(270,0.20291296647759655)


In [None]:
// Done with the numeric-only view; release its cache without blocking.
numericOnly.unpersist(blocking = false)

## Categorical Variables

In [21]:
/** Builds a two-stage pipeline that one-hot encodes the string column
  * `inputCol`. A StringIndexer writes `<inputCol>_indexed`, and a
  * OneHotEncoder turns that index into the vector column `<inputCol>_vec`.
  *
  * @param inputCol name of the string column to encode
  * @return the (unfitted) pipeline and the name of the vector column it produces
  */
def oneHotPipeline(inputCol: String): (Pipeline, String) = {
    val indexedCol = s"${inputCol}_indexed"
    val vectorCol = s"${inputCol}_vec"

    val indexer = new StringIndexer().setInputCol(inputCol).setOutputCol(indexedCol)
    val encoder = new OneHotEncoder().setInputCol(indexedCol).setOutputCol(vectorCol)

    (new Pipeline().setStages(Array(indexer, encoder)), vectorCol)
}

oneHotPipeline: (inputCol: String)(org.apache.spark.ml.Pipeline, String)


In [22]:
// One indexer+encoder pipeline per categorical column, plus the output vector column name.
val (protoTypeEncoder, protoTypeVecCol) = oneHotPipeline(inputCol = "protocol_type")
val (serviceEncoder, serviceVecCol) = oneHotPipeline(inputCol = "service")
val (flagEncoder, flagVecCol) = oneHotPipeline(inputCol = "flag")

protoTypeEncoder = pipeline_8a510b6512ca
protoTypeVecCol = protocol_type_vec
serviceEncoder = pipeline_693fe71cce72
serviceVecCol = service_vec
flagEncoder = pipeline_4402a0adcf91
flagVecCol = flag_vec


flag_vec

In [None]:
// Number of clusters for this pipeline; 180 matches the k the later cells pass
// to fitPipeline4.
val k = 180

// Feature columns: the original numeric columns plus the three one-hot
// vectors, excluding the label and the raw string columns the vectors replace.
val assembleCols =
    (Set(data.columns: _*) -- Seq("label", "protocol_type", "service", "flag")) ++
    Seq(protoTypeVecCol, serviceVecCol, flagVecCol)
val assembler = new VectorAssembler().
    setInputCols(assembleCols.toArray).
    setOutputCol("featureVector")

// Divide each feature by its standard deviation (no mean centering).
val scaler = new StandardScaler().
    setInputCol("featureVector").
    setOutputCol("scaledFeatureVector").
    setWithStd(true).
    setWithMean(false)

val kmeans = new KMeans().
    setSeed(Random.nextLong()).
    setK(k).
    setPredictionCol("cluster").
    setFeaturesCol("scaledFeatureVector").
    setMaxIter(40).
    setTol(1.0e-5)

// Encoders run first so their *_vec outputs exist when the assembler runs.
val pipeline = new Pipeline().setStages(
    Array(protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans))

In [23]:
// Fit every stage (indexers, encoders, scaler, k-means) on the full data set.
val pipelineModel: PipelineModel = pipeline.fit(data)

pipelineModel = pipeline_adc070594d48


pipeline_adc070594d48

In [None]:
// Mean squared distance from each record's scaled features to its nearest centroid.
val kmeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
val totalCost = kmeansModel.computeCost(pipelineModel.transform(data))
totalCost / data.count()

## Using Labels with Entropy

In [24]:
/** Shannon entropy (natural log) of the distribution described by `counts`.
  * Non-positive counts are ignored; an empty or all-zero input yields 0.0.
  */
def entropy(counts: Iterable[Int]): Double = {
    val positive = counts.filter(_ > 0)
    val total = positive.map(_.toDouble).sum
    positive.
      map(_ / total).
      map(p => -p * math.log(p)).
      sum
}

entropy: (counts: Iterable[Int])Double


In [25]:
/** Fits the full clustering pipeline with `k` clusters on `data`.
  *
  * Stages, in order:
  *  1. one-hot encode protocol_type, service and flag;
  *  2. assemble those vectors with the remaining non-label columns into `featureVector`;
  *  3. divide each feature by its standard deviation (no mean centering) into
  *     `scaledFeatureVector`;
  *  4. k-means (random seed, at most 40 iterations, tolerance 1e-5), writing
  *     each row's assignment to `cluster`.
  */
def fitPipeline4(data: DataFrame, k: Int): PipelineModel = {
    val (protoStage, protoVec) = oneHotPipeline("protocol_type")
    val (serviceStage, serviceVec) = oneHotPipeline("service")
    val (flagStage, flagVec) = oneHotPipeline("flag")

    // Raw string columns are replaced by their one-hot vectors; the label is never a feature.
    val excludedCols = Seq("label", "protocol_type", "service", "flag")
    val encodedCols = Seq(protoVec, serviceVec, flagVec)
    val featureCols = Set(data.columns: _*) -- excludedCols ++ encodedCols

    val assembler = new VectorAssembler()
      .setInputCols(featureCols.toArray)
      .setOutputCol("featureVector")

    val scaler = new StandardScaler()
      .setInputCol("featureVector")
      .setOutputCol("scaledFeatureVector")
      .setWithStd(true)
      .setWithMean(false)

    val kmeans = new KMeans()
      .setSeed(Random.nextLong())
      .setK(k)
      .setPredictionCol("cluster")
      .setFeaturesCol("scaledFeatureVector")
      .setMaxIter(40)
      .setTol(1.0e-5)

    new Pipeline()
      .setStages(Array(protoStage, serviceStage, flagStage, assembler, scaler, kmeans))
      .fit(data)
}

fitPipeline4: (data: org.apache.spark.sql.DataFrame, k: Int)org.apache.spark.ml.PipelineModel


In [26]:
/** Scores a k-means clustering of `data` by label purity. Fits
  * `fitPipeline4(data, k)` and returns the average label entropy of the
  * clusters, with each cluster's entropy weighted by its size. A value of 0
  * means every cluster contains a single label; lower values mean clusters
  * are more homogeneous in label.
  */
def clusteringScore4(data: DataFrame, k: Int): Double = {
    val pipelineModel = fitPipeline4(data, k)

    // Count (cluster, label) pairs inside Spark. The result has at most
    // k * (number of distinct labels) rows, so collecting it is cheap. This
    // avoids grouping every raw label of a cluster into a single task.
    val clusterLabelCounts = pipelineModel.transform(data).
      groupBy("cluster", "label").count().
      select("cluster", "count").as[(Int, Long)].
      collect()

    // Per cluster: (cluster size) * (entropy of its label counts).
    val weightedClusterEntropy = clusterLabelCounts.
      groupBy { case (cluster, _) => cluster }.
      values.
      map { rows =>
        val labelCounts = rows.map { case (_, count) => count.toInt }.toSeq
        labelCounts.sum * entropy(labelCounts)
      }

    // Every row falls into exactly one (cluster, label) group, so the counts
    // sum to the row count; reusing them avoids a separate data.count() job.
    val totalCount = clusterLabelCounts.map { case (_, count) => count }.sum
    weightedClusterEntropy.sum / totalCount
}

clusteringScore4: (data: org.apache.spark.sql.DataFrame, k: Int)Double


In [27]:
/** Prints the entropy score for k = 60, 90, ..., 270. Then fits the pipeline
  * at k = 180 and shows the first rows of the (cluster, label) counts,
  * ordered by cluster and label.
  */
def clusteringTake4(data: DataFrame): Unit = {
    for (k <- 60 to 270 by 30) {
      println((k, clusteringScore4(data, k)))
    }

    fitPipeline4(data, 180).
      transform(data).
      select("cluster", "label").
      groupBy("cluster", "label").
      count().
      orderBy("cluster", "label").
      show()
}

clusteringTake4: (data: org.apache.spark.sql.DataFrame)Unit


In [28]:
// Cache the full data set: clusteringTake4 fits many pipelines over it.
data.persist()

[duration: int, protocol_type: string ... 40 more fields]

In [29]:
// Run the entropy-based k sweep and show the k = 180 cluster/label breakdown.
clusteringTake4(data = data)

(60,0.04942426371114013)
(90,0.049396769747763844)
(120,0.04312248060837521)
(150,0.038291546744763545)
(180,0.022577093557218056)
(210,0.019398614828370613)
(240,0.009805853286683406)
(270,0.020184799437952816)
+-------+----------+-----+
|cluster|     label|count|
+-------+----------+-----+
|      0|  neptune.|36151|
|      1|   normal.|21789|
|      2|  neptune.|  106|
|      2|portsweep.|    1|
|      3|     imap.|    7|
|      3|  neptune.|  105|
|      4|  neptune.|  102|
|      4|portsweep.|    1|
|      4|    satan.|    1|
|      5|   normal.|    1|
|      6|     back.|    4|
|      6|   normal.|18699|
|      7|  neptune.|   16|
|      7|portsweep.|    2|
|      7|    satan.|    1|
|      8|  neptune.|  105|
|      9|  ipsweep.|    1|
|      9|  neptune.|  118|
|      9|portsweep.|    1|
|     10|  neptune.|  102|
+-------+----------+-----+
only showing top 20 rows



In [30]:
// Release the cached data set (non-blocking, as with the no-arg form).
data.unpersist(blocking = false)

[duration: int, protocol_type: string ... 40 more fields]

## Clustering in Action

In [31]:
// Cache again for the anomaly detector, which scans the data several times.
data.persist()

[duration: int, protocol_type: string ... 40 more fields]

In [32]:
/** Fits the k = 180 clustering pipeline on `data` and uses it to flag anomalies.
  *
  * Each row's anomaly measure is the squared distance from its scaled feature
  * vector to its assigned centroid. The threshold is the 100th-largest such
  * distance, or the smallest if there are fewer than 100 rows. The method
  * prints the first row, projected onto the input's columns, whose distance
  * is at least that threshold.
  */
def buildAnomalyDetector(data: DataFrame): Unit = {
    val pipelineModel = fitPipeline4(data, 180)
    val clustered = pipelineModel.transform(data)

    val centroids = pipelineModel.stages.last.asInstanceOf[KMeansModel].clusterCenters

    // Squared distance of every row to the centroid of its own cluster.
    val distances = clustered.
      select("cluster", "scaledFeatureVector").
      as[(Int, Vector)].
      map { case (cluster, vec) => Vectors.sqdist(centroids(cluster), vec) }
    val threshold = distances.orderBy($"value".desc).take(100).last

    val originalCols = data.columns
    val anomalies = clustered.
      filter { row =>
        val dist = Vectors.sqdist(
          centroids(row.getAs[Int]("cluster")),
          row.getAs[Vector]("scaledFeatureVector"))
        dist >= threshold
      }.
      select(originalCols.head, originalCols.tail: _*)

    println(anomalies.first())
}

buildAnomalyDetector: (data: org.apache.spark.sql.DataFrame)Unit


In [34]:
// Fit the detector and print one flagged record.
buildAnomalyDetector(data = data)

[1,tcp,ftp,SF,60,189,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,151,47,0.31,0.03,0.01,0.0,0.0,0.0,0.0,0.0,normal.]


In [35]:
// Release the cached data set now that the detector has run.
data.unpersist(blocking = false)

[duration: int, protocol_type: string ... 40 more fields]