Merge branch 'main' into stats-rollup
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
shourya035 committed Aug 10, 2023
2 parents: 043666a + d06926c · commit 9654b55
Showing 5 changed files with 116 additions and 47 deletions.
@@ -48,6 +48,7 @@
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.message.BasicHeader;
@@ -73,6 +74,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

@@ -298,37 +300,70 @@ public void testRequestResetAndAbort() throws Exception {
httpGet.reset();
assertFalse(httpGet.isAborted());

Future<ClassicHttpResponse> future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null);
httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future);
httpGet.abort();
final Phaser phaser = new Phaser(2);
phaser.register();

try {
future.get();
fail("expected cancellation exception");
} catch (CancellationException e) {
// expected
Future<ClassicHttpResponse> future = client.execute(
getRequestProducer(httpGet, httpHost),
getResponseConsumer(phaser),
null
);
httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future);
httpGet.abort();

try {
phaser.arriveAndDeregister();
future.get();
fail("expected cancellation exception");
} catch (CancellationException e) {
// expected
}
assertTrue(future.isCancelled());
} finally {
// Force termination since the AsyncResponseConsumer may never be reached:
// the request is aborted right before it can run
phaser.forceTermination();
}
assertTrue(future.isCancelled());
}
{
httpGet.reset();
Future<ClassicHttpResponse> future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null);
assertFalse(httpGet.isAborted());
httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future);
httpGet.abort();
assertTrue(httpGet.isAborted());
final Phaser phaser = new Phaser(2);
phaser.register();

try {
assertTrue(future.isCancelled());
future.get();
throw new AssertionError("exception should have been thrown");
} catch (CancellationException e) {
// expected
httpGet.reset();
Future<ClassicHttpResponse> future = client.execute(
getRequestProducer(httpGet, httpHost),
getResponseConsumer(phaser),
null
);
assertFalse(httpGet.isAborted());
httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future);
httpGet.abort();
assertTrue(httpGet.isAborted());
try {
phaser.arriveAndDeregister();
assertTrue(future.isCancelled());
future.get();
throw new AssertionError("exception should have been thrown");
} catch (CancellationException e) {
// expected
}
} finally {
// Force termination since the AsyncResponseConsumer may never be reached:
// the request is aborted right before it can run
phaser.forceTermination();
}
}
{
httpGet.reset();
assertFalse(httpGet.isAborted());
Future<ClassicHttpResponse> future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null);
final Phaser phaser = new Phaser(0);
Future<ClassicHttpResponse> future = client.execute(
getRequestProducer(httpGet, httpHost),
getResponseConsumer(phaser),
null
);
assertFalse(httpGet.isAborted());
assertEquals(200, future.get().getCode());
assertFalse(future.isCancelled());
@@ -554,8 +589,15 @@ private Response bodyTest(RestClient restClient, String method, int statusCode,
return esResponse;
}

private AsyncResponseConsumer<ClassicHttpResponse> getResponseConsumer() {
return new HeapBufferedAsyncResponseConsumer(1024);
private AsyncResponseConsumer<ClassicHttpResponse> getResponseConsumer(Phaser phaser) {
phaser.register();
return new HeapBufferedAsyncResponseConsumer(1024) {
@Override
protected ClassicHttpResponse buildResult(HttpResponse response, byte[] entity, ContentType contentType) {
phaser.arriveAndAwaitAdvance();
return super.buildResult(response, entity, contentType);
}
};
}

private HttpUriRequestProducer getRequestProducer(HttpUriRequestBase request, HttpHost host) {
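The Phaser wired into getResponseConsumer(Phaser) above acts as a rendezvous between the test thread and the async response consumer: buildResult blocks in arriveAndAwaitAdvance until the test thread has arrived, and forceTermination unblocks everything when the consumer is never invoked. A minimal, self-contained sketch of that rendezvous pattern, with simplified party counts and all class and variable names invented for illustration:

import java.util.concurrent.Phaser;

public class PhaserRendezvousSketch {
    public static void main(String[] args) throws InterruptedException {
        // One party registered up front (the "test" thread), one registered lazily
        // (the "consumer"), mirroring getResponseConsumer(Phaser) above.
        final Phaser phaser = new Phaser(1);
        phaser.register();

        Thread consumer = new Thread(() -> {
            // Stand-in for buildResult: wait until the test thread has also arrived.
            phaser.arriveAndAwaitAdvance();
            System.out.println("consumer proceeds after the test arrives");
        });
        consumer.start();

        // Test side: arrive and drop out so the consumer can advance.
        phaser.arriveAndDeregister();
        consumer.join();

        // If the consumer were never reached (request aborted first), forceTermination()
        // would unblock any party still waiting, which is what the finally blocks above rely on.
        phaser.forceTermination();
    }
}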
8 changes: 8 additions & 0 deletions release-notes/opensearch.release-notes-1.3.12.md
@@ -0,0 +1,8 @@
## 2023-08-09 Version 1.3.12 Release Notes

### Upgrades
- Upgrade `org.bouncycastle:bcprov-jdk15on` to `org.bouncycastle:bcprov-jdk15to18` version 1.75 ([#8247](https://github.com/opensearch-project/OpenSearch/pull/8247))
- Upgrade `org.bouncycastle:bcmail-jdk15on` to `org.bouncycastle:bcmail-jdk15to18` version 1.75 ([#8247](https://github.com/opensearch-project/OpenSearch/pull/8247))
- Upgrade `org.bouncycastle:bcpkix-jdk15on` to `org.bouncycastle:bcpkix-jdk15to18` version 1.75 ([#8247](https://github.com/opensearch-project/OpenSearch/pull/8247))
- Upgrade `netty` from 4.1.94.Final to 4.1.96.Final ([#9030](https://github.com/opensearch-project/OpenSearch/pull/9030))
- Upgrade bundled OpenJDK (July 2023 Patch releases) ([#8872](https://github.com/opensearch-project/OpenSearch/pull/8872))
@@ -9,6 +9,10 @@
package org.opensearch.remotestore;

import org.junit.After;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
@@ -33,10 +37,10 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
@@ -76,13 +80,18 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED + "-shard-" + shardId, maxSeqNoRefreshedOrFlushed);
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc(index);
maxSeqNo = response.getSeqNo();
shardId = response.getShardId().id();
indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo);
int numberOfBulk = randomIntBetween(1, 5);
for (int j = 0; j < numberOfBulk; j++) {
BulkResponse res = indexBulk(index, numberOfOperations);
for (BulkItemResponse singleResp : res.getItems()) {
indexingStats.put(
MAX_SEQ_NO_TOTAL + "-shard-" + singleResp.getResponse().getShardId().id(),
singleResp.getResponse().getSeqNo()
);
maxSeqNo = singleResp.getResponse().getSeqNo();
}
totalOperations += numberOfOperations;
}
totalOperations += numberOfOperations;
}

indexingStats.put(TOTAL_OPERATIONS, totalOperations);
@@ -132,6 +141,18 @@ protected IndexResponse indexSingleDoc(String indexName, boolean forceRefresh) {
return indexRequestBuilder.get();
}

protected BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
final IndexRequest request = client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.request();
bulkRequest.add(request);
}
return client().bulk(bulkRequest).actionGet();
}

public static Settings remoteStoreClusterSettings(String segmentRepoName) {
return remoteStoreClusterSettings(segmentRepoName, segmentRepoName);
}
@@ -179,10 +200,11 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return remoteStoreIndexSettings(numberOfReplicas, 1);
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit, int refresh) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), String.valueOf(refresh))
.build();
}

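A hypothetical usage of the new indexBulk helper and the three-argument remoteStoreIndexSettings overload (not part of this commit): the index name, document count, and enclosing test method are invented, and the snippet assumes a subclass of RemoteStoreBaseIntegTestCase with the usual static assertion imports available.

public void testBulkIndexingSketch() {
    // refresh = -1 disables periodic refresh, so new segments appear only on explicit flush/refresh.
    createIndex("my-index", remoteStoreIndexSettings(1, 10000L, -1));

    // One bulk request carrying 50 documents; item-level failures surface via hasFailures().
    BulkResponse response = indexBulk("my-index", 50);
    assertFalse(response.hasFailures());

    refresh("my-index");
    assertHitCount(client().prepareSearch("my-index").setSize(0).get(), 50);
}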
@@ -16,7 +16,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -29,12 +28,10 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@@ -148,10 +145,9 @@ public void testRemoteTranslogCleanup() throws Exception {
verifyRemoteStoreCleanup();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
@@ -163,20 +159,22 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1)));
if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
} else {
// As delete is async, it is possible that the file gets created before or after the deletion.
MatcherAssert.assertThat(actualFileCount, is(oneOf(10, 11)));
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1))
);
}
}, 30, TimeUnit.SECONDS);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String indexUUID = client().admin()
@@ -187,6 +185,6 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1)));
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}
}
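As the comments above note, stale-commit deletion is asynchronous, so the tests poll with assertBusy instead of asserting a file count once. A plain-Java sketch of that polling pattern, independent of the test framework, with names, timeout, and poll interval invented for illustration:

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class EventuallyAssertSketch {
    // Count regular files under a directory, analogous to getFileCount in the test above.
    static long fileCount(Path dir) throws Exception {
        try (Stream<Path> files = Files.walk(dir)) {
            return files.filter(Files::isRegularFile).count();
        }
    }

    // Poll until the file count drops to the expected maximum or the deadline passes.
    static void assertFileCountEventually(Path dir, long expectedMax, long timeoutSeconds) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
        long count;
        do {
            count = fileCount(dir);
            if (count <= expectedMax) {
                return; // asynchronous deletion has caught up
            }
            Thread.sleep(100);
        } while (System.nanoTime() < deadline);
        throw new AssertionError("expected at most " + expectedMax + " files but found " + count);
    }
}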
@@ -122,7 +122,6 @@ public void testPromoteReplicaToPrimary() throws Exception {
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9130")
public void testFailoverWhileIndexing() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
@@ -143,7 +142,7 @@ public void testFailoverWhileIndexing() throws Exception {
.setSource("field", numAutoGenDocs.get())
.get();

if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) {
if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.OK) {
numAutoGenDocs.incrementAndGet();
if (numAutoGenDocs.get() == docCount / 2) {
if (random().nextInt(3) == 0) {
