From a7c4aa2a3ca19a41279673751705c8b67d3679a6 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 14 Nov 2023 16:56:06 +0800 Subject: [PATCH 1/4] feat: Add shuffle for iceberg sink --- rfcs/0077-iceberg-sink-shuffle.md | 67 +++++++++++++++++++ .../after_shuffle.svg | 21 ++++++ .../before_shuffle.svg | 21 ++++++ 3 files changed, 109 insertions(+) create mode 100644 rfcs/0077-iceberg-sink-shuffle.md create mode 100644 rfcs/images/0077-iceberg-sink-shuffle/after_shuffle.svg create mode 100644 rfcs/images/0077-iceberg-sink-shuffle/before_shuffle.svg diff --git a/rfcs/0077-iceberg-sink-shuffle.md b/rfcs/0077-iceberg-sink-shuffle.md new file mode 100644 index 00000000..923780e4 --- /dev/null +++ b/rfcs/0077-iceberg-sink-shuffle.md @@ -0,0 +1,67 @@ +--- +feature: Improve shuffle in iceberg sink +authors: + - "Renjie Liu" +start_date: "2023/11/8" +--- + +# Shuffle according to iceberg's partition spec + +## Motivation + +Apache iceberg allows users to define partition spec[1] for a table. The partition spec defines how data is partitioned and stored in the table. For example, a table can be partitioned by date and hour. Also, it's required that each data file in apache iceberg can contain only one partition value. In our current iceberg sink implementation, we don't do any shuffle. This means that if the table is partitioned by bucket, we will have a lot of small files in the table. This is not good for performance. We should shuffle the data according to the partition spec. + +## Design + +We will add a shuffle operator before sending data to iceberg sink. For example, let's assume the iceberg table is partitioned by following partition spec: + +```sql +CREATE TABLE prod.db.sample ( + id bigint, + data string, + category string, + ts timestamp) +USING iceberg +PARTITIONED BY (bucket(16, id), years(ts)) +``` + +If we don't have the shuffle operator, the writing process will be like following: + +![image](images/0077-iceberg-sink-shuffle/before_shuffle.svg) + +After we add the shuffle operator, the writing process will be like following: + +![image](images/0077-iceberg-sink-shuffle/after_shuffle.svg) + +This way we can reduce the number of data files in the table, which helps to improve the read performance of iceberg. + +### Implementation + +There are two possible implementations for this feature: + +1. Add a new `IcebergPartitionOperator`, which calculates the partition value of each record and adds it to the record, then asking the dispatcher executor to do hash shuffle according to the partition value. The plan is like following: + +```mermaid +flowchart TD + A[Source] --> B[IcebergPartitionOperator] + B --> C[DispatcherExecutor] + C -->|partition_value| D[IcebergSink] +``` + +2. Extend dispatcher executor to support iceberg partition shuffle. The plan is like following: + +```mermaid +flowchart TD + A[Source] --> B[DispatcherExecutor] + B -->|IcebergDispatcher| C[IcebergSink] +``` + +I prefer approach 1 since it's more extensible and does not change too much current shuffle implementation, e.g. other lakehouse sinks (delta lake) could have similar approach. + +### Caveats + +When iceberg partition spec only contains range partitions(e.g. year, month, day), we don't need to do this shuffle, otherwise all traffic will go to same sink. + +## References + +1. https://iceberg.apache.org/spec/#partitioning \ No newline at end of file diff --git a/rfcs/images/0077-iceberg-sink-shuffle/after_shuffle.svg b/rfcs/images/0077-iceberg-sink-shuffle/after_shuffle.svg new file mode 100644 index 00000000..0d372796 --- /dev/null +++ b/rfcs/images/0077-iceberg-sink-shuffle/after_shuffle.svg @@ -0,0 +1,21 @@ + + +  + + + + + SourceIcebergSink2023, bucket 12023, bucket 152023, bucket 16IcebergShuffleSourceIcebergSinkIcebergShuffleSourceIcebergSinkIcebergShuffle........ \ No newline at end of file diff --git a/rfcs/images/0077-iceberg-sink-shuffle/before_shuffle.svg b/rfcs/images/0077-iceberg-sink-shuffle/before_shuffle.svg new file mode 100644 index 00000000..432d2e69 --- /dev/null +++ b/rfcs/images/0077-iceberg-sink-shuffle/before_shuffle.svg @@ -0,0 +1,21 @@ + + +  + + + + + SourceIcebergSink2023, bucket 12023, bucket 22023, bucket 32023, bucket 16.....SourceIcebergSink2023, bucket 12023, bucket 22023, bucket 32023, bucket 16...........SourceIcebergSink2023, bucket 12023, bucket 22023, bucket 32023, bucket 16..... \ No newline at end of file From 8a594f6bc457123a7557f0443c3a15832d7045be Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Wed, 15 Nov 2023 13:25:55 +0800 Subject: [PATCH 2/4] Some complements --- rfcs/0077-iceberg-sink-shuffle.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rfcs/0077-iceberg-sink-shuffle.md b/rfcs/0077-iceberg-sink-shuffle.md index 923780e4..b12a262e 100644 --- a/rfcs/0077-iceberg-sink-shuffle.md +++ b/rfcs/0077-iceberg-sink-shuffle.md @@ -45,9 +45,11 @@ There are two possible implementations for this feature: flowchart TD A[Source] --> B[IcebergPartitionOperator] B --> C[DispatcherExecutor] - C -->|partition_value| D[IcebergSink] + C -->|"partition_value"| D[IcebergSink] ``` +The `IcebergPartitionExecutor` will be a `StreamExecutor`, which calculates the partition value of each record and adds it to the record. The `DispatcherExecutor` doesn't need to change much, and do hash shuffle according to the partition value. + 2. Extend dispatcher executor to support iceberg partition shuffle. The plan is like following: ```mermaid @@ -56,11 +58,14 @@ flowchart TD B -->|IcebergDispatcher| C[IcebergSink] ``` +In this approach we need to add an extra `IcebergDispatcher` to dispatcher executor. The `IcebergDispatcher` will calculate the partition value of each record and do hash shuffle according to the partition value. + I prefer approach 1 since it's more extensible and does not change too much current shuffle implementation, e.g. other lakehouse sinks (delta lake) could have similar approach. ### Caveats When iceberg partition spec only contains range partitions(e.g. year, month, day), we don't need to do this shuffle, otherwise all traffic will go to same sink. +Also we need to reject upsert queries where partition columns is not a subset of the `stream_pk`. ## References From 17915082a4527e2cdd0536b5c331cec6674885da Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 16 Nov 2023 10:45:29 +0800 Subject: [PATCH 3/4] Add more details to explain cases better --- rfcs/0077-iceberg-sink-shuffle.md | 157 +++++++++++++++++++++++++++--- 1 file changed, 141 insertions(+), 16 deletions(-) diff --git a/rfcs/0077-iceberg-sink-shuffle.md b/rfcs/0077-iceberg-sink-shuffle.md index b12a262e..70d7a2e8 100644 --- a/rfcs/0077-iceberg-sink-shuffle.md +++ b/rfcs/0077-iceberg-sink-shuffle.md @@ -9,41 +9,171 @@ start_date: "2023/11/8" ## Motivation -Apache iceberg allows users to define partition spec[1] for a table. The partition spec defines how data is partitioned and stored in the table. For example, a table can be partitioned by date and hour. Also, it's required that each data file in apache iceberg can contain only one partition value. In our current iceberg sink implementation, we don't do any shuffle. This means that if the table is partitioned by bucket, we will have a lot of small files in the table. This is not good for performance. We should shuffle the data according to the partition spec. +Apache iceberg allows users to define partition spec[1] for a table. The partition spec defines how data is partitioned and stored in the table. For example, a table can be partitioned by date and hour. Also, **it's required that each data file in apache iceberg can contain only one partition value**. This means that if the table is partitioned by bucket, we will have a lot of small files in the table if we don't do any shuffle at all. Let's use following table as an example: + +```sql +CREATE TABLE prod.db.sample ( + id bigint, + data string, + category string, + ts timestamp) +USING iceberg +PARTITIONED BY (bucket(16, id), years(ts)) +``` + +If we don't do any shuffle, the writing process will be like following: + +![image](images/0077-iceberg-sink-shuffle/before_shuffle.svg) + +By shuffling values with same partition value to same sink, we can reduce the number of data files in the table: + +![image](images/0077-iceberg-sink-shuffle/after_shuffle.svg) ## Design -We will add a shuffle operator before sending data to iceberg sink. For example, let's assume the iceberg table is partitioned by following partition spec: +There are several cases to think about when doing shuffle: +```mermaid +flowchart TD + Start((start)) + End((end)) + Start --> append_only{Sink append only?} + + append_only -->|yes| iceberg_range_only_part{Range only partition?} + iceberg_range_only_part -->|yes| no_shuffle(No shuffle) + iceberg_range_only_part -->|no| append_only_shuffle(Shuffle by partition columns) + no_shuffle --> End + append_only_shuffle --> End + + append_only -->|no| upsert_iceberg_range_only_part{Range only partition?} + upsert_iceberg_range_only_part -->|yes| upsert_shuffle_by_stream_part_key(Shuffle by stream key + partition columns) + upsert_shuffle_by_stream_part_key --> End + upsert_iceberg_range_only_part -->|No| upsert_shuffle_by_part_key(Shuffle by partition columns) + upsert_shuffle_by_part_key --> End +``` + +Here we will use several examples to explain different cases. + +### Case 1: Append only table with range only partition + +Following iceberg table definition has only range partition: ```sql +## Iceberg CREATE TABLE prod.db.sample ( id bigint, data string, category string, ts timestamp) USING iceberg -PARTITIONED BY (bucket(16, id), years(ts)) +PARTITIONED BY (years(ts)) ``` -If we don't have the shuffle operator, the writing process will be like following: +And we want to sink following query to iceberg: +```sql +CREATE TABLE t0 (id bigint, data string, category string, ts timestamp) WITH ( + connector = 'kafka', + topic = 'kafka_1_csv_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE CSV (delimiter = ','); + +CREATE SINK s0 AS SELECT * FROM t0 WITH ( + connector = 'iceberg' +); +``` -![image](images/0077-iceberg-sink-shuffle/before_shuffle.svg) +In this case we don't need to do any shuffle, since it's range only partition. Otherwise it all traffic will go to same actor. -After we add the shuffle operator, the writing process will be like following: +### Case 2: Append only table with not range only partition -![image](images/0077-iceberg-sink-shuffle/after_shuffle.svg) +Following iceberg table definition has only range partition: +```sql +## Iceberg +CREATE TABLE prod.db.sample ( + id bigint, + data string, + category string, + ts timestamp) +USING iceberg +PARTITIONED BY (bucket(id, 16), years(ts)) +``` + +And we want to sink following query to iceberg: +```sql +CREATE TABLE t0 (id bigint, data string, category string, ts timestamp) WITH ( + connector = 'kafka', + topic = 'kafka_1_csv_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE CSV (delimiter = ','); + +CREATE SINK s0 AS SELECT * FROM t0 WITH ( + connector = 'iceberg' +); +``` + +In this case we need to shuffle by `(bucket(id), years(ts))` so that rows with same partition value will go to same actor. + +### Case 3: Upsert table with range only partition + +Following iceberg table definition has only range partition: +```sql +## Iceberg +CREATE TABLE prod.db.sample ( + id bigint, + data string, + category string, + ts timestamp) +USING iceberg +PARTITIONED BY (years(ts)) +``` + +And we want to sink following query to iceberg: +```sql +CREATE TABLE t0 (id bigint primary key, data string, category string, ts timestamp); + +CREATE SINK s0 AS SELECT * FROM t0 WITH ( + connector = 'iceberg' +); +``` + +In this case we need to shuffle by `(years(ts), id)` to avoid all traffic go to same actor. -This way we can reduce the number of data files in the table, which helps to improve the read performance of iceberg. + +### Case 4: Upsert table with not range only partition + +Following iceberg table definition has only range partition: +```sql +## Iceberg +CREATE TABLE prod.db.sample ( + id bigint, + data string, + category string, + ts timestamp) +USING iceberg +PARTITIONED BY (bucket(data, 16), years(ts)) +``` + +And we want to sink following query to iceberg: +```sql +CREATE TABLE t0 (id bigint primary key, data string, category string, ts timestamp); + +CREATE SINK s0 AS SELECT * FROM t0 WITH ( + connector = 'iceberg' +); +``` + +In this case we need to shuffle by `(bucket(data), years(ts))` so that rows with same will go to same actor. Notice that in iceberg we don't need to shuffle by `stream_key`, e.g. `id` since it only requires that insert/delete of same row will go to same partition. ### Implementation -There are two possible implementations for this feature: +There are two possible implementations for shuffling by iceberg partition columns: -1. Add a new `IcebergPartitionOperator`, which calculates the partition value of each record and adds it to the record, then asking the dispatcher executor to do hash shuffle according to the partition value. The plan is like following: +1. Add a new `IcebergPartitionExecutor`, which calculates the partition value of each record and adds it to the record, then asking the dispatcher executor to do hash shuffle according to the partition value. The plan is like following: ```mermaid flowchart TD - A[Source] --> B[IcebergPartitionOperator] + A[Source] --> B[IcebergPartitionExecutor] B --> C[DispatcherExecutor] C -->|"partition_value"| D[IcebergSink] ``` @@ -62,11 +192,6 @@ In this approach we need to add an extra `IcebergDispatcher` to dispatcher execu I prefer approach 1 since it's more extensible and does not change too much current shuffle implementation, e.g. other lakehouse sinks (delta lake) could have similar approach. -### Caveats - -When iceberg partition spec only contains range partitions(e.g. year, month, day), we don't need to do this shuffle, otherwise all traffic will go to same sink. -Also we need to reject upsert queries where partition columns is not a subset of the `stream_pk`. - ## References 1. https://iceberg.apache.org/spec/#partitioning \ No newline at end of file From 532d50c7f19ef940582390a17412e633e069449e Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Fri, 17 Nov 2023 16:25:08 +0800 Subject: [PATCH 4/4] Update case 3 according to discussion --- rfcs/0077-iceberg-sink-shuffle.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/rfcs/0077-iceberg-sink-shuffle.md b/rfcs/0077-iceberg-sink-shuffle.md index 70d7a2e8..0c1b8cfa 100644 --- a/rfcs/0077-iceberg-sink-shuffle.md +++ b/rfcs/0077-iceberg-sink-shuffle.md @@ -38,15 +38,13 @@ flowchart TD Start((start)) End((end)) Start --> append_only{Sink append only?} - append_only -->|yes| iceberg_range_only_part{Range only partition?} - iceberg_range_only_part -->|yes| no_shuffle(No shuffle) + iceberg_range_only_part -->|yes| no_shuffle(No shuffle) iceberg_range_only_part -->|no| append_only_shuffle(Shuffle by partition columns) no_shuffle --> End append_only_shuffle --> End - append_only -->|no| upsert_iceberg_range_only_part{Range only partition?} - upsert_iceberg_range_only_part -->|yes| upsert_shuffle_by_stream_part_key(Shuffle by stream key + partition columns) + upsert_iceberg_range_only_part -->|yes| upsert_shuffle_by_stream_part_key(Shuffle by stream key) upsert_shuffle_by_stream_part_key --> End upsert_iceberg_range_only_part -->|No| upsert_shuffle_by_part_key(Shuffle by partition columns) upsert_shuffle_by_part_key --> End @@ -137,7 +135,7 @@ CREATE SINK s0 AS SELECT * FROM t0 WITH ( ); ``` -In this case we need to shuffle by `(years(ts), id)` to avoid all traffic go to same actor. +In this case we need to shuffle by `id` to avoid all traffic go to same actor. Since in most cases upstream is already shuffled by `id`, we can avoid another shuffle. ### Case 4: Upsert table with not range only partition