-
Notifications
You must be signed in to change notification settings - Fork 110
/
manager.go
159 lines (132 loc) · 3.81 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package operation
import (
"context"
"errors"
"sync"
"time"
"go.uber.org/multierr"
"go.viam.com/utils"
)
// SingleOperationManager ensures only 1 operation is happening a time
// An operation can be nested, so if there is already an operation in progress,
// it can have sub-operations without an issue.
type SingleOperationManager struct {
mu sync.Mutex
currentOp *anOp
}
// CancelRunning cancel's a current operation unless it's mine.
func (sm *SingleOperationManager) CancelRunning(ctx context.Context) {
if ctx.Value(somCtxKeySingleOp) != nil {
return
}
sm.mu.Lock()
defer sm.mu.Unlock()
sm.cancelInLock(ctx)
}
// OpRunning returns if there is a current operation.
func (sm *SingleOperationManager) OpRunning() bool {
sm.mu.Lock()
defer sm.mu.Unlock()
return sm.currentOp != nil
}
type somCtxKey byte
const somCtxKeySingleOp = somCtxKey(iota)
// New creates a new operation, cancels previous, returns a new context and function to call when done.
func (sm *SingleOperationManager) New(ctx context.Context) (context.Context, func()) {
// handle nested ops
if ctx.Value(somCtxKeySingleOp) != nil {
return ctx, func() {}
}
sm.mu.Lock()
// first cancel any old operation
sm.cancelInLock(ctx)
theOp := &anOp{}
ctx = context.WithValue(ctx, somCtxKeySingleOp, theOp)
theOp.ctx, theOp.cancelFunc = context.WithCancel(ctx)
sm.currentOp = theOp
sm.mu.Unlock()
return theOp.ctx, func() {
if !theOp.closed {
theOp.closed = true
}
sm.mu.Lock()
if theOp == sm.currentOp {
sm.currentOp = nil
}
sm.mu.Unlock()
}
}
// NewTimedWaitOp returns true if it finished, false if cancelled.
// If there are other operations pending, this will cancel them.
func (sm *SingleOperationManager) NewTimedWaitOp(ctx context.Context, dur time.Duration) bool {
ctx, finish := sm.New(ctx)
defer finish()
return utils.SelectContextOrWait(ctx, dur)
}
// IsPoweredInterface is a utility so can wait on IsPowered easily. It returns whether it is
// powered, the power percent (between 0 and 1, or between -1 and 1 for motors that support
// negative power), and any error that occurred while obtaining these.
type IsPoweredInterface interface {
IsPowered(ctx context.Context, extra map[string]interface{}) (bool, float64, error)
}
// WaitTillNotPowered waits until IsPowered returns false.
func (sm *SingleOperationManager) WaitTillNotPowered(ctx context.Context, pollTime time.Duration, powered IsPoweredInterface,
stop func(context.Context, map[string]interface{}) error,
) (err error) {
// Defers a function that will stop and clean up if the context errors
defer func(ctx context.Context) {
var errStop error
if errors.Is(ctx.Err(), context.Canceled) {
sm.mu.Lock()
oldOp := sm.currentOp == ctx.Value(somCtxKeySingleOp)
sm.mu.Unlock()
if oldOp || sm.currentOp == nil {
errStop = stop(ctx, map[string]interface{}{})
}
}
err = multierr.Combine(ctx.Err(), errStop)
}(ctx)
return sm.WaitForSuccess(
ctx,
pollTime,
func(ctx context.Context) (res bool, err error) {
res, _, err = powered.IsPowered(ctx, nil)
return !res, err
},
)
}
// WaitForSuccess will call testFunc every pollTime until it returns true or an error.
func (sm *SingleOperationManager) WaitForSuccess(
ctx context.Context,
pollTime time.Duration,
testFunc func(ctx context.Context) (bool, error),
) error {
ctx, finish := sm.New(ctx)
defer finish()
for {
res, err := testFunc(ctx)
if err != nil {
return err
}
if res {
return nil
}
if !utils.SelectContextOrWait(ctx, pollTime) {
return ctx.Err()
}
}
}
func (sm *SingleOperationManager) cancelInLock(ctx context.Context) {
myOp := ctx.Value(somCtxKeySingleOp)
op := sm.currentOp
if op == nil || myOp == op {
return
}
op.cancelFunc()
sm.currentOp = nil
}
type anOp struct {
ctx context.Context
cancelFunc context.CancelFunc
closed bool
}