# Intrusion Detection based Anomaly method using Clustering algorithm
- K-Means

### Importing Packages

In [None]:
!pip install pyspark



In [None]:
import pyspark.sql.functions as funcs
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder\
.master("local[4]")\
.appName("ReadFromCsv")\
.config("spark.driver.memory","3g")\
.config("spark.executor.memory", "4g")\
.getOrCreate()

In [None]:
'''logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)'''

'logger = spark.sparkContext._jvm.org.apache.log4j\nlogger.LogManager.getLogger("org"). setLevel(logger.Level.ERROR)\nlogger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)'

## Predict on Stream

# 1. Load Dataset

In [None]:
iris = spark.read \
.format("csv")\
.option("header", True)\
.option("sep", ",")\
.option("inferSchema", "True")\
.load("UNSW_NB15.csv")

In [None]:
iris.head()

Row(id=1, dur=1.1e-05, proto='udp', service='-', state='INT', spkts=2, dpkts=0, sbytes=496, dbytes=0, rate=90909.0902, sttl=254, dttl=0, sload=180363632.0, dload=0.0, sloss=0, dloss=0, sinpkt=0.011, dinpkt=0.0, sjit=0.0, djit=0.0, swin=0, stcpb=0, dtcpb=0, dwin=0, tcprtt=0.0, synack=0.0, ackdat=0.0, smean=248, dmean=0, trans_depth=0, response_body_len=0, ct_srv_src=2, ct_state_ttl=2, ct_dst_ltm=1, ct_src_dport_ltm=1, ct_dst_sport_ltm=1, ct_dst_src_ltm=2, is_ftp_login=0, ct_ftp_cmd=0, ct_flw_http_mthd=0, ct_src_ltm=1, ct_srv_dst=2, is_sm_ips_ports=0, attack_cat='Normal', label=0)

In [None]:
iris.printSchema()

root
 |-- id: integer (nullable = true)
 |-- dur: double (nullable = true)
 |-- proto: string (nullable = true)
 |-- service: string (nullable = true)
 |-- state: string (nullable = true)
 |-- spkts: integer (nullable = true)
 |-- dpkts: integer (nullable = true)
 |-- sbytes: integer (nullable = true)
 |-- dbytes: integer (nullable = true)
 |-- rate: double (nullable = true)
 |-- sttl: integer (nullable = true)
 |-- dttl: integer (nullable = true)
 |-- sload: double (nullable = true)
 |-- dload: double (nullable = true)
 |-- sloss: integer (nullable = true)
 |-- dloss: integer (nullable = true)
 |-- sinpkt: double (nullable = true)
 |-- dinpkt: double (nullable = true)
 |-- sjit: double (nullable = true)
 |-- djit: double (nullable = true)
 |-- swin: integer (nullable = true)
 |-- stcpb: long (nullable = true)
 |-- dtcpb: long (nullable = true)
 |-- dwin: integer (nullable = true)
 |-- tcprtt: double (nullable = true)
 |-- synack: double (nullable = true)
 |-- ackdat: double (nullable 

In [None]:
iris=iris.drop('proto','service','state','label')

# 2. Data Preparation for Training

In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

In [None]:
#train, test = data.randomSplit([0.70, 0.30])

### 2.1 Data Vectorization and String Indexing

In [None]:
label_indexer = StringIndexer(inputCol = "attack_cat", outputCol = "label")

label_indexer_model = label_indexer.fit(iris)
new_df = label_indexer_model.transform(iris)

In [None]:
feature_cols = iris.columns[:-1]


In [None]:
assembler = VectorAssembler(inputCols = feature_cols, outputCol = 'vec_features')
assembler_df = assembler.transform(new_df)

In [None]:
normal = assembler_df.where(funcs.col("attack_cat") == "Normal")

### 2.2 Feature Reduction using PCA (Principal Component Analysis)

In [None]:
from pyspark.ml.feature import PCA

pca = PCA(k=9, inputCol="vec_features", outputCol="features")
pcaModel = pca.fit(normal)
normal_reduction_df = pcaModel.transform(normal)

In [None]:
normal_reduction_df.toPandas().head(3)

Unnamed: 0,id,dur,spkts,dpkts,sbytes,dbytes,rate,sttl,dttl,sload,dload,sloss,dloss,sinpkt,dinpkt,sjit,djit,swin,stcpb,dtcpb,dwin,tcprtt,synack,ackdat,smean,dmean,trans_depth,response_body_len,ct_srv_src,ct_state_ttl,ct_dst_ltm,ct_src_dport_ltm,ct_dst_sport_ltm,ct_dst_src_ltm,is_ftp_login,ct_ftp_cmd,ct_flw_http_mthd,ct_src_ltm,ct_srv_dst,is_sm_ips_ports,attack_cat,label,vec_features,features
0,1,1.1e-05,2,0,496,0,90909.0902,254,0,180363632.0,0.0,0,0,0.011,0.0,0.0,0.0,0,0,0,0,0.0,0.0,0.0,248,0,0,0,2,2,1,1,1,2,0,0,0,1,2,0,Normal,0.0,"(1.0, 1.1e-05, 2.0, 0.0, 496.0, 0.0, 90909.090...","[4959946.8794864, -17456.289661774674, 1802953..."
1,2,8e-06,2,0,1762,0,125000.0003,254,0,881000000.0,0.0,0,0,0.008,0.0,0.0,0.0,0,0,0,0,0.0,0.0,0.0,881,0,0,0,2,2,1,1,1,2,0,0,0,1,2,0,Normal,0.0,"(2.0, 8e-06, 2.0, 0.0, 1762.0, 0.0, 125000.000...","[24227235.07051033, -85266.57379073313, 880666..."
2,3,5e-06,2,0,1068,0,200000.0051,254,0,854400000.0,0.0,0,0,0.005,0.0,0.0,0.0,0,0,0,0,0.0,0.0,0.0,534,0,0,0,3,2,1,1,1,3,0,0,0,1,3,0,Normal,0.0,"(3.0, 5e-06, 2.0, 0.0, 1068.0, 0.0, 200000.005...","[23495744.56271649, -82692.12633856495, 854076..."


# 3. Train Model (K-Means Clustering)

### 3.1 Training of Data

In [None]:
from pyspark.ml.clustering import KMeans

k_num = 2
kmeans = KMeans(featuresCol='features',k=k_num, maxIter=100)
model = kmeans.fit(normal_reduction_df)

### 3.1.1 Prediction Training Dataset

In [None]:
pca = PCA(k=9, inputCol="vec_features", outputCol="features")
pcaModel = pca.fit(assembler_df)
test_reduction_df = pcaModel.transform(assembler_df)

predictions = model.transform(test_reduction_df)
predictions = predictions.select("features","label","prediction")
predictions.toPandas().head()

Unnamed: 0,features,label,prediction
0,"[5708712.889189165, 1237.7888575087018, 180273...",0.0,1
1,"[27884633.38072515, 6046.07048660398, 88055849...",0.0,1
2,"[27042716.843218617, 5863.523577897636, 853971...",0.0,1
3,"[18990672.98883812, 4117.643203814579, 5996993...",0.0,1
4,"[26916108.39680101, 5836.070002120078, 8499738...",0.0,1


In [None]:
test_reduction_df.head(5)

[Row(id=1, dur=1.1e-05, spkts=2, dpkts=0, sbytes=496, dbytes=0, rate=90909.0902, sttl=254, dttl=0, sload=180363632.0, dload=0.0, sloss=0, dloss=0, sinpkt=0.011, dinpkt=0.0, sjit=0.0, djit=0.0, swin=0, stcpb=0, dtcpb=0, dwin=0, tcprtt=0.0, synack=0.0, ackdat=0.0, smean=248, dmean=0, trans_depth=0, response_body_len=0, ct_srv_src=2, ct_state_ttl=2, ct_dst_ltm=1, ct_src_dport_ltm=1, ct_dst_sport_ltm=1, ct_dst_src_ltm=2, is_ftp_login=0, ct_ftp_cmd=0, ct_flw_http_mthd=0, ct_src_ltm=1, ct_srv_dst=2, is_sm_ips_ports=0, attack_cat='Normal', label=0.0, vec_features=SparseVector(40, {0: 1.0, 1: 0.0, 2: 2.0, 4: 496.0, 6: 90909.0902, 7: 254.0, 9: 180363632.0, 13: 0.011, 24: 248.0, 28: 2.0, 29: 2.0, 30: 1.0, 31: 1.0, 32: 1.0, 33: 2.0, 37: 1.0, 38: 2.0}), features=DenseVector([5708712.8892, 1237.7889, 180273267.6551, -83476.0189, -1224.5606, 694.9033, 24605.3435, 1062.7755, 86.8435])),
 Row(id=2, dur=8e-06, spkts=2, dpkts=0, sbytes=1762, dbytes=0, rate=125000.0003, sttl=254, dttl=0, sload=881000000.

### 3.1.2 Calculation of Silhouette Score

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.7641545083264973


### 3.2 Analysing of Trained model

#### Analysing of how many labels have in a cluster ?

In [None]:
predictions.select("prediction", "label").groupBy("prediction", "label").count()\
.orderBy("prediction", "label", ascending=True).toPandas().head(10)

Unnamed: 0,prediction,label,count
0,0,0.0,20122
1,0,1.0,419
2,0,2.0,6125
3,0,3.0,2902
4,0,4.0,790
5,0,5.0,1452
6,0,6.0,43
7,0,7.0,40
8,0,8.0,149
9,0,9.0,26


In [None]:
predictions.select("prediction", "label")\
.groupBy("prediction", "label").count()\
.orderBy("prediction", "label", ascending=True).withColumn("status",
funcs.when(funcs.col("label").isin(1), "Anomaly")\
.otherwise("Normal")).toPandas().head()

Unnamed: 0,prediction,label,count,status
0,0,0.0,20122,Normal
1,0,1.0,419,Anomaly
2,0,2.0,6125,Normal
3,0,3.0,2902,Normal
4,0,4.0,790,Normal


### 3.3 Calculation of centroids for every cluster

In [None]:
train_clusters = model.clusterCenters()

traind_clusters = {int(i):[float(train_clusters[i][j]) for j in range(len(train_clusters[i]))] 
              for i in range(len(train_clusters))}
train_clusters

[array([-3.53987486e+09,  7.21069162e+06,  9.79013203e+07, -7.27953090e+05,
        -3.50022419e+03, -4.59904213e+04,  6.94134083e+03, -1.01817722e+03,
         4.99206380e+04]),
 array([-4.63817818e+08, -2.04559348e+06,  9.93055341e+07, -8.08098696e+05,
        -3.90772282e+03, -4.71055870e+04,  6.13599140e+03, -8.72729361e+02,
         4.93227479e+04])]

In [None]:
train_df_centers = spark.sparkContext.parallelize([(k,)+(v,) for k,v in traind_clusters.items()]).toDF(['prediction','center'])
train_df_centers.toPandas().head()

Unnamed: 0,prediction,center
0,0,"[-3539874863.1353145, 7210691.6237883195, 9790..."
1,1,"[-463817818.45496565, -2045593.4777213074, 993..."


In [None]:
train_pred_df = predictions.withColumn('prediction',funcs.col('prediction').cast(IntegerType()))
train_pred_df.toPandas().head()

Unnamed: 0,features,label,prediction
0,"[5708712.889189165, 1237.7888575087018, 180273...",0.0,1
1,"[27884633.38072515, 6046.07048660398, 88055849...",0.0,1
2,"[27042716.843218617, 5863.523577897636, 853971...",0.0,1
3,"[18990672.98883812, 4117.643203814579, 5996993...",0.0,1
4,"[26916108.39680101, 5836.070002120078, 8499738...",0.0,1


### 3.3.1 Joining of centroid and feature dataframes

In [None]:
train_pred_df = train_pred_df.join(train_df_centers,on='prediction',how='left')
train_pred_df.toPandas().head()

Unnamed: 0,prediction,features,label,center
0,0,"[-3945513635.0590897, -840436628.619345, 12495...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790..."
1,0,"[-3145010118.4775715, 1618262491.021295, 99603...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790..."
2,0,"[-4469369077.999338, 887516385.47205, 14153129...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790..."
3,0,"[-2712068004.352801, -1734310994.9684193, 8589...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790..."
4,0,"[-2311546963.3735495, -323231314.0366038, 7320...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790..."


### 3.4 Finding Anomaly Values

#### Getting distance values function

In [None]:
get_dist = funcs.udf(lambda features, center :
                 float(features.squared_distance(center)),FloatType())

#### Sorting the furthest distance values

In [None]:
train_pred_df = train_pred_df.withColumn('dist',get_dist(funcs.col('features'),funcs.col('center')))
train_pred_df.toPandas().sort_values(by="dist",ascending=False).head(10)

Unnamed: 0,prediction,features,label,center,dist
60148,1,"[166738078.32773128, 36152.897495451725, 52653...",4.0,"[-463817818.45496565, -2045593.4777213074, 993...",2.708573e+19
60145,1,"[166738078.32773775, 36152.89749561954, 526536...",2.0,"[-463817818.45496565, -2045593.4777213074, 993...",2.708573e+19
60147,1,"[166738078.32773343, 36152.89749550766, 526536...",4.0,"[-463817818.45496565, -2045593.4777213074, 993...",2.708573e+19
60149,1,"[166738078.32772914, 36152.89749539578, 526536...",4.0,"[-463817818.45496565, -2045593.4777213074, 993...",2.708573e+19
60146,1,"[166738078.3277356, 36152.8974955636, 52653601...",2.0,"[-463817818.45496565, -2045593.4777213074, 993...",2.708573e+19
60144,1,"[166738078.3277399, 36152.897495675476, 526536...",2.0,"[-463817818.45496565, -2045593.4777213074, 993...",2.708573e+19
81207,1,"[138252070.6633688, 29976.431789254748, 436581...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",1.856556e+19
80237,1,"[134707166.2947062, 29207.814257699774, 425386...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",1.761862e+19
59670,1,"[134707166.369035, 29207.81618858013, 42538672...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",1.761862e+19
62641,1,"[134707166.3553571, 29207.81583325966, 4253867...",3.0,"[-463817818.45496565, -2045593.4777213074, 993...",1.761862e+19


### 3.5 Calculation of Threshold Value according to distance

### 3.5.1 Average distance for every cluster 

In [None]:
averageDistance = train_pred_df.filter(funcs.col("label") == 0.0).groupBy("prediction")\
.agg(funcs.avg("dist").alias("avgDist"))
averageDistance.toPandas().head()

Unnamed: 0,prediction,avgDist
0,1,8.193462e+17
1,0,2.544128e+18


### 3.5.2 Maximum distance for every cluster 

In [None]:
maxDistance = train_pred_df.filter(funcs.col("label") == 0.0).groupBy("prediction")\
.agg(funcs.max("dist").alias("maxDist"))
maxDistance.toPandas().head()

Unnamed: 0,prediction,maxDist
0,1,1.856556e+19
1,0,9.306718e+18


### Predicted Dataframe

In [None]:
train_pred_df.toPandas().head(5)

Unnamed: 0,prediction,features,label,center,dist
0,0,"[-3945513635.0590897, -840436628.619345, 12495...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790...",8.837835e+17
1,0,"[-3145010118.4775715, 1618262491.021295, 99603...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790...",2.751411e+18
2,0,"[-4469369077.999338, 887516385.47205, 14153129...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790...",1.640805e+18
3,0,"[-2712068004.352801, -1734310994.9684193, 8589...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790...",3.718308e+18
4,0,"[-2311546963.3735495, -323231314.0366038, 7320...",0.0,"[-3539874863.1353145, 7210691.6237883195, 9790...",1.618593e+18


#### 3.5.3 Joining of predicted and threshold dataframes

In [None]:
anomalyDetection = train_pred_df.join(maxDistance, maxDistance.prediction == train_pred_df.prediction)

anomalyDetection.toPandas().head(5)

Unnamed: 0,prediction,features,label,center,dist,prediction.1,maxDist
0,1,"[5708712.889189165, 1237.7888575087018, 180273...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",2.270157e+17,1,1.856556e+19
1,1,"[27884633.38072515, 6046.07048660398, 88055849...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",8.521319e+17,1,1.856556e+19
2,1,"[27042716.843218617, 5863.523577897636, 853971...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",8.104697e+17,1,1.856556e+19
3,1,"[18990672.98883812, 4117.643203814579, 5996993...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",4.835025e+17,1,1.856556e+19
4,1,"[26916108.39680101, 5836.070002120078, 8499738...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",8.043271e+17,1,1.856556e+19


### 3.6 Assigning of labels as normal or anomaly

In [None]:
detected_df = anomalyDetection.withColumn("detected", funcs.when(anomalyDetection.dist > anomalyDetection.maxDist, "Anomaly").otherwise("Normal"))
detected_df.toPandas().head(5)

Unnamed: 0,prediction,features,label,center,dist,prediction.1,maxDist,detected
0,1,"[5708712.889189165, 1237.7888575087018, 180273...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",2.270157e+17,1,1.856556e+19,Normal
1,1,"[27884633.38072515, 6046.07048660398, 88055849...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",8.521319e+17,1,1.856556e+19,Normal
2,1,"[27042716.843218617, 5863.523577897636, 853971...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",8.104697e+17,1,1.856556e+19,Normal
3,1,"[18990672.98883812, 4117.643203814579, 5996993...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",4.835025e+17,1,1.856556e+19,Normal
4,1,"[26916108.39680101, 5836.070002120078, 8499738...",0.0,"[-463817818.45496565, -2045593.4777213074, 993...",8.043271e+17,1,1.856556e+19,Normal


### 3.7 Evaluation of result using Confusion Matrix

In [None]:
conf_matrix = detected_df.withColumn("label",funcs.when(funcs.col("label").isin(1), "Anomaly").otherwise("Normal"))\
.groupBy("label","detected").count()
conf_matrix.toPandas().head(10)

Unnamed: 0,label,detected,count
0,Anomaly,Normal,18871
1,Normal,Anomaly,10
2,Normal,Normal,63451


### 3.7.1 Calculation of Accuracy 

In [None]:
all_df = detected_df.count()
tptn = conf_matrix.filter(conf_matrix.label == conf_matrix.detected).agg(funcs.sum("count")).select("sum(count)").toPandas().head()
tptn = tptn.at[0, 'sum(count)']

In [None]:
accuracy = tptn / all_df
print("Accuracy: ", accuracy)

Accuracy:  0.7706723995530292


### 3.7.2 Calculation of Recall

In [None]:
tp = conf_matrix.filter((conf_matrix.label == "Normal") & ( conf_matrix.detected == "Normal")).select("count").toPandas()
fn = conf_matrix.filter((conf_matrix.label == "Anomaly") & ( conf_matrix.detected == "Normal")).select("count").toPandas()
tp = tp.at[0, 'count']
fn = fn.at[0, 'count']

In [None]:
recall = tp / (tp + fn)
print("Recall: ", recall)

Recall:  0.7707660163747236
