Skip to content

Commit

Permalink
Fixing UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 committed May 31, 2024
1 parent e09d1f8 commit c00a803
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,56 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.MockedStatic;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.BlobStoreTestUtil;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.VersionUtils;
import org.opensearch.test.gateway.TestGatewayAllocator;
import org.opensearch.threadpool.TestThreadPool;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
Expand All @@ -77,6 +99,7 @@
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING;

public class FailedShardsRoutingTests extends OpenSearchAllocationTestCase {
private final Logger logger = LogManager.getLogger(FailedShardsRoutingTests.class);
Expand Down Expand Up @@ -826,7 +849,6 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) {

public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
AllocationService allocation = createAllocationService(Settings.builder().build());

// segment replication enabled
Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
Expand All @@ -849,6 +871,31 @@ public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
.routingTable(initialRoutingTable)
.build();

BlobPath basePath = BlobPath.cleanPath().add("test");
RepositoriesService repositoriesService = mock(RepositoriesService.class);
BlobStoreRepository repository = mock(BlobStoreRepository.class);
BlobStore blobStore = mock(BlobStore.class);
when(repository.blobStore()).thenReturn(blobStore);
when(repositoriesService.repository(anyString())).thenReturn(repository);
when(repository.basePath()).thenReturn(basePath);
when(repository.getCompressor()).thenReturn(new DeflateCompressor());
when(blobStore.isBlobMetadataEnabled()).thenReturn(true);

TestThreadPool testThreadPool = new TestThreadPool(getTestName());
final ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(clusterService.state());
final InternalSnapshotsInfoService internalSnapshotsInfoService = new InternalSnapshotsInfoService(
Settings.builder().put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10)).build(),
clusterService,
() -> repositoriesService,
() -> rerouteService
);

TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, gatewayAllocator, internalSnapshotsInfoService);



ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0);

// add a remote node and start primary shard
Expand Down Expand Up @@ -954,5 +1001,6 @@ public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
|| primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId())
);
assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId());
testThreadPool.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.routing.allocation;

import org.junit.After;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
Expand All @@ -43,6 +44,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
Expand All @@ -51,19 +53,33 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.RemoteStoreMigrationAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
Expand All @@ -75,6 +91,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.hamcrest.core.Is.is;
import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING;

public class RemoteStoreMigrationAllocationDeciderTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -109,6 +126,7 @@ public class RemoteStoreMigrationAllocationDeciderTests extends OpenSearchAlloca
private RoutingTable routingTable = null;

private ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0);
private TestThreadPool testThreadPool;

private void beforeAllocation(String direction) {
FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings);
Expand Down Expand Up @@ -138,17 +156,47 @@ private void beforeAllocation(String direction) {
getClusterSettings(customSettings)
);


BlobPath basePath = BlobPath.cleanPath().add("test");
RepositoriesService repositoriesService = mock(RepositoriesService.class);
BlobStoreRepository repository = mock(BlobStoreRepository.class);
BlobStore blobStore = mock(BlobStore.class);
when(repository.blobStore()).thenReturn(blobStore);
when(repositoriesService.repository(anyString())).thenReturn(repository);
when(repository.basePath()).thenReturn(basePath);
when(repository.getCompressor()).thenReturn(new DeflateCompressor());
when(blobStore.isBlobMetadataEnabled()).thenReturn(true);

if (Objects.isNull(testThreadPool)) {
testThreadPool = new TestThreadPool(getTestName());
}
final ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(clusterService.state());
final InternalSnapshotsInfoService internalSnapshotsInfoService = new InternalSnapshotsInfoService(
Settings.builder().put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10)).build(),
clusterService,
() -> repositoriesService,
() -> rerouteService
);

routingAllocation = new RoutingAllocation(
new AllocationDeciders(Collections.singleton(remoteStoreMigrationAllocationDecider)),
clusterState.getRoutingNodes(),
clusterState,
null,
internalSnapshotsInfoService,
null,
0L
);
routingAllocation.debugDecision(true);
}

@After
public void tearDown() throws Exception {
testThreadPool.shutdownNow();
super.tearDown();
}

private void prepareRoutingTable(boolean isReplicaAllocation, String primaryShardNodeId) {
routingTable = RoutingTable.builder()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -70,6 +71,7 @@

import static java.util.Collections.emptyMap;
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING;

public abstract class OpenSearchAllocationTestCase extends OpenSearchTestCase {
private static final ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(
Expand Down Expand Up @@ -103,6 +105,7 @@ public static MockAllocationService createAllocationService(Settings settings, C
randomAllocationDeciders(settings, clusterSettings, random),
new TestGatewayAllocator(),
new BalancedShardsAllocator(settings),

EmptyClusterInfoService.INSTANCE,
SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES
);
Expand Down

0 comments on commit c00a803

Please sign in to comment.