Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Use bulk request API for indexing.

  • Loading branch information...
commit 38c6a732083010f7a4b36e2c217571f1801970b2 1 parent b0f9f15
@deverton deverton authored committed
Showing with 13 additions and 4 deletions.
  1. +13 −4 src/main/java/org/elasticsearch/flume/ElasticSearchSink.java
View
17 src/main/java/org/elasticsearch/flume/ElasticSearchSink.java
@@ -17,7 +17,10 @@
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.Pair;
+
+import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
+import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.settings.ImmutableSettings;
@@ -137,10 +140,16 @@ private void index(Event e, XContentBuilder builder) {
if (indexPattern != null) {
iName = e.escapeString(indexPattern);
}
- client.prepareIndex(iName, indexType, null)
- .setSource(builder)
- .execute()
- .actionGet();
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+
+ bulkRequest.add(client.prepareIndex(iName, indexType, null)
+ .setSource(builder).request());
+
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+
+ if (bulkResponse.hasFailures()) {
+ throw new RuntimeException("Failed to index message. " + bulkResponse.items()[0].failureMessage());
+ }
if (!iName.equals(indexName)) {
client.admin().indices().prepareAliases().addAlias(iName, indexName).execute().actionGet();
Please sign in to comment.
Something went wrong with that request. Please try again.