Skip to content

Commit

Permalink
SNAPSHOT: Improve Resilience SnapshotShardService (elastic#36113)
Browse files Browse the repository at this point in the history
* Resolve the index in the snapshotting thread
* Added test for routing table - snapshot state mismatch
  • Loading branch information
original-brownbear committed Dec 3, 2018
1 parent 76c14db commit 66edf9b
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 2 deletions.
Expand Up @@ -342,15 +342,15 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {

for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
final ShardId shardId = shardEntry.getKey();
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
executor.execute(new AbstractRunnable() {

final SetOnce<Exception> failure = new SetOnce<>();

@Override
public void doRun() {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
assert indexId != null;
snapshot(indexShard, snapshot, indexId, shardEntry.getValue());
}

Expand Down
Expand Up @@ -81,6 +81,8 @@
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCustomMetaData;
import org.elasticsearch.test.disruption.BusyMasterServiceDisruption;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.rest.FakeRestRequest;

import java.io.IOException;
Expand Down Expand Up @@ -1123,6 +1125,50 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize));
}

public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
logger.info("--> starting a master node and two data nodes");
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(2);
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("max_snapshot_bytes_per_sec", "1000b")
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
.put("number_of_shards", 5).put("number_of_replicas", 0)));
ensureGreen();
logger.info("--> indexing some data");
final int numdocs = randomIntBetween(50, 100);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test-idx", "type1",
Integer.toString(i)).setSource("field1", "bar " + i);
}
indexRandom(true, builders);
flushAndRefresh();
final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
logger.info("--> snapshot");
client(internalCluster().getMasterName()).admin().cluster()
.prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
ServiceDisruptionScheme disruption = new BusyMasterServiceDisruption(random(), Priority.HIGH);
setDisruptionScheme(disruption);
disruption.startDisrupting();
logger.info("--> restarting data node, which should cause primary shards to be failed");
internalCluster().restartNode(dataNode, InternalTestCluster.EMPTY_CALLBACK);
unblockNode("test-repo", dataNode);
disruption.stopDisrupting();
// check that snapshot completes
assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
.prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
assertTrue(snapshotInfo.state().toString(), snapshotInfo.state().completed());
}, 30, TimeUnit.SECONDS);
}

private long calculateTotalFilesSize(List<Path> files) {
return files.stream().mapToLong(f -> {
try {
Expand Down
@@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.InternalTestCluster;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

public class BusyMasterServiceDisruption extends SingleNodeDisruption {
private final AtomicBoolean active = new AtomicBoolean();
private final Priority priority;

public BusyMasterServiceDisruption(Random random, Priority priority) {
super(random);
this.priority = priority;
}

@Override
public void startDisrupting() {
disruptedNode = cluster.getMasterName();
final String disruptionNodeCopy = disruptedNode;
if (disruptionNodeCopy == null) {
return;
}
ClusterService clusterService = cluster.getInstance(ClusterService.class, disruptionNodeCopy);
if (clusterService == null) {
return;
}
logger.info("making master service busy on node [{}] at priority [{}]", disruptionNodeCopy, priority);
active.set(true);
submitTask(clusterService);
}

private void submitTask(ClusterService clusterService) {
clusterService.getMasterService().submitStateUpdateTask(
"service_disruption_block",
new ClusterStateUpdateTask(priority) {
@Override
public ClusterState execute(ClusterState currentState) {
if (active.get()) {
submitTask(clusterService);
}
return currentState;
}

@Override
public void onFailure(String source, Exception e) {
logger.error("unexpected error during disruption", e);
}
}
);
}

@Override
public void stopDisrupting() {
active.set(false);
}

@Override
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
removeFromCluster(cluster);
}

@Override
public TimeValue expectedTimeToHeal() {
return TimeValue.timeValueMinutes(0);
}
}

0 comments on commit 66edf9b

Please sign in to comment.