Skip to content
108 changes: 92 additions & 16 deletions modules/reference/pages/sql/sql-statements/create-table.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

The `CREATE TABLE` statement maps a Redpanda topic to a SQL table through a catalog. After creating the table, you can query topic data using standard SQL.

NOTE: You must first xref:reference:sql/sql-statements/create-redpanda-catalog.adoc[create a Redpanda catalog connection] before creating tables. `CREATE TABLE` in Redpanda SQL maps Redpanda topics to SQL tables — it does not create standalone tables with user-defined schemas.
NOTE: You must first xref:reference:sql/sql-statements/create-redpanda-catalog.adoc[create a Redpanda catalog connection] before creating tables. `CREATE TABLE` in Redpanda SQL maps Redpanda topics to SQL tables and does not create standalone tables with user-defined schemas.

== Syntax

Expand Down Expand Up @@ -32,12 +32,12 @@ WITH (option = 'value' [, ...]);
|`schema_subject`
|STRING
|No
|Schema Registry subject name to use for deserializing topic data.
|Schema Registry subject name to use for deserializing topic data. Defaults to the topic-name strategy (`<topic>-value`).

|`schema_lookup_policy`
|STRING
|No
|How to resolve the schema version. Only `LATEST` is supported.
|How to resolve the schema version. `LATEST` is the only supported value.

|`error_handling_policy`
|STRING
Expand All @@ -51,42 +51,118 @@ a|How to handle records that fail deserialization.
|`struct_mapping_policy`
|STRING
|No
a|How to map nested structures to SQL columns.
a|How to map nested structures from the topic schema to SQL columns.

* `JSON` (default): Stores nested data as JSON.
* `FLATTEN`: Expands nested fields into top-level columns.
* `COMPOUND`: Maps to ROW types.
* `VARIANT`: Stores as a variant type.
* `COMPOUND` (default): Maps each nested structure to a SQL xref:reference:sql/sql-data-types/row.adoc[ROW] value with named fields, queryable using `(column).field_name` syntax. Cyclic types are not supported in `COMPOUND` mode. Use `JSON` for recursive schemas.
* `JSON`: Stores each nested structure as a JSON value. Required for recursive (cyclic) types.

|`output_schema_message_full_name`
|STRING
|No
|Full Protobuf message name. Required when the schema contains multiple message definitions.

|`confluent_wire_protocol`
|STRING
|No
a|Whether records on the topic are encoded with the https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format[Confluent Schema Registry wire format^] (a magic byte followed by a 4-byte schema ID before the payload).

* `'true'` (default): Records carry the Confluent wire-format prefix. Use this for topics whose values were produced by a Schema-Registry-aware client.
* `'false'`: Records are raw Protobuf or Avro without the wire-format prefix.

Only valid when `schema_lookup_policy = 'LATEST'`.
|===

[#auto-added-columns]
== Auto-added columns

Every catalog-mapped table includes two struct columns in addition to the columns derived from the topic's schema. Redpanda SQL adds these columns to both Kafka-backed and Iceberg-backed tables. The names `redpanda` and `redpanda_raw` are reserved. A topic schema cannot define columns with these names.

=== `redpanda`

Contains Kafka record metadata. Always present on every row.

[cols="<22%,<28%,<10%,<40%",options="header"]
|===
|Field |Type |Nullable |Description

|`partition`
|`INT`
|No
|Partition the record was read from.

|`offset`
|`BIGINT`
|No
|Offset of the record within its partition.

|`timestamp`
|`TIMESTAMP WITH TIME ZONE`
|Yes
|Record timestamp.

|`headers`
|Array of struct `{key TEXT, value BYTEA}`
|Yes
|Record headers, as an array where each element is a struct of header name and value bytes.

|`key`
|`BYTEA`
|Yes
|Record key bytes.

|`timestamp_type`
|`INT`
|Yes
|Kafka timestamp type code. `0` for `CreateTime`, `1` for `LogAppendTime`. `NULL` when not available.
|===

=== `redpanda_raw`

Populated only when `error_handling_policy = 'FILL_NULL'` and a record fails to decode. In all other cases, `redpanda_raw` is `NULL`.

Use `redpanda_raw` as a dead-letter pattern. Rows whose value fails schema deserialization remain queryable, with the malformed payload preserved for inspection or reprocessing.

[cols="<22%,<28%,<10%,<40%",options="header"]
|===
|Field |Type |Nullable |Description

|`key`
|`BYTEA`
|Yes
|Raw record key bytes.

|`value`
|`BYTEA`
|Yes
|Raw record value bytes that failed to decode.
|===

== Examples

=== Create a basic table
=== Map a topic to a table

Map the `transactions` topic to a table through `default_redpanda_catalog`:

[source,sql]
----
CREATE TABLE default_redpanda_catalog=>transactions
WITH (topic = 'transactions');
WITH (
topic = 'transactions',
schema_subject = 'transactions-value'
);
----

=== Specify a Schema Registry subject
=== Create a table from a multi-message Protobuf schema

Map a topic and specify the Schema Registry subject:
When the Protobuf schema for the topic defines more than one message, specify the message to use with `output_schema_message_full_name`:

[source,sql]
----
CREATE TABLE default_redpanda_catalog=>user_events
CREATE TABLE default_redpanda_catalog=>orders
WITH (
topic = 'user-events',
schema_subject = 'user-events-value',
schema_lookup_policy = 'LATEST'
topic = 'orders',
schema_subject = 'orders-value',
output_schema_message_full_name = 'com.example.orders.Order'
);
----

Expand Down
3 changes: 3 additions & 0 deletions modules/sql/pages/query-data/index.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
= Query Data
:description: Query live and historical data in your Redpanda topics using standard PostgreSQL syntax.
:page-layout: index
87 changes: 87 additions & 0 deletions modules/sql/pages/query-data/query-streaming-topics.adoc
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder where should we mention the existence of redpanda and redpanda_raw structs. The first is iceberg equivalent so all partition offset etc properties are there. The second is DLQ equivalent, filled when FILL_NULL error policy is set

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JacekGalazka1 do those only exist for Iceberg topics? There is mention of them in this other doc specifically for querying Iceberg https://github.com/redpanda-data/cloud-docs/pull/575/changes#diff-3ab2a15f947f028cb3f75cdb5184029657557cac26b1c961cb27c72554ba3533R83 Is redpanda_raw populated only when FILL_NULL is set?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are always added to each kafka reader, so both pure kafka and iceberg backed will have it.
redpanda_raw is populated only when FILL_NULL is set and only for records that failed to decode. in all other cases it's NULL.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect. Good to go

Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
= Query Streaming Topics
:description: Map a Redpanda topic to a SQL table and run analytical queries directly against live streaming data.
:page-topic-type: how-to
:personas: app_developer, data_engineer
:learning-objective-1: Map a Redpanda topic to a SQL table using the default Redpanda catalog
:learning-objective-2: Run analytical SQL queries against live topic data

Map a Redpanda topic to a SQL table to run analytical queries directly against live streaming data without building ETL pipelines. Redpanda SQL reads each record's fields from the topic's registered schema.

To extend queries past your Redpanda retention window by reading the Iceberg history of Iceberg-enabled topics, see xref:sql:query-data/query-iceberg-topics.adoc[Query Iceberg-enabled Topics].
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly @kbatuigas ! :)


Use this page to:

* [ ] {learning-objective-1}
* [ ] {learning-objective-2}

== Prerequisites

Before you query a topic with SQL:

* Enable the Redpanda SQL engine on your Redpanda Bring Your Own Cloud (BYOC) cluster. See xref:sql:get-started/deploy-sql-cluster.adoc[Enable Redpanda SQL].
* Have a Redpanda Cloud user with the *SQL: Access* (or *SQL: Manage*) data-plane RBAC permission. For a *SQL: Access* user to query a topic, a *SQL: Manage* user must first `GRANT SELECT` on the topic to that user. See xref:sql:manage/manage-access.adoc[Manage access to Redpanda SQL].
* Connect to Redpanda SQL with `psql` or another PostgreSQL client. See xref:sql:get-started/sql-quickstart.adoc[] for a `psql` example, or xref:sql:connect-to-sql/index.adoc[Connect to Redpanda SQL].
* Confirm that the Redpanda topic you want to query has a schema registered in Schema Registry. Redpanda SQL supports Protobuf, JSON, and Avro schemas.

== Map the topic to a SQL table

Each Redpanda topic appears as a SQL table inside a Redpanda catalog. When Redpanda SQL is enabled, a catalog named `default_redpanda_catalog` is created automatically and points at your cluster.

Define a table against the topic with `CREATE TABLE`:

[source,sql]
----
CREATE TABLE default_redpanda_catalog=>orders WITH (
topic = 'orders',
schema_subject = 'orders-value'
);
----

Replace `orders` with your topic name and `orders-value` with the Schema Registry subject that holds the topic's value schema. `schema_subject` is optional. If omitted, Redpanda SQL uses the topic-name strategy default (`<topic>-value`).

If the topic uses a Protobuf schema that defines more than one message, also set `output_schema_message_full_name` to the fully-qualified name of the message to use:

[source,sql]
----
CREATE TABLE default_redpanda_catalog=>orders WITH (
topic = 'orders',
schema_subject = 'orders-value',
output_schema_message_full_name = 'com.example.orders.Order'
);
----

The table inherits its column definitions from the registered schema. Each top-level field in the schema becomes a SQL column. For querying nested fields in struct types, see xref:sql:query-data/query-nested-fields.adoc[].

In addition to the columns derived from your topic's schema, Redpanda SQL adds two struct columns to every catalog-mapped table:

* `redpanda`: Kafka record metadata such as partition, offset, and timestamp.
* `redpanda_raw`: Populated only when `error_handling_policy = 'FILL_NULL'` and a record fails to decode.

For details, see xref:reference:sql/sql-statements/create-table.adoc#auto-added-columns[Auto-added columns].

== Run queries

Query the table with standard `SELECT` syntax. The following query returns the first 10 records:

[source,sql]
----
SELECT * FROM default_redpanda_catalog=>orders LIMIT 10;
----

Aggregate and filter records using familiar PostgreSQL constructs:

[source,sql]
----
SELECT customer_id, SUM(amount) AS total
FROM default_redpanda_catalog=>orders
WHERE status = 'completed'
GROUP BY customer_id
ORDER BY total DESC
LIMIT 10;
----

== Next steps

* xref:sql:query-data/query-iceberg-topics.adoc[Query Iceberg-enabled Topics]: run queries against historical data retained beyond your Redpanda retention window.
* xref:reference:sql/sql-statements/create-table.adoc[CREATE TABLE]: full reference for the table-against-topic syntax, including all options.
* xref:reference:sql/index.adoc[Redpanda SQL Reference]: supported SQL statements, clauses, data types, and functions.
81 changes: 2 additions & 79 deletions modules/sql/pages/query-data/redpanda-catalogs.adoc
Original file line number Diff line number Diff line change
@@ -1,81 +1,4 @@
= Redpanda Catalogs
:description: Redpanda catalogs are named connections that map Redpanda topics to queryable SQL tables.
:page-topic-type: reference

Redpanda catalogs are named connections that let you query Redpanda topics using standard SQL. The catalog model consists of three core concepts:

* Catalogs: Named connections to a Redpanda cluster, created with xref:reference:sql/sql-statements/create-redpanda-catalog.adoc[CREATE REDPANDA CATALOG].
* Tables: Redpanda topics mapped as queryable SQL tables using the `catalog_name\=>table_name` syntax, created with xref:reference:sql/sql-statements/create-table.adoc[CREATE TABLE].
* Storage connections: Named connections to external object storage such as Amazon S3, created with xref:reference:sql/sql-statements/create-storage.adoc[CREATE STORAGE].

NOTE: Redpanda SQL operates in read-only mode. Data mutation operations such as `INSERT`, `UPDATE`, and `DELETE` are not available. Data is ingested into Redpanda topics and made queryable through catalog mappings.

== Typical workflow

To query Redpanda topic data with SQL:

. Create a catalog connection:
+
[source,sql]
----
CREATE REDPANDA CATALOG production_redpanda
WITH (
initial_brokers = 'broker1:9092',
schema_registry_url = 'http://schema-registry:8081'
);
----

. Map a topic as a table:
+
[source,sql]
----
CREATE TABLE production_redpanda=>user_events
WITH (topic = 'user-events');
----

. Query the data:
+
[source,sql]
----
SELECT * FROM production_redpanda=>user_events LIMIT 10;
----

== Related statements

[cols="<40%,<60%",options="header"]
|===
|Statement |Description

|xref:reference:sql/sql-statements/create-redpanda-catalog.adoc[CREATE REDPANDA CATALOG]
|Create a catalog connection to a Redpanda cluster.

|xref:reference:sql/sql-statements/alter-redpanda-catalog.adoc[ALTER REDPANDA CATALOG]
|Modify connection properties of an existing catalog.

|xref:reference:sql/sql-statements/create-table.adoc[CREATE TABLE]
|Map a Redpanda topic to a SQL table through a catalog.

|xref:reference:sql/sql-statements/alter-table.adoc[ALTER TABLE]
|Modify options of an existing catalog table.

|xref:reference:sql/sql-statements/drop-table.adoc[DROP TABLE]
|Remove a catalog table mapping.

|xref:reference:sql/sql-statements/drop-redpanda-catalog.adoc[DROP REDPANDA CATALOG]
|Remove a Redpanda catalog connection.

|xref:reference:sql/sql-statements/drop-storage.adoc[DROP STORAGE]
|Remove a named storage definition.

|xref:reference:sql/sql-statements/show-tables.adoc[SHOW TABLES]
|List tables within a catalog.

|xref:reference:sql/sql-statements/describe.adoc[DESCRIBE]
|Show details about a catalog or catalog table.

|xref:reference:sql/sql-statements/create-storage.adoc[CREATE STORAGE]
|Create a connection to external object storage.

|xref:reference:sql/sql-statements/alter-storage.adoc[ALTER STORAGE]
|Modify an existing storage connection.
|===
// TODO: Full content rewrite lives on the DOC-2049 branch (PR #573).
// Replace this stub when DOC-2049 merges into rp-sql.