/
flag_tracker_impl.go
104 lines (92 loc) · 3.61 KB
/
flag_tracker_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package internal
import (
"sync"
"gopkg.in/launchdarkly/go-sdk-common.v2/lduser"
"gopkg.in/launchdarkly/go-sdk-common.v2/ldvalue"
"gopkg.in/launchdarkly/go-server-sdk.v5/interfaces"
)
// flagTrackerImpl is the internal implementation of FlagTracker. It's not exported because
// the rest of the SDK code only interacts with the public interface.
//
// The underlying FlagChangeEventBroadcaster receives notifications of flag changes in general.
// When a value change listener is created with AddFlagValueChangeListener, this is implemented
// by creating a regular FlagChangeEvent channel and starting a goroutine that reads it and posts
// events as appropriate to a FlagValueChangeEvent channel. The flagTrackerImpl maintains its own
// mapping from each FlagValueChangeEvent channel to its underlying FlagChangeEvent channel, which
// is necessary for unregistering it.
type flagTrackerImpl struct {
// broadcaster provides (AddListener) and retires (RemoveListener) the FlagChangeEvent
// channels used by both kinds of listener.
broadcaster *FlagChangeEventBroadcaster
// evaluateFn is called with a flag key, a user, and a default value to obtain the flag's
// current value; the value-change goroutine uses it to detect changes.
evaluateFn func(string, lduser.User, ldvalue.Value) ldvalue.Value
// valueChangeSubscriptions maps each value-change channel returned to a caller to the
// underlying flag-change channel that feeds it.
valueChangeSubscriptions map[<-chan interfaces.FlagValueChangeEvent]<-chan interfaces.FlagChangeEvent
// lock guards valueChangeSubscriptions.
lock sync.Mutex
}
// NewFlagTrackerImpl builds the internal FlagTracker implementation on top of the given
// broadcaster.
//
// evaluateFn is the function that value-change listeners call to compute a flag's value for
// a user. The tracker starts out with an empty table of value-change subscriptions.
func NewFlagTrackerImpl(
	broadcaster *FlagChangeEventBroadcaster,
	evaluateFn func(flagKey string, user lduser.User, defaultValue ldvalue.Value) ldvalue.Value,
) interfaces.FlagTracker {
	tracker := new(flagTrackerImpl)
	tracker.broadcaster = broadcaster
	tracker.evaluateFn = evaluateFn
	// The map must be allocated up front: AddFlagValueChangeListener writes into it.
	tracker.valueChangeSubscriptions = make(
		map[<-chan interfaces.FlagValueChangeEvent]<-chan interfaces.FlagChangeEvent,
	)
	return tracker
}
// AddFlagChangeListener is part of the FlagTracker interface. It registers a new listener
// with the broadcaster and hands back the channel the broadcaster created for it.
func (f *flagTrackerImpl) AddFlagChangeListener() <-chan interfaces.FlagChangeEvent {
	flagChanges := f.broadcaster.AddListener()
	return flagChanges
}
// RemoveFlagChangeListener is part of the FlagTracker interface. It passes the given channel
// to the broadcaster's RemoveListener so the broadcaster can unregister it.
func (f *flagTrackerImpl) RemoveFlagChangeListener(listener <-chan interfaces.FlagChangeEvent) {
	source := f.broadcaster
	source.RemoveListener(listener)
}
// AddFlagValueChangeListener is part of the FlagTracker interface.
//
// It registers a fresh flag-change listener with the broadcaster, creates a buffered
// FlagValueChangeEvent channel, and starts a goroutine (runValueChangeListener) that turns
// flag changes for flagKey into value-change events for the given user. The pairing of the
// two channels is recorded so that RemoveFlagValueChangeListener can later unregister the
// underlying listener. The receive side of the value-change channel is returned.
func (f *flagTrackerImpl) AddFlagValueChangeListener(
	flagKey string,
	user lduser.User,
	defaultValue ldvalue.Value,
) <-chan interfaces.FlagValueChangeEvent {
	underlying := f.broadcaster.AddListener()
	out := make(chan interfaces.FlagValueChangeEvent, subscriberChannelBufferLength)

	// Remember which underlying channel feeds this output channel.
	f.lock.Lock()
	f.valueChangeSubscriptions[out] = underlying
	f.lock.Unlock()

	// The goroutine does not touch the subscription map, so it needs no lock.
	go runValueChangeListener(underlying, out, f.evaluateFn, flagKey, user, defaultValue)

	return out
}
// RemoveFlagValueChangeListener is part of the FlagTracker interface.
//
// It looks up the underlying flag-change channel that was recorded for listener, forgets the
// pairing, and asks the broadcaster to remove that underlying channel. A channel that is not
// in the subscription table is ignored.
func (f *flagTrackerImpl) RemoveFlagValueChangeListener(listener <-chan interfaces.FlagValueChangeEvent) {
	f.lock.Lock()
	underlying, found := f.valueChangeSubscriptions[listener]
	if found {
		delete(f.valueChangeSubscriptions, listener)
	}
	f.lock.Unlock()

	if !found {
		return
	}
	// Called after the lock is released, so the broadcaster is never invoked while it is held.
	f.broadcaster.RemoveListener(underlying)
}
// runValueChangeListener is the body of the goroutine started by AddFlagValueChangeListener.
//
// It evaluates the flag once to establish a baseline, then reads events from flagCh. Events
// for other flag keys are skipped. For a matching event the flag is re-evaluated with
// evaluateFn, and if the result differs from the last known value a FlagValueChangeEvent
// carrying the old and new values is sent on valueCh. The send is a plain blocking send, so
// this goroutine waits whenever valueCh has no room.
//
// When flagCh is closed (the underlying subscription has been unregistered), valueCh is
// closed and the function returns.
func runValueChangeListener(
	flagCh <-chan interfaces.FlagChangeEvent,
	valueCh chan<- interfaces.FlagValueChangeEvent,
	evaluateFn func(flagKey string, user lduser.User, defaultValue ldvalue.Value) ldvalue.Value,
	flagKey string,
	user lduser.User,
	defaultValue ldvalue.Value,
) {
	lastValue := evaluateFn(flagKey, user, defaultValue)

	// The range loop ends once flagCh has been closed and drained.
	for change := range flagCh {
		if change.Key == flagKey {
			latest := evaluateFn(flagKey, user, defaultValue)
			if !latest.Equal(lastValue) {
				previous := lastValue
				lastValue = latest
				valueCh <- interfaces.FlagValueChangeEvent{
					Key:      flagKey,
					OldValue: previous,
					NewValue: latest,
				}
			}
		}
	}

	close(valueCh)
}