forked from mkabilov/pg2ch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state.go
91 lines (74 loc) · 1.78 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package replicator
import "sync/atomic"
type state struct {
v uint32
}
type StateValue uint32
const (
StateInit StateValue = iota
StateWorking
StatePaused
StatePausing
StateShuttingDown
)
var states = map[StateValue]string{
StateInit: "INIT",
StateWorking: "WORKING",
StatePausing: "PAUSING",
StatePaused: "PAUSED",
}
func (r *Replicator) State() StateValue {
return StateValue(r.curState.v)
}
func (r *Replicator) pause() string {
r.logger.Debugf("pausing")
if state := r.curState.Load(); state == StatePausing ||
state == StatePaused {
return r.curState.String()
}
if !r.curState.CompareAndSwap(StateWorking, StatePausing) {
return r.curState.String()
}
r.inTxMutex.Lock()
/* try to flush everying we have buffered */
if err := r.tblBuffersFlush(); err != nil {
r.curState.Store(StateWorking)
r.inTxMutex.Unlock()
return "could not flush tables"
}
r.curState.Store(StatePaused)
r.logger.Debugf("paused")
return "OK"
}
// resume should mind the pause in progress state
func (r *Replicator) resume() string {
r.logger.Debugf("resuming")
if !r.curState.CompareAndSwap(StatePaused, StateWorking) {
if r.curState.Load() == StatePausing {
r.logger.Debugf("pause is in progress")
return "PAUSE IS IN PROGRESS"
} else {
r.logger.Debugf("not paused")
return "NOT PAUSED"
}
}
r.inTxMutex.Unlock()
r.logger.Debugf("resumed")
return "OK"
}
func (s *state) String() string {
stateName, ok := states[s.Load()]
if !ok {
return "UNKNOWN"
}
return stateName
}
func (s *state) Load() StateValue {
return StateValue(atomic.LoadUint32(&s.v))
}
func (s *state) CompareAndSwap(old, new StateValue) bool {
return atomic.CompareAndSwapUint32(&s.v, uint32(old), uint32(new))
}
func (s *state) Store(new StateValue) {
atomic.StoreUint32(&s.v, uint32(new))
}