In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler

In [3]:
# Reuse the SparkContext if the kernel already provides one (a PySpark kernel/shell
# exposes it as `sc`), otherwise create one. Without this, `sc` is undefined on a
# fresh plain-Python kernel and the notebook cannot be re-run from the top.
sc = SparkContext.getOrCreate()
# `sqlContext` is kept because later cells use `sqlContext.read` and `sqlContext.sql`.
# NOTE(review): SQLContext is deprecated on newer Spark versions in favour of
# SparkSession.builder.getOrCreate() — consider migrating all call sites together.
sqlContext = SQLContext(sc)

In [None]:
# Load the raw netflow file. No header option is set, so Spark assigns positional
# column names (_c0, _c1, ...), which the next cell renames.
df = sqlContext.read.format("csv").load('netflow_day-11')
df.printSchema()

In [None]:
# Positional names for the netflow columns: index i renames Spark's default "_c<i>".
# withColumnRenamed is a no-op when the source column does not exist.
netflow_columns = [
    "Time", "Duration", "SrcDevice", "DstDevice", "Protocol",
    "SrcPort", "DstPort", "SrcPackets", "DstPackets", "SrcBytes", "DstBytes",
]
df1 = df
for position, column_name in enumerate(netflow_columns):
    df1 = df1.withColumnRenamed("_c{}".format(position), column_name)

In [None]:
# Inspect the schema to confirm the positional _cN columns now carry netflow names.
df1.printSchema()

In [None]:
# Keep only the numeric traffic measurements and cast them to double so they can be
# assembled into a feature vector. Values that fail the cast become null.
# NOTE(review): nulls would presumably make VectorAssembler fail later — confirm data is clean.
numeric_cols = ["Duration", "SrcPackets", "DstPackets", "SrcBytes", "DstBytes"]
df2 = df1.select([df1[name].cast("double") for name in numeric_cols])

In [None]:
# Verify the selected columns are now typed as double after the cast.
df2.printSchema()

In [None]:
# Peek at the first five rows (returned as a list of Row objects) as a sanity check.
df2.take(5)

In [None]:
# Pack the five numeric columns into a single vector column for the clustering models.
feature_cols = ["Duration", "SrcPackets", "DstPackets", "SrcBytes", "DstBytes"]
assembler = (
    VectorAssembler()
    .setInputCols(feature_cols)
    .setOutputCol("featureVector")
)
output = assembler.transform(df2)

In [None]:
# Fit a 5-cluster k-means model on the assembled netflow features.
# NOTE(review): features are unscaled, so large-magnitude columns (e.g. byte counts)
# likely dominate the distance metric — consider standardising before clustering.
kmeans1 = KMeans(k=5, featuresCol="featureVector", predictionCol="cluster")
model1 = kmeans1.fit(output)

In [None]:
# Print the k-means centroids, one per line, under a heading.
centers = model1.clusterCenters()
report_lines = ["Cluster Centers: "]
report_lines.extend(str(center) for center in centers)
print("\n".join(report_lines))

In [None]:
# Fit a 5-cluster bisecting k-means model on the same features for comparison.
bikmeans1 = BisectingKMeans(k=5, featuresCol="featureVector", predictionCol="cluster")
model2 = bikmeans1.fit(output)

In [None]:
# Print the bisecting k-means centroids, one per line, under a heading.
centers2 = model2.clusterCenters()
report_lines = ["Cluster Centers: "]
report_lines.extend(str(center) for center in centers2)
print("\n".join(report_lines))

In [4]:
# Load the JSON event log; Spark infers the schema from the records.
dfN = sqlContext.read.format("json").load('wls_day-11')
dfN.printSchema()

root
 |-- AuthenticationPackage: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- DomainName: string (nullable = true)
 |-- EventID: long (nullable = true)
 |-- FailureReason: string (nullable = true)
 |-- LogHost: string (nullable = true)
 |-- LogonID: string (nullable = true)
 |-- LogonType: long (nullable = true)
 |-- LogonTypeDescription: string (nullable = true)
 |-- ParentProcessID: string (nullable = true)
 |-- ParentProcessName: string (nullable = true)
 |-- ProcessID: string (nullable = true)
 |-- ProcessName: string (nullable = true)
 |-- ServiceName: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- SubjectDomainName: string (nullable = true)
 |-- SubjectLogonID: string (nullable = true)
 |-- SubjectUserName: string (nullable = true)
 |-- Time: long (nullable = true)
 |-- UserName: string (nullable = true)



In [5]:
# Total number of rows loaded from the event log (triggers a full Spark job).
dfN.count()

83183265

In [6]:
# Register dfN as the temporary view "someData" so later cells can query it with SQL.
dfN.createOrReplaceTempView("someData")

In [7]:
# Count events with EventID 4625.
# `.show()` only prints and returns None, so keep the aggregated DataFrame in
# `someData1` (previously it was always None) and display it separately.
# NOTE(review): these look like Windows Security event IDs (4625 is commonly
# "an account failed to log on") — confirm against the dataset documentation.
someData1 = (
    sqlContext.sql("SELECT * FROM someData WHERE EventID = 4625")
    .groupBy("EventID")
    .count()
)
someData1.show()

+-------+------+
|EventID| count|
+-------+------+
|   4625|356966|
+-------+------+



In [8]:
# Count events with EventID 4609.
# `.show()` only prints and returns None, so keep the aggregated DataFrame in
# `someData2` (previously it was always None) and display it separately.
# NOTE(review): presumably a Windows Security event ID (4609 is commonly
# "Windows is shutting down") — confirm against the dataset documentation.
someData2 = (
    sqlContext.sql("SELECT * FROM someData WHERE EventID = 4609")
    .groupBy("EventID")
    .count()
)
someData2.show()

+-------+-----+
|EventID|count|
+-------+-----+
|   4609|   21|
+-------+-----+

