-
Notifications
You must be signed in to change notification settings - Fork 14
/
signal.go
105 lines (89 loc) · 2.62 KB
/
signal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package util
// Signalling primitives: single-signal sender/receiver pair and sync.Cond-ish exposed over a
// channel instead
import (
"sync"
)
func NewSingleSignalPair[T any]() (SignalSender[T], SignalReceiver[T]) {
sigCh := make(chan T, 1)
once := &sync.Once{}
closeSigCh := func() { once.Do(func() { close(sigCh) }) }
return SignalSender[T]{
send: func(data T) {
once.Do(func() {
sigCh <- data
close(sigCh)
})
},
}, SignalReceiver[T]{sigCh: sigCh, closeSigCh: closeSigCh}
}
type SignalSender[T any] struct {
send func(T)
}
type SignalReceiver[T any] struct {
sigCh chan T
closeSigCh func()
}
func (s SignalSender[T]) Send(data T) {
s.send(data)
}
func (s SignalReceiver[T]) Recv() <-chan T {
return s.sigCh
}
func (s SignalReceiver[T]) Close() {
s.closeSigCh()
}
// NewCondChannelPair creates a sender/receiver pair for a sync.Cond-like interface
//
// The differences from sync.Cond are that receiving is exposed through a channel (so it can be
// select-ed) and there is no equivalent to (*Cond).Broadcast()
func NewCondChannelPair() (CondChannelSender, CondChannelReceiver) {
ch := make(chan struct{}, 1)
return CondChannelSender{ch: ch}, CondChannelReceiver{ch: ch}
}
// CondChannelSender is the sending half of a sync.Cond-like interface
type CondChannelSender struct {
ch chan struct{}
}
// CondChannelReceiver is the receiving half of a sync.Cond-like interface
type CondChannelReceiver struct {
ch chan struct{}
}
// Send performs a non-blocking notify of the associated CondChannelReceiver
//
// If there is currently a receiver waiting via Recv, then this will immediately wake them.
// Otherwise, the next receive on the channel returned by Recv will complete immediately.
func (c *CondChannelSender) Send() {
select {
case c.ch <- struct{}{}:
default:
}
}
// Unsend cancels an existing signal that has been sent but not yet received.
//
// It returns whether there was a signal to be cancelled.
func (c *CondChannelSender) Unsend() bool {
select {
case <-c.ch:
return true
default:
return false
}
}
// Consume removes any existing signal created by Send, requiring an additional Send to be made
// before the receiving on Recv will unblock
//
// This method is non-blocking.
func (c *CondChannelReceiver) Consume() {
select {
case <-c.ch:
default:
}
}
// Recv returns a channel for which receiving will complete either (a) immediately, if Send has been
// called without Consume or another receive since; or (b) as soon as Send is next called
//
// This method is non-blocking but receiving on the returned channel may block.
func (c *CondChannelReceiver) Recv() <-chan struct{} {
return c.ch
}