In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start a Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('hacker')
    .getOrCreate()
)

In [3]:
# Load the session log from HDFS. The first row of the file supplies the
# column names, and Spark infers each column's type from the values.
data = spark.read.csv(
    'hdfs://nn-homedepot.s3s.altiscale.com:8020/user/mlakshmikanthaiah/personl_test_files/hack_data.csv',
    header=True,
    inferSchema=True
)

In [4]:
# Show the column names and the types Spark inferred while reading the CSV.
data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)



In [5]:
# Peek at the first row to sanity-check the parsed values.
data.head()

Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location=u'Slovenia', WPM_Typing_Speed=72.37)

In [6]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler

In [8]:
# Encode the string-valued Location column as a numeric index column.
indexer = StringIndexer(
    inputCol='Location',
    outputCol='indexedLocation'
)

# `data_indx` is the fitted indexer; `data_index` is the input frame with the
# extra `indexedLocation` column appended.
data_indx = indexer.fit(data)
data_index = data_indx.transform(data)

In [61]:
# Choose the feature columns by name instead of by position. The previous
# positional pops only worked for one exact column order (and the second index
# depended on the shift caused by the first), so a reordered or extended CSV
# would silently drop the wrong columns.
#
# Both the raw `Location` string and its numeric encoding `indexedLocation`
# are left out, so location does not take part in the clustering.
excluded_columns = ('Location', 'indexedLocation')
columns = [name for name in data_index.columns if name not in excluded_columns]
columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'WPM_Typing_Speed']

In [62]:
# Pack the selected feature columns into a single vector column, `features`.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=columns
)
data_ass = assembler.transform(data_index)

In [38]:
from pyspark.ml.clustering import KMeans

In [79]:
# Rescale the assembled feature vector so that no single column dominates the
# distance computation. The two flags spell out the library defaults: scale to
# unit standard deviation, do not centre on the mean.
scalar = StandardScaler(
    inputCol='features',
    outputCol='scaledFeatures',
    withStd=True,
    withMean=False
)

# Fit the scaling statistics on the data, then append `scaledFeatures`.
scalar_model = scalar.fit(data_ass)
dataset = scalar_model.transform(data_ass)

In [82]:
# Confirm that the `features` and `scaledFeatures` vector columns were added.
dataset.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- indexedLocation: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)



In [93]:
# Cluster the scaled feature vectors into k=2 groups.
#
# A fixed seed makes the centroid initialisation, and therefore the cluster
# assignments and cost, reproducible across kernel restarts. Without it each
# rerun may produce different clusters.
#
# Naming note: `model` is the (unfitted) KMeans estimator, `final_data` is the
# fitted KMeans model, and `final_dataset` is the input frame with the
# `prediction` (cluster id) column appended. The names are kept because later
# cells refer to them.
model = KMeans(featuresCol='scaledFeatures', k=2, seed=42)
final_data = model.fit(dataset)
final_dataset = final_data.transform(dataset)

In [85]:
# Confirm that the clustering step appended the integer `prediction` column.
final_dataset.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- indexedLocation: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
 |-- prediction: integer (nullable = true)



In [94]:
# Report the clustering cost of the fitted model (`final_data`) on `dataset`.
# The printed label presumably stands for "within set sum of squared errors"
# (usually abbreviated WSSSE) -- TODO confirm.
# NOTE(review): `computeCost` is deprecated/removed in newer Spark releases;
# verify against the cluster's Spark version before upgrading.
print('WSSES')
print(final_data.computeCost(dataset))

WSSES
601.770751268


In [91]:
# NOTE(review): this cell repeats the cost report from the previous cell. Its
# execution count (91) is lower than that of the cell that fits the model (93),
# so the output recorded below was produced by an earlier fit and does not
# describe the current `final_data`. Re-run it, or remove it, after a fresh
# top-to-bottom execution.
print('WSSES')
print(final_data.computeCost(dataset))

WSSES
434.755073085


In [96]:
# Show how many rows were assigned to each cluster.
(
    final_dataset
    .groupBy('prediction')
    .count()
    .show()
)

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+

