-
Notifications
You must be signed in to change notification settings - Fork 53
/
notifier.go
116 lines (104 loc) · 2.39 KB
/
notifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package notifier
import (
"context"
"sync"
"github.com/google/go-cmp/cmp"
"golang.org/x/exp/slices"
)
type test struct {
}
func (t test) Clone() test {
return t
}
func CloneList[T Clonable[T]](list []T) []T {
c := make([]T, 0, len(list))
for _, t := range list {
c = append(c, t.Clone())
}
return c
}
type AliasNotifer[T Clonable[T]] updateNotifier[T]
type updateNotifier[T Clonable[T]] struct {
finder Finder[T]
updateChannels []chan []T
channelsMu *sync.Mutex
startCond *sync.Cond
latest []T
latestMu sync.Mutex
}
var _ UpdateNotifier[test] = (*updateNotifier[test])(nil)
func NewUpdateNotifier[T Clonable[T]](finder Finder[T]) UpdateNotifier[T] {
mu := &sync.Mutex{}
return &updateNotifier[T]{
finder: finder,
updateChannels: []chan []T{},
channelsMu: mu,
startCond: sync.NewCond(mu),
latest: []T{},
}
}
func (u *updateNotifier[T]) NotifyC(ctx context.Context) <-chan []T {
u.channelsMu.Lock()
defer u.channelsMu.Unlock()
updateC := make(chan []T, 3)
u.updateChannels = append(u.updateChannels, updateC)
if len(u.updateChannels) == 1 {
// If this was the first channel to be added, unlock any calls to
// fetchRules which might be waiting.
u.startCond.Broadcast()
}
go func() {
<-ctx.Done()
u.channelsMu.Lock()
defer u.channelsMu.Unlock()
// Remove the channel from the list
for i, c := range u.updateChannels {
if c == updateC {
u.updateChannels = slices.Delete(u.updateChannels, i, i+1)
break
}
}
}()
return updateC
}
func (u *updateNotifier[T]) Refresh(ctx context.Context) {
u.channelsMu.Lock()
for len(u.updateChannels) == 0 {
// If there are no channels yet, wait until one is added.
u.startCond.Wait()
}
u.channelsMu.Unlock()
groups, err := u.finder.Find(ctx)
if err != nil {
return
}
modified := false
u.latestMu.Lock()
defer u.latestMu.Unlock()
// compare the lengths as a quick preliminary check
if len(groups) == len(u.latest) {
// If the lengths are the same, compare the contents of each rule
for i := 0; i < len(groups); i++ {
if !cmp.Equal(groups[i], u.latest[i]) {
modified = true
break
}
}
} else {
modified = true
}
if !modified {
return
}
u.latest = groups
u.channelsMu.Lock()
cloned := CloneList(u.latest)
for _, c := range u.updateChannels {
select {
case c <- cloned:
cloned = CloneList(u.latest)
default:
}
}
u.channelsMu.Unlock()
}