Add Kafka SSL/SASL support to event-gateway and restructure WebSub to follow a broker-driver-agnostic architecture #1877

senthuran16 wants to merge 14 commits into main
Conversation
Normalize broker-facing event-gateway topic names for WebSub
Add Kafka SSL/SASL support to event-gateway and restructure Websub to follow broker driver agnostic architecture
📝 Walkthrough

Summary

This PR adds Kafka connection security support (TLS and SASL) to event-gateway by introducing a unified connection configuration resolution mechanism and extending the broker-driver abstraction to eliminate direct Kafka client dependencies across the codebase.

Configuration & Setup

Added Kafka security configuration parameters (TLS and SASL settings) to the runtime configuration.

Connection Configuration Resolution

Introduced `ResolveConnectionConfig`, which merges global Kafka settings with binding-level overrides into a single validated `ConnectionConfig`.

Broker Driver Enhancements

Extended the broker-driver abstraction so callers no longer depend on the Kafka client directly, and updated the Kafka broker driver implementation to accept the resolved connection configuration.

WebSub Subscription Management Refactoring

Restructured WebSub components to use the broker-driver abstraction.

xDS Configuration Propagation

Enhanced xDS control-plane integration to propagate Kafka security configuration as fallback settings for dynamically deployed bindings, ensuring runtime deployments inherit the same TLS/SASL configuration as the control plane when broker-specific config is omitted.

Lines Changed

Total: ~700 lines modified across configuration files, broker driver implementation, and WebSub components. Primary effort in connection config resolution (+317 lines) and Docker Compose updates (+85 lines).

Sequence Diagram

```mermaid
sequenceDiagram
participant Config as Runtime Config<br/>(global + binding)
participant Resolver as ResolveConnectionConfig<br/>(merger)
participant Builder as BuildClientOptions<br/>(option constructor)
participant KGO as franz-go<br/>Client Creation
participant Kafka as Kafka Broker<br/>(TLS + SASL)
Config->>Resolver: Global KafkaConfig + overrides map
Resolver->>Resolver: Merge global & binding-level settings<br/>(brokers, TLS, SASL)
Resolver->>Resolver: Normalize & validate merged config
Resolver-->>Builder: ConnectionConfig
Builder->>Builder: Create base options with brokers
Builder->>Builder: buildTLSConfig (if TLS enabled)<br/>Load CA, certs, set TLS 1.2 min
Builder->>Builder: buildSASLMechanism (if SASL set)<br/>Map mechanism to franz-go SASL
Builder-->>KGO: []kgo.Opt with TLS & SASL
KGO->>KGO: kgo.NewClient(allOpts)
KGO->>Kafka: Connect with TLS + SASL PLAIN/SCRAM
Kafka-->>KGO: Authenticated connection
    KGO-->>Builder: *kgo.Client ready
```
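As a rough, self-contained sketch of that flow: `ConnectionConfig`, `resolveConnectionConfig`, and `buildClientOptions` below are simplified stand-ins for this PR's actual types and signatures (assumptions for illustration), while the `kgo` and SASL calls are real franz-go APIs.

```go
package main

import (
	"crypto/tls"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
)

// ConnectionConfig is a simplified stand-in for the PR's resolved Kafka config.
type ConnectionConfig struct {
	Brokers       []string
	TLS           bool
	SASLMechanism string
	SASLUsername  string
	SASLPassword  string
}

// resolveConnectionConfig merges binding-level overrides onto the global
// config, mirroring the "merger" step in the diagram.
func resolveConnectionConfig(global ConnectionConfig, overrides map[string]any) (ConnectionConfig, error) {
	cfg := global
	if v, ok := overrides["tls"].(bool); ok {
		cfg.TLS = v
	}
	if v, ok := overrides["sasl_username"].(string); ok {
		cfg.SASLUsername = v
	}
	if len(cfg.Brokers) == 0 {
		return ConnectionConfig{}, fmt.Errorf("no Kafka brokers configured")
	}
	return cfg, nil
}

// buildClientOptions translates the resolved config into franz-go options.
func buildClientOptions(cfg ConnectionConfig) []kgo.Opt {
	opts := []kgo.Opt{kgo.SeedBrokers(cfg.Brokers...)}
	if cfg.TLS {
		// TLS 1.2 minimum, matching the diagram's buildTLSConfig step.
		opts = append(opts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}))
	}
	if cfg.SASLMechanism == "PLAIN" {
		opts = append(opts, kgo.SASL(plain.Auth{User: cfg.SASLUsername, Pass: cfg.SASLPassword}.AsMechanism()))
	}
	return opts
}

func main() {
	global := ConnectionConfig{Brokers: []string{"localhost:29092"}, TLS: true, SASLMechanism: "PLAIN"}
	cfg, err := resolveConnectionConfig(global, map[string]any{"sasl_username": "gateway"})
	if err != nil {
		panic(err)
	}
	client, err := kgo.NewClient(buildClientOptions(cfg)...)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
```

A real resolver would also handle SASL passwords, certificate file paths, and validation; the point here is only the merge-then-build split shown in the diagram.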
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
⚠️ Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.12.1)

```text
level=error msg="[linters_context] typechecking error: pattern ./...: directory prefix . does not contain modules listed in go.work or their selected dependencies"
```
CodeRabbit check already ran on the feature branch PR: #1876
Revert generate cert file permission
Actionable comments posted: 3
🧹 Nitpick comments (2)
event-gateway/docker-compose.dev.yaml (2)
116-154: ⚡ Quick win: Add a Kafka healthcheck before other services depend on it.

The new TLS/SASL broker setup takes longer to start than plain container creation, so gating dependents on `service_started` is likely to cause intermittent boot races. Please add a healthcheck for `kafka` and switch dependent services to `service_healthy` (a hedged compose sketch follows the prompt below).

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/docker-compose.dev.yaml` around lines 116-154: add a healthcheck block under the kafka service (use the broker SASL_SSL listener on port 29092 and a readiness command that verifies the broker is accepting connections/advertised listeners, e.g., a lightweight script or a kafka-broker-api-versions/kafka-topics check) so Docker marks the service as healthy only when TLS/SASL startup is complete; then update any services that currently list kafka under depends_on to use condition: service_healthy (replacing existing service_started/service_completed_successfully conditions) so dependents are gated on kafka's health rather than just its start.
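For illustration, a healthcheck along these lines could work; the listener port matches the prompt, but the CLI tool name, client-properties path, and dependent service name are assumptions about this image and compose file, not verified against them:

```yaml
kafka:
  # ...existing service definition...
  healthcheck:
    # Probe the SASL_SSL listener; assumes the image ships the Kafka CLI tools
    # and that /etc/kafka/client.properties carries matching TLS/SASL settings.
    test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:29092 --command-config /etc/kafka/client.properties"]
    interval: 10s
    timeout: 10s
    retries: 12

event-gateway:
  depends_on:
    kafka:
      condition: service_healthy  # was: service_started
```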
166-181: ⚡ Quick win: Pin `kafka-ui` to a fixed image tag.

Using `provectuslabs/kafka-ui:latest` makes the dev stack and smoke validation non-reproducible across runs. Pin to a specific stable release (e.g., `v0.7.2`) to ensure consistent behavior and simplify debugging when breakages occur.

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/docker-compose.dev.yaml` around lines 166-181: the docker-compose service "kafka-ui" is using an unstable floating tag; change the image value in the kafka-ui service from provectuslabs/kafka-ui:latest to a fixed, tested tag (for example provectuslabs/kafka-ui:v0.7.2) so the dev stack and smoke tests are reproducible; update the image field under the kafka-ui service and commit the updated docker-compose.dev.yaml.
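Concretely, assuming `v0.7.2` is the tag the team has validated:

```yaml
kafka-ui:
  image: provectuslabs/kafka-ui:v0.7.2  # pinned instead of :latest
```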
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: da47262a-2339-4602-973e-e04a8e120829
📒 Files selected for processing (20)
- event-gateway/.env.example
- event-gateway/docker-compose.dev.yaml
- event-gateway/docker/kafka/generate-certs.sh
- event-gateway/gateway-runtime/cmd/event-gateway/main.go
- event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
- event-gateway/gateway-runtime/configs/config.toml
- event-gateway/gateway-runtime/internal/config/config.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go
- event-gateway/gateway-runtime/internal/connectors/types.go
- event-gateway/gateway-runtime/internal/runtime/runtime_test.go
- event-gateway/gateway-runtime/internal/subscription/reconciler.go
- event-gateway/gateway-runtime/internal/subscription/sync.go
- event-gateway/gateway-runtime/internal/xdsclient/handler.go
```go
if v, ok, err := boolOverride(overrides["tls"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLS = v
}
if v, ok, err := stringOverride(overrides["tls_ca_file"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLSCAFile = v
}
if v, ok, err := stringOverride(overrides["tls_cert_file"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLSCertFile = v
}
if v, ok, err := stringOverride(overrides["tls_key_file"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLSKeyFile = v
}
if v, ok, err := stringOverride(overrides["tls_server_name"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLSServerName = v
```
Allow `tls: false` to clear inherited TLS settings.

Because `ResolveConnectionConfig` starts from the global config, a binding that overrides only `tls: false` still retains the global `tls_ca_file`, `tls_cert_file`, `tls_key_file`, and `tls_server_name`. `validateConnectionConfig` then rejects the result, so a binding cannot actually opt out of globally enabled TLS unless it also clears every TLS field manually.
Suggested fix

```diff
 if v, ok, err := boolOverride(overrides["tls"]); err != nil {
 	return ConnectionConfig{}, err
 } else if ok {
 	cfg.TLS = v
+	if !v {
+		cfg.TLSCAFile = ""
+		cfg.TLSCertFile = ""
+		cfg.TLSKeyFile = ""
+		cfg.TLSServerName = ""
+	}
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
if v, ok, err := boolOverride(overrides["tls"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLS = v
	if !v {
		cfg.TLSCAFile = ""
		cfg.TLSCertFile = ""
		cfg.TLSKeyFile = ""
		cfg.TLSServerName = ""
	}
}
if v, ok, err := stringOverride(overrides["tls_ca_file"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLSCAFile = v
}
if v, ok, err := stringOverride(overrides["tls_cert_file"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLSCertFile = v
}
if v, ok, err := stringOverride(overrides["tls_key_file"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLSKeyFile = v
}
if v, ok, err := stringOverride(overrides["tls_server_name"]); err != nil {
	return ConnectionConfig{}, err
} else if ok {
	cfg.TLSServerName = v
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go`
around lines 68 - 91, ResolveConnectionConfig currently applies overrides on top
of global ConnectionConfig so setting tls=false via the overrides does not clear
inherited TLSCAFile/TLSCertFile/TLSKeyFile/TLSServerName and
validateConnectionConfig still fails; update ResolveConnectionConfig so that
when the overrides map contains tls and boolOverride returns ok with v==false
you explicitly set cfg.TLS = false and clear cfg.TLSCAFile, cfg.TLSCertFile,
cfg.TLSKeyFile and cfg.TLSServerName to empty values (so the binding can opt out
of global TLS), leaving behavior unchanged when tls is true or unspecified;
reference ResolveConnectionConfig, ConnectionConfig, and
validateConnectionConfig in your change.
```go
targetOffsets := make(map[replayPartitionKey]int64)
completedPartitions := make(map[replayPartitionKey]bool)
var offsetErr error
endOffsets.Each(func(o kadm.ListedOffset) {
	if offsetErr != nil {
		return
	}
	if o.Err != nil {
		offsetErr = fmt.Errorf("failed to inspect replay offset for topic %s partition %d: %w", o.Topic, o.Partition, o.Err)
		return
	}
	key := replayPartitionKey{topic: o.Topic, partition: o.Partition}
	targetOffsets[key] = o.Offset
	completedPartitions[key] = o.Offset == 0
})
if offsetErr != nil {
	return offsetErr
}
if len(targetOffsets) == 0 || replayComplete(completedPartitions) {
	return nil
```
Treat already-empty partitions as complete even when the end offset is non-zero.
`completedPartitions[key] = o.Offset == 0` only handles brand-new partitions. A partition can have no readable records left while still reporting a non-zero end offset, and in that case this loop never marks it complete because no record arrives to advance the state. Replay will then wait until the context is canceled.

Please initialize completion from the current readable range, not just `endOffset == 0`, and add a regression test for that case.
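One hedged sketch of that initialization, assuming the replayer holds (or can construct) a kadm admin client `adm` next to the existing `endOffsets` listing; `ListStartOffsets` and `Lookup` are real kadm APIs, while the surrounding names reuse the snippet above:

```go
// Seed completion from the readable range instead of endOffset == 0.
startOffsets, err := adm.ListStartOffsets(ctx, topic)
if err != nil {
	return fmt.Errorf("failed to list start offsets for replay: %w", err)
}
endOffsets.Each(func(o kadm.ListedOffset) {
	if offsetErr != nil || o.Err != nil {
		return // keep the existing o.Err handling from the loop above
	}
	key := replayPartitionKey{topic: o.Topic, partition: o.Partition}
	targetOffsets[key] = o.Offset
	start, ok := startOffsets.Lookup(o.Topic, o.Partition)
	// Complete when nothing below the target offset is readable anymore.
	completedPartitions[key] = ok && start.Offset >= o.Offset
})
```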
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go`
around lines 50 - 69, The loop currently marks a partition complete only when
o.Offset == 0; instead determine completion from the readable range (i.e. if the
partition's current beginning/start offset is >= the end/target offset) rather
than just zero. Modify the logic around replayPartitionKey, completedPartitions
and targetOffsets to derive the partition's start/beginning offset (from
whatever source supplies the current readable range—e.g. a startOffsets map or
the ListedOffset fields) and set completedPartitions[key] = (startOffset >=
o.Offset) (or equivalent), keep the existing handling of o.Err, and ensure
replayComplete still uses completedPartitions; add a regression test that
simulates a partition with non-zero end offset but no readable records to
validate this behavior.
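The completion predicate itself is small enough to pin in a table-driven test; a minimal sketch for a `_test.go` file (test name and framing are illustrative, not from the PR):

```go
import "testing"

// Completion should derive from the readable range (start >= end), so a
// partition whose start offset has caught up to a non-zero end is complete.
func TestPartitionCompletionFromReadableRange(t *testing.T) {
	cases := []struct {
		name         string
		start, end   int64
		wantComplete bool
	}{
		{"brand-new partition", 0, 0, true},
		{"records still readable", 0, 10, false},
		{"non-zero end offset but nothing readable", 10, 10, true},
	}
	for _, tc := range cases {
		if got := tc.start >= tc.end; got != tc.wantComplete {
			t.Errorf("%s: completion = %v, want %v", tc.name, got, tc.wantComplete)
		}
	}
}
```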
```go
err := r.driver.Replay(ctx, r.syncTopic, func(_ context.Context, msg *connectors.Message) error {
	replayed++
	if msg.Value == nil {
		parts := parseSyncKey(string(msg.Key))
		if parts != nil {
			_ = r.store.Remove(parts[0], parts[1])
			if r.callback != nil {
				r.callback(&Subscription{Topic: parts[0], CallbackURL: parts[1]}, true)
			}
		}
		return nil
	}
	var sub Subscription
	if err := json.Unmarshal(msg.Value, &sub); err != nil {
		slog.Error("Failed to unmarshal subscription during reconciliation", "error", err)
		return nil
	}
	_ = r.store.Add(&sub)
	if r.callback != nil {
		r.callback(&sub, false)
	}
	return nil
})
```
Propagate replay record failures instead of treating them as success.
This handler currently swallows decode errors and ignores `SubscriptionStore` add/remove failures, so `Reconcile` can report success with a partially rebuilt store. It also invokes the callback even if the store mutation failed. Returning an error here would stop startup on an inconsistent snapshot.
Suggested change

```diff
 err := r.driver.Replay(ctx, r.syncTopic, func(_ context.Context, msg *connectors.Message) error {
 	replayed++
 	if msg.Value == nil {
 		parts := parseSyncKey(string(msg.Key))
-		if parts != nil {
-			_ = r.store.Remove(parts[0], parts[1])
-			if r.callback != nil {
-				r.callback(&Subscription{Topic: parts[0], CallbackURL: parts[1]}, true)
-			}
-		}
-		return nil
+		if parts == nil {
+			return fmt.Errorf("invalid sync tombstone key %q", string(msg.Key))
+		}
+		if err := r.store.Remove(parts[0], parts[1]); err != nil {
+			return fmt.Errorf("failed to remove subscription during reconciliation: %w", err)
+		}
+		if r.callback != nil {
+			r.callback(&Subscription{Topic: parts[0], CallbackURL: parts[1]}, true)
+		}
+		return nil
 	}
 	var sub Subscription
 	if err := json.Unmarshal(msg.Value, &sub); err != nil {
-		slog.Error("Failed to unmarshal subscription during reconciliation", "error", err)
-		return nil
+		return fmt.Errorf("failed to unmarshal subscription during reconciliation: %w", err)
 	}
-	_ = r.store.Add(&sub)
+	if err := r.store.Add(&sub); err != nil {
+		return fmt.Errorf("failed to add subscription during reconciliation: %w", err)
+	}
 	if r.callback != nil {
 		r.callback(&sub, false)
 	}
 	return nil
 })
```

🤖 Prompt for AI Agents
})🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@event-gateway/gateway-runtime/internal/subscription/reconciler.go` around
lines 63 - 85, The replay handler currently swallows errors; update the
anonymous handler passed to r.driver.Replay so that json.Unmarshal failures and
r.store.Add/r.store.Remove errors are returned to the caller (propagate rather
than logging/ignoring), and only invoke r.callback when the store mutation
succeeded. Specifically, in the Replay callback used in Reconcile, replace the
nil-returns on unmarshal and store ops with returned errors (preserving context
like "unmarshal subscription" or "store add/remove" and include the underlying
error), call r.store.Add(parts) / r.store.Remove(parts) and check their error
results before calling r.callback(&Subscription{...}, ...) so failures abort the
replay and surface up to Reconcile.
Replacing this PR with #1880
Purpose
event-gateway could not fully connect to secured Kafka clusters. Kafka TLS/SASL settings existed at the runtime config level, but they were not completely wired through the Kafka broker-driver path, and some WebSub subscription state flows still constructed Kafka clients directly. In addition, the local dev stack only supported plaintext Kafka, so secured connectivity could not be validated end-to-end in the repo.
Resolves #1861
Goals
Approach
User stories
Documentation
N/A — this change is implementation and local-dev-stack focused; product documentation was not updated in this PR.
Automation tests
`go test ./...` passed in `event-gateway/gateway-runtime`.
Added focused tests for Kafka connection config resolution and validation; a hedged sketch of that shape of test appears after this list.
No dedicated automated integration test suite was added in this PR.
Performed compose-based smoke validation for the secured Kafka services and Kafka UI connectivity.
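As an illustration of the shape such a resolution test might take, for a `config_test.go` file with the standard testing import: the function signature and the `tls: false` clearing behavior below are assumptions (the latter follows the reviewer's suggested fix above), not the PR's actual test code.

```go
// Hypothetical test sketch; assumes ResolveConnectionConfig(global, overrides)
// and that a tls=false override clears inherited TLS file settings.
func TestResolveConnectionConfigTLSOptOut(t *testing.T) {
	global := ConnectionConfig{
		Brokers:   []string{"kafka:9092"},
		TLS:       true,
		TLSCAFile: "/certs/ca.pem",
	}
	cfg, err := ResolveConnectionConfig(global, map[string]any{"tls": false})
	if err != nil {
		t.Fatalf("resolve: %v", err)
	}
	if cfg.TLS || cfg.TLSCAFile != "" {
		t.Errorf("expected tls=false override to clear inherited TLS settings, got %+v", cfg)
	}
}
```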
Security checks
Samples
Related PRs
#1876
Test environment