diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/thread/LoggingRunnable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/thread/LoggingRunnable.java new file mode 100644 index 0000000000000..043c58c0fc4c7 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/thread/LoggingRunnable.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.thread; + +import org.elasticsearch.common.logging.ESLogger; + +/** + */ +public class LoggingRunnable implements Runnable { + + private final Runnable runnable; + + private final ESLogger logger; + + public LoggingRunnable(ESLogger logger, Runnable runnable) { + this.runnable = runnable; + this.logger = logger; + } + + @Override public void run() { + try { + runnable.run(); + } catch (Exception e) { + logger.warn("failed to execute [{}]", e, runnable.toString()); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 95320418fde39..726bc19cc15a2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.thread.LoggingRunnable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -193,52 +194,7 @@ public LocalGatewayStartedShards currentStartedShards() { // we only write the local metadata if this is a possible master node if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) { - executor.execute(new Runnable() { - @Override public void run() { - LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); - if (currentMetaState != null) { - builder.state(currentMetaState); - } - final long version = event.state().metaData().version(); - builder.version(version); - builder.metaData(event.state().metaData()); - - try { - File stateFile = new File(location, "metadata-" + version); - OutputStream fos = new FileOutputStream(stateFile); - if (compress) { - fos = new LZFOutputStream(fos); - } - LocalGatewayMetaState stateToWrite = builder.build(); - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos); - if (prettyPrint) { - xContentBuilder.prettyPrint(); - } - xContentBuilder.startObject(); - LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); - xContentBuilder.endObject(); - xContentBuilder.close(); - fos.close(); - - FileSystemUtils.syncFile(stateFile); - - currentMetaState = stateToWrite; - - // delete all the other files - File[] files = location.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("metadata-") && !name.equals("metadata-" + version); - } - }); - for (File file : files) { - file.delete(); - } - - } catch (IOException e) { - logger.warn("failed to write updated state", e); - } - } - }); + executor.execute(new LoggingRunnable(logger, new PersistMetaData(event))); } if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) { @@ -282,45 +238,7 @@ public LocalGatewayStartedShards currentStartedShards() { // only write if something changed... if (changed) { final LocalGatewayStartedShards stateToWrite = builder.build(); - executor.execute(new Runnable() { - @Override public void run() { - try { - File stateFile = new File(location, "shards-" + event.state().version()); - OutputStream fos = new FileOutputStream(stateFile); - if (compress) { - fos = new LZFOutputStream(fos); - } - - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos); - if (prettyPrint) { - xContentBuilder.prettyPrint(); - } - xContentBuilder.startObject(); - LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); - xContentBuilder.endObject(); - xContentBuilder.close(); - - fos.close(); - - FileSystemUtils.syncFile(stateFile); - - currentStartedShards = stateToWrite; - } catch (IOException e) { - logger.warn("failed to write updated state", e); - return; - } - - // delete all the other files - File[] files = location.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("shards-") && !name.equals("shards-" + event.state().version()); - } - }); - for (File file : files) { - file.delete(); - } - } - }); + executor.execute(new LoggingRunnable(logger, new PersistShards(event, stateToWrite))); } } } @@ -472,4 +390,109 @@ private LocalGatewayStartedShards readStartedShards(byte[] data) throws IOExcept } } } + + class PersistMetaData implements Runnable { + private final ClusterChangedEvent event; + + public PersistMetaData(ClusterChangedEvent event) { + this.event = event; + } + + @Override public void run() { + LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); + if (currentMetaState != null) { + builder.state(currentMetaState); + } + final long version = event.state().metaData().version(); + builder.version(version); + builder.metaData(event.state().metaData()); + + try { + File stateFile = new File(location, "metadata-" + version); + OutputStream fos = new FileOutputStream(stateFile); + if (compress) { + fos = new LZFOutputStream(fos); + } + LocalGatewayMetaState stateToWrite = builder.build(); + XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos); + if (prettyPrint) { + xContentBuilder.prettyPrint(); + } + xContentBuilder.startObject(); + LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); + xContentBuilder.endObject(); + xContentBuilder.close(); + fos.close(); + + FileSystemUtils.syncFile(stateFile); + + currentMetaState = stateToWrite; + + // delete all the other files + File[] files = location.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("metadata-") && !name.equals("metadata-" + version); + } + }); + if (files != null) { + for (File file : files) { + file.delete(); + } + } + + } catch (IOException e) { + logger.warn("failed to write updated state", e); + } + } + } + + class PersistShards implements Runnable { + private final ClusterChangedEvent event; + private final LocalGatewayStartedShards stateToWrite; + + public PersistShards(ClusterChangedEvent event, LocalGatewayStartedShards stateToWrite) { + this.event = event; + this.stateToWrite = stateToWrite; + } + + @Override public void run() { + try { + File stateFile = new File(location, "shards-" + event.state().version()); + OutputStream fos = new FileOutputStream(stateFile); + if (compress) { + fos = new LZFOutputStream(fos); + } + + XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos); + if (prettyPrint) { + xContentBuilder.prettyPrint(); + } + xContentBuilder.startObject(); + LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); + xContentBuilder.endObject(); + xContentBuilder.close(); + + fos.close(); + + FileSystemUtils.syncFile(stateFile); + + currentStartedShards = stateToWrite; + } catch (IOException e) { + logger.warn("failed to write updated state", e); + return; + } + + // delete all the other files + File[] files = location.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("shards-") && !name.equals("shards-" + event.state().version()); + } + }); + if (files != null) { + for (File file : files) { + file.delete(); + } + } + } + } }