# Distrubute K-Means

Implementation of a distributed k-means algorithm which clusters posts on the popular question-answer platform StackOverflow according to their score. 

In [1]:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import annotation.tailrec
import scala.reflect.ClassTag

Defining case class to transform raw data into structured format:

In [2]:
case class Posting(postingType: Int, id: Int, acceptedAnswer: Option[Int], parentId: Option[Int], score: Int, tags: Option[String]) extends Serializable

defined class Posting


# Reading data file

Data can be downloaded from http://alaska.epfl.ch/~dockermoocs/bigdata/stackoverflow.csv

In [3]:
val lines   = sc.textFile("./data/stackoverflow.csv").cache()

lines = ./data/stackoverflow.csv MapPartitionsRDD[1] at textFile at <console>:35


./data/stackoverflow.csv MapPartitionsRDD[1] at textFile at <console>:35

In [4]:
lines.take(2).foreach(println)

1,27233496,,,0,C#
1,23698767,,,9,C#


In [5]:
lines.count()



8143801

# Definig Parameters

In [6]:
  /** Languages */
  val langs =
    List(
      "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
      "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy");

  /** K-means parameter: How "far apart" languages should be for the kmeans algorithm? */
  def langSpread = 50000;
  assert(langSpread > 0, "If langSpread is zero we can't recover the language from the input data!");

  /** K-means parameter: Number of clusters */
  def kmeansKernels = 45;

  /** K-means parameter: Convergence criteria */
  def kmeansEta: Double = 20.0D;

  /** K-means parameter: Maximum iterations */
  def kmeansMaxIterations = 120;

langs = List(JavaScript, Java, PHP, Python, C#, C++, Ruby, CSS, Objective-C, Perl, Scala, Haskell, MATLAB, Clojure, Groovy)


langSpread: Int
kmeansKernels: Int
kmeansEta: Double
kmeansMaxIterations: Int


List(JavaScript, Java, PHP, Python, C#, C++, Ruby, CSS, Objective-C, Perl, Scala, Haskell, MATLAB, Clojure, Groovy)

# Transform raw data into records

In [7]:
  def rawPostings(lines: RDD[String]): RDD[Posting] =
    lines.map(line => {
      val arr = line.split(",")
      Posting(postingType =    arr(0).toInt,
              id =             arr(1).toInt,
              acceptedAnswer = if (arr(2) == "") None else Some(arr(2).toInt),
              parentId =       if (arr(3) == "") None else Some(arr(3).toInt),
              score =          arr(4).toInt,
              tags =           if (arr.length >= 6) Some(arr(5).intern()) else None)
    })

val raw = rawPostings(lines).cache()

raw = MapPartitionsRDD[2] at map at <console>:40


rawPostings: (lines: org.apache.spark.rdd.RDD[String])org.apache.spark.rdd.RDD[Posting]


MapPartitionsRDD[2] at map at <console>:40

In [8]:
raw.take(2).foreach(println)

Posting(1,27233496,None,None,0,Some(C#))
Posting(1,23698767,None,None,9,Some(C#))


# Grouping Question/Answers Together

In [9]:
  def groupedPostings(postings: RDD[Posting]): RDD[(Int, Iterable[(Posting, Posting)])] = {
    val q= postings.filter(z=>z.postingType==1).map(z=> (z.id,z))
    val a= postings.filter(z=>z.postingType==2 && z.parentId.isDefined).map(z=> (z.parentId.get,z))
    q.join(a).groupByKey()

  }

groupedPostings: (postings: org.apache.spark.rdd.RDD[Posting])org.apache.spark.rdd.RDD[(Int, Iterable[(Posting, Posting)])]


In [10]:
 val grouped = groupedPostings(raw)

grouped = MapPartitionsRDD[10] at groupByKey at <console>:38


MapPartitionsRDD[10] at groupByKey at <console>:38

In [11]:
grouped.take(2).foreach(println)

(6,CompactBuffer((Posting(1,6,None,None,140,Some(CSS)),Posting(2,31,None,Some(6),67,None)), (Posting(1,6,None,None,140,Some(CSS)),Posting(2,1948,None,Some(6),25,None)), (Posting(1,6,None,None,140,Some(CSS)),Posting(2,43377,None,Some(6),21,None)), (Posting(1,6,None,None,140,Some(CSS)),Posting(2,856615,None,Some(6),14,None))))
(42,CompactBuffer((Posting(1,42,None,None,155,Some(PHP)),Posting(2,933700,None,Some(42),26,None)), (Posting(1,42,None,None,155,Some(PHP)),Posting(2,76,None,Some(42),3,None)), (Posting(1,42,None,None,155,Some(PHP)),Posting(2,77,None,Some(42),89,None)), (Posting(1,42,None,None,155,Some(PHP)),Posting(2,147,None,Some(42),13,None)), (Posting(1,42,None,None,155,Some(PHP)),Posting(2,86433,None,Some(42),2,None)), (Posting(1,42,None,None,155,Some(PHP)),Posting(2,86796,None,Some(42),3,None)), (Posting(1,42,None,None,155,Some(PHP)),Posting(2,16141818,None,Some(42),0,None))))


# Extract highest rated answer to each question

In [12]:
  def scoredPostings(grouped: RDD[(Int, Iterable[(Posting, Posting)])]): RDD[(Posting, Int)] = {

    def answerHighScore(as: Array[Posting]): Int = {
      var highScore = 0
          var i = 0
          while (i < as.length) {
            val score = as(i).score
                if (score > highScore)
                  highScore = score
                  i += 1
          }
      highScore
    }

    grouped.values.map(z=>(z.head._1, answerHighScore(z.map(_._2).toArray) ) )
  }

scoredPostings: (grouped: org.apache.spark.rdd.RDD[(Int, Iterable[(Posting, Posting)])])org.apache.spark.rdd.RDD[(Posting, Int)]


In [13]:
val scored  = scoredPostings(grouped).cache()

scored = MapPartitionsRDD[12] at map at <console>:60


MapPartitionsRDD[12] at map at <console>:60

In [14]:
scored.take(5).foreach(println)

(Posting(1,6,None,None,140,Some(CSS)),67)
(Posting(1,42,None,None,155,Some(PHP)),89)
(Posting(1,72,None,None,16,Some(Ruby)),3)
(Posting(1,126,None,None,33,Some(Java)),30)
(Posting(1,174,None,None,38,Some(C#)),20)


In [15]:
scored.count()



2121822

# Feature Extraction

In [16]:
  def vectorPostings(scored: RDD[(Posting, Int)]): RDD[(Int, Int)] = {
    /** Return optional index of first language that occurs in `tags`. */
    def firstLangInTag(tag: Option[String], ls: List[String]): Option[Int] = {
      if (tag.isEmpty) None
      else if (ls.isEmpty) None
      else if (tag.get == ls.head) Some(0) // index: 0
      else {
        val tmp = firstLangInTag(tag, ls.tail)
        tmp match {
          case None => None
          case Some(i) => Some(i + 1) // index i in ls.tail => index i+1
        }
      }
    }

    scored.map(z=> ( firstLangInTag(z._1.tags, langs).get * langSpread, z._2)  ).persist()
  }

vectorPostings: (scored: org.apache.spark.rdd.RDD[(Posting, Int)])org.apache.spark.rdd.RDD[(Int, Int)]


In [17]:
val vectors = vectorPostings(scored)

vectors = MapPartitionsRDD[13] at map at <console>:68


MapPartitionsRDD[13] at map at <console>:68

In [27]:
vectors.take(5).foreach(println)

(350000,67)
(100000,89)
(300000,3)
(50000,30)
(200000,20)


# Sampling random centers

In [19]:
  def sampleVectors(vectors: RDD[(Int, Int)]): Array[(Int, Int)] = {

    assert(kmeansKernels % langs.length == 0, "kmeansKernels should be a multiple of the number of languages studied.")
    val perLang = kmeansKernels / langs.length

    // http://en.wikipedia.org/wiki/Reservoir_sampling
    def reservoirSampling(lang: Int, iter: Iterator[Int], size: Int): Array[Int] = {
      val res = new Array[Int](size)
      val rnd = new util.Random(lang)

      for (i <- 0 until size) {
        assert(iter.hasNext, s"iterator must have at least $size elements")
        res(i) = iter.next
      }

      var i = size.toLong
      while (iter.hasNext) {
        val elt = iter.next
        val j = math.abs(rnd.nextLong) % i
        if (j < size)
          res(j.toInt) = elt
        i += 1
      }

      res
    }

    val res =
      if (langSpread < 500)
        // sample the space regardless of the language
        vectors.takeSample(false, kmeansKernels, 42)
      else
        // sample the space uniformly from each language partition
        vectors.groupByKey.flatMap({
          case (lang, vectors) => reservoirSampling(lang, vectors.toIterator, perLang).map((lang, _))
        }).collect()

    assert(res.length == kmeansKernels, res.length)
    res
  }

sampleVectors: (vectors: org.apache.spark.rdd.RDD[(Int, Int)])Array[(Int, Int)]


# K-Means Algorithm

In [21]:
 @tailrec final def kmeans(means: Array[(Int, Int)], vectors: RDD[(Int, Int)], iter: Int = 1, debug: Boolean = false): Array[(Int, Int)] = {
    val newMeans = means.clone() // you need to compute newMeans


    // Side effects!
    vectors
      .map(
        vector => (findClosest(vector, means), vector)
      )
      .groupByKey()
      .mapValues(averageVectors)
      .collect()
      .foreach(pair => {
        newMeans.update(pair._1, pair._2)
      })

    val distance = euclideanDistance(means, newMeans)

    if (debug) {
      println(
        s"""Iteration: $iter
            |  * current distance: $distance
            |  * desired distance: $kmeansEta
            |  * means:""".stripMargin)
      for (idx <- 0 until kmeansKernels)
        println(f"   ${means(idx).toString}%20s ==> ${newMeans(idx).toString}%20s  " +
          f"  distance: ${euclideanDistance(means(idx), newMeans(idx))}%8.0f")
    }

    if (converged(distance))
      newMeans
    else if (iter < kmeansMaxIterations)
      kmeans(newMeans, vectors, iter + 1, debug)
    else {
      println("Reached max iterations!")
      newMeans
    }
  }


  //
  //
  //  Kmeans utilities:
  //
  //

  /** Decide whether the kmeans clustering converged */
  def converged(distance: Double) =
  distance < kmeansEta


  /** Return the euclidean distance between two points */
  def euclideanDistance(v1: (Int, Int), v2: (Int, Int)): Double = {
    val part1 = (v1._1 - v2._1).toDouble * (v1._1 - v2._1)
    val part2 = (v1._2 - v2._2).toDouble * (v1._2 - v2._2)
    part1 + part2
  }

  /** Return the euclidean distance between two points */
  def euclideanDistance(a1: Array[(Int, Int)], a2: Array[(Int, Int)]): Double = {
    assert(a1.length == a2.length)
    var sum = 0d
    var idx = 0
    while (idx < a1.length) {
      sum += euclideanDistance(a1(idx), a2(idx))
      idx += 1
    }
    sum
  }

  /** Return the closest point */
  def findClosest(p: (Int, Int), centers: Array[(Int, Int)]): Int = {
    var bestIndex = 0
    var closest = Double.PositiveInfinity
    for (i <- 0 until centers.length) {
      val tempDist = euclideanDistance(p, centers(i))
      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }
    bestIndex
  }


  /** Average the vectors */
  def averageVectors(ps: Iterable[(Int, Int)]): (Int, Int) = {
    val iter = ps.iterator
    var count = 0
    var comp1: Long = 0
    var comp2: Long = 0
    while (iter.hasNext) {
      val item = iter.next
      comp1 += item._1
      comp2 += item._2
      count += 1
    }
    ((comp1 / count).toInt, (comp2 / count).toInt)
  }


kmeans: (means: Array[(Int, Int)], vectors: org.apache.spark.rdd.RDD[(Int, Int)], iter: Int, debug: Boolean)Array[(Int, Int)]
converged: (distance: Double)Boolean
euclideanDistance: (v1: (Int, Int), v2: (Int, Int))Double <and> (a1: Array[(Int, Int)], a2: Array[(Int, Int)])Double
euclideanDistance: (v1: (Int, Int), v2: (Int, Int))Double <and> (a1: Array[(Int, Int)], a2: Array[(Int, Int)])Double
findClosest: (p: (Int, Int), centers: Array[(Int, Int)])Int
averageVectors: (ps: Iterable[(Int, Int)])(Int, Int)


In [22]:
val means   = kmeans(sampleVectors(vectors), vectors, debug = true)

Iteration: 1                                                                    
  * current distance: 1419.0
  * desired distance: 20.0
  * means:
             (450000,0) ==>           (450000,0)    distance:        0
             (450000,2) ==>           (450000,5)    distance:        9
             (450000,1) ==>           (450000,1)    distance:        0
                  (0,0) ==>                (0,0)    distance:        0
                  (0,1) ==>                (0,5)    distance:       16
                  (0,0) ==>                (0,0)    distance:        0
             (600000,3) ==>           (600000,6)    distance:        9
             (600000,3) ==>           (600000,3)    distance:        0
             (600000,2) ==>           (600000,0)    distance:        4
            (150000,28) ==>          (150000,54)    distance:      676
             (150000,3) ==>           (150000,4)    distance:        1
             (150000,1) ==>           (150000,0)    distance:        1


Iteration: 4                                                                    
  * current distance: 10755.0
  * desired distance: 20.0
  * means:
             (450000,0) ==>           (450000,0)    distance:        0
             (450000,8) ==>          (450000,10)    distance:        4
             (450000,1) ==>           (450000,2)    distance:        1
                  (0,1) ==>                (0,2)    distance:        1
                 (0,33) ==>               (0,78)    distance:     2025
                  (0,0) ==>                (0,0)    distance:        0
            (600000,11) ==>          (600000,13)    distance:        4
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
           (150000,136) ==>         (150000,197)    distance:     3721
             (150000,5) ==>           (150000,7)    distance:        4
             (150000,0) ==>           (150000,0)    distance:        0

Iteration: 7                                                                    
  * current distance: 33280.0
  * desired distance: 20.0
  * means:
             (450000,0) ==>           (450000,0)    distance:        0
            (450000,15) ==>          (450000,17)    distance:        4
             (450000,3) ==>           (450000,3)    distance:        0
                  (0,5) ==>                (0,8)    distance:        9
                (0,257) ==>              (0,382)    distance:    15625
                  (0,0) ==>                (0,0)    distance:        0
            (600000,18) ==>          (600000,20)    distance:        4
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
           (150000,327) ==>         (150000,402)    distance:     5625
            (150000,15) ==>          (150000,21)    distance:       36
             (150000,1) ==>           (150000,1)    distance:        0

Iteration: 10                                                                   
  * current distance: 41501.0
  * desired distance: 20.0
  * means:
             (450000,0) ==>           (450000,0)    distance:        0
            (450000,20) ==>          (450000,22)    distance:        4
             (450000,4) ==>           (450000,5)    distance:        1
                 (0,19) ==>               (0,33)    distance:      196
                (0,664) ==>              (0,819)    distance:    24025
                  (0,1) ==>                (0,1)    distance:        0
            (600000,24) ==>          (600000,25)    distance:        1
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
           (150000,564) ==>         (150000,615)    distance:     2601
            (150000,41) ==>          (150000,54)    distance:      169
             (150000,2) ==>           (150000,2)    distance:        0

Iteration: 13                                                                   
  * current distance: 32252.0
  * desired distance: 20.0
  * means:
             (450000,0) ==>           (450000,0)    distance:        0
            (450000,30) ==>          (450000,34)    distance:       16
             (450000,5) ==>           (450000,5)    distance:        0
                 (0,78) ==>              (0,108)    distance:      900
               (0,1084) ==>             (0,1189)    distance:    11025
                  (0,1) ==>                (0,2)    distance:        1
            (600000,30) ==>          (600000,32)    distance:        4
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
           (150000,720) ==>         (150000,768)    distance:     2304
            (150000,78) ==>          (150000,90)    distance:      144
             (150000,2) ==>           (150000,2)    distance:        0

Iteration: 16                                                                   
  * current distance: 88258.0
  * desired distance: 20.0
  * means:
             (450000,0) ==>           (450000,1)    distance:        1
            (450000,41) ==>          (450000,45)    distance:       16
             (450000,6) ==>           (450000,7)    distance:        1
                (0,172) ==>              (0,206)    distance:     1156
               (0,1394) ==>             (0,1553)    distance:    25281
                  (0,2) ==>                (0,2)    distance:        0
            (600000,36) ==>          (600000,38)    distance:        4
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
           (150000,871) ==>         (150000,905)    distance:     1156
           (150000,117) ==>         (150000,130)    distance:      169
             (150000,3) ==>           (150000,3)    distance:        0

Iteration: 19                                                                   
  * current distance: 136824.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
            (450000,54) ==>          (450000,57)    distance:        9
             (450000,8) ==>           (450000,8)    distance:        0
                (0,269) ==>              (0,295)    distance:      676
               (0,1777) ==>             (0,1847)    distance:     4900
                  (0,2) ==>                (0,2)    distance:        0
            (600000,41) ==>          (600000,43)    distance:        4
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
           (150000,985) ==>        (150000,1026)    distance:     1681
           (150000,155) ==>         (150000,167)    distance:      144
             (150000,3) ==>           (150000,3)    distance:        

Iteration: 22                                                                   
  * current distance: 776026.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
            (450000,67) ==>          (450000,72)    distance:       25
             (450000,9) ==>          (450000,10)    distance:        1
                (0,335) ==>              (0,350)    distance:      225
               (0,1924) ==>             (0,1957)    distance:     1089
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1153) ==>        (150000,1222)    distance:     4761
           (150000,192) ==>         (150000,203)    distance:      121
             (150000,3) ==>           (150000,3)    distance:        

Iteration: 25                                                                   
  * current distance: 18000.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
            (450000,83) ==>          (450000,87)    distance:       16
            (450000,10) ==>          (450000,10)    distance:        0
                (0,379) ==>              (0,392)    distance:      169
               (0,2069) ==>             (0,2111)    distance:     1764
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1391) ==>        (150000,1498)    distance:    11449
           (150000,224) ==>         (150000,236)    distance:      144
             (150000,3) ==>           (150000,3)    distance:        0

Iteration: 28                                                                   
  * current distance: 4329.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
            (450000,95) ==>          (450000,99)    distance:       16
            (450000,10) ==>          (450000,10)    distance:        0
                (0,415) ==>              (0,423)    distance:       64
               (0,2179) ==>             (0,2203)    distance:      576
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1594) ==>        (150000,1629)    distance:     1225
           (150000,255) ==>         (150000,262)    distance:       49
             (150000,3) ==>           (150000,3)    distance:        0


Iteration: 31                                                                   
  * current distance: 10560.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
           (450000,103) ==>         (450000,104)    distance:        1
            (450000,10) ==>          (450000,10)    distance:        0
                (0,433) ==>              (0,435)    distance:        4
               (0,2228) ==>             (0,2228)    distance:        0
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1629) ==>        (150000,1629)    distance:        0
           (150000,274) ==>         (150000,280)    distance:       36
             (150000,3) ==>           (150000,3)    distance:        0

Iteration: 34                                                                   
  * current distance: 18733.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
           (450000,104) ==>         (450000,104)    distance:        0
            (450000,10) ==>          (450000,10)    distance:        0
                (0,442) ==>              (0,448)    distance:       36
               (0,2254) ==>             (0,2309)    distance:     3025
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1629) ==>        (150000,1629)    distance:        0
           (150000,286) ==>         (150000,287)    distance:        1
             (150000,3) ==>           (150000,3)    distance:        0

Iteration: 37                                                                   
  * current distance: 99.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
           (450000,104) ==>         (450000,104)    distance:        0
            (450000,10) ==>          (450000,10)    distance:        0
                (0,460) ==>              (0,463)    distance:        9
               (0,2309) ==>             (0,2309)    distance:        0
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1629) ==>        (150000,1629)    distance:        0
           (150000,287) ==>         (150000,287)    distance:        0
             (150000,3) ==>           (150000,3)    distance:        0
  

Iteration: 40                                                                   
  * current distance: 83.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
           (450000,104) ==>         (450000,104)    distance:        0
            (450000,10) ==>          (450000,10)    distance:        0
                (0,466) ==>              (0,466)    distance:        0
               (0,2309) ==>             (0,2309)    distance:        0
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1629) ==>        (150000,1629)    distance:        0
           (150000,287) ==>         (150000,287)    distance:        0
             (150000,3) ==>           (150000,3)    distance:        0
  

Iteration: 43                                                                   
  * current distance: 100.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
           (450000,104) ==>         (450000,104)    distance:        0
            (450000,10) ==>          (450000,10)    distance:        0
                (0,466) ==>              (0,466)    distance:        0
               (0,2309) ==>             (0,2309)    distance:        0
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1629) ==>        (150000,1629)    distance:        0
           (150000,287) ==>         (150000,287)    distance:        0
             (150000,3) ==>           (150000,3)    distance:        0
 

Iteration: 46                                                                   
  * current distance: 4.0
  * desired distance: 20.0
  * means:
             (450000,1) ==>           (450000,1)    distance:        0
           (450000,104) ==>         (450000,104)    distance:        0
            (450000,10) ==>          (450000,10)    distance:        0
                (0,466) ==>              (0,466)    distance:        0
               (0,2309) ==>             (0,2309)    distance:        0
                  (0,2) ==>                (0,2)    distance:        0
            (600000,44) ==>          (600000,44)    distance:        0
             (600000,2) ==>           (600000,2)    distance:        0
             (600000,0) ==>           (600000,0)    distance:        0
          (150000,1629) ==>        (150000,1629)    distance:        0
           (150000,287) ==>         (150000,287)    distance:        0
             (150000,3) ==>           (150000,3)    distance:        0
   

means = Array((450000,1), (450000,104), (450000,10), (0,466), (0,2309), (0,2), (600000,44), (600000,2), (600000,0), (150000,1629), (150000,287), (150000,3), (300000,105), (300000,3), (300000,557), (50000,10271), (50000,334), (50000,2), (200000,2), (200000,99), (200000,530), (500000,3), (500000,176), (500000,32), (350000,2), (350000,940), (350000,210), (650000,74), (650000,14), (650000,2), (100000,2), (100000,183), (100000,1263), (400000,2), (400000,121), (400000,584), (550000,5), (550000,66), (550000,1130), (250000,271), (250000,1766), (250000,3), (700000,1), (700000,73), (700000,12))


[(450000,1), (450000,104), (450000,10), (0,466), (0,2309), (0,2), (600000,44), (600000,2), (600000,0), (150000,1629), (150000,287), (150000,3), (300000,105), (300000,3), (300000,557), (50000,10271), (50000,334), (50000,2), (200000,2), (200000,99), (200000,530), (500000,3), (500000,176), (500000,32), (350000,2), (350000,940), (350000,210), (650000,74), (650000,14), (650000,2), (100000,2), (100000,183), (100000,1263), (400000,2), (400000,121), (400000,584), (550000,5), (550000,66), (550000,1130), (250000,271), (250000,1766), (250000,3), (700000,1), (700000,73), (700000,12)]

In [23]:
  def clusterResults(means: Array[(Int, Int)], vectors: RDD[(Int, Int)]): Array[(String, Double, Int, Int)] = {
    val closest = vectors.map(p => (findClosest(p, means), p))
    val closestGrouped = closest.groupByKey()

    val median = closestGrouped.mapValues { vs =>
      val LangsInCluster = vs.map(z=> langs(z._1 / langSpread))		
      val langLabel: String   = LangsInCluster.groupBy(identity).maxBy(_._2.size)._1  // most common language in the cluster
      val langPercent: Double = (100.0 * LangsInCluster.count(_ == langLabel)) / vs.size  // percent of the questions in the most common language
      val clusterSize: Int    = vs.size
      
      val sortedscores    = vs.map(z => z._2).toSeq
      val (lower, upper) = sortedscores.sortWith(_<_).splitAt(sortedscores.length / 2)
      val medianScore: Int = {if (sortedscores.length % 2 == 0) ((lower.last + upper.head) / 2.0).toInt else upper.head.toInt }

      (langLabel, langPercent, clusterSize, medianScore)
    }

    median.collect().map(_._2).sortBy(_._4)
  }

clusterResults: (means: Array[(Int, Int)], vectors: org.apache.spark.rdd.RDD[(Int, Int)])Array[(String, Double, Int, Int)]


In [24]:
val results = clusterResults(means, vectors)




results = Array((MATLAB,100.0,3725,0), (PHP,100.0,315776,1), (CSS,100.0,113598,1), (Groovy,100.0,2729,1), (C#,100.0,361835,1), (Ruby,100.0,54727,1), (Objective-C,100.0,94745,1), (Java,100.0,383473,1), (JavaScript,100.0,365647,1), (Perl,100.0,19229,2), (MATLAB,100.0,10656,2), (Scala,100.0,12472,2), (C++,100.0,181255,2), (Clojure,100.0,3324,2), (Python,100.0,174573,2), (Haskell,100.0,10362,4), (Perl,100.0,4714,9), (Groovy,100.0,310,10), (Clojure,100.0,712,12), (Scala,100.0,679,27), (MATLAB,100.0,107,34), (Haskell,100.0,202,53), (Groovy,100.0,14,61), (Clojure,100.0,57,66), (Perl,100.0,58,77), (C#,100.0,2585,79), (Ruby,100.0,648,85), (Objective-C,100.0,784,97), (Scala,100.0,47,130), (PHP,100.0,470,139), (CSS,100.0,358,172), (C++,100.0,264,212), (Python,100...


[(MATLAB,100.0,3725,0), (PHP,100.0,315776,1), (CSS,100.0,113598,1), (Groovy,100.0,2729,1), (C#,100.0,361835,1), (Ruby,100.0,54727,1), (Objective-C,100.0,94745,1), (Java,100.0,383473,1), (JavaScript,100.0,365647,1), (Perl,100.0,19229,2), (MATLAB,100.0,10656,2), (Scala,100.0,12472,2), (C++,100.0,181255,2), (Clojure,100.0,3324,2), (Python,100.0,174573,2), (Haskell,100.0,10362,4), (Perl,100.0,4714,9), (Groovy,100.0,310,10), (Clojure,100.0,712,12), (Scala,100.0,679,27), (MATLAB,100.0,107,34), (Haskell,100.0,202,53), (Groovy,100.0,14,61), (Clojure,100.0,57,66), (Perl,100.0,58,77), (C#,100.0,2585,79), (Ruby,100.0,648,85), (Objective-C,100.0,784,97), (Scala,100.0,47,130), (PHP,100.0,470,139), (CSS,100.0,358,172), (C++,100.0,264,212), (Python,100.0,413,223), (Java,100.0,483,249), (JavaScript,100.0,433,375), (C#,100.0,147,443), (Objective-C,100.0,73,503), (Ruby,100.0,34,546), (CSS,100.0,26,766), (PHP,100.0,13,887), (Haskell,100.0,2,1130), (Python,100.0,19,1269), (C++,100.0,9,1290), (JavaScript,1

In [25]:
 def printResults(results: Array[(String, Double, Int, Int)]): Unit = {
    println("Resulting clusters:")
    println("  Score  Dominant language (%percent)  Questions")
    println("================================================")
    for ((lang, percent, size, score) <- results)
      println(f"${score}%7d  ${lang}%-17s (${percent}%-5.1f%%)      ${size}%7d")
  }

printResults: (results: Array[(String, Double, Int, Int)])Unit


In [26]:
printResults(results)

Resulting clusters:
  Score  Dominant language (%percent)  Questions
      0  MATLAB            (100.0%)         3725
      1  PHP               (100.0%)       315776
      1  CSS               (100.0%)       113598
      1  Groovy            (100.0%)         2729
      1  C#                (100.0%)       361835
      1  Ruby              (100.0%)        54727
      1  Objective-C       (100.0%)        94745
      1  Java              (100.0%)       383473
      1  JavaScript        (100.0%)       365647
      2  Perl              (100.0%)        19229
      2  MATLAB            (100.0%)        10656
      2  Scala             (100.0%)        12472
      2  C++               (100.0%)       181255
      2  Clojure           (100.0%)         3324
      2  Python            (100.0%)       174573
      4  Haskell           (100.0%)        10362
      9  Perl              (100.0%)         4714
     10  Groovy            (100.0%)          310
     12  Clojure           (100.0%)          712
