feat(join): leverage band conditions in hash join #8749

Merged 10 commits on Mar 28, 2023

Conversation

soundOfDestiny
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Leverage band conditions in the streaming hash join:
  • Emit watermarks derived from the band conditions found among the non-equi conditions of the join condition.
  • Clean join state in the streaming hash join executor based on those same band conditions.
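To make the two points above concrete, here is a minimal sketch of how one inequality of the form `larger >= smaller + delta` could drive both state cleaning and watermark emission. It is a simplified model and not the executor code from this PR: the names `Inequality`, `SideState`, `clean_bound`, and `output_watermark` are hypothetical, keys are plain `i64` values rather than timestamps, and watermarks are taken to mean "no future row on that side has a key below this value".

```rust
use std::collections::BTreeMap;

/// Hypothetical model of one inequality in the join condition:
/// `larger_key >= smaller_key + delta`.
#[derive(Debug, Clone, Copy)]
struct Inequality {
    delta: i64,
}

impl Inequality {
    /// Given a watermark on the "smaller" input, every future smaller row has
    /// key >= watermark, so it can only match larger rows with
    /// key >= watermark + delta. Larger-side rows below this bound are dead.
    fn clean_bound(&self, smaller_watermark: i64) -> i64 {
        smaller_watermark + self.delta
    }

    /// Lower bound on the larger key of any future output row: the row is
    /// produced either by a new larger row (>= larger_watermark) or by a new
    /// smaller row matching a stored larger row (>= clean_bound).
    fn output_watermark(&self, larger_watermark: i64, smaller_watermark: i64) -> i64 {
        larger_watermark.min(self.clean_bound(smaller_watermark))
    }
}

/// Hypothetical join state of the "larger" side, ordered by the inequality key.
#[derive(Default)]
struct SideState {
    rows: BTreeMap<i64, Vec<&'static str>>,
}

impl SideState {
    fn insert(&mut self, key: i64, row: &'static str) {
        self.rows.entry(key).or_default().push(row);
    }

    /// Drop every row whose key is strictly below `bound`.
    /// Returns the number of rows removed.
    fn clean_below(&mut self, bound: i64) -> usize {
        // `split_off` keeps keys < bound in `self.rows` and returns keys >= bound.
        let kept = self.rows.split_off(&bound);
        let removed: usize = self.rows.values().map(Vec::len).sum();
        self.rows = kept;
        removed
    }
}
```

With `delta = -5` and a smaller-side watermark of 100, `clean_bound` is 95, so stored larger-side rows with keys 90 and 94 would be dropped while a row with key 95 stays.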

Checklist For Contributors

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see "Sqlsmith: Sql feature generation", #7934.)
  • I have demonstrated that backward compatibility is not broken by breaking changes and created issues to track deprecated features to be removed in the future. (Please refer to the issue)
  • All checks passed in ./risedev check (or alias, ./risedev c)

Checklist For Reviewers

  • I have requested macro/micro-benchmarks as this PR can affect performance substantially, and the results are shown.

Documentation

  • My PR DOES NOT contain user-facing changes.

Types of user-facing changes

Please keep the types that apply to your changes, and remove the others.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

@codecov

codecov bot commented Mar 23, 2023

Codecov Report

Merging #8749 (22e0d45) into main (4828bb5) will increase coverage by 0.01%.
The diff coverage is 83.05%.

@@            Coverage Diff             @@
##             main    #8749      +/-   ##
==========================================
+ Coverage   70.79%   70.80%   +0.01%     
==========================================
  Files        1171     1171              
  Lines      193943   194337     +394     
==========================================
+ Hits       137298   137608     +310     
- Misses      56645    56729      +84     
Flag Coverage Δ
rust 70.80% <83.05%> (+0.01%) ⬆️


Impacted Files                                          Coverage            Δ
src/stream/src/from_proto/hash_join.rs                  0.00% <0.00%>       (ø)
src/frontend/src/expr/mod.rs                            80.16% <56.60%>     (-1.90%) ⬇️
...ontend/src/optimizer/plan_node/stream_hash_join.rs   91.86% <85.57%>     (-2.73%) ⬇️
src/stream/src/executor/hash_join.rs                    96.61% <94.89%>     (-0.19%) ⬇️
src/frontend/src/utils/condition.rs                     95.19% <95.00%>     (-0.01%) ⬇️
...ntend/src/optimizer/plan_node/eq_join_predicate.rs   82.74% <100.00%>    (+1.13%) ⬆️
...c/frontend/src/optimizer/plan_node/logical_join.rs   90.33% <100.00%>    (+<0.01%) ⬆️
...rc/optimizer/rule/push_calculation_of_join_rule.rs   99.40% <100.00%>    (+<0.01%) ⬆️
src/frontend/src/scheduler/distributed/query.rs         69.58% <100.00%>    (+0.07%) ⬆️
src/stream/src/executor/test_utils.rs                   93.84% <100.00%>    (+0.09%) ⬆️

... and 7 files with indirect coverage changes


@soundOfDestiny soundOfDestiny force-pushed the zl_band_join branch 3 times, most recently from 769e364 to c18da29 Compare March 27, 2023 10:45
@soundOfDestiny soundOfDestiny force-pushed the zl_band_join branch 2 times, most recently from 7f24aa1 to 964c90a Compare March 28, 2023 05:43
Contributor

@st1page st1page left a comment

  • Could we add a watermark to JoinSide and do the state cleaning inside JoinSide?
  • Consider a side's entry that is inserted but never matched: we cannot clean it. We need to filter out such useless data with the watermark when it is inserted.

@@ -291,6 +291,30 @@ macro_rules! impl_has_variant {

impl_has_variant! {InputRef, Literal, FunctionCall, AggCall, Subquery, TableFunction, WindowFunction}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct InequalityInputPair {

BandJoin implies BETWEEN semantics, while here a one-sided inequality is enough.
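The distinction in this comment can be sketched as follows: a BETWEEN-style band is just two one-sided inequalities, and each inequality on its own already bounds the state of one join side. The types and functions below (`Side`, `InequalityPair`, `decompose_band`, `cleanable_sides`) are hypothetical illustrations with made-up field names. They are not the fields of the `InequalityInputPair` struct in the diff, which is truncated here.

```rust
/// Which join input a key column belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Side {
    Left,
    Right,
}

/// Hypothetical one-sided inequality: `larger.key >= smaller.key + delta`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct InequalityPair {
    larger: Side,
    smaller: Side,
    delta: i64,
}

/// Rewrite `left.key BETWEEN right.key + lo AND right.key + hi`
/// as two one-sided inequalities.
fn decompose_band(lo: i64, hi: i64) -> [InequalityPair; 2] {
    [
        // left.key >= right.key + lo
        InequalityPair { larger: Side::Left, smaller: Side::Right, delta: lo },
        // left.key <= right.key + hi  <=>  right.key >= left.key - hi
        InequalityPair { larger: Side::Right, smaller: Side::Left, delta: -hi },
    ]
}

/// Sides whose state can be bounded under this simplified model: the
/// "larger" side of each inequality, because a watermark on the smaller
/// side gives a lower bound on the larger keys that can still match.
fn cleanable_sides(pairs: &[InequalityPair]) -> Vec<Side> {
    let mut sides = Vec::new();
    for pair in pairs {
        if !sides.contains(&pair.larger) {
            sides.push(pair.larger);
        }
    }
    sides
}
```

Under these assumptions a single inequality makes one side cleanable, and a full band makes both sides cleanable, which is why a name tied to BETWEEN would be narrower than what the struct covers.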

@soundOfDestiny
Contributor Author

> • Could we add a watermark to JoinSide and do the state cleaning inside JoinSide?
> • Consider a side's entry that is inserted but never matched: we cannot clean it. We need to filter out such useless data with the watermark when it is inserted.

  • However, some watermarks need to be emitted downstream.
  • We cannot filter at present, or we would fail the sanity check; perhaps we can improve the sanity check in the future.
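For readers following the exchange, this is roughly what the reviewer's suggestion (keeping the bound inside the join side and filtering on insert) could look like. It is a hypothetical sketch of the proposal only. Per the reply above, filtering at insertion is not what this PR does, because it would currently fail the sanity check. The `JoinSide` struct here is a toy stand-in and shares nothing but the name with the executor's type.

```rust
/// Hypothetical join-side state that owns its own cleaning bound.
#[derive(Default)]
struct JoinSide {
    /// Inequality keys of the rows currently kept in state.
    keys: Vec<i64>,
    /// Rows with a key strictly below this bound can never match a
    /// future row from the other side.
    clean_bound: Option<i64>,
}

impl JoinSide {
    /// Raise the bound (it never moves backwards) and evict rows that
    /// fall below it. Returns the number of evicted rows.
    fn advance_bound(&mut self, bound: i64) -> usize {
        let bound = self.clean_bound.map_or(bound, |old| old.max(bound));
        self.clean_bound = Some(bound);
        let before = self.keys.len();
        self.keys.retain(|&key| key >= bound);
        before - self.keys.len()
    }

    /// Store a row unless it is already below the bound, in which case
    /// it would never be matched and never be cleaned by a later match.
    /// Returns whether the row was stored.
    fn insert(&mut self, key: i64) -> bool {
        if self.clean_bound.map_or(false, |bound| key < bound) {
            return false;
        }
        self.keys.push(key);
        true
    }
}
```

In this toy model, once the bound has advanced to 30, inserting a row with key 20 is rejected up front instead of lingering in state.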

@@ -77,6 +77,25 @@
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.ts) }
└─StreamTableScan { table: t2, columns: [t2.ts, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: band hash join
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
@CAJan93 CAJan93 Mar 28, 2023

A suggestion: it might also help to add a test that evaluates the query rather than only inspecting the plan. Inspired by the PSQL tests:

CREATE TABLE t1 (
	c1 int,
	c2 int,
	c3 int,
	c4 timestamptz,
	c5 timestamp,
	c6 varchar,
	c7 varchar,
	c8 int,
	CONSTRAINT t1_pkey PRIMARY KEY (c1)
);
CREATE TABLE t2 (
	c1 int,
	c2 text,
	CONSTRAINT t2_pkey PRIMARY KEY (c1)
);

CREATE TABLE t4 (
	c1 int,
	c2 int,
	c3 text,
	CONSTRAINT t4_pkey PRIMARY KEY (c1)
);

INSERT INTO t4
	SELECT id,
	       id + 1,
	       'AAA'
	FROM generate_series(1, 100, 1) as t(id);
DELETE FROM t4 WHERE c1 % 3 != 0;

INSERT INTO t1
	SELECT id,
	       id % 10,
	       id,
	       '1970-01-01'::timestamptz + ((id % 100) || ' days')::interval,
	       '1970-01-01'::timestamp + ((id % 100) || ' days')::interval,
	       id % 10,
	       id % 10,
	       1
	FROM generate_series(1, 1000, 1) as t(id);

INSERT INTO t2
	SELECT id,
	       'AAA'
	FROM generate_series(1, 100, 1) t(id);

SELECT t1.c1, t2.c1 FROM (SELECT c1 FROM t4 WHERE c1 between 50 and 60) t1 FULL JOIN t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1;

Contributor Author

The example is extremely helpful. Thank you very much.

Contributor

@chenzl25 chenzl25 left a comment

LGTM! Thank you for your effort.

5 participants