diff --git a/docs/proton-kafka.md b/docs/proton-kafka.md index 7431377e..75829336 100644 --- a/docs/proton-kafka.md +++ b/docs/proton-kafka.md @@ -1,10 +1,12 @@ # Kafka External Stream -You can read data from Apache Kafka (as well as Confluent Cloud, or Redpanda) in Timeplus with [External Stream](/external-stream). Combining with [Materialized View](/view#m_view) and [Target Stream](/view#target-stream), you can also write data to Apache Kafka with External Stream. +Timeplus allows users to **read from** and **write to** Apache Kafka (and compatible platforms like **Confluent Cloud** and **Redpanda**) using **Kafka External Streams**. + +By combining external streams with [Materialized Views](/view#m_view) and [Target Streams](/view#target-stream), users can build robust **real-time streaming pipelines**. ## Tutorial with Docker Compose {#tutorial} -Please check the tutorials: +Explore the following hands-on tutorials: - [Query Kafka with SQL](/tutorial-sql-kafka) - [Streaming JOIN](/tutorial-sql-join) @@ -12,14 +14,10 @@ Please check the tutorials: ## CREATE EXTERNAL STREAM -In Timeplus Proton, the external stream supports Kafka API as the only type. - -In Timeplus Enterprise, it also supports [External Stream for Apache Pulsar](/pulsar-external-stream), and [External Stream for other Timeplus deployment](/timeplus-external-stream). - -To create an external stream for Apache Kafka or Kafka-compatible messaging platforms, you can run the following DDL SQL: +Use the following SQL command to create a Kafka external stream: ```sql -CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name +CREATE EXTERNAL STREAM [IF NOT EXISTS] ( ) SETTINGS type='kafka', -- required @@ -41,57 +39,67 @@ SETTINGS properties='..' ``` -:::info +### Settings -For examples to connect to various Kafka API compatitable message platforms, please check [this doc](/tutorial-sql-connect-kafka). +#### type -::: -### DDL Settings +Must be set to `kafka`. Compatible with: -#### type -Need to be `kafka`. This works with Apache Kafka, Redpanda, Confluent Platform or Cloud,and many other Kafka compatible message platforms. +* Apache Kafka +* Confluent Platform or Cloud +* Redpanda +* Other Kafka-compatible systems #### brokers -One or more brokers with ports. + +Comma-separated list of broker addresses (host\:port), e.g.: + +``` +kafka1:9092,kafka2:9092,kafka3:9092 +``` #### topic -One external stream will connect to one topic. + +Kafka topic name to connect to. #### security_protocol + The supported values for `security_protocol` are: - PLAINTEXT: when this option is omitted, this is the default value. - SASL_SSL: when this value is set, username and password should be specified. - - If you need to specify own SSL certification file, add another setting `ssl_ca_cert_file='/ssl/ca.pem'` New in Proton 1.5.5, you can also put the full content of the pem file as a string in the `ssl_ca_pem` setting if you don't want to, or cannot use a file path, such as on Timeplus Enterprise or in Docker/Kubernetes environments. - - Skipping the SSL certification verification can be done via `SETTINGS skip_ssl_cert_check=true`. + - If users need to specify own SSL certification file, add another setting `ssl_ca_cert_file='/ssl/ca.pem'`. Users can also put the full content of the pem file as a string in the `ssl_ca_pem` setting. + - To skip the SSL certification verification: `skip_ssl_cert_check=true`. 
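For example, here is a minimal sketch of a DDL for a SASL_SSL-protected cluster such as Confluent Cloud or Amazon MSK. The broker addresses, topic name, and credentials below are placeholders; `sasl_mechanism`, `username` and `password` are described in the next sections.

```sql
CREATE EXTERNAL STREAM ext_secure_events
(raw string)
SETTINGS type='kafka',
         brokers='broker-1.example.com:9092,broker-2.example.com:9092',
         topic='secure_events',
         security_protocol='SASL_SSL',
         sasl_mechanism='SCRAM-SHA-256',
         username='demo_user',
         password='demo_password',
         ssl_ca_cert_file='/ssl/ca.pem' -- or ssl_ca_pem='-----BEGIN CERTIFICATE-----...', or skip_ssl_cert_check=true
```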
#### sasl_mechanism + The supported values for `sasl_mechanism` are: -- PLAIN: when you set security_protocol to SASL_SSL, this is the default value for sasl_mechanism. +- PLAIN: when setting security_protocol to SASL_SSL, this is the default value for sasl_mechanism. - SCRAM-SHA-256 - SCRAM-SHA-512 -- AWS_MSK_IAM: this is available since Timeplus Enterprise 2.7.0 and Proton 1.6.12. Set to this value if you are using AWS MSK with IAM authentication and the EC2 instance or Kubernetes pod has the proper IAM role to access the Kafka topic. +- AWS_MSK_IAM (for AWS MSK IAM role-based access when EC2 or Kubernetes pod is configured with a proper IAM role) -#### username -Required when `sasl_mechanism` is set to value other than `PLAIN` or `AWS_MSK_IAM`. +#### username / password -#### password -Required when `sasl_mechanism` is set to value other than `PLAIN` or `AWS_MSK_IAM`. +Required when `sasl_mechanism` is set to SCRAM-SHA-256 or SCRAM-SHA-512. -Since [Timeplus Enterprise v2.7](/enterprise-v2.7), you can also use the [config_file](#config_file) setting to specify the username and password in a separate file. +Alternatively, use [`config_file`](#config_file) to securely pass credentials. #### config_file -The `config_file` setting is available since Timeplus Enterprise 2.7. You can specify the path to a file that contains the Kafka configuration settings. The file should be in the format of `key=value` pairs, one pair per line. For example: + +Use this to point to a file containing key-value config lines for Kafka external stream, e.g.: ```properties username=my_username password=my_password +data_format='Avro' +one_message_per_row=true ``` -Not just for username and password, you can also put other Kafka settings in the file. Avoid defining the value both in the `config_file` and in the DDL. +This is especially useful in Kubernetes environments with secrets managed via [HashiCorp Vault](https://learn.hashicorp.com/tutorials/vault/kubernetes-sidecar). -If you manage Kubernetes secrets using HashiCorp Vault, you can use the [Vault Agent Injector](https://learn.hashicorp.com/tutorials/vault/kubernetes-sidecar) to mount the secrets to the pod and use the `config_file` setting to specify the path to the file. For example, you create the following annotation to inject the secrets as a local file: +**HarshiCorp Vault injection example:** ```yaml annotations: @@ -106,233 +114,326 @@ annotations: vault.hashicorp.com/role: "vault-role" ``` +:::info + +Please note values in settings in the DDL will override those in config_file and it will only merge the settings from the config_file which are not explicitly specified in the DDL. + +::: + + #### data_format -The supported values for `data_format` are: -- JSONEachRow: parse each row of the message as a single JSON document. The top level JSON key/value pairs will be parsed as the columns. [Learn More](#jsoneachrow). -- CSV: less commonly used. [Learn More](#csv). -- TSV: similar to CSV but tab as the separator -- ProtobufSingle: for single Protobuf message per message -- Protobuf: there could be multiple Protobuf messages in a single message. -- Avro: added in Proton 1.5.2 -- RawBLOB: the default value. Read/write message as plain text. +Defines how Kafka messages are parsed and written. 
Supported formats are + +| Format | Description | +| ---------------- | ---------------------------------------- | +| `JSONEachRow` | Parses one JSON document per line | +| `CSV` | Parses comma-separated values | +| `TSV` | Like CSV, but tab-delimited | +| `ProtobufSingle` | One Protobuf message per Kafka message | +| `Protobuf` | Multiple Protobuf messages per Kafka msg | +| `Avro` | Avro-encoded messages | +| `RawBLOB` | Raw text, no parsing (default) | #### format_schema -Required if `data_format` is set to `ProtobufSingle`, `Protobuf` or `Avro`. + +Required for these data formats: + +* `ProtobufSingle` +* `Protobuf` +* `Avro` #### one_message_per_row -If the external stream is used to write data to a Kafka topic and the `data_format` is set to `JSONEachRow`, setting this to `true` to make sure each Kafka message only contains one JSON document. + +Set to `true` to ensure each Kafka message maps to exactly **one JSON document**, especially when writing with `JSONEachRow`. #### kafka_schema_registry_url -Set to the address of Kafka Schema Registry server. `http` or `https` need to be included. [Learn more](/proton-schema-registry) for this setting and other settings with `kafka_schema_` as the prefix. + +URL of the [Kafka Schema Registry](/proton-schema-registry), including the protocol is required (`http://` or `https://`). #### kafka_schema_registry_credentials -Set in the 'username:password' format. [Learn more](/proton-schema-registry). -#### ssl_ca_cert_file -Set to the path of the CA certificate file. +Credentials for the registry, in `username:password` format. -#### ssl_ca_pem -Set to the content of the CA certificate file. Usually starts with `-----BEGIN CERTIFICATE-----`. +#### ssl_ca_cert_file / ssl_ca_pem + +Use either: + +* `ssl_ca_cert_file='/path/to/cert.pem'` +* `ssl_ca_pem='-----BEGIN CERTIFICATE-----\n...'` #### skip_ssl_cert_check -Default to `false`. Set to `true` to skip the SSL certificate check and don't specify the CA certificate file or content. + +* Default: `false` +* Set to `true` to **bypass SSL verification**. #### properties -For more advanced use cases, you can specify customized properties while creating the external streams. Those properties will be passed to the underlying Kafka client, which is [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). Please refer to the [section](#advanced_settings) in the bottom of this page for more details. +Used for advanced configurations. These settings are passed directly to the Kafka client ([librdkafka config options](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)) to fine tune the Kafka producer, consumer or topic behaviors. + +For more, see the [Advanced Settings](#advanced_settings) section. + +## Read Data from Kafka -## Read Data in Kafka -### Read messages in a single column {#single_col_read} +Timeplus allows reading Kafka messages in multiple data formats, including: -If the message in Kafka topic is in plain text format or JSON, you can create an external stream with only a `raw` column in `string` type. 
+* Plain string (raw) +* CSV / TSV +* JSON +* Protobuf +* Avro -Example: +### Read Kafka Messages as Raw String + +Use this mode when: + +* Messages contain **unstructured text or binary data** +* No built-in format is applicable +* You want to **debug raw Kafka messages** + +#### Raw String Example ```sql -CREATE EXTERNAL STREAM ext_github_events +CREATE EXTERNAL STREAM ext_application_logs (raw string) SETTINGS type='kafka', brokers='localhost:9092', - topic='github_events' + topic='application_logs' +``` + +Users can use functions like regex string processing or JSON extract etc functions to further process the raw string. + +#### Regex Example – Parse Application Logs + +```sql +SELECT + to_time(extract(raw, '^(\\d{4}\\.\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d+)')) AS timestamp, + extract(raw, '} <(\\w+)>') AS level, + extract(raw, '} <\\w+> (.*)') AS message +FROM application_logs; +``` + +### Read JSON Kafka Message + +Assuming Kafka message contains JSON text with this schema + +```json +{ + "actor": string, + "created_at": timestamp, + "id": string, + "payload": string, + "repo": string, + "type": string +} ``` -Then use query time [JSON extraction functions](/functions_for_json) or shortcut to access the values, e.g. `raw:id`. +You can process JSON in two ways: -### Read messages as multiple columns{#multi_col_read} +#### Option A: Parse with JSON Extract Functions -If the keys in the JSON message never change, or you don't care about the new columns, you can also create the external stream with multiple columns. +1. Create a raw stream: -You can pick up some top level keys in the JSON as columns, or all possible keys as columns. +```sql +CREATE EXTERNAL STREAM ext_json_raw + (raw string) +SETTINGS type='kafka', + brokers='localhost:9092', + topic='github_events'; +``` -Please note the behaviors are changed in recent versions, based on user feedback: +2. Extract fields using JSON extract shortcut syntax or [JSON extract functions](/functions_for_json): -| Version | Default Behavior | How to overwrite | -| --------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------- | -| 1.4.2 or above | Say there are 5 top level key/value pairs in JSON, you can define 5 or less than 5 columns in the external stream. Data will be read properly. | If you don't want to read new events with unexpected columns, set `input_format_skip_unknown_fields=false` in the `CREATE` DDL. | -| 1.3.24 to 1.4.1 | Say there are 5 top level key/value pairs in JSON, you can need to define 5 columns to read them all. Or define less than 5 columns in the DDL, and make sure to add `input_format_skip_unknown_fields=true` in each `SELECT` query settings, otherwise no search result will be returned. | In each `SELECT` query, you can specify the setting `input_format_skip_unknown_fields=true\|false`. | -| 1.3.23 or older | You have to define a single `string` column for the entire JSON document and apply query time JSON parsing to extract fields. 
| N/A | +```sql +SELECT + raw:actor AS actor, + raw:created_at::datetime64(3, 'UTC') AS created_at, + raw:id AS id, + raw:payload AS payload, + raw:repo AS repo, + raw:type AS type +FROM ext_json_raw; +``` + +This method is most flexible and is best for dynamic JSON text with new fields or missing fields and it can also extract nested JSON fields. + +#### Option B: Use JSONEachRow Format -Example: +Define a Kafka external stream with columns which are mapped to the JSON fields and also specify the `data_format` as `JSONEachRow`. ```sql -CREATE EXTERNAL STREAM ext_github_events - (actor string, - created_at string, - id string, - payload string, - repo string, - type string - ) +CREATE EXTERNAL STREAM ext_json_parsed + ( + actor string, + created_at datetime64(3, 'UTC'), + id string, + payload string, + repo string, + type string + ) SETTINGS type='kafka', brokers='localhost:9092', topic='github_events', data_format='JSONEachRow' ``` -If there are nested complex JSON in the message, you can define the column as a string type. Actually any JSON value can be saved in a string column. +When users query the `ext_json_parsed` stream, the JSON fields will be parsed and cast to the target column type automatically. -:::info +This method is most convenient when the JSON text is in stable schema and can be used to extract JSON fields at top level. -Protobuf messages can be read with all or partial columns. Please check [this page](/proton-format-schema). +### Read CSV Kafka Messages -::: +Similar to data format `JSONEachRow`, users can read Kafka message in CSV format. -### Query Settings +``` +CREATE EXTERNAL STREAM ext_json_parsed + ( + actor string, + created_at datetime64(3, 'UTC'), + id string, + payload string, + repo string, + type string + ) +SETTINGS type='kafka', + brokers='localhost:9092', + topic='csv_topic', + data_format='CSV'; +``` -#### shards +### Read TSV Kafka Messages -Starting from Proton 1.3.18, you can also read in specified Kafka partitions. By default, all partitions will be read. But you can also read from a single partition via the `shards` setting, e.g. +Identical to CSV, but expects **tab-separated values**: ```sql -SELECT raw FROM ext_stream SETTINGS shards='0' +SETTINGS data_format='TSV'; ``` -Or you can specify a set of partition ID, separated by comma, e.g. +### Read Avro or Protobuf Messages -```sql -SELECT raw FROM ext_stream SETTINGS shards='0,2' -``` +To read Avro-encoded / Protobuf-encoded Kafka message, please refer to [Schema](/proton-format-schema) and [Schema Registry](/proton-schema-registry) for details. -### Read existing messages {#rewind} +### Access Kafka Message Metadata -When you run `SELECT raw FROM ext_stream `, Timeplus will read the new messages in the topics, not the existing ones. +Timeplus provides **virtual columns** for Kafka message metadata. 
-#### seek_to -If you need to read all existing messages, you can use the following settings: +| Virtual Column | Description | Type | +| --------------------- | ------------------------------ | ---------------------- | +| `_tp_time` | Kafka message timestamp | `datetime64(3, 'UTC')` | +| `_tp_message_key` | Kafka message key | `string` | +| `_tp_message_headers` | Kafka headers as key-value map | `map(string, string)` | +| `_tp_sn` | Kafka message offset | `int64` | +| `_tp_shard` | Kafka partition ID | `int32` | -```sql -SELECT raw FROM ext_stream SETTINGS seek_to='earliest' -``` -Or the following SQL if you are running Proton 1.5.9 or above: +### Kafka Message Metadata Examples ```sql -SELECT raw FROM table(ext_stream) WHERE ... -``` +-- View message time and payload +SELECT _tp_time, raw FROM ext_github_events; -:::warning -Please avoid scanning all data via `select * from table(ext_stream)`. However `select count(*) from table(ext_stream)` is optimized to get the number of current message count from the Kafka topic. -::: +-- View message key +SELECT _tp_message_key, raw FROM ext_github_events; -### Virtual Columns +-- Access headers +SELECT _tp_message_headers['trace_id'], raw FROM ext_github_events; -Besides the message body, Timeplus provides several virtual columns for each message in the Kafka topic. +-- View message offset and partition +SELECT _tp_sn, _tp_shard, raw FROM ext_github_events; +``` -#### _tp_time +### Query Settings for Kafka External Streams -You can read the timestamp of the message via `_tp_time`, e.g. -```sql -SELECT _tp_time, raw FROM foo; -``` +Timeplus supports several query-level settings to control how data is read from Kafka topics. These settings can be especially useful for targeting specific partitions or replaying messages from a defined point in time. -Starting from Timeplus Enterprise 2.8.1, you can also specify the message timestamp by set a value to the `_tp_time` column. +#### Read from Specific Kafka Partitions -#### _tp_message_key +By default, Timeplus reads from **all partitions** of a Kafka topic. You can override this by using the `shards` setting to specify which partitions to read from. -Starting from Timeplus Enterprise 2.4, you can define the `_tp_message_key` column to read or write the message key in the preferred format. +##### Read from a Single Partition -For example: ```sql -CREATE EXTERNAL STREAM foo ( - id int32, - name string, - _tp_message_key string -) SETTINGS type='kafka',...; +SELECT raw FROM ext_stream SETTINGS shards='0' ``` -When doing a SELECT query, the message key will be populated to the `_tp_message_key` column. -`SELECT * FROM foo` will return `'some-key'` for the `_tp_message_key` message. +##### Read from Multiple Partitions -`_tp_message_key` support the following types: `uint8`, `uint16`, `uint32`, `uint64`, `int8`, `int16`, `int32`, `int64`, `bool`, `float32`, `float64`, `string`, and `fixed_string`. +Separate partition IDs with commas: -`_tp_message_key` also support `nullable`. Thus we can create an external stream with optional message key. For example: ```sql -CREATE EXTERNAL STREAM foo ( - id int32, - name string, - _tp_message_key nullable(string) default null -) SETTINGS type='kafka',...; +SELECT raw FROM ext_stream SETTINGS shards='0,2' ``` -For how to use `_tp_message_key` to write the message key when you insert data into the Kafka topic, please refer to [the section](#write_message_key). 
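As a small sketch that builds on the examples above (the `ext_stream` name is illustrative), the `shards` setting can be combined with the metadata columns, for example to tail a single partition or to keep a running count of messages per partition:

```sql
-- tail partition 0 only, showing the offset and event time of each message
SELECT _tp_sn, _tp_time, raw FROM ext_stream SETTINGS shards='0';

-- streaming aggregation: running message count per partition
SELECT _tp_shard, count() AS messages FROM ext_stream GROUP BY _tp_shard;
```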
+#### Rewind via seek_to -#### _tp_message_headers +By default, Timeplus only reads **new messages** published after the query starts. To read historical messages, use the `seek_to` setting. -Starting from Timeplus Proton 1.6.11 and Timeplus Enterprise 2.7, you can read the Kafka message headers as `map(string,string)`, for example: -```sql -SELECT _tp_message_headers, raw FROM foo; -``` +#### Rewind to the Earliest Offset (All Partitions) -To get the value for a certain key in the header, you can access it via `_tp_message_headers['key']`, for example: ```sql -SELECT _tp_message_headers['key'], raw FROM foo; +SELECT raw FROM ext_stream SETTINGS seek_to='earliest' ``` -#### _tp_sn -You can read the message offset of the message via `_tp_sn`, e.g. +#### Rewind to Specific Offsets (Per Partition) + +Offsets are specified **in partition order**. For example: + ```sql -SELECT _tp_sn, raw FROM foo; +SELECT raw FROM ext_stream SETTINGS seek_to='5,3,11' ``` -#### _tp_shard -You can read the partition ID of the message via `_tp_shard`, e.g. +This seeks to: + +* Offset `5` in partition `0` +* Offset `3` in partition `1` +* Offset `11` in partition `2` + +#### Rewind to a Specific Timestamp (All Partitions) + +You can also rewind based on a timestamp: + ```sql -SELECT _tp_shard, raw FROM foo; +SELECT raw FROM ext_stream SETTINGS seek_to='2025-01-01T00:00:00.000' ``` +:::info + +Timeplus will use Kafka API to convert the timestamp to the corresponding offsets for each partition internally. +::: ## Write Data to Kafka -### Write to Kafka in Plain Text {#single_col_write} +Timeplus supports writing data to Kafka using various encoding formats such as strings, JSON, CSV, TSV, Avro, and Protobuf. You can write to Kafka using SQL `INSERT` statements, the [Ingest REST API](/proton-ingest-api), or as the target of a [Materialized View](/sql-create-materialized-view). + +### Write as Raw String -You can write plain text messages to Kafka topics with an external stream with a single column. +You can encode data as a raw string in Kafka messages: ```sql -CREATE EXTERNAL STREAM ext_github_events - (raw string) +CREATE EXTERNAL STREAM ext_github_events (raw string) SETTINGS type='kafka', brokers='localhost:9092', topic='github_events' ``` -Then use either `INSERT INTO VALUES (v)`, or [Ingest REST API](/proton-ingest-api), or set it as the target stream for a materialized view to write message to the Kafka topic. The actual `data_format` value is `RawBLOB` but this can be omitted. By default `one_message_per_row` is `true`. +You can then write data via: -:::info -Since Timeplus Proton 1.5.11, a new setting `kafka_max_message_size` is available. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message, ensuring it won't exceed the `kafka_max_message_size` limit. -::: +* `INSERT INTO ext_github_events VALUES ('some string')` +* [Ingest REST API](/proton-ingest-api) +* Materialized View -### Multiple columns to write to Kafka{#multi_col_write} -To write data to Kafka topics, you can choose different data formats: +:::info -##### RawBLOB -Write the content as pain text. +Internally, the `data_format` is `RawBLOB`, and `one_message_per_row=true` by default. -##### JSONEachRow +Pay attention to setting `kafka_max_message_size`. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message, ensuring it won't exceed the `kafka_max_message_size` limit. 
+::: -You can use `data_format='JSONEachRow',one_message_per_row=true` to inform Timeplus to write each event as a JSON document. The columns of the external stream will be converted to keys in the JSON documents. For example: +### Write as JSONEachRow + +Encode each row as a separate JSON object (aka JSONL or jsonlines): ```sql CREATE EXTERNAL STREAM target( @@ -351,18 +452,18 @@ The messages will be generated in the specific topic as ```json { -"_tp_time":"2023-10-29 05:36:21.957" -"url":"https://www.nationalweb-enabled.io/methodologies/killer/web-readiness" -"method":"POST" -"ip":"c4ecf59a9ec27b50af9cc3bb8289e16c" + "_tp_time":"2023-10-29 05:36:21.957" + "url":"https://www.nationalweb-enabled.io/methodologies/killer/web-readiness" + "method":"POST" + "ip":"c4ecf59a9ec27b50af9cc3bb8289e16c" } ``` :::info -Please note, by default multiple JSON documents will be inserted to the same Kafka message. One JSON document each row/line. Such default behavior aims to get the maximum writing performance to Kafka/Redpanda. But you need to make sure the downstream applications are able to properly split the JSON documents per Kafka message. +Please note, by default multiple JSON documents will be inserted to the same Kafka message. One JSON document each row/line (JSONEachRow, jsonl). Such default behavior aims to get the maximum writing performance to Kafka/Redpanda. But users need to make sure the downstream applications are able to properly process the json lines. -If you need a valid JSON per each Kafka message, instead of a JSONL, please set `one_message_per_row=true` e.g. +If users need a valid JSON per each Kafka message, instead of a JSONL, please set `one_message_per_row=true` e.g. ```sql CREATE EXTERNAL STREAM target(_tp_time datetime64(3), url string, ip string) @@ -370,15 +471,13 @@ SETTINGS type='kafka', brokers='redpanda:9092', topic='masked-fe-event', data_format='JSONEachRow',one_message_per_row=true ``` -The default value of one_message_per_row, if not specified, is false for `data_format='JSONEachRow'` and true for `data_format='RawBLOB'`. - -Since Timeplus Proton 1.5.11, a new setting `kafka_max_message_size` is available. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message and when to create new Kafka message, ensuring each message won't exceed the `kafka_max_message_size` limit. +The default value of one_message_per_row is false for `data_format='JSONEachRow'` and true for `data_format='RawBLOB'`. ::: -##### CSV +### Write as CSV -You can use `data_format='CSV'` to inform Timeplus to write each event as a JSON document. The columns of the external stream will be converted to keys in the JSON documents. For example: +Each row is encoded as one CSV line: ```sql CREATE EXTERNAL STREAM target( @@ -397,116 +496,24 @@ The messages will be generated in the specific topic as ```csv "2023-10-29 05:35:54.176","https://www.nationalwhiteboard.info/sticky/recontextualize/robust/incentivize","PUT","3eaf6372e909e033fcfc2d6a3bc04ace" ``` -##### TSV -Similar to CSV but tab as the separator. - -##### ProtobufSingle -You can write Protobuf-encoded messages in Kafka topics. - -First, you need to create a schema with SQL, e.g. 
-```sql -CREATE OR REPLACE FORMAT SCHEMA schema_name AS ' - syntax = "proto3"; - - message SearchRequest { - string query = 1; - int32 page_number = 2; - int32 results_per_page = 3; - } - ' TYPE Protobuf -``` -Then refer to this schema while creating an external stream for Kafka: -```sql -CREATE EXTERNAL STREAM stream_name( - query string, - page_number int32, - results_per_page int32) -SETTINGS type='kafka', - brokers='kafka:9092', - topic='masked-fe-event', - data_format='ProtobufSingle', - format_schema='schema_name:SearchRequest' -``` - -Then you can run `INSERT INTO` or use a materialized view to write data to the topic. -```sql -INSERT INTO stream_name(query,page_number,results_per_page) VALUES('test',1,100) -``` - -You can either define the [Protobuf Schema in Proton](/proton-format-schema), or specify the [Kafka Schema Registry](/proton-schema-registry) when you create the external stream. - -##### Avro -Starting from Proton 1.5.2, you can use Avro format when you specify the [Kafka Schema Registry](/proton-schema-registry) when you create the external stream. - -First, you need to create a schema with SQL, e.g. -```sql -CREATE OR REPLACE FORMAT SCHEMA avro_schema AS '{ - "namespace": "example.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] - } - ' TYPE Avro; -``` -Then refer to this schema while creating an external stream for Pulsar: -```sql -CREATE EXTERNAL STREAM stream_avro( - name string, - favorite_number nullable(int32), - favorite_color nullable(string)) -SETTINGS type='kafka', - brokers='kafka:9092', - topic='masked-fe-event', - data_format='Avro', - format_schema='avro_schema' -``` -Then you can run `INSERT INTO` or use a materialized view to write data to the topic. -```sql -INSERT INTO stream_avro(name,favorite_number,favorite_color) VALUES('test',1,'red') -``` +### Write as TSV -### Continuously Write to Kafka via MV +Same as CSV, but uses **tab characters** as delimiters instead of commas. -You can use materialized views to write data to Kafka as an external stream, e.g. +### Write as ProtobufSingle -```sql --- read the topic via an external stream -CREATE EXTERNAL STREAM frontend_events(raw string) - SETTINGS type='kafka', - brokers='redpanda:9092', - topic='owlshop-frontend-events'; +To write Protobuf-encoded messages from Kafka topics, please refer to [Protobuf Schema](/proton-format-schema), and [Kafka Schema Registry](/proton-schema-registry) pages for details. --- create the other external stream to write data to the other topic -CREATE EXTERNAL STREAM target( - _tp_time datetime64(3), - url string, - method string, - ip string) - SETTINGS type='kafka', - brokers='redpanda:9092', - topic='masked-fe-event', - data_format='JSONEachRow', - one_message_per_row=true; +### Write as Avro --- setup the ETL pipeline via a materialized view -CREATE MATERIALIZED VIEW mv INTO target AS - SELECT now64() AS _tp_time, - raw:requestedUrl AS url, - raw:method AS method, - lower(hex(md5(raw:ipAddress))) AS ip - FROM frontend_events; -``` +To write Avro-encoded messages from Kafka topics, please refer to [Avro Schema](/proton-format-schema), and [Kafka Schema Registry](/proton-schema-registry) pages for details. 
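### Continuously Write via a Materialized View

Whichever output format is used, continuous writes are typically driven by a [Materialized View](/sql-create-materialized-view). The sketch below (broker, topic and field names are illustrative, in line with the earlier examples) reads raw JSON from one topic, masks the IP address, and writes one JSON document per message to another topic:

```sql
-- read the source topic as raw text
CREATE EXTERNAL STREAM frontend_events(raw string)
SETTINGS type='kafka', brokers='redpanda:9092', topic='owlshop-frontend-events';

-- write to the target topic, one JSON document per Kafka message
CREATE EXTERNAL STREAM masked_fe_event(
    _tp_time datetime64(3), url string, method string, ip string)
SETTINGS type='kafka', brokers='redpanda:9092', topic='masked-fe-event',
         data_format='JSONEachRow', one_message_per_row=true;

-- the materialized view keeps the pipeline running continuously
CREATE MATERIALIZED VIEW mv_mask_fe_event INTO masked_fe_event AS
    SELECT now64() AS _tp_time,
           raw:requestedUrl AS url,
           raw:method AS method,
           lower(hex(md5(raw:ipAddress))) AS ip
    FROM frontend_events;
```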
-### Write to Kafka with metadata{#metadata} +### Write Kafka Message Metadata -#### _tp_message_key {#write_message_key} +#### _tp_message_key -Starting from Timeplus Enterprise 2.4, you can define the `_tp_message_key` column when you create the external stream. This new approach provides more intuitive and flexible way to write any content as the message key, not necessarily mapping to a specify column or a set of columns. +If users like to populate Kafka message key when producing data to a Kafka topic, users can define the `_tp_message_key` column when creating the external stream. For example: ```sql @@ -516,17 +523,21 @@ CREATE EXTERNAL STREAM foo ( _tp_message_key string ) SETTINGS type='kafka',...; ``` -You can insert any data to the Kafka topic. -When insert a row to the stream like: +After inserting a row to the stream like this: ```sql INSERT INTO foo(id,name,_tp_message_key) VALUES (1, 'John', 'some-key'); ``` -`'some-key'` will be used for the message key for the Kafka message (and it will be excluded from the message body, so the message will be `{"id": 1, "name": "John"}` for the above SQL). -`_tp_message_key` support the following types: `uint8`, `uint16`, `uint32`, `uint64`, `int8`, `int16`, `int32`, `int64`, `bool`, `float32`, `float64`, `string`, and `fixed_string`. +* Kafka key will be `'some-key'` +* Message body: `{"id": 1, "name": "John"}`. Kafka key was excluded from the message body. + +`_tp_message_key` supports these types: + +* Numeric: `uint8/16/32/64`, `int8/16/32/64` +* Others: `string`, `bool`, `float32`, `float64`, `fixed_string` +* Nullable are also supported: -`_tp_message_key` also support `nullable`. Thus we can create an external stream with optional message key. For example: ```sql CREATE EXTERNAL STREAM foo ( id int32, @@ -535,22 +546,10 @@ CREATE EXTERNAL STREAM foo ( ) SETTINGS type='kafka',...; ``` -#### sharding_expr -If you configure a partition strategy in the Kafka cluster, you can specify the message key with the above approach. The message key will be stored together with the message body. Alternatively, you can use the `sharding_expr` to specify the partition ID for the message. For example: -```sql -CREATE EXTERNAL STREAM foo ( - id int32,.. -) SETTINGS type='kafka', sharding_expr='hash(id)'...; -``` -When you insert data, the shard ID will be calculated based on the `sharding_expr` and Timeplus will put the message into the corresponding partition/shard. - -#### _tp_message_headers {#write_message_headers} - -Starting from Timeplus Proton 1.6.11 and Timeplus Enterprise 2.7, you can read the Kafka message headers as `map(string,string)` via the `_tp_message_headers` virtual column. +#### _tp_message_headers -Starting from Timeplus Enterprise 2.8.2, you can also write custom headers via this column. +Add Kafka headers via `_tp_message_headers` (map of key-value pairs): -Define the column in the DDL: ```sql CREATE EXTERNAL STREAM example ( s string, @@ -559,19 +558,28 @@ CREATE EXTERNAL STREAM example ( _tp_message_headers map(string, string) ) settings type='kafka',...; ``` -Then insert data to the external stream via `INSERT INTO` or materialized views, with a map of string pairs as custom headers for each message. -## DROP EXTERNAL STREAM +Then insert rows to the external stream via `INSERT INTO` or Materialized Views, the `_tp_message_headers` will be set to the headers of the Kafka message. 
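Putting the message key and headers together, here is a minimal sketch (the stream, topic and header names are illustrative, and it assumes the `map_cast` function to build the header map):

```sql
CREATE EXTERNAL STREAM ext_orders (
    order_id int32,
    status string,
    _tp_message_key string,
    _tp_message_headers map(string, string)
) SETTINGS type='kafka', brokers='localhost:9092', topic='orders', data_format='JSONEachRow';

INSERT INTO ext_orders (order_id, status, _tp_message_key, _tp_message_headers)
VALUES (1001, 'created', 'order-1001', map_cast('trace_id', 'abc-123', 'source', 'web'));
```

Each produced message then carries `order-1001` as its Kafka key plus the two headers, while the JSON body contains the remaining columns.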
+ +#### sharding_expr {#sharding_expr} + +`sharding_expr` is used to control how rows are distributed to Kafka partitions: ```sql -DROP STREAM [IF EXISTS] stream_name +CREATE EXTERNAL STREAM foo ( + id int32,.. +) SETTINGS type='kafka', sharding_expr='hash(id)'...; ``` +When inserting rows, the partition ID will be evaluated based on the `sharding_expr` and Timeplus will put the message into the corresponding Kafka partition. + ## Properties for Kafka client {#advanced_settings} -For more advanced use cases, you can specify customized properties while creating the external streams. Those properties will be passed to the underlying Kafka client, which is [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). +In advanced use cases, you may want to fine-tune the behavior of the Kafka consumer, producer, or topic when creating Kafka external streams. For example, fine tune the consumeer, producer's latency, throughput etc. Timeplus allows these fine tuning through the `properties` setting, which passes configuration options directly to the underlying [librdkafka](https://github.com/confluentinc/librdkafka) client. -For example: +These settings can control aspects like message size limits, retry behavior, timeouts, and more. For a full list of available configuration options, refer to the [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). + +### Kafka Client Properties Example ```sql CREATE EXTERNAL STREAM ext_github_events(raw string) @@ -581,7 +589,11 @@ SETTINGS type='kafka', properties='message.max.bytes=1000000;message.timeout.ms=6000' ``` -Please note, not all properties in [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) are supported. The following ones are accepted in Proton today. Please check the configuration guide of [librdkafka](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) for details. +This example sets the maximum Kafka message size to 1MB and the message timeout to 6 seconds. + +### Kafka Client Properties + +Please note while most configuration properties from `librdkafka` are supported, Timeplus may restrict or ignore certain settings. Here is the list of supported properties. (C/P legend: C = Consumer, P = Producer, * = both) @@ -685,13 +697,3 @@ batch.size | P | 1 .. 2147483647 | 1000000 delivery.report.only.error | P | true, false | false | low | Only provide delivery reports for failed messages. *Type: boolean* sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages. *Type: integer* - -## Limitations - -There are some limitations for the Kafka-based external streams, because Timeplus doesn’t control the storage or the data format for the external stream. - -1. The UI wizard to setup Kafka External Stream only supports JSON or TEXT. To use Avro, Protobuf, or schema registry service, you need the SQL DDL. -2. `_tp_time` is available in the external streams (since Proton 1.3.30). `_tp_append_time` is set only when message timestamp is an append time. -3. 
Unlike normal streams, there is no historical storage for the external streams. In recent versions, you can run `table(kafka_ext_stream)` but it will scan all messages in the topic, unless you are running a `count()`. If you need to frequently run query for historical data, you can use a Materialized View to query the Kafka External Stream and save the data in Timeplus columnar or row storage. This will improve the query performance. -4. There is no retention policy for the external streams in Timeplus. You need to configure the retention policy on Kafka/Confluent/Redpanda. If the data is no longer available in the external systems, they cannot be searched in Timeplus either. -5. Consumer group settings are not available for Kafka external streams, as Timeplus internally manages message offsets without utilizing consumer groups. diff --git a/docs/working-with-streams.md b/docs/working-with-streams.md index 3f6b58d5..30599a6f 100644 --- a/docs/working-with-streams.md +++ b/docs/working-with-streams.md @@ -1,27 +1,52 @@ # Streams -## All data live in streams +Timeplus `streams` are conceptually similar to `tables` in traditional SQL databases — they both hold data. However, there are key differences: -Timeplus is a streaming analytics platform and data lives in streams. Timeplus `streams` are similar to `tables` in the traditional SQL databases. Both of them are essentially datasets. The key difference is that Timeplus stream is an append-only (by default), unbounded, constantly changing events group. +* A **Timeplus stream** tracks changes and updates through its underlying **Write-Ahead Log (WAL)**, which powers incremental processing. +* Timeplus supports **both incremental stream processing and historical queries** over stream data. -Timeplus supports multiple types of streams: +## Types of Streams -1. By default, the streams are append-only and immutable (older data can be purged automatically by setting a retention policy). -2. If you want to create a stream to track the latest value for a primary key or a set of keys, you can create [Mutable Streams](/mutable-stream). This is only available in Timeplus Enterprise. -3. In [Timeplus Proton](/proton), you can also create [Versioned Streams](/versioned-stream) and [Changelog Stream](/changelog-stream). But those 2 stream modes will be deprecated and replaced by mutable streams. -4. You can also define [External Streams](/external-stream) to run SQL against remote Kafka/Redpanda brokers, or the other Timeplus/Proton server. +To support a variety of use cases efficiently, Timeplus offers multiple types of streams: -## Create a stream -You can create a stream via the Timeplus Console UI, or via [SQL](/sql-create-stream). When you [ingest data](/ingestion) into Timeplus from Kafka or file sources, streams can be created automatically to match the schema of the data. +1. [Append Stream](/append-stream) -## Query a stream + The default stream type in Timeplus. It uses columnar encoding and is optimized for **range scans** via a **sorting key**. It suits workloads with infrequent data mutations (e.g., `UPDATE` or `DELETE`). -By default, querying the stream will continuously scan new events and output new results. It never ends unless the user cancels the query. For example, you can get the latest web logs with HTTP 500 error or get the min/max/avg of a metric for every minute from an IoT device. Please read [Streaming Queries](/stream-query) for more details. +2. 
[Mutable Stream](/mutable-stream) -If you only want to analyze the existing data and need an immediate response, you can run [Non-streaming Queries](/history) via the [table](/functions_for_streaming#table) function. This will turn the query in the bounded mode and only scan the existing data. For example, you can run `select count(*) from table(stream1)` to get the total number of rows in the data stream. + Row-encoded and similar in behavior to a **MySQL table**, where each primary key corresponds to a single row. It is optimized for **frequent data mutations** (`UPDATE`, `UPSERT`, `DELETE`) and supports **point and range queries** via **primary or secondary indexes**. +3. [Versioned Key-Value Stream](/versioned-stream) + Similar to the mutable stream but uses **columnar encoding**. It offers better **compression** but lower performance for updates and point queries, especially when cardinality is high. Best suited for scenarios where **data mutations are less frequent**. -## Delete a stream +4. [Changelog Key-Value Stream](/changelog-stream) -From the web console, you can delete the stream. This will permanently delete all data in the stream and delete the stream itself. Data cannot be recovered after deletion. + Designed to model **change data capture (CDC) events**, with **columnar encoding** for efficient downstream processing. + +5. [External Stream](/external-stream) + + As the name implies, the data resides outside of Timeplus. Timeplus can reference external sources (e.g., a **Kafka topic**) and execute **streaming SQL** queries against them in real time. + +> Note: Timeplus also supports [External Tables](/sql-create-external-table), which allow **historical queries and inserts** only (e.g., against ClickHouse, MySQL, PostgreSQL, MongoDB, etc.). + + +## Stream Internals + +When users [create a stream](/sql-create-stream) in Timeplus, they can specify the number of **shards** for the stream. Each shard consists of two core components at the storage layer: + +1. **Streaming Store** +2. **Historical Store** + +The **streaming store** is essentially the **Write-Ahead Log** (internally called `NativeLog`). It supports: + +* High-concurrency [data ingestion](/ingestion) +* [Incremental stream processing](/stream-query) +* Real-time data replication + +For more information, refer to the [high-level architecture](/architecture) page. + +The **historical store** asynchronously derives its data from the WAL through a dedicated background thread. It performs periodic **compaction**, **merge**, and **compression**, making it highly efficient for [historical analytic queries](/history) and **streaming backfills**. + +To learn more about stream lifecycle operations (Create, Read, Delete, Update) and advanced configurations like **TTL**, **key versioning**, and other stream settings, refer to the SQL Reference documentation. To learn more about external streams, refer to the [external stream](/external-stream) pages for more details.
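To make the streaming store and the historical store concrete, here is a minimal sketch (the stream and column names are illustrative). The first query is an unbounded streaming query served incrementally from the WAL, while the `table()` query is a bounded historical query answered from the historical store:

```sql
CREATE STREAM device_metrics (
    device_id string,
    temperature float32
);

-- streaming query: tails the WAL and keeps emitting incremental results
SELECT device_id, avg(temperature) AS avg_temp
FROM device_metrics
GROUP BY device_id;

-- historical query: scans the compacted historical store and returns immediately
SELECT count() FROM table(device_metrics);
```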