diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraph.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraph.scala index 21ea24a8..08fc5723 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraph.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraph.scala @@ -16,7 +16,7 @@ package com.twitter.cassovary.graph import com.google.common.annotations.VisibleForTesting import com.twitter.cassovary.graph.StoredGraphDir._ import com.twitter.cassovary.graph.node._ -import com.twitter.cassovary.util.BoundedFuturePool +import com.twitter.cassovary.util.{SortedArrayOps, BoundedFuturePool} import com.twitter.finagle.stats.DefaultStatsReceiver import com.twitter.logging.Logger import com.twitter.util.{Await, FuturePool, Future} @@ -49,17 +49,43 @@ object NodeIdEdgesMaxId { private case class NodesMaxIds(nodesInOnePart: Seq[Node], maxIdInPart: Int, nodeWithOutEdgesMaxIdInPart: Int) +/** + * ArrayBasedDirectedGraph can be stored with neighbors sorted or not. Therefore there + * are 3 strategies of loading a graph from input: + * `AlreadySorted` - creates a graph with sorted neighbors from sorted input + * `SortWhileReading` - creates a graph with sorted neighbors sorting them + * while reading + * `LeaveUnsorted` - creates graph with unsorted neighbors (default) + */ +object NeighborsSortingStrategy extends Enumeration { + type NeighborsSortingStrategy = Value + + val AlreadySorted = Value + val SortWhileReading = Value + val LeaveUnsorted = Value +} + object ArrayBasedDirectedGraph { - def apply(iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]], parallelismLimit: Int, - storedGraphDir: StoredGraphDir): + import NeighborsSortingStrategy._ + + def apply(iteratorSeq: Seq[Iterable[NodeIdEdgesMaxId]], + parallelismLimit: Int, + storedGraphDir: StoredGraphDir, + neighborsSortingStrategy: NeighborsSortingStrategy = LeaveUnsorted): ArrayBasedDirectedGraph = { - val constructor = new ArrayBasedDirectedGraphConstructor(iterableSeq, parallelismLimit, storedGraphDir) + val constructor = new ArrayBasedDirectedGraphConstructor(iteratorSeq, + parallelismLimit, + storedGraphDir, + neighborsSortingStrategy) constructor() } @VisibleForTesting def apply(iterable: Iterable[NodeIdEdgesMaxId], - storedGraphDir: StoredGraphDir): ArrayBasedDirectedGraph = apply(Seq(iterable), 1, storedGraphDir) + storedGraphDir: StoredGraphDir, + neighborsSortingStrategy: NeighborsSortingStrategy): ArrayBasedDirectedGraph = { + apply(Seq(iterable), 1, storedGraphDir, neighborsSortingStrategy) + } /** * Constructs array based directed graph @@ -67,11 +93,14 @@ object ArrayBasedDirectedGraph { private class ArrayBasedDirectedGraphConstructor( iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]], parallelismLimit: Int, - storedGraphDir: StoredGraphDir + storedGraphDir: StoredGraphDir, + neighborsSortingStrategy: NeighborsSortingStrategy ) { private lazy val log = Logger.get() private val statsReceiver = DefaultStatsReceiver + private val emptyArray = Array[Int]() + private val futurePool = new BoundedFuturePool(FuturePool.unboundedPool, parallelismLimit) /** @@ -148,9 +177,10 @@ object ArrayBasedDirectedGraph { id = item.id newMaxId = newMaxId max item.maxId varNodeWithOutEdgesMaxId = varNodeWithOutEdgesMaxId max item.id - val edges = item.edges - edgesLength = edges.length - val newNode = ArrayBasedDirectedNode(id, edges, storedGraphDir) + edgesLength = item.edges.length + val edges = if (neighborsSortingStrategy == SortWhileReading) item.edges.sorted else item.edges + val newNode = ArrayBasedDirectedNode(id, edges, storedGraphDir, + neighborsSortingStrategy != LeaveUnsorted) nodes += newNode } NodesMaxIds(nodes, newMaxId, varNodeWithOutEdgesMaxId) @@ -224,7 +254,8 @@ object ArrayBasedDirectedGraph { if (nodeIdSet(id) == 1) { numNodes += 1 if (table(id) == null) { - val node = ArrayBasedDirectedNode(id, ArrayBasedDirectedNode.noNodes, storedGraphDir) + val node = ArrayBasedDirectedNode(id, emptyArray, storedGraphDir, + neighborsSortingStrategy != LeaveUnsorted) table(id) = node if (storedGraphDir == StoredGraphDir.BothInOut) nodesWithNoOutEdges += node @@ -259,11 +290,12 @@ object ArrayBasedDirectedGraph { val futures = (nodesOutEdges.iterator ++ Iterator(nodesWithNoOutEdges)).map { (nodes: Seq[Node]) => futurePool { nodes foreach { node => - val biDirNode = node.asInstanceOf[BiDirectionalNode] + val biDirNode = node.asInstanceOf[FillingInEdgesBiDirectionalNode] val nodeId = biDirNode.id val edgeSize = inEdgesSizes(nodeId).intValue() - if (edgeSize > 0) + if (edgeSize > 0) { biDirNode.inEdges = new Array[Int](edgeSize) + } // reset inEdgesSizes, and use it as index pointer of // the current insertion place when adding in edges inEdgesSizes(nodeId).set(0) @@ -282,7 +314,7 @@ object ArrayBasedDirectedGraph { nodes foreach { node => node.outboundNodes foreach { outEdge => val index = inEdgesSizes(outEdge).getAndIncrement - table(outEdge).asInstanceOf[BiDirectionalNode].inEdges(index) = node.id + table(outEdge).asInstanceOf[FillingInEdgesBiDirectionalNode].inEdges(index) = node.id } } } @@ -291,10 +323,26 @@ object ArrayBasedDirectedGraph { } } + def finishInEdgesFilling(): Future[Unit] = { + log.debug("finishing filling") + statsReceiver.time("finishing_filling_in_edges") { + val futures = nodesOutEdges.map { + nodes => futurePool { + nodes.foreach { + node => + table(node.id) = node.asInstanceOf[FillingInEdgesBiDirectionalNode].finishingFilling() + } + } + } + Future.join(futures) + } + } + for { inEdgesSizes <- findInEdgesSizes(nodesOutEdges, nodeIdSet, numNodes) _ <- instantiateInEdges(inEdgesSizes) _ <- populateInEdges(inEdgesSizes) + _ <- finishInEdgesFilling() } yield () } diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/Node.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/Node.scala index 9a44d1c7..d96628e6 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/Node.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/Node.scala @@ -19,7 +19,7 @@ import scala.util.Random * Represents a node in a directed graph. */ trait Node { - import GraphDir._ + import com.twitter.cassovary.graph.GraphDir._ /** * The unique id of this node. @@ -214,6 +214,17 @@ trait Node { } } + /** + * @return Intersection of `dir` neighbors with `nodeIds`. + */ + def intersect(dir: GraphDir, nodeIds: Seq[Int]): Seq[Int] = { + intersect(neighborIds(dir), nodeIds) + } + + protected def intersect(neighbors: Seq[Int], nodeIds: Seq[Int]): Seq[Int] = { + neighbors.intersect(nodeIds) + } + /** * the neighbor count in the allowing direction `dir` * @param dir the direction (inbound or outbound) that the method is applied to. @@ -273,9 +284,15 @@ trait Node { object Node { private lazy val randGen: Random = new Random - def apply(nodeId: Int, out: Seq[Int], in: Seq[Int]) = new Node { - val id = nodeId - def inboundNodes() = in - def outboundNodes() = out + + def apply(nodeId: Int, in: Seq[Int], out: Seq[Int]): Node = { + new SeqBasedNode(nodeId, in, out) + } + + def withSortedNeighbors(nodeId: Int, in: Array[Int], out: Array[Int]) = { + new SeqBasedNode(nodeId, in, out) with SortedNeighborsNodeOps } } + +class SeqBasedNode private[graph] (val id: Int, val inboundNodes: Seq[Int], val outboundNodes: Seq[Int]) + extends Node diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/SortedNeighborsNodeOps.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/SortedNeighborsNodeOps.scala new file mode 100644 index 00000000..07c6c85d --- /dev/null +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/SortedNeighborsNodeOps.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2015 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.twitter.cassovary.graph + +import com.twitter.cassovary.util.SortedArrayOps + +/** + * This trait designed to be mixed in to `Node` when neighbors of node + * are sorted Arrays. + */ +trait SortedNeighborsNodeOps { + self: Node => + + override protected def containsNode(nodeIds: Seq[Int], queryNodeId: Int): Boolean = { + SortedArrayOps.exists(nodeIds.toArray, queryNodeId) + } + + override protected def intersect(neighbors: Seq[Int], nodeIds: Seq[Int]): Seq[Int] = { + SortedArrayOps.intersectSorted(neighbors.toArray, nodeIds.toArray) + } +} diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/TestGraph.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/TestGraph.scala index 5b4e36fe..e4358429 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/TestGraph.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/TestGraph.scala @@ -66,7 +66,7 @@ object TestGraphs { NodeIdEdgesMaxId(10, Array(11, 12)), NodeIdEdgesMaxId(11, Array(12)), NodeIdEdgesMaxId(12, Array(11)) - ), StoredGraphDir.BothInOut) + ), StoredGraphDir.BothInOut, NeighborsSortingStrategy.LeaveUnsorted) def g5 = ArrayBasedDirectedGraph(Seq( NodeIdEdgesMaxId(10, Array(11, 12, 13)), @@ -74,7 +74,7 @@ object TestGraphs { NodeIdEdgesMaxId(12, Array(11)), NodeIdEdgesMaxId(13, Array(14)), NodeIdEdgesMaxId(14, Array()) - ), StoredGraphDir.BothInOut) + ), StoredGraphDir.BothInOut, NeighborsSortingStrategy.LeaveUnsorted) val nodeSeqIterator = Seq( NodeIdEdgesMaxId(10, Array(11, 12, 13)), @@ -86,10 +86,13 @@ object TestGraphs { ) // using testGraph becomes onerous for non-trivial graphs - def g6 = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.BothInOut) + def g6 = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.BothInOut, + NeighborsSortingStrategy.LeaveUnsorted) - def g6_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyOut) - def g6_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyIn) + def g6_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyOut, + NeighborsSortingStrategy.LeaveUnsorted) + def g6_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyIn, + NeighborsSortingStrategy.LeaveUnsorted) val nodeSeqIterator2 = Seq( NodeIdEdgesMaxId(10, Array(11, 12, 13)), @@ -100,8 +103,10 @@ object TestGraphs { NodeIdEdgesMaxId(15, Array(10, 11, 16)), NodeIdEdgesMaxId(16, Array(15)) ) - def g7_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyOut) - def g7_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyIn) + def g7_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyOut, + NeighborsSortingStrategy.LeaveUnsorted) + def g7_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyIn, + NeighborsSortingStrategy.LeaveUnsorted) // a complete graph is where each node follows every other node def generateCompleteGraph(numNodes: Int) = { @@ -145,7 +150,8 @@ object TestGraphs { val edgesFromSource = positiveBits map (x => if (x < source) x else x + 1) nodes(source) = NodeIdEdgesMaxId(source, edgesFromSource) } - ArrayBasedDirectedGraph(nodes, graphDir) + ArrayBasedDirectedGraph(nodes, graphDir, + NeighborsSortingStrategy.LeaveUnsorted) } /** @@ -179,6 +185,7 @@ object TestGraphs { val nodesEdges = nodes.indices map { i => NodeIdEdgesMaxId(i, nodes(i).asScala.toArray) } - ArrayBasedDirectedGraph(nodesEdges, graphDir) + ArrayBasedDirectedGraph(nodesEdges, graphDir, + NeighborsSortingStrategy.LeaveUnsorted) } } diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/ArrayBasedDirectedNode.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/ArrayBasedDirectedNode.scala index 1a4a51dc..ee02ea4b 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/ArrayBasedDirectedNode.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/ArrayBasedDirectedNode.scala @@ -23,15 +23,16 @@ object ArrayBasedDirectedNode { * @param nodeId an id of the node * @param neighbors a seq of ids of the neighbors read from file * @param dir the stored graph direction (OnlyIn, OnlyOut, BothInOut or Mutual) + * @param sortedNeighbors true if the neighbors of the node will be sorted * * @return a node */ - def apply(nodeId: Int, neighbors: Array[Int], dir: StoredGraphDir) = { + def apply(nodeId: Int, neighbors: Seq[Int], dir: StoredGraphDir, sortedNeighbors: Boolean = false) = { dir match { case StoredGraphDir.OnlyIn | StoredGraphDir.OnlyOut | StoredGraphDir.Mutual => - UniDirectionalNode(nodeId, neighbors, dir) + UniDirectionalNode(nodeId, neighbors, dir, sortedNeighbors) case StoredGraphDir.BothInOut => - BiDirectionalNode(nodeId, neighbors) + BiDirectionalNode(nodeId, neighbors, sortedNeighbors) } } } diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/BiDirectionalNode.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/BiDirectionalNode.scala index 272d09fe..e418fdb6 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/BiDirectionalNode.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/BiDirectionalNode.scala @@ -13,36 +13,55 @@ */ package com.twitter.cassovary.graph.node -import com.twitter.cassovary.graph.Node -import com.twitter.cassovary.util.SharedArraySeq +import com.twitter.cassovary.graph.{SortedNeighborsNodeOps, SeqBasedNode, Node} +import com.twitter.cassovary.util.{SortedArrayOps, SharedArraySeq} + +import scala.collection.mutable /** * Nodes in the graph that store both directions and * whose inEdges (and only inEdges) can be mutated after initialization */ -abstract class BiDirectionalNode private[graph] (val id: Int) extends Node { - var inEdges: Array[Int] = BiDirectionalNode.noNodes - def inboundNodes = inEdges +trait BiDirectionalNode extends Node + +case class FillingInEdgesBiDirectionalNode(id: Int, outboundNodes: Seq[Int], sortedNeighbors: Boolean) + extends BiDirectionalNode { + + var inEdges: mutable.Seq[Int] = Array.empty[Int] + + override def inboundNodes(): Seq[Int] = inEdges + + def finishingFilling(): BiDirectionalNode = { + if (sortedNeighbors) { + BiDirectionalNode(id, inEdges.sorted, outboundNodes, true) + } else { + BiDirectionalNode.apply(id, inEdges, outboundNodes) + } + } } object BiDirectionalNode { - val noNodes = Array.empty[Int] + def apply(nodeId: Int, out: Seq[Int], sortedNeighbors: Boolean) = { + FillingInEdgesBiDirectionalNode(nodeId, out, sortedNeighbors) + } - def apply(nodeId: Int, neighbors: Array[Int]) = { - new BiDirectionalNode(nodeId) { - def outboundNodes = neighbors + def apply(nodeId: Int, in: Seq[Int], out: Seq[Int], sortedNeighbors: Boolean = false) = { + if (sortedNeighbors) { + new SeqBasedNode(nodeId, in, out) with BiDirectionalNode with SortedNeighborsNodeOps + } else { + new SeqBasedNode(nodeId, in, out) with BiDirectionalNode } } } object SharedArrayBasedBiDirectionalNode { - def apply(nodeId: Int, edgeArrOffset: Int, edgeArrLen: Int, sharedArray: Array[Array[Int]], reverseDirEdgeArray: Array[Int]) = { new Node { val id = nodeId - def outboundNodes = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen) - def inboundNodes = reverseDirEdgeArray + def outboundNodes() = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen) + def inboundNodes() = reverseDirEdgeArray } } } + diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/UniDirectionalNode.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/UniDirectionalNode.scala index a8dafbf1..7d2eec7b 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/UniDirectionalNode.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/node/UniDirectionalNode.scala @@ -13,68 +13,47 @@ */ package com.twitter.cassovary.graph.node -import com.twitter.cassovary.graph.{Node, StoredGraphDir} import com.twitter.cassovary.graph.StoredGraphDir._ -import com.twitter.cassovary.util.SharedArraySeq +import com.twitter.cassovary.graph.{SortedNeighborsNodeOps, Node, SeqBasedNode} +import com.twitter.cassovary.util.{SortedArrayOps, SharedArraySeq} /** * Nodes in the graph that store edges in only one direction (or in the case Mutual Dir graph, * both directions have the same edges). Also the edges stored can not be mutated */ -abstract class UniDirectionalNode private[graph] (val id: Int) extends Node +trait UniDirectionalNode extends Node { + def neighbors(): Seq[Int] = if (inboundNodes() == Nil) outboundNodes() else inboundNodes() +} /** * Factory object for creating uni-directional nodes that uses array as underlying storage * for node's edges */ object UniDirectionalNode { - - def apply(nodeId: Int, neighbors: Array[Int], dir: StoredGraphDir) = { - dir match { - case StoredGraphDir.OnlyIn => - new UniDirectionalNode(nodeId) { - def inboundNodes = neighbors - def outboundNodes = Nil - } - case StoredGraphDir.OnlyOut => - new UniDirectionalNode(nodeId) { - def inboundNodes = Nil - def outboundNodes = neighbors - } - case StoredGraphDir.Mutual => - new UniDirectionalNode(nodeId) { - def inboundNodes = neighbors - def outboundNodes = neighbors - } + def apply(nodeId: Int, neighbors: Seq[Int], dir: StoredGraphDir, sortedNeighbors: Boolean = false) = { + if (sortedNeighbors) { + new SeqBasedNode(nodeId, + if (dir == OnlyOut) Nil else neighbors, + if (dir == OnlyIn) Nil else neighbors) with UniDirectionalNode with SortedNeighborsNodeOps + } else { + new SeqBasedNode(nodeId, + if (dir == OnlyOut) Nil else neighbors, + if (dir == OnlyIn) Nil else neighbors) with UniDirectionalNode } } } /** * Factory object for creating uni-directional nodes that uses shared array as underlying storage - * for node's edges, * i.e. multiple nodes share a sharded two-dimensional array + * for node's edges, * i.e. multiple nodes share a shared two-dimensional array * object to hold its edges */ object SharedArrayBasedUniDirectionalNode { - def apply(nodeId: Int, edgeArrOffset: Int, edgeArrLen: Int, sharedArray: Array[Array[Int]], - dir: StoredGraphDir) = { - dir match { - case StoredGraphDir.OnlyIn => - new UniDirectionalNode(nodeId) { - def inboundNodes = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen) - def outboundNodes = Nil - } - case StoredGraphDir.OnlyOut => - new UniDirectionalNode(nodeId) { - def inboundNodes = Nil - def outboundNodes = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen) - } - case StoredGraphDir.Mutual => - new UniDirectionalNode(nodeId) { - def inboundNodes = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen) - def outboundNodes = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen) - } - } + dir: StoredGraphDir) = { + val neighbors = new SharedArraySeq(nodeId, sharedArray, edgeArrOffset, edgeArrLen) + new SeqBasedNode(nodeId, + if (dir == OnlyOut) Nil else neighbors, + if (dir == OnlyIn) Nil else neighbors) with UniDirectionalNode } } diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/util/SortedArrayOps.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/util/SortedArrayOps.scala new file mode 100644 index 00000000..e5c4f5d7 --- /dev/null +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/util/SortedArrayOps.scala @@ -0,0 +1,92 @@ +package com.twitter.cassovary.util + +import java.{util => jutil} + +object SortedArrayOps { + /** + * @return true if `elem` is in the `array`. + * + * Assumes that the collection is sorted. + */ + def exists(array: Array[Int], elem: Int): Boolean = { + // Not using collection.Searching because of boxing/unboxing + jutil.Arrays.binarySearch(array, elem) match { + case idx if idx < 0 => false + case idx => true + } + } + + /** + * Linear intersection of two sorted arrays. + */ + def intersectSorted(array: Array[Int], that: Array[Int]): Array[Int] = { + val intersection = Array.ofDim[Int](math.min(array.length, that.length)) + var intersectionIdx = 0 + var arrayIdx = 0 + var thatIdx = 0 + + while (arrayIdx < array.length && thatIdx < that.length) { + if (array(arrayIdx) == that(thatIdx)) { + intersection(intersectionIdx) = array(arrayIdx) + intersectionIdx += 1 + arrayIdx += 1 + thatIdx += 1 + } else if (array(arrayIdx) < that(thatIdx)) { + arrayIdx += 1 + } else { + thatIdx += 1 + } + } + + slice(intersection, 0, intersectionIdx) + } + + /** + * Linear union of two sorted arrays. + * + * @return A sorted array containing only once each element + * that wast in either of two arrays. + */ + def unionSorted(thisA: Array[Int], that: Array[Int]): Array[Int] = { + val union = Array.ofDim[Int](thisA.length + that.length) + var resultIdx = 0 + var thisIdx = 0 + var thatIdx = 0 + + @inline def putInDest(x: Int): Unit = { + union(resultIdx) = x + resultIdx += 1 + } + + while (thisIdx < thisA.length || thatIdx < that.length) { + if (thisIdx == thisA.length) { + putInDest(that(thatIdx)) + thatIdx += 1 + } else if (thatIdx == that.length) { + putInDest(thisA(thisIdx)) + thisIdx += 1 + } else { + if (thisA(thisIdx) == that(thatIdx)) { + putInDest(thisA(thisIdx)) + thisIdx += 1 + thatIdx += 1 // ignoring duplicates + } else if (thisA(thisIdx) < that(thatIdx)) { + putInDest(thisA(thisIdx)) + thisIdx += 1 + } else { + putInDest(that(thatIdx)) + thatIdx += 1 + } + } + } + + slice(union, 0, resultIdx) + } + + @inline def slice(a: Array[Int], from: Int, len: Int): Array[Int] = { + val result = Array.ofDim[Int](len) + Array.copy(a, from, result, 0, len) + result + } +} + diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraphSpec.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraphSpec.scala index 2c0a4c37..0a7ff527 100644 --- a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraphSpec.scala +++ b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraphSpec.scala @@ -16,5 +16,58 @@ package com.twitter.cassovary.graph import org.scalatest.WordSpec class ArrayBasedDirectedGraphSpec extends WordSpec with GraphBehaviours { - verifyGraphBuilding(ArrayBasedDirectedGraph.apply, sampleGraphEdges) + verifyGraphBuilding(ArrayBasedDirectedGraph.apply(_, _, NeighborsSortingStrategy.LeaveUnsorted), + sampleGraphEdges) + + "ArrayBasedDirectedGraph" should { + "read neighbors without sorting" in { + val graph = ArrayBasedDirectedGraph.apply( + Iterable( + NodeIdEdgesMaxId(1, Array(2, 3), 3), + NodeIdEdgesMaxId(2, Array(3, 1), 3), + NodeIdEdgesMaxId(3, Array(1, 2), 2)), StoredGraphDir.BothInOut, + NeighborsSortingStrategy.LeaveUnsorted) + graph.getNodeById(1).get.outboundNodes() should be (Seq(2, 3)) + graph.getNodeById(2).get.outboundNodes() should be (Seq(3, 1)) + graph.getNodeById(3).get.outboundNodes() should be (Seq(1, 2)) + + graph.getNodeById(1).get.inboundNodes() should be (Seq(2, 3)) + graph.getNodeById(2).get.inboundNodes() should be (Seq(1, 3)) + graph.getNodeById(3).get.inboundNodes() should be (Seq(1, 2)) + } + + "read sorted neighbors and return graph with sorted neighbors" in { + val graph = ArrayBasedDirectedGraph.apply( + Iterable( + NodeIdEdgesMaxId(3, Array(1, 2), 2), + NodeIdEdgesMaxId(1, Array(2, 3), 3), + NodeIdEdgesMaxId(2, Array(1, 3), 3) + ), StoredGraphDir.BothInOut, + NeighborsSortingStrategy.AlreadySorted) + graph.getNodeById(1).get.outboundNodes() should be (Seq(2, 3)) + graph.getNodeById(2).get.outboundNodes() should be (Seq(1, 3)) + graph.getNodeById(3).get.outboundNodes() should be (Seq(1, 2)) + + graph.getNodeById(1).get.inboundNodes() should be (Seq(2, 3)) + graph.getNodeById(2).get.inboundNodes() should be (Seq(1, 3)) + graph.getNodeById(3).get.inboundNodes() should be (Seq(1, 2)) + } + + "sort neighbors while reading" in { + val graph = ArrayBasedDirectedGraph.apply( + Iterable( + NodeIdEdgesMaxId(3, Array(2, 1)), + NodeIdEdgesMaxId(1, Array(3, 2), 3), + NodeIdEdgesMaxId(2, Array(3, 1), 3) + ), StoredGraphDir.BothInOut, + NeighborsSortingStrategy.SortWhileReading) + graph.getNodeById(1).get.outboundNodes() should be (Seq(2, 3)) + graph.getNodeById(2).get.outboundNodes() should be (Seq(1, 3)) + graph.getNodeById(3).get.outboundNodes() should be (Seq(1, 2)) + + graph.getNodeById(1).get.inboundNodes() should be (Seq(2, 3)) + graph.getNodeById(2).get.inboundNodes() should be (Seq(1, 3)) + graph.getNodeById(3).get.inboundNodes() should be (Seq(1, 2)) + } + } } diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/GraphBehaviours.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/GraphBehaviours.scala index a3c5ce15..262ed0c3 100644 --- a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/GraphBehaviours.scala +++ b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/GraphBehaviours.scala @@ -12,6 +12,7 @@ */ package com.twitter.cassovary.graph +import com.twitter.cassovary.graph.NeighborsSortingStrategy.NeighborsSortingStrategy import com.twitter.cassovary.graph.StoredGraphDir._ import com.twitter.cassovary.util.NodeNumberer import org.scalatest.{Matchers, WordSpec} diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/NodeSpec.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/NodeSpec.scala index 7f12c4df..67502624 100644 --- a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/NodeSpec.scala +++ b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/NodeSpec.scala @@ -126,4 +126,26 @@ class NodeSpec extends WordSpec with Matchers { node.randomNeighborSet(4, GraphDir.OutDir)(0) shouldEqual 3 } } + + "A node with sorted neighbors" should { + "allow linear intersection of neighbors with other node" in { + val node1 = new SeqBasedNode(0, Array(1, 2), Array(4, 6)) + with SortedNeighborsNodeOps + + val node2 = new SeqBasedNode(0, Array(3), Array(4, 6)) + with SortedNeighborsNodeOps + + node1.intersect(GraphDir.InDir, node2.inboundNodes) should be (Seq()) + node1.intersect(GraphDir.OutDir, node2.outboundNodes) should be (Seq(4, 6)) + } + + "allow logarithmic membership checking" in { + val node = new SeqBasedNode(0, Array(1, 2, 3), Array(4, 5, 6)) with SortedNeighborsNodeOps + node.isNeighbor(GraphDir.InDir, 1) should be (true) + node.isNeighbor(GraphDir.InDir, 4) should be (false) + + node.isNeighbor(GraphDir.OutDir, 1) should be (false) + node.isNeighbor(GraphDir.OutDir, 4) should be (true) + } + } } diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/node/ArrayBasedDirectedNodeSpec.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/node/ArrayBasedDirectedNodeSpec.scala index 45f1b0c6..0df7a317 100644 --- a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/node/ArrayBasedDirectedNodeSpec.scala +++ b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/node/ArrayBasedDirectedNodeSpec.scala @@ -20,6 +20,6 @@ class ArrayBasedDirectedNodeSpec extends NodeBehaviors { val actualOut = ArrayBasedDirectedNode(nodeId, neighbors, StoredGraphDir.OnlyOut) val actualMutual = ArrayBasedDirectedNode(nodeId, neighbors, StoredGraphDir.Mutual) val actualBoth = ArrayBasedDirectedNode(nodeId, neighbors, StoredGraphDir.BothInOut) - actualBoth.asInstanceOf[BiDirectionalNode].inEdges = inEdges + actualBoth.asInstanceOf[FillingInEdgesBiDirectionalNode].inEdges = inEdges correctlyConstructNodes(actualIn, actualOut, actualMutual, actualBoth) } diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/node/NodeBehaviors.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/node/NodeBehaviors.scala index a867e14b..b7c3ef37 100644 --- a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/node/NodeBehaviors.scala +++ b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/node/NodeBehaviors.scala @@ -24,9 +24,9 @@ trait NodeBehaviors extends WordSpec with Matchers { def correctlyConstructNodes(actualOnlyIn: Node, actualOnlyOut: Node, actualMutual: Node, actualBoth: Node): Unit = { "construct uni-directional nodes correctly" in { - val onlyInNode = Node(nodeId, out = Nil, neighbors) - val onlyOutNode = Node(nodeId, out = neighbors, Nil) - val mutualNode = Node(nodeId, out = neighbors, neighbors) + val onlyInNode = Node(nodeId, neighbors, out = Nil) + val onlyOutNode = Node(nodeId, Nil, out = neighbors) + val mutualNode = Node(nodeId, neighbors, out = neighbors) actualOnlyIn should deepEquals(onlyInNode) actualOnlyOut should deepEquals(onlyOutNode) actualMutual should deepEquals(mutualNode) diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/util/SortedArrayOpsSpec.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/util/SortedArrayOpsSpec.scala new file mode 100644 index 00000000..7c52604e --- /dev/null +++ b/cassovary-core/src/test/scala/com/twitter/cassovary/util/SortedArrayOpsSpec.scala @@ -0,0 +1,68 @@ +/* + * Copyright 2014 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.twitter.cassovary.util + +import org.scalatest.{Matchers, WordSpec} + +class SortedArrayOpsSpec extends WordSpec with Matchers { + "SortedArrayWrapper" should { + "check if element exists in the array" when { + "empty array" in { + SortedArrayOps.exists(Array[Int](), 4) should be (false) + } + "array with 1 element" in { + SortedArrayOps.exists(Array(1), 4) should be (false) + SortedArrayOps.exists(Array(4), 4) should be (true) + } + "array with few elements" in { + SortedArrayOps.exists(Array(1,2,3,4), 4) should be (true) + SortedArrayOps.exists(Array(1,2,3,4), 5) should be (false) + } + } + + "compute an intersection of two arrays" when { + "first of the arrays in empty" in { + SortedArrayOps.intersectSorted(Array(), Array(1, 2, 3, 4)) should be (Array()) + } + + "second of the arrays in empty" in { + SortedArrayOps.intersectSorted(Array(1, 2, 3, 4), Array()) should be (Array()) + } + + "arrays don't contain duplicates" in { + SortedArrayOps.intersectSorted(Array(2, 3), Array(1, 4)) should be (Array()) + } + + "arrays contain duplicates" in { + SortedArrayOps.intersectSorted(Array(2, 3), Array(1, 2, 3, 4)) should be (Array(2, 3)) + } + } + "compute a union of two arrays" when { + "first is empty" in { + SortedArrayOps.unionSorted(Array(), Array(1, 2, 3, 4)) should be (Array(1, 2, 3, 4)) + } + "second is empty" in { + SortedArrayOps.unionSorted(Array(1, 2, 3, 4), Array()) should be (Array(1, 2, 3, 4)) + } + + "arrays don't contain duplicates" in { + SortedArrayOps.unionSorted(Array(2, 3), Array(1, 4)) should be (Array(1, 2, 3, 4)) + } + + "arrays contain duplicates" in { + SortedArrayOps.unionSorted(Array(2, 3), Array(1, 2, 4)) should be (Array(1, 2, 3, 4)) + } + } + } +}