-
Notifications
You must be signed in to change notification settings - Fork 53
/
context.go
75 lines (64 loc) · 1.42 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package storage
import (
"context"
"errors"
"time"
"go.uber.org/atomic"
)
var (
// ErrEventChannelClosed is the error recorded by a watch context when its
// event channel is closed.
ErrEventChannelClosed = errors.New("event channel closed")
// ErrObjectDeleted is the error recorded by a watch context when it receives
// an event whose type is WatchEventDelete.
ErrObjectDeleted = errors.New("object deleted")
)
// watchContext is a context.Context implementation whose Done channel is
// closed by its watch goroutine when the base context is done, the event
// channel is closed, or a WatchEventDelete event is received.
type watchContext[T any] struct {
// base supplies Deadline and Value; its Done channel is one of the signals
// that ends the watch loop.
base context.Context
// eventC is the watch event stream read by the watch loop.
eventC <-chan WatchEvent[T]
// done is closed when the watch loop exits; it is the channel returned by
// Done.
done chan struct{}
// err holds the reason the watch loop exited and is reported by Err.
err *atomic.Error
}
// NewWatchContext returns a context driven by a watch event stream. A
// background goroutine reads eventC and closes the returned context's Done
// channel as soon as one of the following happens:
//   - base is done (the error recorded is base.Err()),
//   - eventC is closed (the error recorded is ErrEventChannelClosed),
//   - a WatchEventDelete event arrives (the error recorded is ErrObjectDeleted).
//
// Deadline and Value are forwarded to base. The returned context should be
// the only reader of eventC; otherwise events, including the delete event,
// may be consumed elsewhere and missed.
func NewWatchContext[T any](
	base context.Context,
	eventC <-chan WatchEvent[T],
) context.Context {
	wc := new(watchContext[T])
	wc.base = base
	wc.eventC = eventC
	wc.done = make(chan struct{})
	wc.err = atomic.NewError(nil)

	// The watch loop runs until one of the terminating conditions above occurs.
	go wc.watch()
	return wc
}
// watch is the background loop behind a watchContext. It blocks until the base
// context is done, the event channel is closed, or a WatchEventDelete event is
// received, records the corresponding error, and then closes done. Events of
// any other type are read and discarded.
func (cc *watchContext[T]) watch() {
	// Deferred so that done is closed only after the error has been stored.
	defer close(cc.done)

	for {
		select {
		case <-cc.base.Done():
			cc.err.Store(cc.base.Err())
			return

		case ev, open := <-cc.eventC:
			switch {
			case !open:
				// The producer closed the stream; nothing more will arrive.
				cc.err.Store(ErrEventChannelClosed)
				return
			case ev.EventType == WatchEventDelete:
				cc.err.Store(ErrObjectDeleted)
				return
			}
			// Any other event type: keep waiting.
		}
	}
}
// Deadline implements context.Context by returning the base context's
// deadline.
func (cc *watchContext[T]) Deadline() (time.Time, bool) {
return cc.base.Deadline()
}
// Done implements context.Context. The returned channel is closed when the
// watch loop exits.
func (cc *watchContext[T]) Done() <-chan struct{} {
return cc.done
}
// Err implements context.Context. It returns nil until the Done channel has
// been closed, and afterwards the reason the watch ended: the base context's
// error, ErrEventChannelClosed, or ErrObjectDeleted.
func (cc *watchContext[T]) Err() error {
	// The watch loop records the error before closing done, so reading it
	// unconditionally could expose a non-nil error while Done is still open.
	// Gating on done upholds the context.Context contract: "If Done is not
	// yet closed, Err returns nil."
	select {
	case <-cc.done:
		return cc.err.Load()
	default:
		return nil
	}
}
// Value implements context.Context by looking up key in the base context.
func (cc *watchContext[T]) Value(key any) any {
return cc.base.Value(key)
}