Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment Replication - Fix ShardLockObtained error during corruption cases #10370

Merged
merged 12 commits into from
Oct 5, 2023
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix remove ingest processor handing ignore_missing parameter not correctly ([10089](https://github.com/opensearch-project/OpenSearch/pull/10089))
- Fix circular dependency in Settings initialization ([10194](https://github.com/opensearch-project/OpenSearch/pull/10194))
- Fix registration and initialization of multiple extensions ([10256](https://github.com/opensearch-project/OpenSearch/pull/10256))
- Fix Segment Replication ShardLockObtainFailedException bug during index corruption ([10370](https://github.com/opensearch-project/OpenSearch/pull/10370))

### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,10 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexNam
protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());
return shardId.map(indexService::getShard).orElse(null);
}

protected boolean segmentReplicationWithRemoteEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -71,6 +73,7 @@
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.NodeClosedException;
Expand All @@ -82,6 +85,7 @@
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.junit.Before;

Expand All @@ -94,6 +98,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -1777,4 +1782,134 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {

}

public void testSendCorruptBytesToReplica() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
andrross marked this conversation as resolved.
Show resolved Hide resolved

MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
primaryTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) {
FileChunkRequest req = (FileChunkRequest) request;
logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk());
TransportRequest corrupt = new FileChunkRequest(
req.recoveryId(),
((FileChunkRequest) request).requestSeqNo(),
((FileChunkRequest) request).shardId(),
((FileChunkRequest) request).metadata(),
((FileChunkRequest) request).position(),
new BytesArray("test"),
false,
0,
0L
);
connection.sendRequest(requestId, action, corrupt, options);
latch.countDown();
} else {
connection.sendRequest(requestId, action, request, options);
}
}
);
for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
andrross marked this conversation as resolved.
Show resolved Hide resolved
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);
andrross marked this conversation as resolved.
Show resolved Hide resolved
assertNotEquals(originalRecoveryTime, 0);
andrross marked this conversation as resolved.
Show resolved Hide resolved
refresh(INDEX_NAME);
latch.await();
assertTrue(failed.get());
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
waitForSearchableDocs(100, primaryNode, replicaNode);
}

public void testWipeSegmentBetweenSyncs() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);

final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode));
indexShard.store().directory().deleteFile("_0.si");

for (int i = 11; i < 21; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
resetCheckIndexStatus();
waitForSearchableDocs(20, primaryNode, replicaNode);
}

private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception {
assertBusy(() -> {
// assert we have a peer recovery after the original
final long time = getRecoveryStopTime(replicaNode);
assertNotEquals(time, 0);
assertNotEquals(originalRecoveryTime, time);

}, 1, TimeUnit.MINUTES);
}

private long getRecoveryStopTime(String nodeName) {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get();
final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME);
logger.info("Recovery states {}", recoveryResponse);
for (RecoveryState recoveryState : recoveryStates) {
if (recoveryState.getTargetNode().getName().equals(nodeName)) {
return recoveryState.getTimer().stopTime();
}
}
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@
try {
commitSegmentInfos();
} catch (IOException e) {
maybeFailEngine("flush", e);
throw new FlushFailedEngineException(shardId, e);
} finally {
flushLock.unlock();
Expand Down Expand Up @@ -437,13 +438,29 @@
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
}
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
try {
commitSegmentInfos(latestSegmentInfos);
} catch (IOException e) {
// mark the store corrupted unless we are closing as result of engine failure.
// in this case Engine#failShard will handle store corruption.
if (failEngineLock.isHeldByCurrentThread() == false && store.isMarkedCorrupted() == false) {
try {
store.markStoreCorrupted(e);
} catch (IOException ex) {
logger.warn("Unable to mark store corrupted", ex);
}

Check warning on line 451 in server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java#L448-L451

Added lines #L448 - L451 were not covered by tests
}
}
IOUtils.close(readerManager, translogManager);
} catch (Exception e) {
logger.warn("failed to close engine", e);
logger.error("failed to close engine", e);

Check warning on line 456 in server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java#L456

Added line #L456 was not covered by tests
} finally {
logger.debug("engine closed [{}]", reason);
closedLatch.countDown();
try {
store.decRef();
logger.debug("engine closed [{}]", reason);
} finally {
closedLatch.countDown();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,13 @@
*/
public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
assert indexSettings.isSegRepEnabled();
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
failIfCorrupted();
try {
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
} catch (NoSuchFileException | CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
throw ex;

Check warning on line 393 in server/src/main/java/org/opensearch/index/store/Store.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/Store.java#L391-L393

Added lines #L391 - L393 were not covered by tests
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
import org.opensearch.common.UUIDs;
import org.opensearch.common.lucene.Lucene;
Expand Down Expand Up @@ -261,9 +260,7 @@
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard since this index might be broken there as well? The Source can handle this and checks
// its content on disk if possible.
// broken. We have to clean up this shard entirely, remove all files and bubble it up.
try {
try {
store.removeCorruptionMarker();
Expand All @@ -279,14 +276,14 @@
// In this case the shard is closed at some point while updating the reader.
// This can happen when the engine is closed in a separate thread.
logger.warn("Shard is already closed, closing replication");
} catch (OpenSearchException ex) {
} catch (CancellableThreads.ExecutionCancelledException ex) {

Check warning on line 279 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java#L279

Added line #L279 was not covered by tests
andrross marked this conversation as resolved.
Show resolved Hide resolved
/*
Ignore closed replication target as it can happen due to index shard closed event in a separate thread.
In such scenario, ignore the exception
*/
assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled";
assert cancellableThreads.isCancelled() : "Replication target cancelled but cancellable threads not cancelled";
} catch (Exception ex) {
throw new OpenSearchCorruptionException(ex);
throw new ReplicationFailedException(ex);
} finally {
if (store != null) {
store.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.support.ChannelActionListener;
Expand All @@ -28,6 +29,7 @@
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.ForceSyncRequest;
Expand All @@ -46,6 +48,7 @@
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -522,7 +525,7 @@
@Override
public void onFailure(Exception e) {
logger.debug("Replication failed {}", target.description());
if (e instanceof OpenSearchCorruptionException) {
if (isStoreCorrupt(target) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException) {
onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true);
return;
}
Expand All @@ -531,6 +534,27 @@
});
}

private boolean isStoreCorrupt(SegmentReplicationTarget target) {
// ensure target is not already closed. In that case
// we can assume the store is not corrupt and that the replication
// event completed successfully.
if (target.refCount() > 0) {
final Store store = target.store();
if (store.tryIncRef()) {
try {
return store.isMarkedCorrupted();
} catch (IOException ex) {
logger.warn("Unable to determine if store is corrupt", ex);
return false;

Check warning on line 548 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java#L546-L548

Added lines #L546 - L548 were not covered by tests
} finally {
store.decRef();
}
}
}
// store already closed.
return false;
}

private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {

// How many bytes we've copied since we last called RateLimiter.pause
Expand Down