In [1]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
import operator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.mllib.linalg import Vectors, VectorUDT

In [2]:
# Shared session for the whole notebook; Spark logging is limited to WARN and above.
spark = (
    SparkSession.builder
    .appName('weather map')
    .getOrCreate()
)
spark.sparkContext.setLogLevel('WARN')

In [3]:

class AnomalyDetection():
    """Cluster-size based anomaly scoring on top of MLlib K-means.

    The input DataFrame (``self.rawDF``) must have an ``id`` column and a
    ``rawFeatures`` array column. Selected positions of ``rawFeatures`` are
    treated as categorical and one-hot encoded; every other position is
    converted with ``float()``. Rows are clustered with K-means and each row
    receives the score of its cluster: ``(N_max - N) / (N_max - N_min)``, where
    ``N`` is the size of the row's cluster and ``N_max`` / ``N_min`` are the
    largest / smallest cluster sizes.

    The class uses the module-level ``spark`` session.
    """

    def readData(self, filename):
        """Load the raw DataFrame from the parquet data at ``filename`` and cache it."""
        self.rawDF = spark.read.parquet(filename).cache()

    def readToyData(self):
        """Replace ``self.rawDF`` with a five-row in-memory sample.

        Every element of ``rawFeatures`` is a string (numeric fields included)
        so the array column has a single element type; ``cat2Num`` converts the
        non-categorical positions with ``float()`` anyway.
        """
        data = [
            (0, ["http", "udt", "0.4"]),
            (1, ["http", "udf", "0.5"]),
            (2, ["http", "tcp", "0.5"]),
            (3, ["ftp", "icmp", "0.1"]),
            (4, ["http", "tcp", "0.4"]),
        ]
        schema = ["id", "rawFeatures"]
        self.rawDF = spark.createDataFrame(data, schema)

    @staticmethod
    def _one_hot(position, size):
        """Return a list of ``size`` floats: 1.0 at ``position``, 0.0 elsewhere."""
        hot = int(position)
        return [1.0 if slot == hot else 0.0 for slot in range(int(size))]

    @staticmethod
    def _drop_positions(values, positions):
        """Return ``values`` without the entries at ``positions``, each cast to float.

        Neither ``values`` nor ``positions`` is modified.
        """
        return [float(value) for slot, value in enumerate(values) if slot not in positions]

    def cat2Num(self, df, indices):
        """One-hot encode the categorical positions of ``rawFeatures``.

        Args:
            df: DataFrame with ``id`` and ``rawFeatures`` columns.
            indices: positions inside ``rawFeatures`` that hold categorical
                values. The list is not modified.

        Returns:
            DataFrame with columns ``id``, ``rawFeatures`` and ``features``.
            ``features`` is a dense vector made of one one-hot block per entry
            of ``indices`` (in the given order; slot order inside a block is
            the ``StringIndexer`` label index), followed by the remaining
            ``rawFeatures`` entries converted to float.
        """
        # Bind the helpers to plain locals: a lambda that mentioned `self`
        # would pull the instance (and its DataFrame) into the pickled UDF.
        one_hot = AnomalyDetection._one_hot
        drop_positions = AnomalyDetection._drop_positions
        # Double precision: Python floats map to it without truncation.
        array_type = ArrayType(DoubleType())

        cat_positions = list(indices)
        skip = frozenset(cat_positions)

        parts = []
        for index in cat_positions:
            raw_col = "index_" + str(index)
            idx_col = "newIndex_" + str(index)
            hot_col = "onehot_" + str(index)

            df = df.withColumn(raw_col, df["rawFeatures"][index])
            index_model = StringIndexer(inputCol=raw_col, outputCol=idx_col).fit(df)
            df = index_model.transform(df)
            # The indexer was fitted on this very DataFrame, so the number of
            # labels equals (largest index + 1); no extra Spark job is needed.
            size = len(index_model.labels)

            onehotudf = udf(lambda position: one_hot(position, size), array_type)
            df = df.withColumn(hot_col, onehotudf(idx_col))
            parts.append(hot_col)

        deludf = udf(lambda values: drop_positions(values, skip), array_type)
        df = df.withColumn("without_cat", deludf("rawFeatures"))
        parts.append("without_cat")

        # Fold the partial arrays left to right into one array column. Tracking
        # the current column name keeps this valid when `indices` is empty.
        mergeCols = udf(lambda left, right: left + right, array_type)
        combined = parts[0]
        for step, part in enumerate(parts[1:], 1):
            target = "combined_" + str(step)
            df = df.withColumn(target, mergeCols(col(combined), col(part)))
            combined = target

        vectorudf = udf(lambda values: Vectors.dense(values), VectorUDT())
        df = df.withColumn("features", vectorudf(col(combined)))
        return df.select("id", "rawFeatures", "features")

    def addScore(self, df):
        """Attach a ``score`` column derived from the size of each row's cluster.

        Args:
            df: DataFrame with ``id``, ``rawFeatures``, ``features`` and
                ``prediction`` columns.

        Returns:
            DataFrame with columns ``id``, ``rawFeatures``, ``features``,
            ``prediction`` and ``score``, where
            ``score = (N_max - count) / (N_max - N_min)``. When all clusters
            have the same size (or ``df`` is empty) the score is 0.0.
        """
        cluster_size = df.groupBy("prediction").count()
        # Aggregate the `count` column explicitly (and in one job) instead of
        # relying on it being the only numeric column. `functions.` is spelled
        # out because the star import shadows the built-in max/min.
        bounds = cluster_size.agg(functions.max("count"), functions.min("count")).collect()[0]
        N_max, N_min = bounds[0], bounds[1]

        if N_max is None or N_max == N_min:
            # No cluster is smaller than any other: avoid dividing by zero.
            score_col = functions.lit(0.0)
        else:
            score_col = (N_max - functions.col("count")) / (N_max - N_min)

        score = cluster_size.withColumn("score", score_col)
        return df.join(score, "prediction").select("id", "rawFeatures", "features", "prediction", "score")

    def detect(self, k, t, cat_indices=(0, 1)):
        """Run the full pipeline and return the rows whose score exceeds ``t``.

        Args:
            k: number of K-means clusters.
            t: score threshold; rows with ``score > t`` are returned.
            cat_indices: positions of the categorical entries inside
                ``rawFeatures``. Defaults to ``(0, 1)``.

        Returns:
            DataFrame with columns ``id``, ``rawFeatures``, ``features``,
            ``prediction`` and ``score``, filtered to ``score > t``.

        The intermediate DataFrames are printed with ``show()`` along the way.
        """
        print("DF1:")
        # Encoding categorical features using one-hot.
        df1 = self.cat2Num(self.rawDF, list(cat_indices)).cache()
        df1.show()

        print("DF2:")
        print("training Kmeans Model...")
        # Clustering points using KMeans. The former `runs` argument is not
        # passed: it no longer influences training.
        features = df1.select("features").rdd.map(lambda row: row[0]).cache()
        model = KMeans.train(features, k, maxIterations=40, initializationMode="random", seed=20)
        # The training RDD is not used again.
        features.unpersist()

        print("making predictions...")
        # Adding the prediction column to df1. The cluster id is converted to
        # a string explicitly to match the declared StringType.
        modelBC = spark.sparkContext.broadcast(model)
        predictUDF = udf(lambda x: str(modelBC.value.predict(x)), StringType())

        print("calculating score...")
        df2 = df1.withColumn("prediction", predictUDF(df1.features)).cache()
        df2.show()

        print("DF3:")
        # Adding the score column to df2; the higher the score, the more
        # likely it is an anomaly.
        df3 = self.addScore(df2).cache()
        df3.show()

        return df3.where(df3.score > t)


In [4]:
if __name__ == "__main__":
    # Score the sample log features with 8 clusters and keep rows whose score
    # exceeds 0.97. For a quick smoke test, call ad.readToyData() in place of
    # ad.readData(...); calling both only discards the toy frame.
    ad = AnomalyDetection()
    ad.readData('data/logs-features-sample')
    anomalies = ad.detect(8, 0.97)
    print("number of anomalies :: ", anomalies.count())
    anomalies.show()

DF1:
+-----+--------------------+--------------------+
|   id|         rawfeatures|            features|
+-----+--------------------+--------------------+
|44263|[udp, SF, -0.1585...|[0.0,1.0,0.0,1.0,...|
|44264|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44265|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44266|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44267|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44268|[udp, SF, -0.1585...|[0.0,1.0,0.0,1.0,...|
|44269|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44270|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44271|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44272|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44273|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44274|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44275|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44276|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44277|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44278|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|
|44279|[udp, SF, -0.1585...|[0.0,1.0,0.0,1.0,



making predictions...
calculating score...
+-----+--------------------+--------------------+----------+
|   id|         rawfeatures|            features|prediction|
+-----+--------------------+--------------------+----------+
|44263|[udp, SF, -0.1585...|[0.0,1.0,0.0,1.0,...|         6|
|44264|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         4|
|44265|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         5|
|44266|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         4|
|44267|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         5|
|44268|[udp, SF, -0.1585...|[0.0,1.0,0.0,1.0,...|         6|
|44269|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         7|
|44270|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         5|
|44271|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         2|
|44272|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         5|
|44273|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         5|
|44274|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1.0,...|         2|
|44275|[tcp, SF, -0.1585...|[1.0,0.0,0.0,1