/
start_stop_once.go
157 lines (126 loc) · 3.87 KB
/
start_stop_once.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package utils
import (
"fmt"
"sync"
"github.com/pkg/errors"
"go.uber.org/atomic"
)
type errNotStarted struct {
state startStopOnceState
}
func (e *errNotStarted) Error() string {
return fmt.Sprintf("service is %q, not started", e.state)
}
// startStopOnceState holds the state for StartStopOnce
type startStopOnceState int32
const (
startStopOnceUnstarted startStopOnceState = iota
startStopOnceStarted
startStopOnceStarting
startStopOnceStopping
startStopOnceStopped
)
func (s startStopOnceState) String() string {
switch s {
case startStopOnceUnstarted:
return "Unstarted"
case startStopOnceStarted:
return "Started"
case startStopOnceStarting:
return "Starting"
case startStopOnceStopping:
return "Stopping"
case startStopOnceStopped:
return "Stopped"
default:
return fmt.Sprintf("unrecognized state: %d", s)
}
}
// StartStopOnce can be embedded in a struct to help implement types.Service.
type StartStopOnce struct {
state atomic.Int32
sync.RWMutex // lock is held during startup/shutdown, RLock is held while executing functions dependent on a particular state
}
// StartOnce sets the state to Started
func (s *StartStopOnce) StartOnce(name string, fn func() error) error {
// SAFETY: We do this compare-and-swap outside of the lock so that
// concurrent StartOnce() calls return immediately.
success := s.state.CAS(int32(startStopOnceUnstarted), int32(startStopOnceStarting))
if !success {
return errors.Errorf("%v has already started once", name)
}
s.Lock()
defer s.Unlock()
err := fn()
success = s.state.CAS(int32(startStopOnceStarting), int32(startStopOnceStarted))
if !success {
// SAFETY: If this is reached, something must be very wrong: once.state
// was tampered with outside of the lock.
panic(fmt.Sprintf("%v entered unreachable state, unable to set state to started", name))
}
return err
}
// StopOnce sets the state to Stopped
func (s *StartStopOnce) StopOnce(name string, fn func() error) error {
// SAFETY: We hold the lock here so that Stop blocks until StartOnce
// executes. This ensures that a very fast call to Stop will wait for the
// code to finish starting up before teardown.
s.Lock()
defer s.Unlock()
success := s.state.CAS(int32(startStopOnceStarted), int32(startStopOnceStopping))
if !success {
return errors.Errorf("%v is unstarted or has already stopped once", name)
}
err := fn()
success = s.state.CAS(int32(startStopOnceStopping), int32(startStopOnceStopped))
if !success {
// SAFETY: If this is reached, something must be very wrong: once.state
// was tampered with outside of the lock.
panic(fmt.Sprintf("%v entered unreachable state, unable to set state to stopped", name))
}
return err
}
// State retrieves the current state
func (s *StartStopOnce) State() startStopOnceState {
state := s.state.Load()
return startStopOnceState(state)
}
// IfStarted runs the func and returns true only if started, otherwise returns false
func (s *StartStopOnce) IfStarted(f func()) (ok bool) {
s.RLock()
defer s.RUnlock()
state := s.state.Load()
if startStopOnceState(state) == startStopOnceStarted {
f()
return true
}
return false
}
// IfNotStopped runs the func and returns true if in any state other than Stopped
func (s *StartStopOnce) IfNotStopped(f func()) (ok bool) {
s.RLock()
defer s.RUnlock()
state := s.state.Load()
if startStopOnceState(state) == startStopOnceStopped {
return false
}
f()
return true
}
// Ready returns ErrNotStarted if the state is not started.
func (s *StartStopOnce) Ready() error {
state := s.State()
if state == startStopOnceStarted {
return nil
}
return &errNotStarted{state: state}
}
// Healthy returns ErrNotStarted if the state is not started.
// Override this per-service with more specific implementations.
func (s *StartStopOnce) Healthy() error {
state := s.State()
if state == startStopOnceStarted {
return nil
}
return &errNotStarted{state: state}
}