Skip to content

Commit

Permalink
better logging in case of state persistence failure
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Aug 4, 2011
1 parent 1e6dbc5 commit cbb95de
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 85 deletions.
@@ -0,0 +1,44 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.thread;

import org.elasticsearch.common.logging.ESLogger;

/**
*/
public class LoggingRunnable implements Runnable {

private final Runnable runnable;

private final ESLogger logger;

public LoggingRunnable(ESLogger logger, Runnable runnable) {
this.runnable = runnable;
this.logger = logger;
}

@Override public void run() {
try {
runnable.run();
} catch (Exception e) {
logger.warn("failed to execute [{}]", e, runnable.toString());
}
}
}
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.thread.LoggingRunnable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand Down Expand Up @@ -193,52 +194,7 @@ public LocalGatewayStartedShards currentStartedShards() {

// we only write the local metadata if this is a possible master node
if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) {
executor.execute(new Runnable() {
@Override public void run() {
LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder();
if (currentMetaState != null) {
builder.state(currentMetaState);
}
final long version = event.state().metaData().version();
builder.version(version);
builder.metaData(event.state().metaData());

try {
File stateFile = new File(location, "metadata-" + version);
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
LocalGatewayMetaState stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
fos.close();

FileSystemUtils.syncFile(stateFile);

currentMetaState = stateToWrite;

// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + version);
}
});
for (File file : files) {
file.delete();
}

} catch (IOException e) {
logger.warn("failed to write updated state", e);
}
}
});
executor.execute(new LoggingRunnable(logger, new PersistMetaData(event)));
}

if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) {
Expand Down Expand Up @@ -282,45 +238,7 @@ public LocalGatewayStartedShards currentStartedShards() {
// only write if something changed...
if (changed) {
final LocalGatewayStartedShards stateToWrite = builder.build();
executor.execute(new Runnable() {
@Override public void run() {
try {
File stateFile = new File(location, "shards-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}

XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();

fos.close();

FileSystemUtils.syncFile(stateFile);

currentStartedShards = stateToWrite;
} catch (IOException e) {
logger.warn("failed to write updated state", e);
return;
}

// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
}
});
for (File file : files) {
file.delete();
}
}
});
executor.execute(new LoggingRunnable(logger, new PersistShards(event, stateToWrite)));
}
}
}
Expand Down Expand Up @@ -472,4 +390,109 @@ private LocalGatewayStartedShards readStartedShards(byte[] data) throws IOExcept
}
}
}

class PersistMetaData implements Runnable {
private final ClusterChangedEvent event;

public PersistMetaData(ClusterChangedEvent event) {
this.event = event;
}

@Override public void run() {
LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder();
if (currentMetaState != null) {
builder.state(currentMetaState);
}
final long version = event.state().metaData().version();
builder.version(version);
builder.metaData(event.state().metaData());

try {
File stateFile = new File(location, "metadata-" + version);
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
LocalGatewayMetaState stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
fos.close();

FileSystemUtils.syncFile(stateFile);

currentMetaState = stateToWrite;

// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + version);
}
});
if (files != null) {
for (File file : files) {
file.delete();
}
}

} catch (IOException e) {
logger.warn("failed to write updated state", e);
}
}
}

class PersistShards implements Runnable {
private final ClusterChangedEvent event;
private final LocalGatewayStartedShards stateToWrite;

public PersistShards(ClusterChangedEvent event, LocalGatewayStartedShards stateToWrite) {
this.event = event;
this.stateToWrite = stateToWrite;
}

@Override public void run() {
try {
File stateFile = new File(location, "shards-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}

XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();

fos.close();

FileSystemUtils.syncFile(stateFile);

currentStartedShards = stateToWrite;
} catch (IOException e) {
logger.warn("failed to write updated state", e);
return;
}

// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
}
});
if (files != null) {
for (File file : files) {
file.delete();
}
}
}
}
}

0 comments on commit cbb95de

Please sign in to comment.