RFC: use include ... as ... to ingest more message parts #79

Merged (4 commits) on Jan 12, 2024

Conversation

tabVersion (Contributor)

No description provided.

* If an `as` name is not specified, a connector-component naming template is applied.
* For connector kafka and component key, the derived message key column name is `_rw_kafka_key`.
* The default type for the message key column is `bytea`. The priority of the type definition is:
`key encode` > infer from `format ... encode ...` > default type
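
For illustration, a minimal sketch of these rules in the RFC's syntax (topic and column names are hypothetical placeholders, and clause ordering may differ in the final implementation):

```sql
-- No `as` name given: the Kafka message key lands in a column named by the
-- template, i.e. `_rw_kafka_key`, typed `bytea` by default.
create table t1 (a int, b varchar)
include key
with (connector = 'kafka', topic = 'demo')
format plain encode json;

-- An explicit `key encode` takes the highest priority, so here the key
-- column is decoded as `jsonb` instead of falling back to `bytea`.
create table t2 (a int, b varchar)
include key as msg_key
with (connector = 'kafka', topic = 'demo')
format plain encode json
key encode json;
```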
Contributor:

infer from `format ... encode ...`

What does it mean?

tabVersion (Contributor, Author):

If there is a schema registry, we can infer the key schema from it.
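
That is, roughly the following sketch (registry URL and topic are placeholders): with a schema registry configured, the key column's type comes from the registered key schema rather than the `bytea` default.

```sql
-- The Avro key schema is fetched from the registry, so `msg_key` gets a
-- struct type matching that schema instead of the `bytea` default.
create table t (a int, b int)
include key as msg_key
with (connector = 'kafka', topic = 'demo')
format upsert encode avro (schema.registry = 'http://registry:8081');
```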

* For all connectors with `format upsert`, RisingWave derives the key column as the primary key to perform upsert semantics.
* An exception: for `format upsert encode json`, RisingWave allows the PK to be part of the message payload instead of
the message key. This behavior is more like `format plain encode json` with a PK constraint.
* **To Be Discussed**: whether we can afford a breaking change for the above exception.
Contributor:

Even more, we need a method to define the PK from part of the whole JSON message key. Not sure if it can be the default behavior when the user defines a primary key constraint.

Contributor:

  • An exception: for format upsert encode json, RisingWave allows the PK to be part of the message payload instead of the message key. This behavior is more like format plain encode json with PK constraint.

IIRC, according to our previous discussion, this will be `format insert` + a normal PK definition, i.e. `primary key (foo, bar)`.

The exception looks very inconsistent and I'd like to get rid of it.

st1page (Contributor), Nov 27, 2023:

In some cases, users want to determine the primary key of the table for use cases such as:

  • better batching/serving performance
  • streaming temporal join

And in those cases, the user can make sure the primary key columns are exactly the message key in Kafka.


If `format upsert` is specific to MQs with delete tombstones (null values), I think the PK must be the MQ message key and cannot be a field in the value.

fuyufjh (Contributor), Nov 27, 2023:

I think either of the following behaviors is acceptable:

  1. All fields in the message key are used as PK, and an empty message body is considered a delete tombstone.
  2. All/some fields in the message key/body are used as PK, and an empty message body is not considered a delete tombstone.

1 corresponds to upsert semantics, while 2 corresponds to plain (aka insert) semantics. Whether to treat an empty message as a delete tombstone is the key difference between upsert and plain.

Example:

/* message key:  {"a":1,"b":2} 
   message body: {"a":1,"b":2,"c":12,"d":34} */

-- case 1 --
create table t ( a int, b int, c int, d int)
format upsert
key encode json include key as pk;
/* NOTE: The output table will contain a `pk jsonb` column */

-- case 2 --
create table t ( a int, b int, c int, d int, primary key (a,b))
format plain 
[key encode json include key as pk] -- optional!

fuyufjh (Contributor) left a comment:

Generally LGTM

Comment on lines +52 to +54
* **Important**: `include key` is required for `format upsert`, and RisingWave will use the key column as the one and
only primary key to perform upsert semantics. It does not allow specifying multiple columns as the primary key,
even if they are part of the key.
Contributor:

So,

  • if I specify `key encode avro` and `include key as x`, `x` will be a struct
  • if I specify `key encode json` and `include key as x`, `x` will be JSONB

Right?
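
(For concreteness, a sketch of that reading, following the clause style used elsewhere in this thread; registry URL and topic are placeholders:)

```sql
-- key encode avro: `x` is inferred as a struct from the Avro key schema.
create table t1 (a int)
include key as x
with (connector = 'kafka', topic = 'demo')
format upsert encode avro (schema.registry = 'http://registry:8081')
key encode avro;

-- key encode json: `x` is ingested as JSONB.
create table t2 (a int)
include key as x
with (connector = 'kafka', topic = 'demo')
format plain encode json
key encode json;
```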


| Allowed Components | Default Type | Note |
|--------------------|--------------|------|
| key | `bytea` | Can be overwritten by `encode` and `key encode`. Refer to `Record::partition_key` |


nit: the partition key in Kinesis is always a Unicode string

tabVersion (Contributor, Author):

Yes, I made it `bytea` here to keep it consistent with other connectors. There are planned refactors relying on the unified type in the source state table.

| Allowed Components | Default Type | Note |
|--------------------|--------------|------|
| key | `bytea` | Can be overwritten by `encode` and `key encode`. Refer to `MessageMetadata::partition_key` |

More components are available [here](https://docs.rs/pulsar/latest/pulsar/message/proto/struct.MessageMetadata.html).


How do we decide which metadata fields to expose? Do we intend to expose similar (not sure if same) concepts from different connectors using the same word? For example

  • kafka key, pulsar partition_key, kinesis partition_key
  • kafka offset, pulsar sequence_id, kinesis sequence_number
    • i64 vs u64 vs string
  • kafka timestamp, pulsar publish_time, kinesis approximate_arrival_timestamp
    • There is also pulsar event_time
  • kafka headers, pulsar properties
    • kafka headers is struct<varchar, bytea>[] but pulsar properties is struct<varchar, varchar>[]
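
For reference, ingesting such Kafka-side fields under this proposal would look roughly like the sketch below (column names are illustrative, and whether the keyword is `header` or `headers` is part of the naming question above):

```sql
-- Kafka: expose the message timestamp and headers alongside the payload.
create table t (a int, b int)
include timestamp as msg_ts      -- timestamptz, from CreateTime
include header as msg_headers    -- struct<varchar, bytea>[]
with (connector = 'kafka', topic = 'demo')
format plain encode json;
```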

Contributor:

Do we intend to expose similar (not sure if same) concepts from different connectors using the same word?

I think it's not necessary to unify them. If there is ambiguity, I tend to let them have different names.

Member:

Update: now we do want to unify partition and offset for them. @tabVersion Can you add the rationale for that in the RFC? 😇

tabVersion (Contributor, Author):

Yes, we are going to unify the partition and offset column types for all connectors.
The basic reason is that we want to implement source-executor-level throttling, which requires that a chunk can be cut anywhere while maintaining the offset info. Besides, the source backfill feature also relies on this behavior to tell when to end the backfill stage.
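
In other words, the same clauses should work uniformly across connectors, along the lines of this sketch (column names are illustrative; the exact unified types are what the follow-up commit settles):

```sql
-- With unified partition/offset columns, the source executor can cut a
-- chunk anywhere and still resume from the recorded offset, and source
-- backfill can use the same columns to decide when the backfill stage ends.
create table t (a int)
include partition as src_partition
include offset as src_offset
with (connector = 'kafka', topic = 'demo')
format plain encode json;
```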

| Allowed Components | Default Type | Note |
|--------------------|--------------|------|
| timestamp | `timestamp with time zone` (i64 in millis) | Refers to `CreateTime` rather than `LogAppendTime` |
| partition | `i64` | The partition the message comes from |
| offset | `i64` | The offset within the partition |
| header | `struct<varchar, bytea>[]` | KV pairs along with the message |


plural: headers

xiangjinwu commented Nov 27, 2023:

Would also like to mention an alternative syntax:

create source/table (
    ..,
    <key-column> bytea FROM SOURCE key,
    <timestamp-column> timestamptz FROM SOURCE timestamp,
    <headers-column> struct<key varchar, value bytea>[] FROM SOURCE headers,
    primary key ( <key-column> )
)
with ( ... )

Pros and cons of this syntax, compared to include ... as ...:

  • All columns are listed in one place, similar to DEFAULT and GENERATED columns.
  • Users are required to spell out the data type.
  • (welcome to add more)

neverchanje:

Regarding the upgrade, please automatically add
include key as '_rw_key'
to old tables so that they won't need recreation.

st1page (Contributor) commented Nov 27, 2023:

Would also like to mention an alternative syntax: […] (quoted from xiangjinwu's comment above)

Currently, we do not allow users to define the schema in column clauses when using a schema registry or a schema file.
Another method is using the "star" grammar: risingwavelabs/risingwave#12209

fuyufjh (Contributor) commented Dec 4, 2023:

FYI: feel free to join the Slack channel #wg-include-key-as

update offset and partition column type to make it consistent with existing source impl
Co-authored-by: xxchan <xxchan22f@gmail.com>