In [ ]:
import scala.collection.mutable.HashMap
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.Normalizer
val data = sc.textFile("/home/stavros/Downloads/wineQualityReds.csv")
 val header = data.first()
val parsedData = data.filter(row => row != header).map { data => 
val values = data.split(',')
(values(0), Vectors.dense(values.drop(1).map(_.toDouble)))
}.cache()
val dfPoints = parsedData.toDF("label","features")
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("n_features")

val lInfNormData = normalizer.transform(dfPoints, normalizer.p -> Double.PositiveInfinity)
// lInfNormData.show()
val normalized_data = lInfNormData.select($"label", $"n_features".as("features"))

val errors = new HashMap[Int,Double]()
 for (numClusters <- 2 to 20) {     
   val kmeans = new KMeans().setK(numClusters)
   .setFeaturesCol("n_features")
   .setMaxIter(50)
   val model = kmeans.fit(lInfNormData)
   val WSSSE = model.computeCost(lInfNormData)
   errors.put(numClusters, WSSSE)
}


import scala.collection.mutable.HashMap
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.Normalizer
data: org.apache.spark.rdd.RDD[String] = /home/stavros/Downloads/wineQualityReds.csv MapPartitionsRDD[6426] at textFile at <console>:165
header: String = "","fixed.acidity","volatile.acidity","citric.acid","residual.sugar","chlorides","free.sulfur.dioxide","total.sulfur.dioxide","density","pH","sulphates","alcohol","quality"
parsedData: org.apache.spark.rdd.RDD[(String, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[6428] at map at <console>:167
dfPoints: org.apache.spark.sql.DataFrame = [label: string, features: vector]
normalizer: org.apache.spark.ml.fea...

In [ ]:
WSSSE

res207: Double = 107.14101429590055


In [ ]:
errors.toArray

res209: Array[(Int, Double)] = Array((17,21.559666739049433), (8,35.70260943001307), (11,27.692201213342862), (20,19.34549475806726), (2,107.14101429590055), (5,48.53559329155385), (14,24.557777658418264), (4,56.78355364701449), (13,25.257836089219992), (16,22.194938026480422), (7,37.76941511328587), (10,30.71351096101113), (19,20.091797846238663), (18,21.021726370385895), (9,31.948719879861514), (3,75.79602680334075), (12,26.152586968386245), (15,23.018920156459714), (6,42.68206084715902))


In [ ]:
   val kmeans = new KMeans().setK(2)
   .setFeaturesCol("n_features")
   .setMaxIter(50)
   val model = kmeans.fit(lInfNormData)
   val WSSSE = model.computeCost(lInfNormData)

kmeans: org.apache.spark.ml.clustering.KMeans = kmeans_c0089ec1f99b
model: org.apache.spark.ml.clustering.KMeansModel = kmeans_c0089ec1f99b
WSSSE: Double = 107.14101429590055


In [ ]:
model.clusterCenters

res212: Array[org.apache.spark.ml.linalg.Vector] = Array([0.18039444291409718,0.011903512685698184,0.00538682352254432,0.052890252892119106,0.001927570269812699,0.3694765387729449,1.0,0.022183009281431215,0.0741005521280797,0.014625924011909038,0.22995980340258093,0.12500312447932746], [0.5788327370461465,0.0338725581836942,0.020011301098778453,0.1666257715575919,0.0056309725158454,0.40349633343290753,0.9675213942134802,0.06601698929968569,0.2180910512034508,0.04238828475464813,0.714994401855638,0.38312153624625206])


In [ ]:
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(2)
  .fit(normalized_data)


val result = pca.transform(normalized_data).select("pcaFeatures")
result.show(false)


+-------------------------------------------+
|pcaFeatures                                |
+-------------------------------------------+
|[-0.3702973306866152,-0.33771637980173497] |
|[-0.18712918469358497,-0.4023322671905787] |
|[-0.22830404336207674,-0.30380330949811646]|
|[-0.23880490950702832,-0.30385624143161144]|
|[-0.3702973306866152,-0.33771637980173497] |
|[-0.3103399519170776,-0.34436979708419047] |
|[-0.19848360865860548,-0.28268066530572394]|
|[-0.6977010617039199,-0.7043676334409302]  |
|[-0.7960129637347033,-0.4784408899780102]  |
|[-0.10492361385769952,-0.2037650030505796] |
|[-0.1630245984212077,-0.2638485404888952]  |
|[-0.10492361385769952,-0.2037650030505796] |
|[-0.18529130939240113,-0.30476015011481344]|
|[-0.4362691590601039,-0.31712672795491115] |
|[-0.07537772221534685,-0.3966274942156468] |
|[-0.07221040405907608,-0.3829401962619964] |
|[-0.12518177227143848,-0.3750539093048038] |
|[-0.21478281300432153,-0.3120605580593237] |
|[-0.4183908983593498,-0.213722474

In [ ]:
result.rdd.collect().map{x => x.getAs[org.apache.spark.ml.linalg.DenseVector]("pcaFeatures").toArray}
.map { case Array(f1,f2) => (f1,f2) }

res216: Array[(Double, Double)] = Array((-0.3702973306866152,-0.33771637980173497), (-0.18712918469358497,-0.4023322671905787), (-0.22830404336207674,-0.30380330949811646), (-0.23880490950702832,-0.30385624143161144), (-0.3702973306866152,-0.33771637980173497), (-0.3103399519170776,-0.34436979708419047), (-0.19848360865860548,-0.28268066530572394), (-0.6977010617039199,-0.7043676334409302), (-0.7960129637347033,-0.4784408899780102), (-0.10492361385769952,-0.2037650030505796), (-0.1630245984212077,-0.2638485404888952), (-0.10492361385769952,-0.2037650030505796), (-0.18529130939240113,-0.30476015011481344), (-0.4362691590601039,-0.31712672795491115), (-0.07537772221534685,-0.3966274942156468), (-0.07221040405907608,-0.3829401962619964), (-0.12518177227143848,-0.3750539093048038), (-0.2147...