Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 46 additions & 48 deletions docs/quix-cloud/quixlake/open-format.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,69 +33,67 @@ Quix Lake stores Kafka messages and metadata as **open files** in your blob stor

=== "Raw data (Avro)"

Short description

- Every Kafka message is persisted as one Avro record.
- Captures payload, timestamp, partition, offset, headers, and optional gap markers for periods with no messages.
- Each Kafka message is persisted as **one Avro record**.
- Preserves **timestamp**, **key**, **headers**, **value**, **partition**, and **offset** exactly as produced by Kafka.
- `value` is stored as **bytes**; you control the serialization of your payload (e.g., JSON/Avro/Protobuf) before it lands here.

**Record fields**

| Field | Type | Meaning |
|----------------|--------------------|-------------------------------------------|
| `Payload` | bytes or string | Serialized message value |
| `TimestampMs` | long | Message timestamp in milliseconds |
| `Partition` | int | Kafka partition number |
| `Offset` | long | Kafka offset |
| `Headers` | map<string, bytes> | Kafka headers (optional) |
| `Gap` | boolean | True when the row marks a gap (optional) |
| `GapReason` | string | Human-readable reason (optional) |
| Field | Avro type | Meaning |
|--------------|--------------------|----------------------------------------------------------------------------|
| `timestamp` | `long` | Kafka message timestamp in **milliseconds since Unix epoch**. |
| `key` | `["null","bytes"]` | Kafka message key; `null` if keyless. |
| `headers` | `array<Header>` | Kafka headers as an array of `{ key: string, value: ["string","bytes"] }`. |
| `value` | `bytes` | Message payload as raw bytes (you define the serialization upstream). |
| `partition` | `long` | Kafka partition number. |
| `offset` | `long` | Kafka offset. |

=== "Index metadata (Parquet)"
!!! Note
`headers.value` supports **either UTF-8 strings or raw bytes**. Values are written exactly as provided by the producer.

Short description
=== "Index metadata (Parquet)"

- Compact Parquet descriptors summarize where data lives so the Catalog and APIs can discover datasets without scanning Avro.
Compact **Parquet** descriptors summarize where raw Avro files live so Catalog/UI/APIs can **discover datasets without scanning Avro**.

**Columns**

| Column | Type | Meaning |
|------------------|----------|----------------------------------------------|
| `Path` | string | Full object path to the referenced file |
| `Topic` | string | Kafka topic |
| `Key` | string | Stream key |
| `Partition` | int | Kafka partition number |
| `TimestampStart` | long | First record timestamp in ms |
| `TimestampEnd` | long | Last record timestamp in ms |
| `OffsetStart` | long | First Kafka offset in the segment |
| `OffsetEnd` | long | Last Kafka offset in the segment |
| `RecordCount` | long | Number of records |
| `FileSizeBytes` | long | Size of the referenced file |
| `CreatedAt` | datetime | Descriptor creation time in UTC |
| `DeletedAt` | datetime | Soft-delete marker (nullable) |
| Column | Parquet type | Meaning |
|------------------|---------------------------------|-------------------------------------------------------------|
| `Path` | `BYTE_ARRAY (UTF8)` | Full object path to the referenced Avro file |
| `Topic` | `BYTE_ARRAY (UTF8)` | Kafka topic |
| `Key` | `BYTE_ARRAY (UTF8)` | Stream key |
| `Partition` | `INT64` | Kafka partition number |
| `TimestampStart` | `INT64` | First record timestamp in **milliseconds since Unix epoch** |
| `TimestampEnd` | `INT64` | Last record timestamp in **milliseconds since Unix epoch** |
| `StartOffset` | `INT64` | First Kafka offset in the segment |
| `EndOffset` | `INT64` | Last Kafka offset in the segment |
| `RecordCount` | `INT64` | Number of records |
| `FileSizeBytes` | `INT64` | Size of the referenced file (bytes) |
| `CreatedAt` | `INT64 (TIMESTAMP_MICROS, UTC)` | Descriptor creation time (UTC, microseconds) |

!!! Notes

- Field names are **canonical**: `StartOffset` / `EndOffset` (not `OffsetStart` / `OffsetEnd`).
- `CreatedAt` uses Parquet logical type **`TimestampMicros`**, adjusted to UTC.
- Time units for `TimestampStart`/`TimestampEnd` are **milliseconds** for consistency with the Avro message `timestamp`.

=== "Custom metadata (Parquet)"

Short description

- Optional key or dataset level properties you add, for example experiment id, labels, or business tags. Stored as Parquet and indexed for search.
Optional dataset/key-level properties stored as **Parquet** and indexed for discovery and filtering.

**Columns**

| Column | Type | Meaning |
|-----------------|----------|-------------------------------------------|
| `Topic` | string | Kafka topic |
| `Key` | string | Stream key |
| `MetadataKey` | string | Property name |
| `MetadataValue` | string | Property value |
| `UpdatedUtc` | datetime | When this metadata entry was last updated |


## How it flows

* Ingest: write Avro into partitioned folders ([Quix Lake - Sink](../managed-services/sink.md))
* Index: write Parquet descriptors alongside ([Quix Lake - Sink](../managed-services/sink.md))
* Discover: UI and APIs read Parquet to list and filter your datasets ([Quix Lake - API](./api.md))
* Use: Replay to Kafka, or query with your engines of choice ([Quix Lake - Replay](../managed-services/replay.md))
| Column | Parquet type | Meaning |
|-----------------|---------------------------------|----------------------------------------------|
| `Topic` | `BYTE_ARRAY (UTF8)` | Kafka topic |
| `Key` | `BYTE_ARRAY (UTF8)` | Stream key |
| `MetadataKey` | `BYTE_ARRAY (UTF8)` | Property name |
| `MetadataValue` | `BYTE_ARRAY (UTF8)` | Property value |
| `DeleteFlag` | `BYTE_ARRAY (UTF8)` | Soft-delete marker `True`/`False` |
| `UpdatedUtc` | `INT64 (TIMESTAMP_MICROS, UTC)` | Descriptor creation time (UTC, microseconds) |

!!! Tip "Querying the latest metadata"
To resolve the latest metadata per `(Topic, Key, MetadataKey)`, use a “latest” reduction by `UpdatedUtc` (e.g., `max_by`), then filter out `DeleteFlag = TRUE`.

## Guarantees

Expand Down