Skip to content

Commit

Permalink
Add more checks in testResumeUploadAfterFailedPrimaryRelocation IT
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Nov 28, 2023
1 parent a27e030 commit ea629f4
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -20,7 +18,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -49,18 +46,6 @@ protected void restore(String... indices) {
restore(randomBoolean(), indices);
}

protected void restore(boolean restoreAllShards, String... indices) {
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
}

protected void verifyRestoredData(Map<String, Long> indexStats, String indexName, boolean indexMoreData) throws Exception {
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.bulk.BulkItemResponse;
Expand All @@ -16,6 +17,7 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
Expand Down Expand Up @@ -58,6 +60,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
Expand Down Expand Up @@ -396,4 +399,16 @@ protected IndexShard getIndexShard(String dataNode, String indexName) throws Exe
IndexService indexService = indicesService.indexService(new Index(indexName, uuid));
return indexService.getShard(0);
}

protected void restore(boolean restoreAllShards, String... indices) {
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -705,7 +706,7 @@ public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionExcept
assertFalse(clusterHealthResponse.isTimedOut());
}

public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException {
public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException {
// In this test, we fail the hand off during the primary relocation. This will undo the drainRefreshes and
// drainSync performed as part of relocation handoff (before performing the handoff transport action).
// We validate the same here by failing the peer recovery and ensuring we can index afterward as well.
Expand Down Expand Up @@ -759,6 +760,46 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep
int moreDocs = randomIntBetween(5, 10);
indexBulk(INDEX_NAME, moreDocs);
flushAndRefresh(INDEX_NAME);
int uncommittedOps = randomIntBetween(5, 10);
indexBulk(INDEX_NAME, uncommittedOps);
assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs + moreDocs);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));

restore(true, INDEX_NAME);
ensureGreen(INDEX_NAME);
assertHitCount(
client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
docs + moreDocs + uncommittedOps
);

String newNode = internalCluster().startDataOnlyNodes(1).get(0);
ensureStableCluster(3);
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, newPrimary, newNode))
.execute()
.actionGet();

ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForStatus(ClusterHealthStatus.GREEN)
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(TimeValue.timeValueSeconds(10))
.execute()
.actionGet();
assertFalse(clusterHealthResponse.isTimedOut());

ex = assertThrows(
SearchPhaseExecutionException.class,
() -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get()
);
assertEquals("all shards failed", ex.getMessage());
assertHitCount(
client(newNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
docs + moreDocs + uncommittedOps
);
}
}

0 comments on commit ea629f4

Please sign in to comment.