Skip to content

Commit

Permalink
fix for broken river state API
Browse files Browse the repository at this point in the history
  • Loading branch information
jprante committed Aug 4, 2014
1 parent 47b0923 commit e606421
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 95 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch.

| Elasticsearch version | Plugin | Release date |
| ------------------------ | -----------| -------------|
| 1.3.1 | 1.3.0.3 | Aug 4, 2014 |
| 1.3.1 | 1.3.0.2 | Aug 2, 2014 |
| 1.3.1 | 1.3.0.1 | Jul 31, 2014 |
| 1.3.0 | 1.3.0.0 | Jul 24, 2014 |
Expand All @@ -59,7 +60,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch.

## Installation

./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.2/elasticsearch-river-jdbc-1.3.0.2-plugin.zip
./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.3/elasticsearch-river-jdbc-1.3.0.3-plugin.zip

Do not forget to restart the node after installing.

Expand All @@ -74,6 +75,7 @@ Change into this directory to invoke the `./bin/plugin` command line tool.

| File | SHA1 |
| ---------------------------------------------| -----------------------------------------|
| elasticsearch-river-jdbc-1.3.0.3-plugin.zip | 7e3fe518c716305a7878fddb299f0c263fb5ed4b |
| elasticsearch-river-jdbc-1.3.0.2-plugin.zip | 7f87af3055223d15238da9c81ae95ff6ea0ce934 |
| elasticsearch-river-jdbc-1.3.0.1-plugin.zip | ee58c51acfb4bc2294939c655ff2f790890808bc |
| elasticsearch-river-jdbc-1.3.0.0-plugin.zip | f303bf240e443bbe81ccc614bfad6b4d103eb073 |
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>org.xbib.elasticsearch.plugin</groupId>
<artifactId>elasticsearch-river-jdbc</artifactId>
<version>1.3.0.2</version>
<version>1.3.0.3</version>

<packaging>jar</packaging>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

import static org.elasticsearch.common.collect.Maps.newHashMap;

/**
 * A river state represents a point in time at which a river has a defined behavior
 * with a given set of parameters.
 */
public class RiverState implements Streamable, Comparable<RiverState> {

/**
Expand All @@ -29,7 +32,7 @@ public class RiverState implements Streamable, Comparable<RiverState> {
/**
* A custom map for more information about the river
*/
private Map<String, Object> custom = newHashMap();
private Map<String, Object> map = newHashMap();

/**
 * No-arg constructor, required so an empty instance can be created and then
 * populated by {@link #readFrom}.
 */
public RiverState() {
}
Expand All @@ -47,83 +50,93 @@ public String getType() {
return type;
}

/**
 * Sets the river settings.
 *
 * @param settings the river settings
 * @return this state, for call chaining
 */
public RiverState setSettings(Settings settings) {
this.settings = settings;
return this;
}

/**
 * Returns the river settings, or null if none have been set.
 */
public Settings getSettings() {
return settings;
}

/**
 * Replaces the internal parameter map wholesale.
 *
 * @param map the new parameter map
 * @return this state, for call chaining
 */
public RiverState setMap(Map<String,Object> map) {
this.map = map;
return this;
}

/**
 * Returns the internal parameter map (live reference, not a copy).
 */
public Map<String, Object> getMap() {
return map;
}

/**
 * Records the date the river was started under the "started" key.
 *
 * @param started the start date
 * @return this state, for call chaining
 */
public RiverState setStarted(Date started) {
    // previously also written to the stale "custom" map; store once in "map"
    map.put("started", started);
    return this;
}

/**
 * Returns the recorded start date, or null if the river was never started.
 */
public Date getStarted() {
    // the duplicate return reading the stale "custom" map was unreachable dead code;
    // read from "map", where setStarted() stores the value
    return (Date) map.get("started");
}

/**
 * Sets the river run counter under the "counter" key.
 *
 * @param counter the counter value
 * @return this state, for call chaining
 */
public RiverState setCounter(Long counter) {
    // previously also written to the stale "custom" map; store once in "map"
    map.put("counter", counter);
    return this;
}

/**
 * Returns the river run counter, defaulting to 0 when none has been set.
 */
public Long getCounter() {
    // the duplicate return reading the stale "custom" map was unreachable dead code
    return map.containsKey("counter") ? (Long) map.get("counter") : 0L;
}

/**
 * Records the last activity timestamp under the "timestamp" key.
 *
 * @param timestamp the timestamp
 * @return this state, for call chaining
 */
public RiverState setTimestamp(Date timestamp) {
    // previously also written to the stale "custom" map; store once in "map"
    map.put("timestamp", timestamp);
    return this;
}

/**
 * Returns the last activity timestamp, or null if none was recorded.
 */
public Date getTimestamp() {
    // the duplicate return reading the stale "custom" map was unreachable dead code
    return (Date) map.get("timestamp");
}

/**
 * Sets the "enabled" flag of the river.
 *
 * @param enabled whether the river is enabled
 * @return this state, for call chaining
 */
public RiverState setEnabled(Boolean enabled) {
    // previously also written to the stale "custom" map; store once in "map"
    map.put("enabled", enabled);
    return this;
}

/**
 * Returns the "enabled" flag, or null if it was never set.
 */
public Boolean isEnabled() {
    // the duplicate return reading the stale "custom" map was unreachable dead code
    return (Boolean) map.get("enabled");
}

/**
 * Sets the "active" flag of the river.
 *
 * @param active whether the river is currently active
 * @return this state, for call chaining
 */
public RiverState setActive(Boolean active) {
    // previously also written to the stale "custom" map; store once in "map"
    map.put("active", active);
    return this;
}

/**
 * Returns the "active" flag, defaulting to false when none has been set.
 */
public Boolean isActive() {
    // the duplicate return reading the stale "custom" map was unreachable dead code
    return map.containsKey("active") ? (Boolean) map.get("active") : false;
}

// NOTE(review): exact duplicate of setSettings(Settings) declared earlier in this class
// (a Java class cannot declare the same method signature twice) — this copy should be deleted.
public RiverState setSettings(Settings settings) {
this.settings = settings;
return this;
}

// NOTE(review): exact duplicate of getSettings() declared earlier in this class — delete this copy.
public Settings getSettings() {
return settings;
}

/**
 * Stores a caller-supplied custom map under the "custom" key of the parameter map.
 *
 * @param custom arbitrary extra information about the river
 * @return this state, for call chaining
 */
public RiverState setCustom(Map<String, Object> custom) {
    // the stale line 'custom.put("custom", custom)' inserted the parameter map into
    // itself (the parameter shadows the old field); store it in the state map instead
    this.map.put("custom", custom);
    return this;
}

/**
 * Returns the custom map stored by {@link #setCustom}, or null if none was set.
 */
@SuppressWarnings("unchecked") // value under "custom" is only ever written as Map<String, Object>
public Map<String, Object> getCustom() {
    // the duplicate return reading the stale "custom" map was unreachable dead code
    return (Map<String, Object>) this.map.get("custom");
}

/**
 * Restores this state from a stream; must read fields in the exact order
 * {@link #writeTo} wrote them.
 */
@Override
public void readFrom(StreamInput in) throws IOException {
    this.name = in.readOptionalString();
    this.type = in.readOptionalString();
    // the deserialized settings were previously discarded, leaving this.settings null
    // (which makes toString()'s settings.getAsMap() throw NPE) — assign the result
    this.settings = ImmutableSettings.readSettingsFromStream(in);
    // read the map exactly once; a second readMap() would consume bytes past this
    // object's data and corrupt the rest of the stream
    map = in.readMap();
}

/**
 * Serializes this state to a stream; field order must match {@link #readFrom}.
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeOptionalString(name);
    out.writeOptionalString(type);
    ImmutableSettings.writeSettingsToStream(settings, out);
    // write the map exactly once; the duplicated writeMap(custom)/writeMap(map) pair
    // emitted the map twice and broke the wire format expected by readFrom()
    out.writeMap(map);
}

@Override
Expand All @@ -133,6 +146,6 @@ public int compareTo(RiverState o) {

/**
 * Human-readable rendering of this river state for logs and debugging.
 */
@Override
public String toString() {
    // the duplicate return referencing the stale "custom" field was unreachable dead code;
    // also guard settings, which is null until setSettings()/readFrom() populates it
    return "[name=" + name + ",type=" + type + ",settings="
            + (settings != null ? settings.getAsMap() : null) + ",map=" + map + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

public class RiverStateModule extends AbstractModule {

/**
 * Registers the river state metadata factory with the cluster metadata registry
 * once, when this module class is first loaded.
 */
static {
MetaData.registerFactory(RiverStatesMetaData.TYPE, RiverStatesMetaData.FACTORY);
}

/**
* Only one RiverStateService instance is allowed
*/
@Override
protected void configure() {
bind(RiverStateService.class).asEagerSingleton();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import static org.elasticsearch.common.collect.Lists.newLinkedList;
import static org.elasticsearch.common.collect.Maps.newHashMap;

/**
 * The RiverStateService manages reading and writing of river states in the cluster state.
 */
public class RiverStateService extends AbstractComponent implements ClusterStateListener {

private final ClusterService clusterService;
Expand Down Expand Up @@ -66,6 +69,11 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

/**
* Register a new river for river state management
* @param request a river state register request
* @param listener listener for cluster state update response
*/
public void registerRiver(final RegisterRiverStateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
final RiverState newRiverMetaData = request.riverState;
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
Expand Down Expand Up @@ -120,19 +128,11 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
});
}

/**
 * Stores a river state in the in-memory registry.
 *
 * @param riverMetaData the river state to register
 * @return false when registration was rejected, true otherwise
 */
private boolean registerRiver(RiverState riverMetaData) {
RiverState previous = riverStates.get(riverMetaData.getName());
if (previous != null) {
// NOTE(review): with '&&', a river of a DIFFERENT type but identical settings is rejected,
// while the same type with different settings is accepted — confirm this is intended
// (an '||' or a negated settings comparison looks more plausible here).
if (!previous.getType().equals(riverMetaData.getType()) && previous.getSettings().equals(riverMetaData.getSettings())) {
return false;
}
}
// NOTE(review): the replacement map is built from scratch containing only this river, so any
// previously registered rivers are dropped — verify only one river state is meant to be tracked.
Map<String, RiverState> newRiverStates = newHashMap();
newRiverStates.put(riverMetaData.getName(), riverMetaData);
riverStates = ImmutableMap.copyOf(newRiverStates);
return true;
}

/**
* Unregister river from river state management
* @param request the unregister river state request
* @param listener listener for cluster state updates
*/
public void unregisterRiver(final UnregisterRiverStateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
Expand Down Expand Up @@ -171,6 +171,20 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
});
}

/**
 * Stores a river state in the in-memory registry.
 *
 * @param riverMetaData the river state to register
 * @return false when registration was rejected, true otherwise
 */
private boolean registerRiver(RiverState riverMetaData) {
RiverState previous = riverStates.get(riverMetaData.getName());
if (previous != null) {
// NOTE(review): with '&&', a river of a DIFFERENT type but identical settings is rejected,
// while the same type with different settings is accepted — confirm this is intended
// (an '||' or a negated settings comparison looks more plausible here).
if (!previous.getType().equals(riverMetaData.getType()) && previous.getSettings().equals(riverMetaData.getSettings())) {
return false;
}
}
// NOTE(review): the replacement map is built from scratch containing only this river, so any
// previously registered rivers are dropped — verify only one river state is meant to be tracked.
Map<String, RiverState> newRiverStates = newHashMap();
newRiverStates.put(riverMetaData.getName(), riverMetaData);
riverStates = ImmutableMap.copyOf(newRiverStates);
return true;
}


public static class RegisterRiverStateRequest extends ClusterStateUpdateRequest<RegisterRiverStateRequest> {

final String cause;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public RiverStatesMetaData fromXContent(XContentParser parser) throws IOExceptio
}
String type = null;
Settings settings = ImmutableSettings.EMPTY;
Map<String,Object> custom = newHashMap();
Map<String,Object> map = newHashMap();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
Expand All @@ -120,11 +120,11 @@ public RiverStatesMetaData fromXContent(XContentParser parser) throws IOExceptio
}
settings = ImmutableSettings.settingsBuilder().put(SettingsLoader.Helper.loadNestedFromMap(parser.mapOrdered())).build();
break;
case "custom":
case "map":
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("failed to parse river [" + name + "], incompatible params");
}
custom = parser.mapOrdered();
map = parser.mapOrdered();
break;
default:
throw new ElasticsearchParseException("failed to parse river [" + name + "], unknown field [" + currentFieldName + "]");
Expand All @@ -136,7 +136,7 @@ public RiverStatesMetaData fromXContent(XContentParser parser) throws IOExceptio
if (type == null) {
throw new ElasticsearchParseException("failed to parse river [" + name + "], missing river type");
}
river.add(new RiverState(name, type).setSettings(settings).setCustom(custom));
river.add(new RiverState(name, type).setSettings(settings).setMap(map));
} else {
throw new ElasticsearchParseException("failed to parse rivers");
}
Expand All @@ -159,7 +159,7 @@ public void toXContent(RiverState river, XContentBuilder builder, ToXContent.Par
builder.field(settingEntry.getKey(), settingEntry.getValue());
}
builder.endObject();
builder.field("custom", river.getCustom());
builder.field("map").map(river.getMap());
builder.endObject();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import org.elasticsearch.river.River;

/**
* A stateful river is an extension of a river with a river state
*/
public interface StatefulRiver extends River {

RiverState getRiverState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.xbib.cron.CronThreadPoolExecutor;
import org.xbib.elasticsearch.action.river.jdbc.state.RiverState;
import org.xbib.elasticsearch.support.client.Ingest;
import org.xbib.elasticsearch.support.client.node.NodeClient;
import org.xbib.elasticsearch.support.client.node.BulkNodeClient;
import org.xbib.io.URIUtil;
import org.xbib.pipeline.AbstractPipeline;
import org.xbib.pipeline.Pipeline;
Expand Down Expand Up @@ -200,7 +200,7 @@ public Feeder<T, R, P> setClient(Client client) {
Runtime.getRuntime().availableProcessors());
ByteSizeValue maxvolume = settings.getAsBytesSize("maxbulkvolume", ByteSizeValue.parseBytesSizeValue("10m"));
TimeValue maxrequestwait = settings.getAsTime("maxrequestwait", TimeValue.timeValueSeconds(60));
this.ingest = new NodeClient()
this.ingest = new BulkNodeClient()
.maxActionsPerBulkRequest(maxbulkactions)
.maxConcurrentBulkRequests(maxconcurrentbulkrequests)
.maxRequestWait(maxrequestwait)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,16 @@ public class JDBCFeeder<T, R extends PipelineRequest, P extends Pipeline<T, R>>
*/
private String defaultIndex;

/**
 * No-arg constructor for creating a fresh, unconfigured feeder instance.
 */
public JDBCFeeder() {
}

/**
* Constructor for execution from pipeline
* @param feeder the feeder to inherit from
*/
public JDBCFeeder(JDBCFeeder feeder) {
super(feeder);
this.name = feeder.getName();
Expand All @@ -82,11 +89,20 @@ public String getType() {
return "jdbc";
}

/**
 * Sets the name of this feeder.
 *
 * @param name the feeder name
 * @return this feeder, for call chaining
 */
public Feeder<T, R, P> setName(String name) {
this.name = name;
return this;
}

/**
 * Returns the name of this feeder, or null if none has been set.
 *
 * @return the feeder name
 */
public String getName() {
return name;
}
Expand Down Expand Up @@ -146,7 +162,6 @@ public void executeTask(Map<String, Object> map) throws Exception {
GetRiverStateResponse getRiverStateResponse = getRiverStateRequestBuilder.execute().actionGet();
riverState = getRiverStateResponse.getState();
logger.debug("got river state");
riverState.setCustom(riverContext.asMap());
Long counter = riverState.getCounter() + 1;
this.riverState = riverState.setCounter(counter)
.setEnabled(true)
Expand Down

0 comments on commit e606421

Please sign in to comment.