Add implementation for WebBrokerAPIs#1954
Conversation
📝 WalkthroughWalkthroughThis pull request introduces WebBrokerApi, a new resource type enabling bidirectional protocol mediation between WebSocket clients and Apache Kafka brokers. The implementation spans both the control plane (gateway-controller) and runtime (event-gateway) components. WebBrokerApi configurations are persisted in a new database table, exposed through REST management endpoints, translated to xDS EventChannelConfig resources by the control plane, and dynamically instantiated at runtime via dedicated receivers, broker drivers, and policy-chain orchestration. The feature reuses existing policy engines, adding new hooks for connection-init, produce, and consume phases. Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 17
🧹 Nitpick comments (1)
event-gateway/gateway-runtime/internal/xdsclient/handler_test.go (1)
170-178: ⚡ Quick winAdd a WebBrokerApi handler test here.
These new stub methods make the interface compile, but this file still does not exercise
HandleResources()with a WebBrokerApi payload. A focused add/update/remove test that includeson_connection_init.requestandon_connection_init.responsewould lock the xDS contract down and catch regressions in the new decoder.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/xdsclient/handler_test.go` around lines 170 - 178, Add a focused unit test that calls HandleResources with a WebBrokerApi payload and asserts add/update/remove behavior: create a test that uses the existing recordingBindingManager (which has AddWebBrokerApiBinding and RemoveWebBrokerApiBinding) to capture names, construct WebBrokerApi resource protobufs for add, update and delete cases and include on_connection_init.request and on_connection_init.response entries in the payload so the new decoder path is exercised, invoke HandleResources and assert recordingBindingManager.added/removed contain expected binding names and that on_connection_init.request/response are parsed as expected; this will lock the xDS contract and catch regressions in the decoder.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/rest-apis/gateway/webbroker-api-management.md`:
- Around line 540-545: The delete response example currently returns an
inconsistent id ("stock-trading-webbroker-api"); update the example JSON in the
delete response block so the "id" value matches the id used in create/get/list
examples ("stock-trading-v1.0") to keep the resource id consistent across
examples (look for the delete response JSON object with keys
"status","message","id").
In `@docs/rest-apis/gateway/websub-api-management.md`:
- Around line 232-247: Update the example request/response payloads to match the
new WebSub schema by replacing legacy spec.channels arrays with a keyed object
under spec.channels (each key is the channel name and value matches the
WebSubChannel shape) and by adding spec.allChannels using the
WebSubAllChannelPolicies shape where applicable; ensure per-event entries use
the WebSubEventPolicies structure (on_subscription, on_unsubscription,
on_message_received, on_message_delivery) and that per-channel overrides appear
inside each channel object, and update any example references to
WebSubAllChannelPolicies, WebSubChannel, and spec.allChannels accordingly so
examples validate against the documented schema.
In `@event-gateway/gateway-runtime/configs/channels-webbrokerapi-example.yaml`:
- Around line 11-13: Update the receiver type in the WebBrokerApi example so it
uses the protocol-mediation receiver; change the receiver block that currently
sets type: websocket to type: websocket-broker-api (the receiver definition in
the example YAML) so the sample routes through the registered mediation receiver
implementation rather than bypassing it.
In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go`:
- Around line 406-407: The channel receive in the loop (e.g., "case msg :=
<-conn.inbound:") must check whether the channel is closed before dereferencing
msg; change the receive to use the comma-ok form (msg, ok := <-conn.inbound) and
break/return from the loop when ok==false to avoid nil/zero-value panics when
accessing msg.Value or similar. Apply the same guard to the second occurrence
around the 465-466 region (any other direct receives from conn.inbound in the
same loops) so all inbound reads exit cleanly on channel close.
- Around line 235-247: Move the API-level on_connection_init.response processing
to before the WebSocket handshake: call
e.processor.ProcessConnectionInitResponse(r.Context(), e.channel.Name, respMsg)
prior to calling upgrader.Upgrade, check and handle the returned error (log and
return), then convert respMsg.Headers (map[string][]string) into an http.Header
and pass that as the third argument to upgrader.Upgrade instead of nil so the
handshake includes the policy-set headers; keep using upgrader.Upgrade, respMsg,
and e.processor.ProcessConnectionInitResponse as the reference points.
- Around line 516-517: The teardown currently closes shared channels
conn.inbound and conn.outbound which can cause panics when other goroutines
(e.g., the broker callback) still attempt to send; instead, remove the
close(conn.inbound) and close(conn.outbound) calls and wire teardown to cancel
the connection's context or signal the consumer to stop (use the existing
cancellation/context or a stop channel) so senders observe the cancellation and
stop sending; ensure any goroutine that owns/reads these channels is responsible
for closing them if/when truly appropriate and document that conn.outbound is
not closed by teardown to avoid races with broker callback sends.
In `@event-gateway/gateway-runtime/internal/hub/hub.go`:
- Around line 646-648: In ProcessByChainKey the body-phase RequestContext is
built with MessageToRequestContext which creates a fresh SharedContext, so
header-phase metadata is lost; change construction to reuse the header-phase
shared context by initializing reqCtx via MessageToRequestContext but then set
reqCtx.SharedContext = reqHeaderCtx.SharedContext (or call
MessageToRequestContext using reqHeaderCtx.SharedContext if overload exists)
before calling h.engine.ExecuteRequestBodyPolicies, and ensure the modified
reqCtx is the one passed to ExecuteRequestBodyPolicies so body policies see
header metadata (refer to MessageToRequestContext, reqCtx, reqHeaderCtx,
SharedContext, ExecuteRequestBodyPolicies).
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 1233-1244: AddWebBrokerApiBinding registers receivers on r.wsMux
but never records the WebBrokerApi route path in bindingPaths, so
RemoveWebBrokerApiBinding cannot remove the handler from the same mux
(r.websubMux) and leaves stale WS routes; fix by recording each registered
WebBrokerApi path into bindingPaths when creating the receiver in
AddWebBrokerApiBinding and use a removable WebSocket mux implementation for
r.wsMux (same type that supports deregister/unregister) so
RemoveWebBrokerApiBinding can call the mux's remove/unregister method for each
stored path; update RemoveWebBrokerApiBinding to look up the path in
bindingPaths, call the r.wsMux removal API, remove the bindingPaths entry, and
also ensure activeReceivers[wbb.Name] is cleaned up consistently.
- Around line 445-487: LoadChannels() and Run() may already have added protocol
servers to r.servers, so the xDS branch must not blindly call newManagedServer()
and create duplicate listeners; instead, while holding r.mu check r.servers for
an existing server matching the protocol/port (e.g., WebSocket ->
r.cfg.Server.WebSocketPort with r.wsMux, WebSub-HTTP ->
r.cfg.Server.WebSubHTTPPort with r.websubMux, WebSub-HTTPS ->
r.cfg.Server.WebSubHTTPSPort) and if found reuse that server/mux (do not call
newManagedServer or append/run another one); only call
newManagedServer("WebSocket", ...), newManagedServer("WebSub-HTTP", ...), or
newManagedServer("WebSub-HTTPS", ...) when no existing server for that
protocol/port exists, then append to r.servers and launch via r.runServer as
currently done, ensuring mutex locking/unlocking semantics remain correct.
In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go`:
- Around line 420-426: The decoder currently treats
ecr.Policies["on_connection_init"] only as a flat []interface{} and always maps
into apiPolicies.OnConnectionInit.Request via mapGenericPolicyList, causing any
nested "request"/"response" structures to be lost; update the handler that reads
ecr.Policies (the policiesMap branch around ecr.Policies, connInitIface,
mapGenericPolicyList and apiPolicies.OnConnectionInit.Request) to accept both
forms: if on_connection_init is a map[string]interface{} containing "request"
and/or "response", decode each side separately (map each []interface{} to
Request and Response using mapGenericPolicyList), otherwise fall back to the
existing flat-array behavior and populate Request for backward compatibility;
apply the same change to the other occurrence noted (the block around lines
handling on_connection_init at 461-466).
In `@event-gateway/README.md`:
- Around line 230-234: Replace the hard-coded Authorization header in the curl
example with curl's -u option: remove the "Authorization: Basic
YWRtaW46YWRtaW4=" header and use -u with a placeholder (e.g., -u admin:admin or
-u <user>:<pass>) in the curl invocation shown in the README curl example so
credentials aren't embedded as a base64 literal.
In `@gateway/gateway-controller/pkg/api/handlers/webbroker_api_handler.go`:
- Around line 93-106: ListWebBrokerApis currently ignores the incoming filter
params and returns all configs; update the function (ListWebBrokerApis) to apply
filtering on params.DisplayName, params.Version and params.Status against the
retrieved configs slice before building items. Iterate over configs, skip
entries that don't match the provided non-empty params (use string equality or
case-insensitive comparison as appropriate for displayName/version, and match
status to the config's status field), and only append matching configs to items;
keep the existing error handling and response construction otherwise so the
endpoint returns the filtered result set when filters are provided.
- Around line 111-115: The JSON response uses the wrong field name
"webBrokerApis" which violates the API contract; update the response to use the
schema-approved key "apis" instead (leave "status" and "count" unchanged and
continue returning the same items variable), i.e., replace the "webBrokerApis":
items entry with "apis": items in the handler that sends c.JSON (the listing
handler that constructs the gin.H with status/count/items).
- Around line 185-188: The delete success response currently returns only
"status" and "message"; update the JSON returned in the handler that sends this
response to include the deleted resource id (use the same id variable extracted
earlier in the handler, e.g., id or webBrokerApiID) by adding "id": id to the
gin.H map so the response matches the documented schema.
In `@gateway/gateway-controller/pkg/policyxds/event_channel_translator.go`:
- Around line 213-225: The current code leaves
apiOnConnectionInit/apiOnProduce/apiOnConsume nil when no policies are present,
causing policies.on_* to serialize as null; change the translator to initialize
these variables to empty slices/arrays (e.g., []interface{}{}) instead of nil
and only replace them with buildPolicyList(...) when Policies is non-nil/has
items (affect the same pattern at the other blocks referenced), so
EventChannelConfig/policies.on_* consistently serializes as an empty array;
update variables and the conditional branches around spec.AllChannels and
buildPolicyList accordingly.
In `@gateway/gateway-controller/pkg/storage/sql_store.go`:
- Around line 142-143: GetAllConfigs currently builds a UNION of tables but is
missing the webbroker_apis table even though the type mapping includes
"WebBrokerApi" (case "WebBrokerApi" -> "webbroker_apis"); update the
GetAllConfigs function to add the webbroker_apis SELECT/UNION branch so
WebBrokerApi rows are included in full scans/cache hydration. Locate
GetAllConfigs in sql_store.go and append the same SELECT projection used for
other artifact tables for the webbroker_apis table (using the "webbroker_apis"
identifier that matches the existing case mapping) ensuring the returned columns
and type tag align with other unions.
In `@gateway/gateway-controller/pkg/utils/api_deployment.go`:
- Around line 311-319: The WebBrokerApi branch currently bypasses schema
validation (api.WebBrokerApi branch setting apiName = c.Spec.DisplayName /
apiVersion = c.Spec.Version); restore validation by invoking
s.validator.Validate(&c) (as in the commented code), check if
len(validationErrors) > 0, call s.logValidationErrors(params.Logger, apiID,
apiName, validationErrors) and return nil, &ValidationErrorListError{Errors:
validationErrors} to fail fast; if validator support for WebBrokerApi truly
doesn't exist, explicitly return a validation/failure error (e.g.,
ValidationErrorListError or a clear not-supported validation error) instead of
persisting the resource so malformed Policy versions (like invalid version
patterns) cannot be stored.
---
Nitpick comments:
In `@event-gateway/gateway-runtime/internal/xdsclient/handler_test.go`:
- Around line 170-178: Add a focused unit test that calls HandleResources with a
WebBrokerApi payload and asserts add/update/remove behavior: create a test that
uses the existing recordingBindingManager (which has AddWebBrokerApiBinding and
RemoveWebBrokerApiBinding) to capture names, construct WebBrokerApi resource
protobufs for add, update and delete cases and include
on_connection_init.request and on_connection_init.response entries in the
payload so the new decoder path is exercised, invoke HandleResources and assert
recordingBindingManager.added/removed contain expected binding names and that
on_connection_init.request/response are parsed as expected; this will lock the
xDS contract and catch regressions in the decoder.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: cf530405-ba20-4f73-9ad0-b66b4a3bc881
📒 Files selected for processing (31)
docs/rest-apis/gateway/README.mddocs/rest-apis/gateway/schemas.mddocs/rest-apis/gateway/webbroker-api-management.mddocs/rest-apis/gateway/websub-api-management.mdevent-gateway/ARCHITECTURE.mdevent-gateway/README.mdevent-gateway/gateway-runtime/cmd/event-gateway/plugins.goevent-gateway/gateway-runtime/configs/channels-webbrokerapi-example.yamlevent-gateway/gateway-runtime/internal/binding/loader.goevent-gateway/gateway-runtime/internal/binding/types.goevent-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.goevent-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.goevent-gateway/gateway-runtime/internal/connectors/types.goevent-gateway/gateway-runtime/internal/hub/hub.goevent-gateway/gateway-runtime/internal/hub/policy_adapter.goevent-gateway/gateway-runtime/internal/runtime/runtime.goevent-gateway/gateway-runtime/internal/xdsclient/handler.goevent-gateway/gateway-runtime/internal/xdsclient/handler_test.gogateway/gateway-controller/api/management-openapi.yamlgateway/gateway-controller/cmd/controller/main.gogateway/gateway-controller/pkg/api/handlers/webbroker_api_handler.gogateway/gateway-controller/pkg/api/management/generated.gogateway/gateway-controller/pkg/eventlistener/api_processor.gogateway/gateway-controller/pkg/models/stored_config.gogateway/gateway-controller/pkg/policyxds/event_channel_translator.gogateway/gateway-controller/pkg/policyxds/snapshot.gogateway/gateway-controller/pkg/storage/gateway-controller-db.sqlgateway/gateway-controller/pkg/storage/sql_store.gogateway/gateway-controller/pkg/utils/api_deployment.gogateway/gateway-runtime/policy-engine/internal/executor/chain.gogateway/gateway-runtime/policy-engine/pkg/engine/types.go
| ws, err := upgrader.Upgrade(w, r, nil) | ||
| if err != nil { | ||
| slog.Error("WebSocket upgrade failed", "error", err) | ||
| return | ||
| } | ||
|
|
||
| // Apply API-level on_connection_init.response policies. | ||
| slog.Debug("[3] Applying API-level onConnectionInit.response policies", "api", e.channel.Name, "channel", channelName) | ||
|
|
||
| respMsg := &connectors.Message{ | ||
| Headers: map[string][]string{}, | ||
| } | ||
| if _, err := e.processor.ProcessConnectionInitResponse(r.Context(), e.channel.Name, respMsg); err != nil { |
There was a problem hiding this comment.
Apply on_connection_init.response before upgrade and pass headers to upgrader.
The response policy currently runs after Upgrade and its output is discarded, so it cannot affect handshake headers. Build response headers from the processed message and pass them as the third argument to upgrader.Upgrade.
Proposed fix
- // Upgrade to WebSocket.
- ws, err := upgrader.Upgrade(w, r, nil)
+ // Apply API-level on_connection_init.response policies before upgrade.
+ respMsg := &connectors.Message{Headers: map[string][]string{}}
+ processedResp, err := e.processor.ProcessConnectionInitResponse(r.Context(), e.channel.Name, respMsg)
+ if err != nil {
+ slog.Error("[3] onConnectionInit.response policy failed", "channel", e.channel.Name, "error", err)
+ http.Error(w, "connection init failed", http.StatusForbidden)
+ return
+ }
+ upgradeHeaders := http.Header{}
+ for k, vals := range processedResp.Headers {
+ for _, v := range vals {
+ upgradeHeaders.Add(k, v)
+ }
+ }
+
+ // Upgrade to WebSocket.
+ ws, err := upgrader.Upgrade(w, r, upgradeHeaders)
if err != nil {
slog.Error("WebSocket upgrade failed", "error", err)
return
}
-
- // Apply API-level on_connection_init.response policies.
- slog.Debug("[3] Applying API-level onConnectionInit.response policies", "api", e.channel.Name, "channel", channelName)
-
- respMsg := &connectors.Message{
- Headers: map[string][]string{},
- }
- if _, err := e.processor.ProcessConnectionInitResponse(r.Context(), e.channel.Name, respMsg); err != nil {
- slog.Error("[3] onConnectionInit.response policy failed", "channel", e.channel.Name, "error", err)
- ws.Close()
- return
- }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go`
around lines 235 - 247, Move the API-level on_connection_init.response
processing to before the WebSocket handshake: call
e.processor.ProcessConnectionInitResponse(r.Context(), e.channel.Name, respMsg)
prior to calling upgrader.Upgrade, check and handle the returned error (log and
return), then convert respMsg.Headers (map[string][]string) into an http.Header
and pass that as the third argument to upgrader.Upgrade instead of nil so the
handshake includes the policy-set headers; keep using upgrader.Upgrade, respMsg,
and e.processor.ProcessConnectionInitResponse as the reference points.
There was a problem hiding this comment.
Will check and consider as a future improvement, alongwith comments at #1944
There was a problem hiding this comment.
@senthuran16 Understood. Would you like me to open a GitHub issue to track this improvement — moving on_connection_init.response processing before the WebSocket upgrade and passing the resulting headers to upgrader.Upgrade? That way it won't be lost when addressing the related comments in #1944.
🧠 Learnings used
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
There was a problem hiding this comment.
@coderabbitai open a GitHub issue to track this fix
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
gh issue create \
--repo wso2/api-platform \
--title "Move on_connection_init.response policy processing before WebSocket upgrade in WebBrokerApi receiver" \
--body "## Summary
In \`event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go\`, the \`on_connection_init.response\` policy is currently applied **after** the WebSocket handshake (\`upgrader.Upgrade\`), and its output headers are discarded. This means any headers set by the response policy cannot influence the WebSocket handshake.
## Expected Behavior
\`ProcessConnectionInitResponse\` should be called **before** \`upgrader.Upgrade\`, and the resulting headers from the processed message should be passed as the third argument to \`upgrader.Upgrade\` (instead of \`nil\`), so that handshake headers reflect policy output.
## References
- PR: https://github.com/wso2/api-platform/pull/1954
- Review comment: https://github.com/wso2/api-platform/pull/1954#discussion_r3239562627
- Related comments in: https://github.com/wso2/api-platform/pull/1944
## Requested by
`@senthuran16`"
Length of output: 49
@senthuran16 Done! I've opened GitHub issue #1958 to track moving on_connection_init.response policy processing before the WebSocket upgrade and passing the resulting headers to upgrader.Upgrade.
🧠 Learnings used
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
| case msg := <-conn.inbound: | ||
| // Apply channel-specific on_produce policies. |
There was a problem hiding this comment.
Guard channel reads in loops to avoid nil-message panics.
When teardown closes channels, these receives can return zero values; subsequent len(msg.Value) dereferences can panic. Exit loop when channel is closed.
Proposed fix
- case msg := <-conn.inbound:
+ case msg, ok := <-conn.inbound:
+ if !ok || msg == nil {
+ return
+ }
// Apply channel-specific on_produce policies.
@@
- case msg := <-conn.outbound:
+ case msg, ok := <-conn.outbound:
+ if !ok || msg == nil {
+ return
+ }
slog.Debug("[7] Applying channel onConsume policies",Also applies to: 465-466
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go`
around lines 406 - 407, The channel receive in the loop (e.g., "case msg :=
<-conn.inbound:") must check whether the channel is closed before dereferencing
msg; change the receive to use the comma-ok form (msg, ok := <-conn.inbound) and
break/return from the loop when ok==false to avoid nil/zero-value panics when
accessing msg.Value or similar. Apply the same guard to the second occurrence
around the 465-466 region (any other direct receives from conn.inbound in the
same loops) so all inbound reads exit cleanly on channel close.
There was a problem hiding this comment.
Will check and consider as a future improvement, alongwith comments at #1944
There was a problem hiding this comment.
@senthuran16, understood — no action needed in this PR. Would you like me to open a follow-up GitHub issue to track the channel-close guard fix (comma-ok pattern for conn.inbound and conn.outbound reads) so it isn't lost?
🧠 Learnings used
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
There was a problem hiding this comment.
@coderabbitai open a GitHub issue to track this fix
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
gh issue create \
--repo wso2/api-platform \
--title "Guard channel reads with comma-ok pattern in WebBrokerApi receiver to prevent nil-message panics" \
--body "## Summary
In \`event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go\`, the channel receives in \`inboundLoop\` (around line 406) and \`outboundLoop\` (around line 465) use the plain form (\`msg := <-conn.inbound\` / \`msg := <-conn.outbound\`). When teardown closes these channels, the receive returns a zero/nil value and subsequent dereferences (e.g., \`msg.Value\`) can panic.
## Proposed Fix
Apply the comma-ok pattern to both receive sites so the loops exit cleanly when the channel is closed:
\`\`\`go
// inboundLoop
case msg, ok := <-conn.inbound:
if !ok || msg == nil {
return
}
// outboundLoop
case msg, ok := <-conn.outbound:
if !ok || msg == nil {
return
}
\`\`\`
## References
- PR: https://github.com/wso2/api-platform/pull/1954
- Review comment: https://github.com/wso2/api-platform/pull/1954#discussion_r3239562632
Raised by `@senthuran16`." \
--assignee senthuran16
Length of output: 49
@senthuran16, the follow-up issue has been created: #1959
It tracks adding the comma-ok guard to both conn.inbound (line 406) and conn.outbound (line 465) in broker_api_connector.go, and has been assigned to you.
🧠 Learnings used
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
| case "WebBrokerApi": | ||
| return "webbroker_apis", nil |
There was a problem hiding this comment.
Include WebBrokerApi in full config scans to avoid missing records.
Line 142 correctly maps WebBrokerApi, but GetAllConfigs (Line 848 onward) still does not UNION webbroker_apis. As a result, WebBrokerApi artifacts can be saved and loaded by direct lookup, yet omitted from full-list reads/cache hydration paths.
Suggested fix
UNION ALL
+ SELECT a.uuid, a.kind, a.handle, a.display_name, a.version, wb.configuration, a.desired_state,
+ a.deployment_id, a.origin, a.created_at, a.updated_at, a.deployed_at,
+ a.cp_sync_status, a.cp_sync_info, a.cp_artifact_id
+ FROM artifacts a
+ JOIN webbroker_apis wb ON a.uuid = wb.uuid AND a.gateway_id = wb.gateway_id
+ WHERE a.gateway_id = ?
+
+ UNION ALL
+
SELECT a.uuid, a.kind, a.handle, a.display_name, a.version, lp.configuration, a.desired_state,
a.deployment_id, a.origin, a.created_at, a.updated_at, a.deployed_at,
a.cp_sync_status, a.cp_sync_info, a.cp_artifact_id
FROM artifacts a
JOIN llm_providers lp ON a.uuid = lp.uuid AND a.gateway_id = lp.gateway_id
@@
- rows, err := s.query(query, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId)
+ rows, err := s.query(query, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@gateway/gateway-controller/pkg/storage/sql_store.go` around lines 142 - 143,
GetAllConfigs currently builds a UNION of tables but is missing the
webbroker_apis table even though the type mapping includes "WebBrokerApi" (case
"WebBrokerApi" -> "webbroker_apis"); update the GetAllConfigs function to add
the webbroker_apis SELECT/UNION branch so WebBrokerApi rows are included in full
scans/cache hydration. Locate GetAllConfigs in sql_store.go and append the same
SELECT projection used for other artifact tables for the webbroker_apis table
(using the "webbroker_apis" identifier that matches the existing case mapping)
ensuring the returned columns and type tag align with other unions.
There was a problem hiding this comment.
Will check and consider as a future improvement, alongwith comments at #1944.
Note: This might be applicable to WebSub APIs too
Will open a Git issue to track this and resolve the comment
There was a problem hiding this comment.
@senthuran16, thanks for the note. Just to clarify: looking at the current GetAllConfigs query, websub_apis is already included in the UNION, so the gap is specific to webbroker_apis. Happy to open a GitHub issue to track this if you'd like.
🧠 Learnings used
Learnt from: Saadha123
Repo: wso2/api-platform PR: 1420
File: gateway/gateway-controller/pkg/config/policy_validator.go:149-163
Timestamp: 2026-03-20T15:22:10.323Z
Learning: In the API Gateway controller config/transform/builder code under `gateway/gateway-controller/pkg/**`, treat `ResolvePolicyVersion` in `policy_validator.go` as a validation + definition-existence helper only. At its call sites, do not pass the resolved full semver (e.g., `v1.0.0`) downstream—retain and pass the original major-only policy reference string (e.g., `v1`) to `ConvertAPIPolicyToModel`/`convertAPIPolicyToSDK`, since the policy engine expects major-only format. For empty-version handling, note that the `ResolvePolicyVersion` path is not exercised by LLM policies; those policies resolve versions through the transformer’s `policyVersionResolver` instead.
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 1425
File: gateway/gateway-controller/pkg/api/handlers/handlers.go:730-732
Timestamp: 2026-03-22T04:48:40.788Z
Learning: In the wso2/api-platform `gateway/gateway-controller` service, treat `EventHub` as DB-backed: when `s.eventHub != nil`, `s.db` is guaranteed to be non-nil. Therefore, for handlers and related code, it’s sufficient to guard on `s.eventHub == nil` (e.g., skip EventHub-dependent paths) and you should not also add redundant `s.db == nil` checks. This is based on the project invariant that memory-only (non-DB) mode is planned for removal.
Learnt from: nimsara66
Repo: wso2/api-platform PR: 1438
File: gateway/gateway-controller/pkg/utils/api_deployment.go:353-359
Timestamp: 2026-03-23T02:48:39.635Z
Learning: For secret handling during API deployment in the gateway-controller codebase, follow the intended design “resolve at runtime, persist unresolved.” When calling `saveOrUpdateConfig`, always persist the stored (unresolved) config (`storedCfg`) that still contains `$secret{...}` placeholders. Use the resolved/decrypted config (`resolvedCfg`) only for runtime purposes (e.g., values returned in `APIDeploymentResult.StoredConfig` for xDS/policy evaluation). Never persist or write `resolvedCfg` (plaintext secrets) to the DB or in-memory store—reviews should flag any changes that write decrypted secret values to storage or logs.
Learnt from: dushaniw
Repo: wso2/api-platform PR: 1463
File: gateway/gateway-controller/pkg/controlplane/sync.go:323-362
Timestamp: 2026-03-25T04:34:50.763Z
Learning: In the gateway-controller codebase, it is an accepted design to log and continue after failures in in-memory state updates (e.g., store.Update) or event-hub publishes (e.g., eventHub.PublishEvent) when the failure occurs following a successful DB upsert (e.g., UpsertConfig). Do not review-comment this as a bug or require synchronous rollback/error propagation. This applies to sync logic (e.g., processSyncStatusUpdates/processSyncDeletions), websocket event handlers (e.g., handleAPIUndeployedEvent and similar), and the REST service layer (e.g., saveOrUpdateConfig). The DB upsert rollback is intentionally not enforced because prior config state is not retained before overwriting, and gateway restart is relied on to recover from any resulting in-memory/xDS staleness via store reload from the DB.
Learnt from: ShalkiWenushika
Repo: wso2/api-platform PR: 1436
File: gateway/gateway-controller/pkg/storage/interface.go:150-156
Timestamp: 2026-03-25T17:21:43.067Z
Learning: In `gateway/gateway-controller/pkg/storage/`, treat `ConfigStore` (in `memory.go`) and the `Storage` interface (in `interface.go`) as unrelated abstractions. `ConfigStore` is an in-memory cache with its own method set, while `Storage` is the persistence-layer interface (implemented by `sqlStore` and test mocks). When reviewing changes, do not classify modifications to the `Storage` interface (e.g., adding new methods) as breaking changes for `ConfigStore`; since `ConfigStore` does not implement `Storage`, they do not impact each other.
Learnt from: Arshardh
Repo: wso2/api-platform PR: 1478
File: gateway/gateway-controller/pkg/storage/sql_store.go:1431-1440
Timestamp: 2026-03-25T19:08:39.349Z
Learning: In wso2/api-platform’s gateway/gateway-controller, treat `APIKey.ApplicationID` as being intentionally populated from `app.application_uuid` (and `application_id` in xDS/analytics payloads as the UUID-based value) rather than from `app.application_id`. `application_id` is mutable, while `application_uuid` is the stable identifier required for analytics event tracking. When reviewing SQL queries/joins in the storage layer (e.g., `GetAPIKeysByAPI`, `GetAllAPIKeys`), do not flag this UUID-vs-id mapping as incorrect and do not suggest switching to `app.application_id`.
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 1493
File: gateway/gateway-controller/tests/integration/schema_test.go:201-202
Timestamp: 2026-03-26T06:54:34.762Z
Learning: For the wso2/api-platform gateway/gateway-controller service, treat “delete and recreate” of the database (during tests/upgrades) as the intentional/accepted upgrade path for SQL schema changes. Do not raise review issues for the absence of schema migrations or SQL schema-version upgrade logic in this service’s database implementation files (e.g., sqlite.go/postgres.go), since this is a deliberate design choice.
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1500
File: gateway/gateway-controller/pkg/utils/mcp_transformer.go:150-153
Timestamp: 2026-03-27T17:26:39.573Z
Learning: In wso2/api-platform’s `gateway/gateway-controller` service, the OpenAPI spec `gateway/gateway-controller/api/management-openapi.yaml` defines the `Policy` schema’s `version` as required (under `required: [name, version]`) with pattern `'^v\d+$'`, and the generated Go model `gateway/gateway-controller/pkg/api/management/generated.go` represents `Policy.Version` as a non-pointer `string`.
During code review, ensure any PR that constructs an `api.Policy` does not omit/leave `Version` unset; leaving it empty will fail the API validation. If the PR’s change requires `Version` to become optional (or otherwise changes its required/validation semantics), you must update `management-openapi.yaml` accordingly (e.g., remove `version` from the `required` list), then regenerate the Go types so validation behavior matches the spec.
There was a problem hiding this comment.
@coderabbitai open a GitHub issue to track this fix
There was a problem hiding this comment.
Rate Limit Exceeded
@senthuran16 have exceeded the limit for the number of chat messages per hour. Please wait 10 minutes and 31 seconds before sending another message.
There was a problem hiding this comment.
@coderabbitai open a GitHub issue to track this fix
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
gh issue create \
--repo wso2/api-platform \
--title "Fix: Include webbroker_apis in GetAllConfigs UNION query" \
--body "## Summary
The \`GetAllConfigs\` function in \`gateway/gateway-controller/pkg/storage/sql_store.go\` builds a UNION query across all artifact type tables but is missing the \`webbroker_apis\` table. This means \`WebBrokerApi\` artifacts are saved and retrievable via direct lookup, but are silently omitted from full-list scans and in-memory cache hydration paths.
## Impact
Any code path that calls \`GetAllConfigs\` (e.g., cache reload on gateway restart) will not return \`WebBrokerApi\` configurations, potentially causing stale or incomplete state.
## Fix
Add a UNION branch for \`webbroker_apis\` in \`GetAllConfigs\`, consistent with the existing branches for \`rest_apis\`, \`websub_apis\`, \`llm_providers\`, \`llm_proxies\`, and \`mcp_proxies\`. Also increment the parameter count passed to \`s.query\`.
\`\`\`diff
UNION ALL
+ SELECT a.uuid, a.kind, a.handle, a.display_name, a.version, wb.configuration, a.desired_state,
+ a.deployment_id, a.origin, a.created_at, a.updated_at, a.deployed_at,
+ a.cp_sync_status, a.cp_sync_info, a.cp_artifact_id
+ FROM artifacts a
+ JOIN webbroker_apis wb ON a.uuid = wb.uuid AND a.gateway_id = wb.gateway_id
+ WHERE a.gateway_id = ?
+
+ UNION ALL
SELECT a.uuid, a.kind, a.handle, a.display_name, a.version, lp.configuration, a.desired_state,
\`\`\`
And update the query call:
\`\`\`diff
- rows, err := s.query(query, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId)
+ rows, err := s.query(query, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId)
\`\`\`
## References
- Identified in PR: https://github.com/wso2/api-platform/pull/1954
- Review comment: https://github.com/wso2/api-platform/pull/1954#discussion_r3239562683
- Requested by: `@senthuran16`"
Unrecoverable error: Failed to clone repository
🏁 Script executed:
gh issue create \
--repo wso2/api-platform \
--title "Fix: Include webbroker_apis in GetAllConfigs UNION query" \
--body "## Summary
The \`GetAllConfigs\` function in \`gateway/gateway-controller/pkg/storage/sql_store.go\` builds a UNION query across all artifact type tables but is missing the \`webbroker_apis\` table. This means \`WebBrokerApi\` artifacts are saved and retrievable via direct lookup, but are silently omitted from full-list scans and in-memory cache hydration paths.
## Impact
Any code path that calls \`GetAllConfigs\` (e.g., cache reload on gateway restart) will not return \`WebBrokerApi\` configurations, potentially causing stale or incomplete state.
## Fix
Add a UNION branch for \`webbroker_apis\` in \`GetAllConfigs\`, consistent with the existing branches for \`rest_apis\`, \`websub_apis\`, \`llm_providers\`, \`llm_proxies\`, and \`mcp_proxies\`. Also increment the parameter count passed to \`s.query\`.
\`\`\`diff
UNION ALL
+ SELECT a.uuid, a.kind, a.handle, a.display_name, a.version, wb.configuration, a.desired_state,
+ a.deployment_id, a.origin, a.created_at, a.updated_at, a.deployed_at,
+ a.cp_sync_status, a.cp_sync_info, a.cp_artifact_id
+ FROM artifacts a
+ JOIN webbroker_apis wb ON a.uuid = wb.uuid AND a.gateway_id = wb.gateway_id
+ WHERE a.gateway_id = ?
+
+ UNION ALL
SELECT a.uuid, a.kind, a.handle, a.display_name, a.version, lp.configuration, a.desired_state,
\`\`\`
And update the query call:
\`\`\`diff
- rows, err := s.query(query, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId)
+ rows, err := s.query(query, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId, s.gatewayId)
\`\`\`
## References
- Identified in PR: https://github.com/wso2/api-platform/pull/1954
- Review comment: https://github.com/wso2/api-platform/pull/1954#discussion_r3239562683
- Requested by: `@senthuran16`"
Length of output: 49
@senthuran16, the GitHub issue has been created to track this fix: #1965
It covers adding the missing webbroker_apis UNION branch to GetAllConfigs in gateway/gateway-controller/pkg/storage/sql_store.go, along with the corresponding parameter count update in the s.query call.
🧠 Learnings used
Learnt from: Saadha123
Repo: wso2/api-platform PR: 1420
File: gateway/gateway-controller/pkg/config/policy_validator.go:149-163
Timestamp: 2026-03-20T15:22:10.323Z
Learning: In the API Gateway controller config/transform/builder code under `gateway/gateway-controller/pkg/**`, treat `ResolvePolicyVersion` in `policy_validator.go` as a validation + definition-existence helper only. At its call sites, do not pass the resolved full semver (e.g., `v1.0.0`) downstream—retain and pass the original major-only policy reference string (e.g., `v1`) to `ConvertAPIPolicyToModel`/`convertAPIPolicyToSDK`, since the policy engine expects major-only format. For empty-version handling, note that the `ResolvePolicyVersion` path is not exercised by LLM policies; those policies resolve versions through the transformer’s `policyVersionResolver` instead.
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 1425
File: gateway/gateway-controller/pkg/api/handlers/handlers.go:730-732
Timestamp: 2026-03-22T04:48:40.788Z
Learning: In the wso2/api-platform `gateway/gateway-controller` service, treat `EventHub` as DB-backed: when `s.eventHub != nil`, `s.db` is guaranteed to be non-nil. Therefore, for handlers and related code, it’s sufficient to guard on `s.eventHub == nil` (e.g., skip EventHub-dependent paths) and you should not also add redundant `s.db == nil` checks. This is based on the project invariant that memory-only (non-DB) mode is planned for removal.
Learnt from: nimsara66
Repo: wso2/api-platform PR: 1438
File: gateway/gateway-controller/pkg/utils/api_deployment.go:353-359
Timestamp: 2026-03-23T02:48:39.635Z
Learning: For secret handling during API deployment in the gateway-controller codebase, follow the intended design “resolve at runtime, persist unresolved.” When calling `saveOrUpdateConfig`, always persist the stored (unresolved) config (`storedCfg`) that still contains `$secret{...}` placeholders. Use the resolved/decrypted config (`resolvedCfg`) only for runtime purposes (e.g., values returned in `APIDeploymentResult.StoredConfig` for xDS/policy evaluation). Never persist or write `resolvedCfg` (plaintext secrets) to the DB or in-memory store—reviews should flag any changes that write decrypted secret values to storage or logs.
Learnt from: dushaniw
Repo: wso2/api-platform PR: 1463
File: gateway/gateway-controller/pkg/controlplane/sync.go:323-362
Timestamp: 2026-03-25T04:34:50.763Z
Learning: In the gateway-controller codebase, it is an accepted design to log and continue after failures in in-memory state updates (e.g., store.Update) or event-hub publishes (e.g., eventHub.PublishEvent) when the failure occurs following a successful DB upsert (e.g., UpsertConfig). Do not review-comment this as a bug or require synchronous rollback/error propagation. This applies to sync logic (e.g., processSyncStatusUpdates/processSyncDeletions), websocket event handlers (e.g., handleAPIUndeployedEvent and similar), and the REST service layer (e.g., saveOrUpdateConfig). The DB upsert rollback is intentionally not enforced because prior config state is not retained before overwriting, and gateway restart is relied on to recover from any resulting in-memory/xDS staleness via store reload from the DB.
Learnt from: ShalkiWenushika
Repo: wso2/api-platform PR: 1436
File: gateway/gateway-controller/pkg/storage/interface.go:150-156
Timestamp: 2026-03-25T17:21:43.067Z
Learning: In `gateway/gateway-controller/pkg/storage/`, treat `ConfigStore` (in `memory.go`) and the `Storage` interface (in `interface.go`) as unrelated abstractions. `ConfigStore` is an in-memory cache with its own method set, while `Storage` is the persistence-layer interface (implemented by `sqlStore` and test mocks). When reviewing changes, do not classify modifications to the `Storage` interface (e.g., adding new methods) as breaking changes for `ConfigStore`; since `ConfigStore` does not implement `Storage`, they do not impact each other.
Learnt from: Arshardh
Repo: wso2/api-platform PR: 1478
File: gateway/gateway-controller/pkg/storage/sql_store.go:1431-1440
Timestamp: 2026-03-25T19:08:39.349Z
Learning: In wso2/api-platform’s gateway/gateway-controller, treat `APIKey.ApplicationID` as being intentionally populated from `app.application_uuid` (and `application_id` in xDS/analytics payloads as the UUID-based value) rather than from `app.application_id`. `application_id` is mutable, while `application_uuid` is the stable identifier required for analytics event tracking. When reviewing SQL queries/joins in the storage layer (e.g., `GetAPIKeysByAPI`, `GetAllAPIKeys`), do not flag this UUID-vs-id mapping as incorrect and do not suggest switching to `app.application_id`.
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 1493
File: gateway/gateway-controller/tests/integration/schema_test.go:201-202
Timestamp: 2026-03-26T06:54:34.762Z
Learning: For the wso2/api-platform gateway/gateway-controller service, treat “delete and recreate” of the database (during tests/upgrades) as the intentional/accepted upgrade path for SQL schema changes. Do not raise review issues for the absence of schema migrations or SQL schema-version upgrade logic in this service’s database implementation files (e.g., sqlite.go/postgres.go), since this is a deliberate design choice.
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1500
File: gateway/gateway-controller/pkg/utils/mcp_transformer.go:150-153
Timestamp: 2026-03-27T17:26:39.573Z
Learning: In wso2/api-platform’s `gateway/gateway-controller` service, the OpenAPI spec `gateway/gateway-controller/api/management-openapi.yaml` defines the `Policy` schema’s `version` as required (under `required: [name, version]`) with pattern `'^v\d+$'`, and the generated Go model `gateway/gateway-controller/pkg/api/management/generated.go` represents `Policy.Version` as a non-pointer `string`.
During code review, ensure any PR that constructs an `api.Policy` does not omit/leave `Version` unset; leaving it empty will fail the API validation. If the PR’s change requires `Version` to become optional (or otherwise changes its required/validation semantics), you must update `management-openapi.yaml` accordingly (e.g., remove `version` from the `required` list), then regenerate the Go types so validation behavior matches the spec.
Purpose
This will replace PR: #1944
WebBrokerApi Walkthrough
The WebBrokerApi enables bidirectional WebSocket ↔ Kafka protocol mediation. This walkthrough demonstrates creating a stock trading API where clients can produce messages to Kafka and consume messages in real-time over WebSocket.
Step 1: Create a WebBroker API
Use the following curl command to create a WebBrokerApi with a
priceschannel that maps to Kafka topics:This creates a WebBrokerApi where:
stock.pricesKafka topicdummy.pricesKafka topic are delivered to the WebSocket clientStep 2: Connect via WebSocket
Install
wscatif you haven't already:Connect to the WebBroker API and select the
priceschannel using theX-channelheader:wscat -c ws://localhost:8081/stock-trading/v1.0 -H "X-channel: prices"Once connected, you'll see:
Step 3: Monitor Messages Published to Kafka
In a new terminal, start a Kafka consumer to monitor messages that clients send via WebSocket:
docker exec -it event-gateway-kafka-1 /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic stock.prices \ --from-beginningNow, type a message in your WebSocket terminal (Step 2) and press Enter:
The message should appear in the Kafka consumer terminal immediately.
Step 4: Publish Messages from Kafka to WebSocket
In another terminal, start a Kafka producer to send messages that will be delivered to WebSocket clients:
docker exec -it event-gateway-kafka-1 /opt/kafka/bin/kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic dummy.pricesType a message in the Kafka producer terminal and press Enter:
The message should appear in your WebSocket terminal (Step 2):
Goals
Approach
User stories
Documentation
Automation tests
Security checks
Samples
Related PRs
Test environment