Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add serial type #45

Merged
merged 3 commits into from Mar 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions rfcs/0045-serial-type.md
@@ -0,0 +1,48 @@
---
feature: serial_type
authors:
- "TennyZhuang"
start_date: "2022/07/27"
---

# Serial type and consistent hash based RowID generator

## Summary

## Motivation

In the current implementation, for each append-only stream, we generate an ID for each row using the snowflake algorithm and use the hidden column as the source stream key.

We have another assumption that each operator should have a known distribution, either hash sharded by some columns or singleton. In the source, the distribution is `HashShard([_row_id])`.

This introduces an unnecessary overhead that we have to insert an exchange after each source, and due to the following reasons, the exchange is very expensive:

1. In most queries, the number of exchanges is less than 5 (significant ETL workload).
2. The source has the most significant traffic in most queries, so the first exchange will be much more expensive than the last exchange after filtering or aggregation.

The hidden `_row_id` field is also generated by ourselves, but doesn't satisfied the consistent-hash rule, so we have to insert an meaningless exchange after it.

To reduce the unnecessary cost, there are two possible solutions:

1. Allow `UnknownShard(some_columns)` in our graph and our relational table to represent that the data is already sharded by its distribution key, but not hash sharded. (Our current `row_id` is already sharded because a centralized meta service uniquely generates it).
2. **The source executors always generate `_row_id` that has already satisfied `HashShard[_row_id]`**.

Both solutions are OK, but the first one may affect more components, e.g. the materialized view is not shared by the consistent hash, so it will be hard in consistent hash-based compaction. Therefore, we'll focus on the second solution in this proposal.

## Design

### Embed `vnode` in `_row_id`

The critical point of the solution is that we should be able to extract the vnode from the row_id, so an easy way is to replace the 10-bit worker-id with vnode-id.

```plain
| timestamp | vnode id | sequence |
|-----------|----------|----------|
| 41 bits | 10 bits | 12 bits |
```

I'm not sure if 10 bits is enough, but there are several solutions, e.g. we can remove the timestamp part and dump the sequence on the last barrier to the state of the source operators.

### The new type: Serial

We may need to refactor the dispatch logic for `_row_id` so that it extracts the vnode for it. The easy way is to add a new type Serial for the hidden `_row_id`, whose corresponding physical type is int64 except that we will rewrite the dispatch logic.