In [1]:
import os
import sys
# Point this kernel at the local Spark installation and expose the Python
# libraries bundled with it before anything from pyspark is imported.
spark_home = '/usr/lib/spark'
os.environ['SPARK_HOME'] = spark_home
os.environ['PYLIB'] = spark_home + '/python/lib'

# Put py4j first on sys.path with pyspark directly behind it.
sys.path[:0] = [
    os.environ['PYLIB'] + '/py4j-0.10.7-src.zip',
    os.environ['PYLIB'] + '/pyspark.zip',
]

In [2]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

In [3]:
# Build (or reuse) the Hive-enabled SparkSession used by the rest of the notebook.
# The warehouse location is configured through `spark.sql.warehouse.dir`; the
# key `spark.warehouse.dir` used previously is not a documented Spark property,
# so the intended warehouse path was never applied.
# NOTE(review): the application name looks carried over from another notebook
# (this one clusters KDD Cup 99 network data); it is left unchanged here.
spark = (
    SparkSession.builder
    .appName('ForestCoverTypeClassifier')
    .config('spark.sql.warehouse.dir', '/apps/hive/warehouse')
    .config('spark.driver.memory', '4G')
    .config('spark.sql.shuffle.partitions', 4)
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
# With SPARK_HOME and the session options set above, `spark` now refers to the
# SparkSession; displaying its version confirms the session is available.
spark.version

'2.3.1'

In [5]:
# Handle to the SparkContext that backs the session.
sc = spark.sparkContext

In [6]:
# Restrict Spark's log output to the ERROR level.
sc.setLogLevel('ERROR')

In [7]:
#   The KDD Cup was like Kaggle before there was Kaggle.
#   The 1999 topic was network intrusion, and the data is still available
#   at http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
#   The full data set is 708 MB with 4.89 million CSV rows, each spanning 42 values.
#   There is also a ten-percent sample data set with about 490k rows.
#   The column names are available from the kdd.names file.

In [8]:
# Load the ten-percent sample of the KDD Cup 1999 network-intrusion data.
# The CSV has no header row, so the 41 feature names plus the trailing
# label column are assigned explicitly after reading.
fileloc = "D:/ufdata/kddcup.testdata_10_percent"  # NOTE(review): not used by the read below

kdd_column_names = [
    "duration", "protocol_type", "service", "flag",
    "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
    "hot", "num_failed_logins", "logged_in", "num_compromised",
    "root_shell", "su_attempted", "num_root", "num_file_creations",
    "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count",
    "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
    "label",
]

data = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "false")
    .csv("D:/ufdata/kddcup.data_10_percent")
    .toDF(*kdd_column_names)
)

In [9]:
# Take a look at the labels.
# Unsupervised learning does not use the label, but it is useful here to see
# which labels are present in the data: normal traffic and the various types
# of classified network attacks.

# NOTE: this wildcard import supplies `desc` and `monotonically_increasing_id`
# for later cells. It also rebinds names such as `sum` and `log` to Spark
# column functions, which is why `entropy` further down imports the builtin
# `sum` and `math.log` explicitly.
from pyspark.sql.functions import *

# Count rows per label, most frequent first (up to 25 rows are displayed).
print("\nThe labels and their count - 23 distinct labels with most frequent being smurf. and neptune.")
data.select("label").groupBy("label").count().orderBy(desc('count')).show(25)


The labels and their count - 23 distinct labels with most frequent being smurf. and neptune.
+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
|            phf.|     4|
|           perl.|     3|
|            spy.|     2|
+----------------+------+



In [10]:
# Keep only the numeric columns by dropping the three string-valued ones, and
# cache the result because several models are fitted on it below.
numericOnly = data.drop("protocol_type", "service", "flag").cache()

# Pack every remaining column except the label into a single vector column.
from pyspark.ml.feature import VectorAssembler
feature_columns = [name for name in numericOnly.columns if name != "label"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="featureVector")

In [11]:
from random import randint, seed
from pyspark.ml.clustering import KMeans

# Seeded k-means estimator reading "featureVector" and writing the assigned
# cluster to "cluster"; k is not set here, so the library default applies.
kmeans = KMeans(seed=100, predictionCol="cluster", featuresCol="featureVector")

In [12]:
from pyspark.ml import Pipeline, PipelineModel

# Fit assembler + k-means as one pipeline on the numeric-only data, then take
# the fitted k-means model (the final stage) out of the fitted pipeline.
pipeline = Pipeline(stages=[assembler, kmeans])
pipelineModel = pipeline.fit(numericOnly)
kmeansModel = pipelineModel.stages[-1]


In [13]:
# Show the centers of the model fitted above; k was never set on the
# estimator, so KMeans ran with its default number of clusters.
print("with no parameter setting, kmeans creates two clusters\n")
kmeansModel.clusterCenters()

with no parameter setting, kmeans creates two clusters



[array([4.79793956e+01, 1.62207883e+03, 8.68534183e+02, 4.45326100e-05,
        6.43293794e-03, 1.41694668e-05, 3.45168212e-02, 1.51815716e-04,
        1.48247035e-01, 1.02121372e-02, 1.11331525e-04, 3.64357718e-05,
        1.13517671e-02, 1.08295211e-03, 1.09307315e-04, 1.00805635e-03,
        0.00000000e+00, 0.00000000e+00, 1.38658354e-03, 3.32286248e+02,
        2.92907143e+02, 1.76685418e-01, 1.76607809e-01, 5.74330999e-02,
        5.77183920e-02, 7.91548844e-01, 2.09816404e-02, 2.89968625e-02,
        2.32470732e+02, 1.88666046e+02, 7.53781203e-01, 3.09056111e-02,
        6.01935529e-01, 6.68351484e-03, 1.76753957e-01, 1.76441622e-01,
        5.81176268e-02, 5.74111170e-02]),
 array([2.0000000e+00, 6.9337564e+08, 0.0000000e+00, 0.0000000e+00,
        0.0000000e+00, 0.0000000e+00, 1.0000000e+00, 0.0000000e+00,
        0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
        0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
        0.0000000e+00, 0.0000000e+00, 

In [14]:
# Run the fitted pipeline over the same data it was trained on, so every row
# carries its assigned cluster id next to its label.
withCluster = pipelineModel.transform(numericOnly)

print("\nTaking a look at the clusters generated and the labels assigned")
cluster_label_counts = (
    withCluster
    .select("cluster", "label")
    .groupBy("cluster", "label")
    .count()
    .orderBy("cluster", desc('count'))
)
cluster_label_counts.show(50, False)


Taking a look at the clusters generated and the labels assigned
+-------+----------------+------+
|cluster|label           |count |
+-------+----------------+------+
|0      |smurf.          |280790|
|0      |neptune.        |107201|
|0      |normal.         |97278 |
|0      |back.           |2203  |
|0      |satan.          |1589  |
|0      |ipsweep.        |1247  |
|0      |portsweep.      |1039  |
|0      |warezclient.    |1020  |
|0      |teardrop.       |979   |
|0      |pod.            |264   |
|0      |nmap.           |231   |
|0      |guess_passwd.   |53    |
|0      |buffer_overflow.|30    |
|0      |land.           |21    |
|0      |warezmaster.    |20    |
|0      |imap.           |12    |
|0      |rootkit.        |10    |
|0      |loadmodule.     |9     |
|0      |ftp_write.      |8     |
|0      |multihop.       |7     |
|0      |phf.            |4     |
|0      |perl.           |3     |
|0      |spy.            |2     |
|1      |portsweep.      |1     |
+-------+--------

In [15]:
# Score helper used to compare different values of k.
def clusteringScore0(data, k):
    """Return the k-means cost per row for ``k`` clusters on unscaled features.

    Every column of ``data`` except ``label`` is assembled into
    ``featureVector``; a seeded KMeans with ``k`` clusters is fitted through a
    Pipeline; the fitted model's ``computeCost`` on the assembled data is then
    divided by the number of rows.
    """
    feature_cols = [name for name in data.columns if name != "label"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")

    kmeans = KMeans(seed=100, k=k, predictionCol="cluster", featuresCol="featureVector")

    fitted = Pipeline(stages=[assembler, kmeans]).fit(data)
    kmeansModel = fitted.stages[-1]

    total_cost = kmeansModel.computeCost(assembler.transform(data))
    return total_cost / data.count()

In [16]:
# take 0
print("\nTake 0 - the cost with varied number of clusters")

# In the recorded run the cost keeps falling as k grows, with the sharpest
# drop between k = 60 and k = 80. With as many clusters as points the cost
# would be 0, so a lower cost alone does not identify the best k.
take0_scores = [(k, clusteringScore0(numericOnly, k)) for k in range(20, 160, 20)]
print(take0_scores)


Take 0 - the cost with varied number of clusters
[(20, 70090529.54393287), (40, 69889095.66546552), (60, 34130705.51573093), (80, 6281197.831342092), (100, 5249377.999851539), (120, 3552517.5381736914), (140, 2580194.8646263178)]


In [17]:
def clusteringScore1(data, k):
    """Return the k-means cost per row, with explicit iteration cap and tolerance.

    Same procedure as ``clusteringScore0`` (all columns except ``label``,
    unscaled, seeded KMeans with ``k`` clusters), but the estimator is also
    given ``maxIter=40`` and ``tol=1.0e-5``.
    """
    feature_cols = [name for name in data.columns if name != "label"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")

    kmeans = KMeans(
        seed=100,
        k=k,
        predictionCol="cluster",
        featuresCol="featureVector",
        maxIter=40,
        tol=1.0e-5,
    )

    fitted = Pipeline(stages=[assembler, kmeans]).fit(data)
    kmeansModel = fitted.stages[-1]

    total_cost = kmeansModel.computeCost(assembler.transform(data))
    return total_cost / data.count()

In [18]:
# take 1
print("\nTake 1 - the cost where we have explicitly set the number of iterations and tolerance")

# With more iterations allowed and a tighter tolerance, the recorded scores
# are almost identical to take 0. A higher number of clusters should give a
# lower cost; where it does not, the likely causes are suboptimal clustering
# from ineffective starting points, a stopping threshold that is not low
# enough, or both.
take1_scores = [(k, clusteringScore1(numericOnly, k)) for k in range(20, 160, 20)]
print(take1_scores)


Take 1 - the cost where we have explicitly set the number of iterations and tolerance
[(20, 70090529.54393286), (40, 69889095.66546552), (60, 34130705.51573093), (80, 6281197.831342092), (100, 5249377.999851539), (120, 3552516.214068078), (140, 2580190.5810112343)]


In [19]:
# now we are going to normalize the data and check
from pyspark.ml.feature import StandardScaler
def clusteringScore2(data, k):
    """Return the k-means cost per row for ``k`` clusters on scaled features.

    All columns of ``data`` except ``label`` are assembled into
    ``featureVector`` and scaled by standard deviation (no mean centering)
    into ``scaledFeatureVector``, on which a seeded KMeans with ``k``
    clusters is fitted.

    The pipeline is fitted once; the k-means model whose cost is reported is
    the final stage of the same fitted pipeline that transforms ``data``.
    (Previously the pipeline was fitted a second time just to obtain that
    stage, doubling the training work.)
    """
    feature_cols = [name for name in data.columns if name != "label"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")

    scaler = StandardScaler(
        inputCol="featureVector",
        outputCol="scaledFeatureVector",
        withStd=True,
        withMean=False,
    )

    kmeans = KMeans(
        seed=100,
        k=k,
        predictionCol="cluster",
        featuresCol="scaledFeatureVector",
        maxIter=40,
        tol=1.0e-5,
    )

    pipeline = Pipeline(stages=[assembler, scaler, kmeans])
    pipelineModel = pipeline.fit(data)

    # Reuse the fitted pipeline instead of fitting it again.
    kmeansModel = pipelineModel.stages[-1]

    return kmeansModel.computeCost(pipelineModel.transform(data)) / data.count()

In [20]:
# take 2
print("\nTake 2 - Consider the scores after we have scaled, normalized the data")
take2_scores = [(k, clusteringScore2(numericOnly, k)) for k in range(20, 160, 20)]
print(take2_scores)


Take 2 - Consider the scores after we have scaled, normalized the data
[(20, 7.662614419004401), (40, 1.8592876574979695), (60, 1.0796203270258335), (80, 0.7950829979073035), (100, 0.5719722421002665), (120, 0.5222554892359224), (140, 0.4091315697738147)]


In [21]:
# now we will include the non numeric columns and check
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

def oneHotPipeline(inputCol):
    """Build an unfitted index-then-one-hot Pipeline for one string column.

    The StringIndexer writes ``<inputCol>_indexed`` and the OneHotEncoder
    turns that into ``<inputCol>_vec``.

    Returns a tuple ``(pipeline, vector_column_name)``.
    """
    indexed_col = inputCol + "_indexed"
    vec_col = inputCol + "_vec"

    indexer = StringIndexer(inputCol=inputCol, outputCol=indexed_col)
    encoder = OneHotEncoder(inputCol=indexed_col, outputCol=vec_col)

    return Pipeline(stages=[indexer, encoder]), vec_col


In [22]:
def clusteringScore3(data, k):
    """Return the k-means cost per row using numeric and one-hot encoded columns.

    ``protocol_type``, ``service`` and ``flag`` are one-hot encoded via
    ``oneHotPipeline``; their vector columns replace the raw string columns
    in the feature list, and ``label`` is left out. Features are scaled by
    standard deviation (no mean centering) before a seeded KMeans with ``k``
    clusters is fitted.

    The pipeline is fitted once and the k-means model is taken from that same
    fitted pipeline. (Previously it was fitted a second time just to read the
    final stage.)

    Raises ValueError if ``data`` lacks any of ``label``, ``protocol_type``,
    ``service`` or ``flag``.
    """
    protoTypeEncoder, protoTypeVecCol = oneHotPipeline("protocol_type")
    serviceEncoder, serviceVecCol = oneHotPipeline("service")
    flagEncoder, flagVecCol = oneHotPipeline("flag")

    # Start from all columns, drop the label and the raw string columns, then
    # append the one-hot vector columns that stand in for them.
    assembleCols = list(data.columns)
    for dropped in ('label', 'protocol_type', 'service', 'flag'):
        assembleCols.remove(dropped)
    assembleCols.extend([protoTypeVecCol, serviceVecCol, flagVecCol])

    assembler = VectorAssembler(inputCols=assembleCols, outputCol="featureVector")

    scaler = StandardScaler(
        inputCol="featureVector",
        outputCol="scaledFeatureVector",
        withStd=True,
        withMean=False,
    )

    kmeans = KMeans(
        seed=100,
        k=k,
        predictionCol="cluster",
        featuresCol="scaledFeatureVector",
        maxIter=40,
        tol=1.0e-5,
    )

    pipeline = Pipeline(stages=[
        protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans,
    ])
    pipelineModel = pipeline.fit(data)

    # Reuse the fitted pipeline instead of fitting it again.
    kmeansModel = pipelineModel.stages[-1]

    return kmeansModel.computeCost(pipelineModel.transform(data)) / data.count()

In [23]:
# Scratch check: index -1 selects the last element of a list.
[1, 2, 3][-1]

3

In [24]:
# take 3
print("\nTake 3 - Clustering cost scores after one hot encoding string columns")
# All columns are included now.
# NOTE(review): an earlier comment here placed the elbow around k = 180/210,
# but this run only evaluates k = 20..140 - confirm with a wider range.
take3_scores = [(k, clusteringScore3(data, k)) for k in range(20, 160, 20)]
print(take3_scores)


Take 3 - Clustering cost scores after one hot encoding string columns
[(20, 77.75052364318653), (40, 52.649900811145976), (60, 32.828472137573954), (80, 21.375243596683305), (100, 7.1470637725616175), (120, 3.101839490018694), (140, 2.4624693714793167)]


In [27]:
# we will use entropy to see the quality of individual clusters
# we will use the labels to compute entropy
# more the dominance of a single label, lower will be the entropy
# more distributed the presence of different labels, higher will be the entropy 
# thus more a cluster exhibits skewness towards a single entity, lower will be the entropy and vice versa
import math
import builtins
def entropy(counts):
    """Return the entropy (natural log) of the distribution given by ``counts``.

    Non-positive entries are ignored. The remaining counts are normalised by
    their total and ``-p * log(p)`` is summed over them. An input with no
    positive entries yields 0.
    """
    # Import the builtin sum and math.log under private names so that names
    # rebound elsewhere in the notebook cannot interfere.
    from builtins import sum as pysum
    from math import log as pylog

    positive = [c for c in counts if c > 0]
    total = pysum(positive)

    def term(count):
        share = count / total
        return -share * pylog(share)

    return pysum(term(c) for c in positive)

In [28]:
# Sanity check of entropy() on a small list of counts.
print(entropy([100, 200, 300, 100]))
# Scratch calculation. NOTE(review): this uses log(100) rather than
# log(100/700), so it is not one of the terms that entropy() sums.
-100/700 * math.log(100)

1.277034259466139


-0.6578814551411559

In [29]:
def fitPipeline4(data, k):
    """Fit and return the full encode -> assemble -> scale -> k-means pipeline.

    ``protocol_type``, ``service`` and ``flag`` are one-hot encoded via
    ``oneHotPipeline``; their vector columns take the place of the raw string
    columns in the feature list and ``label`` is excluded. Features are scaled
    by standard deviation (no mean centering) and clustered by a seeded KMeans
    with ``k`` clusters, ``maxIter=40`` and ``tol=1.0e-5``.

    Returns the fitted PipelineModel; its last stage is the k-means model.
    """
    protoTypeEncoder, protoTypeVecCol = oneHotPipeline("protocol_type")
    serviceEncoder, serviceVecCol = oneHotPipeline("service")
    flagEncoder, flagVecCol = oneHotPipeline("flag")

    # Feature list: all columns minus label and raw string columns, followed
    # by the three one-hot vector columns.
    assembleCols = data.columns
    for unwanted in ['label', 'protocol_type', 'service', 'flag']:
        assembleCols.remove(unwanted)
    for vec_col in [protoTypeVecCol, serviceVecCol, flagVecCol]:
        assembleCols.append(vec_col)

    assembler = VectorAssembler(inputCols=assembleCols, outputCol="featureVector")

    scaler = StandardScaler(
        inputCol="featureVector",
        outputCol="scaledFeatureVector",
        withStd=True,
        withMean=False,
    )

    kmeans = KMeans(
        seed=100,
        k=k,
        predictionCol="cluster",
        featuresCol="scaledFeatureVector",
        maxIter=40,
        tol=1.0e-5,
    )

    stages = [protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans]
    return Pipeline(stages=stages).fit(data)

In [30]:
# Fit the full pipeline with k = 20 and attach a cluster id to every row.
test_df = fitPipeline4(data, 20).transform(data)

In [31]:
# Peek at the assigned cluster id next to the label for the first rows.
test_df.select('cluster', 'label').show(truncate=False)

+-------+-------+
|cluster|label  |
+-------+-------+
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
|1      |normal.|
+-------+-------+
only showing top 20 rows



In [32]:
# Number of rows in the clustered frame.
test_df.count()

494021

In [33]:
# Entropy of the label mix inside each cluster, weighted by the cluster's
# size and summed over all clusters.
cluster_label_pairs = test_df.select('cluster', 'label').rdd.map(lambda row: (row[0], row[1]))

# Per cluster: the occurrence count of each distinct label, and the cluster size.
counts_and_sizes = cluster_label_pairs.groupByKey().map(
    lambda kv: (kv[0], [list(kv[1]).count(v) for v in set(kv[1])], len(list(kv[1])))
)

weighted_entropies = counts_and_sizes.map(lambda t: entropy(t[1]) * t[2])
weighted_entropies.sum()

259790.46663264162

In [34]:
def clusteringScore4(data, k):
    """Return the size-weighted sum of per-cluster label entropies for ``k`` clusters.

    The full pipeline from ``fitPipeline4`` is fitted on ``data`` and applied
    to it. For each cluster, the entropy of its label counts is multiplied by
    the cluster size, and these products are summed.

    The per-(cluster, label) counts are computed by a DataFrame aggregation,
    so only one small row per pair reaches the driver. (Previously every label
    value was gathered per cluster with ``groupByKey`` and then counted with
    ``list.count`` once per distinct label.)
    """
    # Use the builtin sum explicitly, as entropy() does, since the notebook's
    # wildcard import from pyspark.sql.functions may have rebound `sum`.
    from builtins import sum as pysum

    cluster_df = fitPipeline4(data, k).transform(data)

    # Ordered so the floating-point summation order is the same on every run.
    pair_counts = (
        cluster_df
        .groupBy('cluster', 'label')
        .count()
        .orderBy('cluster', 'label')
        .collect()
    )

    counts_by_cluster = {}
    for row in pair_counts:
        # row is (cluster, label, count); index positionally because
        # `count` is also a method name on Row.
        counts_by_cluster.setdefault(row[0], []).append(row[2])

    return pysum(
        entropy(label_counts) * pysum(label_counts)
        for label_counts in counts_by_cluster.values()
    )

In [35]:
# Same score as the hand-written chain above, now obtained through the function.
clusteringScore4(data, 20)

259790.46663264162

In [36]:
# take 4
# The best clustering is said to be found around k = 180, and every fit takes
# time, so on a local cluster only that known value is evaluated here rather
# than a whole range of k.
# NOTE(review): the message printed below says "k = 100" while the call uses
# k = 180 - confirm which value is intended.
print("\nTake 4 - Now going for clustering score 4 - estimating the entropy - best score to be found for k = 100")
print(clusteringScore4(data, 180))


Take 4 - Now going for clustering score 4 - estimating the entropy - best score to be found for k = 100
9379.615168599032


In [190]:
from pyspark.ml.linalg import Vector, Vectors
# Refit the full pipeline with k = 180. This rebinds `pipelineModel` and
# `kmeansModel`, which earlier cells used for the default two-cluster model.
pipelineModel = fitPipeline4(data, 180)

kmeansModel = pipelineModel.stages[-1]
centroids = kmeansModel.clusterCenters()

clustered = pipelineModel.transform(data)
clustered.select('cluster', 'scaledFeatureVector').show(5, False)

# Number of most-distant points to treat as anomalies.
threshold_boundary = 100

# Squared distance of every point to the centroid of its assigned cluster.
distances_to_centroid = clustered \
    .select("cluster", "scaledFeatureVector").rdd \
    .map(lambda x: Vectors.squared_distance(Vectors.dense(centroids[x[0]]), x[1]))

# top() returns the N largest values in descending order without sorting the
# whole RDD, so its last element is the N-th largest distance. Indexing with
# -1 also keeps this working when the data has fewer than N rows.
threshold = distances_to_centroid.top(threshold_boundary)[-1]

print('threshold is ' , threshold)


# map { case (cluster, vec) => Vectors.sqdist(centroids(cluster), vec) }.
#       orderBy($"value".desc).take(100).last

#     val originalCols = data.columns
#     val anomalies = clustered.filter { row =>
#       val cluster = row.getAs[Int]("cluster")
#       val vec = row.getAs[Vector]("scaledFeatureVector")
#       Vectors.sqdist(centroids(cluster), vec) >= threshold
#     }.select(originalCols.head, originalCols.tail: _*)

#     println("\nPrinting the first ten anomalous looking entries")
#     anomalies.take(10).foreach(println)

#     println("Total number of anomalous points found: " + anomalies.count)

+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cluster|scaledFeatureVector                                                                                                                                                                                                                                                                                        |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|177    |(115,[1,2,8,19,20,25,28,29,30,32,39,42,105],[1.83157948440342

In [158]:
# Preview the generated row ids before attaching them for real.
clustered.withColumn('id', monotonically_increasing_id()).select('id').show(5)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows



In [159]:
# Attach a row id so distances computed on an RDD can be joined back to the
# original rows. The ids are not consecutive (the anomaly listing further down
# shows values such as 8589945311).
# NOTE(review): `clustered` is not cached here; presumably the ids stay the
# same when the frame is recomputed for the later join - confirm.
clustered_with_id = clustered.withColumn('id', monotonically_increasing_id())

In [163]:
# Pair each row id with the squared distance to its own cluster centroid and
# keep only the rows at or beyond the anomaly threshold.
id_distance_rdd = clustered_with_id.select("id", "cluster", "scaledFeatureVector").rdd.map(
    lambda row: (row[0], Vectors.squared_distance(Vectors.dense(centroids[row[1]]), row[2]))
)
anomalies_rdd = id_distance_rdd.filter(lambda pair: pair[1] >= threshold)
anomalies_rdd.cache()
anomalies_rdd.take(5)

[(11882, 2171.968313051754),
 (15699, 1226.0213560587104),
 (16064, 857.4055273940494),
 (21931, 2092.4015736714387),
 (22750, 3337.0915028250834)]

In [162]:
# Report how many rows met the threshold.
# NOTE(review): the printed message contains typos ("numboer", "anomailes").
print('total numboer of anomailes ', anomalies_rdd.count())

total numboer of anomailes  100


In [164]:
# NOTE: this cell fails as written. The recorded output is
# "TypeError: not supported type: <class 'numpy.float64'>", because the
# distances in anomalies_rdd are numpy.float64 values. The next cell converts
# them with float() before calling toDF and is the working version.
anomalies_df = anomalies_rdd.toDF(['id', 'distance'])
anomalies_df.show()

TypeError: not supported type: <class 'numpy.float64'>

In [167]:
# Convert each distance to a plain Python float first; toDF rejected the
# numpy.float64 values in the previous cell.
plain_rows = anomalies_rdd.map(lambda pair: (pair[0], float(pair[1])))
anomalies_df = plain_rows.toDF(['id', 'distance'])
anomalies_df.show()

+-----+------------------+
|   id|          distance|
+-----+------------------+
|11882| 2171.968313051754|
|15699|1226.0213560587104|
|16064| 857.4055273940494|
|21931|2092.4015736714387|
|22750|3337.0915028250834|
|22772|3357.6963259464815|
|22785| 1052.185251221034|
|22786|1052.2850282222416|
|22789| 1052.285234229538|
|22790|1052.2810046753782|
|23235|  2195.18678162918|
|25708| 827.1860873820201|
|26541| 862.1152571582687|
|26616| 730.0491846706202|
|26637| 704.9558735560414|
|26638| 709.8037446447116|
|26671| 1542.306110702227|
|26673| 720.4301114151302|
|26771|1526.7587156086972|
|31698| 734.3909176287823|
+-----+------------------+
only showing top 20 rows



In [169]:
# Remember the original column names so that joined frames can be projected
# back to the input schema; show one row as a check.
origCols = data.columns
data.select(origCols).show(1)

+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|dst_hos

In [172]:
# Join the anomalous ids back to the clustered rows and show only the original columns.
clustered_with_id.join(anomalies_df, 'id').select(origCols).show()

+--------+-------------+--------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------------+
|duration|protocol_type| service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate

In [186]:
# Original columns followed by the two columns added by the clustering and
# distance steps.
ncols = data.columns + ['cluster', 'distance']
ncols

['duration',
 'protocol_type',
 'service',
 'flag',
 'src_bytes',
 'dst_bytes',
 'land',
 'wrong_fragment',
 'urgent',
 'hot',
 'num_failed_logins',
 'logged_in',
 'num_compromised',
 'root_shell',
 'su_attempted',
 'num_root',
 'num_file_creations',
 'num_shells',
 'num_access_files',
 'num_outbound_cmds',
 'is_host_login',
 'is_guest_login',
 'count',
 'srv_count',
 'serror_rate',
 'srv_serror_rate',
 'rerror_rate',
 'srv_rerror_rate',
 'same_srv_rate',
 'diff_srv_rate',
 'srv_diff_host_rate',
 'dst_host_count',
 'dst_host_srv_count',
 'dst_host_same_srv_rate',
 'dst_host_diff_srv_rate',
 'dst_host_same_src_port_rate',
 'dst_host_srv_diff_host_rate',
 'dst_host_serror_rate',
 'dst_host_srv_serror_rate',
 'dst_host_rerror_rate',
 'dst_host_srv_rerror_rate',
 'label',
 'cluster',
 'distance']

In [188]:
# Anomalous rows with every clustered column plus their distance; cached for reuse.
anomalies = clustered_with_id.join(anomalies_df, 'id').cache()

In [198]:
# List the anomalies from most to least distant, with the distance rounded to
# two decimals and a handful of descriptive columns.
anomaly_summary = anomalies.selectExpr(
    'id', 'label', 'cluster', 'round(distance, 2) as distance',
    'duration', 'protocol_type', 'service', 'flag',
    'num_failed_logins', 'num_compromised',
)
anomaly_summary.orderBy(desc('distance')).show()

+-----------+-------------+-------+--------+--------+-------------+--------+------+-----------------+---------------+
|         id|        label|cluster|distance|duration|protocol_type| service|  flag|num_failed_logins|num_compromised|
+-----------+-------------+-------+--------+--------+-------------+--------+------+-----------------+---------------+
|      43084|        nmap.|     38| 4606.66|       0|          tcp|     ctf|    SH|                0|              0|
| 8589945311|      normal.|     35| 4504.43|      19|          tcp|  telnet|    SF|                2|              0|
|      41721|      normal.|     35| 4494.69|      26|          tcp|  telnet|    SF|                2|              0|
| 8589945168|        imap.|     25|  4453.6|       0|          tcp|   imap4|    S1|                0|              0|
|17179882374|   portsweep.|     49| 3820.58|   40448|          tcp|csnet_ns|  RSTR|                0|              0|
|17179882372|   portsweep.|    105| 3515.99|   40682|   