Skip to content

Commit

Permalink
Merge pull request twitter#137 from AnishShah/issue136
Browse files Browse the repository at this point in the history
Remove the unnecessary "() =>" in `def iteratorSeq`
  • Loading branch information
pankajgupta committed Feb 1, 2015
2 parents 9d6bef8 + 78fe27d commit a29bad9
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import scala.collection.mutable
/**
* Construct an array based directed graph based on a set of edges. The graph can be constructed in
* the following ways based on the stored direction:
* 1. If OnlyIn or OnlyOut edges need to be kept, supply only those edges in the edges iteratorSeq
* 2. If BothInOut edges need to be kept, supply only the outgoing edges in the edges iteratorSeq
* 1. If OnlyIn or OnlyOut edges need to be kept, supply only those edges in the edges iterableSeq
* 2. If BothInOut edges need to be kept, supply only the outgoing edges in the edges iterableSeq
*/

/**
Expand All @@ -50,22 +50,22 @@ private case class NodesMaxIds(nodesInOnePart: Seq[Node],
maxIdInPart: Int, nodeWithOutEdgesMaxIdInPart: Int)

object ArrayBasedDirectedGraph {
def apply(iteratorSeq: Seq[() => Iterator[NodeIdEdgesMaxId]], parallelismLimit: Int,
def apply(iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]], parallelismLimit: Int,
storedGraphDir: StoredGraphDir):
ArrayBasedDirectedGraph = {
val constructor = new ArrayBasedDirectedGraphConstructor(iteratorSeq, parallelismLimit, storedGraphDir)
val constructor = new ArrayBasedDirectedGraphConstructor(iterableSeq, parallelismLimit, storedGraphDir)
constructor()
}

@VisibleForTesting
def apply(iteratorFunc: () => Iterator[NodeIdEdgesMaxId],
storedGraphDir: StoredGraphDir): ArrayBasedDirectedGraph = apply(Seq(iteratorFunc), 1, storedGraphDir)
def apply(iterable: Iterable[NodeIdEdgesMaxId],
storedGraphDir: StoredGraphDir): ArrayBasedDirectedGraph = apply(Seq(iterable), 1, storedGraphDir)

/**
* Constructs array based directed graph
*/
private class ArrayBasedDirectedGraphConstructor(
iteratorSeq: Seq[() => Iterator[NodeIdEdgesMaxId]],
iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]],
parallelismLimit: Int,
storedGraphDir: StoredGraphDir
) {
Expand All @@ -75,7 +75,7 @@ object ArrayBasedDirectedGraph {
private val futurePool = new BoundedFuturePool(FuturePool.unboundedPool, parallelismLimit)

/**
* Construct an array-based graph from an sequence of iterators over NodeIdEdgesMaxId
* Construct an array-based graph from an sequence of `NodeIdEdgesMaxId` iterables
* This function builds the array-based graph from a seq of nodes with out edges
* using the following steps:
* 0. read from file and construct a sequence of Nodes
Expand All @@ -90,7 +90,7 @@ object ArrayBasedDirectedGraph {
def apply(): ArrayBasedDirectedGraph = {

val result: Future[ArrayBasedDirectedGraph] = for {
(nodesOutEdges, maxNodeId, nodeWithOutEdgesMaxId) <- fillOutEdges(iteratorSeq, storedGraphDir)
(nodesOutEdges, maxNodeId, nodeWithOutEdgesMaxId) <- fillOutEdges(iterableSeq, storedGraphDir)
(table, nodeIdSet) <- markStoredNodes(nodesOutEdges, maxNodeId, storedGraphDir)
NodesWithNoOutEdgesAndGraphStats(nodesWithNoOutEdges, nodeWithOutEdgesCount, numEdges, numNodes) =
createNodesWithNoOutEdges(table, nodeIdSet, maxNodeId, storedGraphDir)
Expand All @@ -105,11 +105,11 @@ object ArrayBasedDirectedGraph {
}

/**
* Reads `iteratorSeq`'s edges, creates nodes and puts them in an `ArrayBuffer[Seq[Node]]`.
* Reads `iterableSeq`'s edges, creates nodes and puts them in an `ArrayBuffer[Seq[Node]]`.
* In every node only edges directly read from input are set.
* @return Future with read edges of type `Buffer[Seq[Node]]`, max node id and nodeWithOutEdgesMaxId
*/
private def fillOutEdges(iteratorSeq: Seq[() => Iterator[NodeIdEdgesMaxId]], storedGraphDir: StoredGraphDir):
private def fillOutEdges(iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]], storedGraphDir: StoredGraphDir):
Future[(mutable.Buffer[Seq[Node]], Int, Int)] = {
log.debug("loading nodes and out edges from file in parallel")
val nodesOutEdges = new mutable.ArrayBuffer[Seq[Node]]
Expand All @@ -118,7 +118,7 @@ object ArrayBasedDirectedGraph {

val outEdges: Future[Seq[NodesMaxIds]] = statsReceiver.time(
"graph_dump_load_partial_nodes_and_out_edges_parallel") {
Future.collect(iteratorSeq.map(i => readOutEdges(i, storedGraphDir)))
Future.collect(iterableSeq.map(i => readOutEdges(i.iterator, storedGraphDir)))
}

outEdges.map {
Expand All @@ -133,9 +133,9 @@ object ArrayBasedDirectedGraph {
}

/**
* Reads out edges from iteratorFunc and returns `NodesMaxIds` object.
* Reads out edges from iterator and returns `NodesMaxIds` object.
*/
private def readOutEdges(iteratorFunc: () => Iterator[NodeIdEdgesMaxId], storedGraphDir: StoredGraphDir):
private def readOutEdges(iterator: Iterator[NodeIdEdgesMaxId], storedGraphDir: StoredGraphDir):
Future[NodesMaxIds] = futurePool {
statsReceiver.time("graph_load_read_out_edge_from_dump_files") {
val nodes = new mutable.ArrayBuffer[Node]
Expand All @@ -144,7 +144,6 @@ object ArrayBasedDirectedGraph {
var id = 0
var edgesLength = 0

val iterator = iteratorFunc()
iterator foreach { item =>
id = item.id
newMaxId = newMaxId max item.maxId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ class ArrayBasedDynamicDirectedGraph(val storedGraphDir: StoredGraphDir)

private var _nodeCount = 0

def this(dataIterator: Iterator[NodeIdEdgesMaxId],
def this(dataIterable: Iterable[NodeIdEdgesMaxId],
storedGraphDir: StoredGraphDir) {
this(storedGraphDir)
for (nodeData <- dataIterator) {
for (nodeData <- dataIterable.iterator) {
val id = nodeData.id
getOrCreateNode(id)
nodeData.edges map getOrCreateNode
Expand All @@ -44,9 +44,9 @@ class ArrayBasedDynamicDirectedGraph(val storedGraphDir: StoredGraphDir)
}
}

def this(iteratorSeq: Seq[() => Iterator[NodeIdEdgesMaxId]],
def this(iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]],
storedGraphDir: StoredGraphDir) {
this((iteratorSeq.view flatMap { _()}).iterator, storedGraphDir)
this(iterableSeq.flatten , storedGraphDir)
}

/* Returns an option which is non-empty if outbound list for id is non-null. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object SharedArrayBasedDirectedGraph {
val emptyArray = Array.empty[Int]

/**
* Construct a shared array-based graph from a sequence of iterators over NodeIdEdgesMaxId.
* Construct a shared array-based graph from a sequence of NodeIdEdgesMaxId iterables.
* Eg each NodeIdEdgesMaxId could correspond to one graph dump file.
*
* This function builds the graph using similar steps as in ArrayBasedDirectedGraph.
Expand All @@ -41,12 +41,12 @@ object SharedArrayBasedDirectedGraph {
* an offset into this shared array. The avoid huge arrays, this edge array
* is also sharded based on node's id.
*
* @param iteratorSeq the sequence of nodes each with its own edges
* @param iterableSeq the sequence of nodes each with its own edges
* @param executorService the executor for parallel execution
* @param storedGraphDir the direction of the graph to be built
* @param numOfShards specifies the number of shards to use in creating shared array
*/
def apply(iteratorSeq: Seq[ () => Iterator[NodeIdEdgesMaxId] ], executorService: ExecutorService,
def apply(iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]], executorService: ExecutorService,
storedGraphDir: StoredGraphDir, numOfShards: Int) = {

assert(numOfShards > 0)
Expand All @@ -66,10 +66,10 @@ object SharedArrayBasedDirectedGraph {
*/
log.info("read out num of edges and max id from files in parallel")
val futures = statsReceiver.time("graph_dump_read_out_num_of_edge_and_max_id_parallel") {
def readNumOfEdgesAndMaxId(iteratorFunc: () => Iterator[NodeIdEdgesMaxId]) =
def readNumOfEdgesAndMaxId(edgesIterable: Iterable[NodeIdEdgesMaxId]) =
statsReceiver.time("graph_load_read_out_edge_sizes_dump_files") {
var id, newMaxId, varNodeWithOutEdgesMaxId, numOfEdges, edgesLength, nodeCount = 0
val iteratorForEdgeSizes = iteratorFunc()
val iteratorForEdgeSizes = edgesIterable.iterator
iteratorForEdgeSizes foreach { item =>
id = item.id
newMaxId = newMaxId max item.maxId
Expand All @@ -82,8 +82,8 @@ object SharedArrayBasedDirectedGraph {
(newMaxId, varNodeWithOutEdgesMaxId, numOfEdges, nodeCount)
}

ExecutorUtils.parallelWork[() => Iterator[NodeIdEdgesMaxId], (Int, Int, Int, Int)](
executorService, iteratorSeq, readNumOfEdgesAndMaxId)
ExecutorUtils.parallelWork[Iterable[NodeIdEdgesMaxId], (Int, Int, Int, Int)](
executorService, iterableSeq, readNumOfEdgesAndMaxId)
}
futures.toArray map { future =>
val f = future.asInstanceOf[Future[(Int, Int, Int, Int)]]
Expand Down Expand Up @@ -112,12 +112,10 @@ object SharedArrayBasedDirectedGraph {
statsReceiver.time("graph_dump_load_partial_nodes_and_out_edges_parallel") {
var loadingCounter = new AtomicLong()
val outputMode = 10000
def readOutEdges(iteratorFunc: () => Iterator[NodeIdEdgesMaxId]) =
def readOutEdges(edgesIterable: Iterable[NodeIdEdgesMaxId]) =
statsReceiver.time("graph_load_read_out_edge_from_dump_files") {
var id, edgesLength, shardIdx, offset = 0

val iterator = iteratorFunc()

val iterator = edgesIterable.iterator
iterator foreach { item =>
id = item.id
nodeIdSet(id) = 1
Expand All @@ -135,8 +133,8 @@ object SharedArrayBasedDirectedGraph {
}
}

ExecutorUtils.parallelForEach[() => Iterator[NodeIdEdgesMaxId], Unit](
executorService, iteratorSeq, readOutEdges)
ExecutorUtils.parallelForEach[Iterable[NodeIdEdgesMaxId], Unit](
executorService, iterableSeq, readOutEdges)
}

log.info("Count total number of nodes")
Expand Down Expand Up @@ -229,8 +227,8 @@ object SharedArrayBasedDirectedGraph {
}

@VisibleForTesting
def apply( iteratorFunc: () => Iterator[NodeIdEdgesMaxId], storedGraphDir: StoredGraphDir):
SharedArrayBasedDirectedGraph = apply(Seq(iteratorFunc), MoreExecutors.sameThreadExecutor(),
def apply(iterable: Iterable[NodeIdEdgesMaxId], storedGraphDir: StoredGraphDir):
SharedArrayBasedDirectedGraph = apply(Seq(iterable), MoreExecutors.sameThreadExecutor(),
storedGraphDir, 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,44 +62,44 @@ object TestGraphs {
TestNode(2, Nil, List(1))
)

def g3 = ArrayBasedDirectedGraph( () => Seq(
def g3 = ArrayBasedDirectedGraph(Seq(
NodeIdEdgesMaxId(10, Array(11, 12)),
NodeIdEdgesMaxId(11, Array(12)),
NodeIdEdgesMaxId(12, Array(11))
).iterator, StoredGraphDir.BothInOut)
), StoredGraphDir.BothInOut)

def g5 = ArrayBasedDirectedGraph( () => Seq(
def g5 = ArrayBasedDirectedGraph(Seq(
NodeIdEdgesMaxId(10, Array(11, 12, 13)),
NodeIdEdgesMaxId(11, Array(12)),
NodeIdEdgesMaxId(12, Array(11)),
NodeIdEdgesMaxId(13, Array(14)),
NodeIdEdgesMaxId(14, Array())
).iterator, StoredGraphDir.BothInOut)
), StoredGraphDir.BothInOut)

val nodeSeqIterator = () => Seq(
val nodeSeqIterator = Seq(
NodeIdEdgesMaxId(10, Array(11, 12, 13)),
NodeIdEdgesMaxId(11, Array(12, 14)),
NodeIdEdgesMaxId(12, Array(14)),
NodeIdEdgesMaxId(13, Array(12, 14)),
NodeIdEdgesMaxId(14, Array(15)),
NodeIdEdgesMaxId(15, Array(10, 11))
).iterator
)

// using testGraph becomes onerous for non-trivial graphs
def g6 = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.BothInOut)

def g6_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyOut)
def g6_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator, StoredGraphDir.OnlyIn)

val nodeSeqIterator2 = () => Seq(
val nodeSeqIterator2 = Seq(
NodeIdEdgesMaxId(10, Array(11, 12, 13)),
NodeIdEdgesMaxId(11, Array(10, 13, 14)),
NodeIdEdgesMaxId(12, Array(13, 14)),
NodeIdEdgesMaxId(13, Array(12, 14)),
NodeIdEdgesMaxId(14, Array(10, 11, 15)),
NodeIdEdgesMaxId(15, Array(10, 11, 16)),
NodeIdEdgesMaxId(16, Array(15))
).iterator
)
def g7_onlyout = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyOut)
def g7_onlyin = ArrayBasedDirectedGraph(nodeSeqIterator2, StoredGraphDir.OnlyIn)

Expand Down Expand Up @@ -145,7 +145,7 @@ object TestGraphs {
val edgesFromSource = positiveBits map (x => if (x < source) x else x + 1)
nodes(source) = NodeIdEdgesMaxId(source, edgesFromSource)
}
ArrayBasedDirectedGraph( () => nodes.iterator, graphDir)
ArrayBasedDirectedGraph(nodes, graphDir)
}

/**
Expand Down Expand Up @@ -179,6 +179,6 @@ object TestGraphs {
val nodesEdges = nodes.indices map { i =>
NodeIdEdgesMaxId(i, nodes(i).asScala.toArray)
}
ArrayBasedDirectedGraph( () => nodesEdges.iterator, graphDir)
ArrayBasedDirectedGraph(nodesEdges, graphDir)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,26 +73,36 @@ class AdjacencyListGraphReader[T] (
* @param filename Name of file to read from
*/
private class OneShardReader(filename: String, nodeNumberer: NodeNumberer[T])
extends Iterator[NodeIdEdgesMaxId] {
extends Iterable[NodeIdEdgesMaxId] {

private val outEdgePattern = ("""^(\w+)""" + separator + """(\d+)""").r
var lastLineParsed = 0
private val lines = Source.fromFile(filename).getLines()
.map{x => {lastLineParsed += 1; x}}
private val holder = NodeIdEdgesMaxId(-1, null, -1)

override def hasNext: Boolean = lines.hasNext
override def iterator = new Iterator[NodeIdEdgesMaxId] {

override def next(): NodeIdEdgesMaxId = {
var i = 0
try {
val outEdgePattern(id, outEdgeCount) = lines.next().trim
val outEdgeCountInt = outEdgeCount.toInt
val externalNodeId = idReader(id)
val internalNodeId = nodeNumberer.externalToInternal(externalNodeId)
var lastLineParsed = 0
private val src = Source.fromFile(filename)
private val lines = src.getLines()
.map{x => {lastLineParsed += 1; x}}
private val holder = NodeIdEdgesMaxId(-1, null, -1)

var newMaxId = internalNodeId
val outEdgesArr = new Array[Int](outEdgeCountInt)
override def hasNext: Boolean = {
val isNotLastLine = lines.hasNext
if (!isNotLastLine) {
src.close()
}
isNotLastLine
}

override def next(): NodeIdEdgesMaxId = {
var i = 0
try {
val outEdgePattern(id, outEdgeCount) = lines.next().trim
val outEdgeCountInt = outEdgeCount.toInt
val externalNodeId = idReader(id)
val internalNodeId = nodeNumberer.externalToInternal(externalNodeId)

var newMaxId = internalNodeId
val outEdgesArr = new Array[Int](outEdgeCountInt)
while (i < outEdgeCountInt) {
val externalNghId = idReader(lines.next().trim)
val internalNghId = nodeNumberer.externalToInternal(externalNghId)
Expand All @@ -105,15 +115,16 @@ class AdjacencyListGraphReader[T] (
holder.edges = outEdgesArr
holder.maxId = newMaxId
holder
} catch {
case NonFatal(exc) =>
throw new IOException("Parsing failed near line: %d in %s"
.format(lastLineParsed, filename), exc)
} catch {
case NonFatal(exc) =>
throw new IOException("Parsing failed near line: %d in %s"
.format(lastLineParsed, filename), exc)
}
}
}
}

def oneShardReader(filename : String) : Iterator[NodeIdEdgesMaxId] = {
def oneShardReader(filename : String) : Iterable[NodeIdEdgesMaxId] = {
new OneShardReader(filename, nodeNumberer)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import java.util.concurrent.ExecutorService
*/
trait GraphReader[T] {
/**
* Should return a sequence of iterators over `NodeIdEdgesMaxId` objects
* Should return a sequence of `NodeIdEdgesMaxId` iterables
*/
def iteratorSeq: Seq[() => Iterator[NodeIdEdgesMaxId]]
def iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]]

/**
* Define node numberer
Expand All @@ -62,7 +62,7 @@ trait GraphReader[T] {
* Create an `ArrayBasedDirectedGraph`
*/
def toArrayBasedDirectedGraph() = {
ArrayBasedDirectedGraph(iteratorSeq, parallelismLimit, storedGraphDir)
ArrayBasedDirectedGraph(iterableSeq, parallelismLimit, storedGraphDir)
}

/**
Expand All @@ -71,13 +71,13 @@ trait GraphReader[T] {
* 128 is an arbitrary default
*/
def toSharedArrayBasedDirectedGraph(numShards: Int = 128) = {
SharedArrayBasedDirectedGraph(iteratorSeq, executorService, storedGraphDir, numShards)
SharedArrayBasedDirectedGraph(iterableSeq, executorService, storedGraphDir, numShards)
}

/**
* Create an `ArrayBasedDynamicDirectedGraph`
*/
def toArrayBasedDynamicDirectedGraph() = {
new ArrayBasedDynamicDirectedGraph(iteratorSeq, storedGraphDir)
new ArrayBasedDynamicDirectedGraph(iterableSeq, storedGraphDir)
}
}

0 comments on commit a29bad9

Please sign in to comment.