From c5f682bbde33b45e38df58d32955016c8e9a19fe Mon Sep 17 00:00:00 2001 From: Aolin Date: Thu, 23 Nov 2023 13:35:41 +0800 Subject: [PATCH 1/5] This is an automated cherry-pick of #15360 Signed-off-by: ti-chi-bot --- ticdc/ticdc-changefeed-config.md | 6 +- ticdc/ticdc-manage-changefeed.md | 6 +- ticdc/ticdc-open-api-v2.md | 8 +- ticdc/ticdc-sink-to-cloud-storage.md | 2 +- ticdc/ticdc-sink-to-kafka.md | 2 +- ticdc/ticdc-sink-to-mysql.md | 2 +- ticdc/ticdc-sink-to-pulsar.md | 298 +++++++++++++++++++++++++++ 7 files changed, 313 insertions(+), 11 deletions(-) create mode 100644 ticdc/ticdc-sink-to-pulsar.md diff --git a/ticdc/ticdc-changefeed-config.md b/ticdc/ticdc-changefeed-config.md index 9c5d38d05158e..f83665aba40f7 100644 --- a/ticdc/ticdc-changefeed-config.md +++ b/ticdc/ticdc-changefeed-config.md @@ -16,7 +16,7 @@ cdc cli changefeed create --server=http://10.0.10.25:8300 --sink-uri="mysql://ro ```shell Create changefeed successfully! ID: simple-replication-task -Info: {"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2022-12-19T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":true,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v6.5.0"} +Info: 
{"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2023-11-28T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.5.0"} ``` - `--changefeed-id`: The ID of the replication task. The format must match the `^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$` regular expression. If this ID is not specified, TiCDC automatically generates a UUID (the version 4 format) as the ID. @@ -43,9 +43,9 @@ This section introduces the configuration of a replication task. # memory-quota = 1073741824 # Specifies whether the database names and tables in the configuration file are case-sensitive. -# The default value is true. +# Starting from v7.5.0, the default value changes from true to false. # This configuration item affects configurations related to filter and sink. -case-sensitive = true +case-sensitive = false # Specifies whether to output the old value. New in v4.0.5. Since v5.0, the default value is `true`. 
enable-old-value = true diff --git a/ticdc/ticdc-manage-changefeed.md b/ticdc/ticdc-manage-changefeed.md index 6931feb06324b..99dc0aa7538e7 100644 --- a/ticdc/ticdc-manage-changefeed.md +++ b/ticdc/ticdc-manage-changefeed.md @@ -18,7 +18,7 @@ cdc cli changefeed create --server=http://10.0.10.25:8300 --sink-uri="mysql://ro ```shell Create changefeed successfully! ID: simple-replication-task -Info: {"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2022-12-19T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":true,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v6.5.0"} +Info: 
{"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2023-11-28T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.5.0"} ``` ## Query the replication task list @@ -89,8 +89,12 @@ cdc cli changefeed query --server=http://10.0.10.25:8300 --changefeed-id=simple- "sort-engine": "unified", "sort-dir": ".", "config": { +<<<<<<< HEAD "case-sensitive": true, "enable-old-value": false, +======= + "case-sensitive": false, +>>>>>>> 883ffcba7b (Update ticdc changefeed filter case_sensitive default value (#15360)) "filter": { "rules": [ "*.*" diff --git a/ticdc/ticdc-open-api-v2.md b/ticdc/ticdc-open-api-v2.md index 3b6ded5f7d07b..050fe264f0f7e 100644 --- a/ticdc/ticdc-open-api-v2.md +++ b/ticdc/ticdc-open-api-v2.md @@ -147,7 +147,7 @@ This interface is used to submit a replication task to TiCDC. If the request is "changefeed_id": "string", "replica_config": { "bdr_mode": true, - "case_sensitive": true, + "case_sensitive": false, "check_gc_safe_point": true, "consistent": { "flush_interval": 0, @@ -266,7 +266,7 @@ The descriptions of the `replica_config` parameters are as follows. 
| Parameter name | Description | | :------------------------ | :----------------------------------------------------- | | `bdr_mode` | `BOOLEAN` type. Determines whether to enable [bidirectional replication](/ticdc/ticdc-bidirectional-replication.md). The default value is `false`. (Optional) | -| `case_sensitive` | `BOOLEAN` type. Determines whether to be case-sensitive when filtering table names. The default value is `true`. (Optional) | +| `case_sensitive` | `BOOLEAN` type. Determines whether to be case-sensitive when filtering table names. Starting from v7.5.0, the default value changes from `true` to `false`. (Optional) | | `check_gc_safe_point` | `BOOLEAN` type. Determines whether to check that the start time of the replication task is earlier than the GC time. The default value is `true`. (Optional) | | `consistent` | The configuration parameters of redo log. (Optional) | | `enable_old_value` | `BOOLEAN` type. Determines whether to output the old value (that is, the value before the update). The default value is `true`. (Optional) | @@ -384,7 +384,7 @@ If the request is successful, `200 OK` is returned. 
If the request fails, an err "checkpoint_ts": 0, "config": { "bdr_mode": true, - "case_sensitive": true, + "case_sensitive": false, "check_gc_safe_point": true, "consistent": { "flush_interval": 0, @@ -588,7 +588,7 @@ To modify the changefeed configuration, follow the steps of `pause the replicati { "replica_config": { "bdr_mode": true, - "case_sensitive": true, + "case_sensitive": false, "check_gc_safe_point": true, "consistent": { "flush_interval": 0, diff --git a/ticdc/ticdc-sink-to-cloud-storage.md b/ticdc/ticdc-sink-to-cloud-storage.md index 3d5fa0a831d61..6a52e293b5a81 100644 --- a/ticdc/ticdc-sink-to-cloud-storage.md +++ b/ticdc/ticdc-sink-to-cloud-storage.md @@ -24,7 +24,7 @@ cdc cli changefeed create \ The output is as follows: ```shell -Info: {"upstream_id":7171388873935111376,"namespace":"default","id":"simple-replication-task","sink_uri":"s3://logbucket/storage_test?protocol=canal-json","create_time":"2022-11-29T18:52:05.566016967+08:00","start_ts":437706850431664129,"engine":"unified","config":{"case_sensitive":true,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["*.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v6.5.0-master-dirty"} +Info: 
{"upstream_id":7171388873935111376,"namespace":"default","id":"simple-replication-task","sink_uri":"s3://logbucket/storage_test?protocol=canal-json","create_time":"2023-11-28T18:52:05.566016967+08:00","start_ts":437706850431664129,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["*.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.5.0"} ``` - `--server`: The address of any TiCDC server in the TiCDC cluster. diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md index 75f987bf41e41..53ca9e239819e 100644 --- a/ticdc/ticdc-sink-to-kafka.md +++ b/ticdc/ticdc-sink-to-kafka.md @@ -21,7 +21,7 @@ cdc cli changefeed create \ ```shell Create changefeed successfully! 
ID: simple-replication-task -Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} +Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2023-11-28T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} ``` - `--server`: The address of any TiCDC server in the TiCDC cluster. diff --git a/ticdc/ticdc-sink-to-mysql.md b/ticdc/ticdc-sink-to-mysql.md index 19a4e0979c5e1..71dd6dba2ee85 100644 --- a/ticdc/ticdc-sink-to-mysql.md +++ b/ticdc/ticdc-sink-to-mysql.md @@ -21,7 +21,7 @@ cdc cli changefeed create \ ```shell Create changefeed successfully! 
ID: simple-replication-task -Info: {"sink-uri":"mysql://root:123456@127.0.0.1:3306/","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} +Info: {"sink-uri":"mysql://root:123456@127.0.0.1:3306/","opts":{},"create-time":"2023-11-28T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} ``` - `--server`: The address of any TiCDC server in the TiCDC cluster. diff --git a/ticdc/ticdc-sink-to-pulsar.md b/ticdc/ticdc-sink-to-pulsar.md new file mode 100644 index 0000000000000..34f5872502c25 --- /dev/null +++ b/ticdc/ticdc-sink-to-pulsar.md @@ -0,0 +1,298 @@ +--- +title: Replicate Data to Pulsar +summary: Learn how to replicate data to Pulsar using TiCDC. +--- + +# Replicate Data to Pulsar + +This document describes how to create a changefeed that replicates incremental data to Pulsar using TiCDC. + +## Create a replication task to replicate incremental data to Pulsar + +Create a replication task by running the following command: + +```shell +cdc cli changefeed create \ + --server=http://127.0.0.1:8300 \ +--sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" \ +--config=./t_changefeed.toml \ +--changefeed-id="simple-replication-task" +``` + +```shell + +Create changefeed successfully! 
+ID: simple-replication-task +Info: {"upstream_id":7277814241002263370,"namespace":"default","id":"simple-replication-task","sink_uri":"pulsar://127.0.0.1:6650/consumer-test?protocol=canal-json","create_time":"2023-11-28T14:42:32.000904+08:00","start_ts":444203257406423044,"config":{"memory_quota":1073741824,"case_sensitive":false,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"bdr_mode":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["pulsar_test.*"]},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false,"binary_encoding_method":"base64"},"dispatchers":[{"matcher":["pulsar_test.*"],"partition":"","topic":"test_{schema}_{table}"}],"encoder_concurrency":16,"terminator":"\r\n","date_separator":"day","enable_partition_separator":true,"enable_kafka_sink_v2":false,"only_output_updated_columns":false,"delete_only_output_handle_key_columns":false,"pulsar_config":{"connection-timeout":30,"operation-timeout":30,"batching-max-messages":1000,"batching-max-publish-delay":10,"send-timeout":30},"advance_timeout":150},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"use_file_backend":false},"scheduler":{"enable_table_across_nodes":false,"region_threshold":100000,"write_key_threshold":0},"integrity":{"integrity_check_level":"none","corruption_handle_level":"warn"}},"state":"normal","creator_version":"v7.5.0","resolved_ts":444203257406423044,"checkpoint_ts":444203257406423044,"checkpoint_time":"2023-09-12 14:42:31.410"} +``` + +The meaning of each parameter is as follows: + +- `--server`: the address of a TiCDC server in the TiCDC cluster. +- `--changefeed-id`: the ID of the replication task. The format must match the regular expression `^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`. If the ID is not specified, TiCDC automatically generates a UUID (in the version 4 format) as the ID. 
+- `--sink-uri`: the downstream address of the replication task. See [Use Sink URI to configure Pulsar](#sink-uri). +- `--start-ts`: the start TSO of the changefeed. The TiCDC cluster starts pulling data from this TSO. The default value is the current time. +- `--target-ts`: the target TSO of the changefeed. The TiCDC cluster stops pulling data at this TSO. It is empty by default, which means that TiCDC does not automatically stop pulling data. +- `--config`: the changefeed configuration file. See [TiCDC changefeed configuration parameters](/ticdc/ticdc-changefeed-config.md). + +## Use Sink URI and changefeed config to configure Pulsar + +You can use Sink URI to specify the connection information for the TiCDC target system, and use changefeed config to configure parameters related to Pulsar. + +### Sink URI + +A Sink URI follows the following format: + +```shell +[scheme]://[userinfo@][host]:[port][/path]?[query_parameters] +``` + +Configuration example 1: + +```shell +--sink-uri="pulsar://127.0.0.1:6650/persistent://abc/def/yktest?protocol=canal-json" +``` + +Configuration example 2: + +```shell +--sink-uri="pulsar://127.0.0.1:6650/yktest?protocol=canal-json" +``` + +The configurable parameters in a URI are as follows: + +| Parameter | Description | +| :------------------ | :------------------------------------------------------------ | +| `127.0.0.1` | The IP address by which the downstream Pulsar provides service. | +| `6650` | The connection port for the downstream Pulsar. | +| `persistent://abc/def/yktest` | As shown in the preceding configuration example 1, this parameter is used to specify the tenant, namespace, and topic of Pulsar. | +| `yktest` | As shown in the preceding configuration example 2, if the topic you want to specify is in the default namespace `default` of the default tenant `public` in Pulsar, you can configure the URI with just the topic name, for example, `yktest`. 
This is equivalent to specifying the topic as `persistent://public/default/yktest`. | + +### Changefeed config parameters + +The following are examples of changefeed config parameters: + +```toml +[sink] +# `dispatchers` is used to specify matching rules. +# Note: When the downstream MQ is Pulsar, if the routing rule for `partition` is not specified as any of `ts`, `index-value`, `table`, or `default`, each Pulsar message will be routed using the string you set as the key. +# For example, if you specify the routing rule for a matcher as the string `code`, then all Pulsar messages that match that matcher will be routed with `code` as the key. +# dispatchers = [ +# {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" }, +# {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" }, +# {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, +# {matcher = ['test6.*'], partition = "default"}, +# {matcher = ['test7.*'], partition = "test123"} +# ] + +# `protocol` is used to specify the protocol format for encoding messages. +# When the downstream is Pulsar, the protocol can only be canal-json. +# protocol = "canal-json" + +# The following parameters only take effect when the downstream is Pulsar. +[sink.pulsar-config] +# Authentication on the Pulsar server is done using a token. Specify the value of the token. +authentication-token = "xxxxxxxxxxxxx" +# When you use a token for Pulsar server authentication, specify the path to the file where the token is located. +token-from-file="/data/pulsar/token-file.txt" +# Pulsar uses the basic account and password to authenticate the identity. Specify the account. +basic-user-name="root" +# Pulsar uses the basic account and password to authenticate the identity. Specify the password. +basic-password="password" +# The certificate path for Pulsar TLS encrypted authentication. 
+auth-tls-certificate-path="/data/pulsar/certificate" +# The private key path for Pulsar TLS encrypted authentication. +auth-tls-private-key-path="/data/pulsar/certificate.key" +# Path to trusted certificate file of the Pulsar TLS encrypted authentication. +tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file" +# Pulsar oauth2 issuer-url. For more information, see the Pulsar website: https://pulsar.apache.org/docs/2.10.x/client-libraries-go/#tls-encryption-and-authentication +oauth2.oauth2-issuer-url="https://xxxx.auth0.com" +# Pulsar oauth2 audience +oauth2.oauth2-audience="https://xxxx.auth0.com/api/v2/" +# Pulsar oauth2 private-key +oauth2.oauth2-private-key="/data/pulsar/privateKey" +# Pulsar oauth2 client-id +oauth2.oauth2-client-id="0Xx...Yyxeny" +# Pulsar oauth2 oauth2-scope +oauth2.oauth2-scope="xxxx" +# The number of cached Pulsar producers in TiCDC. The value is 10240 by default. Each Pulsar producer corresponds to one topic. If the number of topics you need to replicate is larger than the default value, you need to increase the number. +pulsar-producer-cache-size=10240 +# Pulsar data compression method. No compression is used by default. Optional values are "lz4", "zlib", and "zstd". +compression-type="" +# The timeout for the Pulsar client to establish a TCP connection with the server. The value is 5 seconds by default. +connection-timeout=5 +# The timeout for Pulsar clients to initiate operations such as creating and subscribing to a topic. The value is 30 seconds by default. +operation-timeout=30 +# The maximum number of messages in a single batch for a Pulsar producer to send. The value is 1000 by default. +batching-max-messages=1000 +# The interval at which Pulsar producer messages are saved for batching. The value is 10 milliseconds by default. +batching-max-publish-delay=10 +# The timeout for a Pulsar producer to send a message. The value is 30 seconds by default. 
+send-timeout=30 +``` + +### Best practice + +* You need to specify the `protocol` parameter when creating a changefeed. Currently, only the `canal-json` protocol is supported for replicating data to Pulsar. +* The `pulsar-producer-cache-size` parameter indicates the number of producers cached in the Pulsar client. Because each producer in Pulsar can only correspond to one topic, TiCDC adopts the LRU method to cache producers, and the default limit is 10240. If the number of topics you need to replicate is larger than the default value, you need to increase the number. + +### TiCDC authentication and authorization for Pulsar + +The following are sample configurations for each authentication method that you can use with Pulsar: + +- Token + + Sink URI: + + ```shell + --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" + ``` + + Config parameter: + + ```toml + [sink.pulsar-config] + authentication-token = "xxxxxxxxxxxxx" + ``` + +- Token from file + + Sink URI: + + ```shell + --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" + ``` + + Config parameter: + + ```toml + [sink.pulsar-config] + # Pulsar uses tokens for authentication on the Pulsar server. Specify the path to the token file, which will be read from the TiCDC server.
+ token-from-file="/data/pulsar/token-file.txt" + ``` + +- TLS encrypted authentication + + Sink URI: + + ```shell + --sink-uri="pulsar+ssl://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" + ``` + + Config parameters: + + ```toml + [sink.pulsar-config] + # Certificate path of the Pulsar TLS encrypted authentication + auth-tls-certificate-path="/data/pulsar/certificate" + # Private key path of the Pulsar TLS encrypted authentication + auth-tls-private-key-path="/data/pulsar/certificate.key" + # Path to trusted certificate file of the Pulsar TLS encrypted authentication + tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file" + ``` + +- OAuth2 authentication + + Sink URI: + + ```shell + --sink-uri="pulsar+ssl://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" + ``` + + Config parameters: + + ```toml + [sink.pulsar-config] + # Pulsar oauth2 issuer-url. For more information, see the Pulsar website: https://pulsar.apache.org/docs/2.10.x/client-libraries-go/#oauth2-authentication + oauth2.oauth2-issuer-url="https://xxxx.auth0.com" + # Pulsar oauth2 audience + oauth2.oauth2-audience="https://xxxx.auth0.com/api/v2/" + # Pulsar oauth2 private-key + oauth2.oauth2-private-key="/data/pulsar/privateKey" + # Pulsar oauth2 client-id + oauth2.oauth2-client-id="0Xx...Yyxeny" + # Pulsar oauth2 oauth2-scope + oauth2.oauth2-scope="xxxx" + ``` + +## Customize the dispatching rules for topics and partitions in Pulsar Sink + +### Matching rules for Matcher + +Take the `dispatchers` configuration item in the following sample configuration file as an example: + +```toml +[sink] +dispatchers = [ + {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" }, + {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" }, + {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, + {matcher = ['test6.*'], partition = "default"}, + {matcher = ['test7.*'], 
partition = "test123"} +] +``` + +- The tables that match a matcher rule are dispatched according to the policy specified by the corresponding topic expression. For example, the table `test3.aa` is dispatched according to `Topic expression 2`, and the table `test5.aa` is dispatched according to `Topic expression 3`. +- For a table that matches more than one matcher rule, it is dispatched according to the first matching topic expression. For example, the table `test1.aa` is dispatched according to `Topic expression 1`. +- For tables that do not match any matcher, the corresponding data change events are sent to the default topic specified in `--sink-uri`. For example, the table `test10.aa` is sent to the default topic. +- For tables that match the matcher rule but do not have a topic dispatcher specified, the corresponding data changes are sent to the default topic specified in `--sink-uri`. For example, the table `test6.abc` is sent to the default topic. + +### Topic dispatcher + +You can use `topic = "xxx"` to specify a topic dispatcher and use topic expressions to implement flexible topic dispatching policies. It is recommended that the total number of topics be less than 1000. + +The format of a topic expression is `[prefix]{schema}[middle][{table}][suffix]`. The following are the meanings of each part: + +- `prefix`: Optional. Represents the prefix of the topic name. +- `{schema}`: Optional. Represents the database name. +- `middle`: Optional. Represents the separator between a database name and a table name. +- `{table}`: Optional. Represents the table name. +- `suffix`: Optional. Represents the suffix of the topic name. + +`prefix`, `middle`, and `suffix` only support uppercase and lowercase letters (`a-z`, `A-Z`), numbers (`0-9`), dots (`.`), underscores (`_`), and hyphens (`-`). `{schema}` and `{table}` must be lowercase. Placeholders such as `{Schema}` and `{TABLE}` that contain uppercase letters are invalid.
+ +The following are some examples: + +- `matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"` + - Data change events corresponding to the table `test1.table1` are dispatched to a topic named `hello_test1_table1`. + - Data change events corresponding to the table `test2.table2` are dispatched to a topic named `hello_test2_table2`. + +- `matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"` + - Data change events for all tables under `test3` are dispatched to a topic named `hello_test3_world`. + - Data change events for all tables under `test4` are dispatched to a topic named `hello_test4_world`. + +- `matcher = ['*.*'], topic = "{schema}_{table}"` + - All tables that TiCDC listens on are dispatched to separate topics according to the `databaseName_tableName` rule. For example, for the table `test.account`, TiCDC dispatches its data change log to a topic named `test_account`. + +### Dispatch DDL events + +#### Database-level DDL events + +DDL statements such as `CREATE DATABASE` and `DROP DATABASE` that are not related to a specific table are called database-level DDL statements. Events corresponding to database-level DDL statements are dispatched to the default topic specified in `--sink-uri`. + +#### Table-level DDL events + +DDL statements such as `ALTER TABLE` and `CREATE TABLE` that are related to a specific table are called table-level DDL statements. Events corresponding to table-level DDL statements are dispatched to an appropriate topic according to the configuration of `dispatchers`. + +For example, for a `dispatchers` configuration like `matcher = ['test.*'], topic = "{schema}_{table}"`, the DDL events are dispatched as follows: + +- If a DDL event only involves a single table, the DDL event is dispatched to the appropriate topic as it is. For example, for the DDL event `DROP TABLE test.table1`, the event is dispatched to the topic named `test_table1`.
+ +- If a DDL event involves more than one table (`RENAME TABLE`, `DROP TABLE`, and `DROP VIEW` might all involve more than one table), the single DDL event is split into multiple ones and dispatched to appropriate topics. For example, for the DDL event `RENAME TABLE test.table1 TO test.table10, test.table2 TO test.table20`, the processing is as follows: + + - Dispatch the DDL event for `RENAME TABLE test.table1 TO test.table10` to a topic named `test_table1`. + - Dispatch the DDL event for `RENAME TABLE test.table2 TO test.table20` to a topic named `test_table2`. + +### Partition dispatcher + +Currently, TiCDC only supports consuming messages using the exclusive subscription model, that is, each consumer can consume messages from all partitions in a topic. + +You can specify a partition dispatcher with `partition = "xxx"`. The following partition dispatchers are supported: `default`, `ts`, `index-value`, and `table`. If you fill in any other string, TiCDC passes that string as the `key` of each message sent to the Pulsar server. + +The dispatching rules are as follows: + +- `default`: By default, events are dispatched by the schema name and table name, which is the same as when `table` is specified. +- `ts`: Use commitTs of row changes to perform hash calculation and dispatch events. +- `index-value`: Use the value of the table primary key or unique index to perform hash calculation and dispatch events. +- `table`: Use the schema name and table name to perform hash calculation and dispatch events. +- Other self-defined string: The self-defined string is used directly as the key for the Pulsar message, and the Pulsar producer uses this key value for dispatching.
From 19e10ce8e6e1212d367c8efa46a1f9a5110303a9 Mon Sep 17 00:00:00 2001 From: Aolin Date: Mon, 18 Dec 2023 12:23:23 +0800 Subject: [PATCH 2/5] bump version --- ticdc/ticdc-changefeed-config.md | 4 ++-- ticdc/ticdc-manage-changefeed.md | 2 +- ticdc/ticdc-open-api-v2.md | 2 +- ticdc/ticdc-sink-to-cloud-storage.md | 2 +- ticdc/ticdc-sink-to-kafka.md | 2 +- ticdc/ticdc-sink-to-mysql.md | 2 +- ticdc/ticdc-sink-to-pulsar.md | 2 +- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ticdc/ticdc-changefeed-config.md b/ticdc/ticdc-changefeed-config.md index f83665aba40f7..ff677c6d69586 100644 --- a/ticdc/ticdc-changefeed-config.md +++ b/ticdc/ticdc-changefeed-config.md @@ -16,7 +16,7 @@ cdc cli changefeed create --server=http://10.0.10.25:8300 --sink-uri="mysql://ro ```shell Create changefeed successfully! ID: simple-replication-task -Info: {"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2023-11-28T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.5.0"} +Info: 
{"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2023-12-21T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.1.3"} ``` - `--changefeed-id`: The ID of the replication task. The format must match the `^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$` regular expression. If this ID is not specified, TiCDC automatically generates a UUID (the version 4 format) as the ID. @@ -43,7 +43,7 @@ This section introduces the configuration of a replication task. # memory-quota = 1073741824 # Specifies whether the database names and tables in the configuration file are case-sensitive. -# Starting from v7.5.0, the default value changes from true to false. +# Starting from v7.1.3, the default value changes from true to false. # This configuration item affects configurations related to filter and sink. 
case-sensitive = false diff --git a/ticdc/ticdc-manage-changefeed.md b/ticdc/ticdc-manage-changefeed.md index 99dc0aa7538e7..2466c8a706541 100644 --- a/ticdc/ticdc-manage-changefeed.md +++ b/ticdc/ticdc-manage-changefeed.md @@ -18,7 +18,7 @@ cdc cli changefeed create --server=http://10.0.10.25:8300 --sink-uri="mysql://ro ```shell Create changefeed successfully! ID: simple-replication-task -Info: {"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2023-11-28T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.5.0"} +Info: 
{"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2023-12-21T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.1.3"} ``` ## Query the replication task list diff --git a/ticdc/ticdc-open-api-v2.md b/ticdc/ticdc-open-api-v2.md index 050fe264f0f7e..16c137e22779d 100644 --- a/ticdc/ticdc-open-api-v2.md +++ b/ticdc/ticdc-open-api-v2.md @@ -266,7 +266,7 @@ The descriptions of the `replica_config` parameters are as follows. | Parameter name | Description | | :------------------------ | :----------------------------------------------------- | | `bdr_mode` | `BOOLEAN` type. Determines whether to enable [bidirectional replication](/ticdc/ticdc-bidirectional-replication.md). The default value is `false`. (Optional) | -| `case_sensitive` | `BOOLEAN` type. Determines whether to be case-sensitive when filtering table names. Starting from v7.5.0, the default value changes from `true` to `false`. (Optional) | +| `case_sensitive` | `BOOLEAN` type. Determines whether to be case-sensitive when filtering table names. Starting from v7.1.3, the default value changes from `true` to `false`. 
(Optional) | | `check_gc_safe_point` | `BOOLEAN` type. Determines whether to check that the start time of the replication task is earlier than the GC time. The default value is `true`. (Optional) | | `consistent` | The configuration parameters of redo log. (Optional) | | `enable_old_value` | `BOOLEAN` type. Determines whether to output the old value (that is, the value before the update). The default value is `true`. (Optional) | diff --git a/ticdc/ticdc-sink-to-cloud-storage.md b/ticdc/ticdc-sink-to-cloud-storage.md index 6a52e293b5a81..ef2415afc4574 100644 --- a/ticdc/ticdc-sink-to-cloud-storage.md +++ b/ticdc/ticdc-sink-to-cloud-storage.md @@ -24,7 +24,7 @@ cdc cli changefeed create \ The output is as follows: ```shell -Info: {"upstream_id":7171388873935111376,"namespace":"default","id":"simple-replication-task","sink_uri":"s3://logbucket/storage_test?protocol=canal-json","create_time":"2023-11-28T18:52:05.566016967+08:00","start_ts":437706850431664129,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["*.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.5.0"} +Info: 
{"upstream_id":7171388873935111376,"namespace":"default","id":"simple-replication-task","sink_uri":"s3://logbucket/storage_test?protocol=canal-json","create_time":"2023-12-21T18:52:05.566016967+08:00","start_ts":437706850431664129,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["*.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.1.3"} ``` - `--server`: The address of any TiCDC server in the TiCDC cluster. diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md index 53ca9e239819e..b1e07c40fad2a 100644 --- a/ticdc/ticdc-sink-to-kafka.md +++ b/ticdc/ticdc-sink-to-kafka.md @@ -21,7 +21,7 @@ cdc cli changefeed create \ ```shell Create changefeed successfully! 
ID: simple-replication-task -Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2023-11-28T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} +Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2023-12-21T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} ``` - `--server`: The address of any TiCDC server in the TiCDC cluster. diff --git a/ticdc/ticdc-sink-to-mysql.md b/ticdc/ticdc-sink-to-mysql.md index 71dd6dba2ee85..79b0da44e696d 100644 --- a/ticdc/ticdc-sink-to-mysql.md +++ b/ticdc/ticdc-sink-to-mysql.md @@ -21,7 +21,7 @@ cdc cli changefeed create \ ```shell Create changefeed successfully! 
ID: simple-replication-task -Info: {"sink-uri":"mysql://root:123456@127.0.0.1:3306/","opts":{},"create-time":"2023-11-28T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} +Info: {"sink-uri":"mysql://root:123456@127.0.0.1:3306/","opts":{},"create-time":"2023-12-21T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} ``` - `--server`: The address of any TiCDC server in the TiCDC cluster. diff --git a/ticdc/ticdc-sink-to-pulsar.md b/ticdc/ticdc-sink-to-pulsar.md index 34f5872502c25..f9b823bebfec3 100644 --- a/ticdc/ticdc-sink-to-pulsar.md +++ b/ticdc/ticdc-sink-to-pulsar.md @@ -23,7 +23,7 @@ cdc cli changefeed create \ Create changefeed successfully! 
ID: simple-replication-task -Info: {"upstream_id":7277814241002263370,"namespace":"default","id":"simple-replication-task","sink_uri":"pulsar://127.0.0.1:6650/consumer-test?protocol=canal-json","create_time":"2023-11-28T14:42:32.000904+08:00","start_ts":444203257406423044,"config":{"memory_quota":1073741824,"case_sensitive":false,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"bdr_mode":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["pulsar_test.*"]},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false,"binary_encoding_method":"base64"},"dispatchers":[{"matcher":["pulsar_test.*"],"partition":"","topic":"test_{schema}_{table}"}],"encoder_concurrency":16,"terminator":"\r\n","date_separator":"day","enable_partition_separator":true,"enable_kafka_sink_v2":false,"only_output_updated_columns":false,"delete_only_output_handle_key_columns":false,"pulsar_config":{"connection-timeout":30,"operation-timeout":30,"batching-max-messages":1000,"batching-max-publish-delay":10,"send-timeout":30},"advance_timeout":150},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"use_file_backend":false},"scheduler":{"enable_table_across_nodes":false,"region_threshold":100000,"write_key_threshold":0},"integrity":{"integrity_check_level":"none","corruption_handle_level":"warn"}},"state":"normal","creator_version":"v7.5.0","resolved_ts":444203257406423044,"checkpoint_ts":444203257406423044,"checkpoint_time":"2023-09-12 14:42:31.410"} +Info: 
{"upstream_id":7277814241002263370,"namespace":"default","id":"simple-replication-task","sink_uri":"pulsar://127.0.0.1:6650/consumer-test?protocol=canal-json","create_time":"2023-12-21T14:42:32.000904+08:00","start_ts":444203257406423044,"config":{"memory_quota":1073741824,"case_sensitive":false,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"bdr_mode":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["pulsar_test.*"]},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false,"binary_encoding_method":"base64"},"dispatchers":[{"matcher":["pulsar_test.*"],"partition":"","topic":"test_{schema}_{table}"}],"encoder_concurrency":16,"terminator":"\r\n","date_separator":"day","enable_partition_separator":true,"enable_kafka_sink_v2":false,"only_output_updated_columns":false,"delete_only_output_handle_key_columns":false,"pulsar_config":{"connection-timeout":30,"operation-timeout":30,"batching-max-messages":1000,"batching-max-publish-delay":10,"send-timeout":30},"advance_timeout":150},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"use_file_backend":false},"scheduler":{"enable_table_across_nodes":false,"region_threshold":100000,"write_key_threshold":0},"integrity":{"integrity_check_level":"none","corruption_handle_level":"warn"}},"state":"normal","creator_version":"v7.1.3","resolved_ts":444203257406423044,"checkpoint_ts":444203257406423044,"checkpoint_time":"2023-09-12 14:42:31.410"} ``` The meaning of each parameter is as follows: From b8c4acc8188375c545de21573a8070f4d62bd45c Mon Sep 17 00:00:00 2001 From: Aolin Date: Mon, 18 Dec 2023 12:24:27 +0800 Subject: [PATCH 3/5] Discard changes to ticdc/ticdc-sink-to-pulsar.md --- ticdc/ticdc-sink-to-pulsar.md | 298 ---------------------------------- 1 file changed, 298 deletions(-) delete mode 100644 
ticdc/ticdc-sink-to-pulsar.md diff --git a/ticdc/ticdc-sink-to-pulsar.md b/ticdc/ticdc-sink-to-pulsar.md deleted file mode 100644 index f9b823bebfec3..0000000000000 --- a/ticdc/ticdc-sink-to-pulsar.md +++ /dev/null @@ -1,298 +0,0 @@ ---- -title: Replicate Data to Pulsar -summary: Learn how to replicate data to Pulsar using TiCDC. ---- - -# Replicate Data to Pulsar - -This document describes how to create a changefeed that replicates incremental data to Pulsar using TiCDC. - -## Create a replication task to replicate incremental data to Pulsar - -Create a replication task by running the following command: - -```shell -cdc cli changefeed create \ - --server=http://127.0.0.1:8300 \ ---sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" \ ---config=./t_changefeed.toml \ ---changefeed-id="simple-replication-task" -``` - -```shell - -Create changefeed successfully! -ID: simple-replication-task -Info: {"upstream_id":7277814241002263370,"namespace":"default","id":"simple-replication-task","sink_uri":"pulsar://127.0.0.1:6650/consumer-test?protocol=canal-json","create_time":"2023-12-21T14:42:32.000904+08:00","start_ts":444203257406423044,"config":{"memory_quota":1073741824,"case_sensitive":false,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"bdr_mode":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["pulsar_test.*"]},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false,"binary_encoding_method":"base64"},"dispatchers":[{"matcher":["pulsar_test.*"],"partition":"","topic":"test_{schema}_{table}"}],"encoder_concurrency":16,"terminator":"\r\n","date_separator":"day","enable_partition_separator":true,"enable_kafka_sink_v2":false,"only_output_updated_columns":false,"delete_only_output_handle_key_columns":false,"pulsar_config":{"connection-timeout":30,"ope
ration-timeout":30,"batching-max-messages":1000,"batching-max-publish-delay":10,"send-timeout":30},"advance_timeout":150},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"use_file_backend":false},"scheduler":{"enable_table_across_nodes":false,"region_threshold":100000,"write_key_threshold":0},"integrity":{"integrity_check_level":"none","corruption_handle_level":"warn"}},"state":"normal","creator_version":"v7.1.3","resolved_ts":444203257406423044,"checkpoint_ts":444203257406423044,"checkpoint_time":"2023-09-12 14:42:31.410"} -``` - -The meaning of each parameter is as follows: - -- `--server`: the address of a TiCDC server in the TiCDC cluster. -- `--changefeed-id`: the ID of the replication task. The format must match the regular expression `^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`. If the ID is not specified, TiCDC automatically generates a UUID (in the version 4 format) as the ID. -- `--sink-uri`: the downstream address of the replication task. See [Use Sink URI to configure Pulsar](#sink-uri). -- `--start-ts`: the start TSO of the changefeed. The TiCDC cluster starts pulling data from this TSO. The default value is the current time. -- `--target-ts`: the target TSO of the changefeed. The TiCDC cluster stops pulling data at this TSO. It is empty by default, which means that TiCDC does not automatically stop pulling data. -- `--config`: the changefeed configuration file. See [TiCDC changefeed configuration parameters](/ticdc/ticdc-changefeed-config.md). - -## Use Sink URI and changefeed config to configure Pulsar - -You can use Sink URI to specify the connection information for the TiCDC target system, and use changefeed config to configure parameters related to Pulsar. 
- -### Sink URI - -A Sink URI follows the following format: - -```shell -[scheme]://[userinfo@][host]:[port][/path]?[query_parameters] -``` - -Configuration example 1: - -```shell ---sink-uri="pulsar://127.0.0.1:6650/persistent://abc/def/yktest?protocol=canal-json" -``` - -Configuration example 2: - -```shell ---sink-uri="pulsar://127.0.0.1:6650/yktest?protocol=canal-json" -``` - -The configurable parameters in a URI are as follows: - -| Parameter | Description | -| :------------------ | :------------------------------------------------------------ | -| `127.0.0.1` | The IP address by which the downstream Pulsar provides service. | -| `6650` | The connection port for the downstream Pulsar. | -| `persistent://abc/def/yktest` | As shown in the preceding configuration example 1, this parameter is used to specify the tenant, namespace, and topic of Pulsar. | -| `yktest` | As shown in the preceding configuration example 2, if the topic you want to specify is in the default namespace `default` of the default tenant `public` in Pulsar, you can configure the URI with just the topic name, for example, `yktest`. This is equivalent to specifying the topic as `persistent://public/default/yktest`. | - -### Changefeed config parameters - -The following are examples of changefeed config parameters: - -```toml -[sink] -# `dispatchers` is used to specify matching rules. -# Note: When the downstream MQ is Pulsar, if the routing rule for `partition` is not specified as any of `ts`, `index-value`, `table`, or `default`, each Pulsar message will be routed using the string you set as the key. -# For example, if you specify the routing rule for a matcher as the string `code`, then all Pulsar messages that match that matcher will be routed with `code` as the key. 
-# dispatchers = [ -# {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" }, -# {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" }, -# {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, -# {matcher = ['test6.*'], partition = "default"}, -# {matcher = ['test7.*'], partition = "test123"} -# ] - -# `protocol` is used to specify the protocol format for encoding messages. -# When the downstream is Pulsar, the protocol can only be canal-json. -# protocol = "canal-json" - -# The following parameters only take effect when the downstream is Pulsar. -[sink.pulsar-config] -# Authentication on the Pulsar server is done using a token. Specify the value of the token. -authentication-token = "xxxxxxxxxxxxx" -# When you use a token for Pulsar server authentication, specify the path to the file where the token is located. -token-from-file="/data/pulsar/token-file.txt" -# Pulsar uses the basic account and password to authenticate the identity. Specify the account. -basic-user-name="root" -# Pulsar uses the basic account and password to authenticate the identity. Specify the password. -basic-password="password" -# The certificate path for Pulsar TLS encrypted authentication. -auth-tls-certificate-path="/data/pulsar/certificate" -# The private key path for Pulsar TLS encrypted authentication. -auth-tls-private-key-path="/data/pulsar/certificate.key" -# Path to trusted certificate file of the Pulsar TLS encrypted authentication. -tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file" -# Pulsar oauth2 issuer-url. 
For more information, see the Pulsar website: https://pulsar.apache.org/docs/2.10.x/client-libraries-go/#tls-encryption-and-authentication -oauth2.oauth2-issuer-url="https://xxxx.auth0.com" -# Pulsar oauth2 audience -oauth2.oauth2-audience="https://xxxx.auth0.com/api/v2/" -# Pulsar oauth2 private-key -oauth2.oauth2-private-key="/data/pulsar/privateKey" -# Pulsar oauth2 client-id -oauth2.oauth2-client-id="0Xx...Yyxeny" -# Pulsar oauth2 oauth2-scope -oauth2.oauth2-scope="xxxx" -# The number of cached Pulsar producers in TiCDC. The value is 10240 by default. Each Pulsar producer corresponds to one topic. If the number of topics you need to replicate is larger than the default value, you need to increase the number. -pulsar-producer-cache-size=10240 -# Pulsar data compression method. No compression is used by default. Optional values are "lz4", "zlib", and "zstd". -compression-type="" -# The timeout for the Pulsar client to establish a TCP connection with the server. The value is 5 seconds by default. -connection-timeout=5 -# The timeout for Pulsar clients to initiate operations such as creating and subscribing to a topic. The value is 30 seconds by default. -operation-timeout=30 -# The maximum number of messages in a single batch for a Pulsar producer to send. The value is 1000 by default. -batching-max-messages=1000 -# The interval at which Pulsar producer messages are saved for batching. The value is 10 milliseconds by default. -batching-max-publish-delay=10 -# The timeout for a Pulsar producer to send a message. The value is 30 seconds by default. -send-timeout=30 -``` - -### Best practice - -* You need to specify the `protocol` parameter when creating a changefeed. Currently, only the `canal-json` protocol is supported for replicating data to Pulsar. -* The `pulsar-producer-cache-size` parameter indicates the number of producers cached in the Pulsar client. 
Because each producer in Pulsar can only correspond to one topic, TiCDC adopts the LRU method to cache producers, and the default limit is 10240. If the number of topics you need to replicate is larger than the default value, you need to increase the number. - -### TiCDC authentication and authorization for Pulsar - -The following is a sample configuration when you use token authentication with Pulsar: - -- Token - - Sink URI: - - ```shell - --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" - ``` - - Config parameter: - - ```shell - [sink.pulsar-config] - authentication-token = "xxxxxxxxxxxxx" - ``` - -- Token from file - - Sink URI: - - ```shell - --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" - ``` - - Config parameter: - - ```toml - [sink.pulsar-config] - # Pulsar uses tokens for authentication on the Pulsar server. Specify the path to the token file, which will be read from the TiCDC server. - token-from-file="/data/pulsar/token-file.txt" - ``` - -- TLS encrypted authentication - - Sink URI: - - ```shell - --sink-uri="pulsar+ssl://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" - ``` - - Config parameters: - - ```toml - [sink.pulsar-config] - # Certificate path of the Pulsar TLS encrypted authentication - auth-tls-certificate-path="/data/pulsar/certificate" - # Private key path of the Pulsar TLS encrypted authentication - auth-tls-private-key-path="/data/pulsar/certificate.key" - # Path to trusted certificate file of the Pulsar TLS encrypted authentication - tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file" - ``` - -- OAuth2 authentication - - Sink URI: - - ```shell - --sink-uri="pulsar+ssl://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" - ``` - - Config parameters: - - ```toml - [sink.pulsar-config] - # Pulsar oauth2 issuer-url. 
For more information, see the Pulsar website: https://pulsar.apache.org/docs/2.10.x/client-libraries-go/#oauth2-authentication - oauth2.oauth2-issuer-url="https://xxxx.auth0.com" - # Pulsar oauth2 audience - oauth2.oauth2-audience="https://xxxx.auth0.com/api/v2/" - # Pulsar oauth2 private-key - oauth2.oauth2-private-key="/data/pulsar/privateKey" - # Pulsar oauth2 client-id - oauth2.oauth2-client-id="0Xx...Yyxeny" - # Pulsar oauth2 oauth2-scope - oauth2.oauth2-scope="xxxx" - ``` - -## Customize the dispatching rules for topics and partitions in Pulsar Sink - -### Matching rules for Matcher - -Take the `dispatchers` configuration item in the following sample configuration file as an example: - -```toml -[sink] -dispatchers = [ - {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" }, - {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" }, - {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, - {matcher = ['test6.*'], partition = "default"}, - {matcher = ['test7.*'], partition = "test123"} -] -``` - -- The tables that match a matcher rule are dispatched according to the policy specified by the corresponding topic expression. For example, the table `test3.aa` is dispatched according to `Topic expression 2`, and the table `test5.aa` is dispatched according to `Topic expression 3`. -- For a table that matches more than one matcher rule, it is dispatched according to the first matching topic expression. For example, the table `test1.aa` is dispatched according to `Topic expression 1`. -- For tables that do not match any matcher, the corresponding data change events are sent to the default topic specified in `-sink-uri`. For example, the table `test10.aa` is sent to the default topic. -- For tables that match the matcher rule but do not have a topic dispatcher specified, the corresponding data changes are sent to the default topic specified in `-sink-uri`. 
For example, the table `test6.abc` is sent to the default topic. - -### Topic dispatcher - -You can use `topic = "xxx"` to specify a topic dispatcher and use topic expressions to implement flexible topic dispatching policies. It is recommended that the total number of topics be less than 1000. - -The format of a topic expression is `[prefix]{schema}[middle][{table}][suffix]`. The following are the meanings of each part: - -- `prefix`: Optional. Represents the prefix of the topic name. -- `{schema}`: Optional. Represents the database name. -- `middle`: Optional. Represents the separator between a database name and a table name. -- `{table}`: Optional. Represents the table name. -- `suffix`: Optional. Represents the suffix of the topic name. - -`prefix`, `middle`, and `suffix` only support uppercase and lowercase letters (`a-z`, `A-Z`), numbers (`0-9`), dots (`.`), underscores (`_`), and hyphens (`-`). `{schema}` and `{table}` must be lowercase. Placeholders such as `{Schema}` and `{TABLE}` that contain uppercase letters are invalid. - -The following are some examples: - -- `matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"` - - Data change events corresponding to the table `test1.table1` are despatched to a topic named `hello_test1_table1`. - - Data change events corresponding to the table `test2.table2` are despatched to a topic named `hello_test2_table2`. - -- `matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"` - - Data change events for all tables under `test3` are despatched to a topic named `hello_test3_world`. - - Data change events for all tables under `test4` are despatched to a topic named `hello_test4_world`. - -- `matcher = ['*.*'], topic = "{schema}_{table}"` - - For all tables that TiCDC listens on, they are despatched to separate topics according to the `databaseName_tableName` rule. For example, for the table `test.account`, TiCDC despatches its data change log to a topic named `test_account`. 
- -### Dispatch DDL events - -#### Database-level DDL events - -DDL statements such as `CREATE DATABASE` and `DROP DATABASE` that are not related to a specific table are called database-level DDL statements. Events corresponding to database-level DDL statements are dispatched to the default topic specified in `--sink-uri`. - -#### Table-level DDL events - -DDL statements such as `ALTER TABLE` and `CREATE TABLE` that are related to a specific table are called table-level DDL statements. Events corresponding to table-level DDL statements are dispatched to an appropriate topic according to the configuration of `dispatchers`. - -For example, for a `dispatchers` configuration like `matcher = ['test.*'], topic = {schema}_{table}`, the DDL events are despatched as follows: - -- If a DDL event only involves a single table, the DDL event is dispatched to the appropriate topic as it is. For example, for the DDL event `DROP TABLE test.table1`, the event is dispatched to the topic named `test_table1`. - -- If a DDL event involves more than one table (`RENAME TABLE`, `DROP TABLE`, and `DROP VIEW` might all involve more than one table), the single DDL event is split into multiple ones and dispatched to appropriate topics. For example, for the DDL event `RENAME TABLE test.table1 TO test.table10, test.table2 TO test.table20`, the processing is as follows: - - - Dispatch the DDL event for `RENAME TABLE test.table1 TO test.table10` to a topic named `test_table1`. - - Dispatch the DDL event for `RENAME TABLE test.table2 TO test.table20` to a topic named `test_table2`. - -### Partition dispatcher - -Currently, TiCDC only supports consumers that consume messages using the exclusive subscription model, that is, each consumer can consume messages from all partitions in a topic. - -You can specify a partition dispatcher with `partition = "xxx"`. The following partition dispatchers are supported: `default`, `ts`, `index-value`, and `table`.
If you fill in any other string, TiCDC will pass that string as the `key` of the message in the messages sent to the Pulsar server. - -The dispatching rules are as follows: - -- `default`: By default, events are dispatched by the schema name and table name, which is the same as when `table` is specified. -- `ts`: Use commitTs of row changes to perform hash calculation and dispatch events. -- `index-value`: Use the value of the table primary key or unique index to perform hash calculation and dispatch events. -- `table`: Use the schema name and table name to perform hash calculation and dispatch events. -- Other self-defined string: The self-defined string is used directly as the key for the Pulsar message, and the Pulsar producer uses this key value for dispatching. From 5d2d32d4b1285f0a1a9d94d065c8e2cac270a06c Mon Sep 17 00:00:00 2001 From: Aolin Date: Mon, 18 Dec 2023 12:25:09 +0800 Subject: [PATCH 4/5] resolve conflicts --- ticdc/ticdc-manage-changefeed.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ticdc/ticdc-manage-changefeed.md b/ticdc/ticdc-manage-changefeed.md index 2466c8a706541..74f748d482eda 100644 --- a/ticdc/ticdc-manage-changefeed.md +++ b/ticdc/ticdc-manage-changefeed.md @@ -89,12 +89,8 @@ cdc cli changefeed query --server=http://10.0.10.25:8300 --changefeed-id=simple- "sort-engine": "unified", "sort-dir": ".", "config": { -<<<<<<< HEAD - "case-sensitive": true, - "enable-old-value": false, -======= "case-sensitive": false, ->>>>>>> 883ffcba7b (Update ticdc changefeed filter case_sensitive default value (#15360)) + "enable-old-value": false, "filter": { "rules": [ "*.*" From 3788ac80b597aea1a41d836953a01369266b4eb7 Mon Sep 17 00:00:00 2001 From: Aolin Date: Mon, 18 Dec 2023 12:34:50 +0800 Subject: [PATCH 5/5] Apply suggestions from code review --- ticdc/ticdc-changefeed-config.md | 2 +- ticdc/ticdc-open-api-v2.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ticdc/ticdc-changefeed-config.md 
b/ticdc/ticdc-changefeed-config.md index ff677c6d69586..d54ffb323bffa 100644 --- a/ticdc/ticdc-changefeed-config.md +++ b/ticdc/ticdc-changefeed-config.md @@ -43,7 +43,7 @@ This section introduces the configuration of a replication task. # memory-quota = 1073741824 # Specifies whether the database names and tables in the configuration file are case-sensitive. -# Starting from v7.1.3, the default value changes from true to false. +# Starting from v6.5.6 and v7.1.3, the default value changes from true to false. # This configuration item affects configurations related to filter and sink. case-sensitive = false diff --git a/ticdc/ticdc-open-api-v2.md b/ticdc/ticdc-open-api-v2.md index 16c137e22779d..ef5f449af2937 100644 --- a/ticdc/ticdc-open-api-v2.md +++ b/ticdc/ticdc-open-api-v2.md @@ -266,7 +266,7 @@ The descriptions of the `replica_config` parameters are as follows. | Parameter name | Description | | :------------------------ | :----------------------------------------------------- | | `bdr_mode` | `BOOLEAN` type. Determines whether to enable [bidirectional replication](/ticdc/ticdc-bidirectional-replication.md). The default value is `false`. (Optional) | -| `case_sensitive` | `BOOLEAN` type. Determines whether to be case-sensitive when filtering table names. Starting from v7.1.3, the default value changes from `true` to `false`. (Optional) | +| `case_sensitive` | `BOOLEAN` type. Determines whether to be case-sensitive when filtering table names. Starting from v6.5.6 and v7.1.3, the default value changes from `true` to `false`. (Optional) | | `check_gc_safe_point` | `BOOLEAN` type. Determines whether to check that the start time of the replication task is earlier than the GC time. The default value is `true`. (Optional) | | `consistent` | The configuration parameters of redo log. (Optional) | | `enable_old_value` | `BOOLEAN` type. Determines whether to output the old value (that is, the value before the update). The default value is `true`. (Optional) |