In [1]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

In [2]:
/** Calculates the homogeneity custer scoring 
 *
 *  @param df a Spark DataFrame of the input data
 *  @param label a String column name of a column in the df where the true class is stored
 *  @param cluster a String column name of a column in the df where the cluster id is stored
 */
def homogeneity_score(df: DataFrame, label: String, cluster: String): Double = {
  def log2 = (x: Double) => scala.math.log10(x)/scala.math.log10(2.0)

  def entropy(count: Int, n: Long): Double ={
    -(count.toDouble / n) * log2(count.toDouble / n)
  }

  def c_entropy(count: Int, n: Long, k: Long): Double ={
    -(count.toDouble / n) * log2(count.toDouble / k)
  }

  val udf_entropy = udf(entropy _)
  val udf_c_entropy = udf(c_entropy _)


  //filtering for two or more counts to 
  //var df_two_plus = (df.groupBy(label)
  //                     .agg(count(lit(1)).alias("count"))
  //                     .as("df1")
  //                     .join(df.as("df2"), label)
  //                     .filter("count > 1")).select(label,cluster)

  val n = df.count().toLong
  val classes = df.groupBy(label).count()
  val clusters = df.groupBy(cluster).count().toDF(cluster, "count_k")
  // number of class c assigned to cluster k
  val n_ck = df.groupBy(label,cluster).count()

  val entropy_of_classes = (classes.withColumn("entropy", udf_entropy(classes("count"), lit(n)))
                                   .agg(sum("entropy"))
                                   .first()
                                   .getDouble(0))
  
  val joined_df = n_ck.as("n_ck").join(clusters, cluster)
  val conditional_entropy = (joined_df.withColumn("c_entropy", udf_c_entropy(joined_df("count"), lit(n), joined_df("count_k")))
                                 .agg(sum("c_entropy"))
                                 .first()
                                 .getDouble(0))
    
  1 - conditional_entropy.toDouble / entropy_of_classes
}

/** Calculates the completeness custer scoring 
 *
 *  @param df a Spark DataFrame of the input data
 *  @param label a String column name of a column in the df where the true class is stored
 *  @param cluster a String column name of a column in the df where the cluster id is stored
 */
def completeness_score(df: DataFrame, label: String, cluster: String): Double = {
  homogeneity_score(df, cluster, label)
}

/** Calculates the harmonic mean / v measurement of the custer scoring 
 *
 *  @param df a Spark DataFrame of the input data
 *  @param label a String column name of a column in the df where the true class is stored
 *  @param cluster a String column name of a column in the df where the cluster id is stored
 */
def v_measurement_score(df: DataFrame, label: String, cluster: String): Double = {
  val h = homogeneity_score(df, label, cluster)
  val c = completeness_score(df, label, cluster)
  2 * h * c / (h + c)
}

## Create Data

In [1]:
// Dataset 1
case class jz_row(label: String, cluster: String)
val table = Seq(jz_row("0", "a"),jz_row("0", "a"),jz_row("0", "a"),jz_row("0", "b"),jz_row("1", "b"),jz_row("1", "c"),jz_row("1", "c"),jz_row("2","d"))
var df = spark.createDataFrame(table)

In [2]:
// Dataset 2
val schema = df.schema
val labels_true = List("0,0,0,1,1,1,3,3,3,5,5,5,5,5,5,5,5").flatMap(_.split(","))
val labels_pred = List("0,1,1,1,1,1,3,3,3,5,5,5,5,5,5,5,5").flatMap(_.split(","))
val rows = labels_pred zip labels_true
val rdd = sc.parallelize (rows).map(x => Row(x._1, x._2))
df = spark.sqlContext.createDataFrame(rdd, schema)

In [10]:
// Dataset 2
val schema = df.schema
val labels_true = List("0,0,0,1,1,1,3,3,3,5,5,5,5,5,5,5,5").flatMap(_.split(","))
val labels_pred = List("0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0").flatMap(_.split(","))
val rows = labels_true zip labels_pred
val rdd = sc.parallelize (rows).map(x => Row(x._1, x._2))
df = spark.sqlContext.createDataFrame(rdd, schema)

In [11]:
df.show()

+-----+-------+
|label|cluster|
+-----+-------+
|    0|      0|
|    0|      0|
|    0|      0|
|    1|      0|
|    1|      0|
|    1|      0|
|    3|      0|
|    3|      0|
|    3|      0|
|    5|      0|
|    5|      0|
|    5|      0|
|    5|      0|
|    5|      0|
|    5|      0|
|    5|      0|
|    5|      0|
+-----+-------+



In [8]:
completeness_score(df, "label", "cluster")

0.0

In [9]:
homogeneity_score(df, "label", "cluster")

NaN