Skip to content

Commit

Permalink
store: merge ChangeSummaries when notify() calls are merged (#4305)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicks committed Mar 11, 2021
1 parent 74abfc7 commit 983de0e
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
32 changes: 18 additions & 14 deletions internal/store/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ func (l *subscriberList) NotifyAll(ctx context.Context, store *Store, summary Ch

for _, s := range subscribers {
s := s
isPending := s.claimPending()
isPending := s.claimPending(summary)
if isPending {
SafeGo(store, func() {
s.notify(ctx, store, summary)
s.notify(ctx, store)
})
}
}
Expand All @@ -126,8 +126,8 @@ type subscriberEntry struct {

// At any given time, there are at most two goroutines
// notifying the subscriber: a pending goroutine and an active goroutine.
hasPending bool
hasActive bool
pendingChange *ChangeSummary
activeChange *ChangeSummary

// The active mutex is held by the goroutine currently notifying the
// subscriber. It may be held for a long time if the subscriber
Expand All @@ -141,38 +141,42 @@ type subscriberEntry struct {

// Returns true if this is the pending goroutine.
// Returns false to do nothing.
func (e *subscriberEntry) claimPending() bool {
// If there's a pending change, we merge the passed summary.
func (e *subscriberEntry) claimPending(s ChangeSummary) bool {
e.stateMu.Lock()
defer e.stateMu.Unlock()

if e.hasPending {
if e.pendingChange != nil {
e.pendingChange.Add(s)
return false
}
e.hasPending = true
e.pendingChange = &ChangeSummary{}
e.pendingChange.Add(s)
return true
}

func (e *subscriberEntry) movePendingToActive() {
func (e *subscriberEntry) movePendingToActive() *ChangeSummary {
e.stateMu.Lock()
defer e.stateMu.Unlock()

e.hasPending = false
e.hasActive = true
e.activeChange = e.pendingChange
e.pendingChange = nil
return e.activeChange
}

func (e *subscriberEntry) clearActive() {
e.stateMu.Lock()
defer e.stateMu.Unlock()

e.hasActive = false
e.activeChange = nil
}

func (e *subscriberEntry) notify(ctx context.Context, store *Store, summary ChangeSummary) {
func (e *subscriberEntry) notify(ctx context.Context, store *Store) {
e.activeMu.Lock()
defer e.activeMu.Unlock()

e.movePendingToActive()
e.subscriber.OnChange(ctx, store, summary)
activeChange := e.movePendingToActive()
e.subscriber.OnChange(ctx, store, *activeChange)
e.clearActive()
}

Expand Down
26 changes: 26 additions & 0 deletions internal/store/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,32 @@ func TestSubscriberInterleavedCalls(t *testing.T) {
}
}

func TestSubscriberInterleavedCallsSummary(t *testing.T) {
st, _ := NewStoreWithFakeReducer()
ctx := context.Background()
s := newFakeSubscriber()
require.NoError(t, st.AddSubscriber(ctx, s))

st.NotifySubscribers(ctx, ChangeSummary{CmdSpecs: map[string]bool{"spec-1": true}})
call := <-s.onChange
assert.Equal(t, call.summary, ChangeSummary{CmdSpecs: map[string]bool{"spec-1": true}})

st.NotifySubscribers(ctx, ChangeSummary{CmdSpecs: map[string]bool{"spec-2": true}})
st.NotifySubscribers(ctx, ChangeSummary{CmdSpecs: map[string]bool{"spec-3": true}})
time.Sleep(10 * time.Millisecond)
close(call.done)

call = <-s.onChange
assert.Equal(t, call.summary, ChangeSummary{CmdSpecs: map[string]bool{"spec-2": true, "spec-3": true}})
close(call.done)

select {
case <-s.onChange:
t.Fatal("Expected no more onChange calls")
case <-time.After(10 * time.Millisecond):
}
}

func TestAddSubscriberToAlreadySetUpListCallsSetUp(t *testing.T) {
st, _ := NewStoreWithFakeReducer()
ctx := context.Background()
Expand Down
13 changes: 13 additions & 0 deletions internal/store/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ func (s ChangeSummary) IsLogOnly() bool {
return cmp.Equal(s, ChangeSummary{Log: true})
}

func (s *ChangeSummary) Add(other ChangeSummary) {
s.Legacy = s.Legacy || other.Legacy
s.Log = s.Log || other.Log
if len(other.CmdSpecs) > 0 {
if s.CmdSpecs == nil {
s.CmdSpecs = make(map[string]bool)
}
for k, v := range other.CmdSpecs {
s.CmdSpecs[k] = v
}
}
}

func LegacyChangeSummary() ChangeSummary {
return ChangeSummary{Legacy: true}
}
Expand Down

0 comments on commit 983de0e

Please sign in to comment.