Skip to content

Commit

Permalink
Removes all instances of index/alias existence checking via cluster s…
Browse files Browse the repository at this point in the history
…tate and replaces with exists()/aliasExists() API

Signed-off-by: Joshua Palis <jpalis@amazon.com>
  • Loading branch information
joshpalis committed Jun 2, 2023
1 parent c20e72d commit 6266531
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.client.indices.rollover.RolloverRequest;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
Expand Down Expand Up @@ -304,13 +305,57 @@ public static String getCheckpointMappings() throws IOException {
return Resources.toString(url, Charsets.UTF_8);
}

/**
* Determine if index exists
*
* @param indexName the name of the index
* @return true if index exists
*/
public boolean indexExists(String indexName) {
GetIndexRequest getindexRequest = new GetIndexRequest(indexName);

CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
sdkRestClient.indices().exists(getindexRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
existsFuture.completeExceptionally(exception);
}));

Boolean existsResponse = existsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
.join();

return existsResponse.booleanValue();
}

/**
* Determine if alias exists
*
* @param aliasName the name of the alias
* @return true if alias exists
*/
public boolean aliasExists(String aliasName) {
GetAliasesRequest getAliasRequest = new GetAliasesRequest(aliasName);

CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
sdkRestClient
.indices()
.existsAlias(getAliasRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
existsFuture.completeExceptionally(exception);
}));

Boolean existsResponse = existsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
.join();

return existsResponse.booleanValue();
}

/**
* Anomaly detector index exist or not.
*
* @return true if anomaly detector index exists
*/
public boolean doesAnomalyDetectorIndexExist() {
return sdkClusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX);
return indexExists(AnomalyDetector.ANOMALY_DETECTORS_INDEX);
}

/**
Expand All @@ -319,7 +364,7 @@ public boolean doesAnomalyDetectorIndexExist() {
* @return true if anomaly detector job index exists
*/
public boolean doesAnomalyDetectorJobIndexExist() {
return sdkClusterService.state().getRoutingTable().hasIndex(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX);
return indexExists(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX);
}

/**
Expand All @@ -328,11 +373,11 @@ public boolean doesAnomalyDetectorJobIndexExist() {
* @return true if anomaly result index exists
*/
public boolean doesDefaultAnomalyResultIndexExist() {
return sdkClusterService.state().metadata().hasAlias(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
return aliasExists(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
}

public boolean doesIndexExist(String indexName) {
return sdkClusterService.state().metadata().hasIndex(indexName);
return indexExists(indexName);
}

public <T> void initCustomResultIndexAndExecute(String resultIndex, AnomalyDetectorFunction function, ActionListener<T> listener) {
Expand Down Expand Up @@ -471,7 +516,7 @@ public boolean isValidResultIndexMapping(String resultIndex) {
* @return true if anomaly state index exists
*/
public boolean doesDetectorStateIndexExist() {
return sdkClusterService.state().getRoutingTable().hasIndex(CommonName.DETECTION_STATE_INDEX);
return indexExists(CommonName.DETECTION_STATE_INDEX);
}

/**
Expand All @@ -480,27 +525,7 @@ public boolean doesDetectorStateIndexExist() {
* @return true if checkpoint index exists
*/
public boolean doesCheckpointIndexExist() {
return sdkClusterService.state().getRoutingTable().hasIndex(CommonName.CHECKPOINT_INDEX_NAME);
}

/**
* Index exists or not
* @param sdkClusterService Cluster service
* @param name Index name
* @return true if the index exists
*/
public static boolean doesIndexExists(SDKClusterService sdkClusterService, String name) {
return sdkClusterService.state().getRoutingTable().hasIndex(name);
}

/**
* Alias exists or not
* @param sdkClusterService Cluster service
* @param alias Alias name
* @return true if the alias exists
*/
public static boolean doesAliasExists(SDKClusterService sdkClusterService, String alias) {
return sdkClusterService.state().metadata().hasAlias(alias);
return indexExists(CommonName.CHECKPOINT_INDEX_NAME);
}

private ActionListener<CreateIndexResponse> markMappingUpToDate(ADIndex index, ActionListener<CreateIndexResponse> followingListener) {
Expand Down Expand Up @@ -979,9 +1004,9 @@ private void markMappingUpdated(ADIndex adIndex) {
private void shouldUpdateIndex(ADIndex index, ActionListener<Boolean> thenDo) {
boolean exists = false;
if (index.isAlias()) {
exists = AnomalyDetectionIndices.doesAliasExists(sdkClusterService, index.getIndexName());
exists = aliasExists(index.getIndexName());
} else {
exists = AnomalyDetectionIndices.doesIndexExists(sdkClusterService, index.getIndexName());
exists = indexExists(index.getIndexName());
}
if (false == exists) {
thenDo.onResponse(Boolean.FALSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -23,7 +25,10 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -57,7 +62,7 @@ public void getDetectorJob(
AnomalyDetectorFunction function,
NamedXContentRegistry xContentRegistry
) {
if (sdkClusterService.state().metadata().indices().containsKey(ANOMALY_DETECTOR_JOB_INDEX)) {
if (anomalyDetectorJobIndexExists(client)) {
GetRequest request = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
client
.get(
Expand Down Expand Up @@ -102,4 +107,19 @@ private void onGetAdJobResponseForWrite(
}
function.execute();
}

private boolean anomalyDetectorJobIndexExists(SDKRestClient sdkRestClient) {
GetIndexRequest getindexRequest = new GetIndexRequest(ANOMALY_DETECTOR_JOB_INDEX);

CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
sdkRestClient.indices().exists(getindexRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
existsFuture.completeExceptionally(exception);
}));

Boolean existsResponse = existsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(Settings.EMPTY).getMillis(), TimeUnit.MILLISECONDS)
.join();

return existsResponse.booleanValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -42,6 +44,7 @@
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -189,7 +192,7 @@ public void onFailure(Exception e) {
}

private void getDetectorJob(String detectorId, ActionListener<DeleteResponse> listener, AnomalyDetectorFunction function) {
if (clusterService.state().metadata().indices().containsKey(ANOMALY_DETECTOR_JOB_INDEX)) {
if (anomalyDetectorJobIndexExists(client)) {
GetRequest request = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
client.get(request, ActionListener.wrap(response -> onGetAdJobResponseForWrite(response, listener, function), exception -> {
LOG.error("Fail to get anomaly detector job: " + detectorId, exception);
Expand Down Expand Up @@ -224,4 +227,19 @@ private void onGetAdJobResponseForWrite(GetResponse response, ActionListener<Del
}
function.execute();
}

private boolean anomalyDetectorJobIndexExists(SDKRestClient sdkRestClient) {
GetIndexRequest getindexRequest = new GetIndexRequest(ANOMALY_DETECTOR_JOB_INDEX);

CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
sdkRestClient.indices().exists(getindexRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
existsFuture.completeExceptionally(exception);
}));

Boolean existsResponse = existsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(Settings.EMPTY).getMillis(), TimeUnit.MILLISECONDS)
.join();

return existsResponse.booleanValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -28,10 +30,13 @@
import org.opensearch.action.support.TransportAction;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorType;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.stats.ADStatsResponse;
import org.opensearch.ad.stats.StatNames;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.SDKClusterService;
Expand Down Expand Up @@ -133,7 +138,7 @@ private void getClusterStats(
if ((adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())
|| adStatsRequest.getStatsToBeRetrieved().contains(StatNames.SINGLE_ENTITY_DETECTOR_COUNT.getName())
|| adStatsRequest.getStatsToBeRetrieved().contains(StatNames.MULTI_ENTITY_DETECTOR_COUNT.getName()))
&& sdkClusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {
&& anomalyDetectorsIndexExists()) {

TermsAggregationBuilder termsAgg = AggregationBuilders.terms(DETECTOR_TYPE_AGG).field(AnomalyDetector.DETECTOR_TYPE_FIELD);
SearchRequest request = new SearchRequest()
Expand Down Expand Up @@ -213,4 +218,19 @@ private void getNodeStats(
listener.onResponse(restADStatsResponse);
}, listener::onFailure));
}

private boolean anomalyDetectorsIndexExists() {
GetIndexRequest getindexRequest = new GetIndexRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX);

CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
sdkRestClient.indices().exists(getindexRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
existsFuture.completeExceptionally(exception);
}));

Boolean existsResponse = existsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(Settings.EMPTY).getMillis(), TimeUnit.MILLISECONDS)
.join();

return existsResponse.booleanValue();
}
}
41 changes: 38 additions & 3 deletions src/main/java/org/opensearch/ad/util/IndexUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
Expand Down Expand Up @@ -113,9 +116,9 @@ public IndexUtils(
* @throws IllegalArgumentException Thrown when an alias is passed in that points to more than one index
*/
public String getIndexHealthStatus(String indexOrAliasName) throws IllegalArgumentException {
if (!clusterService.state().getRoutingTable().hasIndex(indexOrAliasName)) {
if (!indexExists(indexOrAliasName)) {
// Check if the index is actually an alias
if (clusterService.state().metadata().hasAlias(indexOrAliasName)) {
if (aliasExists(indexOrAliasName)) {
// List of all indices the alias refers to
List<IndexMetadata> indexMetaDataList = clusterService
.state()
Expand Down Expand Up @@ -154,7 +157,7 @@ public String getIndexHealthStatus(String indexOrAliasName) throws IllegalArgume
*/
@Deprecated
public Long getNumberOfDocumentsInIndex(String indexName) {
if (!clusterService.state().getRoutingTable().hasIndex(indexName)) {
if (!indexExists(indexName)) {
return 0L;
}
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest.Builder().build();
Expand Down Expand Up @@ -191,4 +194,36 @@ public boolean checkIndicesBlocked(ClusterState state, ClusterBlockLevel level,

return state.blocks().indicesBlockedException(level, concreteIndices) != null;
}

private boolean indexExists(String indexName) {
GetIndexRequest getindexRequest = new GetIndexRequest(indexName);

CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
sdkRestClient.indices().exists(getindexRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
existsFuture.completeExceptionally(exception);
}));

Boolean existsResponse = existsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();

return existsResponse.booleanValue();
}

private boolean aliasExists(String aliasName) {
GetAliasesRequest getAliasRequest = new GetAliasesRequest(aliasName);

CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
sdkRestClient
.indices()
.existsAlias(getAliasRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
existsFuture.completeExceptionally(exception);
}));

Boolean existsResponse = existsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();

return existsResponse.booleanValue();
}
}

0 comments on commit 6266531

Please sign in to comment.