## Exercises in KMeans clustering with Spark and Scala

Based on material from Advanced Analytics with Spark: Ryza, Laserson, Owen & Wills Chapter 5

Uses Almond Scala kernel for Jupyter notebooks. 
Scala version 2.12

Data set is the KDD Cup 1999 Data Set available at http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
The full dataset (c. 740MB was used)

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.3`
import org.apache.spark.sql._

[32mimport [39m[36m$ivy.$                                  
[39m
[32mimport [39m[36morg.apache.spark.sql._[39m

In [2]:
import org.apache.log4j.{Logger,Level}
Logger.getLogger("org").setLevel(Level.ERROR)

[32mimport [39m[36morg.apache.log4j.{Logger,Level}
[39m

In [3]:
val spark = {
    org.apache.spark.sql.NotebookSparkSession.builder().master("local[*]").config("spark.sql.shuffle.partitions","8").getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@3fe7acf6

In [4]:
val dataNoHeader = spark.read
            .option("inferSchema","true")
            .option("header","false")
            .csv("../KDDCup1999_dataset/kddcup.data")

[36mdataNoHeader[39m: [32mDataFrame[39m = [_c0: int, _c1: string ... 40 more fields]

In [5]:
dataNoHeader.columns.length

[36mres4[39m: [32mInt[39m = [32m42[39m

Get the list of headers from the kddcup.names file and use to convert to a DF

In [6]:
val names = spark.read.option("header","false").csv("../KDDCup1999_dataset/kddcup.names")

[36mnames[39m: [32mDataFrame[39m = [_c0: string, _c1: string ... 21 more fields]

In [7]:
import org.apache.spark.sql.functions._

[32mimport [39m[36morg.apache.spark.sql.functions._[39m

In [8]:
names.createOrReplaceTempView("temp1")

In [9]:
import org.apache.spark.sql.types.DataTypes._

[32mimport [39m[36morg.apache.spark.sql.types.DataTypes._[39m

Trim the text after the : to get just the names

In [10]:
val nameDF = spark.sql("select left(_c0, instr(_c0,':')-1) as text from temp1 where instr(_c0,':')>0 union all select 'label'")
val nameArray = nameDF.collect.map(row=>row.getString(0))
nameArray.length

[36mnameDF[39m: [32mDataFrame[39m = [text: string]
[36mnameArray[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"duration"[39m,
  [32m"protocol_type"[39m,
  [32m"service"[39m,
  [32m"flag"[39m,
  [32m"src_bytes"[39m,
  [32m"dst_bytes"[39m,
  [32m"land"[39m,
  [32m"wrong_fragment"[39m,
  [32m"urgent"[39m,
  [32m"hot"[39m,
  [32m"num_failed_logins"[39m,
  [32m"logged_in"[39m,
  [32m"num_compromised"[39m,
  [32m"root_shell"[39m,
  [32m"su_attempted"[39m,
  [32m"num_root"[39m,
  [32m"num_file_creations"[39m,
  [32m"num_shells"[39m,
  [32m"num_access_files"[39m,
  [32m"num_outbound_cmds"[39m,
  [32m"is_host_login"[39m,
  [32m"is_guest_login"[39m,
  [32m"count"[39m,
  [32m"srv_count"[39m,
  [32m"serror_rate"[39m,
  [32m"srv_serror_rate"[39m,
  [32m"rerror_rate"[39m,
  [32m"srv_rerror_rate"[39m,
  [32m"same_srv_rate"[39m,
  [32m"diff_srv_rate"[39m,
  [32m"srv_diff_host_rate"[39m,
  [32m"dst_host_count"[39m,

In [11]:
val data = dataNoHeader.toDF(nameArray:_*) // _* converts array to a varags (variable number of arguments)

[36mdata[39m: [32mDataFrame[39m = [duration: int, protocol_type: string ... 40 more fields]

In [12]:
data.select("label").groupBy("label").count().orderBy(col("count").desc).show

+----------------+-------+
|           label|  count|
+----------------+-------+
|          smurf.|2807886|
|        neptune.|1072017|
|         normal.| 972781|
|          satan.|  15892|
|        ipsweep.|  12481|
|      portsweep.|  10413|
|           nmap.|   2316|
|           back.|   2203|
|    warezclient.|   1020|
|       teardrop.|    979|
|            pod.|    264|
|   guess_passwd.|     53|
|buffer_overflow.|     30|
|           land.|     21|
|    warezmaster.|     20|
|           imap.|     12|
|        rootkit.|     10|
|     loadmodule.|      9|
|      ftp_write.|      8|
|       multihop.|      7|
+----------------+-------+
only showing top 20 rows



Create pipeline for the kmeans and run int

In [13]:
import $ivy.`org.apache.spark::spark-mllib:2.4.0`

[32mimport [39m[36m$ivy.$                                    [39m

In [14]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.VectorAssembler

[32mimport [39m[36morg.apache.spark.ml.Pipeline
[39m
[32mimport [39m[36morg.apache.spark.ml.clustering.{KMeans, KMeansModel}
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.VectorAssembler[39m

In [15]:
data.getClass

[36mres14[39m: [32mClass[39m[[32mT[39m] = class org.apache.spark.sql.Dataset

In [16]:
data.schema

[36mres15[39m: [32mtypes[39m.[32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"duration"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"protocol_type"[39m, StringType, true, {}),
  [33mStructField[39m([32m"service"[39m, StringType, true, {}),
  [33mStructField[39m([32m"flag"[39m, StringType, true, {}),
  [33mStructField[39m([32m"src_bytes"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"dst_bytes"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"land"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"wrong_fragment"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"urgent"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"hot"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"num_failed_logins"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"logged_in"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"num_compromised"[39m, IntegerType, true, {}),
 

In [17]:
val numericalCols = data.drop("protocol_type","service","flag").cache

[36mnumericalCols[39m: [32mDataset[39m[[32mRow[39m] = [duration: int, src_bytes: int ... 37 more fields]

In [18]:
val assembler = new VectorAssembler().
                setInputCols(numericalCols.columns.filter(_ != "label")).
                setOutputCol("featureVector")

[36massembler[39m: [32mVectorAssembler[39m = vecAssembler_be600441ab7f

Create null model with default options

In [19]:
val kmeans = new KMeans().
    setPredictionCol("cluster").
    setFeaturesCol("featureVector")

[36mkmeans[39m: [32mKMeans[39m = kmeans_09a91fb5d571

Create pipeline for transforming ( assembel it into the input object expected of all ML alg), fit the model. This sets it up for hyper parameter tuning later using the spark.ml.tuning api

In [20]:
val pipeline = new Pipeline().setStages(Array(assembler,kmeans))
val pipelineModel = pipeline.fit(numericalCols)

19/11/17 23:39:32 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
19/11/17 23:39:32 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


[36mpipeline[39m: [32mPipeline[39m = pipeline_cf3a579ab770
[36mpipelineModel[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mml[39m.[32mPipelineModel[39m = pipeline_cf3a579ab770

In [21]:
val kmeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
kmeansModel.clusterCenters.foreach(println)

[48.34019491959669,1834.6215497618625,826.2031900016945,5.7161172049003456E-6,6.487793027561892E-4,7.961734678254053E-6,0.012437658596734055,3.205108575604837E-5,0.14352904910348827,0.00808830584493399,6.818511237273984E-5,3.6746467745787934E-5,0.012934960793560386,0.0011887482315762398,7.430952366370449E-5,0.0010211435092468404,0.0,4.082940860643104E-7,8.351655530445469E-4,334.9735084506668,295.26714620807076,0.17797031701994256,0.1780369894027269,0.05766489875327379,0.05772990937912744,0.7898841322630906,0.02117961060991097,0.028260810096297884,232.98107822302248,189.21428335201279,0.7537133898007772,0.03071097882384052,0.605051930924901,0.006464107887636894,0.1780911843182284,0.1778858981346887,0.05792761150001272,0.05765922142401037]
[10999.0,0.0,1.309937401E9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,255.0,1.0,0.0,0.65,1.0,0.0,0.0,0.0,1.0,1.0]


[36mkmeansModel[39m: [32mKMeansModel[39m = kmeans_09a91fb5d571

In [22]:
val withCluster = pipelineModel.transform(numericalCols)

[36mwithCluster[39m: [32mDataFrame[39m = [duration: int, src_bytes: int ... 39 more fields]

In [23]:
withCluster("cluster")

[36mres22[39m: [32mColumn[39m = cluster

In [24]:
import spark.implicits._

[32mimport [39m[36mspark.implicits._[39m

In [25]:
withCluster.select("cluster","label").groupBy("cluster","label").count()
                .orderBy($"cluster",$"count".desc).show(25)

+-------+----------------+-------+
|cluster|           label|  count|
+-------+----------------+-------+
|      0|          smurf.|2807886|
|      0|        neptune.|1072017|
|      0|         normal.| 972781|
|      0|          satan.|  15892|
|      0|        ipsweep.|  12481|
|      0|      portsweep.|  10412|
|      0|           nmap.|   2316|
|      0|           back.|   2203|
|      0|    warezclient.|   1020|
|      0|       teardrop.|    979|
|      0|            pod.|    264|
|      0|   guess_passwd.|     53|
|      0|buffer_overflow.|     30|
|      0|           land.|     21|
|      0|    warezmaster.|     20|
|      0|           imap.|     12|
|      0|        rootkit.|     10|
|      0|     loadmodule.|      9|
|      0|      ftp_write.|      8|
|      0|       multihop.|      7|
|      0|            phf.|      4|
|      0|           perl.|      3|
|      0|            spy.|      2|
|      1|      portsweep.|      1|
+-------+----------------+-------+



In [26]:
kmeansModel.extractParamMap

[36mres25[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mml[39m.[32mparam[39m.[32mParamMap[39m = {
	kmeans_09a91fb5d571-distanceMeasure: euclidean,
	kmeans_09a91fb5d571-featuresCol: featureVector,
	kmeans_09a91fb5d571-initMode: k-means||,
	kmeans_09a91fb5d571-initSteps: 2,
	kmeans_09a91fb5d571-k: 2,
	kmeans_09a91fb5d571-maxIter: 20,
	kmeans_09a91fb5d571-predictionCol: cluster,
	kmeans_09a91fb5d571-seed: -1689246527,
	kmeans_09a91fb5d571-tol: 1.0E-4
}

Note that the default number of clusters is 2.

Because this is unsupervised there isn't an obvious loss function that can be used to choose optimal parameters such as in the case of regression where purity or entropy could be used. The loss function to be used in the sum of squared distance to the closest centres which intuitively should be minimised. However, this is not implemented in org.apache.spark.evaluators api and must be done manually

In [27]:
import org.apache.spark.sql.DataFrame

[32mimport [39m[36morg.apache.spark.sql.DataFrame[39m

In [28]:
import scala.util.Random

[32mimport [39m[36mscala.util.Random[39m

def myClusterScore(data: DataFrame, k: Int): Double = {
    val assembler = new VectorAssembler().setInputCols(data.columns.filter(_!="label")).setOutputCol("featureVector")
    val kmeans = new KMeans().setFeaturesCol("featureVector").setSeed(Random.nextLong()).setK(k).setPredictionCol("cluster")
    
    val pipeline= new Pipeline().setStages(Array(assembler,kmeans))
    val kmeansModel=pipeline.fit(data).stages.last.asInstanceOf[KMeansModel]
    kmeansModel.computeCost(assembler.transform(data))/data.count()
}

(20 to 100 by 20).map(k=>myClusterScore(numericalCols,k)).foreach(println)

Do a cross tab on the predicitons of the Kmeans model to see the 'purity' of the clusters

kmeansModel.extractParamMap

In [29]:
val preds = kmeansModel.transform(assembler.transform(numericalCols))

[36mpreds[39m: [32mDataFrame[39m = [duration: int, src_bytes: int ... 39 more fields]

In [30]:
val labels = data.select("label").as[String].collect

[36mlabels[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
  [32m"normal."[39m,
...

In [31]:
val predCol = preds.select("cluster").as[Int].collect()

[36mpredCol[39m: [32mArray[39m[[32mInt[39m] = [33mArray[39m(
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
  [32m0[39m,
...

This code cell below seems to be problematic. It leads to GC overflow. Trying other ways wiht less GC overhead.

val xtab = spark.createDataset(labels.zip(predCol))

In [32]:
val xtab = labels.zip(predCol)

[36mxtab[39m: [32mArray[39m[([32mString[39m, [32mInt[39m)] = [33mArray[39m(
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  ([32m"normal."[39m, [32m0[39m),
  

In [37]:
val xtabLabeled= spark.createDataset(xtab)

[36mxtabLabeled[39m: [32mDataset[39m[([32mString[39m, [32mInt[39m)] = [_1: string, _2: int]

In [38]:
xtabLabeled.getClass

[36mres37[39m: [32mClass[39m[[32mT[39m] = class org.apache.spark.sql.Dataset

In [39]:
xtabLabeled.groupBy("_2").pivot("_1").count().show()

------------ UNHANDLED EXCEPTION ---------- (scala-kernel-zeromq-polling-1)
Exception in thread "dispatcher-event-loop-7" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at cats.effect.internals.ArrayStack.<init>(ArrayStack.scala:36)
	at cats.effect.internals.ArrayStack.<init>(ArrayStack.scala:37)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:75)
	at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
	at cats.effect.internals.IORace$.$anonfun$simple$3(IORace.scala:85)
	at cats.effect.internals.IORace$.$anonfun$simple$3$adapted(IORace.scala:66)
	at cats.effect.internals.IORace$$$Lambda$762/2123216367.apply(Unknown Source)
	at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:341)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:119)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
	at cats.effect.internals.IORunLoop$RestartCa

: 