From 4b09cc9a82d9023177ca2cd91a92b563b2f541b1 Mon Sep 17 00:00:00 2001 From: "Marko A. Rodriguez" Date: Sun, 24 Nov 2013 14:47:44 -0700 Subject: [PATCH] added more properties for bulk loader and a counter for BlueprintsGraphOutputMapReducer. --- bin/titan-cassandra-output.properties | 1 + bin/titan-hbase-output.properties | 1 + .../BlueprintsGraphOutputMapReduce.java | 53 +++++++++---------- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/bin/titan-cassandra-output.properties b/bin/titan-cassandra-output.properties index c2b78613..0403306c 100644 --- a/bin/titan-cassandra-output.properties +++ b/bin/titan-cassandra-output.properties @@ -17,6 +17,7 @@ faunus.graph.output.titan.infer-schema=true # faunus.graph.output.blueprints.script-file=BlueprintsScript.groovy # controls size of transaction mapred.max.split.size=5242880 +# mapred.reduce.tasks=10 mapred.job.reuse.jvm.num.tasks=-1 faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat diff --git a/bin/titan-hbase-output.properties b/bin/titan-hbase-output.properties index 95f768ee..12912f29 100644 --- a/bin/titan-hbase-output.properties +++ b/bin/titan-hbase-output.properties @@ -15,6 +15,7 @@ faunus.graph.output.titan.infer-schema=true # faunus.graph.output.blueprints.script-file=BlueprintsScript.groovy # controls size of transaction mapred.max.split.size=5242880 +# mapred.reduce.tasks=10 mapred.job.reuse.jvm.num.tasks=-1 faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat diff --git a/src/main/java/com/thinkaurelius/faunus/formats/BlueprintsGraphOutputMapReduce.java b/src/main/java/com/thinkaurelius/faunus/formats/BlueprintsGraphOutputMapReduce.java index caf5be3f..6517b9bd 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/BlueprintsGraphOutputMapReduce.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/BlueprintsGraphOutputMapReduce.java @@ -213,7 +213,8 @@ public void reduce(final LongWritable key, final Iterable> faunusVertex.setProperty(ID_MAP_KEY, faunusBlueprintsIdMap); context.write(NullWritable.get(), faunusVertex); } else { - // TODO: faunusVertex never created + LOGGER.warn("No source vertex: faunusVertex[" + key.get() + "]"); + context.getCounter(Counters.NULL_VERTICES_IGNORED).increment(1l); } } } @@ -232,40 +233,34 @@ public void setup(final Mapper.Context context) throws IOException, InterruptedE @Override public void map(final NullWritable key, final FaunusVertex value, final Mapper.Context context) throws IOException, InterruptedException { try { - if (null != value) { - final java.util.Map faunusBlueprintsIdMap = value.getProperty(ID_MAP_KEY); - final Object blueprintsId = value.getProperty(BLUEPRINTS_ID); - Vertex blueprintsVertex = null; - if (null != blueprintsId) - blueprintsVertex = this.graph.getVertex(blueprintsId); - // this means that an adjacent vertex to this vertex wasn't created - if (null != blueprintsVertex) { - for (final Edge faunusEdge : value.getEdges(OUT)) { - final Object otherId = faunusBlueprintsIdMap.get(faunusEdge.getVertex(IN).getId()); - Vertex otherVertex = null; - if (null != otherId) - otherVertex = this.graph.getVertex(otherId); - if (null != otherVertex) { - final Edge blueprintsEdge = this.graph.addEdge(null, blueprintsVertex, otherVertex, faunusEdge.getLabel()); - context.getCounter(Counters.EDGES_WRITTEN).increment(1l); - for (final String property : faunusEdge.getPropertyKeys()) { - blueprintsEdge.setProperty(property, faunusEdge.getProperty(property)); - context.getCounter(Counters.EDGE_PROPERTIES_WRITTEN).increment(1l); - } - } else { - LOGGER.warn("No target vertex: faunusVertex[" + faunusEdge.getVertex(IN).getId() + "] blueprintsVertex[" + otherId + "]"); - context.getCounter(Counters.NULL_VERTEX_EDGES_IGNORED).increment(1l); + final java.util.Map faunusBlueprintsIdMap = value.getProperty(ID_MAP_KEY); + final Object blueprintsId = value.getProperty(BLUEPRINTS_ID); + Vertex blueprintsVertex = null; + if (null != blueprintsId) + blueprintsVertex = this.graph.getVertex(blueprintsId); + // this means that an adjacent vertex to this vertex wasn't created + if (null != blueprintsVertex) { + for (final Edge faunusEdge : value.getEdges(OUT)) { + final Object otherId = faunusBlueprintsIdMap.get(faunusEdge.getVertex(IN).getId()); + Vertex otherVertex = null; + if (null != otherId) + otherVertex = this.graph.getVertex(otherId); + if (null != otherVertex) { + final Edge blueprintsEdge = this.graph.addEdge(null, blueprintsVertex, otherVertex, faunusEdge.getLabel()); + context.getCounter(Counters.EDGES_WRITTEN).increment(1l); + for (final String property : faunusEdge.getPropertyKeys()) { + blueprintsEdge.setProperty(property, faunusEdge.getProperty(property)); + context.getCounter(Counters.EDGE_PROPERTIES_WRITTEN).increment(1l); } + } else { + LOGGER.warn("No target vertex: faunusVertex[" + faunusEdge.getVertex(IN).getId() + "] blueprintsVertex[" + otherId + "]"); + context.getCounter(Counters.NULL_VERTEX_EDGES_IGNORED).increment(1l); } - } else { - LOGGER.warn("No source vertex: faunusVertex[" + key.get() + "] blueprintsVertex[" + blueprintsId + "]"); - context.getCounter(Counters.NULL_VERTICES_IGNORED).increment(1l); } } else { - LOGGER.warn("No source vertex: faunusVertex[" + key.get() + "]"); + LOGGER.warn("No source vertex: faunusVertex[" + key.get() + "] blueprintsVertex[" + blueprintsId + "]"); context.getCounter(Counters.NULL_VERTICES_IGNORED).increment(1l); } - // the emitted vertex is not complete -- assuming this is the end of the stage and vertex is dead context.write(NullWritable.get(), DEAD_FAUNUS_VERTEX); } catch (final Exception e) {