In [1]:
// Try to cluster clients of a Wholesale Distributor
// based off of the sales of some product categories

// Source of the Data
//http://archive.ics.uci.edu/ml/datasets/Wholesale+customers

// Here is the info on the data:
// 1)	FRESH: annual spending (m.u.) on fresh products (Continuous);
// 2)	MILK: annual spending (m.u.) on milk products (Continuous);
// 3)	GROCERY: annual spending (m.u.) on grocery products (Continuous);
// 4)	FROZEN: annual spending (m.u.) on frozen products (Continuous);
// 5)	DETERGENTS_PAPER: annual spending (m.u.) on detergents and paper products (Continuous);
// 6)	DELICATESSEN: annual spending (m.u.) on delicatessen products (Continuous);
// 7)	CHANNEL: customers Channel - Horeca (Hotel/Restaurant/Cafe) or Retail channel (Nominal)
// 8)	REGION: customers Region - Lisbon, Oporto or Other (Nominal)

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.0.22:4041
SparkContext available as 'sc' (version = 3.0.0, master = local[*], app id = local-1601800385234)
SparkSession available as 'spark'


In [2]:
// Import SparkSession
import org.apache.spark.sql.SparkSession

// Import VectorAssembler and Vectors
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,VectorIndexer,OneHotEncoder}
import org.apache.spark.ml.linalg.Vectors

// Raise the log level of every logger under "org" to ERROR so that
// lower-severity log messages do not clutter the notebook output
import org.apache.log4j._
Logger.getLogger("org").setLevel(Level.ERROR)


// Get a SparkSession: getOrCreate() returns the session that already exists
// (the kernel announces one as 'spark' at startup) instead of building another
val spark = SparkSession.builder().getOrCreate()

// Import Kmeans clustering Algorithm
import org.apache.spark.ml.clustering.KMeans

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder}
import org.apache.spark.ml.linalg.Vectors
import org.apache.log4j._
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@116f08e5
import org.apache.spark.ml.clustering.KMeans


In [3]:
// Read the Wholesale Customers CSV: the first row supplies the column
// names and the column types are inferred from the values
val data = spark.read
    .options(Map("header" -> "true", "inferSchema" -> "true"))
    .csv("wholesale_customers_data.csv")
// Keep only the six spending columns used for training:
// Fresh, Milk, Grocery, Frozen, Detergents_Paper, Delicassen
// Call this new subset feature_data
val feature_data = data
    .select("Fresh", "Milk", "Grocery", "Frozen", "Detergents_Paper", "Delicassen")

data: org.apache.spark.sql.DataFrame = [Channel: int, Region: int ... 6 more fields]
feature_data: org.apache.spark.sql.DataFrame = [Fresh: int, Milk: int ... 4 more fields]


In [4]:
// Build a VectorAssembler named assembler that packs the six spending
// columns into a single vector column called "features".
// This is clustering, so no label column is configured.

val assembler = new VectorAssembler()
    .setOutputCol("features")
    .setInputCols(Array(
        "Fresh", "Milk", "Grocery",
        "Frozen", "Detergents_Paper", "Delicassen"))

assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_ee918c704c7e, handleInvalid=error, numInputCols=6


In [5]:
// Run the assembler over feature_data and keep only the resulting
// "features" column; the result is named training_data
val training_data = assembler.transform(feature_data).select("features")

training_data: org.apache.spark.sql.DataFrame = [features: vector]


In [6]:
// Create a KMeans estimator with K=8 clusters and a fixed seed of 1L
// NOTE(review): this comment previously said K=3 while the code sets 8 — confirm the intended K
val kmeans = new KMeans().setK(8).setSeed(1L)

kmeans: org.apache.spark.ml.clustering.KMeans = kmeans_0cd8d351105e


In [7]:
// Train the KMeans estimator on training_data to obtain the fitted model
val model: org.apache.spark.ml.clustering.KMeansModel = kmeans.fit(training_data)

model: org.apache.spark.ml.clustering.KMeansModel = KMeansModel: uid=kmeans_0cd8d351105e, k=8, distanceMeasure=euclidean, numFeatures=6


In [8]:
// Assign a cluster to every row; transform adds the prediction column
val predictions: org.apache.spark.sql.DataFrame = model.transform(training_data)

predictions: org.apache.spark.sql.DataFrame = [features: vector, prediction: int]


In [9]:
// Score the clustering with the Silhouette metric
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Print each fitted cluster center on its own line
println("Cluster Centers: ")
for (center <- model.clusterCenters) println(center)

Silhouette with squared euclidean distance = 0.5260446829409514
Cluster Centers: 
[6539.087962962963,3006.1157407407404,3540.4675925925926,2709.949074074074,949.2037037037037,973.8194444444443]
[23710.849056603773,3882.056603773585,5169.311320754717,3772.4433962264147,1118.264150943396,1693.1037735849056]
[22925.0,73498.0,32114.0,987.0,20070.0,903.0]
[40204.0,46314.0,57584.5,5518.0,25436.0,4241.0]
[16117.0,46197.0,92780.0,1026.0,40827.0,2944.0]
[4083.797619047619,9649.380952380952,15284.261904761905,1383.642857142857,6546.559523809524,1461.404761904762]
[56453.307692307695,10026.23076923077,8739.0,15518.076923076924,1258.2307692307693,7170.538461538462]
[9291.529411764706,19783.882352941175,32254.470588235294,2083.0588235294117,15662.117647058823,3107.3529411764707]


import org.apache.spark.ml.evaluation.ClusteringEvaluator
evaluator: org.apache.spark.ml.evaluation.ClusteringEvaluator = ClusteringEvaluator: uid=cluEval_7697b05b2dfa, metricName=silhouette, distanceMeasure=squaredEuclidean
silhouette: Double = 0.5260446829409514
