Skip to content
Browse files

Hadoop OutputFormat now logs before and after batch if 10 s have passed since last heard from, rather than every 10th batch
  • Loading branch information...
1 parent 11cfc89 commit fd540657b805d2df88e9d276ddc9c2aa71b6a40c Philip (flip) Kromer committed
Showing with 27 additions and 17 deletions.
  1. +27 −17 src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java
View
44 src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java
@@ -40,13 +40,13 @@
import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
/**
-
+
Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) into Elasticsearch. Records are batched up and sent
in a one-hop manner to the elastic search data nodes that will index them.
-
+
*/
public class ElasticSearchOutputFormat extends OutputFormat<NullWritable, MapWritable> implements Configurable {
-
+
static Log LOG = LogFactory.getLog(ElasticSearchOutputFormat.class);
private Configuration conf = null;
@@ -60,12 +60,13 @@ Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) in
private String idFieldName;
private String objType;
private String[] fieldNames;
-
+
// Used for bookkeeping purposes
private AtomicLong totalBulkTime = new AtomicLong();
private AtomicLong totalBulkItems = new AtomicLong();
- private Random randgen = new Random();
+ private Random randgen = new Random();
private long runStartTime = System.currentTimeMillis();
+ private long lastLogTime = 0;
// For hadoop configuration
private static final String ES_CONFIG_NAME = "elasticsearch.yml";
@@ -82,7 +83,7 @@ Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) in
private static final String COMMA = ",";
private static final String SLASH = "/";
private static final String NO_ID_FIELD = "-1";
-
+
private volatile BulkRequestBuilder currentRequest;
/**
@@ -104,7 +105,7 @@ Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) in
<li><b>elasticsearch.id.field.name</b> - When <b>elasticsearch.is_json</b> is true, this is the name of a field in the json document that contains the document's id. If -1 is used then the document is assumed to have no id and one is assigned to it by elasticsearch.</li>
<li><b>elasticsearch.field.names</b> - When <b>elasticsearch.is_json</b> is false, this is a comma separated list of field names.</li>
<li><b>elasticsearch.id.field</b> - When <b>elasticsearch.is_json</b> is false, this is the numeric index of the field to use as the document id. If -1 is used the document is assumed to have no id and one is assigned to it by elasticsearch.</li>
- </ul>
+ </ul>
*/
public ElasticSearchRecordWriter(TaskAttemptContext context) {
Configuration conf = context.getConfiguration();
@@ -118,7 +119,7 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) {
LOG.info("Using field:["+idFieldName+"] for document ids");
}
this.objType = conf.get(ES_OBJECT_TYPE);
-
+
//
// Fetches elasticsearch.yml and the plugins directory from the distributed cache, or
// from the local config.
@@ -134,7 +135,7 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) {
System.setProperty(ES_CONFIG,conf.get(ES_CONFIG));
System.setProperty(ES_PLUGINS,conf.get(ES_PLUGINS));
}
-
+
start_embedded_client();
initialize_index(indexName);
currentRequest = client.prepareBulk();
@@ -144,7 +145,7 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) {
Closes the connection to elasticsearch. Any documents remaining in the bulkRequest object are indexed.
*/
public void close(TaskAttemptContext context) throws IOException {
- if (currentRequest.numberOfActions() > 0) {
+ if (currentRequest.numberOfActions() > 0) {
try {
BulkResponse response = currentRequest.execute().actionGet();
} catch (Exception e) {
@@ -175,7 +176,7 @@ public void write(NullWritable key, MapWritable fields) throws IOException {
try {
Text mapKey = new Text(idFieldName);
String record_id = fields.get(mapKey).toString();
- currentRequest.add(Requests.indexRequest(indexName).id(record_id).type(objType).create(false).source(builder));
+ currentRequest.add(Requests.indexRequest(indexName).id(record_id).type(objType).create(false).source(builder));
} catch (Exception e) {
LOG.warn("Encountered malformed record");
}
@@ -198,14 +199,14 @@ private void buildContent(XContentBuilder builder, Writable value) throws IOExce
} else if (value instanceof FloatWritable) {
builder.value(((FloatWritable)value).get());
} else if (value instanceof BooleanWritable) {
- builder.value(((BooleanWritable)value).get());
+ builder.value(((BooleanWritable)value).get());
} else if (value instanceof MapWritable) {
builder.startObject();
for (Map.Entry<Writable,Writable> entry : ((MapWritable)value).entrySet()) {
if (!(entry.getValue() instanceof NullWritable)) {
builder.field(entry.getKey().toString());
buildContent(builder, entry.getValue());
- }
+ }
}
builder.endObject();
} else if (value instanceof ArrayWritable) {
@@ -215,7 +216,7 @@ private void buildContent(XContentBuilder builder, Writable value) throws IOExce
buildContent(builder, arrayOfThings[i]);
}
builder.endArray();
- }
+ }
}
/**
@@ -224,12 +225,21 @@ private void buildContent(XContentBuilder builder, Writable value) throws IOExce
private void processBulkIfNeeded() {
totalBulkItems.incrementAndGet();
if (currentRequest.numberOfActions() >= bulkSize) {
- try {
+ boolean loggable = (System.currentTimeMillis() - lastLogTime >= 10000);
+
+ try {
long startTime = System.currentTimeMillis();
+ if (loggable){ LOG.info("Sending [" + (currentRequest.numberOfActions()) + "]items"); }
BulkResponse response = currentRequest.execute().actionGet();
totalBulkTime.addAndGet(System.currentTimeMillis() - startTime);
- if (randgen.nextDouble() < 0.1) {
- LOG.info("Indexed [" + totalBulkItems.get() + "] in [" + (totalBulkTime.get()/1000) + "s] of indexing"+"[" + ((System.currentTimeMillis() - runStartTime)/1000) + "s] of wall clock"+" for ["+ (float)(1000.0*totalBulkItems.get())/(System.currentTimeMillis() - runStartTime) + "rec/s]");
+ if (loggable) {
+ LOG.info("Indexed [" + (currentRequest.numberOfActions()) + "]items " +
+ "in [" + ((System.currentTimeMillis() - startTime)/1000) + "]s; " +
+ "avg [" + (float)(1000.0*totalBulkItems.get())/(System.currentTimeMillis() - runStartTime) + "]rec/s" +
+ "(total [" + totalBulkItems.get() + "]items " +
+ "indexed in [" + (totalBulkTime.get()/1000) + "]s, " +
+ "wall clock [" + ((System.currentTimeMillis() - runStartTime)/1000) + "]s)");
+ lastLogTime = System.currentTimeMillis();
}
} catch (Exception e) {
LOG.warn("Bulk request failed: " + e.getMessage());

0 comments on commit fd54065

Please sign in to comment.
Something went wrong with that request. Please try again.