RFC: DAG Structured Query Plan #28
Conversation
- CTE. At the binder/planner stage, we can create a `Share` operator for the table expression and reuse it.
- Subquery unnesting. We can calculate the domain by creating a `Share` operator over the LHS of the apply operator.
- The same source operator appears multiple times in a query. Introducing the `Share` operator either at the binder/planner stage or at the end of the optimizer is fine.
- Common sub-plan detection. We can compute a `digest` field for each sub-plan; if two sub-plans have the same `digest`, they are probably the same sub-plan. The `digest` is just like a hash code. We also need a `deepEqual` method on each plan node to compare it with another.
IIUC we can implement `Hash` and `Eq` for them directly.
It depends. Every plan node has its own plan id by default, so if we put plan nodes into a `HashSet` or another collection, they will be regarded as distinct items. Once we override these methods, the behavior will change.
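To illustrate the trade-off discussed above, here is a minimal sketch (not RisingWave's actual types; `PlanNode`, `plan_id`, and `digest` are hypothetical stand-ins) of overriding equality and hashing to use the structural `digest` instead of the identity-based plan id, so that structurally equal sub-plans collide in a `HashSet`:

```rust
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Hypothetical plan node: `plan_id` is unique per node, while `digest`
// summarizes the sub-plan's structure (like a hash code).
#[derive(Debug, Clone)]
struct PlanNode {
    plan_id: u64,
    digest: u64,
}

// Wrapper that compares by structural digest instead of node identity.
struct ByDigest(PlanNode);

impl PartialEq for ByDigest {
    fn eq(&self, other: &Self) -> bool {
        // A real implementation should fall back to a deepEqual-style
        // comparison here to rule out digest collisions.
        self.0.digest == other.0.digest
    }
}
impl Eq for ByDigest {}

impl Hash for ByDigest {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.digest.hash(state);
    }
}

fn main() {
    let a = PlanNode { plan_id: 1, digest: 42 };
    let b = PlanNode { plan_id: 2, digest: 42 }; // same shape, different id
    let mut set = HashSet::new();
    set.insert(ByDigest(a));
    set.insert(ByDigest(b));
    // The two nodes are deduplicated by digest, so the set holds one entry.
    assert_eq!(set.len(), 1);
}
```

With identity-based equality (the default plan-id behavior), the set would instead hold two entries, which is exactly why overriding changes the behavior.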
### Execution

We only discuss execution for streaming queries here, because a batch query needs to do materialization/buffering, which is complicated, and it is hard to tell whether DAG plan execution is better there. The `Share` operator has multiple downstreams, and we can dispatch its data to each downstream just like `ChainNode`.
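The dispatch described above can be sketched as a broadcast to multiple downstream channels. This is a toy model using standard-library channels, not RisingWave's executor API; `share_dispatch` is a hypothetical name:

```rust
use std::sync::mpsc;

// Share-like operator: broadcast every input item to all downstream
// channels, so each downstream sees the full stream (as ChainNode does).
fn share_dispatch<T: Clone>(input: Vec<T>, downstreams: &[mpsc::Sender<T>]) {
    for item in input {
        for tx in downstreams {
            // Each downstream receives its own copy of the item.
            tx.send(item.clone()).unwrap();
        }
    }
}

fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    // Two downstreams consume the same shared stream.
    share_dispatch(vec![1, 2, 3], &[tx1, tx2]);
    assert_eq!(rx1.iter().collect::<Vec<_>>(), vec![1, 2, 3]);
    assert_eq!(rx2.iter().collect::<Vec<_>>(), vec![1, 2, 3]);
}
```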
FYI, executing a graph with multiple edges is currently not a simple deal in the compute node. 🥵
https://www.notion.so/risingwave-labs/Multiple-Edges-in-the-Stream-Graph-f0ab595523d74823a39edaebdea2be16
Yes, I remember this issue. Based on this RFC, if a node has multiple downstreams, it must be the `Share` node. The fragmenter can split the `Share` node just like it splits the exchange node; in this way, we can separate the upstream and the downstreams into different fragments. The last thing we need to deal with is ensuring all the downstreams of the `Share` node are in different fragments: if multiple edges connect to the same downstream fragment, use a shuffle to separate them.
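The check described above can be sketched as follows. This is a simplified illustration, not the actual fragmenter code; `plan_shuffles` and the fragment-id representation are hypothetical:

```rust
use std::collections::HashSet;

// For a split Share node, every downstream edge should land in a distinct
// fragment. Return the fragment ids whose extra edges need a shuffle
// (exchange) inserted to separate them.
fn plan_shuffles(downstream_fragments: &[u32]) -> Vec<u32> {
    let mut seen = HashSet::new();
    let mut needs_shuffle = Vec::new();
    for &frag in downstream_fragments {
        if !seen.insert(frag) {
            // Second (or later) edge into the same fragment: shuffle it.
            needs_shuffle.push(frag);
        }
    }
    needs_shuffle
}

fn main() {
    // Two edges into fragment 7 -> one of them needs a shuffle.
    assert_eq!(plan_shuffles(&[3, 7, 7, 9]), vec![7]);
}
```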
For a batch query, a DAG means we need to read the shared input multiple times, which inevitably leads to buffering or materialization. The cost of the DAG is determined by how many times it is read; normally the first read costs more than later ones because of the materialization. Since it is hard to tell whether a DAG helps a batch query, I think we can always convert the DAG back to a tree for the batch executor.
For a streaming query, it is not a big deal: we support buffering and dispatching naturally.
Interesting observation. Indeed, for streaming, execution of a DAG plan doesn't introduce any additional cost; no extra buffering is needed.
Completed at risingwavelabs/risingwave#6955
Is there anything that needs to be edited? If not, let's merge it.