In [1]:
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder.appName('clustering').getOrCreate()

In [3]:
input_file_path="file:///C:/Users/ckp43_000/Documents/hack_data.csv"

In [4]:
dataset=spark.read.csv(input_file_path,inferSchema=True,header=True)

In [6]:
dataset.head(1)

[Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37)]

In [7]:
from pyspark.ml.clustering import KMeans,KMeansModel

In [8]:
from pyspark.ml.feature import VectorAssembler

In [9]:
dataset.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed']

In [13]:
feat_cols=['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'WPM_Typing_Speed']

In [14]:
assembler=VectorAssembler(inputCols=feat_cols,outputCol='features')

In [15]:
final_data=assembler.transform(dataset)

In [17]:
final_data.head(1)

[Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37, features=DenseVector([8.0, 391.09, 1.0, 2.96, 7.0, 72.37]))]

In [19]:
final_data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)



In [20]:
from pyspark.ml.feature import StandardScaler

In [21]:
scaler=StandardScaler(inputCol='features',outputCol='scaledFeatures')

In [23]:
type(scaler)

pyspark.ml.feature.StandardScaler

In [25]:
scaler_model=scaler.fit(final_data)

In [26]:
cluster_final_data=scaler_model.transform(final_data)

In [28]:
cluster_final_data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)



In [30]:
kmeans_2=KMeans(featuresCol='scaledFeatures',k=2)
kmeans_3=KMeans(featuresCol='scaledFeatures',k=3)

In [32]:
model_k2=kmeans_2.fit(cluster_final_data)
model_k3=kmeans_3.fit(cluster_final_data)

In [34]:
model_k2.transform(cluster_final_data).select('prediction').show(5)

+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 5 rows



In [35]:
model_k3.transform(cluster_final_data).select('prediction').show(5)

+----------+
|prediction|
+----------+
|         2|
|         1|
|         2|
|         2|
|         1|
+----------+
only showing top 5 rows



In [36]:
model_k2.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         0|  167|
|         1|  167|
+----------+-----+



In [37]:
model_k3.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         2|   83|
|         1|   84|
|         0|  167|
+----------+-----+

