Skip to content

Commit

Permalink
Switch to BulkProcessor API
Browse files Browse the repository at this point in the history
  • Loading branch information
richardwilly98 committed Nov 3, 2013
1 parent 0da87d2 commit 973dba9
Show file tree
Hide file tree
Showing 9 changed files with 376 additions and 120 deletions.
196 changes: 121 additions & 75 deletions src/main/java/org/elasticsearch/river/mongodb/Indexer.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import org.bson.types.BSONTimestamp;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -399,9 +399,9 @@ public static BSONTimestamp getLastTimestamp(Client client, MongoDBRiverDefiniti
*
* @param bulk
*/
static void updateLastTimestamp(final MongoDBRiverDefinition definition, final BSONTimestamp time, final BulkRequestBuilder bulk) {
static void updateLastTimestamp(final MongoDBRiverDefinition definition, final BSONTimestamp time, final BulkProcessor bulkProcessor) {
try {
bulk.add(indexRequest(definition.getRiverIndexName())
bulkProcessor.add(indexRequest(definition.getRiverIndexName())
.type(definition.getRiverName())
.id(definition.getMongoOplogNamespace())
.source(jsonBuilder().startObject().startObject(TYPE).field(LAST_TIMESTAMP_FIELD, JSON.serialize(time)).endObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.river.RiverSettings;
Expand All @@ -42,6 +44,10 @@ public class MongoDBRiverDefinition {
// defaults
public final static String DEFAULT_DB_HOST = "localhost";
public final static int DEFAULT_DB_PORT = 27017;
public final static int DEFAULT_CONCURRENT_REQUESTS = 50;
public final static int DEFAULT_BULK_ACTIONS = 500;
public final static TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueMillis(10);
public final static ByteSizeValue DEFAULT_BULK_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);

// fields
public final static String DB_FIELD = "db";
Expand Down Expand Up @@ -79,6 +85,13 @@ public class MongoDBRiverDefinition {
public final static String THROTTLE_SIZE_FIELD = "throttle_size";
public final static String BULK_SIZE_FIELD = "bulk_size";
public final static String BULK_TIMEOUT_FIELD = "bulk_timeout";
public final static String CONCURRENT_BULK_REQUESTS_FIELD = "concurrent_bulk_requests";

public final static String BULK_FIELD = "bulk";
public final static String ACTIONS_FIELD = "actions";
public final static String SIZE_FIELD = "size";
public final static String CONCURRENT_REQUESTS_FIELD = "concurrent_requests";
public final static String FLUSH_INTERVAL_FIELD = "flush_interval";

// river
private final String riverName;
Expand Down Expand Up @@ -117,9 +130,10 @@ public class MongoDBRiverDefinition {
// index
private final String indexName;
private final String typeName;
private final int bulkSize;
private final TimeValue bulkTimeout;
private final int throttleSize;

// bulk
private final Bulk bulk;

public static class Builder {
// river
Expand Down Expand Up @@ -159,10 +173,10 @@ public static class Builder {
// index
private String indexName;
private String typeName;
private int bulkSize;
private TimeValue bulkTimeout;
private int throttleSize;


private Bulk bulk;

public Builder mongoServers(List<ServerAddress> mongoServers) {
this.mongoServers = mongoServers;
return this;
Expand Down Expand Up @@ -308,18 +322,13 @@ public Builder typeName(String typeName) {
return this;
}

public Builder bulkSize(int bulkSize) {
this.bulkSize = bulkSize;
return this;
}

public Builder bulkTimeout(TimeValue bulkTimeout) {
this.bulkTimeout = bulkTimeout;
public Builder throttleSize(int throttleSize) {
this.throttleSize = throttleSize;
return this;
}

public Builder throttleSize(int throttleSize) {
this.throttleSize = throttleSize;
public Builder bulk(Bulk bulk) {
this.bulk = bulk;
return this;
}

Expand All @@ -328,6 +337,73 @@ public MongoDBRiverDefinition build() {
}
}

/**
 * Immutable holder for the bulk-indexing settings of the river: how many
 * actions per bulk request, the maximum payload size, the number of
 * concurrent in-flight requests, and the flush interval. Instances are
 * created through the nested {@link Builder}.
 */
static class Bulk {

    private final int concurrentRequests;
    private final int bulkActions;
    private final ByteSizeValue bulkSize;
    private final TimeValue flushInterval;

    /**
     * Fluent builder for {@link Bulk}. Every setting starts at the
     * river-wide default and may be overridden individually.
     */
    static class Builder {

        private int concurrentRequests = DEFAULT_CONCURRENT_REQUESTS;
        private int bulkActions = DEFAULT_BULK_ACTIONS;
        private ByteSizeValue bulkSize = DEFAULT_BULK_SIZE;
        private TimeValue flushInterval = DEFAULT_FLUSH_INTERVAL;

        /** Sets the number of concurrent bulk requests allowed in flight. */
        public Builder concurrentRequests(int concurrentRequests) {
            this.concurrentRequests = concurrentRequests;
            return this;
        }

        /** Sets the number of actions that triggers a bulk flush. */
        public Builder bulkActions(int bulkActions) {
            this.bulkActions = bulkActions;
            return this;
        }

        /** Sets the payload size that triggers a bulk flush. */
        public Builder bulkSize(ByteSizeValue bulkSize) {
            this.bulkSize = bulkSize;
            return this;
        }

        /** Sets the maximum time a bulk request may buffer before flushing. */
        public Builder flushInterval(TimeValue flushInterval) {
            this.flushInterval = flushInterval;
            return this;
        }

        /**
         * Creates the immutable {@link Bulk} settings object from the
         * values accumulated in this builder.
         */
        public Bulk build() {
            return new Bulk(this);
        }
    }

    public Bulk(final Builder settings) {
        this.concurrentRequests = settings.concurrentRequests;
        this.bulkActions = settings.bulkActions;
        this.bulkSize = settings.bulkSize;
        this.flushInterval = settings.flushInterval;
    }

    public int getConcurrentRequests() {
        return concurrentRequests;
    }

    public int getBulkActions() {
        return bulkActions;
    }

    public ByteSizeValue getBulkSize() {
        return bulkSize;
    }

    public TimeValue getFlushInterval() {
        return flushInterval;
    }

}

@SuppressWarnings("unchecked")
public synchronized static MongoDBRiverDefinition parseSettings(String riverName, String riverIndexName, RiverSettings settings,
ScriptService scriptService) {
Expand Down Expand Up @@ -563,21 +639,30 @@ public synchronized static MongoDBRiverDefinition parseSettings(String riverName
Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get(INDEX_OBJECT);
builder.indexName(XContentMapValues.nodeStringValue(indexSettings.get(NAME_FIELD), builder.mongoDb));
builder.typeName(XContentMapValues.nodeStringValue(indexSettings.get(TYPE_FIELD), builder.mongoDb));
int bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get(BULK_SIZE_FIELD), 100);
builder.bulkSize(bulkSize);
if (indexSettings.containsKey(BULK_TIMEOUT_FIELD)) {
builder.bulkTimeout(TimeValue.parseTimeValue(
XContentMapValues.nodeStringValue(indexSettings.get(BULK_TIMEOUT_FIELD), "10ms"), TimeValue.timeValueMillis(10)));

Bulk.Builder bulkBuilder = new Bulk.Builder();
if (indexSettings.containsKey(BULK_FIELD)) {
Map<String, Object> bulkSettings = (Map<String, Object>) indexSettings.get(BULK_FIELD);
int bulkActions = XContentMapValues.nodeIntegerValue(bulkSettings.get(ACTIONS_FIELD), DEFAULT_BULK_ACTIONS);
bulkBuilder.bulkActions(bulkActions);
String size = XContentMapValues.nodeStringValue(bulkSettings.get(SIZE_FIELD), DEFAULT_BULK_SIZE.toString());
bulkBuilder.bulkSize(ByteSizeValue.parseBytesSizeValue(size));
bulkBuilder.concurrentRequests(XContentMapValues.nodeIntegerValue(bulkSettings.get(CONCURRENT_REQUESTS_FIELD), DEFAULT_CONCURRENT_REQUESTS));
bulkBuilder.flushInterval(XContentMapValues.nodeTimeValue(bulkSettings.get(FLUSH_INTERVAL_FIELD), DEFAULT_FLUSH_INTERVAL));
builder.throttleSize(XContentMapValues.nodeIntegerValue(indexSettings.get(THROTTLE_SIZE_FIELD), bulkActions * 5));
} else {
builder.bulkTimeout(TimeValue.timeValueMillis(10));
int bulkActions = XContentMapValues.nodeIntegerValue(indexSettings.get(BULK_SIZE_FIELD), DEFAULT_BULK_ACTIONS);
bulkBuilder.bulkActions(bulkActions);
bulkBuilder.bulkSize(DEFAULT_BULK_SIZE);
bulkBuilder.flushInterval(XContentMapValues.nodeTimeValue(indexSettings.get(BULK_TIMEOUT_FIELD), DEFAULT_FLUSH_INTERVAL));
bulkBuilder.concurrentRequests(XContentMapValues.nodeIntegerValue(indexSettings.get(CONCURRENT_BULK_REQUESTS_FIELD), DEFAULT_CONCURRENT_REQUESTS));
builder.throttleSize(XContentMapValues.nodeIntegerValue(indexSettings.get(THROTTLE_SIZE_FIELD), bulkActions * 5));
}
builder.throttleSize(XContentMapValues.nodeIntegerValue(indexSettings.get(THROTTLE_SIZE_FIELD), bulkSize * 5));
builder.bulk(bulkBuilder.build());
} else {
builder.indexName(builder.mongoDb);
builder.typeName(builder.mongoDb);
builder.bulkSize(100);
builder.bulkTimeout(TimeValue.timeValueMillis(10));
builder.throttleSize(builder.bulkSize * 5);
builder.bulk(new Bulk.Builder().build());
}
return builder.build();
}
Expand Down Expand Up @@ -694,10 +779,10 @@ private MongoDBRiverDefinition(final Builder builder) {
// index
this.indexName = builder.indexName;
this.typeName = builder.typeName;
this.bulkSize = builder.bulkSize;
this.bulkTimeout = builder.bulkTimeout;
this.throttleSize = builder.throttleSize;

// bulk
this.bulk = builder.bulk;
}

public List<ServerAddress> getMongoServers() {
Expand Down Expand Up @@ -816,14 +901,6 @@ public String getTypeName() {
return typeName;
}

public int getBulkSize() {
return bulkSize;
}

public TimeValue getBulkTimeout() {
return bulkTimeout;
}

public int getThrottleSize() {
return throttleSize;
}
Expand All @@ -832,4 +909,7 @@ public String getMongoOplogNamespace() {
return getMongoDb() + "." + getMongoCollection();
}

public Bulk getBulk() {
return bulk;
}
}
1 change: 1 addition & 0 deletions src/main/java/org/elasticsearch/river/mongodb/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public enum Status {

RUNNING,
STOPPED,
IMPORT_FAILED,
INITIAL_IMPORT_FAILED;

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
Expand All @@ -17,6 +19,32 @@

public class MongoDBRiverDefinitionTest {

@Test
public void testLoadMongoDBRiverSimpleDefinition() {
    try {
        RiverName riverName = new RiverName("mongodb", "mongodb-" + System.currentTimeMillis());
        InputStream in = getClass().getResourceAsStream("/org/elasticsearch/river/mongodb/test-mongodb-river-simple-definition.json");
        // Fail with a clear message if the JSON fixture is missing from the
        // classpath instead of surfacing an opaque NullPointerException below.
        Assert.assertNotNull(in, "test-mongodb-river-simple-definition.json not found on classpath");
        RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap(
                Streams.copyToByteArray(in), false).v2());
        ScriptService scriptService = null;
        MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), "my-river-index", riverSettings,
                scriptService);
        Assert.assertNotNull(definition);
        Assert.assertEquals("mydb", definition.getMongoDb());
        Assert.assertEquals("mycollection", definition.getMongoCollection());
        Assert.assertEquals("myindex", definition.getIndexName());

        // A definition with no "bulk" object must fall back to the defaults.
        Assert.assertEquals(MongoDBRiverDefinition.DEFAULT_BULK_ACTIONS, definition.getBulk().getBulkActions());
        Assert.assertEquals(MongoDBRiverDefinition.DEFAULT_CONCURRENT_REQUESTS, definition.getBulk().getConcurrentRequests());
        Assert.assertEquals(MongoDBRiverDefinition.DEFAULT_BULK_SIZE, definition.getBulk().getBulkSize());
        Assert.assertEquals(MongoDBRiverDefinition.DEFAULT_FLUSH_INTERVAL, definition.getBulk().getFlushInterval());

    } catch (Throwable t) {
        Assert.fail("testLoadMongoDBRiverSimpleDefinition failed", t);
    }
}

@Test
public void testLoadMongoDBRiverDefinition() {
try {
Expand All @@ -39,7 +67,49 @@ public void testLoadMongoDBRiverDefinition() {
Assert.assertEquals(0, definition.getSocketTimeout());
Assert.assertEquals(11000, definition.getConnectTimeout());
Assert.assertEquals(riverName.getName(), definition.getRiverName());
Assert.assertEquals(500, definition.getBulkSize());

// Test bulk
Assert.assertEquals(500, definition.getBulk().getBulkActions());
Assert.assertEquals(40, definition.getBulk().getConcurrentRequests());

} catch (Throwable t) {
Assert.fail("testLoadMongoDBRiverDefinition failed", t);
}
}

@Test
public void testLoadMongoDBRiverNewDefinition() {
try {
RiverName riverName = new RiverName("mongodb", "mongodb-" + System.currentTimeMillis());
InputStream in = getClass().getResourceAsStream("/org/elasticsearch/river/mongodb/test-mongodb-river-new-definition.json");
RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap(
Streams.copyToByteArray(in), false).v2());
ScriptService scriptService = null;
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), "my-river-index", riverSettings,
scriptService);
Assert.assertNotNull(definition);
Assert.assertEquals("mycollection", definition.getIncludeCollection());
Assert.assertTrue(definition.getParentTypes().contains("parent1"));
Assert.assertTrue(definition.getParentTypes().contains("parent2"));
Assert.assertFalse(definition.getParentTypes().contains("parent3"));
Assert.assertTrue(definition.isAdvancedTransformation());
Assert.assertEquals("mydatabase", definition.getMongoDb());
Assert.assertEquals("mycollection", definition.getMongoCollection());
Assert.assertEquals("myindex", definition.getIndexName());
Assert.assertEquals(0, definition.getSocketTimeout());
Assert.assertEquals(11000, definition.getConnectTimeout());
Assert.assertEquals(riverName.getName(), definition.getRiverName());

// actions: 500
// size: "20mb",
// concurrent_requests: 40,
// flush_interval: "50ms"

// Test bulk
Assert.assertEquals(500, definition.getBulk().getBulkActions());
Assert.assertEquals(40, definition.getBulk().getConcurrentRequests());
Assert.assertEquals(ByteSizeValue.parseBytesSizeValue("20mb"), definition.getBulk().getBulkSize());
Assert.assertEquals(TimeValue.timeValueMillis(50), definition.getBulk().getFlushInterval());

} catch (Throwable t) {
Assert.fail("testLoadMongoDBRiverDefinition failed", t);
Expand Down
Loading

0 comments on commit 973dba9

Please sign in to comment.