Skip to content

Commit

Permalink
Merge pull request #224 from yb1/gz
Browse files Browse the repository at this point in the history
GraphReaders read Gzip files
  • Loading branch information
pankajgupta authored Nov 1, 2016
2 parents ac6b93d + 7396361 commit 09d695a
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ object PerformanceBenchmark extends GzipGraphDownloader {
val getNodeFlag = flags("gn", 0, "run getNodeById benchmark with a given number of steps")
val reps = flags("reps", DEFAULT_REPS, "number of times to run benchmark")
val adjacencyList = flags("a", false, "graph in adjacency list format")
val gzipFlag = flags("gz", false, "Is file in gzip format")
val dirOutFlag = flags("out", true, "Graph direction: out")
val dirInFlag = flags("in", true, "Graph direction: in")


def cacheRemoteFile(url: String): (String, String) = {
printf("Downloading remote file from %s\n", url)
Expand All @@ -105,12 +109,19 @@ object PerformanceBenchmark extends GzipGraphDownloader {

def readGraph(path: String, filename: String, adjacencyList: Boolean): DirectedGraph[Node] = {
if (adjacencyList) {
AdjacencyListGraphReader.forIntIds(path, filename).toArrayBasedDirectedGraph()
AdjacencyListGraphReader.forIntIds(path, filename, isGzip = gzipFlag()).toArrayBasedDirectedGraph()
} else {
val sep = separatorInt().toChar
val graphDir = if (dirInFlag() && dirOutFlag())
StoredGraphDir.BothInOut
else if (dirInFlag())
StoredGraphDir.OnlyIn
else
StoredGraphDir.OnlyOut

printf("Using Character (%d in Int) as separator\n", sep.toInt)
ListOfEdgesGraphReader.forIntIds(path, filename, graphDir = StoredGraphDir.BothInOut,
separator = sep).toSharedArrayBasedDirectedGraph(forceSparseRepr = None)
ListOfEdgesGraphReader.forIntIds(path, filename, graphDir = graphDir,
separator = sep, isGzip = gzipFlag()).toSharedArrayBasedDirectedGraph(forceSparseRepr = None)
//separator = sep).toArrayBasedDirectedGraph(neighborsSortingStrategy = LeaveUnsorted,
// forceSparseRepr = None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
*/
package com.twitter.cassovary.util.io

import com.twitter.cassovary.graph.{StoredGraphDir, NodeIdEdgesMaxId}
import com.twitter.cassovary.graph.{NodeIdEdgesMaxId, StoredGraphDir}
import com.twitter.cassovary.graph.StoredGraphDir._
import com.twitter.cassovary.util.{ParseString, NodeNumberer}
import com.twitter.cassovary.util.{NodeNumberer, ParseString}
import com.twitter.util.NonFatal
import java.io.IOException
import java.io.{BufferedInputStream, FileInputStream, IOException}
import java.util.zip.GZIPInputStream
import scala.io.Source

/**
Expand Down Expand Up @@ -60,13 +61,11 @@ class AdjacencyListGraphReader[T] (
val directory: String,
override val prefixFileNames: String = "",
val nodeNumberer: NodeNumberer[T],
idReader: (String => T)
idReader: (String => T),
isGzip: Boolean = false,
separator: Char = ' '
) extends GraphReaderFromDirectory[T] {

/**
* Separator between node ids forming edge.
*/
protected val separator = " "

/**
* Read in nodes and edges from a single file
Expand All @@ -75,47 +74,35 @@ class AdjacencyListGraphReader[T] (
private class OneShardReader(filename: String, nodeNumberer: NodeNumberer[T])
extends Iterable[NodeIdEdgesMaxId] {

private val outEdgePattern = ("""^(\w+)""" + separator + """(\d+)""").r
/**
 * Builds an iterator over the entries of this shard's adjacency-list file.
 *
 * Each element is `(internalId, count)`: the id parsed from one line, mapped through
 * `nodeNumberer.externalToInternal`, paired with the optional count that
 * `AdjacencyTsFileReader` produced for that line.
 *
 * @return iterator of (internal node id, optional count) pairs
 */
def adjacencyNodeIdsIterator(): Iterator[(Int, Option[Int])] = {
  // isGzip has to be forwarded here: without it the file reader falls back to its
  // default (plain text), so the gzip option given to this graph reader would be
  // silently ignored and compressed shards would be read as raw bytes.
  new AdjacencyTsFileReader[T](filename, separator, idReader, isGzip) map { case (nodeId, count) =>
    val internalNodeId = nodeNumberer.externalToInternal(nodeId)
    (internalNodeId, count)
  }
}

override def iterator = new Iterator[NodeIdEdgesMaxId] {

var lastLineParsed = 0
private val src = Source.fromFile(filename)
private val lines = src.getLines()
.map{x => {lastLineParsed += 1; x}}
private val nodeIds = adjacencyNodeIdsIterator()

override def hasNext: Boolean = {
val isNotLastLine = lines.hasNext
if (!isNotLastLine) {
src.close()
}
isNotLastLine
}
override def hasNext: Boolean = nodeIds.hasNext

override def next(): NodeIdEdgesMaxId = {
var i = 0
try {
val outEdgePattern(id, outEdgeCount) = lines.next().trim
val outEdgeCountInt = outEdgeCount.toInt
val externalNodeId = idReader(id)
val internalNodeId = nodeNumberer.externalToInternal(externalNodeId)
val (id, outEdgeCountIntOpt) = nodeIds.next()
val internalNodeId = id

var newMaxId = internalNodeId
val outEdgesArr = new Array[Int](outEdgeCountInt)
while (i < outEdgeCountInt) {
val externalNghId = idReader(lines.next().trim)
val internalNghId = nodeNumberer.externalToInternal(externalNghId)
newMaxId = newMaxId max internalNghId
outEdgesArr(i) = internalNghId
i += 1
}

NodeIdEdgesMaxId(internalNodeId, outEdgesArr, newMaxId)
} catch {
case NonFatal(exc) =>
throw new IOException("Parsing failed near line: %d in %s"
.format(lastLineParsed, filename), exc)
var newMaxId = internalNodeId
val outEdgeCountInt = outEdgeCountIntOpt.get
val outEdgesArr = new Array[Int](outEdgeCountInt)
while (i < outEdgeCountInt) {
val (internalNghId, tmp) = nodeIds.next()
newMaxId = newMaxId max internalNghId
outEdgesArr(i) = internalNghId
i += 1
}

NodeIdEdgesMaxId(internalNodeId, outEdgesArr, newMaxId)
}
}
}
Expand All @@ -127,16 +114,16 @@ class AdjacencyListGraphReader[T] (
// note that we are assuming that n.id.toString does the right thing, which is
// true for int and long ids but might not be for a general T.
def reverseParseNode(n: NodeIdEdgesMaxId): String = {
n.id + separator + n.edges.length + "\n" + n.edges.mkString("\n") + "\n"
n.id + separator.toString + n.edges.length + "\n" + n.edges.mkString("\n") + "\n"
}

}

object AdjacencyListGraphReader {
def forIntIds(directory: String, prefixFileNames: String = "",
nodeNumberer: NodeNumberer[Int] = new NodeNumberer.IntIdentity(),
graphDir: StoredGraphDir = StoredGraphDir.OnlyOut) =
new AdjacencyListGraphReader[Int](directory, prefixFileNames, nodeNumberer, ParseString.toInt) {
graphDir: StoredGraphDir = StoredGraphDir.OnlyOut, isGzip: Boolean = false, separator: Char = ' ') =
new AdjacencyListGraphReader[Int](directory, prefixFileNames, nodeNumberer, ParseString.toInt, isGzip = isGzip) {
override def storedGraphDir: StoredGraphDir = graphDir
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@ import java.io.IOException

import com.twitter.logging.Logger
import com.twitter.util.NonFatal

import java.io.FileInputStream
import java.io.BufferedInputStream
import java.util.zip.GZIPInputStream
import scala.io.Source

abstract class FileReader[T](fileName: String) extends Iterator[T] {

abstract class FileReader[T](fileName: String, isGzip: Boolean = false) extends Iterator[T] {
protected val log = Logger.get()
protected var lastLineParsed = 0
log.info("Starting reading from file %s...\n", fileName)
private val lines = Source.fromFile(fileName).getLines().map { x =>
private val source = if (isGzip)
Source.fromInputStream(new GZIPInputStream(new BufferedInputStream(new FileInputStream(fileName))), "ISO-8859-1")
else
Source.fromFile(fileName)

private val lines = source.getLines().map { x =>
lastLineParsed += 1
x
}

private var _next: Option[T] = checkNext()

def hasNext: Boolean = _next.isDefined
Expand Down Expand Up @@ -53,16 +62,16 @@ abstract class FileReader[T](fileName: String) extends Iterator[T] {
}

def close(): Unit = {
Source.fromFile(fileName).close()
source.close()
}

}

// T is the id type, typically Int or Long or String
class TwoTsFileReader[T](fileName: String,
separator: Char,
idReader: (String, Int, Int) => T)
extends FileReader[(T, T)](fileName) {
idReader: (String, Int, Int) => T, isGzip: Boolean = false)
extends FileReader[(T, T)](fileName, isGzip) {

def processOneLine(line: String): (T, T) = {
val i = line.indexOf(separator)
Expand All @@ -76,4 +85,25 @@ class TwoTsFileReader[T](fileName: String,
(source, dest)
}

}

// T is the type of a node id, whether source or destination (typically Int or Long or
// String); the Int wrapped in the Option is the adjacency count.
// processOneLine returns (id, Some(count)) for a line that contains the separator and
// (id, None) for a line that does not.
class AdjacencyTsFileReader[T](fileName: String,
    separator: Char,
    idReader: (String => T), isGzip: Boolean = false)
  extends FileReader[(T, Option[Int])](fileName, isGzip) {

  def processOneLine(line: String): (T, Option[Int]) =
    line.indexOf(separator) match {
      case -1 =>
        // No separator: the whole line is a single id.
        (idReader(line), None)
      case sepIdx =>
        // Text before the first separator is the id, everything after it is the count.
        val nodeId = idReader(line.substring(0, sepIdx))
        val adjacencyCount = line.substring(sepIdx + 1).toInt
        (nodeId, Some(adjacencyCount))
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ class ListOfEdgesGraphReader[T](
removeDuplicates: Boolean = false,
sortNeighbors: Boolean = false,
separator: Char = ' ',
sortedBySourceId: Boolean = true
sortedBySourceId: Boolean = true,
isGzip: Boolean = false
) extends GraphReaderFromDirectory[T] {

private class OneShardReader(filename: String, nodeNumberer: NodeNumberer[T])
extends Iterable[NodeIdEdgesMaxId] {

def twoNodeIdsIterator(): Iterator[(Int, Int)] = {
new TwoTsFileReader[T](filename, separator, idReader) map { case (source, dest) =>
new TwoTsFileReader[T](filename, separator, idReader, isGzip) map { case (source, dest) =>
val internalFromId = nodeNumberer.externalToInternal(source)
val internalToId = nodeNumberer.externalToInternal(dest)
(internalFromId, internalToId)
Expand Down Expand Up @@ -215,10 +216,11 @@ object ListOfEdgesGraphReader {
removeDuplicates: Boolean = false,
sortNeighbors: Boolean = false,
separator: Char = ' ',
graphDir: StoredGraphDir = StoredGraphDir.OnlyOut) =
graphDir: StoredGraphDir = StoredGraphDir.OnlyOut,
isGzip: Boolean = false) =
new ListOfEdgesGraphReader[Int](directory, prefixFileNames,
new NodeNumberer.IntIdentity(), ParseString.toInt, removeDuplicates,
sortNeighbors, separator) {
sortNeighbors, separator, isGzip = isGzip) {
override def storedGraphDir: StoredGraphDir = graphDir
}
}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.twitter.cassovary.util.io

import java.io.{BufferedInputStream, FileInputStream}
import java.util.zip.GZIPInputStream

import com.twitter.cassovary.util.ParseString
import org.scalatest.{Matchers, WordSpec}

Expand All @@ -9,8 +12,12 @@ class FileReaderSpec extends WordSpec with Matchers {

private val directory: String = "cassovary-core/src/test/resources/graphs/"

def allLines(fname: String): List[String] = {
Source.fromFile(fname).getLines().toList.filter(!_.startsWith("#"))
def allLines(fname: String, isGzip : Boolean = false): List[String] = {
val source = if (isGzip)
Source.fromInputStream(new GZIPInputStream(new BufferedInputStream(new FileInputStream(fname))), "ISO-8859-1")
else
Source.fromFile(fname)
source.getLines().toList.filter(!_.startsWith("#"))
}

def checkStrings(fileName: String) {
Expand All @@ -23,10 +30,10 @@ class FileReaderSpec extends WordSpec with Matchers {
expectedLines shouldEqual actualLines
}

def checkPair[T](fileName: String, separator: Char = ' ', idReader: (String, Int, Int) => T) {
def checkPair[T](fileName: String, separator: Char = ' ', idReader: (String, Int, Int) => T, isGzip: Boolean = false) {
val pathName = directory + fileName
val actualLines = new TwoTsFileReader[Int](pathName, separator, ParseString.toInt).toList
val expectedLines = allLines(pathName).map { l =>
val actualLines = new TwoTsFileReader[Int](pathName, separator, ParseString.toInt, isGzip).toList
val expectedLines = allLines(pathName, isGzip).map { l =>
val arr = l.split(separator).map(_.toInt)
(arr(0), arr(1))
}
Expand All @@ -41,6 +48,32 @@ class FileReaderSpec extends WordSpec with Matchers {
checkPair[String](fileName, separator, ParseString.identity)
}

// Runs the Int pair check with the gzip flag switched on.
def checkGzipIntPair(fileName: String, separator: Char = ' '): Unit =
  checkPair[Int](fileName, separator, ParseString.toInt, isGzip = true)

// Checks that AdjacencyTsFileReader turns `directory + fileName` into the same
// (id, optional count) tuples as splitting each line returned by allLines on `separator`.
// NOTE(review): the type parameter T and `idReader` are not used in the body; ids are
// always read as Int through ParseString.toInt -- confirm whether that is intended.
def checkAdjacency[T](fileName: String, separator: Char = ' ', idReader: (String, Int, Int) => T, isGzip: Boolean = false): Unit = {
  val pathName = directory + fileName
  val parsedTuples = new AdjacencyTsFileReader[Int](pathName, separator, ParseString.toInt, isGzip).toList
  val referenceTuples = allLines(pathName, isGzip).map { rawLine =>
    val fields = rawLine.split(separator).map(_.toInt)
    // A second field, when present, is the adjacency count for the id in the first field.
    val adjacencyCount = if (fields.length > 1) Some(fields(1)) else None
    (fields(0), adjacencyCount)
  }
  referenceTuples shouldEqual parsedTuples
}

// Runs the adjacency check with Int ids, leaving the gzip flag at its default.
def checkIntAdjacency(fileName: String, separator: Char = ' '): Unit =
  checkAdjacency[Int](fileName, separator, ParseString.toInt)

// Runs the adjacency check with Int ids and the gzip flag switched on.
def checkGzipIntAdjacency(fileName: String, separator: Char = ' '): Unit =
  checkAdjacency[Int](fileName, separator, ParseString.toInt, isGzip = true)

"FileReader" when {
"using strings" should {
checkStrings("toy_3nodes.txt")
Expand All @@ -53,6 +86,16 @@ class FileReaderSpec extends WordSpec with Matchers {
checkIntPair("toy_6nodelabels_label1_int.txt")
checkStringPair("toy_list5edges.txt")
}

"gzip test" should {
checkGzipIntPair("gzip_toy_list5edges.txt.gz")
checkGzipIntAdjacency("gzip_toy_6nodes_adj_1.txt.gz");
}

"adjacencyTsFileReader test" should {
checkIntAdjacency("toy_6nodes_adj_1.txt")
checkIntAdjacency("toy_6nodes_adj_2.txt")
}
}

}

0 comments on commit 09d695a

Please sign in to comment.