SparkGraphComputer

<dependency>
   <groupId>org.apache.tinkerpop</groupId>
   <artifactId>spark-gremlin</artifactId>
   <version>x.y.z</version>
</dependency>

Spark is an Apache Software Foundation project focused on general-purpose OLAP data processing. Spark provides a hybrid in-memory/disk-based distributed computing model that is similar to Hadoop’s MapReduce model. Spark maintains a fluent function chaining DSL that is arguably easier for developers to work with than native Hadoop MapReduce. Spark-Gremlin provides an implementation of the bulk-synchronous parallel, distributed message passing algorithm within Spark and thus, any VertexProgram can be executed over SparkGraphComputer.

Furthermore, the lib/ directory should be distributed across all machines in the Spark cluster. For this purpose TinkerPop provides a helper script, which takes the Spark installation directory and the Spark machines as input:

bin/hadoop/init-tp-spark.sh /usr/local/spark spark@10.0.0.1 spark@10.0.0.2 spark@10.0.0.3

Once the lib/ directory is distributed, SparkGraphComputer can be used as follows.

graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = traversal().with(graph).withComputer(SparkGraphComputer)
g.V().count()
g.V().out().out().values('name')
Note
Starting in TinkerPop 4, lambda executions via :remote are no longer supported in the Gremlin Console.

The SparkGraphComputer algorithm leverages Spark’s caching abilities to reduce the amount of data shuffled across the wire on each iteration of the VertexProgram. When the graph is loaded as a Spark RDD (Resilient Distributed Dataset) it is immediately cached as graphRDD. The graphRDD is a distributed adjacency list which encodes each vertex, its properties, and all its incident edges. On the first iteration, each vertex (in parallel) is passed through VertexProgram.execute(). This yields the vertex’s mutated state (i.e. its updated compute keys, shown as propertyX in the figure below) and its outgoing messages. This viewOutgoingRDD is then reduced to viewIncomingRDD, where the outgoing messages are sent to their respective vertices. If a MessageCombiner exists for the vertex program, then messages are aggregated locally and globally to ultimately yield one incoming message per vertex. This reduce sequence is the "message pass."

If the vertex program does not terminate on this iteration, the viewIncomingRDD is joined with the cached graphRDD and the process continues. When there are no more iterations, there is a final join and the resultant RDD is stripped of its edges and messages. This mapReduceRDD is cached and is processed by each MapReduce job in the GraphComputer computation.

[Figure: the SparkGraphComputer message-pass algorithm]
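As a concrete illustration of this loop, the bundled PageRankVertexProgram can be run over SparkGraphComputer in the style of the examples above (a sketch; the builder and memory calls should be verified against the version in use):

graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
result = graph.compute(SparkGraphComputer).program(PageRankVertexProgram.build().create()).submit().get()
result.memory().getIteration()   // number of message-pass iterations that were executed
g = result.graph().traversal()
g.V().elementMap()               // vertices now carry their page-rank value as a compute key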
SparkGraphComputer is configured through the following properties:

gremlin.hadoop.graphReader
A class for reading a graph-based RDD (e.g. an InputRDD or InputFormat).

gremlin.hadoop.graphWriter
A class for writing a graph-based RDD (e.g. an OutputRDD or OutputFormat).

gremlin.spark.graphStorageLevel
The StorageLevel to use for the cached graph during job execution (default MEMORY_ONLY).

gremlin.spark.persistContext
Whether to create a new SparkContext for every SparkGraphComputer or to reuse an existing one.

gremlin.spark.persistStorageLevel
The StorageLevel to use when persisting RDDs via PersistedOutputRDD (default MEMORY_ONLY).
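For reference, a hedged sketch of how these keys might look in a Hadoop graph properties file (the class names follow the 3.x package layout and the values are illustrative, not defaults):

# excerpt of a hadoop-gryo.properties style configuration (illustrative values)
gremlin.hadoop.graphReader=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
gremlin.spark.persistContext=true
gremlin.spark.persistStorageLevel=DISK_ONLY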

InputRDD and OutputRDD

If the provider/user does not want to use Hadoop InputFormats, it is possible to leverage Spark’s RDD constructs directly. An InputRDD provides a read method that takes a SparkContext and returns a graphRDD. Likewise, an OutputRDD is used for writing a graphRDD.
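To wire custom implementations in, point the reader/writer properties at the implementing classes (com.example.MyInputRDD is a hypothetical placeholder; the PersistedOutputRDD class path follows the 3.x package layout):

graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphReader', 'com.example.MyInputRDD')   // hypothetical InputRDD implementation
graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD')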

If the graph system provider uses an InputRDD, the RDD should maintain an associated org.apache.spark.Partitioner. By doing so, SparkGraphComputer will not partition the loaded graph across the cluster as it has already been partitioned by the graph system provider. This can save a significant amount of time and space resources. If the InputRDD does not have a registered partitioner, SparkGraphComputer will partition the graph using an org.apache.spark.HashPartitioner with the number of partitions being either the number of existing partitions in the input (i.e. input splits) or the user-specified number of GraphComputer.workers().
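As a sketch, a provider’s InputRDD could pre-partition the graphRDD it returns so that SparkGraphComputer detects the partitioner and skips its own repartitioning pass (graphRDD and workers are placeholder names here):

// inside a provider's InputRDD read method (sketch)
graphRDD = graphRDD.partitionBy(new org.apache.spark.HashPartitioner(workers))  // workers = desired partition count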

If the provider/user finds that an OutputRDD generates many small HDFS files, the gremlin.spark.outputRepartition option can be used to repartition the output into the specified number of partitions. The option is disabled by default.
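For example (the partition count is illustrative):

graph.configuration().setProperty('gremlin.spark.outputRepartition', 16)  // write the output as 16 partitions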

Storage Levels

The SparkGraphComputer uses MEMORY_ONLY to cache the input graph and the output graph by default. Users should be aware of the impact of different storage levels, since the default settings can quickly lead to memory issues on larger graphs. An overview of Spark’s persistence settings is provided in Spark’s programming guide.
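A disk-backed level can also be set programmatically before submitting a job (the level names are standard Spark StorageLevel constants):

graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.spark.graphStorageLevel', 'MEMORY_AND_DISK')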

Using a Persisted Context

It is possible to persist the graph RDD between jobs within the SparkContext (e.g. SparkServer) by leveraging PersistedOutputRDD. Note that gremlin.spark.persistContext should be set to true or else the persisted RDD will be destroyed when the SparkContext closes. The persisted RDD is named by the gremlin.hadoop.outputLocation configuration. Similarly, PersistedInputRDD is used with the respective gremlin.hadoop.inputLocation to retrieve the persisted RDD from the SparkContext.
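A hedged end-to-end sketch of this workflow (the class paths follow the 3.x spark-gremlin layout and 'myGraphRDD' is an arbitrary name):

graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.spark.persistContext', true)
graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD')
graph.configuration().setProperty('gremlin.hadoop.outputLocation', 'myGraphRDD')
graph.compute(SparkGraphComputer).program(CloneVertexProgram.build().create()).submit().get()

// a later job in the same SparkContext reads the persisted RDD back
graph.configuration().setProperty('gremlin.hadoop.graphReader', 'org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD')
graph.configuration().setProperty('gremlin.hadoop.inputLocation', 'myGraphRDD')
g = traversal().with(graph).withComputer(SparkGraphComputer)
g.V().count()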

When using a persistent SparkContext, the configuration of the original SparkContext will be inherited by all threaded references to that SparkContext. The exceptions to this rule are those properties which have a specific thread-local effect, listed below.

Thread Local Properties
  1. spark.jobGroup.id

  2. spark.job.description

  3. spark.job.interruptOnCancel

  4. spark.scheduler.pool
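These are Spark’s own per-thread job properties and are set through the live SparkContext rather than through TinkerPop (sc below stands for a reference to that SparkContext):

sc.setLocalProperty('spark.scheduler.pool', 'analytics')         // route this thread's jobs to a scheduler pool
sc.setLocalProperty('spark.job.description', 'nightly OLAP run')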

Finally, there is a spark object that can be used to manage persisted RDDs (see Interacting with Spark).
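As an illustrative sketch (the helper methods shown are assumed from the Interacting with Spark section and should be confirmed there):

spark.ls()              // list the RDDs persisted in the active SparkContext
spark.rm('myGraphRDD')  // remove a persisted RDD by name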

Using CloneVertexProgram

The CloneVertexProgram copies a whole graph from any graph InputFormat to any graph OutputFormat. TinkerPop provides formats such as GraphSONOutputFormat, GryoOutputFormat, or ScriptOutputFormat. The example below takes a Hadoop graph as input (in GryoInputFormat) and exports it as a GraphSON file (GraphSONOutputFormat).

hdfs.copyFromLocal('data/tinkerpop-modern.kryo', 'tinkerpop-modern.kryo')
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat')
graph.compute(SparkGraphComputer).program(CloneVertexProgram.build().create()).submit().get()
hdfs.ls('output')
hdfs.head('output/~g')