Initial Spark support #1045

Closed
mbroecheler opened this issue Apr 27, 2015 · 12 comments
@mbroecheler
Member

as a first step toward #1021, it seems we can get Spark support in Titan by simply reusing the existing InputFormats for Hadoop in Spark.
While this may not be the most efficient way to go about this, it would provide us with an easy first integration opportunity to investigate Spark support and get some feedback in the Titan 0.9M2 release.

It is unclear exactly what is needed here, but @okram and @dkuppitz can help.

@mbroecheler mbroecheler added this to the Titan0.9 milestone Apr 27, 2015
@dalaro
Member

dalaro commented Apr 28, 2015

Here's an old repo that might be tangentially useful on the input side. It's the simplest possible proof-of-concept that I could concoct that wrapped a Titan 0.5 inputformat in a Spark RDD: https://github.com/dalaro/titan-spark-test. Haven't touched it in a couple of months though.

@mbroecheler
Member Author

From what I gathered, this wouldn't even be necessary, since all we need is the Hadoop InputFormat, right @okram?

@okram
Contributor

okram commented Apr 30, 2015

I wrote an email to you guys, but here it is for the issue record:

  1. Reading from Titan will be easy as that is simply setting the InputFormat to be TitanCassandraInputFormat in your Hadoop properties file --- along with the various properties that Titan requires.
  2. Writing to Titan is a different story. TitanCassandraOutputFormat will not work as that was something very specific to Faunus where the OutputFormat could contain any number of MapReduce jobs. We are no longer doing that as that is particular to Titan. Instead, we will have to use BulkLoaderVertexProgram. That hasn't been written yet (though I think Dan still might have a version that he and I wrote for TP3/Titan some time ago that is Titan specific).

@dalaro
Member

dalaro commented May 1, 2015

Reading is indeed pretty simple. I ran into a thorny classpath conflict around Netty that affects Spark; I made some temporary hacks to get it working, but I need to revisit this to see what else the conflict affects.

Anyway, with Netty classpath conflicts out of the way, I could read from Titan-Cassandra after preloading GotG through OLTP:

gremlin> graph = GraphFactory.open('hg.prop')
==>hadoopgraph[cassandrainputformat->gryooutputformat]
gremlin> g = graph.traversal(computer(SparkGraphComputer))
==>graphtraversalsource[hadoopgraph[cassandrainputformat->gryooutputformat], sparkgraphcomputer]
gremlin> g.V().count()
... (spark progress messages) ...
==>12                                                                           
gremlin> g.V().valueMap()
... (spark progress messages) ...
==>[name:[hercules], age:[30]]                                                  
==>[name:[cerberus]]
==>[name:[saturn], age:[10000]]
==>[name:[sea]]
==>[name:[pluto], age:[4000]]
==>[name:[jupiter], age:[5000]]
==>[name:[nemean]]
==>[name:[hydra]]
==>[name:[neptune], age:[4500]]
==>[name:[sky]]
==>[name:[alcmene], age:[45]]
==>[name:[tartarus]]
Here is the hg.prop file used above:

# hg.prop
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
input.conf.storage.backend=cassandra
input.conf.storage.hostname=localhost
#####################################
# GiraphGraphComputer Configuration #
#####################################
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

I'm fuzzy on how the write side will work. I suspect the worst case is something like writing Gryo from Spark to disk/HDFS and then running a separate BulkLoaderVertexProgram computation that reads the Gryo files. The BulkLoader Marko and I worked on is still in titan09, though I haven't touched it in a while.

@dalaro
Member

dalaro commented May 1, 2015

I also just noticed that vertices without relations do not appear in Spark count or valueMap output. This could be due to CassandraInputFormat (i.e., the Titan bits involved here); I'm not sure at the moment.

@okram
Contributor

okram commented May 1, 2015

@dalaro -- what do you mean by "vertices without relations"? You mean edge-less vertices? I don't have any test cases with edge-less vertices, so perhaps it's a Spark thing... eek. We may have to create a new @LoadGraphData with a toy graph that has "corner case topologies": edge-less vertices, self-loops, zero-property knows-edges and single-property knows-edges, etc. cc/ @spmallette

@dalaro
Member

dalaro commented May 1, 2015

@okram right, g.addVertex(); g.tx().commit().

At its storage level, Titan doesn't really allow for a relationless vertex: there's a hidden system relation on every extant vertex, even if the vertex has no user-visible relations. I suspect I'm not translating that into a TP3-compatible analog in the InputFormat (if one exists in TP3).

@okram
Contributor

okram commented May 1, 2015

An edge-less vertex is simply a vertex with v.edges(BOTH).hasNext() == false. It is nothing "special" in TP3. Is there a way for you to test edge-less-ness at the TitanXXXInputFormat level? That is, without SparkGraphComputer. In other words, just get a RecordReader from your InputFormat and iterate to see if edge-less vertices come back. See the following "file-based" InputFormat test helper for inspiration.

https://github.com/apache/incubator-tinkerpop/blob/master/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TestFileReaderWriterHelper.java

@dalaro
Member

dalaro commented May 2, 2015

Working with Marko, I did the following on this issue today:

  • ran BulkLoaderVertexProgram on Giraph, loading Grateful Dead from Gryo into Titan-Cassandra. BLVP needed minor changes to catch up with the new Kryo default registration list (Long[] -> long[]; no nulls to consider here).
  • ran BulkLoaderVertexProgram on Spark, loading Grateful Dead from Gryo into Titan-Cassandra. Ran into an odd config handling difference between GiraphGraphComputer and SparkGraphComputer, but there's a workaround that requires no TP3 changes (though the fact that I needed a workaround compared with GiraphGC could indicate a bug in SparkGC).
  • wrote a new Hadoop 1 OutputFormat (named "TitanH1OutputFormat" right now, subject to change) that can only write vertex properties.

This gives us two ways to write to Titan from GraphComputers:

  • For VertexPrograms that just want to write properties to existing vertices, such as PageRankVertexProgram, TitanH1OutputFormat can do that without any additional iterations or mapreduces.
  • For bulk loading, we have BulkLoaderVertexProgram, which can be fed from an arbitrary TP3-supported input source. For instance, workload that needs to add edges and vertices could first write to Gryo, then a separate BLVP computation could read the Gryo files and write to Titan.

This stuff is still pretty raw and experimental. Known problems:

  • The old org.jboss.netty seems to conflict somehow with the new io.netty under Spark. I have to delete the org.jboss.netty jar to get Spark to run. I'll look into that, since I'm concerned it's just breaking something else.
  • The config handling is ugly: BulkLoaderVertexProgram and TitanH1OutputFormat use different config key prefixes for their respective Titan configs for no real reason, and TitanH1OutputFormat's keys are just prefixed with "input.conf.", which is bad in so many ways.
  • I think setting GraphComputer.ResultGraph to ORIGINAL on BLVP by default is wrong. It should be NEW.
  • Sometimes the future returned by GiraphGC running BLVP seems to block indefinitely, even after the bulk load has successfully completed. Opening StandardTitanGraph in another shell shows the data all there, but the future is stuck. I haven't seen this when doing the same BLVP on SparkGC yet.
  • I need to call InputOutputHelper.registerInputOutputPair to avoid a cosmetic NPE in at least one case.
  • SparkGC and GiraphGC seem to process BulkLoaderVertexProgram's config differently; I have to use different steps to get the same results on each framework.

But this is just the proof-of-concept stage. Here's what I can do now (b4e62ef). Feedback in general, and in particular from @dkuppitz, would be welcome.

  1. Store Spark-computed PageRank on Titan vertices using the OutputFormat

    # Load GotG into Titan-Cassandra, run PageRank, save results using TitanH1OutputFormat
    # (Shell commands below)
    bin/titan.sh stop
    mvn clean install -DskipTests=true
    rm lib/netty-3.2.7.Final.jar
    bin/titan.sh start
    bin/gremlin.sh <<EOF
    t = TitanFactory.open('conf/titan-cassandra-es.properties')
    GraphOfTheGodsFactory.load(t)
    
    // Pre-create pagerank property keys to avoid lock contention in the outputformat
    m = t.openManagement()
    m.makePropertyKey('gremlin.pageRankVertexProgram.pageRank').dataType(Double.class).make()
    m.makePropertyKey('gremlin.pageRankVertexProgram.edgeCount').dataType(Double.class).make()
    m.commit()
    t.close()
    
    // Temporary hack -- avoids an annoying but harmless NPE in future.get() below
    InputOutputHelper.registerInputOutputPair(FileInputFormat.class, com.thinkaurelius.titan.hadoop.formats.TitanH1OutputFormat.class)
    
    // Run PageRank on Spark and write the element compute keys to the respective Titan vertices
    hadoopGraph = GraphFactory.open('hg.prop')
    future = hadoopGraph.compute(SparkGraphComputer.class).program(new PageRankVertexProgram()).submit()
    future.get()
    
    // Dump vertices in OLTP, showing the just-added PR props
    t = TitanFactory.open('conf/titan-cassandra-es.properties')
    t.traversal().V().valueMap()
    EOF

    Here is the hg.prop file referenced above:

    gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
    gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
    gremlin.hadoop.graphOutputFormat=com.thinkaurelius.titan.hadoop.formats.TitanH1OutputFormat
    gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
    gremlin.hadoop.deriveMemory=false
    gremlin.hadoop.jarsInDistributedCache=true
    gremlin.hadoop.inputLocation=none
    gremlin.hadoop.outputLocation=output
    input.conf.storage.backend=cassandrathrift
    input.conf.storage.hostname=localhost
    #####################################
    # GiraphGraphComputer Configuration #
    #####################################
    giraph.minWorkers=2
    giraph.maxWorkers=2
    giraph.useOutOfCoreGraph=true
    giraph.useOutOfCoreMessages=true
    mapred.map.child.java.opts=-Xmx1024m
    mapred.reduce.child.java.opts=-Xmx1024m
    giraph.numInputThreads=4
    giraph.numComputeThreads=4
    giraph.maxMessagesInMemory=100000
    ####################################
    # SparkGraphComputer Configuration #
    ####################################
    spark.master=local[4]
    spark.executor.memory=1g
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
    

    The last traversal should print something like this, irrespective of line order:

    ==>[gremlin.pageRankVertexProgram.pageRank:[0.23864803741939838], name:[cerberus], gremlin.pageRankVertexProgram.edgeCount:[1.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.2295501250550773], name:[sea], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.21761710958434682], name:[sky], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.17550000000000002], name:[nemean], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.29716723463942407], name:[pluto], gremlin.pageRankVertexProgram.edgeCount:[4.0], age:[4000]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.17550000000000002], name:[hydra], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.15000000000000002], name:[hercules], gremlin.pageRankVertexProgram.edgeCount:[5.0], age:[30]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.17550000000000002], name:[alcmene], gremlin.pageRankVertexProgram.edgeCount:[0.0], age:[45]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.31819816247447563], name:[jupiter], gremlin.pageRankVertexProgram.edgeCount:[4.0], age:[5000]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.2807651470037452], name:[neptune], gremlin.pageRankVertexProgram.edgeCount:[3.0], age:[4500]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.41599886933191116], name:[tartarus], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.21761710958434682], name:[saturn], gremlin.pageRankVertexProgram.edgeCount:[0.0], age:[10000]]
    
  2. Load Grateful Dead into Titan-Cassandra on Spark using BulkLoaderVertexProgram

    # Load Grateful Dead into Titan-Cassandra with Spark
    # (Shell commands below)
    bin/titan.sh stop
    mvn clean install -DskipTests=true
    rm lib/netty-3.2.7.Final.jar
    bin/titan.sh start
    bin/gremlin.sh <<EOF
    // Spark
    graph = GraphFactory.open('blvp.prop')
    
    // Doesn't work on Spark; debugging shows the VP config is empty at runtime
    // future = graph.compute(SparkGraphComputer.class).program(new BulkLoaderVertexProgram()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.EDGES).submit()
    
    apacheGraphConf = new org.apache.commons.configuration.BaseConfiguration()
    apacheGraphConf.setProperty('storage.backend', 'cassandrathrift')
    future = graph.compute(SparkGraphComputer.class).program(new BulkLoaderVertexProgram().useGraphConfig(apacheGraphConf)).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.EDGES).submit()
    
    future.get()
    
    t = TitanFactory.open('conf/titan-cassandra-es.properties')
    t.traversal().V().valueMap()
    t.close()
    EOF

    The valueMap() should print familiar Grateful Dead stuff, such as:

    ==>[name:[Tampa_Red]]
    ==>[name:[Lesh_Hart_Kreutzmann]]
    ==>[name:[Garcia]]
    ==>[name:[THE FROZEN LOGGER], songType:[cover], performances:[6]]
    ==>[name:[TASTEBUD], songType:[original], performances:[1]]
    ==>[name:[ROCKIN PNEUMONIA], songType:[], performances:[0]]
    ==>[name:[Greenwich_Barry_Spector]]
    ==>[name:[JAM], songType:[original], performances:[24]]
    [etc...]
    

    Here's blvp.prop referenced above:

    gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
    gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
    gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
    gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
    gremlin.hadoop.deriveMemory=false
    gremlin.hadoop.jarsInDistributedCache=true
    gremlin.hadoop.inputLocation=data/grateful-dead-vertices.gio
    gremlin.hadoop.outputLocation=output
    input.conf.storage.backend=cassandrathrift
    input.conf.storage.hostname=localhost
    titan.bulkload.graphconfig.storage.backend=cassandrathrift
    titan.bulkload.graphconfig.storage.hostname=localhost
    #####################################
    # GiraphGraphComputer Configuration #
    #####################################
    giraph.minWorkers=1
    giraph.maxWorkers=1
    giraph.SplitMasterWorker=false
    giraph.useOutOfCoreGraph=true
    giraph.useOutOfCoreMessages=true
    mapred.map.child.java.opts=-Xmx1024m
    mapred.reduce.child.java.opts=-Xmx1024m
    giraph.numInputThreads=4
    giraph.numComputeThreads=4
    giraph.maxMessagesInMemory=100000
    ####################################
    # SparkGraphComputer Configuration #
    ####################################
    spark.master=local[4]
    spark.executor.memory=1g
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
    
  3. Load Grateful Dead into Titan-Cassandra on Giraph using BulkLoaderVertexProgram

    # Load Grateful Dead into Titan-Cassandra with Giraph
    # (Shell commands below)
    bin/titan.sh stop
    mvn clean install -DskipTests=true
    rm lib/netty-3.2.7.Final.jar
    bin/titan.sh start
    bin/gremlin.sh <<EOF
    graph = GraphFactory.open('blvp.prop')
    future = graph.compute(GiraphGraphComputer.class).program(new BulkLoaderVertexProgram()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.EDGES).submit()
    future.get()
    
    t = TitanFactory.open('conf/titan-cassandra-es.properties')
    t.traversal().V().valueMap()
    t.close()
    EOF

    The blvp.prop file is the same one used for Spark (above).
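A side note on the PageRank numbers in the first example: vertices with no incoming edges (e.g. hercules) sit at the base value 1 - alpha, because each iteration computes rank = (1 - alpha) + alpha * (incoming rank shares) with alpha = 0.85. Here is a framework-free toy sketch of that update rule; the class and method names are my own illustration, not TP3's PageRankVertexProgram code:

```java
import java.util.*;

// Toy damped PageRank over an adjacency list:
//   newRank = (1 - ALPHA) + ALPHA * sum(incoming shares)
public class ToyPageRank {
    static final double ALPHA = 0.85;

    public static Map<String, Double> pageRank(Map<String, List<String>> out, int iterations) {
        Map<String, Double> rank = new HashMap<>();
        for (String v : out.keySet()) rank.put(v, 1.0);
        for (int i = 0; i < iterations; i++) {
            Map<String, Double> incoming = new HashMap<>();
            for (String v : out.keySet()) incoming.put(v, 0.0);
            for (Map.Entry<String, List<String>> e : out.entrySet()) {
                if (e.getValue().isEmpty()) continue;
                // a vertex splits its current rank evenly across its out-edges
                double share = rank.get(e.getKey()) / e.getValue().size();
                for (String t : e.getValue()) incoming.merge(t, share, Double::sum);
            }
            for (String v : out.keySet()) rank.put(v, (1 - ALPHA) + ALPHA * incoming.get(v));
        }
        return rank;
    }

    public static void main(String[] args) {
        // hercules-style vertex: outgoing edges only, so its rank settles at 1 - alpha
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("hercules", Arrays.asList("jupiter", "alcmene"));
        graph.put("jupiter", Collections.emptyList());
        graph.put("alcmene", Collections.emptyList());
        System.out.println(pageRank(graph, 30).get("hercules")); // prints 0.15000000000000002
    }
}
```

This also explains why jupiter, with four in-edges in the Graph of the Gods, ends up with one of the highest values in the listing.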

@dalaro
Member

dalaro commented May 2, 2015

Related commits:

dalaro added a commit that referenced this issue May 5, 2015
BulkLoaderVertexProgram and the Hadoop Input/OutputFormats now use
harmonized config key prefixes: titanmr.{ioformat,bulkload}.conf.
Also added a ConfigElement.getPath overload that includes the root
element name (the default behavior is still to exclude the root).

This commit fixes some DEBUG logging statements needlessly emitted at
ERROR.

For #1045
@dalaro
Member

dalaro commented May 5, 2015

The netty dependency issue was fixed in 0c36a4f. I tweaked the configuration keys and changed BLVP's ResultGraph default to NEW in fab0f61. These issues were all obviously problems in Titan. The remaining issues are either likely to be TP3 issues (like the seeming difference in SparkGC and GiraphGC config processing) or ambiguous (like the hung future). I think this is an acceptable spot to close this issue and push future work to other issues.

Here are the changes from the config key tweaks:

input.conf.* -> titanmr.ioformat.conf.*
titan.bulkload.graphconfig.* -> titanmr.bulkload.conf.*
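The rename is a pure prefix substitution: each component reads only the keys under its prefix, with the prefix stripped off before the remainder is handed to the Titan graph config. A minimal sketch of that projection (the PrefixConfig class and subset helper are my own illustration, not Titan's actual code):

```java
import java.util.*;

public class PrefixConfig {
    // Return the sub-configuration under a given prefix, with the prefix stripped.
    public static Map<String, String> subset(Map<String, String> props, String prefix) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> hadoopProps = new LinkedHashMap<>();
        hadoopProps.put("titanmr.ioformat.conf.storage.backend", "cassandrathrift");
        hadoopProps.put("titanmr.ioformat.conf.storage.hostname", "localhost");
        hadoopProps.put("spark.master", "local[4]");
        // Only the ioformat keys survive, prefix removed; unrelated keys are ignored.
        System.out.println(subset(hadoopProps, "titanmr.ioformat.conf."));
        // prints {storage.backend=cassandrathrift, storage.hostname=localhost}
    }
}
```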

Here's hg.prop with those changes:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=com.thinkaurelius.titan.hadoop.formats.TitanH1OutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.hostname=localhost
#####################################
# GiraphGraphComputer Configuration #
#####################################
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

Here's blvp.prop with those changes:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=data/grateful-dead-vertices.gio
gremlin.hadoop.outputLocation=output
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.hostname=localhost
titanmr.bulkload.conf.storage.backend=cassandrathrift
titanmr.bulkload.conf.storage.hostname=localhost
#####################################
# GiraphGraphComputer Configuration #
#####################################
giraph.minWorkers=1
giraph.maxWorkers=1
giraph.SplitMasterWorker=false
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

@dalaro dalaro closed this as completed May 5, 2015
@okram
Contributor

okram commented May 5, 2015

@dalaro --- if there are aspects you think are wrong because of TP3, please file issues on our issue tracker. Thanks -- that was cool to see it work!
