Skip to content

Commit

Permalink
Moving get snapshot requests to listener based async calls (#8377)
Browse files Browse the repository at this point in the history
* Moving get snapshot requests to listener based async calls
---------

Signed-off-by: INDRAJIT BANERJEE <indrajohn7@gmail.com>
Signed-off-by: Indrajit Banerjee <banerind@amazon.com>
  • Loading branch information
indrajohn7 committed Aug 8, 2023
1 parent e8d3e69 commit 2de1c24
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add support to restore only unassigned shards of an index ([#8792](https://github.com/opensearch-project/OpenSearch/pull/8792))
- Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792))
- [Remote Store] Restrict user override for remote store index level settings ([#8812](https://github.com/opensearch-project/OpenSearch/pull/8812))
- Removed blocking wait in TransportGetSnapshotsAction which was exhausting generic threadpool ([#8377](https://github.com/opensearch-project/OpenSearch/pull/8377))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.SnapshotsInProgress;
Expand Down Expand Up @@ -138,57 +138,64 @@ protected void clusterManagerOperation(
currentSnapshots.add(snapshotInfo);
}

final RepositoryData repositoryData;
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
if (isCurrentSnapshotsOnly(request.snapshots()) == false) {
repositoryData = PlainActionFuture.get(fut -> repositoriesService.getRepositoryData(repository, fut));
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
}
repositoriesService.getRepositoryData(repository, repositoryDataListener);
} else {
repositoryData = null;
// Setting repositoryDataListener response to be null if the request has only current snapshot
repositoryDataListener.onResponse(null);
}
repositoryDataListener.whenComplete(repositoryData -> {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
}
}

final Set<SnapshotId> toResolve = new HashSet<>();
if (isAllSnapshots(request.snapshots())) {
toResolve.addAll(allSnapshotIds.values());
} else {
for (String snapshotOrPattern : request.snapshots()) {
if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) {
toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList()));
} else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
if (allSnapshotIds.containsKey(snapshotOrPattern)) {
toResolve.add(allSnapshotIds.get(snapshotOrPattern));
} else if (request.ignoreUnavailable() == false) {
throw new SnapshotMissingException(repository, snapshotOrPattern);
}
} else {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
toResolve.add(entry.getValue());
final Set<SnapshotId> toResolve = new HashSet<>();
if (isAllSnapshots(request.snapshots())) {
toResolve.addAll(allSnapshotIds.values());
} else {
for (String snapshotOrPattern : request.snapshots()) {
if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) {
toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList()));
} else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
if (allSnapshotIds.containsKey(snapshotOrPattern)) {
toResolve.add(allSnapshotIds.get(snapshotOrPattern));
} else if (request.ignoreUnavailable() == false) {
throw new SnapshotMissingException(repository, snapshotOrPattern);
}
} else {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
toResolve.add(entry.getValue());
}
}
}
}
}

if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) {
throw new SnapshotMissingException(repository, request.snapshots()[0]);
if (toResolve.isEmpty()
&& request.ignoreUnavailable() == false
&& isCurrentSnapshotsOnly(request.snapshots()) == false) {
throw new SnapshotMissingException(repository, request.snapshots()[0]);
}
}
}

final List<SnapshotInfo> snapshotInfos;
if (request.verbose()) {
snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
final List<SnapshotInfo> snapshotInfos;
if (request.verbose()) {
snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
} else {
// only want current snapshots
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
} else {
// only want current snapshots
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
}
}
}
listener.onResponse(new GetSnapshotsResponse(snapshotInfos));
listener.onResponse(new GetSnapshotsResponse(snapshotInfos));
}, listener::onFailure);
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsAction;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.opensearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
Expand Down Expand Up @@ -743,15 +746,13 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
);

final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();

continueOrDie(
createSnapshotResponseStepListener,
createSnapshotResponse -> client().admin()
.cluster()
.prepareCreateSnapshot(repoName, "snapshot-2")
.execute(createOtherSnapshotResponseStepListener)
);

final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();

continueOrDie(
Expand Down Expand Up @@ -784,7 +785,6 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
// We end up with two snapshots no matter if the delete worked out or not
assertThat(snapshotIds, hasSize(2));

for (SnapshotId snapshotId : snapshotIds) {
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
Expand All @@ -794,6 +794,96 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
}
}

public void testTransportGetSnapshotsAction() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
final String[] snapshotsList = { "snapshot-1", "snapshot-2" };
final String[] indexList = { "index-1", "index-2" };
final int shards = randomIntBetween(1, 10);

TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager(
testClusterNodes.nodes.values().iterator().next().clusterService.state()
);

for (int i = 0; i < snapshotsList.length; i++) {
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
final String snapshot = snapshotsList[i];
final String index = indexList[i];
continueOrDie(
createRepoAndIndex(repoName, index, shards),
createSnapshotResponse -> client().admin()
.cluster()
.prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true)
.execute(createSnapshotResponseStepListener)
);
}
deterministicTaskQueue.runAllRunnableTasks();

TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE);
TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction;
GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName).snapshots(snapshotsList);

transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> {
assertNotNull("Snapshot list should not be null", repoSnapshotResponse.getSnapshots());
assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2));
List<SnapshotInfo> snapshotInfos = repoSnapshotResponse.getSnapshots();
for (SnapshotInfo snapshotInfo : snapshotInfos) {
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertEquals(0, snapshotInfo.failedShards());
assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals));
}
}, exception -> { throw new AssertionError(exception); }));
}

public void testTransportGetCurrentSnapshotsAction() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
final String index = "index-1";
final String[] snapshotsList = { GetSnapshotsRequest.CURRENT_SNAPSHOT };
final int shards = randomIntBetween(1, 10);

TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager(
testClusterNodes.nodes.values().iterator().next().clusterService.state()
);

final StepListener<CreateSnapshotResponse> createSnapshotResponseListener = new StepListener<>();
clusterManagerNode.clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().custom(SnapshotsInProgress.TYPE) != null) {
TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE);
TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction;
GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName)
.snapshots(snapshotsList)
.ignoreUnavailable(false)
.verbose(false);
transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> {
assertNotNull("Snapshot list should not be null", repoSnapshotResponse.getSnapshots());
List<SnapshotInfo> snapshotInfos = repoSnapshotResponse.getSnapshots();
assertThat(repoSnapshotResponse.getSnapshots(), hasSize(snapshotsList.length));
for (SnapshotInfo snapshotInfo : snapshotInfos) {
assertEquals(SnapshotState.IN_PROGRESS, snapshotInfo.state());
assertEquals(0, snapshotInfo.failedShards());
assertTrue(snapshotInfo.snapshotId().getName().contains("last-snapshot"));
}
}, exception -> { throw new AssertionError(exception); }));
clusterManagerNode.clusterService.removeListener(this);
}
}
});
continueOrDie(
createRepoAndIndex(repoName, index, shards),
createIndexResponse -> client().admin()
.cluster()
.prepareCreateSnapshot(repoName, GetSnapshotsRequest.CURRENT_SNAPSHOT)
.execute(createSnapshotResponseListener)
);
deterministicTaskQueue.runAllRunnableTasks();
}

public void testBulkSnapshotDeleteWithAbort() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

Expand Down Expand Up @@ -1756,7 +1846,6 @@ public TestClusterNode currentClusterManager(ClusterState state) {
}

private final class TestClusterNode {

private final Logger logger = LogManager.getLogger(TestClusterNode.class);

private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
Expand Down Expand Up @@ -1802,6 +1891,8 @@ private final class TestClusterNode {

private Coordinator coordinator;

private Map<ActionType, TransportAction> actions = new HashMap<>();

TestClusterNode(DiscoveryNode node) throws IOException {
this.node = node;
final Environment environment = createEnvironment(node.getName());
Expand Down Expand Up @@ -2038,7 +2129,7 @@ public void onFailure(final Exception e) {
SegmentReplicationCheckpointPublisher.EMPTY,
mock(RemoteRefreshSegmentPressureService.class)
);
Map<ActionType, TransportAction> actions = new HashMap<>();

final SystemIndices systemIndices = new SystemIndices(emptyMap());
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
Expand Down Expand Up @@ -2270,6 +2361,17 @@ public void onFailure(final Exception e) {
indexNameExpressionResolver
)
);
actions.put(
GetSnapshotsAction.INSTANCE,
new TransportGetSnapshotsAction(
transportService,
clusterService,
threadPool,
repositoriesService,
actionFilters,
indexNameExpressionResolver
)
);
actions.put(
ClusterStateAction.INSTANCE,
new TransportClusterStateAction(
Expand Down

0 comments on commit 2de1c24

Please sign in to comment.