-
Notifications
You must be signed in to change notification settings - Fork 53
/
condition_tracker.go
157 lines (139 loc) · 3.74 KB
/
condition_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package health
import (
"fmt"
"sync"
"time"
gsync "github.com/kralicky/gpkg/sync"
"go.uber.org/atomic"
"go.uber.org/zap"
)
// ConditionStatus is the state recorded for a tracked condition.
type ConditionStatus int32

// Known condition statuses. StatusPending is the zero value of
// ConditionStatus.
const (
	StatusPending ConditionStatus = iota
	StatusFailure
	StatusDisabled
)

// String returns the display name of s ("Pending", "Failure" or "Disabled").
// It returns the empty string when s is not one of the declared statuses.
func (s ConditionStatus) String() string {
	// Lookup table indexed by status value.
	names := [...]string{
		StatusPending:  "Pending",
		StatusFailure:  "Failure",
		StatusDisabled: "Disabled",
	}
	// Guard the index so out-of-range values fall back to "" instead of
	// panicking.
	if s < 0 || int(s) >= len(names) {
		return ""
	}
	return names[s]
}
// ConditionStatusStrToEnum converts the display name of a status ("Pending",
// "Failure" or "Disabled") into the matching ConditionStatus. The match is
// exact and case-sensitive. Any other input yields an error; the status
// returned alongside that error is the zero value and carries no meaning.
func ConditionStatusStrToEnum(status string) (ConditionStatus, error) {
	if status == "Pending" {
		return StatusPending, nil
	}
	if status == "Failure" {
		return StatusFailure, nil
	}
	if status == "Disabled" {
		return StatusDisabled, nil
	}
	return 0, fmt.Errorf("unknown condition status %s", status)
}
// Keys naming well-known conditions, presumably for use as the key argument
// of ConditionTracker.Set and ConditionTracker.Clear.
const (
// CondConfigSync is the condition that NewDefaultConditionTracker registers
// as StatusPending on every new tracker.
CondConfigSync = "Config Sync"
CondBackend = "Backend"
CondNodeDriver = "Node Driver"
)
// ConditionTracker records named conditions together with their statuses and
// lets callers register listeners that are told about changes.
type ConditionTracker interface {
// Set assigns the status value to the condition identified by key. reason
// describes why the status is being set.
Set(key string, value ConditionStatus, reason string)
// Clear removes the condition identified by key. A reason may optionally be
// supplied; the default implementation only looks at the first one.
Clear(key string, reason ...string)
// List returns the tracked conditions as strings. The default implementation
// formats each entry as "<key> <status>".
List() []string
// LastModified returns the time at which the tracked conditions were last
// modified.
LastModified() time.Time
// Adds a listener that will be called whenever any condition is changed.
// The listener will be called in a separate goroutine. Ensure that the
// listener does not itself set or clear any conditions.
//
// The default implementation accepts func(), func(string),
// func(string, *ConditionStatus) and func(string, *ConditionStatus, string),
// and panics for any other type. The *ConditionStatus argument is nil when
// the condition was cleared.
AddListener(listener any)
}
// NewDefaultConditionTracker builds the default ConditionTracker, which logs
// through logger. The tracker's last-modified time starts at the moment of
// construction, and the CondConfigSync condition is pre-registered as
// StatusPending without notifying any listener.
func NewDefaultConditionTracker(logger *zap.SugaredLogger) ConditionTracker {
	tracker := new(defaultConditionTracker)
	tracker.logger = logger
	tracker.modTime = atomic.NewTime(time.Now())
	// Seed the initial condition directly in the map so that no log entry or
	// listener notification is produced for it.
	tracker.conditions.Store(CondConfigSync, StatusPending)
	return tracker
}
// defaultConditionTracker is the ConditionTracker implementation returned by
// NewDefaultConditionTracker.
type defaultConditionTracker struct {
// conditions maps each condition key to its current status.
conditions gsync.Map[string, ConditionStatus]
// logger receives an entry when a condition is set, changed or cleared.
logger *zap.SugaredLogger
// modTime holds the time reported by LastModified.
modTime *atomic.Time
// listenersMu guards listeners.
listenersMu sync.Mutex
// listeners are the callbacks invoked by notifyListeners. AddListener wraps
// shorter callback signatures so every entry has this full signature.
listeners []func(key string, value *ConditionStatus, reason string)
}
// Set records value as the current status of the condition named key.
//
// When key is not tracked yet, or its stored status differs from value, the
// new status is stored, the last-modified time is refreshed and all listeners
// are notified with a pointer to the new status and the given reason. Setting
// a condition to the status it already has is a no-op.
//
// The load and the subsequent store are not performed atomically as a pair.
func (ct *defaultConditionTracker) Set(key string, value ConditionStatus, reason string) {
	lg := ct.logger.With(
		"condition", key,
		"status", value,
		"reason", reason,
	)

	prev, exists := ct.conditions.Load(key)
	switch {
	case !exists:
		// A missing key is always a change. Do not compare prev with value
		// here: on a miss prev is presumably the zero value (StatusPending),
		// which would wrongly suppress a brand-new Pending condition.
		lg.Info("condition set")
	case prev != value:
		lg.Info("condition changed")
	default:
		// Status unchanged: nothing to store and nobody to notify.
		return
	}

	// Apply the update for both the "set" and the "changed" case; previously
	// a changed condition was only logged and never stored.
	ct.conditions.Store(key, value)
	ct.modTime.Store(time.Now())
	ct.notifyListeners(key, &value, reason)
}
// Clear removes the condition named key.
//
// If the condition was tracked, the last-modified time is refreshed, the
// removal is logged together with the previous status, and all listeners are
// notified with a nil status. If key is not tracked, Clear does nothing.
//
// Only the first element of reason is used; it is added to the log entry and
// forwarded to listeners. When no reason is given, listeners receive "".
func (ct *defaultConditionTracker) Clear(key string, reason ...string) {
	lg := ct.logger.With(
		"condition", key,
	)

	var why string
	if len(reason) > 0 {
		why = reason[0]
		lg = lg.With("reason", why)
	}

	prev, existed := ct.conditions.LoadAndDelete(key)
	if !existed {
		return
	}

	ct.modTime.Store(time.Now())
	lg.With(
		"previous", prev,
	).Info("condition cleared")
	// Forward the caller's reason instead of dropping it, so listeners that
	// take a reason see the same information that was logged.
	ct.notifyListeners(key, nil, why)
}
// List returns one "<key> <status>" string per tracked condition, in the
// order the underlying map yields them. The result is nil when no condition
// is tracked.
func (ct *defaultConditionTracker) List() []string {
	var entries []string
	collect := func(name string, status ConditionStatus) bool {
		entries = append(entries, name+" "+status.String())
		// Returning true keeps the iteration going over every entry.
		return true
	}
	ct.conditions.Range(collect)
	return entries
}
// LastModified reports the time currently held in modTime, which Set and
// Clear refresh when they modify the tracked conditions.
func (ct *defaultConditionTracker) LastModified() time.Time {
	stamp := ct.modTime.Load()
	return stamp
}
// AddListener registers listener to be invoked by notifyListeners.
//
// listener must be one of:
//
//	func()
//	func(key string)
//	func(key string, value *ConditionStatus)
//	func(key string, value *ConditionStatus, reason string)
//
// Shorter signatures are wrapped so that every stored callback takes the full
// argument list. AddListener panics if listener has any other type.
func (ct *defaultConditionTracker) AddListener(listener any) {
	// Normalize the callback to the full signature before taking the lock.
	var adapted func(key string, value *ConditionStatus, reason string)
	switch fn := listener.(type) {
	case func():
		adapted = func(string, *ConditionStatus, string) {
			fn()
		}
	case func(string):
		adapted = func(key string, _ *ConditionStatus, _ string) {
			fn(key)
		}
	case func(string, *ConditionStatus):
		adapted = func(key string, value *ConditionStatus, _ string) {
			fn(key, value)
		}
	case func(string, *ConditionStatus, string):
		adapted = fn
	default:
		panic(fmt.Sprintf("unsupported listener type %T", listener))
	}

	ct.listenersMu.Lock()
	defer ct.listenersMu.Unlock()
	ct.listeners = append(ct.listeners, adapted)
}
// notifyListeners starts one goroutine per registered listener, passing each
// the given key, value and reason. It holds listenersMu only while the
// goroutines are being started and does not wait for them to finish. All
// listeners receive the same value pointer.
func (ct *defaultConditionTracker) notifyListeners(key string, value *ConditionStatus, reason string) {
	ct.listenersMu.Lock()
	defer ct.listenersMu.Unlock()

	for i := range ct.listeners {
		go ct.listeners[i](key, value, reason)
	}
}