Skip to content

Commit

Permalink
Add RemoteStoreStatsTrackerFactoryTests and RemoteStoreTestsHelper
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <sabhumik@amazon.com>
  • Loading branch information
BhumikaSaini-Amazon committed Aug 25, 2023
1 parent 4f9d48a commit 98d6242
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard
throw new ShardNotFoundException(indexShard.shardId());
}

RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(
RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStorePressureService.getRemoteSegmentTransferTracker(
indexShard.shardId()
);
assert Objects.nonNull(remoteSegmentTransferTracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public RemoteStorePressureService(
* @param shardId shard id
* @return the tracker if index is remote-backed, else null.
*/
public RemoteSegmentTransferTracker getRemoteRefreshSegmentTracker(ShardId shardId) {
public RemoteSegmentTransferTracker getRemoteSegmentTransferTracker(ShardId shardId) {
return remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
*/
public class RemoteStorePressureSettings {

private static class Defaults {
static class Defaults {
private static final double BYTES_LAG_VARIANCE_FACTOR = 10.0;
private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 10.0;
private static final double VARIANCE_FACTOR_MIN_VALUE = 1.0;
private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 5;
private static final int MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE = 1;
private static final int MOVING_AVERAGE_WINDOW_SIZE = 20;
private static final int MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE = 5;
static final int MOVING_AVERAGE_WINDOW_SIZE = 20;
static final int MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE = 5;
}

public static final Setting<Boolean> REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED = Setting.boolSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,16 @@ public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settin
}

void updateMovingAverageWindowSize(BiConsumer<RemoteSegmentTransferTracker, Integer> biConsumer, int updatedSize) {
remoteSegmentTrackerMap.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize));
movingAverageWindowSize = updatedSize;
remoteSegmentTrackerMap.values().forEach(tracker -> biConsumer.accept(tracker, movingAverageWindowSize));
}

RemoteSegmentTransferTracker getRemoteSegmentTransferTracker(ShardId shardId) {
return remoteSegmentTrackerMap.get(shardId);
}

// visible for testing
long getMovingAverageWindowSize() {
return movingAverageWindowSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,7 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu
// Populate remote_store stats only if the index is remote store backed
if (indexSettings.isRemoteStoreEnabled()) {
segmentsStats.addRemoteSegmentStats(
new RemoteSegmentStats(remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId).stats())
new RemoteSegmentStats(remoteStorePressureService.getRemoteSegmentTransferTracker(shardId).stats())
);
}
return segmentsStats;
Expand Down Expand Up @@ -3696,7 +3696,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId())
remoteStorePressureService.getRemoteSegmentTransferTracker(shardId())
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void setUp() throws Exception {
Collections.emptySet()
);

when(pressureService.getRemoteRefreshSegmentTracker(any())).thenReturn(mock(RemoteSegmentTransferTracker.class));
when(pressureService.getRemoteSegmentTransferTracker(any())).thenReturn(mock(RemoteSegmentTransferTracker.class));
when(indicesService.indexService(INDEX)).thenReturn(indexService);
when(indexService.getIndexSettings()).thenReturn(new IndexSettings(remoteStoreIndexMetadata, Settings.EMPTY));
statsAction = new TransportRemoteStoreStatsAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,12 @@

package org.opensearch.index.remote;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -28,8 +23,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexShard;

public class RemoteStorePressureServiceTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -74,40 +68,14 @@ public void testIsSegmentsUploadBackpressureEnabled() {
assertTrue(pressureService.isSegmentsUploadBackpressureEnabled());
}

public void testAfterIndexShardCreatedForRemoteBackedIndex() {
IndexShard indexShard = createIndexShard(shardId, true);
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(Settings.EMPTY);
pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY, remoteStoreStatsTrackerFactory);
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()));
}

public void testAfterIndexShardCreatedForNonRemoteBackedIndex() {
IndexShard indexShard = createIndexShard(shardId, false);
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(Settings.EMPTY);
pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY, remoteStoreStatsTrackerFactory);
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()));
}

public void testAfterIndexShardClosed() {
IndexShard indexShard = createIndexShard(shardId, true);
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(Settings.EMPTY);
pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY, remoteStoreStatsTrackerFactory);
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId));
remoteStoreStatsTrackerFactory.afterIndexShardClosed(shardId, indexShard, indexShard.indexSettings().getSettings());
assertNull(pressureService.getRemoteRefreshSegmentTracker(shardId));
}

public void testValidateSegmentUploadLag() {
// Create the pressure tracker
IndexShard indexShard = createIndexShard(shardId, true);
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(Settings.EMPTY);
pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY, remoteStoreStatsTrackerFactory);
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);

RemoteSegmentTransferTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId);
RemoteSegmentTransferTracker pressureTracker = pressureService.getRemoteSegmentTransferTracker(shardId);
pressureTracker.updateLocalRefreshSeqNo(6);

// 1. time lag more than dynamic threshold
Expand Down Expand Up @@ -158,17 +126,4 @@ public void testValidateSegmentUploadLag() {
pressureService.validateSegmentsUploadLag(shardId);
}

private static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, String.valueOf(remoteStoreEnabled))
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
Store store = mock(Store.class);
IndexShard indexShard = mock(IndexShard.class);
when(indexShard.indexSettings()).thenReturn(indexSettings);
when(indexShard.shardId()).thenReturn(shardId);
when(indexShard.store()).thenReturn(store);
return indexShard;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.remote;

import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.test.OpenSearchTestCase;

import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexShard;

public class RemoteStoreStatsTrackerFactoryTests extends OpenSearchTestCase {
private ShardId shardId;
private IndexShard indexShard;
private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;

@Override
public void setUp() throws Exception {
super.setUp();
shardId = new ShardId("index", "uuid", 0);
indexShard = createIndexShard(shardId, true);
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(Settings.EMPTY);
}

public void testAfterIndexShardCreatedForRemoteBackedIndex() {
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
assertNotNull(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()));
}

public void testAfterIndexShardCreatedForNonRemoteBackedIndex() {
indexShard = createIndexShard(shardId, false);
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
assertNull(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()));
}

public void testAfterIndexShardClosed() {
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
assertNotNull(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId));
remoteStoreStatsTrackerFactory.afterIndexShardClosed(shardId, indexShard, indexShard.indexSettings().getSettings());
assertNull(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId));
}

public void testUpdateMovingAverageWindowSize() {
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);

ShardId shardId2 = new ShardId("index", "uuid", 1);
IndexShard indexShard2 = createIndexShard(shardId2, true);
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard2);

int defaultSize = RemoteStorePressureSettings.Defaults.MOVING_AVERAGE_WINDOW_SIZE;
assertEquals(defaultSize, remoteStoreStatsTrackerFactory.getMovingAverageWindowSize());
assertFalse(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).isUploadBytesAverageReady());
assertFalse(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).isUploadBytesPerSecAverageReady());
assertFalse(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).isUploadTimeMsAverageReady());
assertFalse(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId2).isUploadBytesAverageReady());
assertFalse(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId2).isUploadBytesPerSecAverageReady());
assertFalse(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId2).isUploadTimeMsAverageReady());

int updatedSize = 0;
assertThrows(
IllegalArgumentException.class,
() -> remoteStoreStatsTrackerFactory.updateMovingAverageWindowSize(
RemoteSegmentTransferTracker::updateMovingAverageWindowSize,
updatedSize
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.remote;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.IndexSettingsModule;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Helper functions for Remote Store tests
*/
public class RemoteStoreTestsHelper {
static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, String.valueOf(remoteStoreEnabled))
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
Store store = mock(Store.class);
IndexShard indexShard = mock(IndexShard.class);
when(indexShard.indexSettings()).thenReturn(indexSettings);
when(indexShard.shardId()).thenReturn(shardId);
when(indexShard.store()).thenReturn(store);
return indexShard;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1822,7 +1822,7 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException {
.build()
);
RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteStorePressureService()
.getRemoteRefreshSegmentTracker(shard.shardId);
.getRemoteSegmentTransferTracker(shard.shardId);
populateSampleRemoteStoreStats(remoteRefreshSegmentTracker);
ShardStats shardStats = new ShardStats(
shard.routingEntry(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(Settings.EMPTY);
remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY, remoteStoreStatsTrackerFactory);
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteSegmentTransferTracker(indexShard.shardId());
remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker);
}

Expand Down Expand Up @@ -328,7 +328,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception {
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
RemoteStorePressureService pressureService = tuple.v2();
RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteSegmentTransferTracker(indexShard.shardId());
assertNoLagAndTotalUploadsFailed(segmentTracker, 0);
}

Expand All @@ -349,7 +349,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
RemoteStorePressureService pressureService = tuple.v2();
RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteSegmentTransferTracker(indexShard.shardId());
assertNoLagAndTotalUploadsFailed(segmentTracker, 1);
}

Expand Down Expand Up @@ -395,7 +395,7 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception {
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
RemoteStorePressureService pressureService = tuple.v2();
RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteSegmentTransferTracker(indexShard.shardId());
assertNoLagAndTotalUploadsFailed(segmentTracker, 2);
}

Expand All @@ -412,7 +412,7 @@ public void testTrackerData() throws Exception {
Tuple<RemoteStoreRefreshListener, RemoteStorePressureService> tuple = mockIndexShardWithRetryAndScheduleRefresh(1);
RemoteStoreRefreshListener listener = tuple.v1();
RemoteStorePressureService pressureService = tuple.v2();
RemoteSegmentTransferTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
RemoteSegmentTransferTracker tracker = pressureService.getRemoteSegmentTransferTracker(indexShard.shardId());
assertNoLag(tracker);
indexDocs(100, randomIntBetween(100, 200));
indexShard.refresh("test");
Expand Down Expand Up @@ -544,7 +544,7 @@ private Tuple<RemoteStoreRefreshListener, RemoteStorePressureService> mockIndexS
when(shard.indexSettings()).thenReturn(indexShard.indexSettings());
when(shard.shardId()).thenReturn(indexShard.shardId());
remoteStoreStatsTrackerFactory.afterIndexShardCreated(shard);
RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteSegmentTransferTracker(indexShard.shardId());
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker);
refreshListener.afterRefresh(true);
return Tuple.tuple(refreshListener, remoteStorePressureService);
Expand Down

0 comments on commit 98d6242

Please sign in to comment.