-
Notifications
You must be signed in to change notification settings - Fork 50
/
coffin.go
217 lines (190 loc) · 6.26 KB
/
coffin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package coffin
import (
"context"
"sync/atomic"
"github.com/pkg/errors"
"gopkg.in/tomb.v2"
)
type Coffin interface {
// Alive returns true if the coffin is not in a dying or dead state.
Alive() bool
// Context returns a context that is a copy of the provided parent context with
// a replaced Done channel that is closed when either the coffin is dying or the
// parent is cancelled.
//
// If parent is nil, it defaults to the parent provided via WithContext, or an
// empty background parent if the coffin wasn't created via WithContext.
Context(parent context.Context) context.Context
// Dead returns the channel that can be used to wait until
// all goroutines have finished running.
Dead() <-chan struct{}
// Dying returns the channel that can be used to wait until
// t.Kill is called.
Dying() <-chan struct{}
Err() (reason error)
// Go runs f in a new goroutine and tracks its termination.
//
// If f returns a non-nil error, t.Kill is called with that
// error as the death reason parameter.
//
// It is f's responsibility to monitor the coffin and return
// appropriately once it is in a dying state.
//
// It is safe for the f function to call the Go method again
// to create additional tracked goroutines. Once all tracked
// goroutines return, the Dead channel is closed and the
// Wait method unblocks and returns the death reason.
//
// Calling the Go method after all tracked goroutines return
// causes a runtime panic. For that reason, calling the Go
// method a second time out of a tracked goroutine is unsafe.
Go(f func() error)
// Gof is like Go, but wraps the returned error with the given
// name and args
Gof(f func() error, name string, args ...interface{})
// GoWithContext is like Go, but passes the given context to f
GoWithContext(ctx context.Context, f func(ctx context.Context) error)
// GoWithContextf is like Gof, but passes the given context to f
GoWithContextf(ctx context.Context, f func(ctx context.Context) error, msg string, args ...interface{})
// Kill puts the coffin in a dying state for the given reason,
// closes the Dying channel, and sets Alive to false.
//
// Although Kill may be called multiple times, only the first
// non-nil error is recorded as the death reason.
//
// If reason is ErrDying, the previous reason isn't replaced
// even if nil. It's a runtime error to call Kill with ErrDying
// if t is not in a dying state.
Kill(reason error)
// Killf calls the Kill method with an error built providing the received
// parameters to fmt.Errorf. The generated error is also returned.
Killf(f string, a ...interface{}) error
// Wait blocks until all goroutines have finished running, and
// then returns the reason for their death.
//
// If you never spawned a task using one of the Go function, Wait
// returns nil.
Wait() error
// Returns the number of started go routines in this coffin.
Started() int
// Returns the number of currently running go routines in this coffin.
Running() int
// Returns the number of go routines that have already returned in this coffin.
Terminated() int
}
type coffin struct {
// we MUST represent this as a ptr as tomb. Tomb contains a mutex that we are not allowed to copy!
tomb *tomb.Tomb
// number of started go routines
started int32
// number of terminated go routines
terminated int32
}
func New() Coffin {
return &coffin{
tomb: new(tomb.Tomb),
started: 0,
terminated: 0,
}
}
// WithContext returns a new coffin that is killed when the provided parent
// context is canceled, and a copy of parent with a replaced Done channel
// that is closed when either the coffin is dying or the parent is canceled.
// The returned context may also be obtained via the coffin's Context method.
//
// If the context is canceled, the coffin is killed with the error from the context.
// Thus, you will normally get a context.Canceled error from a coffin you stop like this.
func WithContext(parent context.Context) (Coffin, context.Context) {
tmb, ctx := tomb.WithContext(parent)
cfn := &coffin{
tomb: tmb,
started: 0,
terminated: 0,
}
return cfn, ctx
}
func (c *coffin) Alive() bool {
return c.tomb.Alive()
}
func (c *coffin) Context(parent context.Context) context.Context {
return c.tomb.Context(parent)
}
func (c *coffin) Dead() <-chan struct{} {
return c.tomb.Dead()
}
func (c *coffin) Dying() <-chan struct{} {
return c.tomb.Dying()
}
func (c *coffin) Err() (reason error) {
return c.tomb.Err()
}
func (c *coffin) Go(f func() error) {
atomic.AddInt32(&c.started, 1)
c.tomb.Go(func() (err error) {
defer atomic.AddInt32(&c.terminated, 1)
defer func() {
panicErr := ResolveRecovery(recover())
if panicErr != nil {
err = panicErr
}
}()
return f()
})
}
func (c *coffin) Gof(f func() error, msg string, args ...interface{}) {
atomic.AddInt32(&c.started, 1)
c.tomb.Go(func() (err error) {
defer atomic.AddInt32(&c.terminated, 1)
defer func() {
panicErr := ResolveRecovery(recover())
if panicErr != nil {
err = errors.Wrapf(panicErr, msg, args...)
}
}()
err = f()
if err != nil {
err = errors.Wrapf(err, msg, args...)
}
return
})
}
func (c *coffin) GoWithContext(ctx context.Context, f func(ctx context.Context) error) {
c.Go(func() error {
return f(ctx)
})
}
func (c *coffin) GoWithContextf(ctx context.Context, f func(ctx context.Context) error, msg string, args ...interface{}) {
c.Gof(func() error {
return f(ctx)
}, msg, args...)
}
// Kill puts the coffin in a dying state for the given reason,
// closes the Dying channel, and sets Alive to false.
//
// Although Kill may be called multiple times, only the first
// non-nil error is recorded as the death reason.
//
// If reason is ErrDying, the previous reason isn't replaced
// even if nil. It's a runtime error to call Kill with ErrDying
// if t is not in a dying state.
func (c *coffin) Kill(reason error) {
c.tomb.Kill(reason)
}
func (c *coffin) Killf(f string, a ...interface{}) error {
return c.tomb.Killf(f, a...)
}
func (c *coffin) Wait() error {
if atomic.LoadInt32(&c.started) == 0 {
return nil
}
return c.tomb.Wait()
}
func (c *coffin) Started() int {
return int(atomic.LoadInt32(&c.started))
}
func (c *coffin) Running() int {
return c.Started() - c.Terminated()
}
func (c *coffin) Terminated() int {
return int(atomic.LoadInt32(&c.terminated))
}