Skip to content

Commit

Permalink
Switch indices read-only if a node runs out of disk space
Browse files Browse the repository at this point in the history
Today when we run out of disk all kinds of crazy things can happen
and nodes are becoming hard to maintain once out of disk is hit.
While we try to move shards away if we hit watermarks this might not
be possible in many situations. Based on the discussion in elastic#24299
this change monitors disk utiliation and adds a floodstage watermark
that causes all indices that are allocated on a node hitting the floodstage
mark to be switched read-only (with the option to be deleted). This allows users to react on the low disk
situation while subsequent write requests will be rejected. Users can switch
individual indices read-write once the situation is sorted out. There is no
automatic read-write switch once the node has enough space. This requires
user interaction.

The floodstage watermark is set to `95%` utilization by default.

Closes elastic#24299
  • Loading branch information
s1monw committed Jul 4, 2017
1 parent 6894ef6 commit b071450
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,4 @@ public interface ClusterInfoService {

/** The latest cluster information */
ClusterInfo getClusterInfo();

/** Add a listener that will be called every time new information is gathered */
void addListener(Listener listener);

/**
* Interface for listeners to implement in order to perform actions when
* new information about the cluster has been gathered
*/
interface Listener {
void onNewInfo(ClusterInfo info);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,4 @@ private EmptyClusterInfoService() {
public ClusterInfo getClusterInfo() {
return ClusterInfo.EMPTY;
}

@Override
public void addListener(Listener listener) {
// no-op, no new info is ever gathered, so adding listeners is useless
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
Expand Down Expand Up @@ -86,9 +87,10 @@ public class InternalClusterInfoService extends AbstractComponent
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final Consumer<ClusterInfo> listener;

public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
Consumer<ClusterInfo> listener) {
super(settings);
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
Expand All @@ -109,6 +111,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
this.clusterService.addLocalNodeMasterListener(this);
// Add to listen for state changes (when nodes are added)
this.clusterService.addListener(this);
this.listener = listener;
}

private void setEnabled(boolean enabled) {
Expand Down Expand Up @@ -201,11 +204,6 @@ public ClusterInfo getClusterInfo() {
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath);
}

@Override
public void addListener(Listener listener) {
this.listeners.add(listener);
}

/**
* Class used to submit {@link #maybeRefresh()} on the
* {@link InternalClusterInfoService} threadpool, these jobs will
Expand Down Expand Up @@ -362,21 +360,17 @@ public void onFailure(Exception e) {
logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
}
ClusterInfo clusterInfo = getClusterInfo();
for (Listener l : listeners) {
try {
l.onNewInfo(clusterInfo);
} catch (Exception e) {
logger.info("Failed executing ClusterInfoService listener", e);
}
try {
listener.accept(clusterInfo);
} catch (Exception e) {
logger.info("Failed executing ClusterInfoService listener", e);
}
return clusterInfo;
}

static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
MetaData meta = state.getMetaData();
for (ShardStats s : stats) {
IndexMetaData indexMeta = meta.index(s.getShardRouting().index());
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
long size = s.getStats().getStore().sizeInBytes();
String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@

package org.elasticsearch.cluster.routing.allocation;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
Expand All @@ -40,29 +45,30 @@
* reroute if it does. Also responsible for logging about nodes that have
* passed the disk watermarks
*/
public class DiskThresholdMonitor extends AbstractComponent implements ClusterInfoService.Listener {
public class DiskThresholdMonitor extends AbstractComponent {
private final DiskThresholdSettings diskThresholdSettings;
private final Client client;
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();

private final Supplier<ClusterState> clusterStateSupplier;
private long lastRunNS;

// TODO: remove injection when ClusterInfoService is not injected
@Inject
public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings,
ClusterInfoService infoService, Client client) {
public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
Client client) {
super(settings);
this.clusterStateSupplier = clusterStateSupplier;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.client = client;
infoService.addListener(this);
}

/**
* Warn about the given disk usage if the low or high watermark has been passed
*/
private void warnAboutDiskIfNeeded(DiskUsage usage) {
// Check absolute disk values
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) {
logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only",
diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage);
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
diskThresholdSettings.getFreeBytesThresholdHigh(), usage);
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) {
Expand All @@ -72,6 +78,9 @@ private void warnAboutDiskIfNeeded(DiskUsage usage) {

// Check percentage disk values
if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only",
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage);
} else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage);
} else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
Expand All @@ -80,7 +89,7 @@ private void warnAboutDiskIfNeeded(DiskUsage usage) {
}
}

@Override

public void onNewInfo(ClusterInfo info) {
ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
if (usages != null) {
Expand All @@ -95,12 +104,21 @@ public void onNewInfo(ClusterInfo info) {
nodeHasPassedWatermark.remove(node);
}
}

ClusterState state = clusterStateSupplier.get();
Set<String> indicesToMarkReadOnly = new HashSet<>();
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
String node = entry.key;
DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
RoutingNode routingNode = state.getRoutingNodes().node(node);
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
for (ShardRouting routing : routingNode) {
indicesToMarkReadOnly.add(routing.index().getName());
}
}
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
lastRunNS = System.nanoTime();
Expand Down Expand Up @@ -136,9 +154,25 @@ public void onNewInfo(ClusterInfo info) {
}
if (reroute) {
logger.info("rerouting shards: [{}]", explanation);
// Execute an empty reroute, but don't block on the response
client.admin().cluster().prepareReroute().execute();
reroute();
}
indicesToMarkReadOnly.removeIf(index ->
state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index)
);
if (indicesToMarkReadOnly.isEmpty() == false) {
markIndicesReadOnly(indicesToMarkReadOnly);
}
}
}

protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
// set read-only block but don't block on the response
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
}

protected void reroute() {
// Execute an empty reroute, but don't block on the response
client.admin().cluster().prepareReroute().execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class DiskThresholdSettings {
new Setting<>("cluster.routing.allocation.disk.watermark.high", "90%",
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"),
Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING =
new Setting<>("cluster.routing.allocation.disk.floodstage", "95%",
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.floodstage"),
Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING =
Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true,
Setting.Property.Dynamic, Setting.Property.NodeScope);;
Expand All @@ -58,17 +62,23 @@ public class DiskThresholdSettings {
private volatile boolean includeRelocations;
private volatile boolean enabled;
private volatile TimeValue rerouteInterval;
private volatile String floodStageRaw;
private volatile Double freeDiskThresholdFloodStage;
private volatile ByteSizeValue freeBytesThresholdFloodStage;

public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings);
final String floodStage = CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING.get(settings);
setHighWatermark(highWatermark);
setLowWatermark(lowWatermark);
setFloodStageRaw(floodStage);
this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING, this::setFloodStageRaw);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
Expand Down Expand Up @@ -99,7 +109,15 @@ private void setHighWatermark(String highWatermark) {
this.highWatermarkRaw = highWatermark;
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey());
}

private void setFloodStageRaw(String floodStageRaw) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.floodStageRaw = floodStageRaw;
this.freeDiskThresholdFloodStage = 100.0 - thresholdPercentageFromWatermark(floodStageRaw);
this.freeBytesThresholdFloodStage = thresholdBytesFromWatermark(floodStageRaw,
CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING.getKey());
}

/**
Expand Down Expand Up @@ -132,6 +150,18 @@ public ByteSizeValue getFreeBytesThresholdHigh() {
return freeBytesThresholdHigh;
}

public Double getFreeDiskThresholdFloodStage() {
return freeDiskThresholdFloodStage;
}

public ByteSizeValue getFreeBytesThresholdFloodStage() {
return freeBytesThresholdFloodStage;
}

public String getFloodStageRaw() {
return floodStageRaw;
}

public boolean includeRelocations() {
return includeRelocations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
Expand Down

0 comments on commit b071450

Please sign in to comment.