-
Notifications
You must be signed in to change notification settings - Fork 179
/
single_runner.go
78 lines (66 loc) · 2.55 KB
/
single_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package runner
import (
"sync"
)
// SingleRunner is a support struct for implementing module.ReadyDoneAware.
// It tracks the lifecycle of a single function: startup, shutdown request
// and shutdown completion. All state transitions are serialized through
// the stateTransition mutex.
type SingleRunner struct {
	// stateTransition is the lock preventing concurrent state transitions.
	stateTransition sync.Mutex

	// startupCommenced indicates whether Start(...) was invoked.
	startupCommenced bool
	// startupCompleted is closed when startup was completed and the
	// component is properly running.
	startupCompleted chan struct{}

	// shutdownCommenced indicates whether shutdown was initiated.
	shutdownCommenced bool
	// shutdownSignal is closed to signal that shutdown has commenced.
	shutdownSignal chan struct{}
	// shutdownCompleted is closed to signal that shutdown was completed
	// and the component is done.
	shutdownCompleted chan struct{}
}
// NewSingleRunner returns a SingleRunner in its initial state: neither
// startup nor shutdown has commenced and all signalling channels are open.
func NewSingleRunner() SingleRunner {
	// The mutex and both "commenced" flags are left at their zero values
	// (unlocked / false); only the channels need explicit allocation.
	return SingleRunner{
		startupCompleted:  make(chan struct{}),
		shutdownSignal:    make(chan struct{}),
		shutdownCompleted: make(chan struct{}),
	}
}
// ShutdownSignal returns a channel that is closed when the shutdown signal
// has been given, i.e. when Abort() is called for the first time or when the
// function passed to Start() has returned, whichever happens first.
func (s *SingleRunner) ShutdownSignal() <-chan struct{} {
	return s.shutdownSignal
}
// Start runs f in a new goroutine and returns a channel that is closed once
// that goroutine is up, right before f is invoked. Only the first call on a
// runner that has not been aborted launches f; every other call is a no-op
// that returns the same channel. When f returns, shutdown is commenced (if it
// has not been already) and the shutdown-completed channel is closed.
func (s *SingleRunner) Start(f func()) <-chan struct{} {
	s.stateTransition.Lock()

	if canStart := !s.startupCommenced && !s.shutdownCommenced; !canStart {
		// Already started or already aborted: nothing to launch.
		s.stateTransition.Unlock()
		return s.startupCompleted
	}
	s.startupCommenced = true

	run := func() {
		// The lock taken by the caller of Start is released here, from
		// within the new goroutine, after startup has been announced.
		close(s.startupCompleted)
		s.stateTransition.Unlock()

		f()

		// f can return for two reasons:
		//  (a) on its own, without Abort() having been called
		//      (this is generally an internal error), or
		//  (b) as a reaction to Abort() having been called.
		// Either way shutdown is commenced and then reported as completed.
		s.stateTransition.Lock()
		s.unsafeCommenceShutdown()
		close(s.shutdownCompleted)
		s.stateTransition.Unlock()
	}
	go run()

	return s.startupCompleted
}
// Abort will abort the SingleRunner and returns a channel that is closed once
// shutdown has completed. Calling Abort repeatedly is safe; every call returns
// the same channel.
//
// Note that the channel returned from Start() will never be closed if the
// SingleRunner is aborted before Start() is called. This mimics the real world case:
//   - consider a runner at a starting position of a race waiting for the start signal
//   - if the runner is told to abort the race _before_ the start signal occurred,
//     the runner will never start
//
// In that never-started case there is nothing to wait for, so the channel
// returned by Abort is closed immediately rather than staying open forever.
func (s *SingleRunner) Abort() <-chan struct{} {
	s.stateTransition.Lock()
	defer s.stateTransition.Unlock()

	// Only the goroutine spawned by Start() closes shutdownCompleted, and Start()
	// refuses to spawn it once shutdown has commenced. If we are the first to
	// commence shutdown on a runner that was never started, nobody else will
	// ever close the channel, so do it here. The check on shutdownCommenced
	// guarantees this happens at most once across repeated Abort() calls.
	if !s.startupCommenced && !s.shutdownCommenced {
		close(s.shutdownCompleted)
	}
	s.unsafeCommenceShutdown()
	return s.shutdownCompleted
}
// unsafeCommenceShutdown marks shutdown as commenced and closes the shutdown
// signal channel. It is a no-op on every call after the first. It is not
// concurrency safe: callers must hold stateTransition.
func (s *SingleRunner) unsafeCommenceShutdown() {
	if s.shutdownCommenced {
		return // signal was already given; closing again would panic
	}
	s.shutdownCommenced = true
	close(s.shutdownSignal)
}