Skip to content

Commit

Permalink
Merge branch 'remote-store-enabled-its' of github.com:sachinpkale/Ope…
Browse files Browse the repository at this point in the history
…nSearch into remote-store-enabled-its
  • Loading branch information
Arpit-Bandejiya committed Sep 1, 2023
2 parents 998cc58 + 4bdc7c1 commit 0f29caf
Show file tree
Hide file tree
Showing 127 changed files with 976 additions and 938 deletions.
Expand Up @@ -231,6 +231,7 @@ public void testAnalyze() {
assertSameIndices(analyzeRequest, analyzeShardAction);
}

@AwaitsFix(bugUrl = "https://github.com/sachinpkale/OpenSearch")
public void testIndex() {
String[] indexShardActions = new String[] { BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]" };
interceptTransportActions(indexShardActions);
Expand All @@ -242,6 +243,7 @@ public void testIndex() {
assertSameIndices(indexRequest, indexShardActions);
}

@AwaitsFix(bugUrl = "https://github.com/sachinpkale/OpenSearch")
public void testDelete() {
String[] deleteShardActions = new String[] { BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]" };
interceptTransportActions(deleteShardActions);
Expand All @@ -253,6 +255,7 @@ public void testDelete() {
assertSameIndices(deleteRequest, deleteShardActions);
}

@AwaitsFix(bugUrl = "https://github.com/sachinpkale/OpenSearch")
public void testUpdate() {
// update action goes to the primary, index op gets executed locally, then replicated
String[] updateShardActions = new String[] { UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]" };
Expand All @@ -268,6 +271,7 @@ public void testUpdate() {
assertSameIndices(updateRequest, updateShardActions);
}

@AwaitsFix(bugUrl = "https://github.com/sachinpkale/OpenSearch")
public void testUpdateUpsert() {
// update action goes to the primary, index op gets executed locally, then replicated
String[] updateShardActions = new String[] { UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]" };
Expand All @@ -283,6 +287,7 @@ public void testUpdateUpsert() {
assertSameIndices(updateRequest, updateShardActions);
}

@AwaitsFix(bugUrl = "https://github.com/sachinpkale/OpenSearch")
public void testUpdateDelete() {
// update action goes to the primary, delete op gets executed locally, then replicated
String[] updateShardActions = new String[] { UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]" };
Expand All @@ -300,6 +305,7 @@ public void testUpdateDelete() {
assertSameIndices(updateRequest, updateShardActions);
}

@AwaitsFix(bugUrl = "https://github.com/sachinpkale/OpenSearch")
public void testBulk() {
String[] bulkShardActions = new String[] { BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]" };
interceptTransportActions(bulkShardActions);
Expand Down Expand Up @@ -400,6 +406,7 @@ public void testMultiGet() {
assertIndicesSubset(indices, multiGetShardAction);
}

@AwaitsFix(bugUrl = "https://github.com/sachinpkale/OpenSearch")
public void testFlush() {
String[] indexShardActions = new String[] {
TransportShardFlushAction.NAME,
Expand Down Expand Up @@ -429,6 +436,7 @@ public void testForceMerge() {
assertSameIndices(mergeRequest, mergeShardAction);
}

@AwaitsFix(bugUrl = "https://github.com/sachinpkale/OpenSearch")
public void testRefresh() {
String[] indexShardActions = new String[] {
TransportShardRefreshAction.NAME,
Expand Down
Expand Up @@ -135,7 +135,7 @@ public void onFailure(Exception e) {
ensureSearchable();
while (latch.getCount() > 0) {
assertHitCount(
client().prepareSearch()
client().prepareSearch().setPreference("_primary")
.setQuery(matchAllQuery())
.setPostFilter(
boolQuery().must(matchAllQuery())
Expand Down
Expand Up @@ -60,6 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.getKey(), true)
.put(remoteStoreGlobalClusterSettings(REPOSITORY_NAME, REPOSITORY_2_NAME))
.build();
}

Expand Down
Expand Up @@ -112,7 +112,7 @@
* <p>
* We need at least 2 nodes so we have a cluster-manager node a non-cluster-manager node
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2)
public class TasksIT extends AbstractTasksIT {

public void testTaskCounts() {
Expand Down
Expand Up @@ -63,7 +63,7 @@ public void testDeleteIndexOnIndexReadOnlyAllowDeleteSetting() {
try {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, true).build();
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get());
assertSearchHits(client().prepareSearch().get(), "1");
assertSearchHits(client().prepareSearch().setPreference("_primary").get(), "1");
assertBlocked(
client().prepareIndex().setIndex("test").setId("2").setSource("foo", "bar"),
IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK
Expand All @@ -72,7 +72,7 @@ public void testDeleteIndexOnIndexReadOnlyAllowDeleteSetting() {
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 2)),
IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK
);
assertSearchHits(client().prepareSearch().get(), "1");
assertSearchHits(client().prepareSearch().setPreference("_primary").get(), "1");
assertAcked(client().admin().indices().prepareDelete("test"));
} finally {
Settings settings = Settings.builder().putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE).build();
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testDeleteIndexOnClusterReadOnlyAllowDeleteSetting() {
try {
Settings settings = Settings.builder().put(Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true).build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
assertSearchHits(client().prepareSearch().get(), "1");
assertSearchHits(client().prepareSearch().setPreference("_primary").get(), "1");
assertBlocked(
client().prepareIndex().setIndex("test").setId("2").setSource("foo", "bar"),
Metadata.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK
Expand All @@ -130,7 +130,7 @@ public void testDeleteIndexOnClusterReadOnlyAllowDeleteSetting() {
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 2)),
Metadata.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK
);
assertSearchHits(client().prepareSearch().get(), "1");
assertSearchHits(client().prepareSearch().setPreference("_primary").get(), "1");
assertAcked(client().admin().indices().prepareDelete("test"));
} finally {
Settings settings = Settings.builder().putNull(Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey()).build();
Expand Down
Expand Up @@ -32,10 +32,13 @@

package org.opensearch.action.admin.indices.flush;

import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import java.util.Arrays;
import java.util.Objects;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ;
Expand Down Expand Up @@ -67,10 +70,17 @@ public void testFlushWithBlocks() {
SETTING_READ_ONLY_ALLOW_DELETE
)) {
try {
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices("test");
String remoteStoreEnabledStr = client().admin().indices().getSettings(getSettingsRequest).actionGet().getSetting("test", IndexMetadata.SETTING_REMOTE_STORE_ENABLED);
enableIndexBlock("test", blockSetting);
FlushResponse response = client().admin().indices().prepareFlush("test").execute().actionGet();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
logger.warn("IndexSettings (" + remoteStoreEnabledStr + ")");
if(Objects.equals(remoteStoreEnabledStr, "true")) {
assertThat(response.getSuccessfulShards(), equalTo(numShards.numPrimaries));
} else {
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
}
} finally {
disableIndexBlock("test", blockSetting);
}
Expand Down
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.index.IndexCommit;
import org.opensearch.action.admin.indices.flush.FlushResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand All @@ -47,6 +48,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.Objects;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -82,22 +84,38 @@ public void testForceMergeUUIDConsistent() throws Exception {
assertThat(getForceMergeUUID(primary), nullValue());
assertThat(getForceMergeUUID(replica), nullValue());

GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices("test-index");
String remoteStoreEnabledStr = client().admin().indices().getSettings(getSettingsRequest).actionGet().getSetting("test-index", IndexMetadata.SETTING_REMOTE_STORE_ENABLED);
logger.warn("IndexSettings (" + remoteStoreEnabledStr + ")");

final ForceMergeResponse forceMergeResponse = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();

assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
if(Objects.equals(remoteStoreEnabledStr, "true")) {
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
} else {
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
}

// Force flush to force a new commit that contains the force flush UUID
final FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).get();
assertThat(flushResponse.getFailedShards(), is(0));
assertThat(flushResponse.getSuccessfulShards(), is(2));
if(Objects.equals(remoteStoreEnabledStr, "true")) {
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
} else {
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
}

final String primaryForceMergeUUID = getForceMergeUUID(primary);
assertThat(primaryForceMergeUUID, notNullValue());

final String replicaForceMergeUUID = getForceMergeUUID(replica);
assertThat(replicaForceMergeUUID, notNullValue());
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
if(Objects.equals(remoteStoreEnabledStr, "true")) {
}
else {
final String replicaForceMergeUUID = getForceMergeUUID(replica);
assertThat(replicaForceMergeUUID, notNullValue());
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
}
}

private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
Expand Down
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.indices.get;

import org.junit.Before;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.get.GetIndexRequest.Feature;
import org.opensearch.action.support.IndicesOptions;
Expand All @@ -58,10 +59,10 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@OpenSearchIntegTestCase.SuiteScopeTestCase

public class GetIndexIT extends OpenSearchIntegTestCase {
@Override
protected void setupSuiteScopeCluster() throws Exception {
@Before
protected void setupTest() throws Exception {
assertAcked(prepareCreate("idx").addAlias(new Alias("alias_idx")).setSettings(Settings.builder().put("number_of_shards", 1)).get());
ensureSearchable("idx");
createIndex("empty_idx");
Expand Down
Expand Up @@ -540,7 +540,7 @@ public void testBulkIndexingWhileInitializing() throws Exception {

refresh();

SearchResponse countResponse = client().prepareSearch().setSize(0).get();
SearchResponse countResponse = client().prepareSearch().setPreference("_primary").setSize(0).get();
assertHitCount(countResponse, numDocs);
}

Expand Down
Expand Up @@ -140,7 +140,7 @@ public void testTwoNodesNoClusterManagerBlock() throws Exception {
logger.info("--> verify we get the data back");
for (int i = 0; i < 10; i++) {
assertThat(
client().prepareSearch()
client().prepareSearch().setPreference("_primary")
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery())
.execute()
Expand Down Expand Up @@ -196,7 +196,7 @@ public void testTwoNodesNoClusterManagerBlock() throws Exception {

logger.info("--> verify we get the data back after cluster reform");
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 100);
assertHitCount(client().prepareSearch().setPreference("_primary").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 100);
}

logger.info("--> clearing voting config exclusions");
Expand Down Expand Up @@ -245,7 +245,7 @@ public void testTwoNodesNoClusterManagerBlock() throws Exception {

logger.info("--> verify we the data back");
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 100);
assertHitCount(client().prepareSearch().setPreference("_primary").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 100);
}
}

Expand Down Expand Up @@ -306,7 +306,7 @@ public void testThreeNodesNoClusterManagerBlock() throws Exception {
refresh();
logger.info("--> verify we get the data back");
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 100);
assertHitCount(client().prepareSearch().setPreference("_primary").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 100);
}

List<String> nonClusterManagerNodes = new ArrayList<>(
Expand Down Expand Up @@ -338,7 +338,7 @@ public void testThreeNodesNoClusterManagerBlock() throws Exception {

logger.info("--> verify we the data back");
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 100);
assertHitCount(client().prepareSearch().setPreference("_primary").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 100);
}
}

Expand Down
Expand Up @@ -76,7 +76,7 @@ public void testDecommissionNodeNoReplicas() {
}
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(
client().prepareSearch()
client().prepareSearch().setPreference("_primary")
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery())
.execute()
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testDecommissionNodeNoReplicas() {

client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(
client().prepareSearch()
client().prepareSearch().setPreference("_primary")
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery())
.execute()
Expand Down Expand Up @@ -191,7 +191,7 @@ public void testDisablingAllocationFiltering() {
}
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(
client().prepareSearch()
client().prepareSearch().setPreference("_primary")
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery())
.execute()
Expand Down
Expand Up @@ -449,7 +449,7 @@ public void testAllClusterManagerEligibleNodesFailedDanglingIndexImport() throws
);

logger.info("--> verify 1 doc in the index");
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
assertHitCount(client().prepareSearch().setPreference("_primary").setQuery(matchAllQuery()).get(), 1L);
assertThat(client().prepareGet("test", "1").execute().actionGet().isExists(), equalTo(true));

logger.info("--> stop data-only node and detach it from the old cluster");
Expand Down
Expand Up @@ -223,7 +223,7 @@ public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception

logger.info("--> check that the up-to-date primary shard gets promoted and that documents are available");
ensureYellow("test");
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2L);
assertHitCount(client().prepareSearch().setPreference("_primary").setSize(0).setQuery(matchAllQuery()).get(), 2L);
}

public void testFailedAllocationOfStalePrimaryToDataNodeWithNoData() throws Exception {
Expand Down Expand Up @@ -605,7 +605,7 @@ public void testNotWaitForQuorumCopies() throws Exception {
internalCluster().restartRandomDataNode();
logger.info("--> checking that index still gets allocated with only 1 shard copy being available");
ensureYellow("test");
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 1L);
assertHitCount(client().prepareSearch().setPreference("_primary").setSize(0).setQuery(matchAllQuery()).get(), 1L);
}

/**
Expand Down
Expand Up @@ -87,6 +87,6 @@ public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Excep

IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
// now search for the documents and see if we get a reply
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
assertThat(client().prepareSearch().setPreference("_primary").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
}
}

0 comments on commit 0f29caf

Please sign in to comment.