Skip to content

Commit

Permalink
Rivers: Add a _status doc for each river, closes elastic#468.
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Nov 2, 2010
1 parent ef4c445 commit 2cdaf63
Showing 1 changed file with 49 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.river;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
Expand All @@ -35,6 +36,8 @@
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.river.cluster.RiverClusterChangedEvent;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
Expand Down Expand Up @@ -103,27 +106,60 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
@Override protected void doClose() throws ElasticSearchException {
}

public synchronized River createRiver(RiverName riverName, Map<String, Object> settings) throws ElasticSearchException {
public synchronized void createRiver(RiverName riverName, Map<String, Object> settings) throws ElasticSearchException {
if (riversInjectors.containsKey(riverName)) {
throw new RiverException(riverName, "river already exists");
logger.warn("ignoring river [{}][{}] creation, already exists", riverName.type(), riverName.name());
return;
}

logger.debug("creating river [{}][{}]", riverName.type(), riverName.name());

ModulesBuilder modules = new ModulesBuilder();
modules.add(new RiverNameModule(riverName));
modules.add(new RiverModule(riverName, settings, this.settings));
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new RiverNameModule(riverName));
modules.add(new RiverModule(riverName, settings, this.settings));

Injector indexInjector = modules.createChildInjector(injector);
riversInjectors.put(riverName, indexInjector);
River river = indexInjector.getInstance(River.class);
rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap();


// we need this start so there can be operations done (like creating an index) which can't be
// done on create since Guice can't create two concurrent child injectors
river.start();

XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("ok", true);

Injector indexInjector = modules.createChildInjector(injector);
riversInjectors.put(riverName, indexInjector);
River river = indexInjector.getInstance(River.class);
rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap();
builder.startObject("node");
builder.field("id", clusterService.localNode().id());
builder.field("name", clusterService.localNode().name());
builder.field("transport_address", clusterService.localNode().address().toString());
builder.endObject();

builder.endObject();

// we need this start so there can be operations done (like creating an index) which can't be
// done on create since Guice can't create two concurrent child injectors
river.start();
return river;

client.prepareIndex(riverIndexName, riverName.name(), "_status").setSource(builder).execute().actionGet();
} catch (Exception e) {
logger.warn("failed to create river [{}][{}]", e, riverName.type(), riverName.name());

try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("error", ExceptionsHelper.detailedMessage(e));

builder.startObject("node");
builder.field("id", clusterService.localNode().id());
builder.field("name", clusterService.localNode().name());
builder.field("transport_address", clusterService.localNode().address().toString());
builder.endObject();

client.prepareIndex(riverIndexName, riverName.name(), "_status").setSource(builder).execute().actionGet();
} catch (Exception e1) {
logger.warn("failed to write failed status for river creation", e);
}
}
}

public synchronized void closeRiver(RiverName riverName) throws ElasticSearchException {
Expand Down

0 comments on commit 2cdaf63

Please sign in to comment.