feat: add bulk resolution #79
Conversation
aaf6242 to 29ce0af
@austince Love it so far! I got hands on with it and tried (and failed) to break it.
I'm almost ready to approve and merge. One thing I'm thinking about, from the user's perspective, is how to associate the results with the inputs. It looks like the results come back in the order in which they were submitted, including when there's an error, so the user can associate the results with the inputs as long as they keep track of the original inputs. If the order of the results is guaranteed, and if we believe the user is likely to have control over the input order as well (i.e. unlike Python dictionary keys), then your implementation suffices.
The other option would be to introduce an explicit link between the results and the inputs, such as a request ID or an object containing the original input. But what would a request ID look like, and how would the user create it? And if we went this route, what would be least confusing and most efficient for the user? Curious about your thoughts on this topic.
For reference, I checked the Multi Search API and it does the same thing as your implementation -- returning an array of results without linking the results to a specific search.
I'm leaning toward accepting this now and thinking about this in the future as needed, as long as the order of the results is deterministic.
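Since the conversation above settles on positional association, here is a small client-side sketch of how a caller could recover the input-to-result mapping. The class and method names are hypothetical, and it assumes (as discussed) that results arrive in submission order:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper: zip the original inputs with the
// results, relying on the results coming back in submission order.
public class BulkResultZipper {
    public static Map<String, String> associate(List<String> inputs, List<String> results) {
        if (inputs.size() != results.size()) {
            throw new IllegalArgumentException("inputs and results must align");
        }
        Map<String, String> byInput = new LinkedHashMap<>();
        for (int i = 0; i < inputs.size(); i++) {
            byInput.put(inputs.get(i), results.get(i));  // positional association
        }
        return byInput;
    }
}
```

This keeps the API surface small at the cost of pushing the bookkeeping onto the caller, which is the trade-off being weighed above.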
```java
import static org.elasticsearch.rest.RestRequest.Method.POST;

public class ResolutionAction extends BaseRestHandler {

    private static final Logger logger = LogManager.getLogger(ResolutionAction.class);

    private static final int MAX_CONCURRENT_JOBS_PER_REQUEST = 100;
```
This variable looks good for now. At first I was going to suggest deriving this number from the size of the search thread pool on the node, which could probably be obtained from one of the methods on `client.threadPool()`. But that doesn't quite make sense, because a cluster with multiple nodes will have more search threads available in total for the job, and the node that receives the request to delegate the searches could be a small node like a master or coordinator.
I agree with that analysis and conclusion about tying it to the NodeClient's thread pool values. I think it might make sense to expose this setting to the user at some point, as it is likely a deployment-specific parameter for tuning.
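As a rough illustration of the kind of cap that `MAX_CONCURRENT_JOBS_PER_REQUEST` implies, here is a sketch of bounding in-flight jobs with a semaphore. This is not the PR's implementation (which drives jobs through Elasticsearch ActionListeners rather than an executor); all names here are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: a semaphore caps how many jobs run concurrently,
// analogous to a per-request concurrency limit. Returns the observed
// peak concurrency so the cap can be verified.
public class BoundedJobRunner {
    public static int run(int totalJobs, int maxConcurrent) throws InterruptedException {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < totalJobs; i++) {
            permits.acquire();  // block when maxConcurrent jobs are in flight
            pool.submit(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);  // record peak concurrency
                active.decrementAndGet();
                permits.release();  // free a slot for the next job
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }
}
```

If the limit were ever exposed as a setting, this is the knob it would turn: the semaphore's permit count.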
```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
```
I see that CompletableFuture exists only in the tests; it's good to see that it was avoidable in the main code.
Yes, I resisted :) I found it easy to use them to test the async callbacks, though I didn't do too much digging into Elasticsearch to see if there was a more "official" way.
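The testing pattern described here can be sketched as follows, with `AsyncOp` as a hypothetical stand-in for a callback-style API such as an Elasticsearch ActionListener (not the PR's actual test code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch: adapt a callback-style async API into a CompletableFuture so a
// test can block on the result and assert on it directly.
public class CallbackFutureAdapter {
    // Hypothetical stand-in for code that reports via success/failure callbacks.
    interface AsyncOp {
        void execute(Consumer<String> onResponse, Consumer<Exception> onFailure);
    }

    public static String await(AsyncOp op) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Success completes the future; failure completes it exceptionally.
        op.execute(future::complete, future::completeExceptionally);
        return future.get(5, TimeUnit.SECONDS);  // bounded wait keeps tests from hanging
    }
}
```

The bounded `get` is the important part: a test that would otherwise hang on a lost callback fails fast with a TimeoutException instead.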
Yeah, I think this behavior has its pros and cons, but having it consistent with Elasticsearch fits with the Zentity goals. Handling the response is at least predictable and can be done in the same way one would handle an ES Multi Search response.
It should be guaranteed to be deterministic, as enforced here:

```java
synchronized (items) {
    if (items.isEmpty()) {
        return;
    }
    resultIndex = size - items.size();
    nextItem = items.pop();
}
ActionListener<ResultT> resultListener = ActionListener.wrap(
    (result) -> groupedListener.onResponse(resultIndex, result),
    (ex) -> {
        hasFailure = true;
        groupedListener.onFailure(ex);
    }
);
itemRunner.accept(nextItem, ActionListener.runAfter(
    resultListener,
    this::runNextItem));
```

I tried to pay special attention to the synchronization here.
LGTM -- Merged 🚀
Wooo, thanks for looking at + merging this Dave! I've found a small typo but will have a PR in momentarily (update: #81)
Adds bulk resolution support via NDJSON with alternating params/resolution bodies. Runs all resolution jobs asynchronously as fast as possible, with a hardcoded max of 100 concurrent jobs per bulk request. Non-fatal errors are bundled into the response instead of failing the whole request. Examples of fatal errors are JsonProcessingException and other server/programming errors.
Closes #50
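The NDJSON shape described above (lines alternating between a params object and a resolution body) could be split into pairs roughly like this. A simplified sketch, not the plugin's actual parser:

```java
import java.util.ArrayList;
import java.util.List;

public class NdjsonPairs {
    // Split an NDJSON bulk payload into {params, resolution body} pairs.
    public static List<String[]> parse(String ndjson) {
        String[] lines = ndjson.strip().split("\n");
        if (lines.length % 2 != 0) {
            throw new IllegalArgumentException("expected alternating params/body lines");
        }
        List<String[]> pairs = new ArrayList<>();
        for (int i = 0; i < lines.length; i += 2) {
            pairs.add(new String[] { lines[i], lines[i + 1] });  // {params, body}
        }
        return pairs;
    }
}
```

Each pair then becomes one resolution job, and (per the discussion above) the response array lines up index-for-index with these pairs.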