Rest Layer and Async Search Cleanup Management #9
Conversation
}

@Override
public void handleException(TransportException e) {
add a stat?
Maybe in the next phase.
protected void doClose() {
    ResponseCleanUpScheduler cleanUpScheduler = activeResponseCleanUpScheduler.get();
    if (cleanUpScheduler != null) {
        cleanUpScheduler.close();
Set cleanUpScheduler to null?
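
A minimal sketch of that suggestion, assuming activeResponseCleanUpScheduler is an AtomicReference; getAndSet clears the reference in the same step as the close:

protected void doClose() {
    // atomically swap out the reference so a late reader cannot observe a closed scheduler
    ResponseCleanUpScheduler cleanUpScheduler = activeResponseCleanUpScheduler.getAndSet(null);
    if (cleanUpScheduler != null) {
        cleanUpScheduler.close();
    }
}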
ActionListener.wrap(channel::sendResponse, e -> {
    try {
        channel.sendResponse(e);
    } catch (Exception ex) {
Dangling request. Shouldn't you close the connection in that case or add some retries here?
It would be re-scheduled anyway. The connection would close after the failure response is sent back.
How are you sending the failure response in case of an Exception? Does the framework handle it?
ActionListener.wrap(channel::sendResponse, // sends the response
    e -> { // sends the error
        try {
            channel.sendResponse(e);
        } catch (Exception ex) {
            // nothing more can be done if even the error response fails to send
        }
    });
}

@Override
public void offMaster() {
Is it possible that the offMaster call comes after the onMaster call when the master is re-elected to the same node? Even if the calls are interleaved there will be issues.
Check elastic/elasticsearch@fefb31b. The PR talks about loose guarantees on the sequence order of the onMaster and offMaster calls. Additionally, ES may deprecate the listener in the future.
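
For illustration, a hedged sketch of one way to sidestep the ordering problem: derive master flips from clusterChanged itself, which runs on the single cluster-applier thread, instead of relying on the paired callbacks. The interface and accessors come from the Elasticsearch ClusterStateListener API; the scheduling calls are placeholders.

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;

public class CleanUpSchedulingListener implements ClusterStateListener {
    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        // previous and current state are compared on one thread, so no interleaving
        boolean wasMaster = event.previousState().nodes().isLocalNodeElectedMaster();
        boolean isMaster = event.state().nodes().isLocalNodeElectedMaster();
        if (wasMaster == false && isMaster) {
            // elected master: schedule the cleanup runnable here
        } else if (wasMaster && isMaster == false) {
            // stepped down: mark the active runnable stale here
        }
    }
}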
public void handleException(TransportException e) {
    logger.error(() -> new ParameterizedMessage("Exception executing action {}",
        CLEANUP_ACTION_NAME), e);
    scheduleNextWakeUp();
Is the next one woken up even before the first one ends? How would that work?
}

@Override
public void handleException(TransportException e) {
Add stats?
Currently we only record stats for async searches submitted by the user. AsyncSearchManagementService is an internal component that runs behind the scenes.
Thanks for the changes. I have left a few comments/clarifications.
@@ -54,6 +57,7 @@ private void setMaxRunningContext(int maxRunningContext) {

public synchronized void putContext(AsyncSearchContextId asyncSearchContextId, AsyncSearchActiveContext asyncSearchContext) {
    if (activeContexts.size() >= maxRunningContext) {
        contextRejectionEventConsumer.accept(asyncSearchContextId);
Why is this consumer needed? For cleanup?
The consumer is required for the node-level async search throttled count stat, which is surfaced in our stats API.
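
A minimal sketch of that wiring, with a hypothetical holder class and field name, assuming the stat is backed by a CounterMetric:

import java.util.function.Consumer;
import org.elasticsearch.common.metrics.CounterMetric;

class CountStatsHolder {
    // node-level count of throttled (rejected) async searches
    final CounterMetric throttledAsyncSearchCount = new CounterMetric();
}

CountStatsHolder countStatsHolder = new CountStatsHolder();
// every rejected context bumps the counter; the stats API later reads
// countStatsHolder.throttledAsyncSearchCount.count() into its response
Consumer<AsyncSearchContextId> contextRejectionEventConsumer =
    contextId -> countStatsHolder.throttledAsyncSearchCount.inc();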
@@ -58,7 +59,7 @@ public AsyncSearchContextPermits(AsyncSearchContextId asyncSearchContextId, Thre
    this.semaphore = new Semaphore(TOTAL_PERMITS, true);
}

private Releasable acquirePermits(int permits, TimeValue timeout, final String details) throws TimeoutException {
Reason for making it a runtime exception?
We throw ElasticsearchTimeoutException here. That's serializable over the wire.
Got it. So IllegalStateException is no longer expected here?
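
For illustration, a hedged sketch of the timeout path being discussed, assuming the fair Semaphore from the diff above; the message text is made up:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;

private Releasable acquirePermits(int permits, TimeValue timeout, String details) {
    try {
        if (semaphore.tryAcquire(permits, timeout.millis(), TimeUnit.MILLISECONDS)) {
            return () -> semaphore.release(permits); // Releasable hands the permits back
        }
        // runtime exception, serializable over the wire
        throw new ElasticsearchTimeoutException(
            "obtaining context lock timed out after [{}], details [{}]", timeout, details);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("interrupted while acquiring permits for " + details, e);
    }
}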
public static final BackoffPolicy STORE_BACKOFF_POLICY =
    BackoffPolicy.exponentialBackoff(timeValueMillis(250), 14);
Is the increase in the delay and retries based on some test-run data?
This is the backoff policy used when saving a search response fails. The intended total wait time is 600000 milliseconds, i.e. 10 minutes. We had this value earlier but it got changed inadvertently; reverted it back to what it was, i.e. BackoffPolicy.exponentialBackoff(timeValueMillis(250), 14).
Yeah, I meant why 10 minutes. Do we think it is just a good default to start with, or does it have backing from code/tests?
Is there a reason for not adding jitter?
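
For reference, a plain-Java sketch of full jitter over an exponential schedule; it is not wired into Elasticsearch's BackoffPolicy, and the class and parameter names are made up:

import java.util.Iterator;
import java.util.Random;

final class FullJitterBackoff implements Iterator<Long> {
    private final long baseMillis;
    private final long capMillis;
    private final int maxRetries;
    private final Random random = new Random();
    private int attempt = 0;

    FullJitterBackoff(long baseMillis, long capMillis, int maxRetries) {
        this.baseMillis = baseMillis;
        this.capMillis = capMillis;
        this.maxRetries = maxRetries;
    }

    @Override
    public boolean hasNext() {
        return attempt < maxRetries;
    }

    @Override
    public Long next() {
        // each delay is drawn uniformly from [0, min(cap, base * 2^attempt)],
        // which de-synchronizes retry storms across nodes
        long ceiling = Math.min(capMillis, baseMillis << attempt++);
        return (long) (random.nextDouble() * ceiling);
    }
}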
if (((e instanceof EsRejectedExecutionException || e instanceof ClusterBlockException
        || e instanceof NoShardAvailableActionException) == false) || backoff.hasNext() == false) {
Is this the complete list of retriable exceptions? For instance, why ClusterBlockException here and not CircuitBreakingException? Can we not use ElasticsearchException directly to save on the type checks and cover all runtime exceptions?
Good point, we don't have an exhaustive list of retryable exceptions. If we run into CircuitBreakingException, it's ideal that we back out and clean up the response. Will revisit this later.
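
A small sketch of pulling that check into a single predicate, so the list can grow (say, with CircuitBreakingException) without touching the retry flow; the method name is mine:

import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

private static boolean isRetriable(Exception e) {
    return e instanceof EsRejectedExecutionException
        || e instanceof ClusterBlockException
        || e instanceof NoShardAvailableActionException;
}

// usage in the failure handler:
// if (isRetriable(e) == false || backoff.hasNext() == false) { /* give up and clean up */ }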
}

private void performCleanUpAction(AsyncSearchCleanUpRequest request, ActionListener<AcknowledgedResponse> listener) {
    asyncSearchPersistenceService.deleteExpiredResponses(listener, request.absoluteTimeInMillis);
Do we really need to care about and pass the absoluteTimeInMillis from here?
Yes. deleteByQuery runs a search query, expirationTime < GIVEN_TIME, to fetch the docs to delete. We are simply passing the GIVEN_TIME parameter in the request.
Yeah, my point is why GIVEN_TIME and not CURRENT_TIME (when the query is executing on the node)? It would still make sense (and be efficient) to clean up expired records based on the current time. Why do we even expect a time param in the request for cleanup here? Am I missing something?
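
For context, a hedged sketch of the kind of delete-by-query such a cleanup issues; the index and field names here are assumptions, not the plugin's actual ones:

import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

// delete every persisted response whose expiration time is before the
// timestamp carried in the cleanup request (GIVEN_TIME above)
long expiredBefore = request.absoluteTimeInMillis;
DeleteByQueryRequest deleteExpired = new DeleteByQueryRequest(".asynchronous_search_response"); // assumed index
deleteExpired.setQuery(QueryBuilders.rangeQuery("expiration_time_millis").lte(expiredBefore)); // assumed field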
final ResponseCleanUpAndRescheduleRunnable newRunnable = new ResponseCleanUpAndRescheduleRunnable();
activeResponseCleanUpRunnable.set(newRunnable);
threadPool.scheduleUnlessShuttingDown(responseCleanUpInterval, RESPONSE_CLEANUP_SCHEDULING_EXECUTOR, newRunnable);
Is the ResponseCleanUpAndRescheduleRunnable first scheduled only after a clusterChanged event? Also, since we schedule the next one in onAfter, could it create a duplicate schedule if the master changes? Possibly the activeResponseCleanUpRunnable check below isn't enough.
It's guarded by:

@Override
protected void doRun() {
    if (this == activeResponseCleanUpRunnable.get()) {
        super.doRun();
    } else {
        logger.trace("master changed, scheduled cleanup job is stale");
    }
}
Yeah, I was referring to the race between activeResponseCleanUpRunnable being set to null (on the old master) and the new master scheduling on its clusterChanged event, since the processing of the event on the two nodes is disjoint and cannot be made linear. However, it seems a rare case and can be thought through later if it really has a downside.
transportService.sendRequest(randomNode, CLEANUP_ACTION_NAME,
    new AsyncSearchCleanUpRequest(threadPool.absoluteTimeInMillis()),
    new TransportResponseHandler<AcknowledgedResponse>() {
Wondering if sub-tasks can also be cleaned by scheduling a self-cleanup per node, instead of transport response handling? For example, if each node has access to its ContextsToReap.
This is the DELETE-BY-QUERY persisted-response cleanup action, which is distributed.
If each response is modelled as a single record and is expected to be present in a single shard, not spread across multiple shards (nodes), could local cleanup schedules on the nodes save the distributed action? I understand modelling that as a separate action might be tedious and we might just be better off with DELETE-BY-QUERY.
return acknowledged ? OK : NOT_FOUND;
}
Let's have a NOT_AVAILABLE maybe.
NOT_AVAILABLE is not a valid RestStatus enum value.
private static final String RUNNING = "async_search_running_current";
private static final String PERSISTED = "async_search_persisted";
private static final String FAILED = "async_search_failed";
private static final String COMPLETED = "async_search_completed";
private static final String REJECTED = "async_search_rejected";
asynchronous?
@Override
public void onContextPersisted(AsyncSearchContextId asyncSearchContextId) {
    countStatsHolder.persistedAsyncSearchCount.inc();
}
Let's add onContextPersistFailed too.
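
A one-method sketch of the suggested hook, mirroring onContextPersisted above; the counter field is hypothetical:

@Override
public void onContextPersistFailed(AsyncSearchContextId asyncSearchContextId) {
    countStatsHolder.persistFailedAsyncSearchCount.inc(); // hypothetical CounterMetric
}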
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.metrics.CounterMetric;

public class InternalAsyncSearchStats implements AsyncSearchContextListener {
What happens when a context gets closed/deleted? Does the running stat go for a toss?
Added onRunningContextClosed() in the context listener as a hook to decrement the running async searches count.
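
A hedged sketch of what that hook can look like, assuming the running stat is a CounterMetric (which supports dec()):

@Override
public void onRunningContextClosed(AsyncSearchContextId asyncSearchContextId) {
    countStatsHolder.runningAsyncSearchCount.dec(); // keeps the running gauge from leaking
}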
public static final Setting<Integer> MAX_RUNNING_CONTEXT = Setting.intSetting(
-    "async_search.max_running_context", 100, 0, Setting.Property.Dynamic, Setting.Property.NodeScope);
+    "async_search.max_running_context", 100, 10, Setting.Property.Dynamic, Setting.Property.NodeScope);
Let's revert the minimum to 0; this would help us turn the feature off.
return builder;
}

static final class Fields {
Why have such long field names? Won't you have a section for async search and then add status, current, etc. directly there? Asking because long field names may take longer for serialization and deserialization depending on the algorithm.
Trimming the asynchronous_search_ prefix from individual stats.
}

static final class Fields {
    private static final String ASYNC_SEARCH_STATUS = "async_search_stats";
status?
Renamed the variable to ASYNC_SEARCH_STATS.
Thanks for the changes. Itiyama was fine with the set of changes. Approving to unblock.
Issue #, if available:
Description of changes:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.