Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): implement 2-phase stream agg #3320

Merged
merged 22 commits into from
Jun 25, 2022

Conversation

kwannoel
Copy link
Contributor

@kwannoel kwannoel commented Jun 19, 2022

What's changed and what's your intention?

As per title:

LocalAgg (single-value) -> Exchange -> GlobalAgg
  • Summarize your change (mandatory)
    Perform 2-phase agg if input is distributed.

  • How does this PR work? Need a brief introduction for the changed logic (optional)
    When converting from logical plan to stream plan, if input is distributed, insert a LocalAgg and Exchange(single) to do 2-phase aggregation.

  • Describe clearly one logical change and avoid lazy messages (optional)
    A new plan node, LocalSimpleAgg is included, since the semantics of LocalSimpleAgg differs from GlobalSimpleAgg.
    It is used for distributed input, does not persist state. Additionally output is append-only, compared to GlobalSimpleAgg which has stateful output.

  • Describe any limitations of the current code (optional)
    Currently does not support non-append-only min/max.
    In the future perhaps LocalSimpleAgg should be renamed to LocalAppendOnlySimpleAgg. It is an optimization on top of the more general stateful LocalSimpleAgg.
    We will also need to replace the single value LocalSimpleAgg with the stateful one, since it is more general.

  • Add the 'user-facing changes' label if your PR contains changes that are visible to users (optional)
    NIL

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Refer to a related PR or issue link (optional)

#2997

@kwannoel kwannoel force-pushed the kwannoel/2-phase-stream-agg branch from 3bb9200 to ae99ce4 Compare June 20, 2022 04:25
@kwannoel
Copy link
Contributor Author

kwannoel commented Jun 20, 2022

Currently failing:

dev=> create table t1 (v1 int not null, v2 int not null, v3 int not null);
CREATE_TABLE
dev=> 
dev=> insert into t1 values (1,4,2), (2,3,3);
INSERT 0 2
dev=> 
dev=> explain create materialized view mv2 as select round(avg(v1), 1) as avg_v1, sum(v2) as sum_v2, count(v3) as count_v3 from t1;
                                            QUERY PLAN                                             
---------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [avg_v1, sum_v2, count_v3], pk_columns: [] }
   StreamProject { exprs: [RoundDigit(($1::Decimal / $2), 1:Int32), $3, $4] }
     StreamSimpleAgg { aggs: [sum($0), sum($1), sum($2), sum($3), sum($4)], pk_indices: [] } -- should be failing
       StreamExchange { dist: Single, pk_indices: [] }
         StreamSimpleAgg { aggs: [count, sum($0), count($0), sum($1), count($2)], pk_indices: [] }
           StreamTableScan { table: t1, columns: [v1, v2, v3, _row_id], pk_indices: [3] }
(6 rows)

dev=> create materialized view mv2 as select round(avg(v1), 1) as avg_v1, sum(v2) as sum_v2, count(v3) as count_v3 from t1;ERROR:  RPC error: GrpcStatus(Status { code: Unknown, message: "transport error", source: Some(tonic::transport::Error(Transport, hyper::Error(Io, Kind(BrokenPipe)))) })

Fails as it tries to serialize pk: [] with some order types. Need to write some integration tests to figure out what's going on

The stream agg that should be failing is probably the second one, since the first should work as per normal.
What I can't quite figure out is whether pk should be empty here or order types should be.

Backtrace + prints of pk row and order types:

serialize pk Row(
    [],
)
with order types [
    Ascending,
]
thread 'tokio-runtime-worker' panicked at 'itertools: .zip_eq() reached end of one iterator before the other', /home/noel/.cargo/registry/src/github.com-1ecc6299db9ec823/itertools-0.10.3/src/zip_eq_impl.rs:48:13
stack backtrace:
thread 'tokio-runtime-worker' panicked at 'itertools: .zip_eq() reached end of one iterator before the other', /home/noel/.cargo/registry/src/github.com-1ecc6299db9ec823/itertools-0.10.3/src/zip_eq_impl.rs:48:13
   0: std::panicking::begin_panic
             at /rustc/bb8c2f41174caceec00c28bc6c5c20ae9f9a175c/library/std/src/panicking.rs:616:12
   1: <itertools::zip_eq_impl::ZipEq<I,J> as core::iter::traits::iterator::Iterator>::next
             at /home/noel/.cargo/registry/src/github.com-1ecc6299db9ec823/itertools-0.10.3/src/zip_eq_impl.rs:48:13
   2: risingwave_common::util::ordered::serde::OrderedRowSerializer::serialize_datums
             at ./src/common/src/util/ordered/serde.rs:100:36
   3: risingwave_common::util::ordered::serde::OrderedRowSerializer::serialize
             at ./src/common/src/util/ordered/serde.rs:88:9
   4: risingwave_common::util::ordered::serde::serialize_pk
             at ./src/common/src/util/ordered/serde.rs:231:5
   5: risingwave_storage::table::state_table::StateTable<S>::get_row::{{closure}}
             at ./src/storage/src/table/state_table.rs:82:24
   6: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/bb8c2f41174caceec00c28bc6c5c20ae9f9a175c/library/core/src/future/mod.rs:91:19
   7: risingwave_stream::executor::managed_state::aggregation::value::ManagedValueState::new::{{closure}}
             at ./src/stream/src/executor/managed_state/aggregation/value.rs:56:63
   8: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/bb8c2f41174caceec00c28bc6c5c20ae9f9a175c/library/core/src/future/mod.rs:91:19
   9: risingwave_stream::executor::managed_state::aggregation::ManagedStateImpl<S>::create_managed_state::{{closure}}
             at ./src/stream/src/executor/managed_state/aggregation/mod.rs:164:81
  10: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/bb8c2f41174caceec00c28bc6c5c20ae9f9a175c/library/core/src/future/mod.rs:91:19
  11: risingwave_stream::executor::aggregation::generate_managed_agg_state::{{closure}}
             at ./src/stream/src/executor/aggregation/mod.rs:455:10
  12: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/bb8c2f41174caceec00c28bc6c5c20ae9f9a175c/library/core/src/future/mod.rs:91:19
  13: risingwave_stream::executor::global_simple_agg::SimpleAggExecutor<S>::apply_chunk::{{closure}}
             at ./src/stream/src/executor/global_simple_agg.rs:171:14
  14: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/bb8c2f41174caceec00c28bc6c5c20ae9f9a175c/library/core/src/future/mod.rs:91:19
  15: risingwave_stream::executor::global_simple_agg::SimpleAggExecutor<S>::execute_inner::{{closure}}
             at ./src/stream/src/executor/global_simple_agg.rs:276:22
   ...

@skyzh skyzh self-requested a review June 21, 2022 08:34
@kwannoel kwannoel force-pushed the kwannoel/2-phase-stream-agg branch from ae99ce4 to 5b16473 Compare June 21, 2022 08:49
@skyzh
Copy link
Contributor

skyzh commented Jun 21, 2022

I guess the issue is caused by wrong induction of state table schema. cc @BowenXiao1999 any ideas?

@skyzh
Copy link
Contributor

skyzh commented Jun 21, 2022

... might be related to this part:

https://github.com/singularity-data/risingwave/blob/309ce36470483c6240d644936e916b5521178d6c/src/stream/src/executor/aggregation/mod.rs#L400-L429

@BowenXiao1999
Copy link
Contributor

Let me take a look

@BowenXiao1999
Copy link
Contributor

BowenXiao1999 commented Jun 21, 2022

From your query, it should be simple agg + not append_only + value states. So the generate_table_state should infer order type as 0 length.

@BowenXiao1999
Copy link
Contributor

@BowenXiao1999 BowenXiao1999 self-requested a review June 21, 2022 10:18
@kwannoel
Copy link
Contributor Author

kwannoel commented Jun 22, 2022

Closing this in favour of stateful approach to be used: https://singularity-data.quip.com/KtaRA6CspqRK#RBEADAF6kqO. It will be able to handle non-append only min / max too.

@kwannoel kwannoel closed this Jun 22, 2022
@kwannoel kwannoel deleted the kwannoel/2-phase-stream-agg branch June 22, 2022 03:44
@skyzh
Copy link
Contributor

skyzh commented Jun 22, 2022

I think we'll still need this. The stateless approach can only be used on append-only min max and value state, and the stateful approach can only be used on max min topN.

@kwannoel kwannoel restored the kwannoel/2-phase-stream-agg branch June 22, 2022 03:52
@kwannoel kwannoel reopened this Jun 22, 2022
@kwannoel kwannoel force-pushed the kwannoel/2-phase-stream-agg branch from 5b16473 to 7cd2ddc Compare June 24, 2022 07:04
@kwannoel kwannoel force-pushed the kwannoel/2-phase-stream-agg branch from fda67fb to 4d45994 Compare June 24, 2022 08:39
@codecov
Copy link

codecov bot commented Jun 24, 2022

Codecov Report

Merging #3320 (4165d7d) into main (6ba009e) will increase coverage by 0.01%.
The diff coverage is 98.98%.

@@            Coverage Diff             @@
##             main    #3320      +/-   ##
==========================================
+ Coverage   74.28%   74.30%   +0.01%     
==========================================
  Files         768      769       +1     
  Lines      106684   106775      +91     
==========================================
+ Hits        79250    79338      +88     
- Misses      27434    27437       +3     
Flag Coverage Δ
rust 74.30% <98.98%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
src/frontend/src/optimizer/plan_node/mod.rs 97.91% <ø> (ø)
...ntend/src/optimizer/plan_node/stream_simple_agg.rs 96.66% <ø> (-0.06%) ⬇️
...src/optimizer/plan_node/stream_local_simple_agg.rs 98.41% <98.41%> (ø)
...rc/frontend/src/optimizer/plan_node/logical_agg.rs 96.40% <100.00%> (+0.13%) ⬆️
src/meta/src/hummock/mock_hummock_meta_client.rs 40.56% <0.00%> (-0.95%) ⬇️
src/meta/src/manager/id.rs 95.50% <0.00%> (-0.57%) ⬇️
src/storage/src/hummock/local_version_manager.rs 84.37% <0.00%> (-0.16%) ⬇️
src/frontend/src/expr/utils.rs 98.99% <0.00%> (ø)
src/frontend/src/catalog/source_catalog.rs 68.18% <0.00%> (+1.51%) ⬆️

📣 Codecov can now indicate which changes are the most critical in Pull Requests. Learn more

@kwannoel kwannoel marked this pull request as ready for review June 24, 2022 16:33
Copy link
Contributor

@skyzh skyzh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM, good work!

src/frontend/src/optimizer/plan_node/logical_agg.rs Outdated Show resolved Hide resolved
src/frontend/src/optimizer/plan_node/logical_agg.rs Outdated Show resolved Hide resolved
src/frontend/src/optimizer/plan_node/logical_agg.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@BowenXiao1999 BowenXiao1999 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM, Good work!

src/frontend/test_runner/tests/testdata/agg.yaml Outdated Show resolved Hide resolved
Copy link
Contributor

@skyzh skyzh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, good work!

src/frontend/src/optimizer/plan_node/logical_agg.rs Outdated Show resolved Hide resolved
@kwannoel kwannoel enabled auto-merge (squash) June 25, 2022 08:35
@kwannoel kwannoel merged commit 76b609b into main Jun 25, 2022
@kwannoel kwannoel deleted the kwannoel/2-phase-stream-agg branch June 25, 2022 08:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants