Skip to content

Commit

Permalink
Merge pull request #2665 from redpanda-data/mihaitodor-add-timestamp-…
Browse files Browse the repository at this point in the history
…field-kafka

Add `timestamp` field to the `kafka` and `kafka_franz` outputs.
  • Loading branch information
Jeffail authored Jun 19, 2024
2 parents 6d472c0 + 637f64c commit eeafe01
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 10 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ Changelog

All notable changes to this project will be documented in this file.

## v4.31.0 - TBD

### Added

- Field `timestamp` added to the `kafka` and `kafka_franz` outputs. (@mihaitodor)

## 4.30.0 - 2024-06-13

### Added
Expand Down
19 changes: 19 additions & 0 deletions docs/modules/components/pages/outputs/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ output:
byte_size: 0
period: ""
check: ""
timestamp: ${! timestamp_unix() } # No default (optional)
```
--
Expand Down Expand Up @@ -104,6 +105,7 @@ output:
initial_interval: 3s
max_interval: 10s
max_elapsed_time: 30s
timestamp: ${! timestamp_unix() } # No default (optional)
```
--
Expand Down Expand Up @@ -822,4 +824,21 @@ max_elapsed_time: 1m
max_elapsed_time: 1h
```
=== `timestamp`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp: ${! metadata("kafka_timestamp_unix") }
```
19 changes: 19 additions & 0 deletions docs/modules/components/pages/outputs/kafka_franz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ output:
byte_size: 0
period: ""
check: ""
timestamp: ${! timestamp_unix() } # No default (optional)
```
--
Expand Down Expand Up @@ -85,6 +86,7 @@ output:
root_cas_file: ""
client_certs: []
sasl: [] # No default (optional)
timestamp: ${! timestamp_unix() } # No default (optional)
```
--
Expand Down Expand Up @@ -750,4 +752,21 @@ An external ID to provide when assuming a role.
*Default*: `""`
=== `timestamp`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp: ${! metadata("kafka_timestamp_unix") }
```
92 changes: 88 additions & 4 deletions internal/impl/kafka/integration_sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/redpanda-data/connect/v4/internal/impl/kafka"
)

// TestIntegrationSaramaCheckpointOneLockUp checks that setting `checkpoint_limit: 1` on the `kafka` input doesn't lead to lockups.
// Note: This test will take 10 minutes to complete unless you specify the `-timeout` flag explicitly. If you set `-timeout 0`, it will complete in a minute.
func TestIntegrationSaramaCheckpointOneLockUp(t *testing.T) {
integration.CheckSkipExact(t)
t.Parallel()
Expand All @@ -35,15 +37,19 @@ func TestIntegrationSaramaCheckpointOneLockUp(t *testing.T) {
kafkaPortStr := strconv.Itoa(kafkaPort)

options := &dockertest.RunOptions{
Repository: "docker.vectorized.io/vectorized/redpanda",
Repository: "redpandadata/redpanda",
Tag: "latest",
Hostname: "redpanda",
ExposedPorts: []string{"9092"},
PortBindings: map[docker.Port][]docker.PortBinding{
"9092/tcp": {{HostIP: "", HostPort: kafkaPortStr}},
},
Cmd: []string{
"redpanda", "start", "--smp 1", "--overprovisioned", "",
"redpanda",
"start",
"--node-id 0",
"--mode dev-container",
"--set rpk.additional_start_flags=[--reactor-backend=epoll]",
"--kafka-addr 0.0.0.0:9092",
fmt.Sprintf("--advertise-kafka-addr localhost:%v", kafkaPort),
},
Expand All @@ -59,6 +65,7 @@ func TestIntegrationSaramaCheckpointOneLockUp(t *testing.T) {
return createKafkaTopic(context.Background(), "localhost:"+kafkaPortStr, "wcotesttopic", 20)
}))

// When the `-timeout` flag is not set explicitly, the default is 10 minutes: https://pkg.go.dev/cmd/go#hdr-Testing_flags
dl, exists := t.Deadline()
if exists {
dl = dl.Add(-time.Second)
Expand Down Expand Up @@ -193,15 +200,19 @@ func TestIntegrationSaramaRedpanda(t *testing.T) {
kafkaPortStr := strconv.Itoa(kafkaPort)

options := &dockertest.RunOptions{
Repository: "docker.vectorized.io/vectorized/redpanda",
Repository: "redpandadata/redpanda",
Tag: "latest",
Hostname: "redpanda",
ExposedPorts: []string{"9092"},
PortBindings: map[docker.Port][]docker.PortBinding{
"9092/tcp": {{HostIP: "", HostPort: kafkaPortStr}},
},
Cmd: []string{
"redpanda", "start", "--smp 1", "--overprovisioned", "",
"redpanda",
"start",
"--node-id 0",
"--mode dev-container",
"--set rpk.additional_start_flags=[--reactor-backend=epoll]",
"--kafka-addr 0.0.0.0:9092",
fmt.Sprintf("--advertise-kafka-addr localhost:%v", kafkaPort),
},
Expand Down Expand Up @@ -641,3 +652,76 @@ input:
})
})
}

func TestIntegrationSaramaOutputFixedTimestamp(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

kafkaPort, err := integration.GetFreePort()
require.NoError(t, err)

kafkaPortStr := strconv.Itoa(kafkaPort)

options := &dockertest.RunOptions{
Repository: "redpandadata/redpanda",
Tag: "latest",
Hostname: "redpanda",
ExposedPorts: []string{"9092"},
PortBindings: map[docker.Port][]docker.PortBinding{
"9092/tcp": {{HostIP: "", HostPort: kafkaPortStr}},
},
Cmd: []string{
"redpanda",
"start",
"--node-id 0",
"--mode dev-container",
"--set rpk.additional_start_flags=[--reactor-backend=epoll]",
"--kafka-addr 0.0.0.0:9092",
fmt.Sprintf("--advertise-kafka-addr localhost:%v", kafkaPort),
},
}

pool.MaxWait = time.Minute
resource, err := pool.RunWithOptions(options)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

_ = resource.Expire(900)
require.NoError(t, pool.Retry(func() error {
return createKafkaTopic(context.Background(), "localhost:"+kafkaPortStr, "testingconnection", 1)
}))

template := `
output:
kafka:
addresses: [ localhost:$PORT ]
topic: topic-$ID
timestamp: 666
input:
kafka:
addresses: [ localhost:$PORT ]
topics: [ topic-$ID ]
consumer_group: "blobfish"
processors:
- mapping: |
root = if metadata("kafka_timestamp_unix") != 666 { "error: invalid timestamp" }
`

suite := integration.StreamTests(
integration.StreamTestOpenCloseIsolated(),
)

suite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, vars.ID, 1))
}),
integration.StreamTestOptPort(kafkaPortStr),
)
}
89 changes: 85 additions & 4 deletions internal/impl/kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,19 @@ func TestIntegrationKafka(t *testing.T) {
kafkaPortStr := strconv.Itoa(kafkaPort)

options := &dockertest.RunOptions{
Repository: "docker.vectorized.io/vectorized/redpanda",
Repository: "redpandadata/redpanda",
Tag: "latest",
Hostname: "redpanda",
ExposedPorts: []string{"9092"},
PortBindings: map[docker.Port][]docker.PortBinding{
"9092/tcp": {{HostIP: "", HostPort: kafkaPortStr}},
},
Cmd: []string{
"redpanda", "start", "--smp 1", "--overprovisioned",
"redpanda",
"start",
"--node-id 0",
"--mode dev-container",
"--set rpk.additional_start_flags=[--reactor-backend=epoll]",
"--kafka-addr 0.0.0.0:9092",
fmt.Sprintf("--advertise-kafka-addr localhost:%v", kafkaPort),
},
Expand Down Expand Up @@ -263,15 +267,19 @@ func TestIntegrationKafkaSasl(t *testing.T) {
kafkaPortStr := strconv.Itoa(kafkaPort)

options := &dockertest.RunOptions{
Repository: "docker.vectorized.io/vectorized/redpanda",
Repository: "redpandadata/redpanda",
Tag: "latest",
Hostname: "redpanda",
ExposedPorts: []string{"9092"},
PortBindings: map[docker.Port][]docker.PortBinding{
"9092/tcp": {{HostIP: "", HostPort: kafkaPortStr}},
},
Cmd: []string{
"redpanda", "start", "--smp 1", "--overprovisioned",
"redpanda",
"start",
"--node-id 0",
"--mode dev-container",
"--set rpk.additional_start_flags=[--reactor-backend=epoll]",
"--kafka-addr 0.0.0.0:9092",
"--set redpanda.enable_sasl=true",
`--set redpanda.superusers=["admin"]`,
Expand Down Expand Up @@ -356,3 +364,76 @@ input:
integration.StreamTestOptVarSet("VAR1", ""),
)
}

func TestIntegrationKafkaOutputFixedTimestamp(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

kafkaPort, err := integration.GetFreePort()
require.NoError(t, err)

kafkaPortStr := strconv.Itoa(kafkaPort)

options := &dockertest.RunOptions{
Repository: "redpandadata/redpanda",
Tag: "latest",
Hostname: "redpanda",
ExposedPorts: []string{"9092"},
PortBindings: map[docker.Port][]docker.PortBinding{
"9092/tcp": {{HostIP: "", HostPort: kafkaPortStr}},
},
Cmd: []string{
"redpanda",
"start",
"--node-id 0",
"--mode dev-container",
"--set rpk.additional_start_flags=[--reactor-backend=epoll]",
"--kafka-addr 0.0.0.0:9092",
fmt.Sprintf("--advertise-kafka-addr localhost:%v", kafkaPort),
},
}

pool.MaxWait = time.Minute
resource, err := pool.RunWithOptions(options)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

_ = resource.Expire(900)
require.NoError(t, pool.Retry(func() error {
return createKafkaTopic(context.Background(), "localhost:"+kafkaPortStr, "testingconnection", 1)
}))

template := `
output:
kafka_franz:
seed_brokers: [ localhost:$PORT ]
topic: topic-$ID
timestamp: 666
input:
kafka_franz:
seed_brokers: [ localhost:$PORT ]
topics: [ topic-$ID ]
consumer_group: "blobfish"
processors:
- mapping: |
root = if metadata("kafka_timestamp_unix") != 666 { "error: invalid timestamp" }
`

suite := integration.StreamTests(
integration.StreamTestOpenCloseIsolated(),
)

suite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, vars.ID, 1))
}),
integration.StreamTestOptPort(kafkaPortStr),
)
}
Loading

0 comments on commit eeafe01

Please sign in to comment.