<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>spark-gremlin</artifactId>
<version>x.y.z</version>
</dependency>
Spark is an Apache Software Foundation project focused on general-purpose OLAP data processing. Spark provides a
hybrid in-memory/disk-based distributed computing model that is similar to Hadoop’s MapReduce model. Spark maintains
a fluent function chaining DSL that is arguably easier for developers to work with than native Hadoop MapReduce.
Spark-Gremlin provides an implementation of the bulk-synchronous parallel, distributed message passing algorithm
within Spark and thus, any VertexProgram can be executed over SparkGraphComputer.
Furthermore, the lib/ directory should be distributed across all machines in the SparkServer cluster. For this
purpose TinkerPop provides a helper script, which takes the Spark installation directory and the Spark machines as input:
bin/hadoop/init-tp-spark.sh /usr/local/spark spark@10.0.0.1 spark@10.0.0.2 spark@10.0.0.3
Once the lib/ directory is distributed, SparkGraphComputer can be used as follows.
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = traversal().with(graph).withComputer(SparkGraphComputer)
g.V().count()
g.V().out().out().values('name')
Note: We no longer support lambda executions via :remote on the Gremlin Console starting in TinkerPop 4.
The SparkGraphComputer algorithm leverages Spark’s caching abilities to reduce the amount of data shuffled across
the wire on each iteration of the VertexProgram. When the graph is loaded as a Spark RDD (Resilient Distributed
Dataset) it is immediately cached as graphRDD. The graphRDD is a distributed adjacency list which encodes the vertex,
its properties, and all its incident edges. On the first iteration, each vertex (in parallel) is passed through
VertexProgram.execute(). This yields an output of the vertex’s mutated state (i.e. updated compute keys, propertyX)
and its outgoing messages. This viewOutgoingRDD is then reduced to viewIncomingRDD where the outgoing messages are
sent to their respective vertices. If a MessageCombiner exists for the vertex program, then messages are aggregated
locally and globally to ultimately yield one incoming message for the vertex. This reduce sequence is the
"message pass." If the vertex program does not terminate on this iteration, then the viewIncomingRDD is joined with
the cached graphRDD and the process continues. When there are no more iterations, there is a final join and the
resultant RDD is stripped of its edges and messages. This mapReduceRDD is cached and is processed by each MapReduce
job in the GraphComputer computation.
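The iteration described above can be pictured with a small, self-contained sketch. It is a toy model in plain Java collections, not Spark-Gremlin code: the class `MessagePassSketch`, the `GRAPH` map and the `iterate` method are hypothetical names, the three-vertex graph is invented for illustration, and summation stands in for whatever a real MessageCombiner would do.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy stand-in for the iteration described above; none of these names are TinkerPop or Spark APIs.
final class MessagePassSketch {

    // Plays the role of the cached graphRDD: vertex id -> ids of its out-neighbours.
    static final Map<Integer, List<Integer>> GRAPH = Map.of(
            1, List.of(2, 3),
            2, List.of(3),
            3, List.of());

    // One iteration: each vertex sends its current value along its outgoing edges,
    // messages addressed to the same target are combined by summation (the assumed combiner),
    // and the combined messages are joined back onto the full vertex set.
    static Map<Integer, Long> iterate(Map<Integer, Long> state) {
        Map<Integer, Long> incoming = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> vertex : GRAPH.entrySet()) {
            long value = state.getOrDefault(vertex.getKey(), 0L);
            for (int target : vertex.getValue()) {
                incoming.merge(target, value, Long::sum);
            }
        }
        // The "join": every vertex of the graph gets a new state, 0 if it received no message.
        Map<Integer, Long> next = new TreeMap<>();
        for (int id : GRAPH.keySet()) {
            next.put(id, incoming.getOrDefault(id, 0L));
        }
        return next;
    }

    public static void main(String[] args) {
        Map<Integer, Long> state = new TreeMap<>(Map.of(1, 1L, 2, 1L, 3, 1L));
        for (int i = 0; i < 2; i++) {
            state = iterate(state);
            System.out.println("after iteration " + (i + 1) + ": " + state);
        }
    }
}
```

Starting from a value of 1 on every vertex, each pass leaves a vertex holding the sum of the values its in-neighbours held before the pass. The adjacency map is never modified, which mirrors why the real graphRDD can stay cached while only the much smaller message view changes between iterations.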
Property | Description |
---|---|
gremlin.hadoop.graphReader | A class for reading a graph-based RDD (e.g. an |
gremlin.hadoop.graphWriter | A class for writing a graph-based RDD (e.g. an |
gremlin.spark.graphStorageLevel | What |
gremlin.spark.persistContext | Whether to create a new |
gremlin.spark.persistStorageLevel | What |
If the provider/user does not want to use Hadoop InputFormats, it is possible to leverage Spark’s RDD constructs
directly. An InputRDD provides a read method that takes a SparkContext and returns a graphRDD. Likewise, an
OutputRDD is used for writing a graphRDD.
If the graph system provider uses an InputRDD, the RDD should maintain an associated org.apache.spark.Partitioner.
By doing so, SparkGraphComputer will not partition the loaded graph across the cluster as it has already been
partitioned by the graph system provider. This can save a significant amount of time and space resources. If the
InputRDD does not have a registered partitioner, SparkGraphComputer will partition the graph using an
org.apache.spark.HashPartitioner with the number of partitions being either the number of existing partitions in the
input (i.e. input splits) or the user specified number of GraphComputer.workers().
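To make the fallback concrete, the sketch below imitates what hash partitioning does to a key: it takes the key's hash code modulo the number of partitions and keeps the result non-negative. This is a plain-Java illustration of the general technique and, to the best of my understanding, of how Spark's HashPartitioner behaves; the class `HashPartitionSketch` and the method `partitionFor` are hypothetical names, not Spark or TinkerPop APIs.

```java
// Plain-Java imitation of hash partitioning; this is not the Spark class itself.
final class HashPartitionSketch {

    // Maps a key to a partition index in [0, numPartitions).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // assumed convention: null keys go to the first partition
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int workers = 4; // hypothetical value, standing in for GraphComputer.workers()
        for (long vertexId = 1; vertexId <= 6; vertexId++) {
            System.out.println("vertex " + vertexId + " -> partition "
                    + partitionFor(Long.valueOf(vertexId), workers));
        }
    }
}
```

Because the assignment depends only on the key's hash, it knows nothing about graph locality. That is the practical reason a provider-supplied partitioner, which can keep related vertices together, may save shuffling compared with this default.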
If the provider/user finds that OutputRDD generates many small HDFS files, the option gremlin.spark.outputRepartition
can be used to repartition the output into the specified number of partitions. The option is disabled by default.
The SparkGraphComputer uses MEMORY_ONLY to cache the input graph and the output graph by default. Users should be
aware of the impact of different storage levels, since the default settings can quickly lead to memory issues on
larger graphs. An overview of Spark’s persistence settings is provided in Spark’s programming guide.
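As a rough illustration of moving away from the default, the sketch below assembles the two storage-level properties from the table above into a `java.util.Properties` object. It assumes that the property values are the names of Spark's StorageLevel constants (for example MEMORY_AND_DISK); the class `StorageLevelConfigSketch`, its method and the `KNOWN_LEVELS` subset are hypothetical helpers written for this example.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical helper; it only builds configuration entries and does not touch Spark.
final class StorageLevelConfigSketch {

    // A subset of the names of Spark's built-in StorageLevel constants.
    static final Set<String> KNOWN_LEVELS = Set.of(
            "MEMORY_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK",
            "MEMORY_AND_DISK_SER", "DISK_ONLY", "OFF_HEAP");

    // Builds the two storage-level entries, rejecting names outside the subset above.
    static Properties storageLevels(String graphLevel, String persistLevel) {
        for (String level : new String[] {graphLevel, persistLevel}) {
            if (!KNOWN_LEVELS.contains(level)) {
                throw new IllegalArgumentException("unrecognized storage level: " + level);
            }
        }
        Properties properties = new Properties();
        properties.setProperty("gremlin.spark.graphStorageLevel", graphLevel);
        properties.setProperty("gremlin.spark.persistStorageLevel", persistLevel);
        return properties;
    }

    public static void main(String[] args) {
        // Let partitions that do not fit in memory spill to disk instead of being dropped.
        storageLevels("MEMORY_AND_DISK", "MEMORY_AND_DISK")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The printed key=value pairs are in the same form as the entries of a properties file such as conf/hadoop/hadoop-gryo.properties, so they could be added there rather than built in code.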
It is possible to persist the graph RDD between jobs within the SparkContext (e.g. SparkServer) by leveraging PersistedOutputRDD.
Note that gremlin.spark.persistContext should be set to true or else the persisted RDD will be destroyed when the SparkContext closes.
The persisted RDD is named by the gremlin.hadoop.outputLocation configuration. Similarly, PersistedInputRDD is used
with the respective gremlin.hadoop.inputLocation to retrieve the persisted RDD from the SparkContext.
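The pairing of writer and reader settings can be sketched as two small configurations, one for the job that persists the RDD and one for a later job that picks it up by name. The property keys come from the text above. The package prefix `org.apache.tinkerpop.gremlin.spark.structure.io` is an assumption that should be checked against the TinkerPop version in use, and the class `PersistedRddConfigSketch`, its methods and the RDD name `myPersistedGraph` are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper that only assembles configuration entries.
final class PersistedRddConfigSketch {

    // Assumed package of PersistedOutputRDD / PersistedInputRDD; verify for your TinkerPop version.
    static final String IO_PACKAGE = "org.apache.tinkerpop.gremlin.spark.structure.io.";

    // Settings for the job that writes the graph RDD into the SparkContext under rddName.
    static Properties writeJob(String rddName) {
        Properties properties = new Properties();
        properties.setProperty("gremlin.hadoop.graphWriter", IO_PACKAGE + "PersistedOutputRDD");
        properties.setProperty("gremlin.hadoop.outputLocation", rddName);
        // Without this the persisted RDD is destroyed when the SparkContext closes.
        properties.setProperty("gremlin.spark.persistContext", "true");
        return properties;
    }

    // Settings for a later job that reads the same RDD back by name.
    static Properties readJob(String rddName) {
        Properties properties = new Properties();
        properties.setProperty("gremlin.hadoop.graphReader", IO_PACKAGE + "PersistedInputRDD");
        properties.setProperty("gremlin.hadoop.inputLocation", rddName);
        properties.setProperty("gremlin.spark.persistContext", "true");
        return properties;
    }

    public static void main(String[] args) {
        System.out.println("write job: " + writeJob("myPersistedGraph"));
        System.out.println("read job:  " + readJob("myPersistedGraph"));
    }
}
```

The point of the sketch is that the output location of the first job and the input location of the second must carry the same name, since that name is the only handle on the persisted RDD.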
When using a persistent SparkContext, the configuration used by the original Spark Configuration will be inherited
by all threaded references to that Spark Context. The exceptions to this rule are those properties which have a
specific thread local effect:
- spark.jobGroup.id
- spark.job.description
- spark.job.interruptOnCancel
- spark.scheduler.pool
Finally, there is a spark object that can be used to manage persisted RDDs (see Interacting with Spark).
The CloneVertexProgram copies a whole graph from any graph InputFormat to any graph OutputFormat. TinkerPop provides
formats such as GraphSONOutputFormat, GryoOutputFormat or ScriptOutputFormat.
The example below takes a Hadoop graph as the input (in GryoInputFormat) and exports it as a GraphSON file (GraphSONOutputFormat).
hdfs.copyFromLocal('data/tinkerpop-modern.kryo', 'tinkerpop-modern.kryo')
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat')
graph.compute(SparkGraphComputer).program(CloneVertexProgram.build().create()).submit().get()
hdfs.ls('output')
hdfs.head('output/~g')