Skip to content

Commit

Permalink
Incorporating review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Jun 28, 2023
1 parent be7561b commit 097efd8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;
Expand All @@ -37,10 +38,10 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

public class ReindexCodecIT extends ReindexTestCase {
public class MultiCodecReindexIT extends ReindexTestCase {

public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException {
internalCluster().ensureAtLeastNumDataNodes(2);
internalCluster().ensureAtLeastNumDataNodes(1);
Map<String, String> codecMap = Map.of(
"best_compression",
"BEST_COMPRESSION",
Expand Down Expand Up @@ -71,6 +72,7 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", "default")
.put("index.merge.policy.max_merged_segment", "1b")
.build()
);
ensureGreen(index);
Expand All @@ -79,9 +81,17 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod

// indexing with all 4 codecs
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
indexWithDifferentCodecs(index, codec.getKey(), codec.getValue(), nbDocs);
useCodec(index, codec.getKey());
ingestDocs(index, nbDocs);
}

assertTrue(
getSegments(index).stream()
.flatMap(s -> s.getAttributes().values().stream())
.collect(Collectors.toSet())
.containsAll(codecMap.values())
);

// creating destination index with destination codec
createIndex(
destIndex,
Expand All @@ -98,8 +108,8 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod
.waitForActiveShards(ActiveShardCount.ONE)
.get();

assertEquals(4 * nbDocs, bulkResponse.getCreated());
assertEquals(4 * nbDocs, bulkResponse.getTotal());
assertEquals(codecMap.size() * nbDocs, bulkResponse.getCreated());
assertEquals(codecMap.size() * nbDocs, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
Expand All @@ -110,6 +120,19 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod
assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode)));
}

private void useCodec(String index, String codec) throws ExecutionException, InterruptedException {
assertAcked(client().admin().indices().prepareClose(index));

assertAcked(
client().admin()
.indices()
.updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec)))
.get()
);

assertAcked(client().admin().indices().prepareOpen(index));
}

private void flushAndRefreshIndex(String index) {

// Request is not blocked
Expand All @@ -135,19 +158,7 @@ private void flushAndRefreshIndex(String index) {
}
}

private void indexWithDifferentCodecs(String index, String codec, String codecMode, int nbDocs) throws InterruptedException,
ExecutionException {

assertAcked(client().admin().indices().prepareClose(index));

assertAcked(
client().admin()
.indices()
.updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec)))
.get()
);

assertAcked(client().admin().indices().prepareOpen(index));
private void ingestDocs(String index, int nbDocs) throws InterruptedException {

indexRandom(
randomBoolean(),
Expand All @@ -158,7 +169,6 @@ private void indexWithDifferentCodecs(String index, String codec, String codecMo
.collect(toList())
);
flushAndRefreshIndex(index);
assertTrue(getSegments(index).stream().anyMatch(segment -> segment.attributes.containsValue(codecMode)));
}

private ArrayList<Segment> getSegments(String index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class CodecIT extends OpenSearchIntegTestCase {
public class MultiCodecMergeIT extends OpenSearchIntegTestCase {

public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {

Expand All @@ -62,31 +62,40 @@ public void testForceMergeMultipleCodecs() throws ExecutionException, Interrupte
private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, Map<String, String> codecMap) throws ExecutionException,
InterruptedException {

internalCluster().ensureAtLeastNumDataNodes(2);
internalCluster().ensureAtLeastNumDataNodes(1);
final String index = "test-index" + finalCodec;

// creating index
createIndex(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", "default")
.put("index.merge.policy.max_merged_segment", "1b")
.build()
);
ensureGreen(index);

// ingesting and asserting segment codec mode for all four codecs
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
assertSegmentCodec(index, codec.getKey(), codec.getValue());
useCodec(index, codec.getKey());
ingestDocs(index);
}

assertTrue(
getSegments(index).stream()
.flatMap(s -> s.getAttributes().values().stream())
.collect(Collectors.toSet())
.containsAll(codecMap.values())
);

// force merge into final codec
assertSegmentCodec(index, finalCodec, finalCodecMode);
useCodec(index, finalCodec);
flushAndRefreshIndex(index);
final ForceMergeResponse forceMergeResponse = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();

assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
assertThat(forceMergeResponse.getSuccessfulShards(), is(1));

flushAndRefreshIndex(index);

Expand All @@ -95,8 +104,7 @@ private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode,
assertTrue(segments.stream().findFirst().get().attributes.containsValue(finalCodecMode));
}

private void assertSegmentCodec(String index, String codec, String codecMode) throws InterruptedException, ExecutionException {

private void useCodec(String index, String codec) throws ExecutionException, InterruptedException {
assertAcked(client().admin().indices().prepareClose(index));

assertAcked(
Expand All @@ -107,11 +115,11 @@ private void assertSegmentCodec(String index, String codec, String codecMode) th
);

assertAcked(client().admin().indices().prepareOpen(index));
}

private void ingestDocs(String index) throws InterruptedException {
ingest(index);
flushAndRefreshIndex(index);

assertTrue(getSegments(index).stream().anyMatch(segment -> segment.attributes.containsValue(codecMode)));
}

private ArrayList<Segment> getSegments(String index) {
Expand Down

0 comments on commit 097efd8

Please sign in to comment.