-
Notifications
You must be signed in to change notification settings - Fork 10
Async search context and and state management framework #6
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments below. Not reviewed the tests yet.
It is generally a good practice to add assertions for invariant conditions. That goes a long way in protecting your system against bugs that violate those invariants, given you have the right tests. Another advantage that you get is the invariants are clearly documented in the code. I noticed that there is scope for adding invariants in a lot of places- especially the transitions and permits classes.
import java.io.IOException; | ||
import java.util.Objects; | ||
|
||
public class AsyncSearchContextId implements Writeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some explanation of why you use this as the id would help? Was a UUID not enough for uniqueness. What is the purpose of the long id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The long id
is used as a key for a MAP of AsyncSearchContext
objects stored on every node (AsyncSearchActiveStore
class which will be introduced in an upcoming PR contains said map).
The UUID injects a randomness to the asyncsearch id string which is used to access an existing async search
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UUID is for randomness(maynot be good candidate for hash keys/uniqueness guarantees) and id for better hashing and uniqueness guarantees
out.writeLong(asyncSearchId.getTaskId()); | ||
out.writeString(asyncSearchId.getAsyncSearchContextId().getContextId()); | ||
out.writeLong(asyncSearchId.getAsyncSearchContextId().getId()); | ||
return Base64.getUrlEncoder().encodeToString(BytesReference.toBytes(out.bytes())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withoutPadding()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed this to have withoutPadding()
import java.util.Base64; | ||
import java.util.Objects; | ||
|
||
public class AsyncSearchId { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An explanation on why you chose this as the id would be helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Lets add this across other newly introduced constructs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed adding a java doc on this class. Will add one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added java docs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll change it to be Converter
@@ -111,8 +110,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |||
builder.field(START_TIME_IN_MILLIS.getPreferredName(), startTimeMillis); | |||
builder.field(EXPIRATION_TIME_IN_MILLIS.getPreferredName(), expirationTimeMillis); | |||
if (searchResponse != null) { | |||
builder.field(RESPONSE.getPreferredName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this not caught in tests in the earlier change? Do we need to add more tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AsyncSearchResponseTests.java
has been added in this PR.
It verifies serialization of AsyncSearchResponse.
expirationTimeMillis == other.expirationTimeMillis && | ||
searchResponse == other.searchResponse && | ||
error == other.error; | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is the equals
method called in the request flow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used only in tests.
* | ||
* @param nextAsyncSearchStage asyncSearchStage to advance | ||
*/ | ||
public synchronized void advanceStage(AsyncSearchStage nextAsyncSearchStage) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be public. The events should be public and this class should decide the stage based on the event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
making it package-private for testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, we moved most of this to the state machine
if (currentAsyncSearchStage != nextAsyncSearchStage) { | ||
try { | ||
asyncSearchStage.onTransition(contextListener, getContextId()); | ||
} catch (Exception ex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Swallowing all exceptions! Would you get a consistent behavior if listener is not executed. Should you not fail the context in some tragic way or atleast close it if this happens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Removing the surrounding try-catch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Listeners in ES are almost always surrounded by parent catch all
} | ||
} | ||
|
||
public void acquireContextPermit(final ActionListener<Releasable> onPermitAcquired, TimeValue timeout, String reason) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who would be the callers? Permit seems like an internal construct of context. Maybe send the event to context and let context decide about the permit? Maybe I need to see more classes here to understand the behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The AsyncSearchService
(which will be introduced in an upcoming PR) will be invoking the methods surround the permits acquisition.
|
||
protected final AsyncSearchContextId asyncSearchContextId; | ||
protected final LongSupplier currentTimeSupplier; | ||
protected volatile AsyncSearchProgressListener asyncSearchProgressListener; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are redefining the listener in the child class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has not been redefined in the child classes - AsyncSearchActiveContext
and AsyncSearchPersistenceContext
} | ||
|
||
public @Nullable | ||
AsyncSearchProgressListener getAsyncSearchProgressListener() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why nullable? Maybe define this method only in subclasses that are expected to have a listener object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, we need to provide a single interface to the consumers. Retaining it for now as we don't have a better/cleaner alternative
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making the changes. Reviewed 11/16 files (excluding test) till now. Overall looks good to me. Added few comments for clarification and considerations.
import java.util.Base64; | ||
import java.util.Objects; | ||
|
||
public class AsyncSearchId { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Lets add this across other newly introduced constructs.
return "[" + node + "][" + taskId + "][" + asyncSearchContextId + "]"; | ||
} | ||
|
||
public static String buildAsyncId(AsyncSearchId asyncSearchId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: shall we keep this as final for clear intent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Declared the class final.
import java.util.Base64; | ||
import java.util.Objects; | ||
|
||
public class AsyncSearchId { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we declare this class as final to prevent extensions?
return Collections.unmodifiableSet( | ||
Sets.newHashSet( | ||
AsyncSearchStage.INIT, AsyncSearchStage.RUNNING, AsyncSearchStage.SUCCEEDED, AsyncSearchStage.FAILED)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From async search response perspective only RUNNING
is needed. What is the purpose of maintaining these stages in a set here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a method which will be invoked by AsyncSearchService
(which will be introduced in upcoming PRs) to scan for expired searches which need to be cleaned up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Question] Can it be moved to constant, rather than a method call?
throw new IllegalStateException("only next asyncSearchStage " + INIT + | ||
" is allowed when the current asyncSearchStage is null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these real production scenario we can hit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concurrent Delete Async Search requests need to be handled. That is a valid production scenario.
Multiple delete async search requests can get hold of the context object but only one should be allowed to advance context stage to DELETED
public void acquireAllContextPermits(final ActionListener<Releasable> onPermitAcquired, TimeValue timeout, String reason) { | ||
asyncSearchContextPermits.asyncAcquireAllPermits(onPermitAcquired, timeout, reason); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we worry about starvation here, which could result in lingering state transition? How about restricting fresh permits once aquireAll has been issued, to ensure fairness?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If allPermits have been aquired that means the search is in post processing state and will leave the lock soon.
And if that happens within the acquire timeout, the request will proceed, else it will fail with TimeoutException and the we wont run into a starvation state.
@eirsep Can clarify
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If acquireAll is invoked and queued for acquiring permits, fresh permits are queued behind it as Semaphore has fair
setting on TRUE. Fairness ensure FIFO basis for permit acquisition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FIFO will be applicable only on the waiting requests. However, here the single permit requests will not wait and will be immediately allowed. Maybe the use case for allPermits contention is rare and can be handled later based on need.
|
||
private Releasable acquirePermits(int permits, TimeValue timeout, final String details) throws RuntimeException { | ||
try { | ||
if (semaphore.tryAcquire(permits, timeout.getMillis(), TimeUnit.MILLISECONDS)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the advantage of keeping a timeout here, considering the total permits is integer max. There are definite gains of not using timeout though. One use case I can think is for mutating operations contending for permit while all is acquired by post processor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timeout is because there is also an asyncAcquireAllPermits
which takes integer max number of permits and hence is a blocking call (think persisting completed async search response in index). We need to timeout mutating operations which are blocked beyond a waiting time.
"previous lock details: [" + lockDetails + "] trying to lock for [" + details + "]"); | ||
} | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly we don't think we need to worry about releasing permit here, as one is not acquired yet.
|
||
private void asyncAcquirePermit( | ||
int permits, final ActionListener<Releasable> onAcquired, final TimeValue timeout, String reason) { | ||
threadPool.executor(AsyncSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME).execute(new AbstractRunnable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we plan to introduce a separate threadpool going forward?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure which thread pool you mean, while we might add few based on our performance tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dedicated threadpool for different asyc search operations. Agree that can wait for more concrete results based on perf testing.
…cement and initialized stage in active context constructor.
out.writeLong(asyncSearchId.getAsyncSearchContextId().getId()); | ||
return Base64.getUrlEncoder().withoutPadding().encodeToString(BytesReference.toBytes(out.bytes())); | ||
} catch (Exception e) { | ||
throw new IllegalArgumentException("Cannot build async search id", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NitPick] Should we have a separate exception handling and throw appropriate errors instead of a generic one.
Will that be helpful for the end user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in AsyncSearchIdTests.java
, this exception will be caught and in turn a ResourceNotFoundException
or a wrapper on RNF will be thrown to indicate that no Async Search Execution exists with the illegal Id. That responsibility will be delegated to the class handling the request.
out.writeString(asyncSearchId.getAsyncSearchContextId().getContextId()); | ||
out.writeLong(asyncSearchId.getAsyncSearchContextId().getId()); | ||
return Base64.getUrlEncoder().withoutPadding().encodeToString(BytesReference.toBytes(out.bytes())); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NitPick] We are catching all the exception which it too broad, Will it be possible to narrow it down?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing this to the expected IOException. Thank you.
} | ||
|
||
@Override | ||
public boolean equals(Object o) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Question] Will deepEquals suffice the use case here? Applicable in other classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. I think deepEquals targets Arrays. If the objects being compared aren't arrays of any sort, it simply check equals
protected volatile AsyncSearchProgressListener asyncSearchProgressListener; | ||
|
||
public AsyncSearchContext(AsyncSearchContextId asyncSearchContextId, LongSupplier currentTimeSupplier) { | ||
Objects.requireNonNull(asyncSearchContextId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NitPick]
- This will raise a NPE is that expected and handled at caller.
- We can use
@NonNull
annotation in the method definition itself. You can explore it and apply at other places as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this out. Had meant to add an assert
.
return Collections.unmodifiableSet( | ||
Sets.newHashSet( | ||
AsyncSearchStage.INIT, AsyncSearchStage.RUNNING, AsyncSearchStage.SUCCEEDED, AsyncSearchStage.FAILED)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Question] Can it be moved to constant, rather than a method call?
} | ||
|
||
public boolean shouldPersist() { | ||
return keepOnCompletion && isExpired() == false && asyncSearchStage != DELETED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NitPick] We can use !isExpired() and save on additional comparison operation.
assert nextAsyncSearchStage != null : "Next async search stage cannot bu null!"; | ||
assert this.asyncSearchStage != null : "async search stage cannot be null!"; | ||
AsyncSearchStage currentAsyncSearchStage = this.asyncSearchStage; | ||
if (this.asyncSearchStage.nextTransitions().contains(nextAsyncSearchStage) == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NitPick] Can we can avoid the comparison.
private volatile AsyncSearchStage asyncSearchStage; | ||
private final AsyncSearchContextListener contextListener; | ||
|
||
public AsyncSearchActiveContext(AsyncSearchContextId asyncSearchContextId, String nodeId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A small java doc on top of every public class and method will be helpful.
Applicable at other places as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a java doc for the class and any methods which need explanation. I don't think one is required here, as it is the sole constructor for the class.
* @param reason the reason for acquiring the permit | ||
*/ | ||
public void asyncAcquireAllPermits(final ActionListener<Releasable> onAcquired, final TimeValue timeout, String reason) { | ||
asyncAcquirePermit(TOTAL_PERMITS, onAcquired, timeout, reason); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Question] I don't really get the logic of Integer.MAX_VALUE.
We are saying that for any mutation operation we will first acquire lock on the permit value 1(asyncAcquirePermit method) and release it at the end, why to we need to consider the MAX_VALUE space?
Shouldn't acquiring a lock on the same permit value(1) will suffice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we acquire a single permit out of Integer.MAX_VALUE number of available permits, we imply that its a non
blocking operation. We support concurrent operations of this sort.(asyncAcquirePermits
) For ex. updating expiry time is non blocking.
When we acquire all available permits, we imply that its a blocking operation.(asyncAcquireAllPermits
). All other operations trying to acquire 1 or all permits will have to wait on this or timeout. For ex. storing completed async search response in index is blocking operation.
public void acquireAllContextPermits(final ActionListener<Releasable> onPermitAcquired, TimeValue timeout, String reason) { | ||
asyncSearchContextPermits.asyncAcquireAllPermits(onPermitAcquired, timeout, reason); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If allPermits have been aquired that means the search is in post processing state and will leave the lock soon.
And if that happens within the acquire timeout, the request will proceed, else it will fail with TimeoutException and the we wont run into a starvation state.
@eirsep Can clarify
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for doing these changes, Added a few minor comment and questions.
if (response == null) { | ||
return emptyMap(); | ||
} | ||
return convertToMap(response, false, Requests.INDEX_CONTENT_TYPE).v2(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NitPick] Can use ternary operator. Applicable at other places.
return Collections.singletonList(stateMachine); | ||
} | ||
|
||
private AsyncSearchStateMachine getAsyncSearchStateMachineDefinition() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making these changes. Looks much cleaner now!
} | ||
|
||
@Override | ||
public AsyncSearchState trigger(AsyncSearchContextEvent event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could there be more than 2 events triggered on state machine? If yes, how do you ensure locking/event accumulation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added synchronized block on context object
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this out, got missed with the state machine refactor
if (getFinalStates().contains(currentState)) { | ||
result = currentState; | ||
} else { | ||
for (AsyncSearchTransition<? extends AsyncSearchContextEvent> transition : getTransitions()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could have used a map instead of iterating through the list?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed. changed transitions to be a map.
eventListener.accept(event.asyncSearchContext().getContextId(), event.asyncSearchContext().getContextListener()); | ||
} catch (Exception ex) { | ||
logger.error(() -> new ParameterizedMessage("Failed to execute listener for async search id : {}", | ||
event.asyncSearchContext.getAsyncSearchId()), ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this blanket level handling intentional? What happens if the event listener is not executed- is that a best effort operation and its failure does not impact async search in any way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The event listener will be used to collects node level stats about async searches. Its not an operation which impacts any ongoing async search requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, its intentional given its a non-critical listener consumed for stats
if (currentState.equals(transition.sourceState()) && transition.eventType().equals(event.getClass())) { | ||
execute(transition.onEvent(), event, currentState); | ||
event.asyncSearchContext().setState(transition.targetState()); | ||
logger.debug("Executed event {} for async event {} ", event.getClass().getName(), | ||
event.asyncSearchContext.getAsyncSearchId()); | ||
BiConsumer<AsyncSearchContextId, AsyncSearchContextListener> eventListener = transition.eventListener(); | ||
try { | ||
eventListener.accept(event.asyncSearchContext().getContextId(), event.asyncSearchContext().getContextListener()); | ||
} catch (Exception ex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not very clear on how synchronization is handled. This machine is stateless, but the context is stateful. However, the operations sent to context are not syncronized. Am I missing something here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added synchronization block with lock on context. Had just pushed a commit
57010e4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this out, got missed with the state machine refactor
…condensed and added synchronized block. added asyncsearchstatemachineException for illegal transition triggers
|
||
private static final int TOTAL_PERMITS = Integer.MAX_VALUE; | ||
|
||
//visible for testing | ||
final Semaphore semaphore; | ||
private final AsyncSearchContextId asyncSearchContextId; | ||
private volatile String lockDetails; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will check on this. Will revisit with more tests
import static com.amazon.opendistroforelasticsearch.search.async.context.state.AsyncSearchState.RUNNING; | ||
import static com.amazon.opendistroforelasticsearch.search.async.context.state.AsyncSearchState.SUCCEEDED; | ||
|
||
public class AsyncSearchActiveContextIT extends AsyncSearchSingleNodeTestCase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test cases missing for async search state machine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
functional tests for asyncsearch state machine can be found in AsyncSearchStateMachineTests.java.
This class is verifying the state machine definition used for active context states and transitions
import static org.hamcrest.Matchers.hasToString; | ||
import static org.hamcrest.Matchers.instanceOf; | ||
|
||
public class AsyncSearchContextPermitsTests extends ESTestCase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing test cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you plz elaborate on the specific scenarios which you feel we haven't handled?
import static com.amazon.opendistroforelasticsearch.search.async.context.state.AsyncSearchState.PERSIST_FAILED; | ||
import static com.amazon.opendistroforelasticsearch.search.async.context.state.AsyncSearchState.SUCCEEDED; | ||
|
||
public class AsyncSearchStateMachineTests extends ESTestCase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing test cases
asyncSearchContext.setState(transition.targetState()); | ||
logger.debug("Executed event {} for async event {} ", event.getClass().getName(), | ||
event.asyncSearchContext.getAsyncSearchId()); | ||
BiConsumer<AsyncSearchContextId, AsyncSearchContextListener> eventListener = transition.eventListener(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this code block need to be in synchronized block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean the event listener in the synchronised block. I don't see an issue, given that we today only intend to update stats which is pretty light weight. If we don't synchronise the stats can mess up in some unexpected ways too. I plan on retaining it for now and change later as needed
public void setTask(SearchTask searchTask) { | ||
assert currentStage == INIT; | ||
Objects.requireNonNull(searchTask); | ||
searchTask.setProgressListener(asyncSearchProgressListener); | ||
this.searchTask.set(searchTask); | ||
this.startTimeMillis = searchTask.getStartTime(); | ||
this.expirationTimeMillis = startTimeMillis + keepAlive.getMillis(); | ||
this.asyncSearchId.set(AsyncSearchIdConverter.buildAsyncId(new AsyncSearchId(nodeId, searchTask.getId(), getContextId()))); | ||
} | ||
|
||
public void processSearchFailure(Exception e) { | ||
assert currentStage != DELETED : "cannot process search failure. Async search context is already DELETED"; | ||
if (completed.compareAndSet(false, true)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am assuming these methods, even though public, are not called from anywhere except state machine execution and are protected under synchronized blocks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. They are protected under synchronized blocks and only called in the state machine trigger()
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
setTask
cannot be invoked concurrently by any means as it is a part if the async search submission and hence guarded bySetOnce
construct -
We are using CAS in cases where it makes more sense rather than synchronized eg :
ProcessSearchFailure
processSearchResponse
return asyncSearchId.get(); | ||
} | ||
|
||
public boolean shouldPersist() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add synchronized?
Who is the caller?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if volatile
isn't good enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you leaking anything if stage is returned, but the actions post modifying the stage is not executed? e.g. listener execution. It seems fine for now given listeners are independent as you mentioned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, missing tests will be added in a separate PR.
public void acquireAllContextPermits(final ActionListener<Releasable> onPermitAcquired, TimeValue timeout, String reason) { | ||
asyncSearchContextPermits.asyncAcquireAllPermits(onPermitAcquired, timeout, reason); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FIFO will be applicable only on the waiting requests. However, here the single permit requests will not wait and will be immediately allowed. Maybe the use case for allPermits contention is rare and can be handled later based on need.
|
||
private void asyncAcquirePermit( | ||
int permits, final ActionListener<Releasable> onAcquired, final TimeValue timeout, String reason) { | ||
threadPool.executor(AsyncSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME).execute(new AbstractRunnable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dedicated threadpool for different asyc search operations. Agree that can wait for more concrete results based on perf testing.
Issue #3
Description of changes
Contains the components surrounding the Async Search Context, both active and persisted :
SearchTask
and theSearchProgressActionListener
.AsyncSearchContext
through a backingAsyncSearchPersistenceModel
.Async Search Stage and Async Search Context Permits are also introduced with the changes.
TODO
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.