This document contains a few test cases that evaluate the incremental/streaming aggregation capabilities of DataFusion. Most simple cases succeed, and the ones that fail mostly do so because of a few small bugs, such as a missing optimiser rule or an execution plan failing to report the correct ordering:
- SortExec is sometimes pipeline blocking when it doesn't need to be (apache/datafusion#16899)
- PiecewiseMergeJoinExec doesn't report that it maintains ordering (code)
- Unnecessary blocking SortExec inserted before BoundedWindowAggExec
✅ Aggregates incrementally
The group by can be streamed because the input is ordered by timestamp
SELECT
timestamp,
COUNT(*) as count
FROM people
GROUP BY timestamp
ORDER BY timestamp

Data ordered by timestamp
✅ Aggregates incrementally
The group by can be streamed because the input is ordered by at least one of the group-by columns. The aggregation buffers aggregate state in memory until the next timestamp is reached, at which point all buffered results are emitted.
SELECT
timestamp,
city,
AVG(age) as avg_age,
COUNT(*) as count
FROM people
GROUP BY timestamp, city
ORDER BY timestamp

Data ordered by timestamp
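The buffering behaviour described above can be sketched outside of DataFusion. The following is a minimal Python model (not DataFusion's actual implementation) of a streaming group-by over input ordered by `timestamp`: per-city state is buffered only until a later timestamp arrives, at which point the finished groups are emitted.

```python
from collections import defaultdict

def stream_group_by(rows):
    """Incrementally aggregate (timestamp, city, age) rows ordered by timestamp.

    Buffers per-city state for the current timestamp only; once a later
    timestamp arrives, all groups for the previous timestamp are emitted as
    (timestamp, city, avg_age, count). Memory is bounded by the number of
    distinct cities per timestamp, not by the whole input.
    """
    current_ts = None
    buffer = defaultdict(lambda: [0, 0])  # city -> [sum_age, count]
    for ts, city, age in rows:
        if current_ts is not None and ts != current_ts:
            for c, (s, n) in buffer.items():
                yield (current_ts, c, s / n, n)
            buffer.clear()
        current_ts = ts
        buffer[city][0] += age
        buffer[city][1] += 1
    # Flush the final timestamp's groups.
    for c, (s, n) in buffer.items():
        yield (current_ts, c, s / n, n)
```

This is the shape of computation the plan can use because the input ordering guarantees a timestamp never reappears once a later one has been seen.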
❌ Pipeline blocking due to a missing optimisation
While it should be theoretically possible to execute this in a streaming manner, the SortExec operator is pipeline blocking even when the input is partially sorted. This appears to be the core issue: apache/datafusion#16899.
SELECT
timestamp,
city,
AVG(age) as avg_age,
COUNT(*) as count
FROM people
GROUP BY timestamp, city
ORDER BY timestamp, city

Data ordered by timestamp
❌ Pipeline blocking
This plan cannot stream the aggregation, because the ordering of the timestamp is lost in the projection. Even though the truncated timestamp is guaranteed to have the same ordering as the original timestamp column, once a projection is applied, all ordering properties are lost.
This would be a challenging optimisation to implement.
SELECT
cast(timestamp / 1000 as int) * 1000 as timestamp,
city,
AVG(age) as avg_age,
COUNT(*) as count
FROM people
GROUP BY 1, city
ORDER BY 1

Data ordered by timestamp
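The optimisation this case is missing rests on a simple property: floor-truncation is a monotonic transform, so it preserves the input's sort order. A small sketch (the `bucket` helper is hypothetical, mirroring the cast expression in the query above) illustrates why the ordering could in principle be propagated through the projection:

```python
def bucket(ts, width=1000):
    # Floor-truncate a timestamp to its bucket; this mirrors
    # cast(timestamp / width as int) * width from the query above.
    return (ts // width) * width

# A monotonic transform preserves (non-strict) sort order, so an input
# sorted by timestamp is also sorted by its bucket:
timestamps = [1700000123, 1700000950, 1700001001, 1700002500]
buckets = [bucket(t) for t in timestamps]
```

An optimiser rule that recognised the projection expression as monotonic in `timestamp` could keep the ordering property alive and avoid the blocking sort.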
✅ Aggregates incrementally
This is a variation of the query above, but it uses a timestamp bucket column that has been precomputed in the input data, and the table provider reports that the data is ordered by it.
SELECT
timestamp_bucket as timestamp,
city,
AVG(age) as avg_age,
COUNT(*) as count
FROM people
GROUP BY 1, city
ORDER BY 1

Data ordered by timestamp
❌ Pipeline blocking due to a missing optimisation
Fails for the same reason as the earlier query that adds city to the ORDER BY.
SELECT
timestamp_bucket as timestamp,
city,
AVG(age) as avg_age,
COUNT(*) as count
FROM people
GROUP BY 1, city
ORDER BY 1, city

Data ordered by timestamp
✅ Aggregates incrementally
Similar to the first query: since the data is partially sorted by the group-by columns, this succeeds.
SELECT
id_bucket,
id,
AVG(age) as avg_age,
COUNT(*) as count
FROM people
GROUP BY 1, 2

Data ordered by id_bucket, and partially by id
✅ Aggregates incrementally
Similar to the second query; however, since the requested order matches the ordering of the data, the planner recognises that it can eliminate the SortExec. It fails, however, if we use ORDER BY 1, 2 DESC.
SELECT
id_bucket,
id,
AVG(age) as avg_age,
COUNT(*) as count
FROM people
GROUP BY 1, 2
ORDER BY 1, 2

Data ordered by id_bucket, and partially by id
✅ Aggregates incrementally
This query simulates a multi-layer aggregation that first calculates a value per entity id and then combines the per-entity results into a global aggregation. To do this incrementally, it calculates a partial aggregation per id bucket and then combines the partial results into a running incremental aggregation. The result is one aggregated row per id bucket, where each bucket effectively represents a result generation: a consumer would stream results and discard those from the previous generation when the next one arrives.
-- Aggregate per key and bucket
with a as (
SELECT
id,
id_bucket,
AVG(age) as avg_age,
COUNT(*) as count
FROM people
GROUP BY id, id_bucket
)
-- Combine individual aggregated rows per bucket
,b as (
SELECT
id_bucket,
COUNT(*) FILTER (WHERE avg_age > 65) as count_over_65,
SUM(count) as count
FROM a
GROUP BY id_bucket
)
-- Combine partial results per bucket with all of the previous buckets to produce
-- a running incremental aggregation
SELECT
id_bucket,
SUM(count_over_65) over (
ORDER BY id_bucket
GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) count_over_65,
SUM(count) over (
ORDER BY id_bucket
GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as sum_count
FROM b

Data ordered by id_bucket, and partially by id
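The final window aggregation is a running total over the per-bucket partials, which is why it can be computed incrementally: each generation's output needs only the previous running state plus the new partial row. A minimal Python illustration (the partial values are made up):

```python
from itertools import accumulate

# Partial results per id_bucket, ordered by id_bucket, as CTE `b` would emit:
# (id_bucket, count_over_65, count)
partials = [(0, 2, 10), (1, 1, 7), (2, 3, 12)]

# The frame GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is a running
# total, so each output row folds the new partial into the previous state:
running = list(accumulate(
    partials,
    lambda acc, row: (row[0], acc[1] + row[1], acc[2] + row[2]),
))
```

This is what lets a consumer treat each id_bucket's row as a complete "generation" of the global aggregate without the engine re-reading earlier buckets.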
❌ Pipeline blocking due to a missing optimisation
This query represents a non-timeseries aggregation on timeseries data. For example, calculating the count of each key in log events. An ideal incremental aggregation would be able to calculate a partial result after reading a certain amount of data, and then replace that result when more data arrives.
The query is pipeline blocking because the PiecewiseMergeJoinExec does not report that it maintains the ordering of its input, as described by this TODO. This causes the aggregate exec above it to buffer all data in memory.
with
all_generation_ids as (
SELECT generation_id
FROM UNNEST(range(1700000, 1700050)) AS t(generation_id)
)
,input as (
SELECT
city,
timestamp_bucket as generation_id
FROM people
)
-- Partially aggregate per key and generation
,a as (
SELECT
city,
generation_id,
COUNT(*) as count
FROM input
GROUP BY city, generation_id
)
-- Incrementally merge aggregates to find the total per city for all generations up to
-- the current one
,b as (
SELECT
city,
generation_id,
SUM(count) OVER (
PARTITION BY city
ORDER BY generation_id
GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as sum_count
FROM a
)
-- Join on all generations to fill in gaps
,c as (
SELECT
b.city,
b.generation_id as original_generation_id,
b.sum_count,
all.generation_id as displaced_generation_id
FROM all_generation_ids all
LEFT JOIN b ON all.generation_id >= b.generation_id
)
-- Filter to only include the last value per generation
,d as (
SELECT
city,
displaced_generation_id as generation_id,
LAST_VALUE(sum_count ORDER BY original_generation_id) as sum_count
FROM c
GROUP BY city, displaced_generation_id
)
select * from d

Data ordered by timestamp
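The join-and-LAST_VALUE pattern in the c and d CTEs amounts to carrying each city's latest value forward across all later generations. A Python sketch of that carry-forward logic (the `fill_generations` helper is hypothetical; it assumes partial rows are available sorted by generation):

```python
def fill_generations(rows, generations):
    """Carry each city's latest sum forward to later generations.

    rows: (city, generation_id, sum_count) tuples.
    generations: the full, ordered list of generation ids (the
    all_generation_ids CTE). For each generation, every city seen so far is
    emitted with its most recent sum_count, filling the gaps that the
    LEFT JOIN + LAST_VALUE combination fills in the query above.
    """
    latest = {}  # city -> most recent sum_count
    out = []
    rows = sorted(rows, key=lambda r: r[1])
    idx = 0
    for g in generations:
        # Absorb all partial rows up to and including this generation.
        while idx < len(rows) and rows[idx][1] <= g:
            city, _, s = rows[idx]
            latest[city] = s
            idx += 1
        for city, s in latest.items():
            out.append((city, g, s))
    return out
```

Done incrementally, this only needs the `latest` map as state, which is why a join operator that preserved the generation ordering would keep the whole pipeline streaming.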
❌ Pipeline blocking due to a missing optimisation
This query is the same as the previous one, but adds a top-k reduction to each generation of results.
We are missing a few optimisations to make it incremental. First, it also uses the
PiecewiseMergeJoinExec, which has the limitation described above. Second, the optimiser inserts an
unneeded SortExec before a BoundedWindowAggExec, even though the window exec could operate on the
partially sorted stream. In the b CTE, we perform a window aggregation partitioned by city but
sorted on generation_id:
SELECT
city,
generation_id,
SUM(count) OVER (
PARTITION BY city
ORDER BY generation_id
GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as sum_count
FROM a

The data is already sorted on generation_id, so the optimiser should choose SortMode::Linear,
which would compute the results incrementally. However, it instead chooses to fully sort the
input and use SortMode::Sorted. I believe this is caused by this if condition being too
restrictive. To illustrate the point, DataFusion can perform incremental window aggregations
when the data is not partitioned:
SELECT
city,
generation_id,
SUM(count) OVER (
-- PARTITION BY city
ORDER BY generation_id
GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as sum_count
FROM a

The above query produces the plan:
ProjectionExec: expr=[city@0 as city, generation_id@1 as generation_id, sum(a.count) ORDER BY [a.generation_id ASC NULLS FIRST] GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum_count]
BoundedWindowAggExec: wdw=[sum(a.count) ORDER BY [a.generation_id ASC NULLS FIRST] GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(a.count) ORDER BY [a.generation_id ASC NULLS FIRST] GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
ProjectionExec: expr=[city@0 as city, generation_id@1 as generation_id, count(Int64(1))@2 as count]
AggregateExec: mode=Single, gby=[city@0 as city, generation_id@1 as generation_id], aggr=[count(Int64(1))], ordering_mode=PartiallySorted([1])
ProjectionExec: expr=[city@1 as city, timestamp_bucket@0 as generation_id]
CooperativeExec
GeneratedExec: rate=100, duration=5, ordering=Timestamp
Adding PARTITION BY city, by contrast, produces the following plan (note the SortExec):
ProjectionExec: expr=[city@0 as city, generation_id@1 as generation_id, sum(a.count) PARTITION BY [a.city] ORDER BY [a.generation_id ASC NULLS FIRST] GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum_count]
BoundedWindowAggExec: wdw=[sum(a.count) PARTITION BY [a.city] ORDER BY [a.generation_id ASC NULLS FIRST] GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(a.count) PARTITION BY [a.city] ORDER BY [a.generation_id ASC NULLS FIRST] GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
SortExec: expr=[city@0 ASC NULLS LAST, generation_id@1 ASC], preserve_partitioning=[false]
ProjectionExec: expr=[city@0 as city, generation_id@1 as generation_id, count(Int64(1))@2 as count]
AggregateExec: mode=Single, gby=[city@0 as city, generation_id@1 as generation_id], aggr=[count(Int64(1))], ordering_mode=PartiallySorted([1])
ProjectionExec: expr=[city@1 as city, timestamp_bucket@0 as generation_id]
CooperativeExec
GeneratedExec: rate=100, duration=5, ordering=Timestamp
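What SortMode::Linear would conceptually do here can be modelled in a few lines of Python (a sketch of the technique, not DataFusion's implementation): with input sorted only by generation_id and partitions interleaved, each partition's running state lives in a hash map, and rows are emitted as soon as they arrive, with no sort.

```python
from collections import defaultdict

def linear_partitioned_running_sum(rows):
    """Running SUM(count) OVER (PARTITION BY city ORDER BY generation_id).

    Assumes rows arrive sorted by generation_id only, with partitions
    interleaved -- the situation where a linear window mode could avoid the
    full re-sort. State is one running sum per city.
    """
    running = defaultdict(int)  # city -> running sum of count
    for city, gen, count in rows:
        running[city] += count
        yield (city, gen, running[city])
```

The output is ordered by generation_id rather than by (city, generation_id), but for this pipeline that is exactly the ordering the downstream operators need.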
with
all_generation_ids as (
SELECT generation_id
FROM UNNEST(range(1700000, 1700050)) AS t(generation_id)
)
,input as (
SELECT
city,
timestamp_bucket as generation_id
FROM people
)
-- Partially aggregate per key and generation
,a as (
SELECT
city,
generation_id,
COUNT(*) as count
FROM input
GROUP BY city, generation_id
)
-- Incrementally merge aggregates
,b as (
SELECT
city,
generation_id,
SUM(count) OVER (
PARTITION BY city
ORDER BY generation_id
GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as sum_count
FROM a
)
-- Generate sorted row numbers per generation
,c as (
SELECT
city,
generation_id,
sum_count,
ROW_NUMBER() OVER (
PARTITION BY generation_id
ORDER BY sum_count DESC
) as row_num
FROM b
)
-- Apply top k to each generation by filtering on row_num
,d as (
SELECT
city,
generation_id,
sum_count,
row_num
FROM c
WHERE row_num <= 3
)
,e as (
SELECT
d.city,
d.generation_id as original_generation_id,
d.sum_count,
all.generation_id as displaced_generation_id
FROM all_generation_ids all
LEFT JOIN d ON all.generation_id >= d.generation_id
)
,f as (
SELECT
city,
displaced_generation_id as generation_id,
LAST_VALUE(sum_count ORDER BY original_generation_id) as sum_count
FROM e
GROUP BY city, displaced_generation_id
)
,g as (
SELECT
city,
generation_id,
sum_count,
ROW_NUMBER() OVER (
PARTITION BY generation_id
ORDER BY sum_count DESC
) as row_num
FROM f
)
,h as (
SELECT
city,
generation_id,
sum_count,
row_num
FROM g
WHERE row_num <= 3
)
select * from h

Data ordered by timestamp
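The per-generation top-k reduction that the ROW_NUMBER + filter CTEs express could be computed incrementally with a bounded heap. A Python sketch (illustrative only, assuming all rows for one generation arrive before the next):

```python
import heapq

def top_k_per_generation(rows, k=3):
    """Reduce each generation to its top-k cities by sum_count.

    rows: (city, generation_id, sum_count) tuples, grouped so all rows for
    one generation precede the next. A size-k min-heap per generation keeps
    memory at O(k) instead of buffering the full generation before ranking.
    Yields (generation_id, [(sum_count, city), ...]) sorted descending.
    """
    current_gen, heap = None, []
    for city, gen, s in rows:
        if current_gen is not None and gen != current_gen:
            yield current_gen, sorted(heap, reverse=True)
            heap = []
        current_gen = gen
        heapq.heappush(heap, (s, city))
        if len(heap) > k:
            heapq.heappop(heap)  # evict the current smallest
    if current_gen is not None:
        yield current_gen, sorted(heap, reverse=True)
```

An execution plan that preserved the generation ordering through the join would allow exactly this bounded-state shape instead of buffering everything.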