-
Notifications
You must be signed in to change notification settings - Fork 0
/
plug.go
138 lines (122 loc) · 2.88 KB
/
plug.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package plug
import (
"sync"
)
// Plug represents a synchronization primitive that holds and releases
// execution for other objects.
type Plug interface {
// Begins operation of the plug and unblocks WaitForStart().
// May be invoked multiple times but only the first invocation has
// an effect.
Start()
// Ends operation of the plug and unblocks WaitForStop()
// May be invoked multiple times but only the first invocation has
// an effect. Calling Stop() before Start() is undefined. An error
// may be returned with the stop.
Stop(err error)
// Blocks until Start() is invoked
WaitForStart()
// Blocks until Stop() is invoked
WaitForStop() error
// Returns true if Start() has been invoked
IsStarted() bool
}
// plug is the default implementation of Plug
type plug struct {
start sync.Once
stop sync.Once
startCh chan struct{}
stopCh chan error
}
// New returns a new plug that can begin in the Started state.
func New(started bool) Plug {
p := &plug{
startCh: make(chan struct{}),
stopCh: make(chan error, 1),
}
if started {
p.Start()
}
return p
}
func (p *plug) Start() {
p.start.Do(func() { close(p.startCh) })
}
func (p *plug) Stop(err error) {
p.stop.Do(func() {
if err != nil {
p.stopCh <- err
}
close(p.stopCh)
})
}
func (p *plug) IsStarted() bool {
select {
case <-p.startCh:
return true
default:
return false
}
}
func (p *plug) WaitForStart() {
<-p.startCh
}
func (p *plug) WaitForStop() error {
err, ok := <-p.stopCh
if !ok {
return nil
}
return err
}
// Leaser controls access to a lease
type Leaser interface {
// AcquireAndHold tries to acquire the lease and hold it until it expires, the lease is deleted,
// or we observe another party take the lease. The notify channel will be sent a nil value
// when the lease is held, and closed when the lease is lost. If an error is sent the lease
// is also considered lost.
AcquireAndHold(chan error)
Release()
}
// leased uses a Leaser to control Start and Stop on a Plug
type Leased struct {
Plug
leaser Leaser
}
var _ Plug = &Leased{}
// NewLeased creates a Plug that starts when a lease is acquired
// and stops when it is lost.
func NewLeased(leaser Leaser) *Leased {
return &Leased{
Plug: New(false),
leaser: leaser,
}
}
// Stop releases the acquired lease
func (l *Leased) Stop(err error) {
l.leaser.Release()
l.Plug.Stop(err)
}
// Run tries to acquire and hold a lease, invoking Start()
// when the lease is held and invoking Stop() when the lease
// is lost. If the lease was lost gracefully, nil is returned.
// If the lease was lost due to an error, the error is returned.
func (l *Leased) Run() error {
ch := make(chan error, 1)
go l.leaser.AcquireAndHold(ch)
var err error
defer l.Stop(err)
for {
var ok bool
err, ok = <-ch
if !ok {
return nil
}
if err != nil {
for range ch {
// read the rest of the channel
}
return err
}
l.Start()
}
}