Rest Layer and Async Search Cleanup Management #9
Changes from 29 commits
AsyncSearchStatsAction.java (new file)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistroforelasticsearch.search.async.action;
+
+import com.amazon.opendistroforelasticsearch.search.async.response.AsyncSearchStatsResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.common.io.stream.Writeable;
+
+public class AsyncSearchStatsAction extends ActionType<AsyncSearchStatsResponse> {
+
+    public static final AsyncSearchStatsAction INSTANCE = new AsyncSearchStatsAction();
+    public static final String NAME = "cluster:admin/async_search/stats";
+
+    private AsyncSearchStatsAction() {
+        super(NAME, AsyncSearchStatsResponse::new);
+    }
+
+    @Override
+    public Writeable.Reader<AsyncSearchStatsResponse> getResponseReader() {
+        return AsyncSearchStatsResponse::new;
+    }
+}
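For orientation, an ActionType like this is invoked through the client with a matching request type. A minimal sketch of a caller, assuming a hypothetical AsyncSearchStatsRequest class (the request class is not part of this diff):

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.node.NodeClient;

// Sketch only: AsyncSearchStatsRequest is an assumption, not shown in this PR.
class StatsCaller {
    void fetchStats(NodeClient client) {
        client.execute(AsyncSearchStatsAction.INSTANCE, new AsyncSearchStatsRequest(),
                ActionListener.wrap(
                        resp -> System.out.println("async search stats: " + resp), // AsyncSearchStatsResponse
                        e -> System.err.println("stats request failed: " + e)));
    }
}
```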
AsyncSearchActiveStore.java
@@ -15,7 +15,6 @@
 package com.amazon.opendistroforelasticsearch.search.async.context.active;

 import com.amazon.opendistroforelasticsearch.search.async.context.AsyncSearchContextId;
-import com.amazon.opendistroforelasticsearch.search.async.context.state.AsyncSearchStateMachine;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -26,6 +25,7 @@
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;

 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency;
@@ -34,26 +34,25 @@ public class AsyncSearchActiveStore {

     private static Logger logger = LogManager.getLogger(AsyncSearchActiveStore.class);
     private volatile int maxRunningContext;
-    private final AsyncSearchStateMachine asyncSearchStateMachine;
     public static final Setting<Integer> MAX_RUNNING_CONTEXT = Setting.intSetting(
-            "async_search.max_running_context", 100, 0, Setting.Property.Dynamic, Setting.Property.NodeScope);
+            "async_search.max_running_context", 100, 10, Setting.Property.Dynamic, Setting.Property.NodeScope);

     private final ConcurrentMapLong<AsyncSearchActiveContext> activeContexts = newConcurrentMapLongWithAggressiveConcurrency();

-    public AsyncSearchActiveStore(ClusterService clusterService, AsyncSearchStateMachine stateMachine) {
+    public AsyncSearchActiveStore(ClusterService clusterService) {
         Settings settings = clusterService.getSettings();
         maxRunningContext = MAX_RUNNING_CONTEXT.get(settings);
-        this.asyncSearchStateMachine = stateMachine;
         clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_RUNNING_CONTEXT, this::setMaxRunningContext);
     }

     private void setMaxRunningContext(int maxRunningContext) {
         this.maxRunningContext = maxRunningContext;
     }

-    public synchronized void putContext(AsyncSearchContextId asyncSearchContextId, AsyncSearchActiveContext asyncSearchContext) {
+    public synchronized void putContext(AsyncSearchContextId asyncSearchContextId, AsyncSearchActiveContext asyncSearchContext,
+                                        Consumer<AsyncSearchContextId> contextRejectionEventConsumer) {
         if (activeContexts.size() >= maxRunningContext) {
+            contextRejectionEventConsumer.accept(asyncSearchContextId);
[Review comment] Why is this consumer needed? For cleanup?
[Reply] The consumer is required for the node-level async search throttled-count stat, which is surfaced in our stats API.
             throw new AsyncSearchRejectedException("Trying to create too many running contexts. Must be less than or equal to: ["
                     + maxRunningContext + "]. This limit can be set by changing the [" + MAX_RUNNING_CONTEXT.getKey() + "] setting.",
                     maxRunningContext);
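To make the exchange above concrete: a minimal sketch of how the rejection consumer could feed a node-level throttled-count stat. The AsyncSearchNodeStats holder is illustrative, not this PR's code; CounterMetric is Elasticsearch's standard counter class.

```java
import org.elasticsearch.common.metrics.CounterMetric;

// Illustrative stats holder: the name and shape are assumptions, not this PR's code.
class AsyncSearchNodeStats {
    private final CounterMetric throttledCount = new CounterMetric();

    // Passed as the contextRejectionEventConsumer: bumps the counter whenever
    // putContext rejects a context because maxRunningContext is exceeded.
    void onContextRejection() {
        throttledCount.inc();
    }

    long throttled() {
        return throttledCount.count();
    }
}

// Usage: store.putContext(contextId, context, id -> nodeStats.onContextRejection());
```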
AsyncSearchContextPermits.java
@@ -22,6 +22,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.unit.TimeValue;
@@ -58,7 +59,7 @@ public AsyncSearchContextPermits(AsyncSearchContextId asyncSearchContextId, Thre
         this.semaphore = new Semaphore(TOTAL_PERMITS, true);
     }

-    private Releasable acquirePermits(int permits, TimeValue timeout, final String details) throws TimeoutException {
[Review comment] Reason for making this a runtime exception?
[Reply] We throw ElasticsearchTimeoutException here; that's serializable over the wire.
[Review comment] Got it. So IllegalStateException is no longer expected here?
+    private Releasable acquirePermits(int permits, TimeValue timeout, final String details) {
         RunOnce release = new RunOnce(() -> {});
         if (closed) {
             logger.debug("Trying to acquire permit for closed context [{}]", asyncSearchContextId);
@@ -71,17 +72,15 @@ private Releasable acquirePermits(int permits, TimeValue timeout, final String d
                 logger.warn("Releasing permit(s) [{}] with reason [{}]", permits, lockDetails);
                 semaphore.release(permits);});
             if (closed) {
+                release.run();
                 logger.debug("Trying to acquire permit for closed context [{}]", asyncSearchContextId);
                 throw new AlreadyClosedException("trying to acquire permits on closed context [" + asyncSearchContextId + "]");
             }
             return release::run;
         } else {
-            throw new TimeoutException("obtaining context lock" + asyncSearchContextId + "timed out after " +
+            throw new ElasticsearchTimeoutException("obtaining context lock" + asyncSearchContextId + "timed out after " +
                     timeout.getMillis() + "ms, previous lock details: [" + lockDetails + "] trying to lock for [" + details + "]");
         }
-    } catch (IllegalStateException e) {
-        release.run();
-        throw new RuntimeException("Context already closed while trying to obtain context lock", e);
     } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         release.run();
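The point of the exception swap: java.util.concurrent.TimeoutException is a checked exception the transport layer wraps as NotSerializableExceptionWrapper, so remote callers lose the concrete type, whereas ElasticsearchTimeoutException serializes over the wire as itself. A simplified, single-permit sketch of the pattern (not the plugin's actual class):

```java
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Simplified sketch: one fair permit guarding a context; a timeout surfaces as an
// ElasticsearchTimeoutException so remote callers receive the real exception type.
class PermitSketch {
    private final Semaphore semaphore = new Semaphore(1, true);

    Releasable acquire(TimeValue timeout, String details) throws InterruptedException {
        if (semaphore.tryAcquire(timeout.millis(), TimeUnit.MILLISECONDS)) {
            return semaphore::release; // caller releases the permit when done
        }
        throw new ElasticsearchTimeoutException(
                "obtaining lock timed out after [{}] while trying to lock for [{}]", timeout, details);
    }
}
```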
AsyncSearchPersistenceService.java
@@ -24,16 +24,17 @@
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -67,20 +68,19 @@ public class AsyncSearchPersistenceService {
     public static final String ERROR = "error";

     private static final Logger logger = LogManager.getLogger(AsyncSearchPersistenceService.class);
-    public static final String ASYNC_SEARCH_RESPONSE_INDEX = ".asynchronous_search_response";
+    public static final String ASYNC_SEARCH_RESPONSE_INDEX = ".opendistro_asynchronous_search_response";
     private static final String MAPPING_TYPE = "_doc";
     /**
-     * The backoff policy to use when saving a task result fails. The total wait
+     * The backoff policy to use when saving an async search response fails. The total wait
      * time is 600000 milliseconds, ten minutes.
      */
-    private static final BackoffPolicy STORE_BACKOFF_POLICY =
-            BackoffPolicy.exponentialBackoff(timeValueMillis(50), 5);
+    public static final BackoffPolicy STORE_BACKOFF_POLICY =
+            BackoffPolicy.exponentialBackoff(timeValueMillis(250), 14);
[Review comment on lines +77 to +78] Is the reason to increase the delay and retries based on some test-run data?
[Reply] The backoff policy is used when saving an async search response fails; the intended total wait time is 600000 milliseconds, i.e. ten minutes. We had this value earlier but it was changed inadvertently, so this reverts it to BackoffPolicy.exponentialBackoff(timeValueMillis(250), 14).
[Review comment] Yeah, I meant why ten minutes. Is it just a good default to start with, or does it have backing from code/tests?
[Review comment] Is there a reason for not adding jitter?
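The ten-minute figure is easy to verify: BackoffPolicy is Iterable<TimeValue>, so the fourteen delays starting at 250 ms can simply be summed. (The same constants appear in Elasticsearch's own TaskResultsService, which the old "saving a task result" javadoc suggests this code was adapted from.) A quick check:

```java
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.unit.TimeValue;

public class BackoffTotal {
    public static void main(String[] args) {
        long totalMillis = 0;
        // Sum the 14 exponentially growing delays starting at 250ms; the total
        // comes out to roughly 600000 ms, the ten minutes the javadoc promises.
        for (TimeValue delay : BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 14)) {
            totalMillis += delay.millis();
        }
        System.out.println("total backoff: " + totalMillis + " ms");
    }
}
```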
     private final Client client;
     private final ClusterService clusterService;
     private final ThreadPool threadPool;

     @Inject
     public AsyncSearchPersistenceService(Client client, ClusterService clusterService, ThreadPool threadPool) {
         this.client = client;
         this.clusterService = clusterService;
@@ -291,7 +291,9 @@ public void onResponse(IndexResponse indexResponse) {

             @Override
             public void onFailure(Exception e) {
-                if (!(e instanceof EsRejectedExecutionException) || !backoff.hasNext()) {
+                if (((e instanceof EsRejectedExecutionException || e instanceof ClusterBlockException
+                        || e instanceof NoShardAvailableActionException) == false) || backoff.hasNext() == false) {
[Review comment on lines +294 to +295] Is it the complete list of non-retriable exceptions? For instance why
[Reply] Good point, we don't have an exhaustive list of retryable exceptions. If we run into
                     logger.warn(() -> new ParameterizedMessage("failed to store async search response, not retrying"), e);
                     listener.onFailure(e);
                 } else {
                     TimeValue wait = backoff.next();
@@ -304,7 +306,7 @@ public void onFailure(Exception e) {

     private Settings indexSettings() {
         return Settings.builder()
-                .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 5)
                 .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1")
                 .put(IndexMetadata.SETTING_PRIORITY, Integer.MAX_VALUE)
                 .build();
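The inverted `== false` condition above takes a moment to read: the response store retries only while the failure is one of a small set of transient, cluster-state-related exceptions and backoff attempts remain. An equivalent restatement as a helper (the class is ours for illustration, not part of the PR):

```java
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

import java.util.Iterator;

// Illustrative helper (not in the PR): mirrors the retry decision in onFailure above.
final class StoreRetryPolicy {
    static boolean isRetriable(Exception e) {
        return e instanceof EsRejectedExecutionException
                || e instanceof ClusterBlockException
                || e instanceof NoShardAvailableActionException;
    }

    // Retry only when the failure is transient AND the backoff iterator has delays left;
    // otherwise the listener is failed immediately.
    static boolean shouldRetry(Exception e, Iterator<TimeValue> backoff) {
        return isRetriable(e) && backoff.hasNext();
    }
}
```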
[Review comment] Let's revert to 0; this would help us turn the feature off.