eventservice: event service use server level tz to avoid unnecessary call and verbose log #4836
📝 Walkthrough
Changefeed filter handling and timezone responsibilities were refactored: DispatcherInfo now exposes the filter config, eventService centralizes the timezone, and eventBroker/getOrSetChangefeedStatus creates or reuses a shared filter via the shared filter storage and attaches it to changefeedStatus. Tests were updated to use the new APIs.
Sequence Diagram(s)
sequenceDiagram
participant Dispatcher as DispatcherInfo
participant Broker as eventBroker
participant Storage as SharedFilterStorage
participant Status as changefeedStatus
Dispatcher->>Broker: addDispatcher(info)
Broker->>Broker: getOrSetChangefeedStatus(info)
alt status exists
Broker-->>Dispatcher: return existing changefeedStatus
else new status
Broker->>Storage: GetOrSetFilter(changefeedID, info.GetFilterConfig(), Broker.timezone)
Storage-->>Broker: filter (or error -> panic)
Broker->>Status: create changefeedStatus{filter: filter, ...}
Broker->>Broker: LoadOrStore(status)
Broker-->>Dispatcher: return new changefeedStatus
end
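The flow in the diagram can be sketched in Go roughly as follows. This is a minimal illustration, not the PR's code: every type here (`filterConfig`, `sharedFilter`, `sharedFilterStorage`, the trimmed `eventBroker`) and the `newUTCBroker` constructor are hypothetical stand-ins, changefeed IDs are plain strings, and the sketch returns the filter error where the diagram shows a panic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Every type here is a hypothetical stand-in for illustration only.
type filterConfig struct{ Rules []string }

type sharedFilter struct {
	cfg filterConfig
	tz  *time.Location
}

// sharedFilterStorage keeps one filter per changefeed ID.
type sharedFilterStorage struct {
	mu      sync.Mutex
	filters map[string]*sharedFilter
}

// GetOrSetFilter returns the stored filter, or builds one from cfg and tz.
func (s *sharedFilterStorage) GetOrSetFilter(id string, cfg filterConfig, tz *time.Location) (*sharedFilter, error) {
	if tz == nil {
		return nil, fmt.Errorf("changefeed %s: timezone is nil", id)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if f, ok := s.filters[id]; ok {
		return f, nil
	}
	f := &sharedFilter{cfg: cfg, tz: tz}
	s.filters[id] = f
	return f, nil
}

type changefeedStatus struct {
	id     string
	filter *sharedFilter
}

type eventBroker struct {
	timezone      *time.Location // resolved once, at the service level
	storage       *sharedFilterStorage
	changefeedMap sync.Map // changefeed ID -> *changefeedStatus
}

func newUTCBroker() *eventBroker {
	return &eventBroker{
		timezone: time.UTC,
		storage:  &sharedFilterStorage{filters: make(map[string]*sharedFilter)},
	}
}

// getOrSetChangefeedStatus mirrors the diagram: reuse an existing status,
// otherwise fetch the shared filter and attach it to a new status.
func (b *eventBroker) getOrSetChangefeedStatus(id string, cfg filterConfig) (*changefeedStatus, error) {
	if v, ok := b.changefeedMap.Load(id); ok {
		return v.(*changefeedStatus), nil
	}
	f, err := b.storage.GetOrSetFilter(id, cfg, b.timezone)
	if err != nil {
		return nil, err // the diagram shows a panic at this point
	}
	actual, _ := b.changefeedMap.LoadOrStore(id, &changefeedStatus{id: id, filter: f})
	return actual.(*changefeedStatus), nil
}

func main() {
	b := newUTCBroker()
	first, _ := b.getOrSetChangefeedStatus("cf-1", filterConfig{Rules: []string{"test.*"}})
	second, _ := b.getOrSetChangefeedStatus("cf-1", filterConfig{})
	fmt.Println("same status reused:", first == second)
}
```

The second call passes an empty config on purpose: once a status exists for the changefeed, the stored one is returned and the new config is ignored, which matches the "status exists" branch of the diagram.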
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Code Review
This pull request refactors the event service to centralize timezone management and move filter creation into the dispatcher initialization process. The DispatcherInfo interface was updated to provide filter configurations instead of pre-instantiated filters, and the eventService now utilizes a global timezone setting. Feedback was provided regarding a potential nil pointer dereference when accessing the filter configuration in newDispatcherStat, which could lead to a panic if the configuration is not explicitly checked before use.
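The nil-check concern raised above could be addressed along these lines. This is only a sketch under assumptions: it assumes `GetFilterConfig` returns a pointer, and `FilterConfig`, `fakeInfo`, `filterRules`, and `errNilFilterConfig` are hypothetical names introduced for illustration, not identifiers from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// FilterConfig and DispatcherInfo are hypothetical stand-ins.
type FilterConfig struct{ Rules []string }

type DispatcherInfo interface {
	GetFilterConfig() *FilterConfig
}

// fakeInfo is a test double whose config may be nil.
type fakeInfo struct{ cfg *FilterConfig }

func (f fakeInfo) GetFilterConfig() *FilterConfig { return f.cfg }

var errNilFilterConfig = errors.New("dispatcher info has no filter config")

// filterRules checks the config for nil before dereferencing it.
func filterRules(info DispatcherInfo) ([]string, error) {
	cfg := info.GetFilterConfig()
	if cfg == nil {
		return nil, errNilFilterConfig
	}
	return cfg.Rules, nil
}

func main() {
	_, err := filterRules(fakeInfo{})
	fmt.Println("nil config:", err)

	rules, _ := filterRules(fakeInfo{cfg: &FilterConfig{Rules: []string{"db.*"}}})
	fmt.Println("rules:", rules)
}
```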
🧹 Nitpick comments (1)
pkg/eventservice/event_broker.go (1)
1577-1588: Potential race condition in getOrSetChangefeedStatus - mitigated by current usage
The Load-then-Store pattern here is not atomic. If two goroutines call this concurrently for the same changefeedID, both could create new changefeedStatus instances, and one would overwrite the other, potentially losing registered dispatchers.
Currently this appears safe because callers (addDispatcher, resetDispatcher) are invoked sequentially from eventService.Run's select loop. However, consider using sync.Map.LoadOrStore for defensive thread-safety:
💡 Suggested defensive improvement
 func (c *eventBroker) getOrSetChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus {
-	stat, ok := c.changefeedMap.Load(changefeedID)
-	if !ok {
-		stat = newChangefeedStatus(changefeedID, syncPointInterval)
-		stat.(*changefeedStatus).timezone = c.timezone
-		log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID))
-		c.changefeedMap.Store(changefeedID, stat)
+	newStat := newChangefeedStatus(changefeedID, syncPointInterval)
+	newStat.timezone = c.timezone
+	stat, loaded := c.changefeedMap.LoadOrStore(changefeedID, newStat)
+	if !loaded {
+		log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID))
 		metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeedID.String()).Set(0)
 		metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(changefeedID.String()).Set(defaultScanInterval.Seconds())
 	}
 	return stat.(*changefeedStatus)
 }
Note: This creates a changefeedStatus on every call even if one exists, but it ensures atomicity. Alternatively, use double-checked locking if allocation cost is a concern.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/event_broker.go` around lines 1577 - 1588, The Load-then-Store pattern in getOrSetChangefeedStatus can race; change the implementation to use changefeedMap.LoadOrStore to atomically obtain or insert a *changefeedStatus (call newChangefeedStatus only for the candidate), then if the returned loaded flag is false (i.e., we stored the new one) set the timezone on that instance, log and call metrics.EventServiceScanWindowBaseTsGaugeVec/...Set(...) and return the stored value; if loaded is true discard the candidate and return the existing value. This preserves current behavior but makes getOrSetChangefeedStatus atomic and avoids overwriting existing changefeedStatus (symbols: getOrSetChangefeedStatus, changefeedMap, newChangefeedStatus, changefeedStatus, metrics.EventServiceScanWindowBaseTsGaugeVec).
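To make the difference between Load-then-Store and `sync.Map.LoadOrStore` concrete, here is a small self-contained sketch. The `broker` and `changefeedStatus` types are trimmed, hypothetical stand-ins, and `getOrSet` and `distinctStatuses` are names invented for this example. It uses a cheap `Load` first so the common path allocates nothing, then relies on `LoadOrStore` to pick a single winner when goroutines race.

```go
package main

import (
	"fmt"
	"sync"
)

// changefeedStatus is a hypothetical, trimmed-down stand-in.
type changefeedStatus struct{ id string }

type broker struct {
	changefeedMap sync.Map // changefeed ID -> *changefeedStatus
}

// getOrSet checks first to skip the allocation on the hot path, then lets
// LoadOrStore pick a single winner if several goroutines race.
func (b *broker) getOrSet(id string) (status *changefeedStatus, created bool) {
	if v, ok := b.changefeedMap.Load(id); ok {
		return v.(*changefeedStatus), false
	}
	actual, loaded := b.changefeedMap.LoadOrStore(id, &changefeedStatus{id: id})
	return actual.(*changefeedStatus), !loaded
}

// distinctStatuses calls getOrSet from n goroutines and counts how many
// different instances and how many "created" results were observed.
func distinctStatuses(n int) (instances, creations int) {
	var (
		b    broker
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen = make(map[*changefeedStatus]struct{})
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s, created := b.getOrSet("cf-1")
			mu.Lock()
			defer mu.Unlock()
			seen[s] = struct{}{}
			if created {
				creations++
			}
		}()
	}
	wg.Wait()
	return len(seen), creations
}

func main() {
	instances, creations := distinctStatuses(64)
	fmt.Println("instances:", instances, "creations:", creations)
}
```

Because only the goroutine that sees `loaded == false` is told it created the entry, one-time side effects such as the "new changefeed status" log and the gauge initialization can be tied to that flag.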
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fc7b7e96-124b-455b-a856-ff8d31dce4aa
📒 Files selected for processing (5)
pkg/eventservice/dispatcher_stat.go
pkg/eventservice/event_broker.go
pkg/eventservice/event_service.go
pkg/eventservice/event_service_test.go
pkg/messaging/message.go
/retest
/test all
/test all
Actionable comments posted: 2
🧹 Nitpick comments (1)
pkg/eventservice/event_service_test.go (1)
502-537: Unify these changefeed-status test helpers.
newChangefeedStatusForTest hardcodes UTC, while addChangefeedStatusToBrokerForTest creates a partially initialized changefeedStatus and relies on mustInitChangefeedStatusFilter later. That makes it easy for future tests to miss the broker invariant that status.filter is preinitialized from the broker timezone. A single helper that always takes the intended timezone and fully initializes status would keep tests closer to production behavior.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/event_service_test.go` around lines 502 - 537, Tests use multiple helpers that differ in timezone handling and initialization (newChangefeedStatusForTest, addChangefeedStatusToBrokerForTest, mustInitChangefeedStatusFilter), which risks leaving changefeedStatus.filter uninitialized; replace them with a single helper (e.g., newInitializedChangefeedStatusForTest) that accepts DispatcherInfo, timezone string, and an eventBroker pointer, creates a fully initialized changefeedStatus via newChangefeedStatus(changefeedID, syncPointInterval), stores it into broker.changefeedMap, and sets status.filter using newChangefeedFilterForTest/GetOrSetFilter so the stored status always has filter populated; update tests to call this new helper and remove the partial helpers to enforce the broker invariant.
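One possible shape for such a unified helper is sketched below. It is an assumption-heavy illustration rather than the repository's test code: the types are simplified stand-ins, `newBrokerForTest` is a hypothetical constructor, and, unlike the signature proposed in the prompt, the timezone is fixed when the broker is built so the helper can only derive the filter from the broker's own timezone.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins; the real helpers live in event_service_test.go.
type changefeedFilter struct{ tz *time.Location }

type changefeedStatus struct {
	id     string
	filter *changefeedFilter
}

type eventBroker struct {
	timezone    *time.Location
	changefeeds map[string]*changefeedStatus
}

// newBrokerForTest builds a broker for the intended timezone name.
func newBrokerForTest(tzName string) (*eventBroker, error) {
	tz, err := time.LoadLocation(tzName)
	if err != nil {
		return nil, fmt.Errorf("load timezone %q: %w", tzName, err)
	}
	return &eventBroker{timezone: tz, changefeeds: make(map[string]*changefeedStatus)}, nil
}

// newInitializedChangefeedStatusForTest stores a status whose filter is
// already populated from the broker timezone, so no test can observe a
// half-initialized status.
func newInitializedChangefeedStatusForTest(b *eventBroker, id string) *changefeedStatus {
	status := &changefeedStatus{id: id, filter: &changefeedFilter{tz: b.timezone}}
	b.changefeeds[id] = status
	return status
}

func main() {
	b, err := newBrokerForTest("UTC")
	if err != nil {
		panic(err)
	}
	status := newInitializedChangefeedStatusForTest(b, "cf-1")
	fmt.Println("filter ready:", status.filter != nil, "timezone:", status.filter.tz)
}
```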
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/eventservice/event_broker.go`:
- Around line 1588-1595: The code currently panics on filter init failure;
replace the log.Panic call in the block that calls
filter.GetSharedFilterStorage().GetOrSetFilter(changefeedID,
info.GetFilterConfig(), c.timezone) with returning the error up the call chain
(wrap with context mentioning changefeedID and filter config), and update
callers of this site (the functions addDispatcher and resetDispatcher) to
propagate and return that error instead of letting the process crash; ensure
changefeedFilter is only used when err == nil and that the returned error is
handled by the caller paths that previously assumed success.
- Around line 1578-1606: SharedFilterStorage currently lacks a removal method,
causing filters keyed by changefeedID to leak after changefeed teardown; add a
DeleteFilter(changefeedID) method to the SharedFilterStorage implementation
(complementing GetOrSetFilter) that safely removes the filter from the internal
map, and then call SharedFilterStorage.DeleteFilter(changefeedID) from the same
teardown locations where changefeedMap.Delete(changefeedID) is invoked (the code
paths that remove the changefeedStatus and metrics) so the filter entry is
removed atomically/under the same synchronization to prevent memory leaks.
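The two inline comments above (return the filter error instead of panicking, and remove the shared filter on teardown) might look roughly like this. It is a sketch only: the storage, `broker`, `removeChangefeed`, `Len`, and `errEmptyRules` are hypothetical, and the "empty rules" failure is an invented error condition used just to exercise the error path.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the shared filter storage and the broker.
type filter struct{ rules []string }

type sharedFilterStorage struct {
	mu      sync.Mutex
	filters map[string]*filter
}

func newSharedFilterStorage() *sharedFilterStorage {
	return &sharedFilterStorage{filters: make(map[string]*filter)}
}

// errEmptyRules is an invented failure mode used to exercise the error path.
var errEmptyRules = errors.New("filter config has no rules")

func (s *sharedFilterStorage) GetOrSetFilter(id string, rules []string) (*filter, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if f, ok := s.filters[id]; ok {
		return f, nil
	}
	if len(rules) == 0 {
		return nil, errEmptyRules
	}
	f := &filter{rules: rules}
	s.filters[id] = f
	return f, nil
}

// DeleteFilter is the removal method the review asks for.
func (s *sharedFilterStorage) DeleteFilter(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.filters, id)
}

// Len reports how many filters are currently stored.
func (s *sharedFilterStorage) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.filters)
}

type broker struct {
	storage     *sharedFilterStorage
	changefeeds sync.Map // changefeed ID -> *filter
}

// addDispatcher returns the filter error to the caller instead of panicking.
func (b *broker) addDispatcher(id string, rules []string) error {
	f, err := b.storage.GetOrSetFilter(id, rules)
	if err != nil {
		return fmt.Errorf("init filter for changefeed %s: %w", id, err)
	}
	b.changefeeds.LoadOrStore(id, f)
	return nil
}

// removeChangefeed drops the status and its shared filter together.
func (b *broker) removeChangefeed(id string) {
	b.changefeeds.Delete(id)
	b.storage.DeleteFilter(id)
}

func main() {
	b := &broker{storage: newSharedFilterStorage()}
	fmt.Println("empty config:", b.addDispatcher("cf-1", nil))
	fmt.Println("valid config:", b.addDispatcher("cf-1", []string{"db.*"}))
	b.removeChangefeed("cf-1")
	fmt.Println("filters left:", b.storage.Len())
}
```

Note that this sketch does not delete the map entry and the filter under one lock; if teardown can race with `addDispatcher`, the two deletions would need shared synchronization, as the comment suggests.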
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 386da5e3-94a9-4ce6-9883-4180e8a1dd73
📒 Files selected for processing (7)
pkg/eventservice/dispatcher_stat.go
pkg/eventservice/dispatcher_stat_test.go
pkg/eventservice/event_broker.go
pkg/eventservice/event_broker_test.go
pkg/eventservice/event_scanner_test.go
pkg/eventservice/event_service_test.go
pkg/eventservice/metrics_collector_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/eventservice/dispatcher_stat.go
/test all
Please add an integration test to cover this situation
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: asddongmen, wk989898
/retest
What problem does this PR solve?
Issue Number: close #4843
This PR removes the call to the GetTimezone method, which prints a log line on every dispatcher request, such as adding a dispatcher.
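The idea of resolving the timezone once at the service level can be illustrated with a small sketch. Everything here is hypothetical: `getTimezone` merely stands in for the GetTimezone call, the `lookups` counter stands in for the log line it emits, and `eventService` is a trimmed stand-in for the real type.

```go
package main

import (
	"fmt"
	"time"
)

// lookups counts how often the timezone is resolved; in the scenario the PR
// describes, each resolution also emitted a log line.
var lookups int

// getTimezone is a hypothetical stand-in for the GetTimezone call.
func getTimezone(name string) (*time.Location, error) {
	lookups++
	return time.LoadLocation(name)
}

type eventService struct {
	tz *time.Location // resolved once, when the service starts
}

func newEventService(tzName string) (*eventService, error) {
	tz, err := getTimezone(tzName)
	if err != nil {
		return nil, fmt.Errorf("resolve timezone %q: %w", tzName, err)
	}
	return &eventService{tz: tz}, nil
}

// addDispatcher reuses the cached timezone instead of resolving it again.
func (s *eventService) addDispatcher(id string) string {
	return fmt.Sprintf("%s@%s", id, s.tz)
}

func main() {
	svc, err := newEventService("UTC")
	if err != nil {
		panic(err)
	}
	for i := 0; i < 3; i++ {
		fmt.Println(svc.addDispatcher(fmt.Sprintf("dispatcher-%d", i)))
	}
	fmt.Println("timezone lookups:", lookups)
}
```

However many dispatchers are added, the lookup count stays at one, which is the effect the PR description is after.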
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Refactor
Tests