-
Notifications
You must be signed in to change notification settings - Fork 3
/
value.go
167 lines (142 loc) · 4.1 KB
/
value.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package resource
import (
"context"
"errors"
"log"
"sync"
"time"
"google.golang.org/protobuf/proto"
"github.com/smart-core-os/sc-golang/internal/minibus"
)
// Value represents a simple state field in an object. Think Temperature or Volume or Occupancy. Use a Value to
// gain thread safe reads/writes that also support FieldMasks and update notifications.
type Value struct {
*config
mu sync.RWMutex
value proto.Message
changeTime time.Time
bus minibus.Bus
}
func NewValue(opts ...Option) *Value {
c := computeConfig(opts...)
res := &Value{
config: c,
}
res.value = c.initialValue
res.changeTime = c.clock.Now()
c.initialValue = nil // clear so it can be GC'd when the value changes
return res
}
func (r *Value) Get(opts ...ReadOption) proto.Message {
return r.get(ComputeReadConfig(opts...))
}
func (r *Value) get(req *ReadRequest) proto.Message {
r.mu.RLock()
defer r.mu.RUnlock()
return req.FilterClone(r.value)
}
// Set updates the current value of this Value with the given value.
// Returns the new value.
// Provide WriteOption to control masks and other variables during the update.
func (r *Value) Set(value proto.Message, opts ...WriteOption) (proto.Message, error) {
return r.set(value, ComputeWriteConfig(opts...))
}
func (r *Value) set(value proto.Message, request WriteRequest) (proto.Message, error) {
writer := request.fieldUpdater(r.writableFields)
if err := writer.Validate(value); err != nil {
return nil, err
}
disarm := timeoutAlarm(time.Second, "GetAndUpdate took too long")
_, newValue, err := GetAndUpdate(
&r.mu,
func() (proto.Message, error) {
return r.value, nil
},
request.changeFn(writer, value),
func(message proto.Message) {
r.value = message
r.changeTime = request.updateTime(r.clock)
},
)
disarm()
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
r.bus.Send(ctx, &ValueChange{
Value: newValue,
ChangeTime: request.updateTime(r.clock),
})
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return nil, errors.New("bus.Send blocked for too long")
}
return newValue, err
}
// Pull emits a ValueChange on the returned chan whenever the underlying value changes.
// The changes emitted can be adjusted using WithEquivalence.
// The returned chan will be closed when no more events will be emitted, either because ctx was cancelled or for other
// reasons.
func (r *Value) Pull(ctx context.Context, opts ...ReadOption) <-chan *ValueChange {
readConfig := ComputeReadConfig(opts...)
filter := readConfig.ResponseFilter()
on, currentValue, changeTime := r.onUpdate(ctx, readConfig)
typedEvents := make(chan *ValueChange)
go func() {
defer close(typedEvents)
if currentValue != nil {
change := &ValueChange{Value: currentValue, ChangeTime: changeTime, SeedValue: true, LastSeedValue: true}
change = change.filter(filter)
select {
case <-ctx.Done():
return // give up sending
case typedEvents <- change:
}
}
last := currentValue
for event := range on {
change := event.(*ValueChange).filter(filter)
if r.equivalence != nil && r.equivalence.Compare(last, change.Value) {
continue
}
last = change.Value
select {
case <-ctx.Done():
return // give up sending
case typedEvents <- change:
}
}
}()
return typedEvents
}
func (r *Value) onUpdate(ctx context.Context, config *ReadRequest) (<-chan any, proto.Message, time.Time) {
var (
value proto.Message
changeTime time.Time
)
if !config.UpdatesOnly {
r.mu.RLock()
defer r.mu.RUnlock()
value = r.value
changeTime = r.changeTime
}
ch := r.bus.Listen(ctx)
if !config.Backpressure {
ch = minibus.DropExcess(ch)
}
return ch, value, changeTime
}
func timeoutAlarm(duration time.Duration, fmt string, args ...any) (disarm func()) {
ctx, cancel := context.WithTimeout(context.Background(), duration)
go func() {
<-ctx.Done()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
log.Printf(fmt, args...)
}
}()
return cancel
}
// Clock returns the clock used by this resource for reporting time.
func (r *Value) Clock() Clock {
return r.clock
}