add on conflict clause in create table statement #1860

Merged
merged 21 commits into from
Jun 25, 2024
Changes from 13 commits
32 changes: 6 additions & 26 deletions docs/sql/commands/sql-create-sink-into.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,16 @@ Use the `CREATE SINK INTO` command to create a sink into RisingWave's table.
## Syntax

```sql
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name [ ( col_name [ , ... ] ) ]
[FROM sink_from | AS select_query]
```

import rr from '@theme/RailroadDiagram'

export const svg = rr.Diagram(
rr.Stack(
rr.Sequence(
rr.Terminal('CREATE SINK'),
rr.Optional(rr.Terminal('IF NOT EXISTS')),
rr.NonTerminal('sink_name', 'skip'),
rr.Terminal('INTO'),
rr.NonTerminal('table_name'),
rr.ZeroOrMore(
rr.Sequence(
rr.Terminal('FROM'),
rr.NonTerminal('sink_from', 'skip')
),
rr.Sequence(
rr.Terminal('AS'),
rr.NonTerminal('select_query', 'skip')
),
),
),
rr.Terminal(';'),
)
);
## Parameters

<drawer SVG={svg} />
| Parameter or clause | Description|
|-----------|-------------|
|`sink_name` |The name of the sink. If a schema name is given (for example, `CREATE SINK <schema>.<sink> ...`), then the sink is created in the specified schema. Otherwise it is created in the current schema.|
|`col_name` |The table columns that the sink result is written to. Table columns that are not listed are filled with the default value defined in the table.|
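
As a minimal sketch of the column list, the statements below sink two columns into a table that has a third column with a default value. The table and sink names (`orders`, `order_summary`, `orders_sink`) and the `note` column are hypothetical and only serve the illustration.

```sql
-- Hypothetical upstream table.
CREATE TABLE orders (order_id int PRIMARY KEY, amount int);

-- Hypothetical target table; `note` has a default value.
CREATE TABLE order_summary (
    order_id int PRIMARY KEY,
    amount int,
    note varchar DEFAULT 'n/a'
);

-- Only order_id and amount are listed, so the sink writes those two columns.
-- `note` is not listed and is filled with the default defined in the table.
CREATE SINK orders_sink INTO order_summary (order_id, amount) AS
SELECT order_id, amount FROM orders;
```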

:::note

Expand Down
17 changes: 17 additions & 0 deletions docs/sql/commands/sql-create-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ CREATE TABLE [ IF NOT EXISTS ] table_name (
[ watermark_clause ]
)
[ APPEND ONLY ]
[ ON CONFLICT conflict_action ]
[INCLUDE { header | key | offset | partition | timestamp } [AS <column_name>]]
[ WITH (
connector='connector_name',
Expand Down Expand Up @@ -77,6 +78,7 @@ FORMAT upsert ENCODE AVRO (
|`generation_expression`| The expression for the generated column. For details about generated columns, see [Generated columns](/sql/query-syntax/query-syntax-generated-columns.md).|
|`watermark_clause`| A clause that defines the watermark for a timestamp column. The syntax is `WATERMARK FOR column_name as expr`. For the watermark clause to be valid, the table must be an append-only table. That is, the `APPEND ONLY` option must be specified. This restriction only applies to a table. For details about watermarks, refer to [Watermarks](/transform/watermarks.md).|
|`APPEND ONLY` | When this option is specified, the table will be created as an append-only table. An append-only table cannot have primary keys. `UPDATE` and `DELETE` statements are not valid for append-only tables. Note that append-only tables are a Beta feature. |
|`ON CONFLICT` | Specifies the alternative action to take when a newly inserted record violates the `PRIMARY KEY` constraint on the table. See the "PK Conflict Behavior" section below for more information. |
|**INCLUDE** clause | Extract fields not included in the payload as separate columns. For more details on its usage, see [`INCLUDE` clause](/ingest/include-clause.md). |
|**WITH** clause |Specify the connector settings here if trying to store all the source data. See the [Data ingestion](/ingest/data-ingestion.md) page for the full list of supported sources as well as links to specific connector pages detailing the syntax for each source. |
|**FORMAT** and **ENCODE** options |Specify the data format and the encoding format of the source data. To learn about the supported data formats, see [Data formats](sql-create-source.md#supported-formats). |
Expand All @@ -89,6 +91,21 @@ Please distinguish between the parameters set in the FORMAT and ENCODE options a

RisingWave supports generating watermarks when creating an append-only streaming table. Watermarks are like markers or signals that track the progress of event time, allowing you to process events within their corresponding time windows. For more information on the syntax on how to create a watermark, see [Watermarks](/transform/watermarks.md).

## PK Conflict Behavior
An insert operation can introduce a record whose primary key already exists in the table. In that case, the alternative action specified by the `ON CONFLICT` clause is applied. The record can come from an `INSERT` DML statement, the table's external connector, or a sink into the table ([`CREATE SINK INTO`](sql-create-sink-into.md)).

The action can be one of the options below. For `DO UPDATE FULL` and `DO UPDATE IF NOT NULL`, a column that is not part of the primary key can be specified as the version column. When a version column is specified, the insert takes effect only if the newly inserted value in that column is greater than or equal to the value in the existing row.
- `DO NOTHING`: Ignore the newly inserted record.
- `DO UPDATE FULL [WITH VERSION COLUMN(col_name)]`: Replace the existing row in the table. When a version column is specified, the existing row is replaced only if the newly inserted value in that column is greater than or equal to the value in the existing row.
- `DO UPDATE IF NOT NULL [WITH VERSION COLUMN(col_name)]`: Replace only the fields that are not NULL in the inserted row. If a version column is specified but the inserted row's version field is NULL, the version column does not take effect.
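
The version column rule can be sketched as follows. The table `t_latest` and its columns are hypothetical, and the comments restate what the rule above implies rather than verified output.

```sql
-- Hypothetical table; `ver` is not part of the primary key, so it can act as the version column.
CREATE TABLE t_latest (
    id int PRIMARY KEY,
    payload varchar,
    ver int
) ON CONFLICT DO UPDATE FULL WITH VERSION COLUMN(ver);

-- First record for id = 1: no conflict, so it is inserted as-is.
INSERT INTO t_latest VALUES (1, 'a', 2);

-- Conflict with a lower version (1 < 2): per the rule above, the existing row is kept.
INSERT INTO t_latest VALUES (1, 'b', 1);

-- Conflict with an equal version (2 >= 2): per the rule above, the existing row is replaced.
INSERT INTO t_latest VALUES (1, 'c', 2);
```

With `ON CONFLICT DO NOTHING` instead, both conflicting inserts would be ignored regardless of the `ver` value.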

:::note
Delete and update operations on the table cannot violate the primary key constraint, so the `ON CONFLICT` option does not take effect for them.
:::

:::note
When the `DO UPDATE IF NOT NULL` behavior is applied, the `DEFAULT` clause is not allowed on the table's columns.
:::
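
A minimal sketch of `DO UPDATE IF NOT NULL`, assuming a hypothetical table `t_merge` with no `DEFAULT` clauses on its columns. The comments describe the outcome implied by the option's definition above.

```sql
-- Hypothetical table; no column uses DEFAULT, as required for DO UPDATE IF NOT NULL.
CREATE TABLE t_merge (
    id int PRIMARY KEY,
    a int,
    b int
) ON CONFLICT DO UPDATE IF NOT NULL;

-- First record for id = 1: a is set, b is NULL.
INSERT INTO t_merge VALUES (1, 10, NULL);

-- Conflicting record: a is NULL and b is not, so only b should be overwritten
-- and the existing value of a should be kept.
INSERT INTO t_merge VALUES (1, NULL, 20);
```
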
## Examples

The statement below creates a table that has three columns.
Expand Down
107 changes: 107 additions & 0 deletions docs/transform/multiple-table-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
---
id: multiple-table-sink
slug: /multiple-table-sink
title: Maintain wide table with table sinks
---
<head>
<link rel="canonical" href="https://docs.risingwave.com/docs/current/multiple-table-sink/" />
</head>

This guide describes a way to maintain a wide table whose columns come from different sources. Traditional data warehouses and ETL pipelines use a join query for this, but a streaming join brings issues such as low efficiency and high memory consumption.

In some cases, and with some limitations, combining [CREATE SINK INTO TABLE](/sql/commands/sql-create-sink-into.md) with the [ON CONFLICT clause](/sql/commands/sql-create-table.md#pk-conflict-behavior) can save resources and be more efficient.

## Merge multiple sinks with the same primary key

:::note
Keep in mind that the `ON CONFLICT` clause does not affect update or delete events. The sinks should therefore be forced to be append-only. Otherwise, a delete or update event from any sink will delete the corresponding row.
:::

```sql
CREATE TABLE d1(v1 int, k int primary key);
CREATE TABLE d2(v2 int, k int primary key);
CREATE TABLE d3(v3 int, k int primary key);
CREATE TABLE wide_d(v1 int, v2 int, v3 int, k int primary key)
ON CONFLICT DO UPDATE IF NOT NULL;

CREATE SINK sink1 INTO wide_d (v1, k) AS
SELECT v1, k FROM d1
WITH (
type = 'append-only',
force_append_only = 'true'
);
CREATE SINK sink2 INTO wide_d (v2, k) AS
SELECT v2, k FROM d2
WITH (
type = 'append-only',
force_append_only = 'true'
);
CREATE SINK sink3 INTO wide_d (v3, k) AS
SELECT v3, k FROM d3
WITH (
type = 'append-only',
force_append_only = 'true'
);
```
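
To try the setup above, a short usage sketch follows. The sample values are hypothetical. If the three sinks behave as described, the final query is expected to return a single row for `k = 1` that combines the three values, since each sink leaves the columns it does not list as NULL and `DO UPDATE IF NOT NULL` overwrites only non-NULL fields.

```sql
-- Hypothetical sample rows: each source table contributes one column for k = 1.
INSERT INTO d1 VALUES (11, 1);  -- (v1, k)
INSERT INTO d2 VALUES (22, 1);  -- (v2, k)
INSERT INTO d3 VALUES (33, 1);  -- (v3, k)

-- Make the preceding changes visible before querying.
FLUSH;

-- Inspect the merged row in the wide table.
SELECT v1, v2, v3, k FROM wide_d WHERE k = 1;
```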

## Enrich data with foreign keys in Star/Snowflake Schema Model

In a star schema, the data is organized as a central fact table surrounded by several related dimension tables. Each dimension table is joined to the fact table through a foreign key relationship. Because the join key is the primary key of each dimension table, we can rewrite the query as a series of sinks into a table.

```sql
CREATE TABLE fact(pk int primary key, k1 int, k2 int, k3 int);
CREATE TABLE d1(pk int primary key, v int);
CREATE TABLE d2(pk int primary key, v int);
CREATE TABLE d3(pk int primary key, v int);

CREATE TABLE wide_fact(pk int primary key, v1 int, v2 int, v3 int)
ON CONFLICT DO UPDATE IF NOT NULL;

/* The main sink is not forced to be append-only, so it controls whether the record exists. */
CREATE SINK fact_sink INTO wide_fact (pk) AS
SELECT pk FROM fact;

CREATE SINK sink1 INTO wide_fact (pk, v1) AS
SELECT fact.pk, d1.v
FROM fact JOIN d1 ON fact.k1 = d1.pk
WITH (
type = 'append-only',
force_append_only = 'true'
);

CREATE SINK sink2 INTO wide_fact (pk, v2) AS
SELECT fact.pk, d2.v
FROM fact JOIN d2 ON fact.k2 = d2.pk
WITH (
type = 'append-only',
force_append_only = 'true'
);

CREATE SINK sink3 INTO wide_fact (pk, v3) AS
SELECT fact.pk, d3.v
FROM fact JOIN d3 ON fact.k3 = d3.pk
WITH (
type = 'append-only',
force_append_only = 'true'
);
```

The example above is equivalent to the following SQL query that uses left joins.

```sql
CREATE MATERIALIZED VIEW wide_fact AS
SELECT fact.pk, d1.v v1, d2.v v2, d3.v v3
FROM fact
LEFT JOIN d1 ON fact.k1 = d1.pk
LEFT JOIN d2 ON fact.k2 = d2.pk
LEFT JOIN d3 ON fact.k3 = d3.pk;
```

However, maintaining the wide table with table sinks can save resources and be more efficient.

<img
src={require('../images/maintain_wide_table_with_table_sink.drawio.png').default}
alt="The streaming state when maintaining a wide table with table sinks"
/>

Furthermore, for large dimension tables, a [Temporal Join](/sql/query-syntax/query-syntax-join-clause.md) can be used as a partial join to reduce the streaming state and improve performance.
2 changes: 2 additions & 0 deletions docs/troubleshoot/troubleshoot-high-latency.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ For example, the following figure shows a materialized view with extremely high
alt="An example of extremely high latency"
/>

To solve the issue, consider rewriting the SQL query to reduce join amplification, for example by using more selective equality conditions on the problematic join to reduce the number of matched rows. [Maintain wide table with table sinks](/transform/multiple-table-sink.md) might also help.

At the same time, a log of `high_join_amplification` with the problematic join keys will be printed, such as

```
Expand Down
5 changes: 5 additions & 0 deletions sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ const sidebars = {
id: "transform/use-dbt",
label: "Use dbt for data transformations",
},
{
type: "doc",
id: "transform/multiple-table-sink",
label: "Maintain wide table with table sinks",
},
{
type: "doc",
id: "transform/subscription",
Expand Down