From 3934969b46ee47c37726296110031f7929bb0a83 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Tue, 27 Oct 2015 10:05:31 -0700 Subject: [PATCH] Using sparse data structure for storing node ids to offsets. --- .../cassovary/PerformanceBenchmark.scala | 2 +- .../graph/SharedArrayBasedDirectedGraph.scala | 25 ++++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/cassovary-benchmarks/src/main/scala/com/twitter/cassovary/PerformanceBenchmark.scala b/cassovary-benchmarks/src/main/scala/com/twitter/cassovary/PerformanceBenchmark.scala index 93f74252..7fea6fd2 100644 --- a/cassovary-benchmarks/src/main/scala/com/twitter/cassovary/PerformanceBenchmark.scala +++ b/cassovary-benchmarks/src/main/scala/com/twitter/cassovary/PerformanceBenchmark.scala @@ -126,7 +126,7 @@ object PerformanceBenchmark extends App with GzipGraphDownloader { val sep = separatorInt().toChar printf("Using Character (%d in Int) as separator\n", sep.toInt) ListOfEdgesGraphReader.forIntIds(path, filename, graphDir = StoredGraphDir.BothInOut, - separator = sep).toSharedArrayBasedDirectedGraph(forceSparseRepr = Some(false)) + separator = sep).toSharedArrayBasedDirectedGraph(forceSparseRepr = None) //separator = sep).toArrayBasedDirectedGraph(neighborsSortingStrategy = LeaveUnsorted, // forceSparseRepr = None) } diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/SharedArrayBasedDirectedGraph.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/SharedArrayBasedDirectedGraph.scala index 54ddb58b..efc290b6 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/SharedArrayBasedDirectedGraph.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/SharedArrayBasedDirectedGraph.scala @@ -4,7 +4,7 @@ import com.google.common.annotations.VisibleForTesting import com.twitter.cassovary.graph.StoredGraphDir._ import com.twitter.cassovary.graph.node._ import com.twitter.cassovary.util.collections.Int2IntMap -import 
com.twitter.cassovary.util.{ArrayBackedSet, BoundedFuturePool, Sharded2dArray} +import com.twitter.cassovary.util.{Int2ObjectMap, ArrayBackedSet, BoundedFuturePool, Sharded2dArray} import com.twitter.finagle.stats.{Stat, DefaultStatsReceiver} import com.twitter.logging.Logger import com.twitter.util.{Await, Future, FuturePool} @@ -215,12 +215,15 @@ object SharedArrayBasedDirectedGraph { val numNodes = nodeCollection.size val nodesWithInEdges = new NodeCollection(nodeCollection.graphInfo, forceSparseRepr) - val inEdgesSizes = new Array[AtomicInteger](nodeCollection.maxNodeId + 1) val reverseShardsInfo = newShardsInfo() + val inEdgesSizes = Int2ObjectMap[AtomicInteger](nodeCollection.considerGraphSparse, + Some(nodeCollection.graphInfo.numNodes), Some(nodeCollection.maxNodeId),isConcurrent = false) + def partitionNodeIdsPerShard() = { nodeCollection foreach { id => shardsInfo(EdgeShards.hash(id)).numIdsMapped += 1 + inEdgesSizes.update(id, new AtomicInteger()) } shardsInfo foreach { shardInfo => shardInfo.idsMapped = new Array[Int](shardInfo.numIdsMapped) @@ -255,11 +258,9 @@ object SharedArrayBasedDirectedGraph { def findInEdgesSizes() = { log.info("calculating incoming neighbor sizes for each node") - doForAllNodeIds { id => inEdgesSizes(id) = new AtomicInteger() } flatMap { _ => - doForAllNodeIds { id => - outEdges.foreach(id) { neighbor => - inEdgesSizes(neighbor).incrementAndGet() - } + doForAllNodeIds { id => + outEdges.foreach(id) { neighbor => + inEdgesSizes(neighbor).incrementAndGet() } } } @@ -299,14 +300,14 @@ object SharedArrayBasedDirectedGraph { } } - def fillInEdges(sharedInEdgesArray: Array[Array[Int]], - nextFreeEdgeIndexPerNode: Array[AtomicInteger]): Future[Unit] = { + def fillInEdges(sharedInEdgesArray: Array[Array[Int]]): Future[Unit] = { log.info("filling in edges") Stat.timeFuture(statsReceiver.stat("graph_load_fill_in_edges")) { doForAllNodeIds { nodeId => outEdges.foreach(nodeId) { neighborId => val shard = 
sharedInEdgesArray(EdgeShards.hash(neighborId)) - shard(nextFreeEdgeIndexPerNode(neighborId).getAndIncrement) = nodeId + // Remember that inEdgesSizes is re-used here: each entry now points to the next free offset in the shard for this neighborId's in-edges + shard(inEdgesSizes(neighborId).getAndIncrement) = nodeId } } } @@ -333,7 +334,7 @@ object SharedArrayBasedDirectedGraph { _ <- findInShardSizes() sharedInEdges = instantiateSharedArray(reverseShardsInfo) _ <- fillInEdgesOffsets(sharedInEdges) - _ <- fillInEdges(sharedInEdges, inEdgesSizes) + _ <- fillInEdges(sharedInEdges) _ <- sortInEdges(sharedInEdges) } yield Some(sharded2dArray(nodesWithInEdges, sharedInEdges)) } @@ -348,7 +349,7 @@ private class NodeCollection(val graphInfo: SharedGraphMetaInfo, forceSparsity: extends Iterable[Int] { val maxNodeId = graphInfo.maxId - private val considerGraphSparse: Boolean = forceSparsity getOrElse { + val considerGraphSparse: Boolean = forceSparsity getOrElse { // sparse if number of nodes is much less than maxNodeId, AND // number of edges is also less than maxNodeId. If number of edges // were similar to or greater than maxNodeId, then the extra overhead of allocating