diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java index 99c5d7fb2bae7..d29dacb001434 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java @@ -8,9 +8,7 @@ package org.opensearch.remotestore; -import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.plugins.Plugin; import org.opensearch.test.transport.MockTransportService; @@ -20,7 +18,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { @@ -49,18 +46,6 @@ protected void restore(String... indices) { restore(randomBoolean(), indices); } - protected void restore(boolean restoreAllShards, String... indices) { - if (restoreAllShards) { - assertAcked(client().admin().indices().prepareClose(indices)); - } - client().admin() - .cluster() - .restoreRemoteStore( - new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), - PlainActionFuture.newFuture() - ); - } - protected void verifyRestoredData(Map indexStats, String indexName, boolean indexMoreData) throws Exception { ensureYellowAndNoInitializingShards(indexName); ensureGreen(indexName); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 0c68b11ebff56..8c15ebd0505d9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.indices.get.GetIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.bulk.BulkItemResponse; @@ -16,6 +17,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; @@ -58,6 +60,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; @@ -396,4 +399,16 @@ protected IndexShard getIndexShard(String dataNode, String indexName) throws Exe IndexService indexService = indicesService.indexService(new Index(indexName, uuid)); return indexService.getShard(0); } + + protected void restore(boolean restoreAllShards, String... indices) { + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(indices)); + } + client().admin() + .cluster() + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 25146d53b5654..e1997fea3433a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -35,6 +35,7 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; @@ -705,7 +706,7 @@ public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionExcept assertFalse(clusterHealthResponse.isTimedOut()); } - public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException { + public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException { // In this test, we fail the hand off during the primary relocation. This will undo the drainRefreshes and // drainSync performed as part of relocation handoff (before performing the handoff transport action). // We validate the same here by failing the peer recovery and ensuring we can index afterward as well. @@ -759,6 +760,46 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep int moreDocs = randomIntBetween(5, 10); indexBulk(INDEX_NAME, moreDocs); flushAndRefresh(INDEX_NAME); + int uncommittedOps = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, uncommittedOps); assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs + moreDocs); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + + restore(true, INDEX_NAME); + ensureGreen(INDEX_NAME); + assertHitCount( + client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + docs + moreDocs + uncommittedOps + ); + + String newNode = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, newPrimary, newNode)) + .execute() + .actionGet(); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(10)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + + ex = assertThrows( + SearchPhaseExecutionException.class, + () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() + ); + assertEquals("all shards failed", ex.getMessage()); + assertHitCount( + client(newNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + docs + moreDocs + uncommittedOps + ); } }