Skip to content

Commit

Permalink
Using sparse data structure for storing node ids to offsets.
Browse files Browse the repository at this point in the history
  • Loading branch information
Pankaj Gupta committed Oct 27, 2015
1 parent 1295dce commit 3934969
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
Expand Up @@ -126,7 +126,7 @@ object PerformanceBenchmark extends App with GzipGraphDownloader {
val sep = separatorInt().toChar
printf("Using Character (%d in Int) as separator\n", sep.toInt)
ListOfEdgesGraphReader.forIntIds(path, filename, graphDir = StoredGraphDir.BothInOut,
separator = sep).toSharedArrayBasedDirectedGraph(forceSparseRepr = Some(false))
separator = sep).toSharedArrayBasedDirectedGraph(forceSparseRepr = None)
//separator = sep).toArrayBasedDirectedGraph(neighborsSortingStrategy = LeaveUnsorted,
// forceSparseRepr = None)
}
Expand Down
Expand Up @@ -4,7 +4,7 @@ import com.google.common.annotations.VisibleForTesting
import com.twitter.cassovary.graph.StoredGraphDir._
import com.twitter.cassovary.graph.node._
import com.twitter.cassovary.util.collections.Int2IntMap
import com.twitter.cassovary.util.{ArrayBackedSet, BoundedFuturePool, Sharded2dArray}
import com.twitter.cassovary.util.{Int2ObjectMap, ArrayBackedSet, BoundedFuturePool, Sharded2dArray}
import com.twitter.finagle.stats.{Stat, DefaultStatsReceiver}
import com.twitter.logging.Logger
import com.twitter.util.{Await, Future, FuturePool}
Expand Down Expand Up @@ -215,12 +215,15 @@ object SharedArrayBasedDirectedGraph {

val numNodes = nodeCollection.size
val nodesWithInEdges = new NodeCollection(nodeCollection.graphInfo, forceSparseRepr)
val inEdgesSizes = new Array[AtomicInteger](nodeCollection.maxNodeId + 1)
val reverseShardsInfo = newShardsInfo()

val inEdgesSizes = Int2ObjectMap[AtomicInteger](nodeCollection.considerGraphSparse,
Some(nodeCollection.graphInfo.numNodes), Some(nodeCollection.maxNodeId),isConcurrent = false)

def partitionNodeIdsPerShard() = {
nodeCollection foreach { id =>
shardsInfo(EdgeShards.hash(id)).numIdsMapped += 1
inEdgesSizes.update(id, new AtomicInteger())
}
shardsInfo foreach { shardInfo =>
shardInfo.idsMapped = new Array[Int](shardInfo.numIdsMapped)
Expand Down Expand Up @@ -255,11 +258,9 @@ object SharedArrayBasedDirectedGraph {

def findInEdgesSizes() = {
log.info("calculating incoming neighbor sizes for each node")
doForAllNodeIds { id => inEdgesSizes(id) = new AtomicInteger() } flatMap { _ =>
doForAllNodeIds { id =>
outEdges.foreach(id) { neighbor =>
inEdgesSizes(neighbor).incrementAndGet()
}
doForAllNodeIds { id =>
outEdges.foreach(id) { neighbor =>
inEdgesSizes(neighbor).incrementAndGet()
}
}
}
Expand Down Expand Up @@ -299,14 +300,14 @@ object SharedArrayBasedDirectedGraph {
}
}

def fillInEdges(sharedInEdgesArray: Array[Array[Int]],
nextFreeEdgeIndexPerNode: Array[AtomicInteger]): Future[Unit] = {
def fillInEdges(sharedInEdgesArray: Array[Array[Int]]): Future[Unit] = {
log.info("filling in edges")
Stat.timeFuture(statsReceiver.stat("graph_load_fill_in_edges")) {
doForAllNodeIds { nodeId =>
outEdges.foreach(nodeId) { neighborId =>
val shard = sharedInEdgesArray(EdgeShards.hash(neighborId))
shard(nextFreeEdgeIndexPerNode(neighborId).getAndIncrement) = nodeId
//remember that we re-used inEdgesSizes to point to offset of edges for this neighborId
shard(inEdgesSizes(neighborId).getAndIncrement) = nodeId
}
}
}
Expand All @@ -333,7 +334,7 @@ object SharedArrayBasedDirectedGraph {
_ <- findInShardSizes()
sharedInEdges = instantiateSharedArray(reverseShardsInfo)
_ <- fillInEdgesOffsets(sharedInEdges)
_ <- fillInEdges(sharedInEdges, inEdgesSizes)
_ <- fillInEdges(sharedInEdges)
_ <- sortInEdges(sharedInEdges)
} yield Some(sharded2dArray(nodesWithInEdges, sharedInEdges))
}
Expand All @@ -348,7 +349,7 @@ private class NodeCollection(val graphInfo: SharedGraphMetaInfo, forceSparsity:
extends Iterable[Int] {

val maxNodeId = graphInfo.maxId
private val considerGraphSparse: Boolean = forceSparsity getOrElse {
val considerGraphSparse: Boolean = forceSparsity getOrElse {
// sparse if number of nodes is much less than maxNodeId, AND
// number of edges is also less than maxNodeId. If number of edges
// were similar to or greater than maxNodeId, then the extra overhead of allocating
Expand Down

0 comments on commit 3934969

Please sign in to comment.