Add Kafka SSL/SASL support to event-gateway and restructure Websub to follow broker driver agnostic architecture #1876
Conversation
Actionable comments posted: 3
🧹 Nitpick comments (6)
event-gateway/gateway-runtime/internal/config/config.go (1)
364-390: ⚡ Quick win
Make `validateReadableFile`'s "required" error message context-agnostic.

Now that `validateReadableFile` is reused from `validateKafkaConfig`, the hardcoded message `"%s is required when server.websub_tls_enabled is true"` is misleading if this branch is ever reached from a non-WebSub caller. Today the Kafka callers guard against empty paths upstream, so the branch is unreachable from there—but a future caller could trip it.

Proposed change

```diff
-	if trimmedPath == "" {
-		return fmt.Errorf("%s is required when server.websub_tls_enabled is true", fieldName)
-	}
+	if trimmedPath == "" {
+		return fmt.Errorf("%s is required", fieldName)
+	}
```

The WebSub callers already convey the enablement context in `fieldName`, so the shorter message remains accurate.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/config/config.go` around lines 364-390: the validateReadableFile function returns a context-specific "required" error mentioning server.websub_tls_enabled; make this message context-agnostic by replacing the hardcoded string in validateReadableFile with a generic message such as "%s is required" (use fieldName to provide caller context), leaving all other checks (os.Stat, IsDir, os.Open, Close) and their detailed errors unchanged so callers like validateKafkaConfig can reuse the helper without misleading WebSub-specific text.

event-gateway/docker-compose.dev.yaml (1)
95-99: 💤 Low value
Consider a Kafka healthcheck for more reliable startup.

`event-gateway` depends on `kafka` with `condition: service_started`, which only waits for the container to start—not for the broker to accept SASL_SSL connections. Adding a healthcheck on the `kafka` service and switching this dependency to `service_healthy` would reduce flaky first-run startups in local dev.

Suggested healthcheck (illustrative)

```diff
 kafka:
   image: bitnamilegacy/kafka:4.0.0-debian-12-r10
   hostname: kafka
+  healthcheck:
+    test: ["CMD-SHELL", "kafka-broker-api-versions.sh --bootstrap-server localhost:29092 --command-config /opt/bitnami/kafka/config/healthcheck.properties >/dev/null 2>&1 || exit 1"]
+    interval: 10s
+    timeout: 5s
+    retries: 12
```

(The healthcheck command needs a SASL_SSL client config file mounted into the container to actually authenticate against the secured listener.)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/docker-compose.dev.yaml` around lines 95-99: add a Docker healthcheck to the kafka service and change the event-gateway depends_on condition from service_started to service_healthy so the gateway waits until Kafka is actually ready; specifically, add a healthcheck block to the kafka service (using a small script/command that attempts a SASL_SSL authenticated connection to the broker and returns appropriate exit codes) and ensure any required SASL_SSL client config files are mounted into the kafka container so the healthcheck can authenticate, then update the event-gateway depends_on entry for kafka to use condition: service_healthy (leave gateway-controller as-is).

event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go (1)
39-48: 💤 Low value
Use defer for admin client close.

`adminClient.Close()` at line 45 runs only when `ListEndOffsets` returns; if a future change introduces an early return between client creation and `ListEndOffsets`, the client could leak. Using `defer` makes the cleanup robust to such edits.

Proposed change

```diff
 adminClient, err := NewClient(cfg)
 if err != nil {
 	return fmt.Errorf("failed to create kafka replay admin client: %w", err)
 }
+defer adminClient.Close()
 admin := kadm.NewClient(adminClient)
 endOffsets, err := admin.ListEndOffsets(ctx, topic)
-adminClient.Close()
 if err != nil {
 	return fmt.Errorf("failed to list replay end offsets for topic %s: %w", topic, err)
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go` around lines 39-48: after creating the Kafka admin client with NewClient(cfg) and assigning it to adminClient, immediately schedule its cleanup with defer adminClient.Close() so the client is always closed even if future edits introduce early returns; move or remove the existing adminClient.Close() call and ensure admin.ListEndOffsets(ctx, topic) still runs and its error is handled (symbols: NewClient, adminClient, admin.ListEndOffsets, adminClient.Close).

event-gateway/gateway-runtime/internal/subscription/sync.go (1)
54-88: ⚡ Quick win
Caller context is discarded in both publish methods.

`PublishSubscription` and `PublishTombstone` accept `context.Context` but ignore it (parameter `_`) and create a fresh `context.WithTimeout(context.Background(), 10*time.Second)`. Caller-initiated cancellation (e.g., HTTP client disconnect, shutdown) will not abort the publish. Consider deriving the timeout from the caller's context so cancellation propagates:

Proposed change

```diff
-func (p *SyncProducer) PublishSubscription(_ context.Context, sub *Subscription) error {
+func (p *SyncProducer) PublishSubscription(ctx context.Context, sub *Subscription) error {
 	sub.RuntimeID = p.runtimeID
 	...
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
```

(Apply the same pattern to `PublishTombstone`.)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/subscription/sync.go` around lines 54-88: both PublishSubscription and PublishTombstone currently discard the caller's context and create a new background context, preventing caller-initiated cancellation from propagating; change the signatures to use the provided ctx (remove the underscore) and derive a timeout from it by calling context.WithTimeout(ctx, 10*time.Second) (and defer cancel) before calling p.driver.Publish, ensuring cancellation from the caller is honored; apply the same pattern in SyncProducer.PublishTombstone and keep the existing syncKey and p.driver.Publish usage intact.

event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go (1)
78-123: ⚡ Quick win
Add tests for remaining validation branches.

Please add explicit cases for (1) `tls_cert_file`/`tls_key_file` pairing enforcement and (2) unsupported `sasl_mechanism` rejection. This will close the main untested branches in the new validator.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go` around lines 78-123: add two new unit tests calling ResolveConnectionConfig to exercise the remaining validation branches: (1) a test that sets TLS=true and provides only TLSCertFile or only TLSKeyFile (create temporary readable cert/key files with t.TempDir and os.WriteFile) and asserts an error is returned to verify cert/key pairing enforcement; and (2) a test that passes a config with sasl_mechanism set to an unsupported value (e.g. "plain-unsupported") in the map[string]interface{} form and asserts ResolveConnectionConfig returns an error to verify unsupported SASL mechanism rejection; place these alongside the existing TestResolveConnectionConfig_* tests and reference ResolveConnectionConfig in the test names for clarity.

event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go (1)
62-108: ⚡ Quick win
Reject unknown override keys to avoid silent misconfiguration.
Currently only known keys are read, and unexpected keys are ignored. Adding a pre-check that fails on unknown keys would make binding errors visible early and reduce config drift/debug time.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go` around lines 62 - 108, The overrides handling silently ignores unknown keys; update the code around the overrides processing (the block that reads overrides into ConnectionConfig and uses helpers stringOverride/boolOverride) to first validate keys: build a set/slice of allowed keys (e.g. "brokers","tls","tls_ca_file","tls_cert_file","tls_key_file","tls_server_name","sasl_mechanism","sasl_username","sasl_password") and iterate the keys of the overrides map to reject any key not in that set by returning a descriptive error (include the unknown key name) before applying overrides to cfg; keep using the existing stringOverride/boolOverride helpers and error propagation for known keys.
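The pre-check this comment asks for can be sketched as an allow-list pass over the overrides map, run before any values are bound. The key names below are taken from the prompt above; the function name and error wording are illustrative, not the project's actual API:

```go
package main

import (
	"fmt"
	"sort"
)

// allowedOverrideKeys mirrors the key set named in the review prompt.
var allowedOverrideKeys = map[string]struct{}{
	"brokers": {}, "tls": {}, "tls_ca_file": {}, "tls_cert_file": {},
	"tls_key_file": {}, "tls_server_name": {}, "sasl_mechanism": {},
	"sasl_username": {}, "sasl_password": {},
}

// rejectUnknownKeys fails fast on any key outside the allow-list so a
// typo like "tls_cafile" surfaces at startup instead of being ignored.
func rejectUnknownKeys(overrides map[string]interface{}) error {
	var unknown []string
	for k := range overrides {
		if _, ok := allowedOverrideKeys[k]; !ok {
			unknown = append(unknown, k)
		}
	}
	if len(unknown) > 0 {
		sort.Strings(unknown) // deterministic error message
		return fmt.Errorf("unknown kafka override keys: %v", unknown)
	}
	return nil
}

func main() {
	ok := map[string]interface{}{"brokers": "kafka:29092", "tls": true}
	fmt.Println(rejectUnknownKeys(ok)) // <nil>

	bad := map[string]interface{}{"tls_cafile": "/certs/ca.crt"} // typo
	fmt.Println(rejectUnknownKeys(bad))
}
```

Running this check before the existing `stringOverride`/`boolOverride` binding keeps the happy path unchanged while making config drift visible early.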
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@event-gateway/docker/kafka/generate-certs.sh`:
- Line 95: The chmod currently makes CA and JKS files world-readable; tighten
permissions by changing the chmod call in generate-certs.sh so that
kafka.keystore.jks and kafka.truststore.jks are owner-only (e.g., 0600) while
leaving ca.crt as world-readable if needed (0644), or make all three owner-only
if the CA cert must also be restricted; update the chmod invocation that
references "${cert_dir}/ca.crt", "${cert_dir}/kafka.keystore.jks", and
"${cert_dir}/kafka.truststore.jks" accordingly.
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go`:
- Around line 124-141: EnsureCompactedTopic currently treats
TOPIC_ALREADY_EXISTS as success without checking config; update
EnsureCompactedTopic to, when isTopicAlreadyExistsErr(t.Err) is true, call the
admin API to fetch the existing topic's config (e.g., DescribeConfigs/GetConfig
on the topic via e.admin), verify the "cleanup.policy" includes "compact", and
if not return an error (or attempt to update the config) instead of returning
nil; reference EnsureCompactedTopic, e.admin.CreateTopics, and
isTopicAlreadyExistsErr to locate where to add the verification and error path.
In `@event-gateway/gateway-runtime/internal/subscription/sync.go`:
- Around line 28-30: The code builds a franz-go kgo.Record only to copy its
Key/Value/Topic into a connectors.Message; remove the kgo.Record and franz-go
import and create the connectors.Message directly in PublishSubscription
instead. Locate PublishSubscription and any local variable (e.g., rec or record)
created as kgo.Record between the import block and the body (including the block
referenced around lines 62-79), replace instantiation of kgo.Record with direct
construction of connectors.Message{Key: ..., Value: ..., Topic: ...} (preserving
any headers/metadata mapping), remove references to kgo.Record fields, and
delete the "github.com/twmb/franz-go/pkg/kgo" import so the file becomes
broker-agnostic.
---
Nitpick comments:
In `@event-gateway/docker-compose.dev.yaml`:
- Around line 95-99: Add a Docker healthcheck to the kafka service and change
the event-gateway depends_on condition from service_started to service_healthy
so the gateway waits until Kafka is actually ready; specifically, add a
healthcheck block to the kafka service (using a small script/command that
attempts a SASL_SSL authenticated connection to the broker and returns
appropriate exit codes) and ensure any required SASL_SSL client config files are
mounted into the kafka container so the healthcheck can authenticate, then
update the event-gateway depends_on entry for kafka to use condition:
service_healthy (leave gateway-controller as-is).
In `@event-gateway/gateway-runtime/internal/config/config.go`:
- Around line 364-390: The validateReadableFile function returns a
context-specific "required" error mentioning server.websub_tls_enabled; make
this message context-agnostic by replacing the hardcoded string in
validateReadableFile with a generic message such as "%s is required" (use
fieldName to provide caller context), leaving all other checks (os.Stat, IsDir,
os.Open, Close) and their detailed errors unchanged so callers like
validateKafkaConfig can reuse the helper without misleading WebSub-specific
text.
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go`:
- Around line 78-123: Add two new unit tests calling ResolveConnectionConfig to
exercise the remaining validation branches: (1) a test that sets TLS=true and
provides only TLSCertFile or only TLSKeyFile (create temporary readable cert/key
files with t.TempDir and os.WriteFile) and asserts an error is returned to
verify cert/key pairing enforcement; and (2) a test that passes a config with
sasl_mechanism set to an unsupported value (e.g. "plain-unsupported") in the
map[string]interface{} form and asserts ResolveConnectionConfig returns an error
to verify unsupported SASL mechanism rejection; place these alongside the
existing TestResolveConnectionConfig_* tests and reference
ResolveConnectionConfig in the test names for clarity.
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go`:
- Around line 62-108: The overrides handling silently ignores unknown keys;
update the code around the overrides processing (the block that reads overrides
into ConnectionConfig and uses helpers stringOverride/boolOverride) to first
validate keys: build a set/slice of allowed keys (e.g.
"brokers","tls","tls_ca_file","tls_cert_file","tls_key_file","tls_server_name","sasl_mechanism","sasl_username","sasl_password")
and iterate the keys of the overrides map to reject any key not in that set by
returning a descriptive error (include the unknown key name) before applying
overrides to cfg; keep using the existing stringOverride/boolOverride helpers
and error propagation for known keys.
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go`:
- Around line 39-48: After creating the Kafka admin client with NewClient(cfg)
and assigning it to adminClient, immediately schedule its cleanup with defer
adminClient.Close() so the client is always closed even if future edits
introduce early returns; move or remove the existing adminClient.Close() call
and ensure admin.ListEndOffsets(ctx, topic) still runs and its error is handled
(symbols: NewClient, adminClient, admin.ListEndOffsets, adminClient.Close).
In `@event-gateway/gateway-runtime/internal/subscription/sync.go`:
- Around line 54-88: Both PublishSubscription and PublishTombstone currently
discard the caller's context and create a new background context, preventing
caller-initiated cancellation from propagating; change the signatures to use the
provided ctx (remove the underscore) and derive a timeout from it by calling
context.WithTimeout(ctx, 10*time.Second) (and defer cancel) before calling
p.driver.Publish, ensuring cancellation from the caller is honored; apply the
same pattern in SyncProducer.PublishTombstone and keep the existing syncKey and
p.driver.Publish usage intact.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e2056bb0-5e42-4c93-b452-0955687f01d6
📒 Files selected for processing (20)
- event-gateway/.env.example
- event-gateway/docker-compose.dev.yaml
- event-gateway/docker/kafka/generate-certs.sh
- event-gateway/gateway-runtime/cmd/event-gateway/main.go
- event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
- event-gateway/gateway-runtime/configs/config.toml
- event-gateway/gateway-runtime/internal/config/config.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go
- event-gateway/gateway-runtime/internal/connectors/types.go
- event-gateway/gateway-runtime/internal/runtime/runtime_test.go
- event-gateway/gateway-runtime/internal/subscription/reconciler.go
- event-gateway/gateway-runtime/internal/subscription/sync.go
- event-gateway/gateway-runtime/internal/xdsclient/handler.go
```sh
	-storepass "${password}" \
	-file "${cert_dir}/ca.crt"

chmod 0644 "${cert_dir}/ca.crt" "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks"
```
Restrict keystore/truststore file permissions.
Line 95 sets sensitive JKS files to world-readable (0644). Please tighten this to owner-only access.
Suggested patch
```diff
-chmod 0644 "${cert_dir}/ca.crt" "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks"
+chmod 0644 "${cert_dir}/ca.crt"
+chmod 0600 "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks"
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```sh
chmod 0644 "${cert_dir}/ca.crt"
chmod 0600 "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks"
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@event-gateway/docker/kafka/generate-certs.sh` at line 95, The chmod currently
makes CA and JKS files world-readable; tighten permissions by changing the chmod
call in generate-certs.sh so that kafka.keystore.jks and kafka.truststore.jks
are owner-only (e.g., 0600) while leaving ca.crt as world-readable if needed
(0644), or make all three owner-only if the CA cert must also be restricted;
update the chmod invocation that references "${cert_dir}/ca.crt",
"${cert_dir}/kafka.keystore.jks", and "${cert_dir}/kafka.truststore.jks"
accordingly.
```go
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors"
)
```
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Drop the intermediate kgo.Record and remove the franz-go import.
PublishSubscription constructs a kgo.Record only to immediately copy Key/Value/Topic into a connectors.Message. Since the file no longer publishes through kgo, the record (and the franz-go import) can be removed, making this file fully broker-agnostic and aligned with the refactor's intent.
Proposed change
```diff
-	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors"

 	key := syncKey(sub.Topic, sub.CallbackURL)
-	record := &kgo.Record{
-		Key:   []byte(key),
-		Value: value,
-		Topic: p.syncTopic,
-	}
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
 	if err := p.driver.Publish(ctx, p.syncTopic, &connectors.Message{
-		Key:   record.Key,
-		Value: record.Value,
-		Topic: record.Topic,
+		Key:   []byte(key),
+		Value: value,
+		Topic: p.syncTopic,
 	}); err != nil {
```

Also applies to: 62-79
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@event-gateway/gateway-runtime/internal/subscription/sync.go` around lines 28
- 30, The code builds a franz-go kgo.Record only to copy its Key/Value/Topic
into a connectors.Message; remove the kgo.Record and franz-go import and create
the connectors.Message directly in PublishSubscription instead. Locate
PublishSubscription and any local variable (e.g., rec or record) created as
kgo.Record between the import block and the body (including the block referenced
around lines 62-79), replace instantiation of kgo.Record with direct
construction of connectors.Message{Key: ..., Value: ..., Topic: ...} (preserving
any headers/metadata mapping), remove references to kgo.Record fields, and
delete the "github.com/twmb/franz-go/pkg/kgo" import so the file becomes
broker-agnostic.
ec429c6 into wso2:event-gateway-bug-fixes-after-0.5.0
Purpose
event-gateway lacked complete support for connecting to secured Kafka clusters. Kafka TLS/SASL settings existed at the runtime config level, but they were not fully wired through the Kafka broker-driver path, and some WebSub subscription state flows still constructed Kafka clients directly. In addition, the local dev stack only supported plaintext Kafka, so secured connectivity could not be validated end-to-end in the repo.
Goals
Approach
User stories
Documentation
N/A — this change is implementation and local-dev-stack focused; product documentation was not updated in this PR.
Automation tests
- `go test ./...` passed in `event-gateway/gateway-runtime`.
- Added focused tests for Kafka connection config resolution and validation.
- No dedicated automated integration test suite was added in this PR.
- Performed compose-based smoke validation for the secured Kafka services and Kafka UI connectivity.
Security checks
Samples
Related PRs
N/A
Test environment