Improve gateway event delivery when agent-manager scaled up#949
📝 Walkthrough
Adds an EventHub: domain types, in-memory gateway registry, PostgreSQL-backed SQLBackend with polling/delivery and cleanup, DB migration v21, WebSocket manager/controller integration to subscribe/forward events, DI wiring (ProvideEventHub), GatewayEventsService publishing via EventHub, and graceful shutdown closing the hub.
Changes
EventHub Multi-Pod Event Delivery
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
agent-manager-service/services/gateway_events_service.go (1)
71-103: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Use operation-specific EventHub `Action` values instead of hardcoded `"CREATE"`
All events are currently published with `Action: "CREATE"`, including undeploy/revoke/update paths. That loses semantic correctness for action-driven consumers.
💡 Proposed fix
```diff
-func (s *GatewayEventsService) broadcastEvent(gatewayID string, eventType string, payload interface{}) error {
+func (s *GatewayEventsService) broadcastEvent(gatewayID string, eventType string, action string, payload interface{}) error {
@@
 	evt := eventhub.Event{
 		GatewayID:           gatewayID,
 		OriginatedTimestamp: time.Now(),
 		EventType:           eventhub.EventType(eventType),
-		Action:              "CREATE",
+		Action:              action,
 		EntityID:            correlationID,
 		EventData:           string(eventJSON),
 	}
@@
 func (s *GatewayEventsService) BroadcastDeploymentEvent(gatewayID string, event *DeploymentEvent) error {
-	return s.broadcastEvent(gatewayID, "api.deployed", event)
+	return s.broadcastEvent(gatewayID, "api.deployed", "CREATE", event)
 }
@@
 func (s *GatewayEventsService) BroadcastUndeploymentEvent(gatewayID string, event *APIUndeploymentEvent) error {
-	return s.broadcastEvent(gatewayID, "api.undeployed", event)
+	return s.broadcastEvent(gatewayID, "api.undeployed", "DELETE", event)
 }
@@
 func (s *GatewayEventsService) BroadcastLLMProviderDeploymentEvent(gatewayID string, event *models.LLMProviderDeploymentEvent) error {
-	return s.broadcastEvent(gatewayID, "llmprovider.deployed", event)
+	return s.broadcastEvent(gatewayID, "llmprovider.deployed", "CREATE", event)
 }
@@
 func (s *GatewayEventsService) BroadcastLLMProviderUndeploymentEvent(gatewayID string, event *models.LLMProviderUndeploymentEvent) error {
-	return s.broadcastEvent(gatewayID, "llmprovider.undeployed", event)
+	return s.broadcastEvent(gatewayID, "llmprovider.undeployed", "DELETE", event)
 }
@@
 func (s *GatewayEventsService) BroadcastLLMProxyDeploymentEvent(gatewayID string, event *models.LLMProxyDeploymentEvent) error {
-	return s.broadcastEvent(gatewayID, "llmproxy.deployed", event)
+	return s.broadcastEvent(gatewayID, "llmproxy.deployed", "CREATE", event)
 }
@@
 func (s *GatewayEventsService) BroadcastLLMProxyUndeploymentEvent(gatewayID string, event *models.LLMProxyUndeploymentEvent) error {
-	return s.broadcastEvent(gatewayID, "llmproxy.undeployed", event)
+	return s.broadcastEvent(gatewayID, "llmproxy.undeployed", "DELETE", event)
 }
@@
 func (s *GatewayEventsService) BroadcastAPIKeyCreatedEvent(gatewayID string, event *models.APIKeyCreatedEvent) error {
-	return s.broadcastEvent(gatewayID, "apikey.created", event)
+	return s.broadcastEvent(gatewayID, "apikey.created", "CREATE", event)
 }
@@
 func (s *GatewayEventsService) BroadcastAPIKeyRevokedEvent(gatewayID string, event *models.APIKeyRevokedEvent) error {
-	return s.broadcastEvent(gatewayID, "apikey.revoked", event)
+	return s.broadcastEvent(gatewayID, "apikey.revoked", "DELETE", event)
 }
@@
 func (s *GatewayEventsService) BroadcastAPIKeyUpdatedEvent(gatewayID string, event *models.APIKeyUpdatedEvent) error {
-	return s.broadcastEvent(gatewayID, "apikey.updated", event)
+	return s.broadcastEvent(gatewayID, "apikey.updated", "UPDATE", event)
 }
```
Also applies to: 117-150
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent-manager-service/services/gateway_events_service.go` around lines 71 - 103, The broadcastEvent function always sets EventHub Action to the hardcoded string "CREATE" which loses semantic intent; update broadcastEvent (and any callers) to accept an action parameter (e.g., action string) or map eventType to the correct eventhub action and use that value when constructing evt.Action; adjust GatewayEventsService.broadcastEvent signature and all invocations to pass the appropriate action (e.g., "CREATE", "UPDATE", "DELETE"/"REVOKE"/"UNDEPLOY" as applicable) so events published by eventhub have correct operation-specific Action values.
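The alternative mentioned in the prompt above, mapping `eventType` to an action instead of adding a parameter at every call site, could look roughly like the following. This is a minimal sketch, not the PR's code: `actionByEventType` and `actionForEventType` are hypothetical names, the event type strings are taken from the proposed fix, and the action values assume the schema accepts `CREATE`, `UPDATE`, and `DELETE`.

```go
package main

import "fmt"

// actionByEventType is a hypothetical lookup table. The keys come from the
// proposed fix above; the values assume CREATE, UPDATE and DELETE are valid.
var actionByEventType = map[string]string{
	"api.deployed":           "CREATE",
	"api.undeployed":         "DELETE",
	"llmprovider.deployed":   "CREATE",
	"llmprovider.undeployed": "DELETE",
	"llmproxy.deployed":      "CREATE",
	"llmproxy.undeployed":    "DELETE",
	"apikey.created":         "CREATE",
	"apikey.updated":         "UPDATE",
	"apikey.revoked":         "DELETE",
}

// actionForEventType returns the action for a known event type, or an error
// so that an unmapped type fails loudly instead of defaulting to "CREATE".
func actionForEventType(eventType string) (string, error) {
	action, ok := actionByEventType[eventType]
	if !ok {
		return "", fmt.Errorf("no action mapped for event type %q", eventType)
	}
	return action, nil
}

func main() {
	for _, et := range []string{"api.deployed", "apikey.revoked", "unknown.type"} {
		action, err := actionForEventType(et)
		fmt.Println(et, action, err)
	}
}
```

A lookup table keeps the call sites unchanged, at the cost of one more place to update when a new event type is added.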
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@agent-manager-service/db_migrations/021_add_eventhub_tables.go`:
- Line 37: The migration's events table defines the "action" column with a CHECK
that only allows 'CREATE','UPDATE','DELETE', which will reject API key lifecycle
events; update the CHECK constraint in
agent-manager-service/db_migrations/021_add_eventhub_tables.go (the SQL that
defines the action TEXT NOT NULL CHECK ...) to include 'PROVISION' and 'REVOKE'
so API_KEY provision/revoke events can be inserted; ensure the modified
migration string still matches the surrounding SQL formatting and run or re-run
migration tests to verify inserts succeed for those actions.
In `@agent-manager-service/eventhub/sqlbackend.go`:
- Around line 519-553: The code is advancing gw.knownVersion and gw.lastPolled
even when subscribers is empty; update the logic so updates only occur when
there are active subscribers: detect if len(subscribers) == 0 (or use
subscriberChannelsAvailable(subscribers) appropriately) and in that case do not
set gw.knownVersion or gw.lastPolled (leave the gateway cursor unchanged) and
return/log that delivery was skipped; otherwise proceed with the existing update
path. Ensure the check is applied around the block that assigns gw.knownVersion
and gw.lastPolled so queued events are not silently dropped when subscribers is
empty.
- Around line 354-372: The unsubscribe flow races with pollGatewayWithState
sending to subscribers because removeSubscriber/removeAllSubscribers drop the
registry lock before closing channels; change the coordination so channels are
not closed while pollGatewayWithState may send: either (A) in
pollGatewayWithState hold b.registry.mu.RLock() across the send loop that
iterates gw.subscribers (so sends happen under the read lock and
Unsubscribe/UnsubscribeAll will block until sends complete), or (B) add a
per-subscriber struct with a shutdown/inflight counter or a closed flag (update
removeSubscriber/removeAllSubscribers to mark subscriber as closed under
b.registry.mu.Lock(), wait for inflight sends to drain or set a flag, then close
the channel) to ensure close(ch) cannot race with ch <- evt; update Unsubscribe,
UnsubscribeAll, removeSubscriber, removeAllSubscribers and pollGatewayWithState
accordingly to use the chosen coordination.
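Option (B) from the unsubscribe race comment above can be sketched as follows. This is an illustration under assumptions, not the EventHub implementation: the `subscriber` type, `trySend`, and the `string` event payload are all hypothetical. The per-subscriber mutex serializes send and close, and the send is non-blocking so the lock is never held while waiting on a slow consumer.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical wrapper around a delivery channel. The mutex
// serializes send and close, so close(ch) can never race with ch <- evt.
type subscriber struct {
	mu     sync.Mutex
	ch     chan string
	closed bool
}

func newSubscriber(buffer int) *subscriber {
	return &subscriber{ch: make(chan string, buffer)}
}

// trySend delivers evt without blocking. It reports false when the
// subscriber is already closed or its buffer is full.
func (s *subscriber) trySend(evt string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	select {
	case s.ch <- evt:
		return true
	default:
		return false
	}
}

// close marks the subscriber closed and closes the channel exactly once.
func (s *subscriber) close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
}

func main() {
	sub := newSubscriber(4)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // poller side
		defer wg.Done()
		for i := 0; i < 100; i++ {
			sub.trySend(fmt.Sprintf("event-%d", i))
		}
	}()
	go func() { // unsubscribe side
		defer wg.Done()
		sub.close()
	}()
	wg.Wait()
	fmt.Println("finished without sending on a closed channel")
}
```

Whether a full buffer should drop the event, as here, or leave the gateway cursor unadvanced so the event is retried is a separate design decision that this sketch does not settle.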
In `@agent-manager-service/websocket/manager.go`:
- Around line 149-158: The connection registration currently logs Subscribe
errors but continues as if successful; update the logic in the block using
m.hub.Subscribe(gatewayID) so that when Subscribe returns an error you propagate
that failure back to the caller (do not proceed with setting conn.eventSub,
m.wg.Add(1) or launching m.forwardEvents), close or cleanup the partially
created conn, and return an appropriate error/result indicating registration
failed; specifically modify the branch handling Subscribe(gatewayID) to return
on err (after logging) instead of proceeding to set conn.eventSub and start the
goroutine.
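The error propagation requested for `manager.go` above might be shaped like this. It is a hedged sketch only: the `hub` interface, `connection`, `manager`, and `register` are simplified stand-ins, and the real `Subscribe` signature and connection type may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// hub is a hypothetical stand-in for the EventHub's Subscribe method.
type hub interface {
	Subscribe(gatewayID string) (<-chan string, error)
}

// connection is a simplified stand-in for the WebSocket connection.
type connection struct {
	gatewayID string
	eventSub  <-chan string
	closed    bool
}

func (c *connection) Close() { c.closed = true }

type manager struct {
	hub   hub
	conns map[string]*connection
}

// register returns an error when the subscription fails instead of keeping
// a connection that can never receive events.
func (m *manager) register(gatewayID string) (*connection, error) {
	conn := &connection{gatewayID: gatewayID}
	sub, err := m.hub.Subscribe(gatewayID)
	if err != nil {
		conn.Close() // clean up the partially created connection
		return nil, fmt.Errorf("subscribe gateway %q: %w", gatewayID, err)
	}
	conn.eventSub = sub
	m.conns[gatewayID] = conn
	// A real manager would start its forwarding goroutine here.
	return conn, nil
}

var errHubDown = errors.New("hub unavailable")

// failingHub always fails, to exercise the error path.
type failingHub struct{}

func (failingHub) Subscribe(string) (<-chan string, error) { return nil, errHubDown }

// okHub always succeeds, to exercise the happy path.
type okHub struct{}

func (okHub) Subscribe(string) (<-chan string, error) { return make(chan string), nil }

func main() {
	m := &manager{hub: failingHub{}, conns: map[string]*connection{}}
	_, err := m.register("gateway-1")
	fmt.Println("error:", err, "registered connections:", len(m.conns))
}
```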
---
Outside diff comments:
In `@agent-manager-service/services/gateway_events_service.go`:
- Around line 71-103: The broadcastEvent function always sets EventHub Action to
the hardcoded string "CREATE" which loses semantic intent; update broadcastEvent
(and any callers) to accept an action parameter (e.g., action string) or map
eventType to the correct eventhub action and use that value when constructing
evt.Action; adjust GatewayEventsService.broadcastEvent signature and all
invocations to pass the appropriate action (e.g., "CREATE", "UPDATE",
"DELETE"/"REVOKE"/"UNDEPLOY" as applicable) so events published by eventhub have
correct operation-specific Action values.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 08b89dd8-8026-48cd-90d0-28bc7b0c9562
📒 Files selected for processing (13)
- agent-manager-service/app/app.go
- agent-manager-service/controllers/websocket_controller.go
- agent-manager-service/db_migrations/021_add_eventhub_tables.go
- agent-manager-service/db_migrations/migration_list.go
- agent-manager-service/eventhub/sqlbackend.go
- agent-manager-service/eventhub/topic.go
- agent-manager-service/eventhub/types.go
- agent-manager-service/services/gateway_events_service.go
- agent-manager-service/websocket/connection.go
- agent-manager-service/websocket/manager.go
- agent-manager-service/wiring/params.go
- agent-manager-service/wiring/wire.go
- agent-manager-service/wiring/wire_gen.go
Actionable comments posted: 4
🧹 Nitpick comments (2)
agent-manager-service/eventhub/topic.go (1)
100-107: 🏗️ Heavy lift
Avoid returning internal `*gateway` pointers from `gatewayRegistry.getAll`.
Current callsites access `gw.subscribers`, `gw.knownVersion`, `gw.lastPolled`, and `gw.queuedLoggedAt` under `gatewayRegistry.mu` (so the race concern isn't evident in current usage), but `getAll` still leaks mutable pointers outside the lock scope and remains an easy footgun for future changes. Return snapshots or provide locked accessor/iteration APIs to enforce the locking contract.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent-manager-service/eventhub/topic.go` around lines 100 - 107, gatewayRegistry.getAll currently returns slices of internal *gateway pointers which leaks mutable state (subscribers, knownVersion, lastPolled, queuedLoggedAt) outside the registry lock; change getAll to avoid returning internal pointers by either returning a slice of gateway value copies (copy fields into new gateway structs) or by replacing getAll with a locked iteration helper like gatewayRegistry.ranged(fn *gateway) that acquires mu.RLock(), calls the provided callback for each gateway while still holding the lock, and does not expose internal pointers after unlock; update callsites to use the new copying or callback API so no internal gateway pointer escapes the lock.
console/workspaces/pages/llm-providers/src/subComponents/AddLLMProviderForm.tsx (1)
606-606: 💤 Low value
Use `===` for consistency.
Line 606 uses `==` to compare length, but line 156 uses `===` for the same check. Prefer `===` for type-safe equality throughout.
✨ Proposed fix
```diff
 disabled={
   isSubmitting ||
   !formData.gatewayIds ||
-  formData.gatewayIds?.length == 0
+  formData.gatewayIds?.length === 0
 }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@console/workspaces/pages/llm-providers/src/subComponents/AddLLMProviderForm.tsx` at line 606, The comparison formData.gatewayIds?.length == 0 in AddLLMProviderForm should use strict equality; replace the loose equality operator (==) with the strict operator (===) so it reads formData.gatewayIds?.length === 0 to match the other check and ensure type-safe comparison for the gatewayIds length check.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@agent-manager-service/eventhub/sqlbackend.go`:
- Around line 230-245: Create a single helper (e.g., normalizeGatewayID or
normalizeGateway) that trims whitespace and validates non-empty gateway IDs, and
use it everywhere RegisterGateway, PublishEvent, Subscribe, Unsubscribe, and
UnsubscribeAll currently accept gatewayID directly; replace the inline
strings.TrimSpace/empty checks with calls to this helper, return the same
"gateway_id cannot be empty" error when validation fails, and ensure all
DB/registry calls (including b.upsertGatewayStmt.Exec and
registry.register/unregister/lookup) use the normalized value so `" gateway-1 "`
and `"gateway-1"` map to the same key.
- Around line 211-219: Initialize currently starts goroutines (pollLoop,
cleanupLoop) before validating SQLBackendConfig, which can cause panics when
PollInterval or CleanupInterval are <= 0 and incorrect purges when
RetentionPeriod <= 0; update Initialize to validate b.config.PollInterval,
b.config.CleanupInterval and b.config.RetentionPeriod (ensure each is > 0)
immediately after prepareStatements() and before b.wg.Add/starting goroutines,
returning a descriptive error if any value is invalid (or optionally set safe
defaults), so pollLoop and cleanupLoop can assume valid durations.
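The two `sqlbackend.go` suggestions above, a shared gateway ID normalizer and up-front config validation, can be sketched together. This is a minimal illustration under assumptions: `normalizeGatewayID` follows the name proposed in the comment, while `Config` and `validate` are hypothetical and mirror only the three `SQLBackendConfig` fields named in the review.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// normalizeGatewayID trims whitespace and rejects empty IDs, so that
// " gateway-1 " and "gateway-1" resolve to the same key everywhere.
func normalizeGatewayID(gatewayID string) (string, error) {
	id := strings.TrimSpace(gatewayID)
	if id == "" {
		return "", errors.New("gateway_id cannot be empty")
	}
	return id, nil
}

// Config is a hypothetical subset of SQLBackendConfig; the real struct may
// carry more fields.
type Config struct {
	PollInterval    time.Duration
	CleanupInterval time.Duration
	RetentionPeriod time.Duration
}

// validate rejects non-positive durations before any goroutine starts.
// time.NewTicker panics for d <= 0, so the intervals must be checked first.
func (c Config) validate() error {
	switch {
	case c.PollInterval <= 0:
		return fmt.Errorf("PollInterval must be > 0, got %s", c.PollInterval)
	case c.CleanupInterval <= 0:
		return fmt.Errorf("CleanupInterval must be > 0, got %s", c.CleanupInterval)
	case c.RetentionPeriod <= 0:
		return fmt.Errorf("RetentionPeriod must be > 0, got %s", c.RetentionPeriod)
	}
	return nil
}

func main() {
	id, err := normalizeGatewayID("  gateway-1 ")
	fmt.Printf("%q %v\n", id, err)

	cfg := Config{PollInterval: time.Second, CleanupInterval: time.Minute}
	fmt.Println(cfg.validate()) // RetentionPeriod is left at zero here
}
```

Returning an error rather than substituting defaults keeps misconfiguration visible at startup; the review leaves that choice open.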
In
`@console/workspaces/pages/llm-providers/src/subComponents/AddLLMProviderForm.tsx`:
- Around line 155-159: The effect can overwrite user edits due to a stale
closure over formData; update the effect (useEffect) to call setFormData with a
functional updater that examines the previous state (prev) and only sets
gatewayIds when prev.gatewayIds is empty, e.g. compute the new state from prev
and gateways[0].uuid rather than spreading the outer formData; keep the effect
dependency on gateways only. Target the setFormData/formData/gatewayIds logic
inside the existing useEffect.
---
Nitpick comments:
In `@agent-manager-service/eventhub/topic.go`:
- Around line 100-107: gatewayRegistry.getAll currently returns slices of
internal *gateway pointers which leaks mutable state (subscribers, knownVersion,
lastPolled, queuedLoggedAt) outside the registry lock; change getAll to avoid
returning internal pointers by either returning a slice of gateway value copies
(copy fields into new gateway structs) or by replacing getAll with a locked
iteration helper like gatewayRegistry.ranged(fn *gateway) that acquires
mu.RLock(), calls the provided callback for each gateway while still holding the
lock, and does not expose internal pointers after unlock; update callsites to
use the new copying or callback API so no internal gateway pointer escapes the
lock.
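The locked iteration helper suggested in the `topic.go` nitpick above could be shaped like the sketch below. The types are simplified stand-ins and `forEach` is a hypothetical name (the comment calls it `ranged`); the real `gateway` struct has more fields than shown.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// gateway is a simplified stand-in for the registry's internal type.
type gateway struct {
	id           string
	knownVersion string
}

type gatewayRegistry struct {
	mu       sync.RWMutex
	gateways map[string]*gateway
}

func (r *gatewayRegistry) register(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.gateways[id] = &gateway{id: id}
}

// forEach calls fn for every gateway while holding the read lock. fn must
// only read, must not retain gw after returning, and must not call registry
// methods that take the lock again.
func (r *gatewayRegistry) forEach(fn func(gw *gateway)) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, gw := range r.gateways {
		fn(gw)
	}
}

func main() {
	reg := &gatewayRegistry{gateways: map[string]*gateway{}}
	reg.register("gateway-1")
	reg.register("gateway-2")

	var ids []string
	reg.forEach(func(gw *gateway) { ids = append(ids, gw.id) })
	sort.Strings(ids) // map iteration order is not deterministic
	fmt.Println(ids)
}
```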
In
`@console/workspaces/pages/llm-providers/src/subComponents/AddLLMProviderForm.tsx`:
- Line 606: The comparison formData.gatewayIds?.length == 0 in
AddLLMProviderForm should use strict equality; replace the loose equality
operator (==) with the strict operator (===) so it reads
formData.gatewayIds?.length === 0 to match the other check and ensure type-safe
comparison for the gatewayIds length check.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 38fc3004-d729-4e0a-a5ef-818c5308ac7f
📒 Files selected for processing (14)
- agent-manager-service/app/app.go
- agent-manager-service/controllers/websocket_controller.go
- agent-manager-service/db_migrations/021_add_eventhub_tables.go
- agent-manager-service/db_migrations/migration_list.go
- agent-manager-service/eventhub/sqlbackend.go
- agent-manager-service/eventhub/topic.go
- agent-manager-service/eventhub/types.go
- agent-manager-service/services/gateway_events_service.go
- agent-manager-service/websocket/connection.go
- agent-manager-service/websocket/manager.go
- agent-manager-service/wiring/params.go
- agent-manager-service/wiring/wire.go
- agent-manager-service/wiring/wire_gen.go
- console/workspaces/pages/llm-providers/src/subComponents/AddLLMProviderForm.tsx
✅ Files skipped from review due to trivial changes (1)
- agent-manager-service/db_migrations/021_add_eventhub_tables.go
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@agent-manager-service/eventhub/sqlbackend.go`:
- Around line 459-470: The callsite uses b.registry.get(state.GatewayID) to
obtain a *gateway and then calls pollGatewayWithState(gw, state) after the
registry lock has been released, creating a race with
addSubscriber/removeSubscriber; fix by ensuring a safe snapshot or lock is held
while using the gateway: either change b.registry.get to return a safe
copy/immutable snapshot of the gateway (so pollGatewayWithState can read without
locking) or acquire the registry's read lock around the call to
pollGatewayWithState and access the gateway while the lock is held. Update this
callsite (b.registry.get, pollGatewayWithState) to match the chosen approach and
ensure addSubscriber/removeSubscriber in topic.go are consistent with the new
get() behavior.
In `@agent-manager-service/eventhub/topic.go`:
- Around line 110-117: The get method currently takes and releases r.mu itself
which contradicts its docstring and returns a pointer that becomes unsafe;
remove the internal locking from gatewayRegistry.get (delete
r.mu.RLock()/RUnlock()) and update its docstring to clearly state callers MUST
hold r.mu (read or write) while using the returned *gateway; then audit
callsites (e.g., addSubscriber, removeSubscriber and any other callers) to
ensure they acquire the appropriate r.mu lock before calling get, or
alternatively change callers to use a new snapshot accessor if you prefer copy
semantics.
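The "snapshot accessor" alternative mentioned for `get` above might look like this. It is a sketch under assumptions, not the PR's code: `gatewaySnapshot` and `snapshot` are hypothetical names, and the fields are a reduced subset of what the review mentions.

```go
package main

import (
	"fmt"
	"sync"
)

// gateway is a simplified stand-in for the registry's internal type.
type gateway struct {
	id           string
	knownVersion string
	subscribers  []chan string
}

// gatewaySnapshot is a hypothetical value copy that is safe to read after
// the registry lock has been released.
type gatewaySnapshot struct {
	ID           string
	KnownVersion string
	Subscribers  []chan string
}

type gatewayRegistry struct {
	mu       sync.RWMutex
	gateways map[string]*gateway
}

func (r *gatewayRegistry) addSubscriber(id string, ch chan string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	gw, ok := r.gateways[id]
	if !ok {
		gw = &gateway{id: id}
		r.gateways[id] = gw
	}
	gw.subscribers = append(gw.subscribers, ch)
}

// snapshot copies the gateway's fields, including the subscriber slice,
// while holding the read lock, so no internal pointer escapes.
func (r *gatewayRegistry) snapshot(id string) (gatewaySnapshot, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	gw, ok := r.gateways[id]
	if !ok {
		return gatewaySnapshot{}, false
	}
	subs := make([]chan string, len(gw.subscribers))
	copy(subs, gw.subscribers)
	return gatewaySnapshot{ID: gw.id, KnownVersion: gw.knownVersion, Subscribers: subs}, true
}

func main() {
	reg := &gatewayRegistry{gateways: map[string]*gateway{}}
	reg.addSubscriber("gateway-1", make(chan string, 1))
	snap, _ := reg.snapshot("gateway-1")
	reg.addSubscriber("gateway-1", make(chan string, 1)) // later change
	fmt.Println("subscribers in snapshot:", len(snap.Subscribers))
}
```

Copying the slice protects the poller from concurrent appends and removals, but the channels themselves are still shared, so a snapshot alone does not prevent a send on a channel that an unsubscribe has already closed.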
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 72aee45e-29ff-4a5a-9732-5b135ba6bc9e
📒 Files selected for processing (3)
- agent-manager-service/eventhub/sqlbackend.go
- agent-manager-service/eventhub/topic.go
- console/workspaces/pages/llm-providers/src/subComponents/AddLLMProviderForm.tsx
Resolves conflicts in wiring/wire.go and wiring/wire_gen.go where upstream added the EventHub provider (wso2#949) alongside this PR's instrumentation catalog providers. Both provider sets end up in the merged wire.Build calls; wire_gen.go was regenerated and formatted to match.
upstream/main brought in PR wso2#955 (CORS config for agent deployments) whose migration was numbered 021. The earlier upstream merge of wso2#949 (EventHub) also landed at 021. Both files declared migration021, breaking the build. Renamed 021_add_cors_allow_origins.go to 022_add_cors_allow_origins.go, updated the symbol from migration021 to migration022, bumped the struct's ID field to 22, added the new entry to migration_list.go, and bumped latestVersion from 21 to 22. EventHub keeps 021 since it landed on main first. Also picks up upstream's other PR-merge changes (amctl external agent create, gateway connectivity tweaks, etc) cleanly.
Purpose
In a multi-pod deployment, `GatewayEventsService` was delivering events by calling `manager.GetConnections(gatewayID)` directly against the local pod's in-memory WebSocket registry. Since each pod maintains its own isolated connection state, events were silently dropped whenever the pod handling the API request was not the pod holding the gateway's WebSocket connection. This affected all gateway configuration change notifications: LLM provider updates, LLM proxy changes, and API key provisioning.
Closes #948
Goals
Approach
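Introduce a SQL-backed `EventHub` that acts as a shared event bus between pods:
- Publishing an event writes it to `eventhub_events` and atomically bumps a version ID in `eventhub_gateway_states`
- Each pod polls `eventhub_gateway_states`; when a version change is detected, it fetches the new events and fans them out to local subscribers
- Each gateway WebSocket connection subscribes to the hub and starts a `forwardEvents` goroutine that reads from the subscription channel and writes to the WebSocket connection
- When the connection closes, the subscription is removed and the `forwardEvents` goroutine exits cleanly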
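The publish-and-poll flow described in the approach can be illustrated with an in-memory stand-in for the two tables. This is a sketch under stated assumptions, not the SQLBackend: `store`, `poller`, and `pollOnce` are hypothetical, the version is modeled as an integer counter (the real version ID may be a different type), and the mutex stands in for the database transaction.

```go
package main

import (
	"fmt"
	"sync"
)

type event struct {
	Seq     int
	Payload string
}

// store is an in-memory stand-in for the two tables.
type store struct {
	mu      sync.Mutex
	events  map[string][]event // stands in for eventhub_events
	version map[string]int     // stands in for eventhub_gateway_states
}

func newStore() *store {
	return &store{events: map[string][]event{}, version: map[string]int{}}
}

// publish appends the event and bumps the version in one critical section,
// mirroring a single database transaction.
func (s *store) publish(gatewayID, payload string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.version[gatewayID]++
	e := event{Seq: s.version[gatewayID], Payload: payload}
	s.events[gatewayID] = append(s.events[gatewayID], e)
}

// fetchSince returns the current version and the events newer than since.
func (s *store) fetchSince(gatewayID string, since int) (int, []event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []event
	for _, e := range s.events[gatewayID] {
		if e.Seq > since {
			out = append(out, e)
		}
	}
	return s.version[gatewayID], out
}

// poller is one pod's local view: the last version it delivered per gateway
// and its local subscriber channels.
type poller struct {
	known map[string]int
	subs  map[string][]chan event
}

// pollOnce delivers new events to local subscribers and returns how many
// events it fanned out. With no subscribers it leaves the cursor unchanged,
// so queued events are not skipped. Sends block, so channels need buffering.
func (p *poller) pollOnce(s *store, gatewayID string) int {
	subs := p.subs[gatewayID]
	if len(subs) == 0 {
		return 0
	}
	version, events := s.fetchSince(gatewayID, p.known[gatewayID])
	if version == p.known[gatewayID] {
		return 0
	}
	for _, e := range events {
		for _, ch := range subs {
			ch <- e
		}
	}
	p.known[gatewayID] = version
	return len(events)
}

func main() {
	s := newStore()
	ch := make(chan event, 8)
	p := &poller{known: map[string]int{}, subs: map[string][]chan event{"gw-1": {ch}}}

	s.publish("gw-1", "llmprovider.deployed") // published by any pod
	delivered := p.pollOnce(s, "gw-1")        // polled by the pod holding the connection
	fmt.Println("delivered:", delivered, "first payload:", (<-ch).Payload)
}
```

In a real deployment `pollOnce` would run on a ticker per pod, and each pod would hold only the subscribers for its own WebSocket connections.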