Conversation
📝 Walkthrough

This pull request refactors the watch mechanism across the storage system, removing error-driven watch closure.
🚥 Pre-merge checks: ✅ 1 passed, ❌ 2 failed (warnings)
Force-pushed `f3edc39` to `0905973`
@coderabbitai review

✅ Actions performed: review triggered.
Force-pushed `fa785be` to `64a9954`
Force-pushed `0905973` to `fa49e10`
Referenced code:

```go
c.watchOp.Close()
c.done <- struct{}{}
```
Instead of `c.done <- struct{}{}`, I think it should be `close(c.done)`.
Force-pushed `64a9954` to `2db5030`
Force-pushed `fa49e10` to `f7cad02`
Up to standards ✅

| Metric | Results |
|---|---|
| Issues | 🟢 |
| Complexity | 4 |
| Duplication | 0 |
Makes several substantial improvements to the `storage.Watch` operations:

- `Watch` now does a synchronous `Get` operation before starting the watch to get the current value(s) and the start revision for the watch. Callers no longer have to do this operation themselves.
- `Watch` now restarts itself automatically using the most recently-fetched revision. We were previously repeating this restart logic in every caller. Restarts are rate-limited to 1 per second.
- `Watch` reports errors over an error channel, matching the convention that we've established everywhere.

We had an unused `Until` operation and I opted to remove it rather than update it for these changes.
Force-pushed `f7cad02` to `8fbc079`
🧹 Nitpick comments (2)
server/internal/storage/watch_test.go (1)
**86-102:** Consider using a unique key for test isolation.

Lines 88 and 97 use a hardcoded `"watch-err"` key while other tests use `uuid.NewString()`. For consistency, and to prevent potential interference if tests are run with `-parallel` in the future, consider using a unique key.

♻️ Suggested change for consistency:

```diff
 t.Run("delivers error from handler", func(t *testing.T) {
 	ctx := t.Context()
-	watch := storage.NewWatchOp[*TestValue](client, "watch-err")
+	key := uuid.NewString()
+	watch := storage.NewWatchOp[*TestValue](client, key)
 	sentinel := assert.AnError
 	handler := func(e *storage.Event[*TestValue]) error { return sentinel }
 	require.NoError(t, watch.Watch(ctx, handler))
 	t.Cleanup(watch.Close)
-	err := storage.NewCreateOp(client, "watch-err", &TestValue{SomeField: "v"}).
+	err := storage.NewCreateOp(client, key, &TestValue{SomeField: "v"}).
 		Exec(ctx)
 	require.NoError(t, err)
 	require.ErrorIs(t, <-watch.Error(), sentinel)
 })
```

🤖 Prompt for AI Agents:

> Verify each finding against the current code and only fix it if needed. In `server/internal/storage/watch_test.go` around lines 86-102: the test uses a hardcoded key `"watch-err"` for both `NewWatchOp` and `NewCreateOp`, which can cause interference when tests run in parallel; replace the literal with a unique key (e.g., `k := uuid.NewString()`) and use that variable in `storage.NewWatchOp[*TestValue](client, k)` and `storage.NewCreateOp(client, k, ...)` so `NewWatchOp`, `watch`, and `NewCreateOp` all reference the same per-test unique key for isolation.

server/internal/storage/watch.go (1)
**50-70:** Handler invoked while holding mutex during initial load.

The `load()` method calls `handle()` at line 62 while holding `o.mu`. If the handler ever needs to call any `watchOp` method (e.g., `Close()`), this would deadlock. The current callers don't do this, but it's worth documenting or restructuring. Consider releasing the lock before invoking handlers, or document that handlers must not call back into the watch operation.

♻️ Optional: Release lock before calling handlers

```diff
 func (o *watchOp[V]) load(ctx context.Context, handle func(e *Event[V]) error) error {
 	o.mu.Lock()
-	defer o.mu.Unlock()
 	resp, err := o.client.Get(ctx, o.key, o.options...)
 	if err != nil {
+		o.mu.Unlock()
 		return fmt.Errorf("failed to get initial items for watch: %w", err)
 	}
+	o.revision = resp.Header.Revision
+	o.mu.Unlock()
+
 	for _, kv := range resp.Kvs {
 		if err := handle(convertKVToEvent[V](kv)); err != nil {
 			return err
 		}
 	}
-	o.revision = resp.Header.Revision
-
 	return nil
 }
```

🤖 Prompt for AI Agents:

> Verify each finding against the current code and only fix it if needed. In `server/internal/storage/watch.go` around lines 50-70: the `load()` method currently holds `o.mu` while calling `handle()`, risking deadlock if handlers call back into `watchOp`. To fix: while holding `o.mu`, build a slice of events by converting `resp.Kvs` with `convertKVToEvent[V]` and set `o.revision = resp.Header.Revision`; then release `o.mu` and iterate the collected events calling `handle(event)`, returning any handler error. This ensures `o.revision` is updated under the lock but handlers run without holding `o.mu`.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e7f1fc90-45d4-468f-93e7-db92ae3c1b2e
📒 Files selected for processing (10)
- go.mod
- server/internal/election/candidate.go
- server/internal/election/candidate_test.go
- server/internal/migrate/runner.go
- server/internal/scheduler/service.go
- server/internal/storage/errors.go
- server/internal/storage/interface.go
- server/internal/storage/watch.go
- server/internal/storage/watch_test.go
- server/internal/testutils/logger.go
💤 Files with no reviewable changes (1)
- server/internal/storage/errors.go
Testing
From the user's perspective, everything should function the same as it did before. The changes in this PR were more focused on improving the watch's usability and edge-case behavior, such as when a `Put` occurs between the initial `Get` and the `Watch` operations, or the recovery behavior after an outage.