Skip to content

Commit

Permalink
Resolves twitter#110.
Browse files Browse the repository at this point in the history
Allows nodes that store neighbors sorted in ArrayBasedGraph
  • Loading branch information
szymonm committed Feb 1, 2015
1 parent a29bad9 commit a3b24e1
Show file tree
Hide file tree
Showing 14 changed files with 427 additions and 88 deletions.
Expand Up @@ -16,7 +16,7 @@ package com.twitter.cassovary.graph
import com.google.common.annotations.VisibleForTesting
import com.twitter.cassovary.graph.StoredGraphDir._
import com.twitter.cassovary.graph.node._
import com.twitter.cassovary.util.BoundedFuturePool
import com.twitter.cassovary.util.{SortedArrayOps, BoundedFuturePool}
import com.twitter.finagle.stats.DefaultStatsReceiver
import com.twitter.logging.Logger
import com.twitter.util.{Await, FuturePool, Future}
Expand Down Expand Up @@ -49,29 +49,58 @@ object NodeIdEdgesMaxId {
private case class NodesMaxIds(nodesInOnePart: Seq[Node],
maxIdInPart: Int, nodeWithOutEdgesMaxIdInPart: Int)

/**
* ArrayBasedDirectedGraph can be stored with neighbors sorted or not. Therefore there
* are 3 strategies of loading a graph from input:
* `AlreadySorted` - creates a graph with sorted neighbors from sorted input
* `SortWhileReading` - creates a graph with sorted neighbors sorting them
* while reading
* `LeaveUnsorted` - creates graph with unsorted neighbors (default)
*/
object NeighborsSortingStrategy extends Enumeration {
type NeighborsSortingStrategy = Value

val AlreadySorted = Value
val SortWhileReading = Value
val LeaveUnsorted = Value
}

object ArrayBasedDirectedGraph {
def apply(iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]], parallelismLimit: Int,
storedGraphDir: StoredGraphDir):
import NeighborsSortingStrategy._

def apply(iteratorSeq: Seq[Iterable[NodeIdEdgesMaxId]],
parallelismLimit: Int,
storedGraphDir: StoredGraphDir,
neighborsSortingStrategy: NeighborsSortingStrategy = LeaveUnsorted):
ArrayBasedDirectedGraph = {
val constructor = new ArrayBasedDirectedGraphConstructor(iterableSeq, parallelismLimit, storedGraphDir)
val constructor = new ArrayBasedDirectedGraphConstructor(iteratorSeq,
parallelismLimit,
storedGraphDir,
neighborsSortingStrategy)
constructor()
}

@VisibleForTesting
def apply(iterable: Iterable[NodeIdEdgesMaxId],
storedGraphDir: StoredGraphDir): ArrayBasedDirectedGraph = apply(Seq(iterable), 1, storedGraphDir)
storedGraphDir: StoredGraphDir,
neighborsSortingStrategy: NeighborsSortingStrategy): ArrayBasedDirectedGraph = {
apply(Seq(iterable), 1, storedGraphDir, neighborsSortingStrategy)
}

/**
* Constructs array based directed graph
*/
private class ArrayBasedDirectedGraphConstructor(
iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]],
parallelismLimit: Int,
storedGraphDir: StoredGraphDir
storedGraphDir: StoredGraphDir,
neighborsSortingStrategy: NeighborsSortingStrategy
) {
private lazy val log = Logger.get()
private val statsReceiver = DefaultStatsReceiver

private val emptyArray = Array[Int]()

private val futurePool = new BoundedFuturePool(FuturePool.unboundedPool, parallelismLimit)

/**
Expand Down Expand Up @@ -148,9 +177,10 @@ object ArrayBasedDirectedGraph {
id = item.id
newMaxId = newMaxId max item.maxId
varNodeWithOutEdgesMaxId = varNodeWithOutEdgesMaxId max item.id
val edges = item.edges
edgesLength = edges.length
val newNode = ArrayBasedDirectedNode(id, edges, storedGraphDir)
edgesLength = item.edges.length
val edges = if (neighborsSortingStrategy == SortWhileReading) item.edges.sorted else item.edges
val newNode = ArrayBasedDirectedNode(id, edges, storedGraphDir,
neighborsSortingStrategy != LeaveUnsorted)
nodes += newNode
}
NodesMaxIds(nodes, newMaxId, varNodeWithOutEdgesMaxId)
Expand Down Expand Up @@ -224,7 +254,8 @@ object ArrayBasedDirectedGraph {
if (nodeIdSet(id) == 1) {
numNodes += 1
if (table(id) == null) {
val node = ArrayBasedDirectedNode(id, ArrayBasedDirectedNode.noNodes, storedGraphDir)
val node = ArrayBasedDirectedNode(id, emptyArray, storedGraphDir,
neighborsSortingStrategy != LeaveUnsorted)
table(id) = node
if (storedGraphDir == StoredGraphDir.BothInOut)
nodesWithNoOutEdges += node
Expand Down Expand Up @@ -259,11 +290,12 @@ object ArrayBasedDirectedGraph {
val futures = (nodesOutEdges.iterator ++ Iterator(nodesWithNoOutEdges)).map {
(nodes: Seq[Node]) => futurePool {
nodes foreach { node =>
val biDirNode = node.asInstanceOf[BiDirectionalNode]
val biDirNode = node.asInstanceOf[FillingInEdgesBiDirectionalNode]
val nodeId = biDirNode.id
val edgeSize = inEdgesSizes(nodeId).intValue()
if (edgeSize > 0)
if (edgeSize > 0) {
biDirNode.inEdges = new Array[Int](edgeSize)
}
// reset inEdgesSizes, and use it as index pointer of
// the current insertion place when adding in edges
inEdgesSizes(nodeId).set(0)
Expand All @@ -282,7 +314,7 @@ object ArrayBasedDirectedGraph {
nodes foreach { node =>
node.outboundNodes foreach { outEdge =>
val index = inEdgesSizes(outEdge).getAndIncrement
table(outEdge).asInstanceOf[BiDirectionalNode].inEdges(index) = node.id
table(outEdge).asInstanceOf[FillingInEdgesBiDirectionalNode].inEdges(index) = node.id
}
}
}
Expand All @@ -291,10 +323,26 @@ object ArrayBasedDirectedGraph {
}
}

def finishInEdgesFilling(): Future[Unit] = {
log.debug("finishing filling")
statsReceiver.time("finishing_filling_in_edges") {
val futures = nodesOutEdges.map {
nodes => futurePool {
nodes.foreach {
node =>
table(node.id) = node.asInstanceOf[FillingInEdgesBiDirectionalNode].finishingFilling()
}
}
}
Future.join(futures)
}
}

for {
inEdgesSizes <- findInEdgesSizes(nodesOutEdges, nodeIdSet, numNodes)
_ <- instantiateInEdges(inEdgesSizes)
_ <- populateInEdges(inEdgesSizes)
_ <- finishInEdgesFilling()
} yield ()
}

Expand Down
Expand Up @@ -19,7 +19,7 @@ import scala.util.Random
* Represents a node in a directed graph.
*/
trait Node {
import GraphDir._
import com.twitter.cassovary.graph.GraphDir._

/**
* The unique id of this node.
Expand Down Expand Up @@ -214,6 +214,17 @@ trait Node {
}
}

/**
* @return Intersection of `dir` neighbors with `nodeIds`.
*/
def intersect(dir: GraphDir, nodeIds: Seq[Int]): Seq[Int] = {
intersect(neighborIds(dir), nodeIds)
}

protected def intersect(neighbors: Seq[Int], nodeIds: Seq[Int]): Seq[Int] = {
neighbors.intersect(nodeIds)
}

/**
* the neighbor count in the allowing direction `dir`
* @param dir the direction (inbound or outbound) that the method is applied to.
Expand Down Expand Up @@ -273,9 +284,15 @@ trait Node {

object Node {
private lazy val randGen: Random = new Random
def apply(nodeId: Int, out: Seq[Int], in: Seq[Int]) = new Node {
val id = nodeId
def inboundNodes() = in
def outboundNodes() = out

def apply(nodeId: Int, in: Seq[Int], out: Seq[Int]): Node = {
new SeqBasedNode(nodeId, in, out)
}

def withSortedNeighbors(nodeId: Int, in: Array[Int], out: Array[Int]) = {
new SeqBasedNode(nodeId, in, out) with SortedNeighborsNodeOps
}
}

class SeqBasedNode private[graph] (val id: Int, val inboundNodes: Seq[Int], val outboundNodes: Seq[Int])
extends Node
@@ -0,0 +1,32 @@
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.twitter.cassovary.graph

import com.twitter.cassovary.util.SortedArrayOps

/**
* This trait designed to be mixed in to `Node` when neighbors of node
* are sorted Arrays.
*/
trait SortedNeighborsNodeOps {
self: Node =>

override protected def containsNode(nodeIds: Seq[Int], queryNodeId: Int): Boolean = {
SortedArrayOps.exists(nodeIds.toArray, queryNodeId)
}

override protected def intersect(neighbors: Seq[Int], nodeIds: Seq[Int]): Seq[Int] = {
SortedArrayOps.intersectSorted(neighbors.toArray, nodeIds.toArray)
}
}
Expand Up @@ -66,15 +66,15 @@ object TestGraphs {
NodeIdEdgesMaxId(10, Array(11, 12)),
NodeIdEdgesMaxId(11, Array(12)),
NodeIdEdgesMaxId(12, Array(11))
), StoredGraphDir.BothInOut)
), StoredGraphDir.BothInOut, NeighborsSortingStrategy.LeaveUnsorted)

def g5 = ArrayBasedDirectedGraph(Seq(
NodeIdEdgesMaxId(10, Array(11, 12, 13)),
NodeIdEdgesMaxId(11, Array(12)),
NodeIdEdgesMaxId(12, Array(11)),
NodeIdEdgesMaxId(13, Array(14)),
NodeIdEdgesMaxId(14, Array())
), StoredGraphDir.BothInOut)
), StoredGraphDir.BothInOut, NeighborsSortingStrategy.LeaveUnsorted)

val nodeSeqIterator = Seq(
NodeIdEdgesMaxId(10, Array(11, 12, 13)),
Expand All @@ -86,10 +86,13 @@ object TestGraphs {
)

// using testGraph becomes onerous for non-trivial graphs
def g6 = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.BothInOut)
def g6 = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.BothInOut,
NeighborsSortingStrategy.LeaveUnsorted)

def g6_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyOut)
def g6_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyIn)
def g6_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyOut,
NeighborsSortingStrategy.LeaveUnsorted)
def g6_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyIn,
NeighborsSortingStrategy.LeaveUnsorted)

val nodeSeqIterator2 = Seq(
NodeIdEdgesMaxId(10, Array(11, 12, 13)),
Expand All @@ -100,8 +103,10 @@ object TestGraphs {
NodeIdEdgesMaxId(15, Array(10, 11, 16)),
NodeIdEdgesMaxId(16, Array(15))
)
def g7_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyOut)
def g7_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyIn)
def g7_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyOut,
NeighborsSortingStrategy.LeaveUnsorted)
def g7_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyIn,
NeighborsSortingStrategy.LeaveUnsorted)

// a complete graph is where each node follows every other node
def generateCompleteGraph(numNodes: Int) = {
Expand Down Expand Up @@ -145,7 +150,8 @@ object TestGraphs {
val edgesFromSource = positiveBits map (x => if (x < source) x else x + 1)
nodes(source) = NodeIdEdgesMaxId(source, edgesFromSource)
}
ArrayBasedDirectedGraph(nodes, graphDir)
ArrayBasedDirectedGraph(nodes, graphDir,
NeighborsSortingStrategy.LeaveUnsorted)
}

/**
Expand Down Expand Up @@ -179,6 +185,7 @@ object TestGraphs {
val nodesEdges = nodes.indices map { i =>
NodeIdEdgesMaxId(i, nodes(i).asScala.toArray)
}
ArrayBasedDirectedGraph(nodesEdges, graphDir)
ArrayBasedDirectedGraph(nodesEdges, graphDir,
NeighborsSortingStrategy.LeaveUnsorted)
}
}
Expand Up @@ -23,15 +23,16 @@ object ArrayBasedDirectedNode {
* @param nodeId an id of the node
* @param neighbors a seq of ids of the neighbors read from file
* @param dir the stored graph direction (OnlyIn, OnlyOut, BothInOut or Mutual)
* @param sortedNeighbors true if the neighbors of the node will be sorted
*
* @return a node
*/
def apply(nodeId: Int, neighbors: Array[Int], dir: StoredGraphDir) = {
def apply(nodeId: Int, neighbors: Seq[Int], dir: StoredGraphDir, sortedNeighbors: Boolean = false) = {
dir match {
case StoredGraphDir.OnlyIn | StoredGraphDir.OnlyOut | StoredGraphDir.Mutual =>
UniDirectionalNode(nodeId, neighbors, dir)
UniDirectionalNode(nodeId, neighbors, dir, sortedNeighbors)
case StoredGraphDir.BothInOut =>
BiDirectionalNode(nodeId, neighbors)
BiDirectionalNode(nodeId, neighbors, sortedNeighbors)
}
}
}
Expand Up @@ -13,36 +13,55 @@
*/
package com.twitter.cassovary.graph.node

import com.twitter.cassovary.graph.Node
import com.twitter.cassovary.util.SharedArraySeq
import com.twitter.cassovary.graph.{SortedNeighborsNodeOps, SeqBasedNode, Node}
import com.twitter.cassovary.util.{SortedArrayOps, SharedArraySeq}

import scala.collection.mutable

/**
* Nodes in the graph that store both directions and
* whose inEdges (and only inEdges) can be mutated after initialization
*/
abstract class BiDirectionalNode private[graph] (val id: Int) extends Node {
var inEdges: Array[Int] = BiDirectionalNode.noNodes
def inboundNodes = inEdges
trait BiDirectionalNode extends Node

case class FillingInEdgesBiDirectionalNode(id: Int, outboundNodes: Seq[Int], sortedNeighbors: Boolean)
extends BiDirectionalNode {

var inEdges: mutable.Seq[Int] = Array.empty[Int]

override def inboundNodes(): Seq[Int] = inEdges

def finishingFilling(): BiDirectionalNode = {
if (sortedNeighbors) {
BiDirectionalNode(id, inEdges.sorted, outboundNodes, true)
} else {
BiDirectionalNode.apply(id, inEdges, outboundNodes)
}
}
}

object BiDirectionalNode {
val noNodes = Array.empty[Int]
def apply(nodeId: Int, out: Seq[Int], sortedNeighbors: Boolean) = {
FillingInEdgesBiDirectionalNode(nodeId, out, sortedNeighbors)
}

def apply(nodeId: Int, neighbors: Array[Int]) = {
new BiDirectionalNode(nodeId) {
def outboundNodes = neighbors
def apply(nodeId: Int, in: Seq[Int], out: Seq[Int], sortedNeighbors: Boolean = false) = {
if (sortedNeighbors) {
new SeqBasedNode(nodeId, in, out) with BiDirectionalNode with SortedNeighborsNodeOps
} else {
new SeqBasedNode(nodeId, in, out) with BiDirectionalNode
}
}
}

object SharedArrayBasedBiDirectionalNode {

def apply(nodeId: Int, edgeArrOffset: Int, edgeArrLen: Int, sharedArray: Array[Array[Int]],
reverseDirEdgeArray: Array[Int]) = {
new Node {
val id = nodeId
def outboundNodes = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen)
def inboundNodes = reverseDirEdgeArray
def outboundNodes() = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen)
def inboundNodes() = reverseDirEdgeArray
}
}
}

0 comments on commit a3b24e1

Please sign in to comment.