Skip to content

Commit

Permalink
Merge pull request #2698 from rockwotj/rockwood/wasm
Browse files Browse the repository at this point in the history
Implement Redpanda Data Transform Processor
  • Loading branch information
Jeffail committed Jul 18, 2024
2 parents d629f30 + 27da2cc commit fce3e39
Show file tree
Hide file tree
Showing 16 changed files with 1,458 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
- (Benthos) Parameter `escape_html` added to the `format_json()` Bloblang method. (@mihaitodor)
- (Benthos) New `array` bloblang method. (@gramian)
- (Benthos) Algorithm `fnv32` added to the `hash` bloblang method. (@CallMeMhz)
- New experimental `redpanda_data_transform`. (@rockwotj)
- New `-community` suffixed build included in release artifacts, containing only FOSS functionality. (@Jeffail)
- New `-cloud` suffixed build included in release artifacts, containing components enabled in Redpanda Cloud. (@Jeffail)
- Field `status_topic` added to the global `redpanda` config block. (@Jeffail)
Expand Down
243 changes: 243 additions & 0 deletions docs/modules/components/pages/processors/redpanda_data_transform.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
= redpanda_data_transform
:type: processor
:status: experimental
:categories: ["Utility"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////
component_type_dropdown::[]
Executes a Redpanda Data Transform as a processor
Introduced in version 4.31.0.
[tabs]
======
Common::
+
--
```yml
# Common config fields, showing default values
label: ""
redpanda_data_transform:
module_path: "" # No default (required)
input_key: "" # No default (optional)
output_key: "" # No default (optional)
input_headers:
include_prefixes: []
include_patterns: []
output_metadata:
include_prefixes: []
include_patterns: []
```
--
Advanced::
+
--
```yml
# All config fields, showing default values
label: ""
redpanda_data_transform:
module_path: "" # No default (required)
input_key: "" # No default (optional)
output_key: "" # No default (optional)
input_headers:
include_prefixes: []
include_patterns: []
output_metadata:
include_prefixes: []
include_patterns: []
timestamp: ${! timestamp_unix() } # No default (optional)
timeout: 10s
max_memory_pages: 1600
```
--
======
This processor executes a Redpanda Data Transform WebAssembly module, calling OnRecordWritten for each message being processed.
You can find out about how transforms work here: https://docs.redpanda.com/current/develop/data-transforms/how-transforms-work/[https://docs.redpanda.com/current/develop/data-transforms/how-transforms-work/^]
== Fields
=== `module_path`
The path of the target WASM module to execute.
*Type*: `string`
=== `input_key`
An optional key to populate for each message.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
=== `output_key`
An optional name of metadata for an output message key.
*Type*: `string`
=== `input_headers`
Determine which (if any) metadata values should be added to messages as headers.
*Type*: `object`
=== `input_headers.include_prefixes`
Provide a list of explicit metadata key prefixes to match against.
*Type*: `array`
*Default*: `[]`
```yml
# Examples
include_prefixes:
- foo_
- bar_
include_prefixes:
- kafka_
include_prefixes:
- content-
```
=== `input_headers.include_patterns`
Provide a list of explicit metadata key regular expression (re2) patterns to match against.
*Type*: `array`
*Default*: `[]`
```yml
# Examples
include_patterns:
- .*
include_patterns:
- _timestamp_unix$
```
=== `output_metadata`
Determine which (if any) message headers should be added to the output as metadata.
*Type*: `object`
=== `output_metadata.include_prefixes`
Provide a list of explicit metadata key prefixes to match against.
*Type*: `array`
*Default*: `[]`
```yml
# Examples
include_prefixes:
- foo_
- bar_
include_prefixes:
- kafka_
include_prefixes:
- content-
```
=== `output_metadata.include_patterns`
Provide a list of explicit metadata key regular expression (re2) patterns to match against.
*Type*: `array`
*Default*: `[]`
```yml
# Examples
include_patterns:
- .*
include_patterns:
- _timestamp_unix$
```
=== `timestamp`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp: ${! metadata("kafka_timestamp_unix") }
```
=== `timeout`
The maximum period of time for a message to be processed
*Type*: `string`
*Default*: `"10s"`
=== `max_memory_pages`
The maximum amount of wasm memory pages (64KiB) that an individual wasm module instance can use
*Type*: `int`
*Default*: `1600`
1 change: 1 addition & 0 deletions internal/impl/redpanda/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.wasm
Loading

0 comments on commit fce3e39

Please sign in to comment.