diff --git a/README.md b/README.md
index cec43db4..7ddcd0e2 100644
--- a/README.md
+++ b/README.md
@@ -45,6 +45,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch.
| Elasticsearch version | Plugin | Release date |
| ------------------------ | -----------| -------------|
+| 1.3.1 | 1.3.0.3 | Aug 4, 2014 |
| 1.3.1 | 1.3.0.2 | Aug 2, 2014 |
| 1.3.1 | 1.3.0.1 | Jul 31, 2014 |
| 1.3.0 | 1.3.0.0 | Jul 24, 2014 |
@@ -59,7 +60,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch.
## Installation
- ./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.2/elasticsearch-river-jdbc-1.3.0.2-plugin.zip
+ ./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.3/elasticsearch-river-jdbc-1.3.0.3-plugin.zip
Do not forget to restart the node after installing.
@@ -74,6 +75,7 @@ Change into this directory to invoke the `./bin/plugin` command line tool.
| File | SHA1 |
| ---------------------------------------------| -----------------------------------------|
+| elasticsearch-river-jdbc-1.3.0.3-plugin.zip | 7e3fe518c716305a7878fddb299f0c263fb5ed4b |
| elasticsearch-river-jdbc-1.3.0.2-plugin.zip | 7f87af3055223d15238da9c81ae95ff6ea0ce934 |
| elasticsearch-river-jdbc-1.3.0.1-plugin.zip | ee58c51acfb4bc2294939c655ff2f790890808bc |
| elasticsearch-river-jdbc-1.3.0.0-plugin.zip | f303bf240e443bbe81ccc614bfad6b4d103eb073 |
diff --git a/pom.xml b/pom.xml
index 6a0f0485..ba9087f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
org.xbib.elasticsearch.plugin
elasticsearch-river-jdbc
- 1.3.0.2
+ 1.3.0.3
jar
diff --git a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverState.java b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverState.java
index 306995e4..d4f5e0a8 100644
--- a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverState.java
+++ b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverState.java
@@ -12,6 +12,9 @@
import static org.elasticsearch.common.collect.Maps.newHashMap;
+/**
+ * A river state captures a point in time at which a river has a defined behavior and a set of parameters
+ */
public class RiverState implements Streamable, Comparable {
/**
@@ -29,7 +32,7 @@ public class RiverState implements Streamable, Comparable {
/**
* A custom map for more information about the river
*/
- private Map custom = newHashMap();
+ private Map map = newHashMap();
public RiverState() {
}
@@ -47,67 +50,77 @@ public String getType() {
return type;
}
+ public RiverState setSettings(Settings settings) {
+ this.settings = settings;
+ return this;
+ }
+
+ public Settings getSettings() {
+ return settings;
+ }
+
+ public RiverState setMap(Map map) {
+ this.map = map;
+ return this;
+ }
+
+ public Map getMap() {
+ return map;
+ }
+
public RiverState setStarted(Date started) {
- custom.put("started", started);
+ map.put("started", started);
return this;
}
public Date getStarted() {
- return (Date) custom.get("started");
+ return (Date) map.get("started");
}
public RiverState setCounter(Long counter) {
- custom.put("counter", counter);
+ map.put("counter", counter);
return this;
}
public Long getCounter() {
- return custom.containsKey("counter") ? (Long) custom.get("counter") : 0L;
+ return map.containsKey("counter") ? (Long) map.get("counter") : 0L;
}
public RiverState setTimestamp(Date timestamp) {
- custom.put("timestamp", timestamp);
+ map.put("timestamp", timestamp);
return this;
}
public Date getTimestamp() {
- return (Date) custom.get("timestamp");
+ return (Date) map.get("timestamp");
}
public RiverState setEnabled(Boolean enabled) {
- custom.put("enabled", enabled);
+ map.put("enabled", enabled);
return this;
}
public Boolean isEnabled() {
- return (Boolean) custom.get("enabled");
+ return (Boolean) map.get("enabled");
}
public RiverState setActive(Boolean active) {
- custom.put("active", active);
+ map.put("active", active);
return this;
}
public Boolean isActive() {
- return custom.containsKey("active") ? (Boolean) custom.get("active") : false;
+ return map.containsKey("active") ? (Boolean) map.get("active") : false;
}
- public RiverState setSettings(Settings settings) {
- this.settings = settings;
- return this;
- }
-
- public Settings getSettings() {
- return settings;
- }
public RiverState setCustom(Map custom) {
- custom.put("custom", custom);
+ this.map.put("custom", custom);
return this;
}
public Map getCustom() {
- return (Map) custom.get("custom");
+ return (Map) this.map.get("custom");
}
@Override
@@ -115,7 +128,7 @@ public void readFrom(StreamInput in) throws IOException {
this.name = in.readOptionalString();
this.type = in.readOptionalString();
ImmutableSettings.readSettingsFromStream(in);
- custom = in.readMap();
+ map = in.readMap();
}
@Override
@@ -123,7 +136,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(name);
out.writeOptionalString(type);
ImmutableSettings.writeSettingsToStream(settings, out);
- out.writeMap(custom);
+ out.writeMap(map);
}
@Override
@@ -133,6 +146,6 @@ public int compareTo(RiverState o) {
@Override
public String toString() {
- return "[name="+name+",type="+type+",settings="+settings.getAsMap()+",custom="+custom+"]";
+ return "[name="+name+",type="+type+",settings="+settings.getAsMap()+",map="+map+"]";
}
}
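A minimal sketch of the renamed fluent accessors introduced above (the backing field `custom` is now `map`); values are illustrative only.

    import java.util.Date;
    import java.util.Map;
    import org.xbib.elasticsearch.action.river.jdbc.state.RiverState;

    public class RiverStateExample {
        // Sketch: build a river state with the fluent setters shown in the diff.
        public static RiverState example() {
            RiverState state = new RiverState()
                    .setCounter(1L)
                    .setStarted(new Date())
                    .setTimestamp(new Date())
                    .setEnabled(true)
                    .setActive(true);
            Map map = state.getMap();   // backing map, formerly the "custom" field
            return state.setMap(map);
        }
    }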
diff --git a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStateModule.java b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStateModule.java
index 6063e572..b0978a8b 100644
--- a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStateModule.java
+++ b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStateModule.java
@@ -5,10 +5,16 @@
public class RiverStateModule extends AbstractModule {
+ /**
+ * Register the river state metadata factory with Elasticsearch
+ */
static {
MetaData.registerFactory(RiverStatesMetaData.TYPE, RiverStatesMetaData.FACTORY);
}
+ /**
+ * Only one RiverStateService instance is allowed; it is bound as an eager singleton
+ */
@Override
protected void configure() {
bind(RiverStateService.class).asEagerSingleton();
diff --git a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStateService.java b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStateService.java
index 7d05021e..5ad93104 100644
--- a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStateService.java
+++ b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStateService.java
@@ -23,6 +23,9 @@
import static org.elasticsearch.common.collect.Lists.newLinkedList;
import static org.elasticsearch.common.collect.Maps.newHashMap;
+/**
+ * The RiverStateService manages reading and writing of river states in the cluster state
+ */
public class RiverStateService extends AbstractComponent implements ClusterStateListener {
private final ClusterService clusterService;
@@ -66,6 +69,11 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
+ /**
+ * Register a new river for river state management
+ * @param request a river state register request
+ * @param listener listener for cluster state update response
+ */
public void registerRiver(final RegisterRiverStateRequest request, final ActionListener listener) {
final RiverState newRiverMetaData = request.riverState;
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask(request, listener) {
@@ -120,19 +128,11 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
});
}
- private boolean registerRiver(RiverState riverMetaData) {
- RiverState previous = riverStates.get(riverMetaData.getName());
- if (previous != null) {
- if (!previous.getType().equals(riverMetaData.getType()) && previous.getSettings().equals(riverMetaData.getSettings())) {
- return false;
- }
- }
- Map newRiverStates = newHashMap();
- newRiverStates.put(riverMetaData.getName(), riverMetaData);
- riverStates = ImmutableMap.copyOf(newRiverStates);
- return true;
- }
-
+ /**
+ * Unregister a river from river state management
+ * @param request the unregister river state request
+ * @param listener listener for the cluster state update response
+ */
public void unregisterRiver(final UnregisterRiverStateRequest request, final ActionListener listener) {
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask(request, listener) {
@Override
@@ -171,6 +171,20 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
});
}
+ private boolean registerRiver(RiverState riverMetaData) {
+ RiverState previous = riverStates.get(riverMetaData.getName());
+ if (previous != null) {
+ if (!previous.getType().equals(riverMetaData.getType()) && previous.getSettings().equals(riverMetaData.getSettings())) {
+ return false;
+ }
+ }
+ Map newRiverStates = newHashMap();
+ newRiverStates.put(riverMetaData.getName(), riverMetaData);
+ riverStates = ImmutableMap.copyOf(newRiverStates);
+ return true;
+ }
+
+
public static class RegisterRiverStateRequest extends ClusterStateUpdateRequest {
final String cause;
diff --git a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStatesMetaData.java b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStatesMetaData.java
index 4c36d434..2cd76820 100644
--- a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStatesMetaData.java
+++ b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/RiverStatesMetaData.java
@@ -103,7 +103,7 @@ public RiverStatesMetaData fromXContent(XContentParser parser) throws IOExceptio
}
String type = null;
Settings settings = ImmutableSettings.EMPTY;
- Map custom = newHashMap();
+ Map map = newHashMap();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
@@ -120,11 +120,11 @@ public RiverStatesMetaData fromXContent(XContentParser parser) throws IOExceptio
}
settings = ImmutableSettings.settingsBuilder().put(SettingsLoader.Helper.loadNestedFromMap(parser.mapOrdered())).build();
break;
- case "custom":
+ case "map":
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("failed to parse river [" + name + "], incompatible params");
}
- custom = parser.mapOrdered();
+ map = parser.mapOrdered();
break;
default:
throw new ElasticsearchParseException("failed to parse river [" + name + "], unknown field [" + currentFieldName + "]");
@@ -136,7 +136,7 @@ public RiverStatesMetaData fromXContent(XContentParser parser) throws IOExceptio
if (type == null) {
throw new ElasticsearchParseException("failed to parse river [" + name + "], missing river type");
}
- river.add(new RiverState(name, type).setSettings(settings).setCustom(custom));
+ river.add(new RiverState(name, type).setSettings(settings).setMap(map));
} else {
throw new ElasticsearchParseException("failed to parse rivers");
}
@@ -159,7 +159,7 @@ public void toXContent(RiverState river, XContentBuilder builder, ToXContent.Par
builder.field(settingEntry.getKey(), settingEntry.getValue());
}
builder.endObject();
- builder.field("custom", river.getCustom());
+ builder.field("map").map(river.getMap());
builder.endObject();
}
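As a result of the rename, the per-river entry written by `toXContent` carries a `map` object where it previously carried `custom`. A minimal sketch of the builder calls with illustrative values (`riverState` stands for the RiverState being serialized, `my_jdbc_river` for the river name):

    // Sketch with placeholder values; mirrors the toXContent logic above.
    builder.startObject("my_jdbc_river")             // river name
            .field("type", "jdbc")
            .startObject("settings")
            .field("index", "myindex")               // illustrative setting
            .endObject()
            .field("map").map(riverState.getMap())   // formerly: builder.field("custom", ...)
            .endObject();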
diff --git a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/StatefulRiver.java b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/StatefulRiver.java
index bfdbe337..db4b2538 100644
--- a/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/StatefulRiver.java
+++ b/src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/StatefulRiver.java
@@ -2,6 +2,9 @@
import org.elasticsearch.river.River;
+/**
+ * A stateful river is a river that exposes an associated river state
+ */
public interface StatefulRiver extends River {
RiverState getRiverState();
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/feeder/AbstractFeeder.java b/src/main/java/org/xbib/elasticsearch/plugin/feeder/AbstractFeeder.java
index c2bb2c71..b1db3d6d 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/feeder/AbstractFeeder.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/feeder/AbstractFeeder.java
@@ -13,7 +13,7 @@
import org.xbib.cron.CronThreadPoolExecutor;
import org.xbib.elasticsearch.action.river.jdbc.state.RiverState;
import org.xbib.elasticsearch.support.client.Ingest;
-import org.xbib.elasticsearch.support.client.node.NodeClient;
+import org.xbib.elasticsearch.support.client.node.BulkNodeClient;
import org.xbib.io.URIUtil;
import org.xbib.pipeline.AbstractPipeline;
import org.xbib.pipeline.Pipeline;
@@ -200,7 +200,7 @@ public Feeder setClient(Client client) {
Runtime.getRuntime().availableProcessors());
ByteSizeValue maxvolume = settings.getAsBytesSize("maxbulkvolume", ByteSizeValue.parseBytesSizeValue("10m"));
TimeValue maxrequestwait = settings.getAsTime("maxrequestwait", TimeValue.timeValueSeconds(60));
- this.ingest = new NodeClient()
+ this.ingest = new BulkNodeClient()
.maxActionsPerBulkRequest(maxbulkactions)
.maxConcurrentBulkRequests(maxconcurrentbulkrequests)
.maxRequestWait(maxrequestwait)
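The hunk above shows only the first three calls of the builder chain on the renamed client. A sketch of how the chain presumably continues, assuming the remaining calls mirror the BulkNodeClient API further down in this diff; the volume setting and the final newClient(client) call are assumptions, not shown in the hunk.

    // Sketch; only the first three calls appear in the hunk above, the rest is assumed.
    this.ingest = new BulkNodeClient()
            .maxActionsPerBulkRequest(maxbulkactions)
            .maxConcurrentBulkRequests(maxconcurrentbulkrequests)
            .maxRequestWait(maxrequestwait)
            .maxVolumePerBulkRequest(maxvolume)   // assumption: maxvolume parsed from the settings above
            .newClient(client);                   // assumption: bind to the node client passed to setClient()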
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/feeder/jdbc/JDBCFeeder.java b/src/main/java/org/xbib/elasticsearch/plugin/feeder/jdbc/JDBCFeeder.java
index 46493366..a38467ba 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/feeder/jdbc/JDBCFeeder.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/feeder/jdbc/JDBCFeeder.java
@@ -58,9 +58,16 @@ public class JDBCFeeder>
*/
private String defaultIndex;
+ /**
+ * Constructor
+ */
public JDBCFeeder() {
}
+ /**
+ * Constructor for execution from a pipeline
+ * @param feeder the feeder to inherit from
+ */
public JDBCFeeder(JDBCFeeder feeder) {
super(feeder);
this.name = feeder.getName();
@@ -82,11 +89,20 @@ public String getType() {
return "jdbc";
}
+ /**
+ * Set the name of the feeder
+ * @param name the feeder name
+ * @return this feeder
+ */
public Feeder setName(String name) {
this.name = name;
return this;
}
+ /**
+ * Get the name of the feeder
+ * @return the name
+ */
public String getName() {
return name;
}
@@ -146,7 +162,6 @@ public void executeTask(Map map) throws Exception {
GetRiverStateResponse getRiverStateResponse = getRiverStateRequestBuilder.execute().actionGet();
riverState = getRiverStateResponse.getState();
logger.debug("got river state");
- riverState.setCustom(riverContext.asMap());
Long counter = riverState.getCounter() + 1;
this.riverState = riverState.setCounter(counter)
.setEnabled(true)
diff --git a/src/main/java/org/xbib/elasticsearch/rest/action/river/state/get/RestRiverGetStateAction.java b/src/main/java/org/xbib/elasticsearch/rest/action/river/state/get/RestRiverGetStateAction.java
index 9c83336c..efd475be 100644
--- a/src/main/java/org/xbib/elasticsearch/rest/action/river/state/get/RestRiverGetStateAction.java
+++ b/src/main/java/org/xbib/elasticsearch/rest/action/river/state/get/RestRiverGetStateAction.java
@@ -23,6 +23,7 @@ public class RestRiverGetStateAction extends BaseRestHandler {
@Inject
public RestRiverGetStateAction(Settings settings, Client client, RestController controller) {
super(settings, client);
+
controller.registerHandler(RestRequest.Method.GET, "/_river/jdbc/{riverName}/_state", this);
}
@@ -31,10 +32,7 @@ public void handleRequest(RestRequest request, RestChannel channel, Client clien
String riverName = request.param("riverName");
String riverType = "jdbc";
GetRiverStateRequest riverStateRequest = new GetRiverStateRequest();
- riverStateRequest.setRiverName(riverName)
- .setRiverType(riverType)
- .masterNodeTimeout(request.paramAsTime("master_timeout", riverStateRequest.masterNodeTimeout()))
- .local(request.paramAsBoolean("local", riverStateRequest.local()));
+ riverStateRequest.setRiverName(riverName).setRiverType(riverType);
client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest, new RestBuilderListener(channel) {
@Override
@@ -42,12 +40,12 @@ public RestResponse buildResponse(GetRiverStateResponse getRiverStateResponse, X
builder.startObject();
builder.startArray("state");
for (RiverState state : getRiverStateResponse.getStates()) {
- builder.startObject();
- builder.field("name", state.getName());
- builder.field("type", state.getType());
- builder.field("settings", state.getSettings().getAsMap());
- builder.field("custom", state.getCustom());
- builder.endObject();
+ builder.startObject()
+ .field("name", state.getName())
+ .field("type", state.getType())
+ .field("settings", state.getSettings().getAsMap())
+ .field("map").map(state.getMap())
+ .endObject();
}
builder.endArray().endObject();
return new BytesRestResponse(OK, builder);
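A sketch of fetching the river state programmatically, mirroring what the REST handler does. The package of the Get* action classes is assumed from the handler's location, and "my_jdbc_river" is a placeholder river name.

    import org.elasticsearch.client.Client;
    import org.xbib.elasticsearch.action.river.jdbc.state.RiverState;
    import org.xbib.elasticsearch.action.river.jdbc.state.get.GetRiverStateAction;
    import org.xbib.elasticsearch.action.river.jdbc.state.get.GetRiverStateRequest;
    import org.xbib.elasticsearch.action.river.jdbc.state.get.GetRiverStateResponse;

    public class GetRiverStateExample {
        // Sketch: issue the same cluster action the REST handler executes.
        public static void printState(Client client) {
            GetRiverStateRequest request = new GetRiverStateRequest()
                    .setRiverName("my_jdbc_river")
                    .setRiverType("jdbc");
            GetRiverStateResponse response = client.admin().cluster()
                    .execute(GetRiverStateAction.INSTANCE, request).actionGet();
            for (RiverState state : response.getStates()) {
                System.out.println(state.getName() + " -> " + state.getMap());
            }
        }
    }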
diff --git a/src/main/java/org/xbib/elasticsearch/support/client/node/NodeClient.java b/src/main/java/org/xbib/elasticsearch/support/client/node/BulkNodeClient.java
similarity index 89%
rename from src/main/java/org/xbib/elasticsearch/support/client/node/NodeClient.java
rename to src/main/java/org/xbib/elasticsearch/support/client/node/BulkNodeClient.java
index 482a2197..50228280 100644
--- a/src/main/java/org/xbib/elasticsearch/support/client/node/NodeClient.java
+++ b/src/main/java/org/xbib/elasticsearch/support/client/node/BulkNodeClient.java
@@ -34,9 +34,9 @@
/**
* Node client support
*/
-public class NodeClient implements Ingest {
+public class BulkNodeClient implements Ingest {
- private final static ESLogger logger = ESLoggerFactory.getLogger(NodeClient.class.getSimpleName());
+ private final static ESLogger logger = ESLoggerFactory.getLogger(BulkNodeClient.class.getSimpleName());
private int maxActionsPerBulkRequest = 100;
@@ -61,53 +61,53 @@ public class NodeClient implements Ingest {
private Throwable throwable;
@Override
- public NodeClient shards(int shards) {
+ public BulkNodeClient shards(int shards) {
configHelper.setting("index.number_of_shards", shards);
return this;
}
@Override
- public NodeClient replica(int replica) {
+ public BulkNodeClient replica(int replica) {
configHelper.setting("index.number_of_replica", replica);
return this;
}
@Override
- public NodeClient maxActionsPerBulkRequest(int maxActionsPerBulkRequest) {
+ public BulkNodeClient maxActionsPerBulkRequest(int maxActionsPerBulkRequest) {
this.maxActionsPerBulkRequest = maxActionsPerBulkRequest;
return this;
}
@Override
- public NodeClient maxConcurrentBulkRequests(int maxConcurrentBulkRequests) {
+ public BulkNodeClient maxConcurrentBulkRequests(int maxConcurrentBulkRequests) {
this.maxConcurrentBulkRequests = maxConcurrentBulkRequests;
return this;
}
@Override
- public NodeClient maxVolumePerBulkRequest(ByteSizeValue maxVolume) {
+ public BulkNodeClient maxVolumePerBulkRequest(ByteSizeValue maxVolume) {
this.maxVolume = maxVolume;
return this;
}
@Override
- public NodeClient maxRequestWait(TimeValue timeValue) {
+ public BulkNodeClient maxRequestWait(TimeValue timeValue) {
// ignore, not implemented
return this;
}
@Override
- public NodeClient flushIngestInterval(TimeValue flushInterval) {
+ public BulkNodeClient flushIngestInterval(TimeValue flushInterval) {
this.flushInterval = flushInterval;
return this;
}
@Override
- public NodeClient newClient(URI uri) {
+ public BulkNodeClient newClient(URI uri) {
throw new UnsupportedOperationException();
}
- public NodeClient newClient(Client client) {
+ public BulkNodeClient newClient(Client client) {
this.client = client;
this.state = new State();
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@@ -190,7 +190,7 @@ public State getState() {
}
@Override
- public NodeClient putMapping(String index) {
+ public BulkNodeClient putMapping(String index) {
if (client == null) {
logger.warn("no client for put mapping");
return this;
@@ -200,7 +200,7 @@ public NodeClient putMapping(String index) {
}
@Override
- public NodeClient deleteMapping(String index, String type) {
+ public BulkNodeClient deleteMapping(String index, String type) {
if (client == null) {
logger.warn("no client for delete mapping");
return this;
@@ -210,7 +210,7 @@ public NodeClient deleteMapping(String index, String type) {
}
@Override
- public NodeClient index(String index, String type, String id, String source) {
+ public BulkNodeClient index(String index, String type, String id, String source) {
if (closed) {
throw new ElasticsearchIllegalStateException("client is closed");
}
@@ -232,7 +232,7 @@ public NodeClient index(String index, String type, String id, String source) {
}
@Override
- public NodeClient bulkIndex(IndexRequest indexRequest) {
+ public BulkNodeClient bulkIndex(IndexRequest indexRequest) {
if (closed) {
throw new ElasticsearchIllegalStateException("client is closed");
}
@@ -254,7 +254,7 @@ public NodeClient bulkIndex(IndexRequest indexRequest) {
}
@Override
- public NodeClient delete(String index, String type, String id) {
+ public BulkNodeClient delete(String index, String type, String id) {
if (closed) {
throw new ElasticsearchIllegalStateException("client is closed");
}
@@ -276,7 +276,7 @@ public NodeClient delete(String index, String type, String id) {
}
@Override
- public NodeClient bulkDelete(DeleteRequest deleteRequest) {
+ public BulkNodeClient bulkDelete(DeleteRequest deleteRequest) {
if (closed) {
throw new ElasticsearchIllegalStateException("client is closed");
}
@@ -298,7 +298,7 @@ public NodeClient bulkDelete(DeleteRequest deleteRequest) {
}
@Override
- public NodeClient flushIngest() {
+ public BulkNodeClient flushIngest() {
if (closed) {
throw new ElasticsearchIllegalStateException("client is closed");
}
@@ -308,7 +308,7 @@ public NodeClient flushIngest() {
}
@Override
- public NodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException {
+ public BulkNodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException {
if (closed) {
throw new ElasticsearchIllegalStateException("client is closed");
}
@@ -317,7 +317,7 @@ public NodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedExce
}
@Override
- public NodeClient startBulk(String index) throws IOException {
+ public BulkNodeClient startBulk(String index) throws IOException {
if (state == null) {
return this;
}
@@ -329,7 +329,7 @@ public NodeClient startBulk(String index) throws IOException {
}
@Override
- public NodeClient stopBulk(String index) throws IOException {
+ public BulkNodeClient stopBulk(String index) throws IOException {
if (state == null) {
return this;
}
@@ -341,13 +341,13 @@ public NodeClient stopBulk(String index) throws IOException {
}
@Override
- public NodeClient flush(String index) {
+ public BulkNodeClient flush(String index) {
ClientHelper.flush(client, index);
return this;
}
@Override
- public NodeClient refresh(String index) {
+ public BulkNodeClient refresh(String index) {
ClientHelper.refresh(client, index);
return this;
}
@@ -359,7 +359,7 @@ public int updateReplicaLevel(String index, int level) throws IOException {
@Override
- public NodeClient waitForCluster(ClusterHealthStatus status, TimeValue timeout) throws IOException {
+ public BulkNodeClient waitForCluster(ClusterHealthStatus status, TimeValue timeout) throws IOException {
ClientHelper.waitForCluster(client, status, timeout);
return this;
}
@@ -391,7 +391,7 @@ public synchronized void shutdown() {
}
@Override
- public NodeClient newIndex(String index) {
+ public BulkNodeClient newIndex(String index) {
if (closed) {
throw new ElasticsearchIllegalStateException("client is closed");
}
@@ -423,7 +423,7 @@ public NodeClient newIndex(String index) {
}
@Override
- public NodeClient deleteIndex(String index) {
+ public BulkNodeClient deleteIndex(String index) {
if (closed) {
throw new ElasticsearchIllegalStateException("client is closed");
}
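A short usage sketch of the renamed client, built from the methods shown in this file; index, type, id, source, and the numeric limits are placeholders.

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.TimeValue;
    import org.xbib.elasticsearch.support.client.node.BulkNodeClient;

    public class BulkNodeClientExample {
        // Sketch: configure, feed, flush, and shut down a BulkNodeClient.
        public static void feed(Client client) throws Exception {
            BulkNodeClient ingest = new BulkNodeClient()
                    .maxActionsPerBulkRequest(1000)
                    .maxConcurrentBulkRequests(4)
                    .flushIngestInterval(TimeValue.timeValueSeconds(5))
                    .newClient(client);
            ingest.newIndex("myindex");
            ingest.index("myindex", "mytype", "1", "{\"field\":\"value\"}");
            ingest.flushIngest();
            ingest.waitForResponses(TimeValue.timeValueSeconds(30));
            ingest.shutdown();
        }
    }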
diff --git a/src/main/java/org/xbib/elasticsearch/support/river/RiverHelper.java b/src/main/java/org/xbib/elasticsearch/support/river/RiverHelper.java
index acf413f8..cfe60ca7 100644
--- a/src/main/java/org/xbib/elasticsearch/support/river/RiverHelper.java
+++ b/src/main/java/org/xbib/elasticsearch/support/river/RiverHelper.java
@@ -20,10 +20,6 @@ public class RiverHelper {
private RiverHelper() {
}
- public static void waitForRiverEnabled(ClusterAdminClient client, String riverName, String riverType) throws InterruptedException, IOException {
- waitForRiverEnabled(client, riverName, riverType, 15);
- }
-
public static void waitForRiverEnabled(ClusterAdminClient client, String riverName, String riverType, int seconds) throws InterruptedException, IOException {
GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
.setRiverName(riverName)
@@ -58,10 +54,6 @@ private static boolean isEnabled(String riverName, GetRiverStateResponse riverSt
return false;
}
- public static void waitForActiveRiver(ClusterAdminClient client, String riverName, String riverType) throws InterruptedException, IOException {
- waitForInactiveRiver(client, riverName, riverType, 30);
- }
-
public static void waitForActiveRiver(ClusterAdminClient client, String riverName, String riverType, int seconds) throws InterruptedException, IOException {
GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
.setRiverName(riverName)
@@ -81,10 +73,6 @@ public static void waitForActiveRiver(ClusterAdminClient client, String riverNam
}
}
- public static void waitForInactiveRiver(ClusterAdminClient client, String riverName, String riverType) throws InterruptedException, IOException {
- waitForInactiveRiver(client, riverName, riverType, 30);
- }
-
public static void waitForInactiveRiver(ClusterAdminClient client, String riverName, String riverType, int seconds) throws InterruptedException, IOException {
GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
.setRiverName(riverName)
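With the no-timeout convenience overloads removed, callers pass the timeout in seconds explicitly. A minimal sketch, assuming an existing node `Client` and a placeholder river name:

    // Callers now supply the timeout explicitly; the removed overloads defaulted to 15 or 30 seconds.
    RiverHelper.waitForRiverEnabled(client.admin().cluster(), "my_jdbc_river", "jdbc", 15);
    RiverHelper.waitForInactiveRiver(client.admin().cluster(), "my_jdbc_river", "jdbc", 30);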