Skip to content

Commit

Permalink
added more properties for bulk loader and a counter for BlueprintsGra…
Browse files Browse the repository at this point in the history
…phOutputMapReducer.
  • Loading branch information
okram committed Nov 24, 2013
1 parent 06f0bb7 commit 4b09cc9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 29 deletions.
1 change: 1 addition & 0 deletions bin/titan-cassandra-output.properties
Expand Up @@ -17,6 +17,7 @@ faunus.graph.output.titan.infer-schema=true
# faunus.graph.output.blueprints.script-file=BlueprintsScript.groovy
# controls size of transaction
mapred.max.split.size=5242880
# mapred.reduce.tasks=10
mapred.job.reuse.jvm.num.tasks=-1

faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
Expand Down
1 change: 1 addition & 0 deletions bin/titan-hbase-output.properties
Expand Up @@ -15,6 +15,7 @@ faunus.graph.output.titan.infer-schema=true
# faunus.graph.output.blueprints.script-file=BlueprintsScript.groovy
# controls size of transaction
mapred.max.split.size=5242880
# mapred.reduce.tasks=10
mapred.job.reuse.jvm.num.tasks=-1

faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
Expand Down
Expand Up @@ -213,7 +213,8 @@ public void reduce(final LongWritable key, final Iterable<Holder<FaunusVertex>>
faunusVertex.setProperty(ID_MAP_KEY, faunusBlueprintsIdMap);
context.write(NullWritable.get(), faunusVertex);
} else {
// TODO: faunusVertex never created
LOGGER.warn("No source vertex: faunusVertex[" + key.get() + "]");
context.getCounter(Counters.NULL_VERTICES_IGNORED).increment(1l);
}
}
}
Expand All @@ -232,40 +233,34 @@ public void setup(final Mapper.Context context) throws IOException, InterruptedE
@Override
public void map(final NullWritable key, final FaunusVertex value, final Mapper<NullWritable, FaunusVertex, NullWritable, FaunusVertex>.Context context) throws IOException, InterruptedException {
try {
if (null != value) {
final java.util.Map<Long, Object> faunusBlueprintsIdMap = value.getProperty(ID_MAP_KEY);
final Object blueprintsId = value.getProperty(BLUEPRINTS_ID);
Vertex blueprintsVertex = null;
if (null != blueprintsId)
blueprintsVertex = this.graph.getVertex(blueprintsId);
// this means that an adjacent vertex to this vertex wasn't created
if (null != blueprintsVertex) {
for (final Edge faunusEdge : value.getEdges(OUT)) {
final Object otherId = faunusBlueprintsIdMap.get(faunusEdge.getVertex(IN).getId());
Vertex otherVertex = null;
if (null != otherId)
otherVertex = this.graph.getVertex(otherId);
if (null != otherVertex) {
final Edge blueprintsEdge = this.graph.addEdge(null, blueprintsVertex, otherVertex, faunusEdge.getLabel());
context.getCounter(Counters.EDGES_WRITTEN).increment(1l);
for (final String property : faunusEdge.getPropertyKeys()) {
blueprintsEdge.setProperty(property, faunusEdge.getProperty(property));
context.getCounter(Counters.EDGE_PROPERTIES_WRITTEN).increment(1l);
}
} else {
LOGGER.warn("No target vertex: faunusVertex[" + faunusEdge.getVertex(IN).getId() + "] blueprintsVertex[" + otherId + "]");
context.getCounter(Counters.NULL_VERTEX_EDGES_IGNORED).increment(1l);
final java.util.Map<Long, Object> faunusBlueprintsIdMap = value.getProperty(ID_MAP_KEY);
final Object blueprintsId = value.getProperty(BLUEPRINTS_ID);
Vertex blueprintsVertex = null;
if (null != blueprintsId)
blueprintsVertex = this.graph.getVertex(blueprintsId);
// this means that an adjacent vertex to this vertex wasn't created
if (null != blueprintsVertex) {
for (final Edge faunusEdge : value.getEdges(OUT)) {
final Object otherId = faunusBlueprintsIdMap.get(faunusEdge.getVertex(IN).getId());
Vertex otherVertex = null;
if (null != otherId)
otherVertex = this.graph.getVertex(otherId);
if (null != otherVertex) {
final Edge blueprintsEdge = this.graph.addEdge(null, blueprintsVertex, otherVertex, faunusEdge.getLabel());
context.getCounter(Counters.EDGES_WRITTEN).increment(1l);
for (final String property : faunusEdge.getPropertyKeys()) {
blueprintsEdge.setProperty(property, faunusEdge.getProperty(property));
context.getCounter(Counters.EDGE_PROPERTIES_WRITTEN).increment(1l);
}
} else {
LOGGER.warn("No target vertex: faunusVertex[" + faunusEdge.getVertex(IN).getId() + "] blueprintsVertex[" + otherId + "]");
context.getCounter(Counters.NULL_VERTEX_EDGES_IGNORED).increment(1l);
}
} else {
LOGGER.warn("No source vertex: faunusVertex[" + key.get() + "] blueprintsVertex[" + blueprintsId + "]");
context.getCounter(Counters.NULL_VERTICES_IGNORED).increment(1l);
}
} else {
LOGGER.warn("No source vertex: faunusVertex[" + key.get() + "]");
LOGGER.warn("No source vertex: faunusVertex[" + key.get() + "] blueprintsVertex[" + blueprintsId + "]");
context.getCounter(Counters.NULL_VERTICES_IGNORED).increment(1l);
}

// the emitted vertex is not complete -- assuming this is the end of the stage and vertex is dead
context.write(NullWritable.get(), DEAD_FAUNUS_VERTEX);
} catch (final Exception e) {
Expand Down

0 comments on commit 4b09cc9

Please sign in to comment.