use Graph instead of GraphImpl and update tests/example based on PIC paper #2

Merged: 1 commit, Feb 12, 2016

PowerIterationClusteringExample.scala
@@ -40,27 +40,23 @@ import org.apache.spark.{SparkConf, SparkContext}
* n: Number of sampled points on innermost circle. There are proportionally more points
* within the outer/larger circles
* maxIterations: Number of Power Iterations
* outerRadius: radius of the outermost of the concentric circles
* }}}
*
* Here is a sample run and output:
*
* ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15
*
* Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
* 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
* ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15
*
* Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
* 0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
*
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object PowerIterationClusteringExample {

case class Params(
input: String = null,
k: Int = 3,
numPoints: Int = 5,
maxIterations: Int = 10,
outerRadius: Double = 3.0
k: Int = 2,
numPoints: Int = 10,
maxIterations: Int = 15
) extends AbstractParams[Params]

def main(args: Array[String]) {
@@ -69,17 +65,14 @@ object PowerIterationClusteringExample {
val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
opt[Int]('k', "k")
.text(s"number of circles (/clusters), default: ${defaultParams.k}")
.text(s"number of circles (clusters), default: ${defaultParams.k}")
.action((x, c) => c.copy(k = x))
opt[Int]('n', "n")
.text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
.action((x, c) => c.copy(numPoints = x))
opt[Int]("maxIterations")
.text(s"number of iterations, default: ${defaultParams.maxIterations}")
.action((x, c) => c.copy(maxIterations = x))
opt[Double]('r', "r")
.text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
.action((x, c) => c.copy(outerRadius = x))
}

parser.parse(args, defaultParams).map { params =>
@@ -97,20 +90,21 @@ object PowerIterationClusteringExample {

Logger.getRootLogger.setLevel(Level.WARN)

val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)

val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length}
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(",")
}.mkString(", ")
val sizesStr = assignments.map {
_._2.size
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")

@@ -124,20 +118,17 @@ object PowerIterationClusteringExample {
}
}

def generateCirclesRdd(sc: SparkContext,
nCircles: Int = 3,
nPoints: Int = 30,
outerRadius: Double): RDD[(Long, Long, Double)] = {

val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
val points = (0 until nCircles).flatMap { cx =>
generateCircle(radii(cx), groupSizes(cx))
def generateCirclesRdd(
sc: SparkContext,
nCircles: Int,
nPoints: Int): RDD[(Long, Long, Double)] = {
val points = (1 to nCircles).flatMap { i =>
generateCircle(i, i * nPoints)
}.zipWithIndex
val rdd = sc.parallelize(points)
val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
if (i0 < i1) {
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1))))
} else {
None
}
@@ -148,11 +139,9 @@ object PowerIterationClusteringExample {
/**
* Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
*/
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
coeff * math.exp(expCoeff * ssquares)
math.exp(-ssquares / 2.0)
}
}
// scalastyle:on println
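
Note on the simplified kernel: with the sigma parameter gone, gaussianSimilarity is an RBF kernel with sigma fixed at 1, i.e. exp(-d^2 / 2) for Euclidean distance d. The old call site shown above always passed sigma = 1.0. The removed version also computed its exponent coefficient as -sigma^2 / 2 instead of -1 / (2 sigma^2) (the two agree only at sigma = 1), and its 1/sqrt(2 pi) density coefficient scaled every similarity by the same constant, which cancels in PIC's row normalization anyway. A quick illustration of the values the new function produces (not part of the diff):

// Illustration only: the simplified kernel exp(-d^2 / 2).
val near = math.exp(-1.0 / 2.0) // points 1 apart, e.g. (0,0) and (1,0): ~0.607
val far  = math.exp(-9.0 / 2.0) // points 3 apart, e.g. (0,0) and (3,0): ~0.011
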
PowerIterationClustering.scala
@@ -24,7 +24,6 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
import org.apache.spark.rdd.RDD
@@ -262,10 +261,10 @@ object PowerIterationClustering extends Logging {
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
GraphImpl.fromExistingRDDs(vD, graph.edges)
Graph(vD, graph.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
TripletFields.All)
new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))
}
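
Note: GraphImpl.fromExistingRDDs lives in the internal org.apache.spark.graphx.impl package (hence the import removed above), while Graph(vertices, edges) is the public GraphX constructor. A minimal sketch of the substituted call, using the same attribute types as this file (the helper name is illustrative, not from the diff):

import org.apache.spark.graphx.{Edge, Graph, VertexRDD}
import org.apache.spark.rdd.RDD

// Graph(vertices, edges) takes an RDD of (VertexId, attr) pairs plus an RDD of
// edges; vertices appearing only in the edge RDD receive a default attribute.
def rebuild(vD: VertexRDD[Double], edges: RDD[Edge[Double]]): Graph[Double, Double] =
  Graph(vD, edges)
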

/**
@@ -291,10 +290,10 @@ object PowerIterationClustering extends Logging {
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
GraphImpl.fromExistingRDDs(vD, gA.edges)
Graph(vD, gA.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
TripletFields.Src)
new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))
}
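
Note: TripletFields is the hint passed to aggregateMessages and mapTriplets saying which parts of each triplet the closure actually reads, letting GraphX avoid shipping unused vertex attributes. A short sketch (illustration only) of how the spelled-out constructor relates to the named constants:

import org.apache.spark.graphx.TripletFields

// new TripletFields(useSrc, useDst, useEdge): (true, false, true) requests the
// same fields as the TripletFields.Src constant; All is (true, true, true).
val srcOnly = new TripletFields(true, false, true)

Of the two mapTriplets calls above, the first previously requested TripletFields.All, so narrowing it to (true, false, true) is a real saving: e.attr / math.max(e.srcAttr, MLUtils.EPSILON) never reads the destination attribute. The second call's new TripletFields(true, false, true) is equivalent to the TripletFields.Src it replaces, just written out explicitly.
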

/**
@@ -315,7 +314,7 @@ object PowerIterationClustering extends Logging {
}, preservesPartitioning = true).cache()
val sum = r.values.map(math.abs).sum()
val v0 = r.mapValues(x => x / sum)
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
Graph(VertexRDD(v0), g.edges)
}

/**
Expand All @@ -330,7 +329,7 @@ object PowerIterationClustering extends Logging {
def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
val sum = g.vertices.values.sum()
val v0 = g.vertices.mapValues(_ / sum)
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
Graph(VertexRDD(v0), g.edges)
}

/**
@@ -355,7 +354,7 @@ object PowerIterationClustering extends Logging {
val v = curG.aggregateMessages[Double](
sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
mergeMsg = _ + _,
TripletFields.Dst).cache()
new TripletFields(/* useSrc */ false, /* useDst */ true, /* useEdge */ true)).cache()
// normalize v
val norm = v.values.map(math.abs).sum()
logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -368,7 +367,7 @@ object PowerIterationClustering extends Logging {
diffDelta = math.abs(delta - prevDelta)
logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
// update v
curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
curG = Graph(VertexRDD(v1), g.edges)
prevDelta = delta
}
curG.vertices
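
For orientation, the loop body above is the power-iteration step from the PIC paper (Lin and Cohen, ICML 2010): aggregateMessages computes the matrix-vector product (W v)_i = sum_j w_ij * v_j along each vertex's out-edges, and dividing by norm renormalizes:

    v_{t+1} = W v_t / ||W v_t||_1

diffDelta then tracks how much the per-step change delta itself moved between iterations, which is the paper's acceleration-based stopping criterion.
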
PowerIterationClusteringSuite.scala
@@ -30,62 +30,65 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkContext {

import org.apache.spark.mllib.clustering.PowerIterationClustering._

/** Generates a circle of points. */
private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
Array.tabulate(n) { i =>
val theta = 2.0 * math.Pi * i / n
(r * math.cos(theta), r * math.sin(theta))
}
}

/** Computes Gaussian similarity. */
private def sim(x: (Double, Double), y: (Double, Double)): Double = {
val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
math.exp(-dist2 / 2.0)
}
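
A quick check of the helpers (illustration only): genCircle(1.0, 4) places points at angles 0, pi/2, pi and 3*pi/2, roughly (1,0), (0,1), (-1,0), (0,-1). With the parameters used below (r1 = 1, n1 = 10 and r2 = 4, n2 = 40), neighboring points on either circle sit about 0.62 apart while the circles themselves are 3 apart, so intra-circle similarities dwarf cross-circle ones and PIC can recover the two rings:

val innerGap = 2.0 * math.sin(math.Pi / 10)         // ~0.618 between neighbors on r=1, n=10
val outerGap = 2.0 * 4.0 * math.sin(math.Pi / 40)   // ~0.628 between neighbors on r=4, n=40
val intraSim = math.exp(-innerGap * innerGap / 2.0) // ~0.83
val interSim = math.exp(-3.0 * 3.0 / 2.0)           // ~0.011 for the closest cross-circle pair
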

test("power iteration clustering") {
/*
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
edge (3, 4).

15-14 -13 -12
| |
4 . 3 - 2 11
| | x | |
5 0 - 1 10
| |
6 - 7 - 8 - 9
*/
// Generate two circles following the example in the PIC paper.
val r1 = 1.0
val n1 = 10
val r2 = 4.0
val n2 = 40
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
(i.toLong, j.toLong, sim(points(i), points(j)))
}

val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
val model = new PowerIterationClustering()
.setK(2)
.setMaxIterations(40)
.run(sc.parallelize(similarities, 2))
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))

val model2 = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}

test("power iteration clustering on graph") {
/*
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
edge (3, 4).

15-14 -13 -12
| |
4 . 3 - 2 11
| | x | |
5 0 - 1 10
| |
6 - 7 - 8 - 9
*/

val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
// Generate two circles following the example in the PIC paper.
val r1 = 1.0
val n1 = 10
val r2 = 4.0
val n2 = 40
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
(i.toLong, j.toLong, sim(points(i), points(j)))
}

val edges = similarities.flatMap { case (i, j, s) =>
if (i != j) {
@@ -98,22 +101,24 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkContext {

val model = new PowerIterationClustering()
.setK(2)
.setMaxIterations(40)
.run(graph)
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))

val model2 = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}
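
The middle of the edge-building block above is collapsed in this view; here is a hedged sketch of a symmetric construction consistent with the visible lines (Graph.fromEdges, the default attribute, and the reversed-edge Seq are assumptions, not read from the diff):

import org.apache.spark.graphx.{Edge, Graph}

// Emit each similarity in both directions so the graph is symmetric, then let
// Graph.fromEdges derive the vertex set, here with 0.0 as the default attribute.
val edges = similarities.flatMap { case (i, j, s) =>
  if (i != j) Seq(Edge(i, j, s), Edge(j, i, s)) else Seq.empty
}
val graph = Graph.fromEdges(sc.parallelize(edges, 2), 0.0)
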

test("normalize and powerIter") {