From 2cdaf6357b525d32b54667dd6ae48dd3a01dfe85 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 2 Nov 2010 12:09:45 +0200 Subject: [PATCH] Rivers: Add a _status doc for each river, closes #468. --- .../elasticsearch/river/RiversService.java | 62 +++++++++++++++---- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java index 1956ee39171cd..e7fdc1129e3cd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java @@ -20,6 +20,7 @@ package org.elasticsearch.river; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; @@ -35,6 +36,8 @@ import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.river.cluster.RiverClusterChangedEvent; import org.elasticsearch.river.cluster.RiverClusterService; import org.elasticsearch.river.cluster.RiverClusterState; @@ -103,27 +106,60 @@ public class RiversService extends AbstractLifecycleComponent { @Override protected void doClose() throws ElasticSearchException { } - public synchronized River createRiver(RiverName riverName, Map settings) throws ElasticSearchException { + public synchronized void createRiver(RiverName riverName, Map settings) throws ElasticSearchException { if (riversInjectors.containsKey(riverName)) { - throw new RiverException(riverName, "river already exists"); + logger.warn("ignoring river [{}][{}] creation, already exists", riverName.type(), riverName.name()); + return; } logger.debug("creating river [{}][{}]", riverName.type(), riverName.name()); - ModulesBuilder modules = new ModulesBuilder(); - modules.add(new RiverNameModule(riverName)); - modules.add(new RiverModule(riverName, settings, this.settings)); + try { + ModulesBuilder modules = new ModulesBuilder(); + modules.add(new RiverNameModule(riverName)); + modules.add(new RiverModule(riverName, settings, this.settings)); + + Injector indexInjector = modules.createChildInjector(injector); + riversInjectors.put(riverName, indexInjector); + River river = indexInjector.getInstance(River.class); + rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap(); + + + // we need this start so there can be operations done (like creating an index) which can't be + // done on create since Guice can't create two concurrent child injectors + river.start(); + + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + builder.field("ok", true); - Injector indexInjector = modules.createChildInjector(injector); - riversInjectors.put(riverName, indexInjector); - River river = indexInjector.getInstance(River.class); - rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap(); + builder.startObject("node"); + builder.field("id", clusterService.localNode().id()); + builder.field("name", clusterService.localNode().name()); + builder.field("transport_address", clusterService.localNode().address().toString()); + builder.endObject(); + builder.endObject(); - // we need this start so there can be operations done (like creating an index) which can't be - // done on create since Guice can't create two concurrent child injectors - river.start(); - return river; + + client.prepareIndex(riverIndexName, riverName.name(), "_status").setSource(builder).execute().actionGet(); + } catch (Exception e) { + logger.warn("failed to create river [{}][{}]", e, riverName.type(), riverName.name()); + + try { + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + builder.field("error", ExceptionsHelper.detailedMessage(e)); + + builder.startObject("node"); + builder.field("id", clusterService.localNode().id()); + builder.field("name", clusterService.localNode().name()); + builder.field("transport_address", clusterService.localNode().address().toString()); + builder.endObject(); + + client.prepareIndex(riverIndexName, riverName.name(), "_status").setSource(builder).execute().actionGet(); + } catch (Exception e1) { + logger.warn("failed to write failed status for river creation", e); + } + } } public synchronized void closeRiver(RiverName riverName) throws ElasticSearchException {