In [3]:
import sys
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import Row
import operator
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

In [4]:
sc = SparkContext("local[2]", "streaming anomalies detection") # run locally with 2 cores
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 1)  # 1 second per batch

In [8]:
def readData(filename):
    rawDF = sqlContext.read.parquet(filename).cache()
    return rawDF
    
file_path = "logs-features-sample/"
rawDF = readData(file_path)
rawDF.show(n=7, truncate=99)

+-----+---------------------------------------------------------------------------------------------------+
|   id|                                                                                        rawFeatures|
+-----+---------------------------------------------------------------------------------------------------+
|44263|[udp, SF, -0.158545578037, -0.0314405270803, -0.111401325544, 0.0, -0.00317668482933, -0.0031766...|
|44264|[tcp, SF, -0.158545578037, 0.197720022657, -0.111401325544, 0.0, -0.00317668482933, -0.003176684...|
|44265|[tcp, SF, -0.158545578037, -0.0246212390762, 0.0353165052995, 0.0, -0.00317668482933, -0.0031766...|
|44266|[tcp, SF, -0.158545578037, 0.025711600954, -0.0995500693817, 0.0, -0.00317668482933, -0.00317668...|
|44267|[tcp, SF, -0.158545578037, -0.0263422974773, 1.01349541435, 0.0, -0.00317668482933, -0.003176684...|
|44268|[udp, SF, -0.158545578037, -0.0334213678815, -0.108555583183, 0.0, -0.00317668482933, -0.0031766...|
|44269|[tcp, SF, -0.15854557

In [9]:
def to_onehot(lst, indices, unique_values, c):
    zs = [0.0]*c
    rest_lst = [float(lst[k]) for k in range(len(lst)) if k not in indices]
    for pos in indices:
        idx = unique_values.index(Row(lst[pos]))
        zs[idx] = 1.0
    zs.extend(rest_lst)
    return zs
    
    
# in rawFeatures, the first 2 categorical data convert to one hot vector such as [0,0,1,0,1]
# extend the one-hot vector with original numerical list, and all convert to Double type
# put the numerical list to a new column called "features"
def cat2Num(df, indices):
    unique_values = []
    for i in indices:
        d = udf(lambda r: r[i], StringType())
        dt = df.select(d(df.rawFeatures)).distinct().collect()
        unique_values.extend(dt)

    unique_count = len(unique_values)
    convertUDF = udf(lambda r: to_onehot(r, indices, unique_values, unique_count), ArrayType(DoubleType()))
    newdf = df.withColumn("features", convertUDF(df.rawFeatures))

    return newdf

In [14]:
df1 = cat2Num(rawDF, [0, 1]).cache()
df1.show(n=7, truncate=50)

+-----+--------------------------------------------------+--------------------------------------------------+
|   id|                                       rawFeatures|                                          features|
+-----+--------------------------------------------------+--------------------------------------------------+
|44263|[udp, SF, -0.158545578037, -0.0314405270803, -0...|[0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...|
|44264|[tcp, SF, -0.158545578037, 0.197720022657, -0.1...|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...|
|44265|[tcp, SF, -0.158545578037, -0.0246212390762, 0....|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...|
|44266|[tcp, SF, -0.158545578037, 0.025711600954, -0.0...|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...|
|44267|[tcp, SF, -0.158545578037, -0.0263422974773, 1....|[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...|
|44268|[udp, SF, -0.158545578037, -0.0334213678815, -0...|[0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...|
|44269|[tc

In [46]:
print df1.count()  # number of rows
print len(df1.select("features").first()[0])  # dimention is 50

99095
50


In [34]:
max_value = df1.agg({"id": "max"}).collect()[0][0]
min_value = df1.agg({"id": "min"}).collect()[0][0]
print max_value, min_value

99094 0


In [36]:
training_features = df1.where(df1.id >= 90000).select("features").rdd.map(lambda row: Vectors.dense(row[0]))
testing_features = df1.where(df1.id < 90000).select("features").rdd.map(lambda row: Vectors.dense(row[0]))

In [37]:
trainingQueue = [training_features]
testingQueue = [testing_features]

In [38]:
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)

In [47]:
# We create a model with random clusters and specify the number of clusters to find
dimension = 50
model = StreamingKMeans(k=8, decayFactor=1.0).setRandomCenters(dimension, 1.0, 410)

# Now register the streams for training
model.trainOn(trainingStream)

In [48]:
result = model.predictOn(testingStream)
result.pprint()

In [49]:
ssc.start()  # start streaming
ssc.stop(stopSparkContext=True, stopGraceFully=True) 

-------------------------------------------
Time: 2018-01-28 22:02:32
-------------------------------------------
4
7
2
7
2
7
7
2
2
2
...

-------------------------------------------
Time: 2018-01-28 22:02:33
-------------------------------------------

