In [1]:
import os

import findspark

# Locate the Spark installation. Honour SPARK_HOME when it is set so the
# notebook can run on other machines; otherwise fall back to the location
# this notebook was originally written against.
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.3.2-bin-hadoop2.7'))
import pyspark

from pyspark.sql import SparkSession

# Single session shared by every cell below.
spark = SparkSession.builder.appName('cluster_project').getOrCreate()

In [2]:
# Read the hack-session CSV: the first row supplies the column names and
# Spark infers each column's type from the values.
hack_csv_path = '../Python-and-Spark-for-Big-Data/Spark_for_Machine_Learning/Clustering/hack_data.csv'
data = spark.read.csv(hack_csv_path, header=True, inferSchema=True)

In [3]:
# Peek at a single row to sanity-check the parsed columns and values.
data.show(1)

+-----------------------+-----------------+---------------+-----------------+---------------+--------+----------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|Location|WPM_Typing_Speed|
+-----------------------+-----------------+---------------+-----------------+---------------+--------+----------------+
|                    8.0|           391.09|              1|             2.96|            7.0|Slovenia|           72.37|
+-----------------------+-----------------+---------------+-----------------+---------------+--------+----------------+
only showing top 1 row



In [4]:
from pyspark.ml.clustering import KMeans

In [5]:
from pyspark.ml.feature import VectorAssembler

In [6]:
# List every column name so the numeric ones can be picked as features.
data.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed']

In [7]:
# Columns fed to the clustering model. Every column of the dataset is used
# except 'Location', which is a string column.
feature_cols = [
    'Session_Connection_Time',
    'Bytes Transferred',
    'Kali_Trace_Used',
    'Servers_Corrupted',
    'Pages_Corrupted',
    'WPM_Typing_Speed',
]

In [8]:
# Packs the selected feature columns into one vector column named 'features'.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol='features',
)

In [9]:
# Apply the assembler: the result keeps the original columns and adds the
# 'features' vector column.
final_data = assembler.transform(data)

In [10]:
# Confirm that the 'features' vector column was appended.
final_data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)



In [11]:
from pyspark.ml.feature import StandardScaler

In [12]:
# Feature scaler reading 'features' and writing 'scaled_features'.
# withStd / withMean are spelled out at their documented defaults:
# scale to unit standard deviation, no mean-centering.
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaled_features',
    withStd=True,
    withMean=False,
)

In [13]:
# Fit the scaler on the assembled data, then apply it to that same data.
# NOTE: despite its name, `scaler_model` holds the transformed DataFrame
# (with the extra 'scaled_features' column), not the fitted scaler.
fitted_scaler = scaler.fit(final_data)
scaler_model = fitted_scaler.transform(final_data)

In [14]:
# Confirm that the 'scaled_features' vector column was appended.
scaler_model.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaled_features: vector (nullable = true)



In [15]:
# K-means starts from a randomised initialisation, so pin the seed: with an
# explicit seed, re-running the notebook on the same data reproduces the
# same clusters. The same seed is shared by both estimators.
KMEANS_SEED = 42

# Two candidate models on the scaled features: k=2 and k=3 clusters.
kmeans2 = KMeans(featuresCol='scaled_features', k=2, seed=KMEANS_SEED)
kmeans3 = KMeans(featuresCol='scaled_features', k=3, seed=KMEANS_SEED)

In [16]:
# Fit both estimators (k=2 first, then k=3) on the scaled data.
model_k2, model_k3 = (
    estimator.fit(scaler_model) for estimator in (kmeans2, kmeans3)
)

In [18]:
# Assign every row to one of the three clusters, then show how many rows
# landed in each cluster.
k3_assignments = model_k3.transform(scaler_model)
k3_cluster_sizes = k3_assignments.groupBy('prediction').count()
k3_cluster_sizes.show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         2|   84|
|         0|   83|
+----------+-----+



In [19]:
# Assign every row to one of the two clusters, then show how many rows
# landed in each cluster.
k2_assignments = model_k2.transform(scaler_model)
k2_cluster_sizes = k2_assignments.groupBy('prediction').count()
k2_cluster_sizes.show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+

