From ed07e2c3dfda397210cb386ab97ba5eb79971886 Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Mon, 28 Jul 2025 15:35:36 -0700 Subject: [PATCH 01/13] refine --- docs/proton-kafka.md | 564 +++++++++++++++++------------------ docs/working-with-streams.md | 52 +++- 2 files changed, 316 insertions(+), 300 deletions(-) diff --git a/docs/proton-kafka.md b/docs/proton-kafka.md index 7431377e..2e9bc5cb 100644 --- a/docs/proton-kafka.md +++ b/docs/proton-kafka.md @@ -1,10 +1,12 @@ # Kafka External Stream -You can read data from Apache Kafka (as well as Confluent Cloud, or Redpanda) in Timeplus with [External Stream](/external-stream). Combining with [Materialized View](/view#m_view) and [Target Stream](/view#target-stream), you can also write data to Apache Kafka with External Stream. +Timeplus allows users to **read from** and **write to** Apache Kafka (and compatible platforms like **Confluent Cloud** and **Redpanda**) using **Kafka External Streams**. + +By combining external streams with [Materialized Views](/view#m_view) and [Target Streams](/view#target-stream), users can build robust **real-time streaming pipelines**. ## Tutorial with Docker Compose {#tutorial} -Please check the tutorials: +Explore the following hands-on tutorials: - [Query Kafka with SQL](/tutorial-sql-kafka) - [Streaming JOIN](/tutorial-sql-join) @@ -12,14 +14,10 @@ Please check the tutorials: ## CREATE EXTERNAL STREAM -In Timeplus Proton, the external stream supports Kafka API as the only type. - -In Timeplus Enterprise, it also supports [External Stream for Apache Pulsar](/pulsar-external-stream), and [External Stream for other Timeplus deployment](/timeplus-external-stream). - -To create an external stream for Apache Kafka or Kafka-compatible messaging platforms, you can run the following DDL SQL: +Use the following SQL command to create a Kafka external stream: ```sql -CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name +CREATE EXTERNAL STREAM [IF NOT EXISTS] ( ) SETTINGS type='kafka', -- required @@ -41,57 +39,68 @@ SETTINGS properties='..' ``` -:::info +### Settings -For examples to connect to various Kafka API compatitable message platforms, please check [this doc](/tutorial-sql-connect-kafka). +#### type -::: -### DDL Settings +Must be set to `kafka`. Compatible with: -#### type -Need to be `kafka`. This works with Apache Kafka, Redpanda, Confluent Platform or Cloud,and many other Kafka compatible message platforms. +* Apache Kafka +* Confluent Platform or Cloud +* Redpanda +* Other Kafka-compatible systems #### brokers -One or more brokers with ports. + +Comma-separated list of broker addresses (host\:port), e.g.: + +``` +kafka1:9092,kafka2:9092,kafka3:9092 +``` #### topic -One external stream will connect to one topic. + +Kafka topic name to connect to. #### security_protocol + The supported values for `security_protocol` are: - PLAINTEXT: when this option is omitted, this is the default value. - SASL_SSL: when this value is set, username and password should be specified. - - If you need to specify own SSL certification file, add another setting `ssl_ca_cert_file='/ssl/ca.pem'` New in Proton 1.5.5, you can also put the full content of the pem file as a string in the `ssl_ca_pem` setting if you don't want to, or cannot use a file path, such as on Timeplus Enterprise or in Docker/Kubernetes environments. - - Skipping the SSL certification verification can be done via `SETTINGS skip_ssl_cert_check=true`. + - If users need to specify own SSL certification file, add another setting `ssl_ca_cert_file='/ssl/ca.pem'`. 
Users can also put the full content of the pem file as a string in the `ssl_ca_pem` setting.
  - To skip the SSL certification verification: `skip_ssl_cert_check=true`.

#### sasl_mechanism
+
The supported values for `sasl_mechanism` are:

-- PLAIN: when you set security_protocol to SASL_SSL, this is the default value for sasl_mechanism.
+- PLAIN: when setting security_protocol to SASL_SSL, this is the default value for sasl_mechanism.
- SCRAM-SHA-256
- SCRAM-SHA-512
-- AWS_MSK_IAM: this is available since Timeplus Enterprise 2.7.0 and Proton 1.6.12. Set to this value if you are using AWS MSK with IAM authentication and the EC2 instance or Kubernetes pod has the proper IAM role to access the Kafka topic.
+- AWS_MSK_IAM (for AWS MSK IAM role-based access)

-#### username
-Required when `sasl_mechanism` is set to value other than `PLAIN` or `AWS_MSK_IAM`.
+#### username / password

-#### password
-Required when `sasl_mechanism` is set to value other than `PLAIN` or `AWS_MSK_IAM`.
+Required when `sasl_mechanism` is used.

-Since [Timeplus Enterprise v2.7](/enterprise-v2.7), you can also use the [config_file](#config_file) setting to specify the username and password in a separate file.
+Alternatively, use [`config_file`](#config_file) to securely pass credentials.

#### config_file
-The `config_file` setting is available since Timeplus Enterprise 2.7. You can specify the path to a file that contains the Kafka configuration settings. The file should be in the format of `key=value` pairs, one pair per line. For example:
+
+Use this to point to a file containing key-value config lines for the Kafka external stream, e.g.:

```properties
username=my_username
password=my_password
+data_format='Avro'
+one_message_per_row=true
```

-Not just for username and password, you can also put other Kafka settings in the file. Avoid defining the value both in the `config_file` and in the DDL.

-If you manage Kubernetes secrets using HashiCorp Vault, you can use the [Vault Agent Injector](https://learn.hashicorp.com/tutorials/vault/kubernetes-sidecar) to mount the secrets to the pod and use the `config_file` setting to specify the path to the file. For example, you create the following annotation to inject the secrets as a local file:
+This is especially useful in Kubernetes environments with secrets managed via [HashiCorp Vault](https://learn.hashicorp.com/tutorials/vault/kubernetes-sidecar).
+
+**HashiCorp Vault injection example:**

```yaml
annotations:
@@ -106,209 +115,302 @@ annotations:
  vault.hashicorp.com/role: "vault-role"
```

+:::info
+
+Please note that settings specified in the DDL override those in the config_file; only the settings in the config_file that are not explicitly specified in the DDL are merged in.
+
+:::
+
+
#### data_format
-The supported values for `data_format` are:
-- JSONEachRow: parse each row of the message as a single JSON document. The top level JSON key/value pairs will be parsed as the columns. [Learn More](#jsoneachrow).
-- CSV: less commonly used. [Learn More](#csv).
-- TSV: similar to CSV but tab as the separator
-- ProtobufSingle: for single Protobuf message per message
-- Protobuf: there could be multiple Protobuf messages in a single message.
-- Avro: added in Proton 1.5.2
-- RawBLOB: the default value. Read/write message as plain text.
+Defines how Kafka messages are parsed and written.
Supported formats are + +| Format | Description | +| ---------------- | ---------------------------------------- | +| `JSONEachRow` | Parses one JSON document per line | +| `CSV` | Parses comma-separated values | +| `TSV` | Like CSV, but tab-delimited | +| `ProtobufSingle` | One Protobuf message per Kafka message | +| `Protobuf` | Multiple Protobuf messages per Kafka msg | +| `Avro` | Avro-encoded messages | +| `RawBLOB` | Raw text, no parsing (default) | #### format_schema -Required if `data_format` is set to `ProtobufSingle`, `Protobuf` or `Avro`. + +Required for these data formats: + +* `ProtobufSingle` +* `Protobuf` +* `Avro` #### one_message_per_row -If the external stream is used to write data to a Kafka topic and the `data_format` is set to `JSONEachRow`, setting this to `true` to make sure each Kafka message only contains one JSON document. + +Set to `true` to ensure each Kafka message maps to exactly **one JSON document**, especially when writing with `JSONEachRow`. #### kafka_schema_registry_url -Set to the address of Kafka Schema Registry server. `http` or `https` need to be included. [Learn more](/proton-schema-registry) for this setting and other settings with `kafka_schema_` as the prefix. + +URL of the [Kafka Schema Registry](/proton-schema-registry), including the protocol is required (`http://` or `https://`). #### kafka_schema_registry_credentials -Set in the 'username:password' format. [Learn more](/proton-schema-registry). -#### ssl_ca_cert_file -Set to the path of the CA certificate file. +Credentials for the registry, in `username:password` format. + +#### ssl_ca_cert_file / ssl_ca_pem + +Use either: + +* `ssl_ca_cert_file='/path/to/cert.pem'` +* `ssl_ca_pem='-----BEGIN CERTIFICATE-----\n...'` -#### ssl_ca_pem -Set to the content of the CA certificate file. Usually starts with `-----BEGIN CERTIFICATE-----`. +#### `skip_ssl_cert_check` -#### skip_ssl_cert_check -Default to `false`. Set to `true` to skip the SSL certificate check and don't specify the CA certificate file or content. +* Default: `false` +* Set to `true` to **bypass SSL verification**. #### properties -For more advanced use cases, you can specify customized properties while creating the external streams. Those properties will be passed to the underlying Kafka client, which is [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). Please refer to the [section](#advanced_settings) in the bottom of this page for more details. +Used for advanced configurations. These settings are passed directly to the Kafka client ([librdkafka config options](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)) to fine tune the Kafka producer, consumer or topic behaviors. + +For more, see the [Advanced Settings](#advanced_settings) section. ## Read Data in Kafka -### Read messages in a single column {#single_col_read} -If the message in Kafka topic is in plain text format or JSON, you can create an external stream with only a `raw` column in `string` type. +Timeplus supports read Kafka message in different data formats: raw as string, CSV, TSV, JSON, Protobuf, Avro etc. -Example: +### Read as String + +If the Kafka messages contain unstructure text data, or there are no builtin formats to parse the message, users can read Kafka message as string. 
+Example: ```sql CREATE EXTERNAL STREAM ext_github_events (raw string) SETTINGS type='kafka', brokers='localhost:9092', - topic='github_events' + topic='application_logs' +``` + +Then we can use other functions like regex string processing or JSON extract etc functions to further process the raw string. Consuming the Kafka message as it is is very useful for debugging as well. + +Here is one plain text application log parsing example by using regex functions: +```sql +SELECT + to_time(extract(raw, '^(\\d{4}\\.\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d+)')) AS timestamp, + extract(raw, '} <(\\w+)>') AS level, + extract(raw, '} <\\w+> (.*)') AS message +FROM application_logs; ``` -Then use query time [JSON extraction functions](/functions_for_json) or shortcut to access the values, e.g. `raw:id`. +### Read JSON Kafka Message -### Read messages as multiple columns{#multi_col_read} +If Kafka message contains JSON text, users can two ways to process the data +1. Read the text as plain string and then use JSON extract function or the shortcuts syntax to parse the json fields as columns manually +2. Use `JSONEachRow` data format to parse the JSON fields automatically -If the keys in the JSON message never change, or you don't care about the new columns, you can also create the external stream with multiple columns. +Assuming Kafka message contains JSON text with this schema -You can pick up some top level keys in the JSON as columns, or all possible keys as columns. +```json +{ + "actor": string, + "created_at": timestamp, + "id": string, + "payload": string, + "repo": string, + "type": string +} +``` -Please note the behaviors are changed in recent versions, based on user feedback: +To pase the JSON fields, we have the following methods. -| Version | Default Behavior | How to overwrite | -| --------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------- | -| 1.4.2 or above | Say there are 5 top level key/value pairs in JSON, you can define 5 or less than 5 columns in the external stream. Data will be read properly. | If you don't want to read new events with unexpected columns, set `input_format_skip_unknown_fields=false` in the `CREATE` DDL. | -| 1.3.24 to 1.4.1 | Say there are 5 top level key/value pairs in JSON, you can need to define 5 columns to read them all. Or define less than 5 columns in the DDL, and make sure to add `input_format_skip_unknown_fields=true` in each `SELECT` query settings, otherwise no search result will be returned. | In each `SELECT` query, you can specify the setting `input_format_skip_unknown_fields=true\|false`. | -| 1.3.23 or older | You have to define a single `string` column for the entire JSON document and apply query time JSON parsing to extract fields. 
| N/A | +#### Use JSON extract functions -Example: +First define a simple Kafka external stream to read the message as string ```sql -CREATE EXTERNAL STREAM ext_github_events - (actor string, - created_at string, - id string, - payload string, - repo string, - type string - ) +CREATE EXTERNAL STREAM ext_json_raw + (raw string) SETTINGS type='kafka', brokers='localhost:9092', - topic='github_events', - data_format='JSONEachRow' + topic='github_events' ``` -If there are nested complex JSON in the message, you can define the column as a string type. Actually any JSON value can be saved in a string column. +And then use JSON extract function or shortcut syntaxes to extract the JSON fields. -:::info - -Protobuf messages can be read with all or partial columns. Please check [this page](/proton-format-schema). - -::: +```sql +SELECT + raw:actor AS actor, + raw:created_at::datetime64(3, 'UTC') AS created_at, + raw:id AS id, + raw:payload AS payload, + raw:repo AS repo, + raw:type AS type +FROM + ext_json_raw; +``` -### Query Settings +This method is most flexible and can handle dynamic JSON text with new fields or missing fields and it can also extract nested JSON fields. -#### shards +#### Use JSONEachRow data format -Starting from Proton 1.3.18, you can also read in specified Kafka partitions. By default, all partitions will be read. But you can also read from a single partition via the `shards` setting, e.g. +Define a Kafka external stream with columns which are mapped to the JSON fields and also specify the `data_format` as `JSONEachRow`. ```sql -SELECT raw FROM ext_stream SETTINGS shards='0' +CREATE EXTERNAL STREAM ext_json_parsed + ( + actor string, + created_at datetime64(3, 'UTC'), + id string, + payload string, + repo string, + type string + ) +SETTINGS type='kafka', + brokers='localhost:9092', + topic='github_events', + data_format='JSONEachRow' ``` -Or you can specify a set of partition ID, separated by comma, e.g. - -```sql -SELECT raw FROM ext_stream SETTINGS shards='0,2' -``` +When users query the `ext_json_parsed` stream, the JSON fields will be parsed and casted to the target column type automatically. -### Read existing messages {#rewind} +This method is most convient when the JSON text is in stable schema and can be used to extract JSON fields at top level. -When you run `SELECT raw FROM ext_stream `, Timeplus will read the new messages in the topics, not the existing ones. +### Read as CSV -#### seek_to -If you need to read all existing messages, you can use the following settings: +Similar to data format `JSONEachRow`, users can read Kafka message in CSV format. -```sql -SELECT raw FROM ext_stream SETTINGS seek_to='earliest' +``` +CREATE EXTERNAL STREAM ext_json_parsed + ( + actor string, + created_at datetime64(3, 'UTC'), + id string, + payload string, + repo string, + type string + ) +SETTINGS type='kafka', + brokers='localhost:9092', + topic='csv_topic', + data_format='CSV'; ``` -Or the following SQL if you are running Proton 1.5.9 or above: +### Read as TSV -```sql -SELECT raw FROM table(ext_stream) WHERE ... -``` +Similar to reading data in data format CSV, it is tab separated. -:::warning -Please avoid scanning all data via `select * from table(ext_stream)`. However `select count(*) from table(ext_stream)` is optimized to get the number of current message count from the Kafka topic. 
-::: +### Read Avro Kafka Message -### Virtual Columns +To read Avro-encoded Kafka message, please refer to [Avro Schema](/proton-format-schema) and [Avro Schema Registry](/proton-schema-registry) for details. -Besides the message body, Timeplus provides several virtual columns for each message in the Kafka topic. +### Read Protobuf Kafka Message -#### _tp_time +To read Protobuf-encoded Kafka message, please refer to [Protobuf Schema](/proton-format-schema) and [Protobuf Schema Registry](/proton-schema-registry) for details. -You can read the timestamp of the message via `_tp_time`, e.g. -```sql -SELECT _tp_time, raw FROM foo; -``` +### Read Kafka Message Metadata -Starting from Timeplus Enterprise 2.8.1, you can also specify the message timestamp by set a value to the `_tp_time` column. +Besides the message body, Timeplus provides several `virtual columns` which map to the metadata of each Kafka message. -#### _tp_message_key +#### _tp_time -Starting from Timeplus Enterprise 2.4, you can define the `_tp_message_key` column to read or write the message key in the preferred format. +`_tp_time` is mapped to the timestamp of a Kafka message in Kafka external stream which has `datetime64(3, 'UTC')` column type. When reading from a Kafka topic, the Kafka message timestamp will be set to `_tp_time` automatically. For example: ```sql -CREATE EXTERNAL STREAM foo ( - id int32, - name string, - _tp_message_key string -) SETTINGS type='kafka',...; +SELECT _tp_time, raw FROM foo; ``` -When doing a SELECT query, the message key will be populated to the `_tp_message_key` column. -`SELECT * FROM foo` will return `'some-key'` for the `_tp_message_key` message. +When writing to a Kafka topic, if `_tp_time` exists in the schema, it will be set to the timetamps of the Kafka message automatically. -`_tp_message_key` support the following types: `uint8`, `uint16`, `uint32`, `uint64`, `int8`, `int16`, `int32`, `int64`, `bool`, `float32`, `float64`, `string`, and `fixed_string`. +#### _tp_message_key + +`_tp_message_key` is mapped to the key of a Kafka message in Kafka external stream which as `string` column type. When reading from a Kafka topic, the Kafka message key will be set to `_tp_message_key` automatically. -`_tp_message_key` also support `nullable`. Thus we can create an external stream with optional message key. For example: +For example: ```sql -CREATE EXTERNAL STREAM foo ( - id int32, - name string, - _tp_message_key nullable(string) default null -) SETTINGS type='kafka',...; +SELECT _tp_message_key, raw FROM foo; ``` -For how to use `_tp_message_key` to write the message key when you insert data into the Kafka topic, please refer to [the section](#write_message_key). +Regarding how to use `_tp_message_key` to write the message key when inserting data into the Kafka topic, please refer to [write message key section](#write_message_key). #### _tp_message_headers -Starting from Timeplus Proton 1.6.11 and Timeplus Enterprise 2.7, you can read the Kafka message headers as `map(string,string)`, for example: +`_tp_message_headers` is mapped to the headers of a Kafka message in Kafka external stream, which has `map(string, string)` column type. + +For example: ```sql SELECT _tp_message_headers, raw FROM foo; ``` -To get the value for a certain key in the header, you can access it via `_tp_message_headers['key']`, for example: +To get the value for a certain key in the header, users can access it via `_tp_message_headers['key']`. 
+ +For example: ```sql SELECT _tp_message_headers['key'], raw FROM foo; ``` #### _tp_sn -You can read the message offset of the message via `_tp_sn`, e.g. + +`_tp_sn` is mapped to the offset of a Kafka message in Kafka external stream, which has `int64` column type. + +For example: ```sql SELECT _tp_sn, raw FROM foo; ``` #### _tp_shard -You can read the partition ID of the message via `_tp_shard`, e.g. + +`tp_shard` is mapped to the partition ID of a Kafka message in Kafka external stream, which has `int32` column type. + +For example: ```sql SELECT _tp_shard, raw FROM foo; ``` +### Query Settings + +#### Read Specific Kafka Partitions + + With `shards` setting, users can query specified Kafka partitions. By default, all partitions will be read. e.g. + +```sql +SELECT raw FROM ext_stream SETTINGS shards='0' +``` + +Or specify a set of partition ID, separated by comma, e.g. + +```sql +SELECT raw FROM ext_stream SETTINGS shards='0,2' +``` + +#### Rewind via seek_to + +When users run `SELECT raw FROM ext_stream `, Timeplus will read only the new messages in the topic. + +Users can use `seek_to` query setting to rewind to a specific offset or timestamp per partition. + +Rewind to earliest offsets for all partitions. +```sql +SELECT raw FROM ext_stream SETTINGS seek_to='earliest' +``` + +Rewind to a specific offset for partitions: seek to offset 5 for partition 1, 3 for partition 2 and 11 for partition 3 respectively +``` +SELECT raw FROM ext_stream SETTINGS seek_to='5,3,11' +``` +Rewind to a specific timestamp for all partitions. +``` +SELECT raw FROM ext_stream SETTINGS seek_to='2025-01-01T00:00:00.000' +``` ## Write Data to Kafka -### Write to Kafka in Plain Text {#single_col_write} +### Write as String -You can write plain text messages to Kafka topics with an external stream with a single column. +Users can encode data as string in Kafka message and write to the Kafka topic. +For example: ```sql CREATE EXTERNAL STREAM ext_github_events (raw string) @@ -317,23 +419,17 @@ SETTINGS type='kafka', topic='github_events' ``` -Then use either `INSERT INTO VALUES (v)`, or [Ingest REST API](/proton-ingest-api), or set it as the target stream for a materialized view to write message to the Kafka topic. The actual `data_format` value is `RawBLOB` but this can be omitted. By default `one_message_per_row` is `true`. +Then use either `INSERT INTO VALUES (v)`, or [Ingest REST API](/proton-ingest-api), or set it as the target stream for a Materialized View to write message to the Kafka topic. The actual `data_format` value is `RawBLOB` but this can be omitted. By default `one_message_per_row` is `true`. :::info -Since Timeplus Proton 1.5.11, a new setting `kafka_max_message_size` is available. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message, ensuring it won't exceed the `kafka_max_message_size` limit. +Setting `kafka_max_message_size`. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message, ensuring it won't exceed the `kafka_max_message_size` limit. ::: -### Multiple columns to write to Kafka{#multi_col_write} - -To write data to Kafka topics, you can choose different data formats: - -##### RawBLOB -Write the content as pain text. +### Write as JSONEachRow -##### JSONEachRow - -You can use `data_format='JSONEachRow',one_message_per_row=true` to inform Timeplus to write each event as a JSON document. 
The columns of the external stream will be converted to keys in the JSON documents. For example: +Users can use `data_format='JSONEachRow',one_message_per_row=true` to inform Timeplus to encode columns in each row as a JSON document. +For example: ```sql CREATE EXTERNAL STREAM target( _tp_time datetime64(3), @@ -351,18 +447,18 @@ The messages will be generated in the specific topic as ```json { -"_tp_time":"2023-10-29 05:36:21.957" -"url":"https://www.nationalweb-enabled.io/methodologies/killer/web-readiness" -"method":"POST" -"ip":"c4ecf59a9ec27b50af9cc3bb8289e16c" + "_tp_time":"2023-10-29 05:36:21.957" + "url":"https://www.nationalweb-enabled.io/methodologies/killer/web-readiness" + "method":"POST" + "ip":"c4ecf59a9ec27b50af9cc3bb8289e16c" } ``` :::info -Please note, by default multiple JSON documents will be inserted to the same Kafka message. One JSON document each row/line. Such default behavior aims to get the maximum writing performance to Kafka/Redpanda. But you need to make sure the downstream applications are able to properly split the JSON documents per Kafka message. +Please note, by default multiple JSON documents will be inserted to the same Kafka message. One JSON document each row/line (JSONEachRow, jsonl). Such default behavior aims to get the maximum writing performance to Kafka/Redpanda. But users need to make sure the downstream applications are able to properly process the json lines. -If you need a valid JSON per each Kafka message, instead of a JSONL, please set `one_message_per_row=true` e.g. +If users need a valid JSON per each Kafka message, instead of a JSONL, please set `one_message_per_row=true` e.g. ```sql CREATE EXTERNAL STREAM target(_tp_time datetime64(3), url string, ip string) @@ -370,16 +466,16 @@ SETTINGS type='kafka', brokers='redpanda:9092', topic='masked-fe-event', data_format='JSONEachRow',one_message_per_row=true ``` -The default value of one_message_per_row, if not specified, is false for `data_format='JSONEachRow'` and true for `data_format='RawBLOB'`. - -Since Timeplus Proton 1.5.11, a new setting `kafka_max_message_size` is available. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message and when to create new Kafka message, ensuring each message won't exceed the `kafka_max_message_size` limit. +The default value of one_message_per_row is false for `data_format='JSONEachRow'` and true for `data_format='RawBLOB'`. +Pay attention to `kafka_max_message_size`. When multiple rows can be written to the same Kafka message, this setting will control how maximum amount of data will be batched in a single Kafka message on client side to avoid exceeding the Kafka broker's message size limit. ::: -##### CSV +### Write as CSV -You can use `data_format='CSV'` to inform Timeplus to write each event as a JSON document. The columns of the external stream will be converted to keys in the JSON documents. For example: +Users can use `data_format='CSV'` to inform Timeplus to encode columns in each row as one CSV line. +For example: ```sql CREATE EXTERNAL STREAM target( _tp_time datetime64(3), @@ -397,116 +493,24 @@ The messages will be generated in the specific topic as ```csv "2023-10-29 05:35:54.176","https://www.nationalwhiteboard.info/sticky/recontextualize/robust/incentivize","PUT","3eaf6372e909e033fcfc2d6a3bc04ace" ``` -##### TSV -Similar to CSV but tab as the separator. -##### ProtobufSingle -You can write Protobuf-encoded messages in Kafka topics. 
+### Write as TSV -First, you need to create a schema with SQL, e.g. -```sql -CREATE OR REPLACE FORMAT SCHEMA schema_name AS ' - syntax = "proto3"; +Similar to CSV but using tab as the separator. - message SearchRequest { - string query = 1; - int32 page_number = 2; - int32 results_per_page = 3; - } - ' TYPE Protobuf -``` -Then refer to this schema while creating an external stream for Kafka: -```sql -CREATE EXTERNAL STREAM stream_name( - query string, - page_number int32, - results_per_page int32) -SETTINGS type='kafka', - brokers='kafka:9092', - topic='masked-fe-event', - data_format='ProtobufSingle', - format_schema='schema_name:SearchRequest' -``` +### Write as ProtobufSingle -Then you can run `INSERT INTO` or use a materialized view to write data to the topic. -```sql -INSERT INTO stream_name(query,page_number,results_per_page) VALUES('test',1,100) -``` +To write Protobuf-encoded messages from Kafka topcis, please refer to [Protobuf Schema](/proton-format-schema), and [Kafka Schema Registry](/proton-schema-registry) pages for details. -You can either define the [Protobuf Schema in Proton](/proton-format-schema), or specify the [Kafka Schema Registry](/proton-schema-registry) when you create the external stream. +### Write as Avro -##### Avro -Starting from Proton 1.5.2, you can use Avro format when you specify the [Kafka Schema Registry](/proton-schema-registry) when you create the external stream. +To write Avro-encoded messages from Kafka topcis, please refer to [Avro Schema](/proton-format-schema), and [Kafka Schema Registry](/proton-schema-registry) pages for details. -First, you need to create a schema with SQL, e.g. -```sql -CREATE OR REPLACE FORMAT SCHEMA avro_schema AS '{ - "namespace": "example.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] - } - ' TYPE Avro; -``` -Then refer to this schema while creating an external stream for Pulsar: -```sql -CREATE EXTERNAL STREAM stream_avro( - name string, - favorite_number nullable(int32), - favorite_color nullable(string)) -SETTINGS type='kafka', - brokers='kafka:9092', - topic='masked-fe-event', - data_format='Avro', - format_schema='avro_schema' -``` +### Write Kafka Message Metadata -Then you can run `INSERT INTO` or use a materialized view to write data to the topic. -```sql -INSERT INTO stream_avro(name,favorite_number,favorite_color) VALUES('test',1,'red') -``` - -### Continuously Write to Kafka via MV - -You can use materialized views to write data to Kafka as an external stream, e.g. 
- -```sql --- read the topic via an external stream -CREATE EXTERNAL STREAM frontend_events(raw string) - SETTINGS type='kafka', - brokers='redpanda:9092', - topic='owlshop-frontend-events'; - --- create the other external stream to write data to the other topic -CREATE EXTERNAL STREAM target( - _tp_time datetime64(3), - url string, - method string, - ip string) - SETTINGS type='kafka', - brokers='redpanda:9092', - topic='masked-fe-event', - data_format='JSONEachRow', - one_message_per_row=true; - --- setup the ETL pipeline via a materialized view -CREATE MATERIALIZED VIEW mv INTO target AS - SELECT now64() AS _tp_time, - raw:requestedUrl AS url, - raw:method AS method, - lower(hex(md5(raw:ipAddress))) AS ip - FROM frontend_events; -``` - -### Write to Kafka with metadata{#metadata} - -#### _tp_message_key {#write_message_key} +#### _tp_message_key -Starting from Timeplus Enterprise 2.4, you can define the `_tp_message_key` column when you create the external stream. This new approach provides more intuitive and flexible way to write any content as the message key, not necessarily mapping to a specify column or a set of columns. +If users like to populate Kafka message key when producing data to a Kafka topic, users can define the `_tp_message_key` column when creating the external stream. For example: ```sql @@ -516,17 +520,19 @@ CREATE EXTERNAL STREAM foo ( _tp_message_key string ) SETTINGS type='kafka',...; ``` -You can insert any data to the Kafka topic. When insert a row to the stream like: ```sql INSERT INTO foo(id,name,_tp_message_key) VALUES (1, 'John', 'some-key'); ``` + `'some-key'` will be used for the message key for the Kafka message (and it will be excluded from the message body, so the message will be `{"id": 1, "name": "John"}` for the above SQL). `_tp_message_key` support the following types: `uint8`, `uint16`, `uint32`, `uint64`, `int8`, `int16`, `int32`, `int64`, `bool`, `float32`, `float64`, `string`, and `fixed_string`. -`_tp_message_key` also support `nullable`. Thus we can create an external stream with optional message key. For example: +`_tp_message_key` also support `nullable`. Thus we can create an external stream with optional message key. + +For example: ```sql CREATE EXTERNAL STREAM foo ( id int32, @@ -535,22 +541,11 @@ CREATE EXTERNAL STREAM foo ( ) SETTINGS type='kafka',...; ``` -#### sharding_expr -If you configure a partition strategy in the Kafka cluster, you can specify the message key with the above approach. The message key will be stored together with the message body. Alternatively, you can use the `sharding_expr` to specify the partition ID for the message. For example: -```sql -CREATE EXTERNAL STREAM foo ( - id int32,.. -) SETTINGS type='kafka', sharding_expr='hash(id)'...; -``` -When you insert data, the shard ID will be calculated based on the `sharding_expr` and Timeplus will put the message into the corresponding partition/shard. - -#### _tp_message_headers {#write_message_headers} - -Starting from Timeplus Proton 1.6.11 and Timeplus Enterprise 2.7, you can read the Kafka message headers as `map(string,string)` via the `_tp_message_headers` virtual column. +#### _tp_message_headers -Starting from Timeplus Enterprise 2.8.2, you can also write custom headers via this column. +If users like to populate Kafka message headers when producing data to a Kafka topic, users can define the `_tp_message_headers` column when creating the external stream. 
-Define the column in the DDL: +For example: ```sql CREATE EXTERNAL STREAM example ( s string, @@ -559,20 +554,27 @@ CREATE EXTERNAL STREAM example ( _tp_message_headers map(string, string) ) settings type='kafka',...; ``` -Then insert data to the external stream via `INSERT INTO` or materialized views, with a map of string pairs as custom headers for each message. -## DROP EXTERNAL STREAM +Then insert rows to the external stream via `INSERT INTO` or Materialized Views, the `_tp_message_headers` will be set to the headers of the Kafka message. +#### sharding_expr + +`sharding_expr` allows users to control the distribution of each row to the target partition of the topic when writing. + +For example: ```sql -DROP STREAM [IF EXISTS] stream_name +CREATE EXTERNAL STREAM foo ( + id int32,.. +) SETTINGS type='kafka', sharding_expr='hash(id)'...; ``` +When insertint rows, the partition ID will be evaluated based on the `sharding_expr` and Timeplus will put the message into the corresponding partition of the topic. + ## Properties for Kafka client {#advanced_settings} -For more advanced use cases, you can specify customized properties while creating the external streams. Those properties will be passed to the underlying Kafka client, which is [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). +For more advanced scenarios, users may like to fine tune the Kafka consumer, producer and topic behaviors while creating Kafka external streams. Users can specify these fine tune settings via `properties=`, and they will be passed to the underlying librdkafka Kafka client. For full detailed configurations supported by librdkafka, please refer to its [configurations](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). For example: - ```sql CREATE EXTERNAL STREAM ext_github_events(raw string) SETTINGS type='kafka', @@ -581,7 +583,7 @@ SETTINGS type='kafka', properties='message.max.bytes=1000000;message.timeout.ms=6000' ``` -Please note, not all properties in [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) are supported. The following ones are accepted in Proton today. Please check the configuration guide of [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) for details. +Please note, most of properties in [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) are supported. The following ones are accepted in Timeplus today. Please check the configuration guide of [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) for details. (C/P legend: C = Consumer, P = Producer, * = both) @@ -685,13 +687,3 @@ batch.size | P | 1 .. 2147483647 | 1000000 delivery.report.only.error | P | true, false | false | low | Only provide delivery reports for failed messages. *Type: boolean* sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages. 
*Type: integer* - -## Limitations - -There are some limitations for the Kafka-based external streams, because Timeplus doesn’t control the storage or the data format for the external stream. - -1. The UI wizard to setup Kafka External Stream only supports JSON or TEXT. To use Avro, Protobuf, or schema registry service, you need the SQL DDL. -2. `_tp_time` is available in the external streams (since Proton 1.3.30). `_tp_append_time` is set only when message timestamp is an append time. -3. Unlike normal streams, there is no historical storage for the external streams. In recent versions, you can run `table(kafka_ext_stream)` but it will scan all messages in the topic, unless you are running a `count()`. If you need to frequently run query for historical data, you can use a Materialized View to query the Kafka External Stream and save the data in Timeplus columnar or row storage. This will improve the query performance. -4. There is no retention policy for the external streams in Timeplus. You need to configure the retention policy on Kafka/Confluent/Redpanda. If the data is no longer available in the external systems, they cannot be searched in Timeplus either. -5. Consumer group settings are not available for Kafka external streams, as Timeplus internally manages message offsets without utilizing consumer groups. diff --git a/docs/working-with-streams.md b/docs/working-with-streams.md index 3f6b58d5..20427924 100644 --- a/docs/working-with-streams.md +++ b/docs/working-with-streams.md @@ -1,27 +1,51 @@ + # Streams -## All data live in streams +## Types of Streams + +Timeplus `streams` are conceptually similar to `tables` in traditional SQL databases—they both hold data. However, there are key differences: + +* A **Timeplus stream** tracks changes and updates through its underlying **Write-Ahead Log (WAL)**, which powers incremental processing. +* Timeplus supports **both incremental stream processing and historical queries** over stream data. + +To support a variety of use cases efficiently, Timeplus offers multiple types of streams: + +1. **Append Stream** + The default stream type in Timeplus. It uses columnar encoding and is optimized for **range scans** via a **sorting key**. It suits workloads with infrequent data mutations (e.g., `UPDATE` or `DELETE`). + +2. **Mutable Stream** + Row-encoded and similar in behavior to a **MySQL table**, where each primary key corresponds to a single row. It is optimized for **frequent data mutations** (`UPDATE`, `UPSERT`, `DELETE`) and supports **point and range queries** via **primary or secondary indexes**. + +3. **Versioned Key-Value Stream** + Similar to the mutable stream but uses **columnar encoding**. It offers better **compression** but lower performance for updates and point queries, especially when cardinality is high. Best suited for scenarios where **data mutations are less frequent**. + +4. **Changelog Stream** + Designed to model **change data capture (CDC) events**, with **columnar encoding** for efficient downstream processing. + +5. **External Stream** + As the name implies, the data resides outside of Timeplus. Timeplus can reference external sources (e.g., a **Kafka topic**) and execute **streaming SQL** queries against them in real time. -Timeplus is a streaming analytics platform and data lives in streams. Timeplus `streams` are similar to `tables` in the traditional SQL databases. Both of them are essentially datasets. 
The key difference is that Timeplus stream is an append-only (by default), unbounded, constantly changing events group. +> Note: Timeplus also supports [External Tables](/sql-create-external-table), which allow **historical queries and inserts** only (e.g., against ClickHouse, MySQL, PostgreSQL, MongoDB, etc.). -Timeplus supports multiple types of streams: +--- -1. By default, the streams are append-only and immutable (older data can be purged automatically by setting a retention policy). -2. If you want to create a stream to track the latest value for a primary key or a set of keys, you can create [Mutable Streams](/mutable-stream). This is only available in Timeplus Enterprise. -3. In [Timeplus Proton](/proton), you can also create [Versioned Streams](/versioned-stream) and [Changelog Stream](/changelog-stream). But those 2 stream modes will be deprecated and replaced by mutable streams. -4. You can also define [External Streams](/external-stream) to run SQL against remote Kafka/Redpanda brokers, or the other Timeplus/Proton server. +## Stream Internals -## Create a stream -You can create a stream via the Timeplus Console UI, or via [SQL](/sql-create-stream). When you [ingest data](/ingestion) into Timeplus from Kafka or file sources, streams can be created automatically to match the schema of the data. +When users [create a stream](/sql-create-stream) in Timeplus, they can specify the number of **shards** for the stream. Each shard consists of two core components at the storage layer: -## Query a stream +1. **Streaming Store** +2. **Historical Store** -By default, querying the stream will continuously scan new events and output new results. It never ends unless the user cancels the query. For example, you can get the latest web logs with HTTP 500 error or get the min/max/avg of a metric for every minute from an IoT device. Please read [Streaming Queries](/stream-query) for more details. +The **streaming store** is essentially the **Write-Ahead Log** (internally called `NativeLog`). It supports: -If you only want to analyze the existing data and need an immediate response, you can run [Non-streaming Queries](/history) via the [table](/functions_for_streaming#table) function. This will turn the query in the bounded mode and only scan the existing data. For example, you can run `select count(*) from table(stream1)` to get the total number of rows in the data stream. +* High-concurrency [data ingestion](/ingestion) +* [Incremental stream processing](/stream-query) +* Real-time data replication +For more information, refer to the [high-level architecture](/architecture) page. +The **historical store** asynchronously derives its data from the WAL through a dedicated background thread. It performs periodic **compaction**, **merge**, and **compression**, making it highly efficient for [historical analytic queries](/history) and **streaming backfills**. -## Delete a stream +--- -From the web console, you can delete the stream. This will permanently delete all data in the stream and delete the stream itself. Data cannot be recovered after deletion. +To learn more about stream lifecycle operations (Create, Read, Delete, Update) and advanced configurations like **TTL**, **key versioning**, and other stream settings, refer to the SQL Reference documentation. To learning more about external streams, refer to [external stream](/external-stream) pages for more details. 
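
To make the streaming and historical query paths described above concrete, here is a minimal sketch. It assumes a hypothetical `device_metrics` append stream with illustrative columns; `CREATE STREAM` and the `table()` function are the ones described on the [SQL Reference](/sql-create-stream) and [historical queries](/history) pages.

```sql
-- Create a default (append) stream; the _tp_time event-time column is added automatically.
CREATE STREAM IF NOT EXISTS device_metrics (
  device_id string,
  temperature float64
);

-- Streaming query: tails the streaming store (WAL) and keeps emitting new events until cancelled.
SELECT device_id, temperature FROM device_metrics;

-- Historical query: wrapping the stream in table() runs a bounded scan of the historical store and returns immediately.
SELECT count(*) FROM table(device_metrics);
```

The same stream serves both paths: the WAL powers the unbounded streaming read, while the historical store answers the bounded `table()` query.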
From 910541792c0eb4bd2691aecc472250b8baf7346e Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Mon, 28 Jul 2025 16:21:31 -0700 Subject: [PATCH 02/13] refine --- docs/proton-kafka.md | 237 ++++++++++++++++++++++--------------------- 1 file changed, 123 insertions(+), 114 deletions(-) diff --git a/docs/proton-kafka.md b/docs/proton-kafka.md index 2e9bc5cb..9088448b 100644 --- a/docs/proton-kafka.md +++ b/docs/proton-kafka.md @@ -82,7 +82,7 @@ The supported values for `sasl_mechanism` are: #### username / password -Required when `sasl_mechanism` is used. +Required when `sasl_mechanism` ise set to SCRAM-SHA-256 or SCRAM-SHA-512. Alternatively, use [`config_file`](#config_file) to securely pass credentials. @@ -97,7 +97,6 @@ data_format='Avro' one_message_per_row=true ``` - This is especially useful in Kubernetes environments with secrets managed via [HashiCorp Vault](https://learn.hashicorp.com/tutorials/vault/kubernetes-sidecar). **HarshCorp Vault injection example:** @@ -174,15 +173,26 @@ Used for advanced configurations. These settings are passed directly to the Kafk For more, see the [Advanced Settings](#advanced_settings) section. -## Read Data in Kafka +## Reading Data from Kafka + +Timeplus allows reading Kafka messages in multiple data formats, including: + +* Plain string (raw) +* CSV / TSV +* JSON +* Protobuf +* Avro + +### Reading Kafka Messages as Raw String -Timeplus supports read Kafka message in different data formats: raw as string, CSV, TSV, JSON, Protobuf, Avro etc. +Use this mode when: -### Read as String +* Messages contain **unstructured text or binary data** +* No built-in format is applicable +* You want to **debug raw Kafka messages** -If the Kafka messages contain unstructure text data, or there are no builtin formats to parse the message, users can read Kafka message as string. +#### Example -Example: ```sql CREATE EXTERNAL STREAM ext_github_events (raw string) @@ -191,9 +201,10 @@ SETTINGS type='kafka', topic='application_logs' ``` -Then we can use other functions like regex string processing or JSON extract etc functions to further process the raw string. Consuming the Kafka message as it is is very useful for debugging as well. +Users can use functions like regex string processing or JSON extract etc functions to further process the raw string. + +#### Regex Example – Parse Application Logs -Here is one plain text application log parsing example by using regex functions: ```sql SELECT to_time(extract(raw, '^(\\d{4}\\.\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d+)')) AS timestamp, @@ -202,11 +213,7 @@ SELECT FROM application_logs; ``` -### Read JSON Kafka Message - -If Kafka message contains JSON text, users can two ways to process the data -1. Read the text as plain string and then use JSON extract function or the shortcuts syntax to parse the json fields as columns manually -2. Use `JSONEachRow` data format to parse the JSON fields automatically +### Reading JSON Kafka Message Assuming Kafka message contains JSON text with this schema @@ -221,21 +228,21 @@ Assuming Kafka message contains JSON text with this schema } ``` -To pase the JSON fields, we have the following methods. +You can process JSON in two ways: -#### Use JSON extract functions +#### Option A: Parse with JSON Extract Functions -First define a simple Kafka external stream to read the message as string +1. 
Create a raw stream: ```sql CREATE EXTERNAL STREAM ext_json_raw (raw string) SETTINGS type='kafka', brokers='localhost:9092', - topic='github_events' + topic='github_events'; ``` -And then use JSON extract function or shortcut syntaxes to extract the JSON fields. +2. Extract fields using shortcut syntax: ```sql SELECT @@ -245,13 +252,12 @@ SELECT raw:payload AS payload, raw:repo AS repo, raw:type AS type -FROM - ext_json_raw; -``` +FROM ext_json_raw; +``` -This method is most flexible and can handle dynamic JSON text with new fields or missing fields and it can also extract nested JSON fields. +This method is most flexible and is best for dynamic JSON text with new fields or missing fields and it can also extract nested JSON fields. -#### Use JSONEachRow data format +#### Option B: Use `JSONEachRow` Format Define a Kafka external stream with columns which are mapped to the JSON fields and also specify the `data_format` as `JSONEachRow`. @@ -275,7 +281,7 @@ When users query the `ext_json_parsed` stream, the JSON fields will be parsed an This method is most convient when the JSON text is in stable schema and can be used to extract JSON fields at top level. -### Read as CSV +### Reading CSV Kafka Messages Similar to data format `JSONEachRow`, users can read Kafka message in CSV format. @@ -295,141 +301,140 @@ SETTINGS type='kafka', data_format='CSV'; ``` -### Read as TSV +### Reading TSV Kafka Messages -Similar to reading data in data format CSV, it is tab separated. +Identical to CSV, but expects **tab-separated values**: -### Read Avro Kafka Message +```sql +SETTINGS data_format='TSV'; +``` -To read Avro-encoded Kafka message, please refer to [Avro Schema](/proton-format-schema) and [Avro Schema Registry](/proton-schema-registry) for details. +### Reading Avro or Protobuf Messages -### Read Protobuf Kafka Message +To read Avro-encoded / Protobuf-encoded Kafka message, please refer to [Schema](/proton-format-schema) and [Schema Registry](/proton-schema-registry) for details. -To read Protobuf-encoded Kafka message, please refer to [Protobuf Schema](/proton-format-schema) and [Protobuf Schema Registry](/proton-schema-registry) for details. +### Accessing Kafka Message Metadata -### Read Kafka Message Metadata +Timeplus provides **virtual columns** for Kafka message metadata. -Besides the message body, Timeplus provides several `virtual columns` which map to the metadata of each Kafka message. +| Virtual Column | Description | Type | +| --------------------- | ------------------------------ | ---------------------- | +| `_tp_time` | Kafka message timestamp | `datetime64(3, 'UTC')` | +| `_tp_message_key` | Kafka message key | `string` | +| `_tp_message_headers` | Kafka headers as key-value map | `map(string, string)` | +| `_tp_sn` | Kafka message offset | `int64` | +| `_tp_shard` | Kafka partition ID | `int32` | -#### _tp_time -`_tp_time` is mapped to the timestamp of a Kafka message in Kafka external stream which has `datetime64(3, 'UTC')` column type. When reading from a Kafka topic, the Kafka message timestamp will be set to `_tp_time` automatically. +### Examples -For example: ```sql -SELECT _tp_time, raw FROM foo; -``` +-- View message time and payload +SELECT _tp_time, raw FROM ext_github_events; -When writing to a Kafka topic, if `_tp_time` exists in the schema, it will be set to the timetamps of the Kafka message automatically. 
+-- View message key +SELECT _tp_message_key, raw FROM ext_github_events; -#### _tp_message_key - -`_tp_message_key` is mapped to the key of a Kafka message in Kafka external stream which as `string` column type. When reading from a Kafka topic, the Kafka message key will be set to `_tp_message_key` automatically. +-- Access headers +SELECT _tp_message_headers['trace_id'], raw FROM ext_github_events; -For example: -```sql -SELECT _tp_message_key, raw FROM foo; +-- View message offset and partition +SELECT _tp_sn, _tp_shard, raw FROM ext_github_events; ``` -Regarding how to use `_tp_message_key` to write the message key when inserting data into the Kafka topic, please refer to [write message key section](#write_message_key). +### Query Settings for External Kafka Streams -#### _tp_message_headers +Timeplus supports several query-level settings to control how data is read from Kafka topics. These settings can be especially useful for targeting specific partitions or replaying messages from a defined point in time. -`_tp_message_headers` is mapped to the headers of a Kafka message in Kafka external stream, which has `map(string, string)` column type. +#### Read from Specific Kafka Partitions -For example: -```sql -SELECT _tp_message_headers, raw FROM foo; -``` +By default, Timeplus reads from **all partitions** of a Kafka topic. You can override this by using the `shards` setting to specify which partitions to read from. -To get the value for a certain key in the header, users can access it via `_tp_message_headers['key']`. +##### Read from a Single Partition -For example: ```sql -SELECT _tp_message_headers['key'], raw FROM foo; +SELECT raw FROM ext_stream SETTINGS shards='0' ``` -#### _tp_sn +##### Read from Multiple Partitions -`_tp_sn` is mapped to the offset of a Kafka message in Kafka external stream, which has `int64` column type. +Separate partition IDs with commas: -For example: ```sql -SELECT _tp_sn, raw FROM foo; +SELECT raw FROM ext_stream SETTINGS shards='0,2' ``` -#### _tp_shard +#### Rewind via `seek_to` -`tp_shard` is mapped to the partition ID of a Kafka message in Kafka external stream, which has `int32` column type. +By default, Timeplus only reads **new messages** published after the query starts. To read historical messages, use the `seek_to` setting. + +#### Rewind to the Earliest Offset (All Partitions) -For example: ```sql -SELECT _tp_shard, raw FROM foo; +SELECT raw FROM ext_stream SETTINGS seek_to='earliest' ``` -### Query Settings - -#### Read Specific Kafka Partitions +#### Rewind to Specific Offsets (Per Partition) - With `shards` setting, users can query specified Kafka partitions. By default, all partitions will be read. e.g. +Offsets are specified **in partition order**. For example: ```sql -SELECT raw FROM ext_stream SETTINGS shards='0' +SELECT raw FROM ext_stream SETTINGS seek_to='5,3,11' ``` -Or specify a set of partition ID, separated by comma, e.g. +This seeks to: -```sql -SELECT raw FROM ext_stream SETTINGS shards='0,2' -``` - -#### Rewind via seek_to +* Offset `5` in partition `0` +* Offset `3` in partition `1` +* Offset `11` in partition `2` -When users run `SELECT raw FROM ext_stream `, Timeplus will read only the new messages in the topic. +#### Rewind to a Specific Timestamp (All Partitions) -Users can use `seek_to` query setting to rewind to a specific offset or timestamp per partition. +You can also rewind based on a timestamp: -Rewind to earliest offsets for all partitions. 
```sql -SELECT raw FROM ext_stream SETTINGS seek_to='earliest' +SELECT raw FROM ext_stream SETTINGS seek_to='2025-01-01T00:00:00.000' ``` -Rewind to a specific offset for partitions: seek to offset 5 for partition 1, 3 for partition 2 and 11 for partition 3 respectively -``` -SELECT raw FROM ext_stream SETTINGS seek_to='5,3,11' -``` +:::info -Rewind to a specific timestamp for all partitions. -``` -SELECT raw FROM ext_stream SETTINGS seek_to='2025-01-01T00:00:00.000' -``` +Timeplus will use Kafka API to convert the timestamp to the corresponding offsets for each partition internally. + +::: ## Write Data to Kafka -### Write as String +Timeplus supports writing data to Kafka using various encoding formats such as strings, JSON, CSV, TSV, Avro, and Protobuf. You can write to Kafka using SQL `INSERT` statements, the [Ingest REST API](/proton-ingest-api), or as the target of a [Materialized View](/sql-create-materialized-view). -Users can encode data as string in Kafka message and write to the Kafka topic. +### Write as Raw String + +You can encode data as a raw string in Kafka messages: -For example: ```sql -CREATE EXTERNAL STREAM ext_github_events - (raw string) +CREATE EXTERNAL STREAM ext_github_events (raw string) SETTINGS type='kafka', brokers='localhost:9092', topic='github_events' ``` -Then use either `INSERT INTO VALUES (v)`, or [Ingest REST API](/proton-ingest-api), or set it as the target stream for a Materialized View to write message to the Kafka topic. The actual `data_format` value is `RawBLOB` but this can be omitted. By default `one_message_per_row` is `true`. +You can then write data via: + +* `INSERT INTO ext_github_events VALUES ('some string')` +* [Ingest REST API](/proton-ingest-api) +* Materialized View + :::info -Setting `kafka_max_message_size`. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message, ensuring it won't exceed the `kafka_max_message_size` limit. + +Internally, the `data_format` is `RawBLOB`, and `one_message_per_row=true` by default. + +Pay attention to setting `kafka_max_message_size`. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message, ensuring it won't exceed the `kafka_max_message_size` limit. ::: -### Write as JSONEachRow +### Write as `JSONEachRow` -Users can use `data_format='JSONEachRow',one_message_per_row=true` to inform Timeplus to encode columns in each row as a JSON document. +Encode each row as a separate JSON object (aka JSONL or jsonlines): -For example: ```sql CREATE EXTERNAL STREAM target( _tp_time datetime64(3), @@ -468,14 +473,12 @@ SETTINGS type='kafka', brokers='redpanda:9092', topic='masked-fe-event', The default value of one_message_per_row is false for `data_format='JSONEachRow'` and true for `data_format='RawBLOB'`. -Pay attention to `kafka_max_message_size`. When multiple rows can be written to the same Kafka message, this setting will control how maximum amount of data will be batched in a single Kafka message on client side to avoid exceeding the Kafka broker's message size limit. ::: ### Write as CSV -Users can use `data_format='CSV'` to inform Timeplus to encode columns in each row as one CSV line. +Each row is encoded as one CSV line: -For example: ```sql CREATE EXTERNAL STREAM target( _tp_time datetime64(3), @@ -496,7 +499,7 @@ The messages will be generated in the specific topic as ### Write as TSV -Similar to CSV but using tab as the separator. 
+Same as CSV, but uses **tab characters** as delimiters instead of commas. ### Write as ProtobufSingle @@ -521,18 +524,20 @@ CREATE EXTERNAL STREAM foo ( ) SETTINGS type='kafka',...; ``` -When insert a row to the stream like: +After inserting a row to the stream like this: ```sql INSERT INTO foo(id,name,_tp_message_key) VALUES (1, 'John', 'some-key'); ``` -`'some-key'` will be used for the message key for the Kafka message (and it will be excluded from the message body, so the message will be `{"id": 1, "name": "John"}` for the above SQL). +* Kafka key will be `'some-key'` +* Message body: `{"id": 1, "name": "John"}`. Kafka key was excluded from the message body. -`_tp_message_key` support the following types: `uint8`, `uint16`, `uint32`, `uint64`, `int8`, `int16`, `int32`, `int64`, `bool`, `float32`, `float64`, `string`, and `fixed_string`. +`_tp_message_key` supports these types: -`_tp_message_key` also support `nullable`. Thus we can create an external stream with optional message key. +* Numeric: `uint8/16/32/64`, `int8/16/32/64` +* Others: `string`, `bool`, `float32`, `float64`, `fixed_string` +* Nullable are also supported: -For example: ```sql CREATE EXTERNAL STREAM foo ( id int32, @@ -543,9 +548,8 @@ CREATE EXTERNAL STREAM foo ( #### _tp_message_headers -If users like to populate Kafka message headers when producing data to a Kafka topic, users can define the `_tp_message_headers` column when creating the external stream. +Add Kafka headers via `_tp_message_headers` (map of key-value pairs): -For example: ```sql CREATE EXTERNAL STREAM example ( s string, @@ -557,24 +561,26 @@ CREATE EXTERNAL STREAM example ( Then insert rows to the external stream via `INSERT INTO` or Materialized Views, the `_tp_message_headers` will be set to the headers of the Kafka message. -#### sharding_expr +#### sharding_expr (Partition Logic) -`sharding_expr` allows users to control the distribution of each row to the target partition of the topic when writing. +`sharding_expr` is used to control how rows are distributed to Kafka partitions: -For example: ```sql CREATE EXTERNAL STREAM foo ( id int32,.. ) SETTINGS type='kafka', sharding_expr='hash(id)'...; ``` -When insertint rows, the partition ID will be evaluated based on the `sharding_expr` and Timeplus will put the message into the corresponding partition of the topic. +When inserting rows, the partition ID will be evaluated based on the `sharding_expr` and Timeplus will put the message into the corresponding Kafka partition. ## Properties for Kafka client {#advanced_settings} -For more advanced scenarios, users may like to fine tune the Kafka consumer, producer and topic behaviors while creating Kafka external streams. Users can specify these fine tune settings via `properties=`, and they will be passed to the underlying librdkafka Kafka client. For full detailed configurations supported by librdkafka, please refer to its [configurations](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). +In advanced use cases, you may want to fine-tune the behavior of the Kafka consumer, producer, or topic when creating Kafka external streams. Timeplus allows this through the `properties` setting, which passes configuration options directly to the underlying [librdkafka](https://github.com/confluentinc/librdkafka) client. + +These settings can control aspects like message size limits, retry behavior, timeouts, and more. 
For a full list of available configuration options, refer to the [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). + +### Example -For example: ```sql CREATE EXTERNAL STREAM ext_github_events(raw string) SETTINGS type='kafka', @@ -583,7 +589,10 @@ SETTINGS type='kafka', properties='message.max.bytes=1000000;message.timeout.ms=6000' ``` -Please note, most of properties in [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) are supported. The following ones are accepted in Timeplus today. Please check the configuration guide of [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) for details. +This example sets the maximum Kafka message size to 1MB and the message timeout to 6 seconds. + +Please note while most configuration properties from `librdkafka` are supported, Timeplus may restrict or ignore certain settings. Here is the list of supported settings. + (C/P legend: C = Consumer, P = Producer, * = both) From 5642adf1c3f0f76bd357d987a59be3cca78e4367 Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Mon, 28 Jul 2025 16:34:36 -0700 Subject: [PATCH 03/13] refine --- docs/working-with-streams.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/working-with-streams.md b/docs/working-with-streams.md index 20427924..a7abfc34 100644 --- a/docs/working-with-streams.md +++ b/docs/working-with-streams.md @@ -1,4 +1,3 @@ - # Streams ## Types of Streams @@ -48,4 +47,4 @@ The **historical store** asynchronously derives its data from the WAL through a --- -To learn more about stream lifecycle operations (Create, Read, Delete, Update) and advanced configurations like **TTL**, **key versioning**, and other stream settings, refer to the SQL Reference documentation. To learning more about external streams, refer to [external stream](/external-stream) pages for more details. +To learn more about stream lifecycle operations (Create, Read, Delete, Update) and advanced configurations like **TTL**, **key versioning**, and other stream settings, refer to the SQL Reference documentation. To learning more about external streams From 068c4b62d718d9f807167ac17593423f99194b6c Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Mon, 28 Jul 2025 16:36:22 -0700 Subject: [PATCH 04/13] refine --- docs/working-with-streams.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/working-with-streams.md b/docs/working-with-streams.md index a7abfc34..5a8d764b 100644 --- a/docs/working-with-streams.md +++ b/docs/working-with-streams.md @@ -26,7 +26,6 @@ To support a variety of use cases efficiently, Timeplus offers multiple types of > Note: Timeplus also supports [External Tables](/sql-create-external-table), which allow **historical queries and inserts** only (e.g., against ClickHouse, MySQL, PostgreSQL, MongoDB, etc.). ---- ## Stream Internals @@ -45,6 +44,4 @@ For more information, refer to the [high-level architecture](/architecture) page The **historical store** asynchronously derives its data from the WAL through a dedicated background thread. It performs periodic **compaction**, **merge**, and **compression**, making it highly efficient for [historical analytic queries](/history) and **streaming backfills**. ---- - To learn more about stream lifecycle operations (Create, Read, Delete, Update) and advanced configurations like **TTL**, **key versioning**, and other stream settings, refer to the SQL Reference documentation. 
To learning more about external streams

From c1ee1f412546e6f3912981602890c11316d39153 Mon Sep 17 00:00:00 2001
From: Ken Chen
Date: Mon, 28 Jul 2025 16:42:40 -0700
Subject: [PATCH 05/13] fixes

---
 docs/working-with-streams.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/working-with-streams.md b/docs/working-with-streams.md
index 5a8d764b..74f576ef 100644
--- a/docs/working-with-streams.md
+++ b/docs/working-with-streams.md
@@ -44,4 +44,4 @@ For more information, refer to the [high-level architecture](/architecture) page

 The **historical store** asynchronously derives its data from the WAL through a dedicated background thread. It performs periodic **compaction**, **merge**, and **compression**, making it highly efficient for [historical analytic queries](/history) and **streaming backfills**.

-To learn more about stream lifecycle operations (Create, Read, Delete, Update) and advanced configurations like **TTL**, **key versioning**, and other stream settings, refer to the SQL Reference documentation. To learning more about external streams
+To learn more about stream lifecycle operations (Create, Read, Delete, Update) and advanced configurations like **TTL**, **key versioning**, and other stream settings, refer to the SQL Reference documentation. To learn more about external streams, refer to the [external stream](/external-stream) page for more details.

From 4704d1d0d473ac815b2b2eb2af30f97112c65451 Mon Sep 17 00:00:00 2001
From: Ken Chen
Date: Mon, 28 Jul 2025 16:44:03 -0700
Subject: [PATCH 06/13] better

---
 docs/working-with-streams.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/working-with-streams.md b/docs/working-with-streams.md
index 74f576ef..5afb69ea 100644
--- a/docs/working-with-streams.md
+++ b/docs/working-with-streams.md
@@ -1,12 +1,12 @@
 # Streams

-## Types of Streams
-
 Timeplus `streams` are conceptually similar to `tables` in traditional SQL databases—they both hold data. However, there are key differences:

 * A **Timeplus stream** tracks changes and updates through its underlying **Write-Ahead Log (WAL)**, which powers incremental processing.
 * Timeplus supports **both incremental stream processing and historical queries** over stream data.

+## Types of Streams
+
 To support a variety of use cases efficiently, Timeplus offers multiple types of streams:

 1. **Append Stream**

From 2727fbbb37b6366b9b2d76ae6938c83afaac4028 Mon Sep 17 00:00:00 2001
From: Ken Chen
Date: Mon, 28 Jul 2025 16:44:37 -0700
Subject: [PATCH 07/13] better

---
 docs/working-with-streams.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/working-with-streams.md b/docs/working-with-streams.md
index 5afb69ea..21c3da8d 100644
--- a/docs/working-with-streams.md
+++ b/docs/working-with-streams.md
@@ -1,6 +1,6 @@
 # Streams

-Timeplus `streams` are conceptually similar to `tables` in traditional SQL databases—they both hold data. However, there are key differences:
+Timeplus `streams` are conceptually similar to `tables` in traditional SQL databases — they both hold data. However, there are key differences:

 * A **Timeplus stream** tracks changes and updates through its underlying **Write-Ahead Log (WAL)**, which powers incremental processing.
 * Timeplus supports **both incremental stream processing and historical queries** over stream data.
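
To make the distinction between append and mutable streams concrete, here is a minimal sketch of how the two types might be declared and queried. The stream and column names (`sensor_events`, `device_latest`, `device`, `temperature`) are hypothetical, and the exact DDL options are covered in the SQL Reference, so treat this as an illustration rather than a definitive recipe.

```sql
-- Append stream (the default type): immutable events, optimized for range scans.
CREATE STREAM sensor_events (
    device string,
    temperature float64
);

-- Mutable stream: one row per primary key, optimized for frequent UPSERT/DELETE
-- and fast point lookups (a sketch; see the SQL Reference for the full options).
CREATE MUTABLE STREAM device_latest (
    device string,
    temperature float64
)
PRIMARY KEY (device);

-- Streaming query: tails the WAL and keeps emitting newly ingested rows.
SELECT device, temperature FROM sensor_events;

-- Historical query: scans only the historical store, via the table() function.
SELECT count() FROM table(sensor_events);
```

The choice mostly comes down to mutation frequency: append streams favor scan-heavy, immutable data, while mutable streams pay extra write-side cost in exchange for fast point and range queries by key.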
From 0fe323d3d3b21ef54ef91cbf689eeabb4fdc8fbb Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Mon, 28 Jul 2025 19:51:52 -0700 Subject: [PATCH 08/13] cosmic --- docs/proton-kafka.md | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/docs/proton-kafka.md b/docs/proton-kafka.md index 9088448b..c7e324eb 100644 --- a/docs/proton-kafka.md +++ b/docs/proton-kafka.md @@ -162,7 +162,7 @@ Use either: * `ssl_ca_cert_file='/path/to/cert.pem'` * `ssl_ca_pem='-----BEGIN CERTIFICATE-----\n...'` -#### `skip_ssl_cert_check` +#### skip_ssl_cert_check * Default: `false` * Set to `true` to **bypass SSL verification**. @@ -173,7 +173,7 @@ Used for advanced configurations. These settings are passed directly to the Kafk For more, see the [Advanced Settings](#advanced_settings) section. -## Reading Data from Kafka +## Read Data from Kafka Timeplus allows reading Kafka messages in multiple data formats, including: @@ -183,7 +183,7 @@ Timeplus allows reading Kafka messages in multiple data formats, including: * Protobuf * Avro -### Reading Kafka Messages as Raw String +### Read Kafka Messages as Raw String Use this mode when: @@ -213,7 +213,7 @@ SELECT FROM application_logs; ``` -### Reading JSON Kafka Message +### Read JSON Kafka Message Assuming Kafka message contains JSON text with this schema @@ -242,7 +242,7 @@ SETTINGS type='kafka', topic='github_events'; ``` -2. Extract fields using shortcut syntax: +2. Extract fields using JSON extract shortcut syntax: ```sql SELECT @@ -257,7 +257,7 @@ FROM ext_json_raw; This method is most flexible and is best for dynamic JSON text with new fields or missing fields and it can also extract nested JSON fields. -#### Option B: Use `JSONEachRow` Format +#### Option B: Use JSONEachRow Format Define a Kafka external stream with columns which are mapped to the JSON fields and also specify the `data_format` as `JSONEachRow`. @@ -281,7 +281,7 @@ When users query the `ext_json_parsed` stream, the JSON fields will be parsed an This method is most convient when the JSON text is in stable schema and can be used to extract JSON fields at top level. -### Reading CSV Kafka Messages +### Read CSV Kafka Messages Similar to data format `JSONEachRow`, users can read Kafka message in CSV format. @@ -301,7 +301,7 @@ SETTINGS type='kafka', data_format='CSV'; ``` -### Reading TSV Kafka Messages +### Read TSV Kafka Messages Identical to CSV, but expects **tab-separated values**: @@ -309,11 +309,11 @@ Identical to CSV, but expects **tab-separated values**: SETTINGS data_format='TSV'; ``` -### Reading Avro or Protobuf Messages +### Read Avro or Protobuf Messages To read Avro-encoded / Protobuf-encoded Kafka message, please refer to [Schema](/proton-format-schema) and [Schema Registry](/proton-schema-registry) for details. -### Accessing Kafka Message Metadata +### Access Kafka Message Metadata Timeplus provides **virtual columns** for Kafka message metadata. @@ -342,7 +342,7 @@ SELECT _tp_message_headers['trace_id'], raw FROM ext_github_events; SELECT _tp_sn, _tp_shard, raw FROM ext_github_events; ``` -### Query Settings for External Kafka Streams +### Query Settings for Kafka External Streams Timeplus supports several query-level settings to control how data is read from Kafka topics. These settings can be especially useful for targeting specific partitions or replaying messages from a defined point in time. 
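
The partition and offset settings described in this section can be combined in a single query. A small sketch, using the same placeholder stream name `ext_stream` as the surrounding examples and illustrative values only:

```sql
-- Read only partitions 0 and 1, starting from the earliest retained offsets.
SELECT raw FROM ext_stream
SETTINGS shards='0,1', seek_to='earliest';
```

This pattern is useful for replaying a subset of a topic without re-reading every partition.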

@@ -364,7 +364,7 @@ Separate partition IDs with commas:
 SELECT raw FROM ext_stream SETTINGS shards='0,2'
 ```

-#### Rewind via `seek_to`
+#### Rewind via seek_to

 By default, Timeplus only reads **new messages** published after the query starts. To read historical messages, use the `seek_to` setting.

@@ -431,7 +431,7 @@ Internally, the `data_format` is `RawBLOB`, and `one_message_per_row=true` by de
 Pay attention to setting `kafka_max_message_size`. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message, ensuring it won't exceed the `kafka_max_message_size` limit.
 :::

-### Write as `JSONEachRow`
+### Write as JSONEachRow

 Encode each row as a separate JSON object (aka JSONL or jsonlines):

@@ -591,7 +591,10 @@ SETTINGS type='kafka',

 This example sets the maximum Kafka message size to 1MB and the message timeout to 6 seconds.

-Please note while most configuration properties from `librdkafka` are supported, Timeplus may restrict or ignore certain settings. Here is the list of supported settings.
+### Kafka Client Properties
+
+Please note while most configuration properties from `librdkafka` are supported, Timeplus may restrict or ignore certain settings. Here is the list of supported properties.
+

 (C/P legend: C = Consumer, P = Producer, * = both)

From 5d0dbfbfe35d6fd5b9a3f99d224388fcba1f9474 Mon Sep 17 00:00:00 2001
From: Ken Chen
Date: Mon, 28 Jul 2025 19:55:00 -0700
Subject: [PATCH 09/13] better

---
 docs/proton-kafka.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/proton-kafka.md b/docs/proton-kafka.md
index c7e324eb..157de9d5 100644
--- a/docs/proton-kafka.md
+++ b/docs/proton-kafka.md
@@ -575,7 +575,7 @@ When inserting rows, the partition ID will be evaluated based on the `sharding_e

 ## Properties for Kafka client {#advanced_settings}

-In advanced use cases, you may want to fine-tune the behavior of the Kafka consumer, producer, or topic when creating Kafka external streams. Timeplus allows this through the `properties` setting, which passes configuration options directly to the underlying [librdkafka](https://github.com/confluentinc/librdkafka) client.
+In advanced use cases, you may want to fine-tune the behavior of the Kafka consumer, producer, or topic when creating Kafka external streams. For example, you can fine-tune consumer or producer latency, throughput, and other behaviors. Timeplus allows this fine-tuning through the `properties` setting, which passes configuration options directly to the underlying [librdkafka](https://github.com/confluentinc/librdkafka) client.

 These settings can control aspects like message size limits, retry behavior, timeouts, and more. For a full list of available configuration options, refer to the [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).

From 9fd88a623819116401ac203b25e3293ee927057c Mon Sep 17 00:00:00 2001
From: Ken Chen
Date: Mon, 28 Jul 2025 20:14:23 -0700
Subject: [PATCH 10/13] better

---
 docs/proton-kafka.md | 26 ++++++++++++--------------
 1 file changed, 12 insertions(+), 14 deletions(-)

diff --git a/docs/proton-kafka.md b/docs/proton-kafka.md
index 157de9d5..775466db 100644
--- a/docs/proton-kafka.md
+++ b/docs/proton-kafka.md
@@ -82,7 +82,7 @@ The supported values for `sasl_mechanism` are:

 #### username / password

-Required when `sasl_mechanism` ise set to SCRAM-SHA-256 or SCRAM-SHA-512.
+Required when `sasl_mechanism` is set to SCRAM-SHA-256 or SCRAM-SHA-512.
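
For readers who want to see how the SASL settings fit together, here is a hedged sketch of a SCRAM-authenticated external stream; the broker address, topic, and credentials below are placeholders:

```sql
-- Placeholder broker, topic, and credentials, shown only to illustrate how the
-- security-related settings combine in one DDL statement.
CREATE EXTERNAL STREAM ext_secured (raw string)
SETTINGS type='kafka',
         brokers='broker-1.example.com:9093',
         topic='secured_topic',
         security_protocol='SASL_SSL',
         sasl_mechanism='SCRAM-SHA-256',
         username='my_username',
         password='my_password';
```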

 Alternatively, use [`config_file`](#config_file) to securely pass credentials.

@@ -99,7 +99,7 @@ one_message_per_row=true

 This is especially useful in Kubernetes environments with secrets managed via [HashiCorp Vault](https://learn.hashicorp.com/tutorials/vault/kubernetes-sidecar).

-**HarshCorp Vault injection example:**
+**HashiCorp Vault injection example:**

 ```yaml
 annotations:
@@ -116,7 +116,7 @@ annotations:

 :::info

-Please note values in settings in the DDL will override those in config_file and it will only merge the settings from the config_file which are not explicitly sepcified in the DDL.
+Please note that settings specified in the DDL override those in the config_file; only the settings that are not explicitly specified in the DDL are merged from the config_file.

 :::


@@ -191,10 +191,10 @@ Use this mode when:
 * No built-in format is applicable
 * You want to **debug raw Kafka messages**

-#### Example
+#### Raw String Example

 ```sql
-CREATE EXTERNAL STREAM ext_github_events
+CREATE EXTERNAL STREAM ext_application_logs
 (raw string)
 SETTINGS type='kafka',
          brokers='localhost:9092',
@@ -242,7 +242,7 @@ SETTINGS type='kafka',
          topic='github_events';
 ```

-2. Extract fields using JSON extract shortcut syntax:
+2. Extract fields using JSON extract shortcut syntax or [JSON extract functions](/functions_for_json):

 ```sql
 SELECT
@@ -277,7 +277,7 @@ SETTINGS type='kafka',
          data_format='JSONEachRow'
 ```

-When users query the `ext_json_parsed` stream, the JSON fields will be parsed and casted to the target column type automatically.
+When users query the `ext_json_parsed` stream, the JSON fields will be parsed and cast to the target column type automatically.

 This method is most convient when the JSON text is in stable schema and can be used to extract JSON fields at top level.

@@ -326,7 +326,7 @@ Timeplus provides **virtual columns** for Kafka message metadata.
 | `_tp_shard` | Kafka partition ID | `int32` |


-### Examples
+### Kafka Message Metadata Examples

 ```sql
 -- View message time and payload
@@ -503,11 +503,11 @@ Same as CSV, but uses **tab characters** as delimiters instead of commas.

 ### Write as ProtobufSingle

-To write Protobuf-encoded messages from Kafka topcis, please refer to [Protobuf Schema](/proton-format-schema), and [Kafka Schema Registry](/proton-schema-registry) pages for details.
+To write Protobuf-encoded messages from Kafka topics, please refer to [Protobuf Schema](/proton-format-schema), and [Kafka Schema Registry](/proton-schema-registry) pages for details.

 ### Write as Avro

-To write Avro-encoded messages from Kafka topcis, please refer to [Avro Schema](/proton-format-schema), and [Kafka Schema Registry](/proton-schema-registry) pages for details.
+To write Avro-encoded messages from Kafka topics, please refer to [Avro Schema](/proton-format-schema), and [Kafka Schema Registry](/proton-schema-registry) pages for details.

 ### Write Kafka Message Metadata

@@ -561,7 +561,7 @@ CREATE EXTERNAL STREAM example (

 Then insert rows to the external stream via `INSERT INTO` or Materialized Views, the `_tp_message_headers` will be set to the headers of the Kafka message.

-#### sharding_expr (Partition Logic)
+#### sharding_expr {#sharding_expr}

 `sharding_expr` is used to control how rows are distributed to Kafka partitions:

@@ -579,7 +579,7 @@ In advanced use cases, you may want to fine-tune the behavior of the Kafka consu

 These settings can control aspects like message size limits, retry behavior, timeouts, and more. 
For a full list of available configuration options, refer to the [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). -### Example +### Kafka Client Properties Example ```sql CREATE EXTERNAL STREAM ext_github_events(raw string) @@ -595,8 +595,6 @@ This example sets the maximum Kafka message size to 1MB and the message timeout Please note while most configuration properties from `librdkafka` are supported, Timeplus may restrict or ignore certain settings. Here is the list of supported properties. - - (C/P legend: C = Consumer, P = Producer, * = both) From 96365d94037d0a9eca715e4fb15568d17482d551 Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Mon, 28 Jul 2025 20:20:24 -0700 Subject: [PATCH 11/13] better --- docs/proton-kafka.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/proton-kafka.md b/docs/proton-kafka.md index 775466db..72413f10 100644 --- a/docs/proton-kafka.md +++ b/docs/proton-kafka.md @@ -279,7 +279,7 @@ SETTINGS type='kafka', When users query the `ext_json_parsed` stream, the JSON fields will be parsed and cast to the target column type automatically. -This method is most convient when the JSON text is in stable schema and can be used to extract JSON fields at top level. +This method is most convenient when the JSON text is in stable schema and can be used to extract JSON fields at top level. ### Read CSV Kafka Messages From c099b7690bb05a079d124c10979fe567ebf5c524 Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Mon, 28 Jul 2025 20:23:08 -0700 Subject: [PATCH 12/13] better --- docs/working-with-streams.md | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/working-with-streams.md b/docs/working-with-streams.md index 21c3da8d..30599a6f 100644 --- a/docs/working-with-streams.md +++ b/docs/working-with-streams.md @@ -9,19 +9,24 @@ Timeplus `streams` are conceptually similar to `tables` in traditional SQL datab To support a variety of use cases efficiently, Timeplus offers multiple types of streams: -1. **Append Stream** +1. [Append Stream](/append-stream) + The default stream type in Timeplus. It uses columnar encoding and is optimized for **range scans** via a **sorting key**. It suits workloads with infrequent data mutations (e.g., `UPDATE` or `DELETE`). -2. **Mutable Stream** +2. [Mutable Stream](/mutable-stream) + Row-encoded and similar in behavior to a **MySQL table**, where each primary key corresponds to a single row. It is optimized for **frequent data mutations** (`UPDATE`, `UPSERT`, `DELETE`) and supports **point and range queries** via **primary or secondary indexes**. -3. **Versioned Key-Value Stream** +3. [Versioned Key-Value Stream](/versioned-stream) + Similar to the mutable stream but uses **columnar encoding**. It offers better **compression** but lower performance for updates and point queries, especially when cardinality is high. Best suited for scenarios where **data mutations are less frequent**. -4. **Changelog Stream** +4. [Changelog Key-Value Stream](/changelog-stream) + Designed to model **change data capture (CDC) events**, with **columnar encoding** for efficient downstream processing. -5. **External Stream** +5. [External Stream](/external-stream) + As the name implies, the data resides outside of Timeplus. Timeplus can reference external sources (e.g., a **Kafka topic**) and execute **streaming SQL** queries against them in real time. 
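
As an illustration of that last point, a streaming SQL query over an external stream reads the same as one over a native stream. A minimal sketch, assuming `ext_events` is a Kafka external stream defined elsewhere:

```sql
-- Count Kafka-backed events in 5-second tumbling windows.
SELECT window_start, count() AS events
FROM tumble(ext_events, 5s)
GROUP BY window_start;
```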

 > Note: Timeplus also supports [External Tables](/sql-create-external-table), which allow **historical queries and inserts** only (e.g., against ClickHouse, MySQL, PostgreSQL, MongoDB, etc.).

From 1e3bfa5e5f41f96a4633cfd9fddf7215602c0210 Mon Sep 17 00:00:00 2001
From: Ken Chen
Date: Mon, 28 Jul 2025 20:28:54 -0700
Subject: [PATCH 13/13] better

---
 docs/proton-kafka.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/proton-kafka.md b/docs/proton-kafka.md
index 72413f10..75829336 100644
--- a/docs/proton-kafka.md
+++ b/docs/proton-kafka.md
@@ -78,7 +78,7 @@ The supported values for `sasl_mechanism` are:
 - PLAIN: when setting security_protocol to SASL_SSL, this is the default value for sasl_mechanism.
 - SCRAM-SHA-256
 - SCRAM-SHA-512
-- AWS_MSK_IAM (for AWS MSK IAM role-based access)
+- AWS_MSK_IAM (for AWS MSK IAM role-based access when the EC2 instance or Kubernetes pod is configured with a proper IAM role)

 #### username / password
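
To round out the AWS_MSK_IAM note above, here is a minimal sketch of an IAM-authenticated MSK external stream. The bootstrap broker and topic are placeholders, and it assumes the EC2 instance or pod already carries an IAM role that grants access to the topic, with `security_protocol` set to SASL_SSL as for the other SASL mechanisms:

```sql
-- Placeholder MSK bootstrap broker and topic; no username or password is set
-- because the attached IAM role supplies the credentials.
CREATE EXTERNAL STREAM ext_msk (raw string)
SETTINGS type='kafka',
         brokers='b-1.mycluster.kafka.us-east-1.amazonaws.com:9098',
         topic='my_topic',
         security_protocol='SASL_SSL',
         sasl_mechanism='AWS_MSK_IAM';
```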