# **PREREQUISITES**

In [None]:
# A __future__ import must be the first statement of the cell/module;
# anywhere else Python raises SyntaxError.
from __future__ import print_function

import json
import random as rd
import sys
from math import sqrt, ceil
from os import environ

import joblib
import numpy as np
import pandas as pd
import requests
from pyspark import SparkConf, SparkContext, sql
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.stat import Statistics
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import monotonically_increasing_id, col
from pyspark.sql.functions import stddev, stddev_pop, mean
from pyspark.streaming import StreamingContext
from scipy.spatial.distance import cdist
from sklearn import metrics
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier


## Download the exact version of elasticsearch-spark-20_2.11-7.15.0.jar and place it inside the Apache Spark folder ##
## The version should match the Elasticsearch version deployed in your system
## https://www.elastic.co/downloads/hadoop
## Modify the code below to point to the right path in your system
## This jar is used for the hadoop implementation of elasticsearch index query and insertion.

In [None]:
# Arguments handed to spark-submit when the PySpark shell starts: attach the
# elasticsearch-spark connector jar. Adjust the path to your installation.
_SUBMIT_ARGS = '--jars "/home/orchestrator/spark-2.3.0-bin-hadoop2.7/elasticsearch-spark-20_2.11-7.15.0.jar" pyspark-shell'
environ['PYSPARK_SUBMIT_ARGS'] = _SUBMIT_ARGS

In [None]:
# Module-level model placeholders; both start out unset.
kmeansModel = knnModel = None


In [None]:
# Spark configuration: master URL "local[10]" and application name
# "sparkproject".
conf = SparkConf()
conf.setMaster("local[10]")
conf.setAppName("sparkproject")
# Reuse the SparkContext if one is already running, otherwise start it.
sc = SparkContext.getOrCreate(conf=conf)
# SQLContext bound to that SparkContext; used below to build DataFrames.
sqlContext = SQLContext(sc)

# **DATA PROCESSING AND STANDARDIZATION**

In [None]:
# Display the SparkContext as the cell output to confirm it is available.
sc

## Define the elasticsearch query for obtaining the training DATA ##
## Be sure to point to the correct address and port of the elasticsearch deployment ##
## Change index name if needed ##
## The user name and password are only needed for a security-enabled cluster -- change or remove them as needed ##

In [None]:
# Elasticsearch query body: match every document in the index.
q = (
    '{\n'
    '  "query": {\n'
    '    "match_all": {}\n'
    '  }  \n'
    '}'
)

# Connector settings for reading from Elasticsearch.
es_read_conf = {}
es_read_conf["es.nodes"] = 'localhost'
# Port; set explicitly in case the deployment does not use the default.
es_read_conf["es.port"] = '9200'
# Resource to read, in the form 'index' or 'index/doc-type'.
es_read_conf["es.resource"] = 'sflowtest'
# Query to run against that resource.
es_read_conf["es.query"] = q
# Is the input JSON?
es_read_conf["es.input.json"] = "yes"
# Optional and currently unset: "es.mapping.id" names a mapping field to use
# as the Elasticsearch document ID.
# Credentials for a security-enabled cluster.
es_read_conf['es.net.http.auth.user'] = 'elastic'
es_read_conf['es.net.http.auth.pass'] = 'elastic'

In [None]:
# Read the Elasticsearch index through the elasticsearch-hadoop input format.
es_hadoop_classes = dict(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
)
es_rdd = sc.newAPIHadoopRDD(conf=es_read_conf, **es_hadoop_classes)

In [None]:
# Wrap the Elasticsearch RDD in a Spark DataFrame; a later cell reads each
# document's fields from the row's `_2` field.
df =sqlContext.createDataFrame(es_rdd)

In [None]:
# Inspect the raw rows.
# NOTE(review): collect() returns every row to the driver; on a large index
# a bounded preview such as df.show() or df.take(n) may be safer.
df.collect()

In [None]:
# Print the DataFrame schema before the feature columns are extracted.
df.printSchema()

In [None]:
# Reshape the DataFrame into one column per traffic feature.
def _to_feature_tuple(record):
    """Return the five feature values stored in the record's `_2` field."""
    doc = record._2
    return (doc["sumOfBytes"], doc["sumOfFlows"], doc["sumOfPackets"],
            doc["uniqDstIPs"], doc["uniqDstPorts"])


feature_rdd = df.rdd.map(_to_feature_tuple)
df = feature_rdd.toDF(['sumOfBytes', 'sumOfFlows', 'sumOfPackets', 'uniqDstIPs', 'uniqDstPorts'])
df.printSchema()
df.show()

In [None]:
# Names of the feature columns used by the clustering and KNN steps.
FEATURE_COLS = [
    'sumOfBytes',
    'sumOfFlows',
    'sumOfPackets',
    'uniqDstIPs',
    'uniqDstPorts',
]

In [None]:
	# Add unique numeric ID, and place in first column.
# Attach a unique numeric id and put it first, followed by the five features.
id_column = monotonically_increasing_id()
df = df.withColumn("id", id_column).select("id", *FEATURE_COLS)
df.show()

In [None]:
	# Convert all data columns to float.
# Build one float-cast expression per feature, keeping each column's name.
float_columns = []
for name in FEATURE_COLS:
    float_columns.append(col(name).cast('float').alias(name))
df = df.select('id', *float_columns)

df.show()

** Below code is used if you want to save this data as CSV **

In [None]:
# Write the dataset as CSV to "cc_out.csv" from a single partition.
csv_writer = df.repartition(1).write
csv_writer.csv("cc_out.csv")

**Spark ML works with vectorized data** 

In [None]:
# Pack the feature columns into a single vector column named "features".
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
assembled = vecAssembler.transform(df)
# Keep only the id and the assembled vector.
df_kmeans = assembled.select('id', 'features')
df_kmeans.show()

**Scale the data by using StandardScaler on a Vector of Features. WE NEED TO SAVE THIS TRAINED MODEL FOR FUTURE TEST DATA**

In [None]:
# Fit a StandardScaler on the feature vectors and add "scaledFeatures".
scaler = StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
# Keep the fitted model: the normal-traffic test data is scaled with it later.
scaler_model = scaler.fit(df_kmeans)
df_scaled = scaler_model.transform(df_kmeans)
df_scaled.show()

** The StandardScaler model needs to be saved for later use in the Spark environment - REMEMBER THE PATH **

In [None]:
# Persist the fitted scaler to 'scaler.sav', replacing any earlier copy.
scaler_writer = scaler_model.write().overwrite()
scaler_writer.save('scaler.sav')

In [None]:
def extract(row):
    """Flatten a prediction row into a plain tuple.

    Returns (id, prediction, scaledFeatures) followed by one element per
    component of the scaledFeatures vector.
    Approach from https://stackoverflow.com/questions/38384347/how-to-split-vector-into-columns-using-pyspark
    """
    scaled = row["scaledFeatures"]
    leading = (row["id"], row["prediction"], scaled)
    return leading + tuple(scaled.toArray().tolist())

# **K-MEANS CLUSTERING**

**Find the optimal value of K by computing the silhouette score for every K from 2 to 19 clusters.**

In [None]:
		# # # # Find optimal choice for k.
# Fit K-Means for every k in 2..19 and store each fit's silhouette score in
# cost[k]; slots 0 and 1 are never written and stay at zero.
cost = np.zeros(20)
unpacked_columns = ["id", "prediction", "scaledFeatures", 'sumOfBytes', 'sumOfFlows', 'sumOfPackets', 'uniqDstIPs', 'uniqDstPorts']
for k in range(2, 20):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("scaledFeatures")
    model = kmeans.fit(df_scaled)
    assigned = model.transform(df_scaled)
    predictions = assigned.select('id', 'scaledFeatures', 'prediction')
    predictions.show()
    # Expand the scaled vector into one column per feature.
    predictions = predictions.rdd.map(extract).toDF(unpacked_columns)
    predictions.show()
    # Rename scaledFeatures to features before handing the frame to the
    # evaluator, which is created with default settings.
    predictions = predictions.withColumnRenamed("scaledFeatures", "features")
    predictions.show()
    print("Prediction counts for each cluster:")
    predictions.groupBy('prediction').count().show()
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    cost[k] = silhouette

# Report the score recorded for every k.
print(" Cost =")
for k, score in enumerate(cost[2:20], start=2):
    print("{0}: {1}".format(k, score))
    print("Silhouette with squared euclidean distance = " + str(score))



**Plot the optimal K value**

In [None]:
# The plotting cells use `plt`, but matplotlib.pyplot is never imported in the
# notebook, so the first reference raised NameError. Import it at first use.
import matplotlib.pyplot as plt

# Plot the score recorded in cost[k] for each candidate k in 2..19.
fig, ax = plt.subplots(1, 1, figsize=(8, 6))
ax.plot(range(2, 20), cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')

**Train K-means using the optimal K**
** Change the value of K to the best score value obtained from previous code block **

In [None]:
# Number of clusters; set this to the best k found by the sweep above.
k = 14
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("scaledFeatures")
model = kmeans.fit(df_scaled)

# Show the fitted cluster centres.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

# Label every event with its cluster.
clustered = model.transform(df_scaled)
predictions = clustered.select('id', 'scaledFeatures', 'prediction')
predictions.show()

# Expand the scaled vector into one column per feature.
labelled_columns = ["id", "prediction", "scaledFeatures", 'sumOfBytes', 'sumOfFlows', 'sumOfPackets', 'uniqDstIPs', 'uniqDstPorts']
predictions = predictions.rdd.map(extract).toDF(labelled_columns)

# Rename scaledFeatures to features.
predictions = predictions.withColumnRenamed("scaledFeatures", "features")
df_pred = predictions
df_pred.show()

# Same frame without the vector column.
df_pred_plot = df_pred.drop('features')
df_pred_plot.show()

print("Prediction counts for each cluster:")
cluster_sizes = predictions.groupBy('prediction').count()
cluster_sizes.show()



In [None]:
# Convert the labelled Spark DataFrame to pandas, indexed by the row id.
pddf_pred = df_pred_plot.toPandas()
pddf_pred = pddf_pred.set_index('id')
pddf_pred.head()

** Creates CSV of labeled data for later use (If needed) **

In [None]:
# Write the cluster-labelled data to a CSV file named 'LabeledDATA'.
pddf_pred.to_csv('LabeledDATA')

# TRAIN KNN WITH K-FOLD CROSSVALIDATION For finding the optimal value of K (tuning)

In [None]:
from sklearn.model_selection import cross_val_score
from sklearn.utils import parallel_backend

In [None]:
# Inputs for supervised K-NN classification: the five feature columns as X
# and the K-Means cluster label as the target y.
knn_feature_columns = ['sumOfBytes', 'sumOfFlows', 'sumOfPackets', 'uniqDstIPs', 'uniqDstPorts']
X = pddf_pred[knn_feature_columns]
y = pddf_pred['prediction']

In [None]:
# Search for the best K for KNN using 10-fold cross-validated accuracy.

# Candidate neighbour counts: 1..40.
k_range = range(1, 41)
# Mean cross-validated accuracy for each candidate, in k_range order.
k_scores = []
cv_options = dict(cv=10, scoring='accuracy')

for k in k_range:
    # Distance-weighted KNN with k neighbours.
    knn = KNeighborsClassifier(n_neighbors=k, weights='distance')
    # Per-fold accuracies for this k.
    scores = cross_val_score(knn, X, y, **cv_options)
    k_scores.append(scores.mean())


print(k_scores)

In [None]:
# Hard-coded mean accuracies, one per K in k_range.
# NOTE(review): presumably copied from an earlier run of the cross-validation
# cell so it need not be re-run -- confirm they match the current data.
k_scores = [0.9995939223997949, 0.9995939223997949, 0.9995605461099346, 0.9995800156639266, 0.9996940508052147, 0.9996606749021554, 0.9996690189746206, 0.9996801445335075, 0.999691269860314, 0.9996968323689968, 0.9997023951871205, 0.9997079578505238, 0.9997357712449002, 0.9997385525766017, 0.9997608033849351, 0.9997691474574003, 0.9997635848713571, 0.9997719289438223, 0.9997747101981636, 0.9997802726294862, 0.9997830541159083, 0.9997802728615669, 0.999777491452505, 0.9997747101981636, 0.999777491452505, 0.99976914738004, 0.9997747101208034, 0.9997802728615669, 0.9997913981110133, 0.9997969609291368, 0.9997969609291367, 0.9997941795974352, 0.9997997422608383, 0.9997969609291368, 0.9997997422608383, 0.9998080862559433, 0.9998025235151798, 0.9997997420287579, 0.9997941792106342, 0.9997913979562927]
k_range = range(1, 41)

# The k-fold cross-validation is run once per K value from 1 to 40,
# so there should be 40 scores here.
print('Length of list', len(k_scores))
print('Max of list', max(k_scores))

In [None]:
%matplotlib inline

# Plot each candidate K for KNN (x-axis) against its mean cross-validated
# accuracy (y-axis).
plt.plot(k_range, k_scores)
plt.xlabel('Value of K for KNN')
plt.ylabel('Cross-validated accuracy')

# **TRAIN A CLASSIFIER ML BASED ALGORITHM**

In [None]:
# Inputs for the final K-NN classifier: the five feature columns as x and
# the K-Means cluster label as the target y.
classifier_columns = ['sumOfBytes', 'sumOfFlows', 'sumOfPackets', 'uniqDstIPs', 'uniqDstPorts']
x = pddf_pred[classifier_columns]
y = pddf_pred['prediction']



In [None]:
# Split the labelled data into training and test parts; random_state is
# fixed at 0.
split_parts = train_test_split(x, y, random_state=0)
x_train, x_test, y_train, y_test = split_parts


In [None]:
# Find the K for KNN that gives the lowest error rate on the test split.
# error_rate[j] holds the misclassification rate for K = j + 1.
error_rate = []
for i in range(1, 40):
    knn = KNeighborsClassifier(n_neighbors=i, weights='distance')
    knn.fit(x_train, y_train)
    pred_i = knn.predict(x_test)
    # Fraction of test rows whose predicted label differs from the true one.
    error_rate.append(np.mean(pred_i != y_test))

plt.figure(figsize=(10, 6))
plt.plot(range(1, 40), error_rate, color='blue', linestyle='dashed', marker='o', markerfacecolor='red', markersize=10)
plt.title('Error Rate vs. K Value')
plt.xlabel('K')
plt.ylabel('Error Rate')
# The list index is zero-based while K starts at 1, so add 1 to report the
# actual K (the original printed the raw index, one less than the best K).
min_error = min(error_rate)
print("Minimum error:=", min_error, "at K=", error_rate.index(min_error) + 1)

In [None]:
# Find the K for KNN that gives the highest accuracy on the test split.
# accu[j] holds the accuracy for K = j + 1.
accu = []
for i in range(1, 40):
    knn = KNeighborsClassifier(n_neighbors=i, weights='distance')
    yhat = knn.fit(x_train, y_train).predict(x_test)
    accu.append(metrics.accuracy_score(y_test, yhat))

line_style = dict(color='blue', linestyle='dashed', marker='o', markerfacecolor='red', markersize=10)
plt.figure(figsize=(10, 6))
plt.plot(range(1, 40), accu, **line_style)
plt.title('Accuracy vs. K Value')
plt.xlabel('K')
plt.ylabel('Accuracy')

** Change the value of K to the best value from the tuning process: either the K-fold cross-validation or the error-rate/accuracy sweeps from the previous blocks **

In [None]:
# Train the final distance-weighted KNN classifier with the chosen K and
# report its accuracy on the test split.
k = 36
knn = KNeighborsClassifier(n_neighbors=k, weights='distance')
knn.fit(x_train, y_train)
pred_y = knn.predict(x_test)
# Build the message from `k` so it stays correct when K is changed above
# (the original message hard-coded "K=36").
print("Accuracy of model at K={0} is".format(k), metrics.accuracy_score(y_test, pred_y))

In [None]:
# Number of predictions made on the test split.
len(pred_y)

In [None]:
# NOTE(review): this bare `pred_y` is not the cell's last statement, so it
# presumably produces no notebook output -- confirm and remove if unneeded.
pred_y
print(type(pred_y))
# Wrap the predicted labels in a one-column DataFrame named 'Prediction'.
dfKNNPrediction = pd.DataFrame(pred_y, columns = ['Prediction'])

In [None]:
# Preview the first rows of the prediction frame.
dfKNNPrediction.head()
# Optional row-count check: len(dfKNNPrediction.index)

In [None]:
# Preview the first rows of the test features.
x_test.head()
# Optional row-count check: len(x_test.index)

In [None]:
# Move the 'id' index back into a column, leaving a fresh 0..n-1 index, then
# drop that 'id' column.
x_testReset = x_test.reset_index()
x_testComplete = x_testReset.drop('id', axis=1)
x_testComplete.head()


In [None]:
# Attach the KNN predictions to the test features as a "Prediction" column.
x_testComplete["Prediction"] = dfKNNPrediction["Prediction"]
x_testComplete.head()

In [None]:
# Row count of the combined test frame.
len(x_testComplete)

** Saves the KNN model to disk **
** Place this model inside the python folder in your Apache Spark deployment **

In [None]:
# save the model to disk
filename = 'knn.sav'
joblib.dump(knn, filename)


# **TEST with normal traffic**
# ** Change the name of the file to a CSV file containing only normal traffic streams **

**First we need to get the data and scale it using our previous scaler model, which was trained on the original monthly data**

In [None]:
# Load the normal-traffic sample and prepare it the same way as the training
# data: add an id, keep the feature columns, cast to float, vectorize, scale.
df = sqlContext.read.csv('nomalTraffic.csv', header=True)
FEATURE_COLS = ['sumOfBytes', 'sumOfFlows', 'sumOfPackets', 'uniqDstIPs', 'uniqDstPorts']
df = df.withColumn("id", monotonically_increasing_id())
df = df.select("id", *FEATURE_COLS)
# Cast every feature column to float. The loop variable is deliberately not
# named `col`: that name is the imported pyspark.sql.functions.col, and
# rebinding it to a string broke any later use of col(...).
for feature in FEATURE_COLS:
    df = df.withColumn(feature, df[feature].cast('float'))
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
df_vector = vecAssembler.transform(df).select('id', 'features')  # Drop other columns.
# Scale with the scaler fitted on the original training data, so the test
# data is scaled the same way as the data the models were trained on.
df_scaledVector = scaler_model.transform(df_vector)
df_scaledVector.show()

In [None]:
def extractVector(row):
    """Flatten a scaled row into a plain tuple.

    Returns (id, scaledFeatures) followed by one element per component of
    the scaledFeatures vector.
    Approach from https://stackoverflow.com/questions/38384347/how-to-split-vector-into-columns-using-pyspark
    """
    scaled = row["scaledFeatures"]
    components = tuple(scaled.toArray().tolist())
    return (row["id"], scaled) + components

In [None]:
# Expand the scaled vector into one column per feature.
scaled_columns = ["id", "scaledFeatures", 'sumOfBytes', 'sumOfFlows', 'sumOfPackets', 'uniqDstIPs', 'uniqDstPorts']
flattened_rdd = df_scaledVector.rdd.map(extractVector)
scaleddf = flattened_rdd.toDF(scaled_columns)
scaleddf.show()

**Convert to Pandas DF for further manipulation**

In [None]:
# Convert to pandas and keep only the five scaled feature columns as the
# classifier input.
dataNormal = scaleddf.toPandas()
normal_feature_columns = ['sumOfBytes', 'sumOfFlows', 'sumOfPackets', 'uniqDstIPs', 'uniqDstPorts']
xNormal = dataNormal[normal_feature_columns]
len(xNormal)


In [None]:
# Classify every normal-traffic row with the trained KNN model.
pred_yNormal = knn.predict(xNormal)
pred_NormaPROB = knn.predict_proba(xNormal)  # probabilities of classification
# Display the probability matrix as the cell output.
pred_NormaPROB

**Join data with predictions for verification**

In [None]:
# Class labels of the trained KNN model; used as the probability column names.
classes = knn.classes_
# Predicted label per row, in a single 'Prediction' column.
predpdNormal = pd.DataFrame(pred_yNormal, columns = ['Prediction'])
# Class probabilities per row, one column per class label.
predpdNormalPROB = pd.DataFrame(pred_NormaPROB, columns = classes )

In [None]:
## PROB
predpdNormalPROB

**The output of the following code block are the normal clusters**
** Save this output and include it in the list of Normal clusters in the anomalyDetection.py file **
** The abnormal clusters are the remaining clusters that do not appear in this list **





In [None]:
# Distinct labels predicted for the normal-traffic rows.
predpdNormal.Prediction.unique()

In [None]:

# Work on a copy so xNormal is left unchanged, then attach the predicted
# label to each row for inspection.
xNormalComplete = xNormal.copy()
xNormalComplete["Prediction"] = predpdNormal["Prediction"]
xNormalComplete.head()
