<a href="https://colab.research.google.com/github/lamvng/network-anomaly-dectection/blob/master/Spark_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Network Anomalies Detection with K-Means on Pyspark
*Ba-Tuan THAI, Van-Lam NGUYEN, Anh-Duc PHAM - PFIEV K60*


## **1. Install openjdk 8, download and configure Pyspark**
Firstly, activate [Google Colab](https://colab.research.google.com/) by your Google account. You will be prompted to open a new Jupyter Notebook file, all of which will be stored on your Drive. Then download and install `pyspark` on your Google Colab directory.

*Note*: `findspark` is a Python package to automatically locate `pyspark` installation files.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

Set up environmental variables:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [0]:
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
# Authenticate and create the PyDrive client.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [0]:
link = "https://drive.google.com/open?id=1wOl76pErCKeFwFnPRk_u-56STpqva2JQ"
fluff, id = link.split('=')
print (id) # Verify that you have everything after '='

1wOl76pErCKeFwFnPRk_u-56STpqva2JQ


In [0]:
downloaded = drive.CreateFile({'id':id}) 
downloaded.GetContentFile('kddcup.csv')

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [0]:
dataWithoutHeader = spark.read.option("inferSchema", True).option("header", False).csv("kddcup.csv")
data = dataWithoutHeader.toDF("duration", "protocol_type", "service", "flag",
"src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent", "hot", "num_failed_logins", "logged_in", "num_compromised", "root_shell", "su_attempted", "num_root", "num_file_creations", "num_shells", "num_access_files", "num_outbound_cmds", "is_host_login", "is_guest_login", "count", "srv_count", "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate", "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count",
"dst_host_same_srv_rate", "dst_host_diff_srv_rate", "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate", "dst_host_serror_rate", "dst_host_srv_serror_rate", "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
"label")

In [0]:
from pyspark.sql.functions import col
data.select("label").groupBy("label").count().orderBy(col("count").desc()).show(25)

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
|            phf.|     4|
|           perl.|     3|
|            spy.|     2|
+----------------+------+



In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import KMeansModel
numericOnly = data.drop("protocol_type", "service", "flag")
assembler = VectorAssembler(inputCols=numericOnly.columns[0:-1], outputCol="featureVector")
kmeans = KMeans().setSeed(10).setPredictionCol("cluster").setFeaturesCol("featureVector")
pipeline = Pipeline(stages=[assembler, kmeans])
pipelineModel = pipeline.fit(numericOnly)
kmeansModel = pipelineModel.stages[-1]
centers = kmeansModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[4.79793956e+01 1.62207883e+03 8.68534183e+02 4.45326100e-05
 6.43293794e-03 1.41694668e-05 3.45168212e-02 1.51815716e-04
 1.48247035e-01 1.02121372e-02 1.11331525e-04 3.64357718e-05
 1.13517671e-02 1.08295211e-03 1.09307315e-04 1.00805635e-03
 0.00000000e+00 0.00000000e+00 1.38658354e-03 3.32286248e+02
 2.92907143e+02 1.76685418e-01 1.76607809e-01 5.74330999e-02
 5.77183920e-02 7.91548844e-01 2.09816404e-02 2.89968625e-02
 2.32470732e+02 1.88666046e+02 7.53781203e-01 3.09056111e-02
 6.01935529e-01 6.68351484e-03 1.76753957e-01 1.76441622e-01
 5.81176268e-02 5.74111170e-02]
[2.0000000e+00 6.9337564e+08 0.0000000e+00 0.0000000e+00 0.0000000e+00
 0.0000000e+00 1.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00
 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00
 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 5.7000000e+01
 3.0000000e+00 7.9000000e-01 6.7000000e-01 2.1000000e-01 3.3000000e-01
 5.0000000e-02 3.9000000e-01 0.0000000e+00 2.5

In [0]:
withCluster = pipelineModel.transform(numericOnly)
withCluster.select("cluster", "label").groupBy("cluster", "label").count().orderBy(col("cluster"), col("count").desc()).show(25)

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|          smurf.|280790|
|      0|        neptune.|107201|
|      0|         normal.| 97278|
|      0|           back.|  2203|
|      0|          satan.|  1589|
|      0|        ipsweep.|  1247|
|      0|      portsweep.|  1039|
|      0|    warezclient.|  1020|
|      0|       teardrop.|   979|
|      0|            pod.|   264|
|      0|           nmap.|   231|
|      0|   guess_passwd.|    53|
|      0|buffer_overflow.|    30|
|      0|           land.|    21|
|      0|    warezmaster.|    20|
|      0|           imap.|    12|
|      0|        rootkit.|    10|
|      0|     loadmodule.|     9|
|      0|      ftp_write.|     8|
|      0|       multihop.|     7|
|      0|            phf.|     4|
|      0|           perl.|     3|
|      0|            spy.|     2|
|      1|      portsweep.|     1|
+-------+----------------+------+



In [0]:
from pyspark.sql import DataFrame
from random import randrange
def clusteringScore0(data, k):
    assembler = VectorAssembler(inputCols=data.columns[0:-1], outputCol="featureVector")
    kmeans = KMeans(seed=1, k=k, predictionCol="cluster", featuresCol="featureVector")
    pipeline = Pipeline(stages=[assembler, kmeans])
    kmeansModel = pipeline.fit(data).stages[-1]
    print(k, kmeansModel.computeCost(assembler.transform(data)) / data.count())
def clusteringScore1(data, k):
    assembler = VectorAssembler(inputCols=data.columns[0:-1], outputCol="featureVector")
    kmeans = KMeans(seed=1, k=k, predictionCol="cluster", featuresCol="featureVector", maxIter=40, tol=1.0e-05)
    pipeline = Pipeline(stages=[assembler, kmeans])
    kmeansModel = pipeline.fit(data).stages[-1]
    print(k, kmeansModel.computeCost(assembler.transform(data)) / data.count())

In [0]:
for k in range(20, 120, 20):
    clusteringScore0(numericOnly, k)
    clusteringScore1(numericOnly, k)

20 70090529.18766987
20 70090529.18766987
40 34134989.30719846
40 34134989.30719846
60 32241636.217287987
60 32241469.66946356
80 31426292.4634663
80 31426292.4634663
100 29985935.77407783
100 26300705.9737499


In [0]:
from pyspark.ml.feature import StandardScaler
def clusteringScore2(data, k):
  assembler = VectorAssembler(inputCols=data.columns[0:-1], outputCol="featureVector")
  scaler = StandardScaler(inputCol="featureVector", outputCol="scaledFeatureVector", withStd=True, withMean=False)
  kmeans = KMeans(seed=1, k=k, predictionCol="cluster", featuresCol="scaledFeatureVector", maxIter=40, tol=1.0e-05)
  pipeline = Pipeline(stages=[assembler, scaler, kmeans])
  pipelineModel = pipeline.fit(data)
  kmeansModel = pipelineModel.stages[-1]
  print(k, kmeansModel.computeCost(pipelineModel.transform(data)) / data.count())

In [0]:
for k in range(60, 300, 30):
  print(k, clusteringScore2(numericOnly, k))

60 1.1868969161190808
60 None
90 0.716842007702905
90 None
120 0.49734934242366263
120 None
150 0.3759175320833906
150 None
180 0.3172011058253099
180 None
210 0.273636909577068
210 None
240 0.23059890300407695
240 None
270 0.20361107540315934
270 None


In [0]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
import math
def oneHotPipeline(inputCol):
    indexer = StringIndexer(inputCol=inputCol, outputCol=(inputCol + "_indexed"))
    encoder = OneHotEncoder(inputCol=inputCol + "_indexed", outputCol=inputCol + "_vec")
    pipeline = Pipeline(stages=[indexer, encoder])
    return (pipeline, inputCol + "_vec")

In [0]:
def clusteringScore3(data, k):
    (protoTypeEncoder, protoTypeVecCol) = oneHotPipeline("protocol_type")
    (serviceEncoder, serviceVecCol) = oneHotPipeline("service")
    (flagEncoder, flagVecCol) = oneHotPipeline("flag")
    assembleCols = list(set(data.columns) - set(["label", "protocol_type", "service", "flag"])) + list([protoTypeVecCol, serviceVecCol, flagVecCol])
    assembler = VectorAssembler(inputCols=assembleCols, outputCol="featureVector")
    scaler = StandardScaler(inputCol="featureVector", outputCol="scaledFeatureVector", withStd=True, withMean=False)
    kmeans = KMeans(seed=1, k=k, predictionCol="cluster", featuresCol="scaledFeatureVector", maxIter=40, tol=1.0e-05)
    pipeline = Pipeline(stages=[protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans])
    pipelineModel = pipeline.fit(data)
    kmeansModel = pipelineModel.stages[-1]
    print(k, kmeansModel.computeCost(pipelineModel.transform(data)) / data.count())

for k in range(60, 270, 30):
    print(clusteringScore3(data, k))
    numericOnly.unpersist()

60 34.32659398810193
None
90 10.102528717704125
None
120 2.956985135173616
None
150 2.1403963857555346
None
180 1.5556825369949714
None
210 1.2852764808975832
None
240 0.9366045728591509
None


In [0]:
import math
def entropy(counts):
    values = filter(lambda x: x > 0, counts)
    n = float(sum(values))
    sum_entropy = 0.0
    for v in counts:
      sum_entropy += -(v/n) * math.log((v/n))
    # e = map(lambda v: -(v/n) * math.log((v/n)), values)
    # print(sum(e))
    return sum_entropy

In [0]:
from itertools import groupby
def list_group(group_list):
  groups = []
  for key, group in groupby(group_list):
    groups.append(len(list(group)))
  return groups


In [0]:
def fitPipeline4(data, k):
  (protoTypeEncoder, protoTypeVecCol) = oneHotPipeline("protocol_type")
  (serviceEncoder, serviceVecCol) = oneHotPipeline("service")
  (flagEncoder, flagVecCol) = oneHotPipeline("flag")
  assembleCols = list(set(data.columns) - set(["label", "protocol_type", "service", "flag"])) + list([protoTypeVecCol, serviceVecCol, flagVecCol])
  assembler = VectorAssembler(inputCols=assembleCols, outputCol="featureVector")
  scaler = StandardScaler(inputCol="featureVector", outputCol="scaledFeatureVector", withStd=True, withMean=False)
  kmeans = KMeans(seed=1, k=k, predictionCol="cluster", featuresCol="scaledFeatureVector", maxIter=40, tol=1.0e-05)
  pipeline = Pipeline(stages=[protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans])
  return pipeline.fit(data)

**GroupByKey, MapGroups chỉ hỗ trợ trong Spark với Scala, Java. Tuy nhiên trong document của spark có hướng dẫn là python có những hàm mạnh mẽ tương đương, cụ thể ở đây chúng ta sẽ dùng mapGroup, map, sum của itertools**  
Hàm dưới đây có chức năng tính entropy của 1 tập dữ liệu, công thức tính sẽ là: 
Mỗi cluster được phân cụm sẽ gồm nhiều label trùng nhau, tách tưng cluster ra riêng


1.   Tính size của từng cluster, 
2.   Group từng cluster lại thành các labels, với số lần xuất hiện trong cluster
3.   `entropy[i] = -(frequence/sum)*log((frequence/sum))`
4.   `Score_of_cluster[i] = size[i] * entropy[i] `
5.   `Sum(Score_of_cluster[i])`



In [0]:
def clusteringScore4(data, k):
  pipelineModel = fitPipeline4(data, k)
  # Predict cluster for each datum
  clusterLabel = pipelineModel.transform(data).select("cluster", "label")
  labelsInCluster = clusterLabel.rdd.groupByKey().values()
  labelCounts = labelsInCluster.map(lambda labels: list_group(labels)).collect()
  sum_of_labelcount = 0.0
  for t in labelCounts:
    sum_of_labelcount += entropy(t)*sum(t)
  return sum_of_labelcount / data.count()

In [0]:
for k in range(60, 300, 30):
  print(clusteringScore4(data, k))

1.324306956113726
1.1064688883555034
0.8790612095658834
0.3616672324031876
0.28924629592417994
0.10476604691418735
0.2517526550133862
0.22942454180841804


*Nhìn thấy rằng cluster 210 có điểm đánh giá thấp nhất, ta sẽ thực hiện phân tích bất thường với cụm này*


---


[Thật sự vẫn chưa hiểu nó tính score kiểu đó có tác dụng gì :))]


---

Bắt đầu Anomoly Detection

In [0]:
pipelineModel = fitPipeline4(data, 180)

In [0]:
countByClusterLabel = pipelineModel.transform(data).select("cluster", "label").groupBy("cluster", "label").count().orderBy("cluster", "label")
countByClusterLabel.show()

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|         normal.|     9|
|      0|          smurf.|280773|
|      1|        neptune.|   101|
|      1|      portsweep.|     1|
|      2|           imap.|     7|
|      2|        neptune.|   105|
|      3|        neptune.| 36557|
|      3|      portsweep.|    13|
|      4|        neptune.|   101|
|      4|      portsweep.|     4|
|      5|        neptune.|    89|
|      5|          satan.|     1|
|      6|        ipsweep.|     1|
|      6|        neptune.|   102|
|      6|         normal.|     1|
|      6|      portsweep.|     1|
|      7|        neptune.|    25|
|      8|buffer_overflow.|     6|
|      8|      ftp_write.|     4|
|      8|     loadmodule.|     1|
+-------+----------------+------+
only showing top 20 rows



In [0]:
from pyspark.ml.linalg import Vector
from pyspark.ml.linalg import Vectors

kMeansModel = pipelineModel.stages[-1]
centroids = kMeansModel.clusterCenters()
clustered = pipelineModel.transform(data)

In [0]:
centroids[0]

In [0]:
need_order = clustered.select("cluster", "scaledFeatureVector").rdd.map(lambda cluster: (Vectors.squared_distance(centroids[cluster[0]], cluster[1]))).collect()

In [0]:
need_order.sort(reverse=True)
threshold = need_order[100]

In [0]:
originalCols = data.columns

In [0]:
def distance_cluster(centroids, vec_tor):
  return Vectors.squared_distance(centroids, vector)

In [0]:
clustered.filter(lambda row_in: distance_cluster(centroids[row_in["cluster"]], row_in["scaledFeatureVector"]) > threshold)

TypeError: ignored