In [1]:
#Analisis de detección de anomalias en trafico de red.
#Impotar la libreria de spark
from pyspark.sql import SparkSession


In [2]:
#Iniciar contexto de Spark 
spark = SparkSession.builder.appName("Analisis de Datos-Big data") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()


In [3]:
#Leer el dataset 
#Como nuestro dataset no tiene encabezado, establecemos header en false
#Nuestro dataframe utiliza los encabezados por default que es _c0, _c01,....., _cn
df = spark.read.load("/home/ivan/bigdata/spark-bigdata/kddcup.data",
            format="csv", sep=",", inferSchema="true", header="false")

In [4]:
#Establecemos el nombre de las columnas de nuestro dataset
newNameColumns = ["duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent", "hot", "num_failed_logins", "logged_in", "num_compromised", "root_shell","su_attempted", "num_root", "num_file_creations","num_shells", "num_access_files", "num_outbound_cmds","is_host_login", "is_guest_login", "count", "srv_count","serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate", "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count","dst_host_same_srv_rate", "dst_host_diff_srv_rate", "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate","dst_host_serror_rate", "dst_host_srv_serror_rate", "dst_host_rerror_rate", "dst_host_srv_rerror_rate","label"]
df_rename = df.toDF(*newNameColumns)


In [5]:
#Se van contar la cantidad de datos distintos de la ultima columna que referencia
# a los diferentes tipos de ataques que se tiene
df_rename.groupBy("label").count().orderBy("count", ascending=False).show()

+----------------+-------+
|           label|  count|
+----------------+-------+
|          smurf.|2807886|
|        neptune.|1072017|
|         normal.| 972781|
|          satan.|  15892|
|        ipsweep.|  12481|
|      portsweep.|  10413|
|           nmap.|   2316|
|           back.|   2203|
|    warezclient.|   1020|
|       teardrop.|    979|
|            pod.|    264|
|   guess_passwd.|     53|
|buffer_overflow.|     30|
|           land.|     21|
|    warezmaster.|     20|
|           imap.|     12|
|        rootkit.|     10|
|     loadmodule.|      9|
|      ftp_write.|      8|
|       multihop.|      7|
+----------------+-------+
only showing top 20 rows



In [6]:
#Como K-mean trabaja solamente con valores numericos eliminarios algunas columnas no numericas
#Columna 1 -> Tipo de conexion
#Columna 2 -> Tipo de servicio
#Columna 3 -> Si el usuario inicio sesión o no
df_only_numeric = df_rename.drop("protocol_type", "service", "flag")

In [7]:
#Primeramente extraemos todas las columnas numericas excepto la columna label
numeric_only_col = df_only_numeric.columns[:37]

#Importamos VectorAsembler, es un transformador que combina una lista dada 
# de columnas en una sola columna vectorial
from pyspark.ml.feature import VectorAssembler
# Luego vectorizamos
df_va = VectorAssembler(inputCols = numeric_only_col, outputCol = "featureVector")
new_df = df_va.transform(df_only_numeric)


In [8]:
#Importamos el k-means para empezar a armar nuestro modelo
from pyspark.ml.clustering import KMeans

#Sin la especificación del valor de k en la inicialización de la clase KMeans, K toma por default el valor 2
kmeans = KMeans()
#Establecemos la columna de predicción
kmeans.setPredictionCol("cluster")
#Establecemos el feactureCol
kmeans.setFeaturesCol("featureVector")
#Creamos el modelo
model = kmeans.fit(new_df)

In [9]:
#Obtenemos los valores centros 
center = model.clusterCenters()
print(center)

[array([4.83401949e+01, 1.83462155e+03, 8.26203190e+02, 5.71611720e-06,
       6.48779303e-04, 7.96173468e-06, 1.24376586e-02, 3.20510858e-05,
       1.43529049e-01, 8.08830584e-03, 6.81851124e-05, 3.67464677e-05,
       1.29349608e-02, 1.18874823e-03, 7.43095237e-05, 1.02114351e-03,
       0.00000000e+00, 4.08294086e-07, 8.35165553e-04, 3.34973508e+02,
       2.95267146e+02, 1.77970317e-01, 1.78036989e-01, 5.76648988e-02,
       5.77299094e-02, 7.89884132e-01, 2.11796106e-02, 2.82608101e-02,
       2.32981078e+02, 1.89214283e+02, 7.53713390e-01, 3.07109788e-02,
       6.05051931e-01, 6.46410789e-03, 1.78091184e-01, 1.77885898e-01,
       5.79276115e-02]), array([1.0999000e+04, 0.0000000e+00, 1.3099374e+09, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 1.0000000e+00,


In [10]:
#Como tenemos 23 distintos tipos de conexiones, podemos concluir que con k = 2 no es preciso nuestro modelo
# Por tanto no podemos precisar nuestras agrupaciones en nuestro datos
#Veremos como se agrupo nuestro datos, volvemos a tranformar 
transformed = model.transform(new_df)

In [11]:
#Selecionamos la columna cluster y label
transformed.select("cluster","label").groupBy("cluster", "label").count().orderBy(["cluster","count"], ascending=[1,0]).show()

+-------+----------------+-------+
|cluster|           label|  count|
+-------+----------------+-------+
|      0|          smurf.|2807886|
|      0|        neptune.|1072017|
|      0|         normal.| 972781|
|      0|          satan.|  15892|
|      0|        ipsweep.|  12481|
|      0|      portsweep.|  10412|
|      0|           nmap.|   2316|
|      0|           back.|   2203|
|      0|    warezclient.|   1020|
|      0|       teardrop.|    979|
|      0|            pod.|    264|
|      0|   guess_passwd.|     53|
|      0|buffer_overflow.|     30|
|      0|           land.|     21|
|      0|    warezmaster.|     20|
|      0|           imap.|     12|
|      0|        rootkit.|     10|
|      0|     loadmodule.|      9|
|      0|      ftp_write.|      8|
|      0|       multihop.|      7|
+-------+----------------+-------+
only showing top 20 rows



In [None]:
#Eligiendo el valor mas optimo o el mejor valor de k para nuestro conjunto de datos 
#Para cada cierto valor de K podriamos medir la calidad de la agrupación, es un buen parametro para saber que k 
# es el mas optimo, entonces para medir eso, una buena agrupación es cuandos los puntos estan mas cercano al centroide, que en este caso seria la media de las distancias o la media de las distancias al cuadrado
# Se define una funcion para ese calculo.
# Y K-means no ofrece un metodo computeCost para calcular esa media.
import random
from pyspark.ml.evaluation import ClusteringEvaluator

def clusteringScore0(data, k):
    vector = VectorAssembler(inputCols = numeric_only_col, outputCol = "featureVector")
    new_df = vector.transform(data)
    kmeans = KMeans()
    kmeans.setSeed(random.randint(1,1000))
    kmeans.setK(k)
    kmeans.setPredictionCol("cluster")
    kmeans.setFeaturesCol("featureVector")
    model = kmeans.fit(new_df) 

    df_tranform = model.transform(new_df)
    evaluator = ClusteringEvaluator()
    evaluator.setPredictionCol("featureVector")
    return evualto.evaluate(df_tranform)

#Comenzando por k igual 20 hasta 100 con paso de 20
for k in range(20,120,20):
    media = clusteringScore0(df_only_numeric, k)
    print(k, media)

In [None]:
#En base eso podemos ver que para ciertos valores de k en nuestro conjunto de datos tenemos que k=80 se tiene un agrupamiento suboptimo y ademas digamos que puede parar antes de llegar a su optimo local.
#Pero es importante siempre va a depender de nuestro valores iniciales de los centroides, normalmente se eligen de forma aleatoria, hay dos tipos de variantes que se encargan de eso que es k-means++ y K-means||, por default la libreia de spark implementa el k-means||
# Por tanto podemos ir ajustando algunos valores de nuestro modelo para tener un resultado mas optimo
# Como el numeros de iteraciones, y la cantidad minima de movimiento del centroide
# A valores menores el algortimo deja que los centroides se muevan por mucho mas tiempo
# Pero en ese caso ajustamos el numero maximo de iteracciones, para que no haga demasiado calculos
def clusteringScore1(data, k):
    vector = VectorAssembler(inputCols = numeric_only_col, outputCol = "featureVector")
    new_df = vector.transform(data)
    kmeans = KMeans()
    #Nuevo valores establecidos
    kmeans.setMaxIter(40)
    kmeans.setTol(1.0e-5)
    kmeans.setSeed(random.randint(1,1000))
    kmeans.setK(k)
    kmeans.setPredictionCol("cluster")
    kmeans.setFeaturesCol("featureVector")
    model = kmeans.fit(new_df)
    return model.computeCost(new_df) / data.count()

#Comenzando por k igual 20 hasta 100 con paso de 20
for k in range(20,120,20):
    media = clusteringScore1(df_only_numeric, k)
    print(k, media)

In [None]:
#Digamos que con estos ajuste se podria encontrar un punto a partir del cual el aumento de k no reduzca mucho esa media que obtenemos.
#Por tanto podemos observa mediante los resultados que a partir de k igual a 100 disminuye esa media, podriamos encontrar un valor de k mas optimo por encima de los 100

In [None]:
#Pero si miramos graficamente, se observan que tenemos dos campos en donde su escala es mucho mayor que las demas,serian las columnas de bytes enviados y bytes recibidos, que eso obviamente va desde 0 a miles.
#La idea tener una mejor escala se normalizan esos datos, de tal manera que todos nuestros datos esten normalizados para una mejor visualizacion
#Se usa un metodo que nos ofrece MLib llamada StandardScaler para estandarizar esos features
#Basicamente la normalización la resta de la media de los feature - su valor divido su desviación estandar 
#Se aumentara el valor de k para hacer pruebas con la normalización de nuestros datos 

#Se importa el metodo StandardScaler
from pyspark.ml.feature import StandardScaler

#Una version 2, con la normalizacion
def clusteringScore2(data, k):
    vector = VectorAssembler(inputCols = numeric_only_col, outputCol = "featureVector")
    new_df = vector.transform(data)

    #Normalización
    standardScaler = StandardScaler()
    standardScaler.setInputCol("featureVector")
    standardScaler.setOutputCol("scaledFeatureVector")
    #Ponemos em false, los datos centrales con media
    standardScaler.setWithMean(False)
    #Ponemos en verdadero la escala de la desviación stantard en una unidad
    standardScaler.setWithStd(True)
    fit_scaler = standardScaler.fit(new_df)
    new_df = fit_scaler.transform(data)

    kmeans = KMeans()
    #Nuevo valores establecidos
    kmeans.setMaxIter(40)
    kmeans.setTol(1.0e-5)
    kmeans.setSeed(random.randint(1,1000))
    kmeans.setK(k)
    kmeans.setPredictionCol("cluster")
    kmeans.setFeaturesCol("scaledFeatureVector")
    model = kmeans.fit(new_df)
    return model.computeCost(new_df) / data.count()

#Comenzando por k igual 60 hasta 270 con paso de 30
for k in range(60,300,30):
    media = clusteringScore2(df_only_numeric, k)
    print(k, media)

In [None]:
#Si vemos los resultados, se puede observar que todavia no tenemos un valor de k apartir de cual su aumento no mejore el coste(distancia absoluta entre puntos) digamos

#De igual manera igual se puede mejorar mas la agrupación, para los analisis anteriores se dejaron de lado digamos las columnas que no eran numericas. Por tanto para este analisis seria una información valiosa si le tomamos en cuenta, por tanto tendriamos una mejor clusters mejor informada.

#Importamos StringIndexer
from pyspark.ml.feature import StringIndexer, OneHotEncoder
def oneHotDataSet(inputCol, data):
    stringIndexer = StringIndexer()
    stringIndexer.setInputCol(inputCol)
    stringIndexer.setOutputCol(inputCol + "_indexed")
    fit_indexer = stringIndexer.fit(data)
    new_df = fit_indexer.transform(data)
    
    #Con esta funcion se mapean las variables categoricas y convierte a valores binarios
    onehotencoder = OneHotEncoder()
    onehotencoder.setInputCol(inputCol + "_indexed")
    onehotencoder.setOutputCol(inputCol + "_vec")
    fit_encoder = onehotencoder.fit(new_df)
    new_df = fit_encoder.transform(new_df)

    return new_df

def fitModelFinal(data, k):
    data = oneHotDataSet("protocol_type", data)
    data = oneHotDataSet("service", data)
    data = oneHotDataSet("flag", data)

    vector = VectorAssembler(inputCols 
    = ["protocol_type_vec", "service_vec", "flag_vec"], outputCol = "featureVector")
    new_df = vector.transform(data)

    #Normalización
    standardScaler = StandardScaler()
    standardScaler.setInputCol("featureVector")
    standardScaler.setOutputCol("scaledFeatureVector")
    #Ponemos em false, los datos centrales con media
    standardScaler.setWithMean(False)
    #Ponemos en verdadero la escala de la desviación stantard en una unidad
    standardScaler.setWithStd(True)
    fit_scaler = standardScaler.fit(new_df)
    new_df = fit_scaler.transform(data)
    
    kmeans = KMeans()
    kmeans.setMaxIter(40)
    kmeans.setTol(1.0e-5)
    kmeans.setSeed(random.randint(1,1000))
    kmeans.setK(k)
    kmeans.setPredictionCol("cluster")
    kmeans.setFeaturesCol("scaledFeatureVector")
    model = kmeans.fit(new_df)
    return model


In [None]:
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
#Y el paso final es la funcion de detención, con un umbral alto se podria que ese trafico es anomalo 
def detectAnomaly():
    modelo = fitModelFinal(df_rename,180)
    centroides = modelo.clusterCenters()
    df = modelo.transform(df_rename)
    umbral = df.select("cluster", "scaledFeatureVector").withColumn("value", 
    Vectors.squared_distance(centroides[col("cluster")], col("scaledFeatureVector")))
    .orderBy("value", ascending=False).select("value").show(100)

    anomalias = df.filter(Vectors.squared_distance(centroides[df.cluster],df.scaledFeatureVector) >= unmbral)
    anomalias.show(100)