Skip to content

Commit

Permalink
SQL: Avoid empty last pages for GROUP BY queries when possible (elast…
Browse files Browse the repository at this point in the history
…ic#84356)

Resolves elastic#75528.

Instead of always returning an empty last page for `GROUP BY` queries,
`CompositeAggCursor` will now only do so if it is not possible to tell
whether there are more pages based on the composite aggregation response.
This is the case in two situations: * The last page contains exactly
`fetch_size` results. In this case, the composite aggregation returns an
`after_key` even if there are no more keys remaining (see also
elastic#75573) * The query uses
a bucket selector. In this case, the composite aggregation might return
partial pages with less than `size` buckets and the `buckets.size() <
sizeRequested` heuristic for detecting last pages no longer works.

Hence, if any (or both) of the two conditions above applies, SQL will
still return an empty last page. If neither of the conditions apply, the
last page will always be non-empty.

This PR is also a weak prerequisite for addressing
elastic#84349 because it makes it possible
to immediately close PITs for aggregation queries returning only one
page. As a result, the performance impact of using PIT for aggregations
should be minimized.
  • Loading branch information
Lukas Wegmann authored and javanna committed Mar 16, 2022
1 parent 260e707 commit c107188
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 83 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/84356.yaml
@@ -0,0 +1,6 @@
pr: 84356
summary: Avoid empty last pages for GROUP BY queries when possible
area: SQL
type: bug
issues:
- 75528
Expand Up @@ -478,7 +478,7 @@ public void testCountAndCountDistinct() throws IOException {
);

String cursor = (String) response.remove("cursor");
assertNotNull(cursor);
assertNull(cursor);
assertResponse(expected, response);
}

Expand Down Expand Up @@ -1460,7 +1460,7 @@ public void testFetchAllPagesSearchHitCursor(String format) throws IOException {
List<String> texts = IntStream.range(0, size).mapToObj(i -> String.format(Locale.ROOT, "text%02d", i)).toList();
index(texts.stream().map(t -> "{\"field\": \"" + t + "\"}").toArray(String[]::new));

testFetchAllPages(format, "SELECT field FROM " + indexPattern("test") + " ORDER BY field", texts, pageSize, true);
testFetchAllPages(format, "SELECT field FROM " + indexPattern("test") + " ORDER BY field", texts, pageSize, size % pageSize == 0);
}

/**
Expand Down Expand Up @@ -1495,7 +1495,41 @@ public void testFetchAllPagesCompositeAggCursor(String format) throws IOExceptio
List<String> texts = IntStream.range(0, size).mapToObj(i -> String.format(Locale.ROOT, "text%02d", i)).toList();
index(texts.stream().map(t -> "{\"field\": \"" + t + "\"}").toArray(String[]::new));

testFetchAllPages(format, "SELECT field FROM " + indexPattern("test") + " GROUP BY field ORDER BY field", texts, pageSize, true);
testFetchAllPages(
format,
"SELECT field FROM " + indexPattern("test") + " GROUP BY field ORDER BY field",
texts,
pageSize,
size % pageSize == 0
);
}

// Runs the GROUP BY ... HAVING paging test with the text/plain response format.
public void testFetchAllPagesCompositeAggCursorWithFilterOnAggregateTxt() throws IOException {
testFetchAllPagesCompositeAggCursorWithFilterOnAggregate("text/plain");
}

// Runs the GROUP BY ... HAVING paging test with the text/csv response format.
public void testFetchAllPagesCompositeAggCursorWithFilterOnAggregateCsv() throws IOException {
testFetchAllPagesCompositeAggCursorWithFilterOnAggregate("text/csv");
}

// Runs the GROUP BY ... HAVING paging test with the text/tab-separated-values format.
public void testFetchAllPagesCompositeAggCursorWithFilterOnAggregateTsv() throws IOException {
testFetchAllPagesCompositeAggCursorWithFilterOnAggregate("text/tab-separated-values");
}

// Verifies paging for a GROUP BY query with a HAVING filter. The HAVING clause is
// implemented via a bucket selector pipeline aggregation, which can filter buckets
// out of a composite-agg page; SQL then cannot detect the last page early, so an
// empty trailing page is expected (last argument `true` = expect empty last page).
public void testFetchAllPagesCompositeAggCursorWithFilterOnAggregate(String format) throws IOException {
// Randomized sizing; pageSize may exceed size to also cover the single-page case.
int size = randomIntBetween(4, 20);
int pageSize = randomIntBetween(1, size + 1);

// Index `size` distinct documents so every group has exactly one document (c = 1)
// and the HAVING filter keeps all groups.
List<String> texts = IntStream.range(0, size).mapToObj(i -> String.format(Locale.ROOT, "text%02d", i)).toList();
index(texts.stream().map(t -> "{\"field\": \"" + t + "\"}").toArray(String[]::new));

testFetchAllPages(
format,
"SELECT field, COUNT(*) c FROM " + indexPattern("test") + " GROUP BY field HAVING c = 1 ORDER BY field",
texts,
pageSize,
true
);
}

public void testFetchAllPagesListCursorTxt() throws IOException {
Expand Down Expand Up @@ -1526,7 +1560,7 @@ public void testFetchAllPagesListCursor(String format) throws IOException {
* 2. There are at most `expectedValues.size() / pageSize + 1` pages (the last one might or might not be empty)
* 3. Optionally: That the last page is not empty.
*/
private void testFetchAllPages(String format, String query, List<String> expectedValues, int pageSize, boolean allowEmptyLastPage)
private void testFetchAllPages(String format, String query, List<String> expectedValues, int pageSize, boolean emptyLastPage)
throws IOException {
int remainingPages = expectedValues.size() / pageSize + 1;

Expand All @@ -1550,8 +1584,10 @@ private void testFetchAllPages(String format, String query, List<String> expecte
remainingPages--;
}

if (allowEmptyLastPage == false) {
assertFalse(Strings.isNullOrEmpty(response.v1()));
if (emptyLastPage) {
assertTrue("Expected empty last page but got " + response.v1(), Strings.isNullOrEmpty(response.v1()));
} else {
assertFalse("Expected non-empty last page but got " + response.v1(), Strings.isNullOrEmpty(response.v1()));
}

assertNull(response.v2());
Expand Down
Expand Up @@ -18,15 +18,15 @@
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ql.execution.search.extractor.BucketExtractor;
import org.elasticsearch.xpack.ql.type.Schema;
import org.elasticsearch.xpack.ql.util.StringUtils;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.util.Check;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -141,14 +141,15 @@ public void onResponse(SearchResponse response) {
makeCursor(),
() -> client.search(request, this),
delegate,
Schema.EMPTY
couldProducePartialPages(getCompositeBuilder(next()))
);
}
});
}

/**
 * Builds the supplier producing the row set for the given search response.
 * The composite aggregation's requested page size and whether it can produce
 * partial pages (e.g. due to a bucket selector) are needed to decide if more
 * pages exist.
 */
protected Supplier<CompositeAggRowSet> makeRowSet(SearchResponse response) {
    CompositeAggregationBuilder aggregation = getCompositeBuilder(nextQuery);
    return () -> new CompositeAggRowSet(extractors, mask, response, aggregation.size(), limit, couldProducePartialPages(aggregation));
}

protected BiFunction<SearchSourceBuilder, CompositeAggRowSet, CompositeAggCursor> makeCursor() {
Expand All @@ -162,71 +163,69 @@ static void handle(
BiFunction<SearchSourceBuilder, CompositeAggRowSet, CompositeAggCursor> makeCursor,
Runnable retry,
ActionListener<Page> listener,
Schema schema
boolean couldProducePartialPages
) {

if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}
// there are some results
if (response.getAggregations().asList().isEmpty() == false) {
// retry
if (shouldRetryDueToEmptyPage(response)) {
updateCompositeAfterKey(response, source);
retry.run();
return;
}

try {
CompositeAggRowSet rowSet = makeRowSet.get();
Map<String, Object> afterKey = rowSet.afterKey();
// retry
if (couldProducePartialPages && shouldRetryDueToEmptyPage(response)) {
updateCompositeAfterKey(response, source);
retry.run();
return;
}

if (afterKey != null) {
updateSourceAfterKey(afterKey, source);
}
CompositeAggRowSet rowSet = makeRowSet.get();

Cursor next = rowSet.remainingData() == 0 ? Cursor.EMPTY : makeCursor.apply(source, rowSet);
listener.onResponse(new Page(rowSet, next));
} catch (Exception ex) {
listener.onFailure(ex);
}
}
// no results
else {
listener.onResponse(Page.last(Rows.empty(schema)));
Map<String, Object> afterKey = rowSet.afterKey();

if (afterKey != null) {
updateSourceAfterKey(afterKey, source);
}

Cursor next = rowSet.remainingData() == 0 ? Cursor.EMPTY : makeCursor.apply(source, rowSet);
listener.onResponse(new Page(rowSet, next));
}

/**
 * Returns true if the current composite aggregation page is empty while a
 * non-empty {@code after_key} indicates more pages exist; in that case the
 * search should be retried with the updated after-key instead of returning an
 * empty page to the client.
 */
private static boolean shouldRetryDueToEmptyPage(SearchResponse response) {
    CompositeAggregation composite = getComposite(response);
    // if there are no buckets but a next page, go fetch it instead of sending an empty response to the client
    return composite.getBuckets().isEmpty() && composite.afterKey() != null && composite.afterKey().isEmpty() == false;
}

static CompositeAggregation getComposite(SearchResponse response) {
Aggregation agg = response.getAggregations().get(Aggs.ROOT_GROUP_NAME);
if (agg == null) {
return null;
}
static CompositeAggregationBuilder getCompositeBuilder(SearchSourceBuilder source) {
AggregationBuilder aggregation = source.aggregations()
.getAggregatorFactories()
.stream()
.filter(a -> Objects.equals(a.getName(), Aggs.ROOT_GROUP_NAME))
.findFirst()
.orElse(null);

if (agg instanceof CompositeAggregation) {
return (CompositeAggregation) agg;
}
Check.isTrue(aggregation instanceof CompositeAggregationBuilder, "Unexpected aggregation builder " + aggregation);

throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass());
return (CompositeAggregationBuilder) aggregation;
}

private static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder search) {
CompositeAggregation composite = getComposite(r);

if (composite == null) {
throw new SqlIllegalArgumentException("Invalid server response; no group-by detected");
static boolean couldProducePartialPages(CompositeAggregationBuilder aggregation) {
for (var agg : aggregation.getPipelineAggregations()) {
if (agg instanceof BucketSelectorPipelineAggregationBuilder) {
return true;
}
}
return false;
}

/**
 * Extracts the root composite aggregation from the search response, failing
 * fast (via {@code Check.isTrue}) if the root group is missing or is not a
 * composite aggregation.
 */
static CompositeAggregation getComposite(SearchResponse response) {
    Aggregation agg = response.getAggregations().get(Aggs.ROOT_GROUP_NAME);
    Check.isTrue(agg instanceof CompositeAggregation, "Unrecognized root group found; " + agg);

    return (CompositeAggregation) agg;
}

// Copies the response's after_key into the search source so the next request
// fetches the following composite aggregation page.
private static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder search) {
    updateSourceAfterKey(getComposite(r).afterKey(), search);
}

private static void updateSourceAfterKey(Map<String, Object> afterKey, SearchSourceBuilder search) {
Expand Down
Expand Up @@ -15,8 +15,6 @@
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyList;

/**
* {@link RowSet} specific to (GROUP BY) aggregation.
*/
Expand All @@ -29,21 +27,24 @@ class CompositeAggRowSet extends ResultRowSet<BucketExtractor> {
int size;
int row = 0;

/**
 * Creates a row set backed by the buckets of a composite aggregation response.
 *
 * @param exts extractors producing column values from buckets
 * @param mask column mask of the visible columns
 * @param response search response holding the composite aggregation
 * @param sizeRequested number of buckets requested per page from the composite agg
 * @param remainingLimit remaining row limit (-1 for unlimited)
 * @param mightProducePartialPages true if pages can contain fewer buckets than
 *        requested even when more data remains (e.g. due to bucket selectors)
 */
CompositeAggRowSet(
    List<BucketExtractor> exts,
    BitSet mask,
    SearchResponse response,
    int sizeRequested,
    int remainingLimit,
    boolean mightProducePartialPages
) {
    super(exts, mask);

    CompositeAggregation composite = CompositeAggCursor.getComposite(response);
    buckets = composite.getBuckets();
    afterKey = composite.afterKey();

    // page size: cap the bucket count by the remaining limit, if any
    size = remainingLimit == -1 ? buckets.size() : Math.min(buckets.size(), remainingLimit);
    // A full page may be followed by more data; a partial page implies the end
    // unless partial pages are possible (e.g. a bucket selector dropped buckets).
    boolean hasNextPage = mightProducePartialPages || buckets.size() == sizeRequested;
    remainingData = remainingData(hasNextPage, size, remainingLimit);
}

static int remainingData(boolean hasNextPage, int size, int limit) {
Expand Down
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ql.execution.search.extractor.BucketExtractor;
import org.elasticsearch.xpack.ql.type.Schema;
Expand Down Expand Up @@ -64,7 +65,17 @@ public String getWriteableName() {

/**
 * Builds the supplier producing the pivot row set for the given response.
 * The next query's composite builder supplies the requested page size and
 * whether partial pages are possible.
 */
@Override
protected Supplier<CompositeAggRowSet> makeRowSet(SearchResponse response) {
    CompositeAggregationBuilder aggregation = getCompositeBuilder(next());
    return () -> new PivotRowSet(
        Schema.EMPTY,
        extractors(),
        mask(),
        response,
        aggregation.size(),
        limit(),
        previousKey,
        couldProducePartialPages(aggregation)
    );
}

@Override
Expand Down
Expand Up @@ -31,10 +31,12 @@ class PivotRowSet extends SchemaCompositeAggRowSet {
List<BucketExtractor> exts,
BitSet mask,
SearchResponse response,
int sizeRequested,
int limit,
Map<String, Object> previousLastKey
Map<String, Object> previousLastKey,
boolean mightProducePartialPages
) {
super(schema, exts, mask, response, limit);
super(schema, exts, mask, response, sizeRequested, limit, mightProducePartialPages);

data = buckets.isEmpty() ? emptyList() : new ArrayList<>();

Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.filter.Filters;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -452,22 +453,28 @@ static class CompositeActionListener extends BaseAggActionListener {

@Override
protected void handleResponse(SearchResponse response, ActionListener<Page> listener) {
CompositeAggregationBuilder aggregation = CompositeAggCursor.getCompositeBuilder(request.source());
boolean mightProducePartialPages = CompositeAggCursor.couldProducePartialPages(aggregation);

Supplier<CompositeAggRowSet> makeRowSet = isPivot
? () -> new PivotRowSet(
schema,
initBucketExtractors(response),
mask,
response,
aggregation.size(),
query.sortingColumns().isEmpty() ? query.limit() : -1,
null
null,
mightProducePartialPages
)
: () -> new SchemaCompositeAggRowSet(
schema,
initBucketExtractors(response),
mask,
response,
query.sortingColumns().isEmpty() ? query.limit() : -1
aggregation.size(),
query.sortingColumns().isEmpty() ? query.limit() : -1,
mightProducePartialPages
);

BiFunction<SearchSourceBuilder, CompositeAggRowSet, CompositeAggCursor> makeCursor = isPivot ? (q, r) -> {
Expand Down Expand Up @@ -498,7 +505,7 @@ protected void handleResponse(SearchResponse response, ActionListener<Page> list
makeCursor,
() -> client.search(request, this),
listener,
schema
mightProducePartialPages
);
}
}
Expand Down
Expand Up @@ -23,8 +23,16 @@ class SchemaCompositeAggRowSet extends CompositeAggRowSet implements SchemaRowSe

private final Schema schema;

/**
 * Row set over composite aggregation buckets that also carries a schema.
 *
 * @param schema schema describing the result columns
 * @param exts extractors producing column values from buckets
 * @param mask column mask of the visible columns
 * @param r search response holding the composite aggregation
 * @param sizeRequested number of buckets requested per page
 * @param limitAggs remaining row limit (-1 for unlimited)
 * @param mightProducePartialPages true if pages can have fewer buckets than requested
 */
SchemaCompositeAggRowSet(
    Schema schema,
    List<BucketExtractor> exts,
    BitSet mask,
    SearchResponse r,
    int sizeRequested,
    int limitAggs,
    boolean mightProducePartialPages
) {
    super(exts, mask, r, sizeRequested, limitAggs, mightProducePartialPages);
    this.schema = schema;
}

Expand Down

0 comments on commit c107188

Please sign in to comment.