diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 0723223954610..70b4fb5133e12 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -40,27 +40,23 @@ import org.apache.spark.{SparkConf, SparkContext} * n: Number of sampled points on innermost circle.. There are proportionally more points * within the outer/larger circles * maxIterations: Number of Power Iterations - * outerRadius: radius of the outermost of the concentric circles * }}} * * Here is a sample run and output: * - * ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15 - * - * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14], - * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29] + * ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15 * + * Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9], + * 0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29] * * If you use it as a template to create your own app, please use `spark-submit` to submit your app. */ object PowerIterationClusteringExample { case class Params( - input: String = null, - k: Int = 3, - numPoints: Int = 5, - maxIterations: Int = 10, - outerRadius: Double = 3.0 + k: Int = 2, + numPoints: Int = 10, + maxIterations: Int = 15 ) extends AbstractParams[Params] def main(args: Array[String]) { @@ -69,7 +65,7 @@ object PowerIterationClusteringExample { val parser = new OptionParser[Params]("PowerIterationClusteringExample") { head("PowerIterationClusteringExample: an example PIC app using concentric circles.") opt[Int]('k', "k") - .text(s"number of circles (/clusters), default: ${defaultParams.k}") + .text(s"number of circles (clusters), default: ${defaultParams.k}") .action((x, c) => c.copy(k = x)) opt[Int]('n', "n") .text(s"number of points in smallest circle, default: ${defaultParams.numPoints}") @@ -77,9 +73,6 @@ object PowerIterationClusteringExample { opt[Int]("maxIterations") .text(s"number of iterations, default: ${defaultParams.maxIterations}") .action((x, c) => c.copy(maxIterations = x)) - opt[Double]('r', "r") - .text(s"radius of outermost circle, default: ${defaultParams.outerRadius}") - .action((x, c) => c.copy(outerRadius = x)) } parser.parse(args, defaultParams).map { params => @@ -97,20 +90,21 @@ object PowerIterationClusteringExample { Logger.getRootLogger.setLevel(Level.WARN) - val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius) + val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints) val model = new PowerIterationClustering() .setK(params.k) .setMaxIterations(params.maxIterations) + .setInitializationMode("degree") .run(circlesRdd) val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id)) - val assignments = clusters.toList.sortBy { case (k, v) => v.length} + val assignments = clusters.toList.sortBy { case (k, v) => v.length } val assignmentsStr = assignments .map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}" - }.mkString(",") + }.mkString(", ") val sizesStr = assignments.map { - _._2.size + _._2.length }.sorted.mkString("(", ",", ")") println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr") @@ -124,20 +118,17 @@ object PowerIterationClusteringExample { } } - def generateCirclesRdd(sc: SparkContext, - nCircles: Int = 3, - nPoints: Int = 30, - outerRadius: Double): RDD[(Long, Long, Double)] = { - - val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)} - val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints} - val points = (0 until nCircles).flatMap { cx => - generateCircle(radii(cx), groupSizes(cx)) + def generateCirclesRdd( + sc: SparkContext, + nCircles: Int, + nPoints: Int): RDD[(Long, Long, Double)] = { + val points = (1 to nCircles).flatMap { i => + generateCircle(i, i * nPoints) }.zipWithIndex val rdd = sc.parallelize(points) val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) => if (i0 < i1) { - Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0))) + Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1)))) } else { None } @@ -148,11 +139,9 @@ object PowerIterationClusteringExample { /** * Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel */ - def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = { - val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma) - val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0) + def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = { val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2) - coeff * math.exp(expCoeff * ssquares) + math.exp(-ssquares / 2.0) } } // scalastyle:on println diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 893ee1f12bfb3..084e38ba40b38 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -24,7 +24,6 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD import org.apache.spark.graphx._ -import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable} import org.apache.spark.rdd.RDD @@ -262,10 +261,10 @@ object PowerIterationClustering extends Logging { }, mergeMsg = _ + _, TripletFields.EdgeOnly) - GraphImpl.fromExistingRDDs(vD, graph.edges) + Graph(vD, graph.edges) .mapTriplets( e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON), - TripletFields.All) + new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true)) } /** @@ -291,10 +290,10 @@ object PowerIterationClustering extends Logging { }, mergeMsg = _ + _, TripletFields.EdgeOnly) - GraphImpl.fromExistingRDDs(vD, gA.edges) + Graph(vD, gA.edges) .mapTriplets( e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON), - TripletFields.Src) + new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true)) } /** @@ -315,7 +314,7 @@ object PowerIterationClustering extends Logging { }, preservesPartitioning = true).cache() val sum = r.values.map(math.abs).sum() val v0 = r.mapValues(x => x / sum) - GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges) + Graph(VertexRDD(v0), g.edges) } /** @@ -330,7 +329,7 @@ object PowerIterationClustering extends Logging { def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = { val sum = g.vertices.values.sum() val v0 = g.vertices.mapValues(_ / sum) - GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges) + Graph(VertexRDD(v0), g.edges) } /** @@ -355,7 +354,7 @@ object PowerIterationClustering extends Logging { val v = curG.aggregateMessages[Double]( sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr), mergeMsg = _ + _, - TripletFields.Dst).cache() + new TripletFields(/* useSrc */ false, /* useDst */ true, /* useEdge */ true)).cache() // normalize v val norm = v.values.map(math.abs).sum() logInfo(s"$msgPrefix: norm(v) = $norm.") @@ -368,7 +367,7 @@ object PowerIterationClustering extends Logging { diffDelta = math.abs(delta - prevDelta) logInfo(s"$msgPrefix: diff(delta) = $diffDelta.") // update v - curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges) + curG = Graph(VertexRDD(v1), g.edges) prevDelta = delta } curG.vertices diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala index 189000512155f..3d81d375c716e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala @@ -30,62 +30,65 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon import org.apache.spark.mllib.clustering.PowerIterationClustering._ + /** Generates a circle of points. */ + private def genCircle(r: Double, n: Int): Array[(Double, Double)] = { + Array.tabulate(n) { i => + val theta = 2.0 * math.Pi * i / n + (r * math.cos(theta), r * math.sin(theta)) + } + } + + /** Computes Gaussian similarity. */ + private def sim(x: (Double, Double), y: (Double, Double)): Double = { + val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2) + math.exp(-dist2 / 2.0) + } + test("power iteration clustering") { - /* - We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for - edge (3, 4). - - 15-14 -13 -12 - | | - 4 . 3 - 2 11 - | | x | | - 5 0 - 1 10 - | | - 6 - 7 - 8 - 9 - */ + // Generate two circles following the example in the PIC paper. + val r1 = 1.0 + val n1 = 10 + val r2 = 4.0 + val n2 = 40 + val n = n1 + n2 + val points = genCircle(r1, n1) ++ genCircle(r2, n2) + val similarities = for (i <- 1 until n; j <- 0 until i) yield { + (i.toLong, j.toLong, sim(points(i), points(j))) + } - val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0), - (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge - (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0), - (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0)) val model = new PowerIterationClustering() .setK(2) + .setMaxIterations(40) .run(sc.parallelize(similarities, 2)) val predictions = Array.fill(2)(mutable.Set.empty[Long]) model.assignments.collect().foreach { a => predictions(a.cluster) += a.id } - assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet)) + assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) val model2 = new PowerIterationClustering() .setK(2) + .setMaxIterations(10) .setInitializationMode("degree") .run(sc.parallelize(similarities, 2)) val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) model2.assignments.collect().foreach { a => predictions2(a.cluster) += a.id } - assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet)) + assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) } test("power iteration clustering on graph") { - /* - We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for - edge (3, 4). - - 15-14 -13 -12 - | | - 4 . 3 - 2 11 - | | x | | - 5 0 - 1 10 - | | - 6 - 7 - 8 - 9 - */ - - val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0), - (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge - (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0), - (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0)) + // Generate two circles following the example in the PIC paper. + val r1 = 1.0 + val n1 = 10 + val r2 = 4.0 + val n2 = 40 + val n = n1 + n2 + val points = genCircle(r1, n1) ++ genCircle(r2, n2) + val similarities = for (i <- 1 until n; j <- 0 until i) yield { + (i.toLong, j.toLong, sim(points(i), points(j))) + } val edges = similarities.flatMap { case (i, j, s) => if (i != j) { @@ -98,22 +101,24 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon val model = new PowerIterationClustering() .setK(2) + .setMaxIterations(40) .run(graph) val predictions = Array.fill(2)(mutable.Set.empty[Long]) model.assignments.collect().foreach { a => predictions(a.cluster) += a.id } - assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet)) + assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) val model2 = new PowerIterationClustering() .setK(2) + .setMaxIterations(10) .setInitializationMode("degree") .run(sc.parallelize(similarities, 2)) val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) model2.assignments.collect().foreach { a => predictions2(a.cluster) += a.id } - assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet)) + assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) } test("normalize and powerIter") {