Hadoop-Gremlin provides various I/O formats, i.e. Hadoop InputFormat and OutputFormat implementations. All of the formats make use of an adjacency list representation of the graph, where each "row" represents a single vertex, its properties, and its incoming and outgoing edges.
- InputFormat: org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
- OutputFormat: org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
Gryo is a binary graph format that leverages Kryo to make a compact, binary representation of a vertex. It is recommended that users leverage Gryo given its space/time savings over text-based representations.
NOTE: GryoInputFormat is splittable, meaning a single Gryo file can be divided into input splits and processed by multiple mappers in parallel.
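As a sketch of how a Gryo-backed HadoopGraph is assembled programmatically (the property keys follow the gremlin.hadoop.graphReader/graphWriter pattern used later in this section; the input and output locations are illustrative, and the distribution ships equivalent settings as conf/hadoop/hadoop-gryo.properties):
conf = new BaseConfiguration()
conf.setProperty('gremlin.graph', 'org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph')
conf.setProperty('gremlin.hadoop.graphReader', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat')
conf.setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat')
conf.setProperty('gremlin.hadoop.inputLocation', 'tinkerpop-modern.kryo') // illustrative path
conf.setProperty('gremlin.hadoop.outputLocation', 'output')
graph = GraphFactory.open(conf)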
- InputFormat: org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat
- OutputFormat: org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat
GraphSON is a JSON-based graph format. Being text-based, it is space-expensive compared to Gryo, but it is convenient for many developers to work with as its structure is simple (easy to create and parse).
The data below is an adjacency list representation of the classic TinkerGraph toy graph in GraphSON format.
{"id":1,"label":"person","outE":{"created":[{"id":9,"inV":3,"properties":{"weight":0.4}}],"knows":[{"id":7,"inV":2,"properties":{"weight":0.5}},{"id":8,"inV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":0,"value":"marko"}],"age":[{"id":1,"value":29}]}}
{"id":2,"label":"person","inE":{"knows":[{"id":7,"outV":1,"properties":{"weight":0.5}}]},"properties":{"name":[{"id":2,"value":"vadas"}],"age":[{"id":3,"value":27}]}}
{"id":3,"label":"software","inE":{"created":[{"id":9,"outV":1,"properties":{"weight":0.4}},{"id":11,"outV":4,"properties":{"weight":0.4}},{"id":12,"outV":6,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":4,"value":"lop"}],"lang":[{"id":5,"value":"java"}]}}
{"id":4,"label":"person","inE":{"knows":[{"id":8,"outV":1,"properties":{"weight":1.0}}]},"outE":{"created":[{"id":10,"inV":5,"properties":{"weight":1.0}},{"id":11,"inV":3,"properties":{"weight":0.4}}]},"properties":{"name":[{"id":6,"value":"josh"}],"age":[{"id":7,"value":32}]}}
{"id":5,"label":"software","inE":{"created":[{"id":10,"outV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":8,"value":"ripple"}],"lang":[{"id":9,"value":"java"}]}}
{"id":6,"label":"person","outE":{"created":[{"id":12,"inV":3,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":10,"value":"peter"}],"age":[{"id":11,"value":35}]}}
- InputFormat: org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
- OutputFormat: org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptOutputFormat
ScriptInputFormat and ScriptOutputFormat take an arbitrary script and use that script to either read or write Vertex objects, respectively. This can be considered the most general InputFormat/OutputFormat possible in that Hadoop-Gremlin uses the user-provided script for all reading and writing.
The data below is an adjacency list representation of the classic TinkerGraph toy graph. The first line reads, "vertex 1, labeled person, having 2 property values (marko and 29), has 3 outgoing edges; the first edge is labeled knows, connects the current vertex 1 with vertex 2, and has a weight property value of 0.5, and so on."
1:person:marko:29 knows:2:0.5,knows:4:1.0,created:3:0.4
2:person:vadas:27
3:project:lop:java
4:person:josh:32 created:3:0.4,created:5:1.0
5:project:ripple:java
6:person:peter:35 created:3:0.2
There is no corresponding InputFormat that can parse this particular file (or some adjacency list variant of it). As such, ScriptInputFormat can be used. With ScriptInputFormat, a script is stored in HDFS and leveraged by each mapper in the Hadoop job. The script must have the following method defined:
def parse(String line) { ... }
In order to create vertices and edges, the parse() method gets access to a global variable named graph, which holds the local StarGraph for the current line/vertex. An appropriate parse() for the above adjacency list file is:
def parse(line) {
    def parts = line.split(/ /)
    def (id, label, name, x) = parts[0].split(/:/).toList()
    def v1 = graph.addVertex(T.id, id, T.label, label)
    if (name != null) v1.property('name', name) // first value is always the name
    if (x != null) {
        // second value depends on the vertex label; it's either
        // the age of a person or the language of a project
        if (label.equals('project')) v1.property('lang', x)
        else v1.property('age', Integer.valueOf(x))
    }
    if (parts.length == 2) { // process the out-edges, if any
        parts[1].split(/,/).grep { !it.isEmpty() }.each {
            def (eLabel, refId, weight) = it.split(/:/).toList()
            def v2 = graph.addVertex(T.id, refId)
            v1.addOutEdge(eLabel, v2, 'weight', Double.valueOf(weight))
        }
    }
    return v1
}
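With the script stored in HDFS, a graph can be wired to it in the same setProperty style used elsewhere in this section. A sketch, in which the script location key gremlin.hadoop.scriptInputFormat.script and the file names are assumptions to adapt to your setup:
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphReader', ScriptInputFormat.class.getCanonicalName())
graph.configuration().setProperty('gremlin.hadoop.scriptInputFormat.script', 'script-input.groovy') // assumed key; script path in HDFS
graph.configuration().setProperty('gremlin.hadoop.inputLocation', 'tinkerpop-classic.txt') // illustrative data path
g = graph.traversal().withComputer(SparkGraphComputer)
g.V().count()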
The Vertex returned by parse() denotes whether the line yielded a valid vertex. As such, if the line is not valid (e.g. a comment line, a skip line, etc.), then simply return null.
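For example, a guard at the top of parse() (purely illustrative; the format above defines no comment syntax) would skip such lines:
def parse(line) {
    // signal an invalid line by returning null for blanks and '#' comments
    if (line.trim().isEmpty() || line.startsWith('#')) return null
    // ... continue as in the sample above ...
}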
The principle above can also be used to convert a vertex to an arbitrary String representation that is ultimately streamed back to a file in HDFS. This is the role of ScriptOutputFormat. ScriptOutputFormat requires that the provided script maintains a method with the following signature:
def stringify(Vertex vertex) { ... }
An appropriate stringify() to produce output in the same format that was shown in the ScriptInputFormat sample is:
def stringify(vertex) {
    // the vertex portion: id:label:name:age|lang
    def v = vertex.values('name', 'age', 'lang').inject(vertex.id(), vertex.label()).join(':')
    // one label:inVertexId:weight entry per outgoing edge
    def outE = vertex.outE().map {
        def e = it.get()
        e.values('weight').inject(e.label(), e.inV().next().id()).join(':')
    }.join(',')
    return [v, outE].join('\t')
}
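Wiring ScriptOutputFormat in mirrors the input case; again a sketch, with the script location key gremlin.hadoop.scriptOutputFormat.script and the file names as assumptions:
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphWriter', ScriptOutputFormat.class.getCanonicalName())
graph.configuration().setProperty('gremlin.hadoop.scriptOutputFormat.script', 'script-output.groovy') // assumed key; script path in HDFS
graph.configuration().setProperty('gremlin.hadoop.outputLocation', 'output')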
Hadoop-Gremlin provides two implementations of the Storage API:

- FileSystemStorage: Access HDFS and local file system data.
- SparkContextStorage: Access Spark persisted RDD data.
The distributed file system of Hadoop is called HDFS. The results of any OLAP operation are stored in HDFS and are accessible via hdfs. For local file system access, there is fs.
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get()
hdfs.ls()
hdfs.ls('output')
hdfs.head('output', GryoInputFormat)
hdfs.head('output', 'clusterCount', SequenceFileInputFormat)
hdfs.rm('output')
hdfs.ls()
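The fs variable answers the same Storage calls against the local file system; for instance (assuming a local output directory exists):
fs.ls() // local file system listing
fs.head('output', GryoInputFormat) // same Storage API, local path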
If a Spark context is persisted, then Spark RDDs will remain in the Spark cache and be accessible over subsequent jobs. RDDs are retrieved and saved to the SparkContext via PersistedInputRDD and PersistedOutputRDD, respectively. Persisted RDDs can be accessed using spark.
Spark.create('local[4]')
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphWriter', PersistedOutputRDD.class.getCanonicalName())
graph.configuration().setProperty('gremlin.spark.persistContext', true)
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get()
spark.ls()
spark.ls('output')
spark.head('output', PersistedInputRDD)
spark.head('output', 'clusterCount', PersistedInputRDD)
spark.rm('output')
spark.ls()
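A follow-up job can then read the persisted RDD directly by swapping in PersistedInputRDD as the reader; a sketch, assuming the persisted graph RDD is addressable by the name 'output' as in the spark.head() calls above:
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphReader', PersistedInputRDD.class.getCanonicalName())
graph.configuration().setProperty('gremlin.hadoop.inputLocation', 'output') // assumed: the RDD name from the previous job
graph.configuration().setProperty('gremlin.spark.persistContext', true)
g = graph.traversal().withComputer(SparkGraphComputer)
g.V().count()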