SPARK-1786: Edge Partition Serialization
This addresses the issue with edge partition serialization. The fix is essentially just registering `PrimitiveKeyOpenHashMap` with Kryo. However, I noticed that we have forked that code in GraphX while retaining the same name, which is confusing, so I renamed our local copy to `GraphXPrimitiveKeyOpenHashMap`. We should consider dropping the fork and using the version in Spark if possible.

Author: Ankur Dave <ankurdave@gmail.com>
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>

Closes apache#724 from jegonzal/edge_partition_serialization and squashes the following commits:

b0a525a [Ankur Dave] Disable reference tracking to fix serialization test
bb7f548 [Ankur Dave] Add failing test for EdgePartition Kryo serialization
67dac22 [Joseph E. Gonzalez] Making EdgePartition serializable.
ankurdave authored and mateiz committed May 12, 2014
1 parent f938a15 commit a6b02fb
Showing 11 changed files with 44 additions and 23 deletions.
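
The test added at the bottom of this commit shows the shape of the fix: an EdgePartition is round-tripped through a KryoSerializer whose SparkConf points at GraphX's registrator. Below is a minimal, self-contained sketch of that same pattern, with a hypothetical DemoRegistrator standing in for GraphKryoRegistrator:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}

    // Hypothetical registrator. Registering a type up front lets Kryo write a
    // compact class id instead of the fully qualified class name, and ensures
    // the type is known at deserialization time.
    class DemoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Array[Long]])
        kryo.register(classOf[Array[Int]])
      }
    }

    object DemoRoundTrip {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrator", "DemoRegistrator")
        val ser = new KryoSerializer(conf).newInstance()
        val ids = Array(1L, 2L, 5L)
        // serialize returns a ByteBuffer; deserialize reconstructs the value.
        val restored: Array[Long] = ser.deserialize(ser.serialize(ids))
        assert(restored.toList == ids.toList)
      }
    }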
graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala

@@ -24,6 +24,9 @@ import org.apache.spark.util.BoundedPriorityQueue
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx.impl._
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.OpenHashSet
+
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -43,8 +46,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[PartitionStrategy])
     kryo.register(classOf[BoundedPriorityQueue[Object]])
     kryo.register(classOf[EdgeDirection])
-
-    // This avoids a large number of hash table lookups.
-    kryo.setReferences(false)
+    kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
+    kryo.register(classOf[OpenHashSet[Int]])
+    kryo.register(classOf[OpenHashSet[Long]])
   }
 }

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
@@ -42,12 +42,12 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 private[graphx]
 class EdgePartition[
     @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
-    @transient val srcIds: Array[VertexId],
-    @transient val dstIds: Array[VertexId],
-    @transient val data: Array[ED],
-    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
-    @transient val vertices: VertexPartition[VD],
-    @transient val activeSet: Option[VertexSet] = None
+    val srcIds: Array[VertexId] = null,
+    val dstIds: Array[VertexId] = null,
+    val data: Array[ED] = null,
+    val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
+    val vertices: VertexPartition[VD] = null,
+    val activeSet: Option[VertexSet] = None
   ) extends Serializable {
 
   /** Return a new `EdgePartition` with the specified edge data. */
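
The constructor change above is the substance of the fix: the columnar fields were previously marked @transient, so serializers skipped them and a deserialized partition came back with null arrays; the new `= null` defaults merely give deserialization a way to construct the object before its fields are filled in. A standalone illustration of the @transient pitfall under plain Java serialization (the Holder class is hypothetical):

    import java.io._

    // One @transient field and one ordinary field, both serializable values.
    class Holder(@transient val dropped: Array[Long], val kept: Array[Long])
      extends Serializable

    object TransientDemo {
      def roundTrip[T <: Serializable](t: T): T = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(t)
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        in.readObject().asInstanceOf[T]
      }

      def main(args: Array[String]) {
        val h = roundTrip(new Holder(Array(1L, 2L), Array(1L, 2L)))
        assert(h.dropped == null)             // the @transient field was skipped
        assert(h.kept.toList == List(1L, 2L)) // the ordinary field survived
      }
    }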

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala

@@ -23,7 +23,7 @@ import scala.util.Sorting
 import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx]
 class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
@@ -41,7 +41,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
     val srcIds = new Array[VertexId](edgeArray.size)
     val dstIds = new Array[VertexId](edgeArray.size)
     val data = new Array[ED](edgeArray.size)
-    val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
+    val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
     // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
     // adding them to the index
     if (edgeArray.length > 0) {

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala

@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * The Iterator type returned when constructing edge triplets. This could be an anonymous class in

graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala

@@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
@@ -69,7 +69,7 @@ object RoutingTablePartition {
       : Iterator[RoutingTableMessage] = {
     // Determine which positions each vertex id appears in using a map where the low 2 bits
     // represent src and dst
-    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
     edgePartition.srcIds.iterator.foreach { srcId =>
       map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
     }
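
For reference, the byte built by changeValue here is a two-bit flag: bit 0 records that the vertex appears as a source, and bit 1, presumably set by a matching pass over dstIds below the fold, that it appears as a destination. A short decoding snippet, not part of the commit:

    val flags: Byte = 0x3           // example value: both bits set
    val isSrc = (flags & 0x1) != 0  // appears as a source vertex
    val isDst = (flags & 0x2) != 0  // appears as a destination vertex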

graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala

@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /** Stores vertex attributes to ship to an edge partition. */
 private[graphx]

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala

@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx] object VertexPartition {
   /** Construct a `VertexPartition` from the given vertices. */

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala

@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx] object VertexPartitionBase {
   /**
@@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
    */
   def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
     : (VertexIdToIndexMap, Array[VD], BitSet) = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { pair =>
       map(pair._1) = pair._2
     }
@@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
    */
   def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
     : (VertexIdToIndexMap, Array[VD], BitSet) = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { pair =>
       map.setMerge(pair._1, pair._2, mergeFunc)
     }

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala

@@ -25,7 +25,7 @@ import org.apache.spark.Logging
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A class containing additional operations for subclasses of VertexPartitionBase that provide
@@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
    * Construct a new VertexPartition whose index contains only the vertices in the mask.
    */
   def reindex(): Self[VD] = {
-    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     val arbitraryMerge = (a: VD, b: VD) => a
     for ((k, v) <- self.iterator) {
       hashMap.setMerge(k, v, arbitraryMerge)

graphx/src/main/scala/org/apache/spark/graphx/util/collection/{PrimitiveKeyOpenHashMap.scala → GraphXPrimitiveKeyOpenHashMap.scala}

@@ -29,7 +29,7 @@ import scala.reflect._
  * Under the hood, it uses our OpenHashSet implementation.
  */
 private[graphx]
-class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
                               @specialized(Long, Int, Double) V: ClassTag](
     val keySet: OpenHashSet[K], var _values: Array[V])
   extends Iterable[(K, V)]
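
Aside from the rename, the map's behavior is unchanged. A usage sketch assembled from the operations this diff itself exercises (update, changeValue, setMerge, iteration), assuming VertexId is GraphX's Long alias:

    val m = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
    m(1L) = 10                                    // plain insert or overwrite
    m.changeValue(1L, 0, (v: Int) => v + 1)       // key present: 10 becomes 11
    m.changeValue(2L, 7, (v: Int) => v + 1)       // key absent: stores the default 7
    m.setMerge(2L, 5, (a: Int, b: Int) => a + b)  // key present: merges 7 + 5 = 12
    assert(m.iterator.toMap == Map(1L -> 11, 2L -> 12))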

graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala

@@ -22,6 +22,9 @@ import scala.util.Random
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoSerializer
+
 import org.apache.spark.graphx._
 
 class EdgePartitionSuite extends FunSuite {
@@ -120,4 +123,19 @@ class EdgePartitionSuite extends FunSuite {
     assert(!ep.isActive(-1))
     assert(ep.numActives == Some(2))
   }
+
+  test("Kryo serialization") {
+    val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+    val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
+    val conf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+    val s = new KryoSerializer(conf).newInstance()
+    val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
+    assert(aSer.srcIds.toList === a.srcIds.toList)
+    assert(aSer.dstIds.toList === a.dstIds.toList)
+    assert(aSer.data.toList === a.data.toList)
+    assert(aSer.index != null)
+    assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+  }
 }
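
To exercise just this suite, something like the following should work from the repository root, assuming the sbt launcher and module names of the Spark build at the time:

    sbt/sbt "graphx/test-only org.apache.spark.graphx.impl.EdgePartitionSuite"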
