Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/user/interfaces/endpoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,27 @@ Unlike SQL's ``fetch_size`` which enables cursor-based pagination, PPL's ``fetch
| Can fetch more? | Yes (with cursor) | No (single response) |
+--------------------+-------------------------------------+------------------------------------+

Interaction with the ``head`` command
--------------------------------------

When a PPL query contains an explicit ``head`` command, the ``head`` command takes precedence over ``fetch_size``. Because PPL's ``fetch_size`` does not support pagination, capping the result below the user's explicit ``head`` limit would silently discard rows with no way to retrieve them. To avoid this, ``fetch_size`` is ignored when a ``head`` command is present, and the query returns the number of rows specified by ``head``.

If the query does **not** contain a ``head`` command, ``fetch_size`` limits the result as usual.

Examples::

# head 100 takes precedence — returns 100 rows, fetch_size=5 is ignored
>> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl -d '{
"fetch_size" : 5,
"query" : "source = accounts | head 100 | fields firstname"
}'

# No head command — fetch_size=5 limits the result to 5 rows
>> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl -d '{
"fetch_size" : 5,
"query" : "source = accounts | fields firstname"
}'

Example 1: JSON body
--------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2579,8 +2579,8 @@ public void testExplainFetchSizePushDown() throws IOException {
@Test
public void testExplainFetchSizeWithSmallerHead() throws IOException {
// fetch_size=10 with user's | head 3
// Two LogicalSort nodes: inner fetch=[3] from user head, outer fetch=[10] from fetch_size
// Effective limit = min(3, 10) = 3
// Explicit head takes precedence: only one LogicalSort(fetch=[3]) from user head
// fetch_size does not inject an additional Head when user already has one
String expected = loadExpectedPlan("explain_fetch_size_with_head_push.yaml");
assertYamlEqualsIgnoreId(
expected,
Expand All @@ -2591,8 +2591,8 @@ public void testExplainFetchSizeWithSmallerHead() throws IOException {
@Test
public void testExplainFetchSizeSmallerThanHead() throws IOException {
// fetch_size=5 with user's | head 100
// Two LogicalSort nodes: inner fetch=[100] from user head, outer fetch=[5] from fetch_size
// Effective limit = min(100, 5) = 5
// Explicit head takes precedence: only one LogicalSort(fetch=[100]) from user head
// fetch_size does not inject an additional Head when user already has one
String expected = loadExpectedPlan("explain_fetch_size_smaller_than_head_push.yaml");
assertYamlEqualsIgnoreId(
expected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ public void testFetchSizeWithStats() throws IOException {
}

@Test
public void testFetchSizeWithHead() throws IOException {
// Both head command and fetch_size - the smaller limit should win
// head 3 limits to 3, fetch_size 10 would allow 10, so we get 3
public void testHeadOverridesFetchSizeWhenSmaller() throws IOException {
// Explicit head takes precedence over fetch_size
// head 3 returns 3 rows regardless of fetch_size=10
JSONObject result =
executeQueryWithFetchSize(
String.format("source=%s | head 3 | fields firstname", TEST_INDEX_ACCOUNT), 10);
Expand All @@ -183,16 +183,59 @@ public void testFetchSizeWithHead() throws IOException {
}

@Test
public void testFetchSizeSmallerThanHead() throws IOException {
// fetch_size smaller than head - fetch_size should further limit
// head 100 would return 100, but fetch_size 5 limits to 5
public void testHeadOverridesFetchSizeWhenLarger() throws IOException {
// Explicit head takes precedence over fetch_size
// head 100 should return 100 rows even though fetch_size=5
JSONObject result =
executeQueryWithFetchSize(
String.format("source=%s | head 100 | fields firstname", TEST_INDEX_ACCOUNT), 5);
JSONArray dataRows = result.getJSONArray("datarows");
assertEquals(100, dataRows.length());
}

@Test
public void testHeadOverridesFetchSizeWithOffset() throws IOException {
  // The query carries its own head with an offset (skip 2 rows, then take 3) and is sent with
  // fetch_size=100; the response must contain exactly the 3 rows requested by head.
  String query =
      String.format("source=%s | head 3 from 2 | fields firstname", TEST_INDEX_ACCOUNT);
  JSONObject response = executeQueryWithFetchSize(query, 100);
  assertEquals(3, response.getJSONArray("datarows").length());
}

@Test
public void testHeadOverridesFetchSizeWithFilter() throws IOException {
  // A filter followed by an explicit head 5, sent with fetch_size=2: the explicit head wins,
  // so 5 matching rows are expected rather than 2.
  String query =
      String.format(
          "source=%s | where age > 30 | head 5 | fields firstname, age", TEST_INDEX_ACCOUNT);
  JSONObject response = executeQueryWithFetchSize(query, 2);
  assertEquals(5, response.getJSONArray("datarows").length());
}

@Test
public void testHeadEqualToFetchSize() throws IOException {
  // head and fetch_size carry the same value (7); head still takes precedence, so no second
  // Head is stacked on top and exactly 7 rows come back.
  String query = String.format("source=%s | head 7 | fields firstname", TEST_INDEX_ACCOUNT);
  JSONObject response = executeQueryWithFetchSize(query, 7);
  assertEquals(7, response.getJSONArray("datarows").length());
}

@Test
public void testHeadLargerThanDatasetWithFetchSize() throws IOException {
  // head 1000 against the 7-row bank index with fetch_size=3: head takes precedence, so every
  // one of the 7 rows is returned instead of being capped at 3.
  String query = String.format("source=%s | head 1000", TEST_INDEX_BANK);
  JSONObject response = executeQueryWithFetchSize(query, 3);
  assertEquals(7, response.getJSONArray("datarows").length());
}

@Test
public void testFetchSizeAsUrlParameter() throws IOException {
// fetch_size specified as URL parameter instead of JSON body
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(fetch=[5])
LogicalProject(age=[$8])
LogicalSort(fetch=[100])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalProject(age=[$8])
LogicalSort(fetch=[100])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->100, LIMIT->5, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->100, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":100,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=100, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(fetch=[10])
LogicalProject(age=[$8])
LogicalSort(fetch=[3])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalProject(age=[$8])
LogicalSort(fetch=[3])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->3, LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":3,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=3, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->3, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":3,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=3, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(fetch=[5])
LogicalProject(age=[$8])
LogicalSort(fetch=[100])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalProject(age=[$8])
LogicalSort(fetch=[100])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..16=[{inputs}], age=[$t8])
EnumerableLimit(fetch=[5])
EnumerableLimit(fetch=[100])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
EnumerableLimit(fetch=[100])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(fetch=[10])
LogicalProject(age=[$8])
LogicalSort(fetch=[3])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalProject(age=[$8])
LogicalSort(fetch=[3])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableLimit(fetch=[10])
EnumerableCalc(expr#0..16=[{inputs}], age=[$t8])
EnumerableLimit(fetch=[3])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
EnumerableCalc(expr#0..16=[{inputs}], age=[$t8])
EnumerableLimit(fetch=[3])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class AstStatementBuilder extends OpenSearchPPLParserBaseVisitor<Statemen
@Override
public Statement visitPplStatement(OpenSearchPPLParser.PplStatementContext ctx) {
UnresolvedPlan rawPlan = astBuilder.visit(ctx);
if (context.getFetchSize() > 0) {
if (context.getFetchSize() > 0 && !containsHead(rawPlan)) {
rawPlan = new Head(context.getFetchSize(), 0).attach(rawPlan);
}
UnresolvedPlan plan = addSelectAll(rawPlan);
Expand Down Expand Up @@ -69,6 +69,23 @@ public static class StatementBuilderContext {
private final String explainMode;
}

/**
* Recursively checks if the AST contains a {@link Head} node. When the user's query already
* includes an explicit {@code head} command, we should not inject an additional Head for
* fetch_size so that the user's explicit limit takes precedence.
Comment on lines +73 to +75
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a query ends with ... | head 3 | head 500, the limit in final physical plan is Limit 500?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be 3, added a corresponding test case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a query ends with ... | head 3 | head 500, the limit in final physical plan is Limit 500?

The new logic only decides whether fetch_size injects a new Head node, so it does not affect the original head command behavior (namely, that the smaller head limit takes effect).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant: without the current change, would it be LIMIT 500? If so, is there a bug elsewhere, or is this the expected behavior of the Calcite planner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's an example physical plan:
https://github.com/opensearch-project/sql/pull/5194/changes#diff-0c3c28474cebca090844af02cd01618a9ec578f69a272774934c7b3fb1c020aeR8

Even without the current change, it will be LIMIT 3 in the final OpenSearchRequestBuilder(sourceBuilder=...).
I think that is the expected behavior.

*/
private boolean containsHead(UnresolvedPlan plan) {
  // A Head at this node answers the question immediately; otherwise search every child that is
  // itself an UnresolvedPlan, stopping at the first subtree in which a Head is found.
  return plan instanceof Head
      || plan.getChild().stream()
          .filter(UnresolvedPlan.class::isInstance)
          .map(UnresolvedPlan.class::cast)
          .anyMatch(this::containsHead);
}

private UnresolvedPlan addSelectAll(UnresolvedPlan plan) {
if ((plan instanceof Project) && !((Project) plan).isExcluded()) {
return plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,33 +87,42 @@ public void buildQueryStatementWithLargeFetchSize() {
@Test
public void buildQueryStatementWithFetchSizeAndSmallerHead() {
// User query has head 3, fetchSize=10
// Head(10) wraps Head(3), then Project(*) wraps on top
// The inner head 3 limits first, so only 3 rows are returned
// Explicit head takes precedence over fetch_size, so no outer Head(10) is injected
assertEqualWithFetchSize(
"source=t | head 3",
10,
new Query(project(head(head(relation("t"), 3, 0), 10, 0), AllFields.of()), 0, PPL));
new Query(project(head(relation("t"), 3, 0), AllFields.of()), 0, PPL));
}

@Test
public void buildQueryStatementWithFetchSizeSmallerThanHead() {
// User query has head 100, fetchSize=5
// Head(5) wraps Head(100), then Project(*) wraps on top
// The outer head 5 limits, so only 5 rows are returned
// Explicit head takes precedence over fetch_size, so no outer Head(5) is injected
assertEqualWithFetchSize(
"source=t | head 100",
5,
new Query(project(head(head(relation("t"), 100, 0), 5, 0), AllFields.of()), 0, PPL));
new Query(project(head(relation("t"), 100, 0), AllFields.of()), 0, PPL));
}

@Test
public void buildQueryStatementWithFetchSizeAndHeadWithOffset() {
// User query has head 3 from 1 (with offset), fetchSize=10
// The inner head offset is preserved, outer Head always has offset 0
// Explicit head takes precedence over fetch_size, so no outer Head(10) is injected
assertEqualWithFetchSize(
"source=t | head 3 from 1",
10,
new Query(project(head(head(relation("t"), 3, 1), 10, 0), AllFields.of()), 0, PPL));
new Query(project(head(relation("t"), 3, 1), AllFields.of()), 0, PPL));
}

@Test
public void buildQueryStatementWithFetchSizeAndMultipleHeads() {
  // The query already chains two heads (3, then 500) and is built with fetchSize=10.
  // The expected AST keeps exactly those two Head nodes under Project(*): because the plan
  // already contains a Head, no Head(10) is injected for fetch_size.
  // The inner head 3 limits first, so the effective limit remains min(3, 500) = 3.
  Statement expected =
      new Query(project(head(head(relation("t"), 3, 0), 500, 0), AllFields.of()), 0, PPL);
  assertEqualWithFetchSize("source=t | head 3 | head 500", 10, expected);
}

private void assertEqualWithFetchSize(String query, int fetchSize, Statement expectedStatement) {
Expand Down
Loading