
[Concurrent Segment Search] Support parent-join aggregations #9316

Open · 2 of 4 tasks
jed326 opened this issue Aug 14, 2023 · 5 comments
Labels
enhancement, Search:Aggregations

Comments

jed326 commented Aug 14, 2023

The join field is a special field type that is used to create a relationship between documents in the same index. Parent and child documents must be indexed into the same shard to maintain the lineage relationship for aggregation purposes. Ref: https://opensearch.org/docs/latest/field-types/supported-field-types/join/

For example, if a child document is on shard_0 and its parent document is on shard_1, then a parent aggregation would not collect any buckets because the relationship between the documents would never be found. Concurrent segment search introduces a similar problem, except at the segment slice level rather than the shard level.

Focusing on ParentJoinAggregator, which is a special type of aggregator:

/**
* An aggregator that joins documents based on global ordinals.
* Global ordinals that match the main query and the <code>inFilter</code> query are replayed
* with documents matching the <code>outFilter</code> query.
*/

Specifically, for parent aggregations the inFilter set is the child documents and the outFilter set is the parent documents; this is reversed for children aggregations. Unlike other aggregators, ParentJoinAggregator does not collect buckets during LeafBucketCollector::collect. Instead, it iterates through the inFilter set and records the global ordinal being aggregated in a CollectionStrategy object.

return new LeafBucketCollector() {
    @Override
    public void collect(int docId, long owningBucketOrd) throws IOException {
        // Rather than collecting a bucket here, record the global ordinal for this
        // inFilter document in the CollectionStrategy for later replay.
        if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
            int globalOrdinal = (int) globalOrdinals.nextOrd();
            assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
            collectionStrategy.add(owningBucketOrd, globalOrdinal);
        }
    }
};

Later, at the beginning of the reduce phase, ParentJoinAggregator::beforeBuildingBuckets is called. It iterates through the outFilter set and collects buckets for the documents whose global ordinals were previously saved in the CollectionStrategy.

for (long owningBucketOrd : ordsToCollect) {
    // Collect a bucket for this outFilter document only if its global ordinal
    // was recorded in the CollectionStrategy during the collect phase.
    if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) {
        collectBucket(sub, docId, owningBucketOrd);
    }
}

In the concurrent segment search path, there are 2 main problems with this approach.

  1. CollectionStrategy is created per Aggregator instance, so it is not shared across segment slices. CollectionStrategy is essentially what stores the relationship between parent and child documents, so if it is not shared between segment slices, relationships that span slices will not be found.
  2. ParentJoinAggregator::beforeBuildingBuckets iterates over all of the segments of the entire index:
    IndexReader indexReader = context().searcher().getIndexReader();
    for (LeafReaderContext ctx : indexReader.leaves()) {

    This means collectBucket is called from every slice-level aggregator whose CollectionStrategy contains the relationship, which leads to overcounting in most cases. For example, if a parent document has matching children in two different slices, both slice-level aggregators record its global ordinal and both collect the parent document, so its doc_count becomes 2 instead of 1 after the slice results are reduced.

Here are a few potential changes we can make:
To address problem 1:

  1. Create CollectionStrategy as a shared object between threads. This will lead to contention between the threads, especially if it is implemented as a concurrent hash map and the data set has very low cardinality.
  2. Since LeafBucketCollector::collect is called concurrently on the index_searcher threads but ParentJoinAggregator::beforeBuildingBuckets is called sequentially for each aggregator in the reduce phase, we could merge the CollectionStrategy objects across the slice-level aggregators before calling aggregator.buildTopLevel() in AggregationCollectorManager. A simplified sketch of this merge is shown below.
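To make option 2 more concrete, here is a minimal, self-contained sketch of the merge semantics. It models each slice's CollectionStrategy as a plain map from owningBucketOrd to the set of recorded global ordinals; the real CollectionStrategy in the parent-join module is a different data structure, so this only illustrates the idea, not a drop-in change.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: a CollectionStrategy is modeled as Map<owningBucketOrd, Set<globalOrdinal>>.
public class CollectionStrategyMergeSketch {

    // Fold the per-slice state into a single map before the sequential reduce phase
    // (i.e. before aggregator.buildTopLevel() is called in AggregationCollectorManager).
    static Map<Long, Set<Integer>> mergeSliceStrategies(List<Map<Long, Set<Integer>>> perSliceStrategies) {
        Map<Long, Set<Integer>> merged = new HashMap<>();
        for (Map<Long, Set<Integer>> slice : perSliceStrategies) {
            for (Map.Entry<Long, Set<Integer>> entry : slice.entrySet()) {
                merged.computeIfAbsent(entry.getKey(), k -> new HashSet<>()).addAll(entry.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Slice 0 recorded the parent's global ordinal 7 under bucket 0; slice 1 recorded ordinal 9.
        Map<Long, Set<Integer>> slice0 = Map.of(0L, Set.of(7));
        Map<Long, Set<Integer>> slice1 = Map.of(0L, Set.of(9));
        System.out.println(mergeSliceStrategies(List.of(slice0, slice1))); // {0=[7, 9]}
    }
}

Because the merge would happen after all slices finish collecting and before the sequential reduce, the per-slice state would not need any additional synchronization.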

To address problem 2:

  1. Each aggregator instance could track every LeafReaderContext that it has collected, and in the reduce phase it would only evaluate those same LeafReaderContexts instead of all the leaves of the entire index (see the sketch after this list).
  2. At the reduce level, only call buildTopLevel for a single ParentJoinAggregator instance. This would work similarly to how InternalAggregations::reduce only calls reduce once here:
    reducedAggregations.add(first.reduce(aggregations, context));
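Here is a rough sketch of option 1 for problem 2 (referenced in the list above). The field and method names (collectedLeaves, onLeafCollected, leavesToReplay) are hypothetical; the point is only that each slice-level aggregator remembers the leaves it actually collected and replays just those in beforeBuildingBuckets, instead of context().searcher().getIndexReader().leaves().

import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.LeafReaderContext;

// Sketch only: per-aggregator bookkeeping of the leaves collected by this slice.
class LeafTrackingSketch {
    private final List<LeafReaderContext> collectedLeaves = new ArrayList<>();

    // Hypothetical hook: would be invoked when the aggregator creates a leaf collector
    // for a segment that belongs to this slice.
    void onLeafCollected(LeafReaderContext ctx) {
        collectedLeaves.add(ctx);
    }

    // Hypothetical replacement for the indexReader.leaves() loop in beforeBuildingBuckets():
    // only the leaves this instance collected are replayed against the outFilter.
    List<LeafReaderContext> leavesToReplay() {
        return collectedLeaves;
    }
}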

Taking a step back, I think there is a broader problem: not all aggregation plugins may support concurrent search, due to the nature of how they are implemented, and a user with a custom plugin may want a way to opt that plugin out of concurrent search. Off the top of my head, there could be other aggregation types with shard routing considerations similar to this parent-join one, or an aggregation whose arithmetic optimizations specifically work better with sequential search. To address that, I propose introducing a dynamic cluster setting that takes a comma-separated list of aggregator names for which concurrent search would be disabled.
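For illustration, a list-valued dynamic setting along these lines could look like the sketch below. The setting key and class name are hypothetical placeholders; the actual name and wiring would be worked out in the follow-up issue.

import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;

// Sketch only: the setting key below is a hypothetical placeholder.
public class ConcurrentSearchAggregatorSettingSketch {
    public static final Setting<List<String>> CONCURRENT_SEGMENT_SEARCH_DISABLED_AGGS =
        Setting.listSetting(
            "search.concurrent_segment_search.disabled_aggregators", // hypothetical key
            Collections.emptyList(),    // default: no aggregator types excluded
            Function.identity(),        // values are aggregator type names, e.g. "parent", "children"
            Property.Dynamic,           // updatable at runtime through the cluster settings API
            Property.NodeScope
        );
}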

In summary, these are the following changes I think we should make:

  • Dynamic cluster setting to disable concurrent segment search for certain types of aggregators. See [Concurrent Segment Search] Dynamic cluster setting to disable concurrent segment search for a given aggregation type #9446
  • Solution 2 for problem 1 above. This seems like a pretty involved change that needs to happen in one of the Aggregator base classes to provide a new interface for doing the merge.
  • Solution 1 for problem 2 above.
  • A new test case to cover the issue described in problem 1 above. This should be fairly straightforward: indexing a child document and a parent document into separate segments and then performing concurrent segment search with 1 segment per slice should trigger this failure.
jed326 added the enhancement and untriaged labels Aug 14, 2023
jed326 self-assigned this Aug 14, 2023
jed326 commented Aug 18, 2023

This issue was originally created as a subtask of #7357 to focus on a parent-join aggregation failure. I'm keeping the original test failures below since the scope of the issue has expanded:

REPRODUCE WITH: ./gradlew ':modules:parent-join:internalClusterTest' --tests "org.opensearch.join.aggregations.ParentIT.testSimpleParentAgg" -Dtests.seed=1D45D9B19138B210 -Dtests.security.manager=true -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=da -Dtests.timezone=Etc/GMT-13 -Druntime.java=20
REPRODUCE WITH: ./gradlew 'null' --tests "org.opensearch.join.aggregations.ParentIT.testSimpleParentAgg" -Dtests.seed=1D45D9B19138B210 -Dtests.security.manager=true -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=da -Dtests.timezone=Etc/GMT-13 -Druntime.java=11

java.lang.AssertionError: Request: {"size":10000,"query":{"match":{"randomized":{"query":true,"operator":"OR","prefix_length":0,"max_expansions":50,"fuzzy_transpositions":true,"lenient":false,"zero_terms_query":"NONE","auto_generate_synonyms_phrase_query":true,"boost":1.0}}},"aggregations":{"to_article":{"parent":{"type":"comment"},"aggregations":{"category":{"terms":{"field":"category","size":10000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"_count":"desc"},{"_key":"asc"}]}}}}}}
Response: {"took":92462,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":0.5260931,"hits":[{"_index":"test","_id":"article-1","_score":0.5260931,"_source":{"id":"article-1","category":["1"],"randomized":true,"join_field":{"name":"article"}}},{"_index":"test","_id":"comment-2","_score":0.5260931,"_routing":"article-1","_source":{"id":"comment-2","randomized":true,"join_field":{"parent":"article-1","name":"comment"},"commenter":"2"}},{"_index":"test","_id":"article-0","_score":0.5260931,"_source":{"id":"article-0","category":["0"],"randomized":true,"join_field":{"name":"article"}}},{"_index":"test","_id":"comment-3","_score":0.5260931,"_routing":"article-1","_source":{"id":"comment-3","randomized":true,"join_field":{"parent":"article-1","name":"comment"},"commenter":"3"}},{"_index":"test","_id":"comment-1","_score":0.5260931,"_routing":"article-0","_source":{"id":"comment-1","randomized":true,"join_field":{"parent":"article-0","name":"comment"},"commenter":"1"}},{"_index":"test","_id":"comment-0","_score":0.5260931,"_routing":"article-0","_source":{"id":"comment-0","randomized":true,"join_field":{"parent":"article-0","name":"comment"},"commenter":"0"}}]},"aggregations":{"to_article":{"doc_count":3,"category":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":"1","doc_count":2},{"key":"0","doc_count":1}]}}}}

Expected: <2L>
     but: was <3L>


	at __randomizedtesting.SeedInfo.seed([1D45D9B19138B210:C90EB7EA85D96A22]:0)
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
	at org.junit.Assert.assertThat(Assert.java:964)
	at org.opensearch.join.aggregations.ParentIT.testSimpleParentAgg(ParentIT.java:70)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)

The doc_count result in the aggregation is incorrect, even before it reaches the reduce phase.

jed326 changed the title from "[Concurrent Segment Search] parent-join test failures" to "[Concurrent Segment Search] Support parent-join aggregations" Aug 18, 2023
jed326 added the bug label Aug 18, 2023
jed326 commented Aug 18, 2023

I repurposed this issue as a backlog item to support parent-join aggregations in concurrent segment search and added all of my findings at the top level. I'll create another issue to track creating a dynamic cluster setting for disabling concurrent segment search for specific aggregators.

In the meantime @reta @sohami I would like to get your thoughts on the changes I suggested in the issue description above (including whether you agree with the new dynamic cluster setting).

sohami commented Aug 18, 2023

@jed326 Thanks for capturing the details here around the parent-join aggregation. It would be great to verify and add a similar test case for the children aggregation which fails with concurrent segment search.

I'll create another issue to track creating a dynamic cluster setting for disabling concurrent segment search for specific aggregators.

+1. As you called out, a setting-based mechanism will be useful for dynamically disabling concurrent search for the subset of requests that contain such aggregations, while other requests can still benefit from concurrent search.

We can definitely follow up with the fixes for parent-join aggregation depending on users' interest and requirements.

jed326 commented Aug 22, 2023

It would be great to verify and add a similar test case for the children aggregation which fails with concurrent segment search.

Added test cases as a part of #9446. A few things to note though:

  • The children aggregation type doesn't see the same over-counting issue because it is a many-to-one mapping, rather than a one-to-many mapping like the parent aggregation.
  • Because each Aggregator instance iterates over all of the segments of the index, this actually masks the slice-level shard routing issue (problem 1). In other words, today if we enable concurrent segment search and have 1 child and 1 parent document on different segment slices, we would still find the relationship between the documents because each aggregator iterates over all segments.

reta commented Aug 22, 2023

In the meantime @reta @sohami I would like to get your thoughts on the changes I suggested in the issue description above (including whether you agree with the new dynamic cluster setting).

My apologies, I missed that, but I have shared my thoughts on the pull request.

jed326 removed their assignment Sep 13, 2023