Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add on conflict clause in create table statement #1860

Merged
merged 21 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
32 changes: 6 additions & 26 deletions docs/sql/commands/sql-create-sink-into.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,16 @@ Use the `CREATE SINK INTO` command to create a sink into RisingWave's table.
## Syntax

```sql
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name [ ( col_name [ , ... ] ) ]
[FROM sink_from | AS select_query]
```

import rr from '@theme/RailroadDiagram'

export const svg = rr.Diagram(
rr.Stack(
rr.Sequence(
rr.Terminal('CREATE SINK'),
rr.Optional(rr.Terminal('IF NOT EXISTS')),
rr.NonTerminal('sink_name', 'skip'),
rr.Terminal('INTO'),
rr.NonTerminal('table_name'),
rr.ZeroOrMore(
rr.Sequence(
rr.Terminal('FROM'),
rr.NonTerminal('sink_from', 'skip')
),
rr.Sequence(
rr.Terminal('AS'),
rr.NonTerminal('select_query', 'skip')
),
),
),
rr.Terminal(';'),
)
);
## Parameters

<drawer SVG={svg} />
| Parameter or clause | Description|
|-----------|-------------|
|`sink_name` |The name of the sink. If a schema name is given (for example, `CREATE SINK <schema>.<sink> ...`), then the sink is created in the specified schema. Otherwise it is created in the current schema.|
|`col_name` |The corresponding table columns in the sink result. For those columns not listed, it will be inserted as the default value defined in the table.|

:::note

Expand Down
24 changes: 24 additions & 0 deletions docs/sql/commands/sql-create-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ CREATE TABLE [ IF NOT EXISTS ] table_name (
[ watermark_clause ]
)
[ APPEND ONLY ]
[ ON CONFLICT conflict_action ]
[INCLUDE { header | key | offset | partition | timestamp } [AS <column_name>]]
[ WITH (
connector='connector_name',
Expand Down Expand Up @@ -77,6 +78,7 @@ FORMAT upsert ENCODE AVRO (
|`generation_expression`| The expression for the generated column. For details about generated columns, see [Generated columns](/sql/query-syntax/query-syntax-generated-columns.md).|
|`watermark_clause`| A clause that defines the watermark for a timestamp column. The syntax is `WATERMARK FOR column_name as expr`. For the watermark clause to be valid, the table must be an append-only table. That is, the `APPEND ONLY` option must be specified. This restriction only applies to a table. For details about watermarks, refer to [Watermarks](/transform/watermarks.md).|
|`APPEND ONLY` | When this option is specified, the table will be created as an append-only table. An append-only table cannot have primary keys. `UPDATE` and `DELETE` statements are not valid for append-only tables. Note that append-only tables is a Beta feature. |
|`ON CONFLICT` | Specify the alternative action when the newly inserted record brings a violation of PRIMARY KEY constraint on the table. See [PK conflict behavior](#pk-conflict-behavior) below for more information. |
|**INCLUDE** clause | Extract fields not included in the payload as separate columns. For more details on its usage, see [`INCLUDE` clause](/ingest/include-clause.md). |
|**WITH** clause |Specify the connector settings here if trying to store all the source data. See the [Data ingestion](/ingest/data-ingestion.md) page for the full list of supported source as well as links to specific connector pages detailing the syntax for each source. |
|**FORMAT** and **ENCODE** options |Specify the data format and the encoding format of the source data. To learn about the supported data formats, see [Data formats](sql-create-source.md#supported-formats). |
Expand All @@ -89,6 +91,28 @@ Please distinguish between the parameters set in the FORMAT and ENCODE options a

RisingWave supports generating watermarks when creating an append-only streaming table. Watermarks are like markers or signals that track the progress of event time, allowing you to process events within their corresponding time windows. For more information on the syntax on how to create a watermark, see [Watermarks](/transform/watermarks.md).

## PK conflict behavior

The record with insert operation could introduce duplicate records with the same primary key in the table. In that case, an alternative action specified by the `ON CONFLICT` clause will be adopted. The record can come from Insert DML statement, external connectors of the table, or sinks into the table [`CREATE SINK INTO`](sql-create-sink-into.md).

The action could one of the following. A column not in the primary key can be specified as the version column for `DO UPDATE FULL` and `DO UPDATE IF NOT NULL`. When version column is specified, the insert operation will take effect only when the newly inserted value is greater or equal than the exist data record in the table's specified column.

- `DO NOTHING`: Ignore the newly inserted record.
- `DO UPDATE FULL [WITH VERSION COLUMN(col_name)]`: Replace the existing row in the table. When version column is specified, the existing row will be replaced only when the newly inserted value is greater or equal than the existing data record in the table's specified column.
- `DO UPDATE IF NOT NULL [WITH VERSION COLUMN(col_name)]`: Only replace those fields which is not NULL in the inserted row. If version column is specified but the inserted row's version field is NULL, the version column will not take effect.

:::note Beta Feature
Version column is currently in Beta. Please contact us if you encounter any issues or have feedback.
:::

:::note
The delete and update operation on the table cannot break the primary key constraint on the table, so the option will not take effect for those cases.
:::

:::note
When `DO UPDATE IF NOT NULL` behavior is applied, `DEFAULT` clause is not allowed on the table's columns.
:::

## Examples

The statement below creates a table that has three columns.
Expand Down
107 changes: 107 additions & 0 deletions docs/transform/multiple-table-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
---
id: multiple-table-sink
slug: /multiple-table-sink
title: Maintain wide table with table sinks
---
<head>
<link rel="canonical" href="https://docs.risingwave.com/docs/current/multiple-table-sink/" />
</head>

This guide introduces how to maintain a wide table whose columns come from different sources. Traditional data warehouses or ETL use a join query for this purpose. However, streaming join brings issues such as low efficiency and high memory consumption.

In some cases with limitation, use the [CREATE SINK INTO TABLE](/sql/commands/sql-create-sink-into.md) and [ON CONFLICT clause](/sql/commands/sql-create-table.md#pk-conflict-behavior) can save the resources and achieve high efficiency.

## Merge multiple sinks with the same primary key

```sql
CREATE TABLE d1(v1 int, k int primary key);
CREATE TABLE d2(v2 int, k int primary key);
CREATE TABLE d3(v3 int, k int primary key);
CREATE TABLE wide_d(v1 int, v2 int, v3 int, k int primary key)
ON CONFLICT DO UPDATE IF NOT NULL;

CREATE SINK sink1 INTO wide_d (v1, k) AS
SELECT v1, k FROM d1
with (
type = 'append-only',
force_append_only = 'true',
);
CREATE SINK sink2 INTO wide_d (v2, k) AS
SELECT v2, k FROM d2
with (
type = 'append-only',
force_append_only = 'true',
);
CREATE SINK sink3 INTO wide_d (v3,k) AS
SELECT v3, k FROM d3
with (
type = 'append-only',
force_append_only = 'true',
);
```

:::note
Keep in mind that the `ON CONFLICT` clause does not affect the update or delete events, the sinks should be forced to be append-only. Otherwise, the delete or update events from any sink will delete the regarding row.
:::

## Enrich data with foreign keys in Star/Snowflake schema model

With star schema, the data is constructed with a central fact table surrounded by several related dimension tables. Each dimension table is joined to the fact table through a foreign key relationship. Given that the join key is the primary key of the dimension tables, we can rewrite the query as a series of sink into table.

```sql
CREATE TABLE fact(pk int primary key, k1 int, k2 int, k3 int);
CREATE TABLE d1(pk int primary key, v int);
CREATE TABLE d2(pk int primary key, v int);
CREATE TABLE d3(pk int primary key, v int);

CREATE TABLE wide_fact(pk int primary key, v1 int, v2 int, v3 int)
ON CONFLICT DO UPDATE IF NOT NULL;

/* the main sink is not force-append-only to control if the record exists*/
CREATE SINK fact_sink INTO wide_fact (pk) AS
SELECT pk FROM fact;

CREATE SINK sink1 INTO wide_fact (pk, v1) AS
SELECT fact.pk, d1.v
FROM fact JOIN d1 ON fact.k1 = d1.pk
with (
type = 'append-only',
force_append_only = 'true',
);

CREATE SINK sink2 INTO wide_fact (pk, v2) AS
SELECT fact.pk, d2.v
FROM fact JOIN d2 ON fact.k2 = d2.pk
with (
type = 'append-only',
force_append_only = 'true',
);

CREATE SINK sink3 INTO wide_fact (pk, v3) AS
SELECT fact.pk, d3.v
FROM fact JOIN d3 ON fact.k3 = d3.pk
with (
type = 'append-only',
force_append_only = 'true',
);
```

The example above and the following SQL with left join operation are completely equivalent.

```sql
CREATE MATERIALIZED VIEW wide_fact AS
SELECT fact.pk, d1.v v1, d2.v v2, d3.v v3
FROM fact
LEFT JOIN d1 ON fact.k1 = d1.pk
LEFT JOIN d2 ON fact.k2 = d2.pk
LEFT JOIN d3 ON fact.k3 = d3.pk
```

But maintaining wide table with table sinks can save the resources and achieve high efficiency.

<img
src={require('../images/maintain_wide_table_with_table_sink.drawio.png').default}
alt="The streaming state when maintain wide table with table sinks"
/>

Furthermore, for the large dimension table, we can use [Temporal Join](/sql/query-syntax/query-syntax-join-clause.md) as the partial join to reduce the streaming state and improve performance.
2 changes: 2 additions & 0 deletions docs/troubleshoot/troubleshoot-high-latency.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ For example, the following figure shows a materialized view with extremely high
alt="An example of extremely high latency"
/>

To solve the issue, consider rewriting the SQL query to reduce join amplification, such as using better equal conditions on the problematic join to reduce the number of matched rows. See [Maintain wide table with table sinks](/transform/multiple-table-sink.md) for details.

At the same time, a log of `high_join_amplification` with the problematic join keys will be printed, such as

```
Expand Down
5 changes: 5 additions & 0 deletions sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,11 @@ const sidebars = {
id: "transform/use-dbt",
label: "Use dbt for data transformations",
},
{
type: "doc",
id: "transform/multiple-table-sink",
label: "Maintain wide table with table sinks",
},
{
type: "doc",
id: "transform/subscription",
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
32 changes: 6 additions & 26 deletions versioned_docs/version-1.9/sql/commands/sql-create-sink-into.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,16 @@ Use the `CREATE SINK INTO` command to create a sink into RisingWave's table.
## Syntax

```sql
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name [ ( col_name [ , ... ] ) ]
[FROM sink_from | AS select_query]
```

import rr from '@theme/RailroadDiagram'

export const svg = rr.Diagram(
rr.Stack(
rr.Sequence(
rr.Terminal('CREATE SINK'),
rr.Optional(rr.Terminal('IF NOT EXISTS')),
rr.NonTerminal('sink_name', 'skip'),
rr.Terminal('INTO'),
rr.NonTerminal('table_name'),
rr.ZeroOrMore(
rr.Sequence(
rr.Terminal('FROM'),
rr.NonTerminal('sink_from', 'skip')
),
rr.Sequence(
rr.Terminal('AS'),
rr.NonTerminal('select_query', 'skip')
),
),
),
rr.Terminal(';'),
)
);
## Parameters

<drawer SVG={svg} />
| Parameter or clause | Description|
|-----------|-------------|
|`sink_name` |The name of the sink. If a schema name is given (for example, `CREATE SINK <schema>.<sink> ...`), then the sink is created in the specified schema. Otherwise it is created in the current schema.|
|`col_name` |The corresponding table columns in the sink result. For those columns not listed, it will be inserted as the default value defined in the table.|

:::note

Expand Down
24 changes: 24 additions & 0 deletions versioned_docs/version-1.9/sql/commands/sql-create-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ CREATE TABLE [ IF NOT EXISTS ] table_name (
[ watermark_clause ]
)
[ APPEND ONLY ]
[ ON CONFLICT conflict_action ]
[INCLUDE { header | key | offset | partition | timestamp } [AS <column_name>]]
[ WITH (
connector='connector_name',
Expand Down Expand Up @@ -77,6 +78,7 @@ FORMAT upsert ENCODE AVRO (
|`generation_expression`| The expression for the generated column. For details about generated columns, see [Generated columns](/sql/query-syntax/query-syntax-generated-columns.md).|
|`watermark_clause`| A clause that defines the watermark for a timestamp column. The syntax is `WATERMARK FOR column_name as expr`. For the watermark clause to be valid, the table must be an append-only table. That is, the `APPEND ONLY` option must be specified. This restriction only applies to a table. For details about watermarks, refer to [Watermarks](/transform/watermarks.md).|
|`APPEND ONLY` | When this option is specified, the table will be created as an append-only table. An append-only table cannot have primary keys. `UPDATE` and `DELETE` statements are not valid for append-only tables. Note that append-only tables is a Beta feature. |
|`ON CONFLICT` | Specify the alternative action when the newly inserted record brings a violation of PRIMARY KEY constraint on the table. See [PK conflict behavior](#pk-conflict-behavior) below for more information. |
|**INCLUDE** clause | Extract fields not included in the payload as separate columns. For more details on its usage, see [`INCLUDE` clause](/ingest/include-clause.md). |
|**WITH** clause |Specify the connector settings here if trying to store all the source data. See the [Data ingestion](/ingest/data-ingestion.md) page for the full list of supported source as well as links to specific connector pages detailing the syntax for each source. |
|**FORMAT** and **ENCODE** options |Specify the data format and the encoding format of the source data. To learn about the supported data formats, see [Data formats](sql-create-source.md#supported-formats). |
Expand All @@ -89,6 +91,28 @@ Please distinguish between the parameters set in the FORMAT and ENCODE options a

RisingWave supports generating watermarks when creating an append-only streaming table. Watermarks are like markers or signals that track the progress of event time, allowing you to process events within their corresponding time windows. For more information on the syntax on how to create a watermark, see [Watermarks](/transform/watermarks.md).

## PK conflict behavior

The record with insert operation could introduce duplicate records with the same primary key in the table. In that case, an alternative action specified by the `ON CONFLICT` clause will be adopted. The record can come from Insert DML statement, external connectors of the table, or sinks into the table [`CREATE SINK INTO`](sql-create-sink-into.md).

The action could one of the following. A column not in the primary key can be specified as the version column for `DO UPDATE FULL` and `DO UPDATE IF NOT NULL`. When version column is specified, the insert operation will take effect only when the newly inserted value is greater or equal than the exist data record in the table's specified column.

- `DO NOTHING`: Ignore the newly inserted record.
- `DO UPDATE FULL [WITH VERSION COLUMN(col_name)]`: Replace the existing row in the table. When version column is specified, the existing row will be replaced only when the newly inserted value is greater or equal than the existing data record in the table's specified column.
- `DO UPDATE IF NOT NULL [WITH VERSION COLUMN(col_name)]`: Only replace those fields which is not NULL in the inserted row. If version column is specified but the inserted row's version field is NULL, the version column will not take effect.

:::note Beta Feature
Version column is currently in Beta. Please contact us if you encounter any issues or have feedback.
:::

:::note
The delete and update operation on the table cannot break the primary key constraint on the table, so the option will not take effect for those cases.
:::

:::note
When `DO UPDATE IF NOT NULL` behavior is applied, `DEFAULT` clause is not allowed on the table's columns.
:::

## Examples

The statement below creates a table that has three columns.
Expand Down
Loading
Loading