forked from hashicorp/terraform
-
Notifications
You must be signed in to change notification settings - Fork 7
/
wrapper.go
182 lines (151 loc) · 4.9 KB
/
wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Package signalwrapper is used to run functions that are sensitive to
// signals that may be received from outside the process. It can also be
// used as just an async function runner that is cancellable and we may
// abstract this further into another package in the future.
package signalwrapper
import (
"log"
"os"
"os/signal"
"sync"
)
// CancellableFunc is a function that cancels if it receives a message
// on the given channel. It must return an error if any occurred. It should
// return no error if it was cancelled successfully since it is assumed
// that this function will probably be called again at some future point
// since it was interrupted.
type CancellableFunc func(<-chan struct{}) error
// Run wraps and runs the given cancellable function and returns the Wrapped
// struct that can be used to listen for events, cancel on other events
// (such as timeouts), etc.
func Run(f CancellableFunc) *Wrapped {
// Determine the signals we're listening to. Prematurely making
// this a slice since I predict a future where we'll add others and
// the complexity in doing so is low.
signals := []os.Signal{os.Interrupt}
// Register a listener for the signals
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, signals...)
// Create the channel we'll use to "cancel"
cancelCh := make(chan struct{})
// This channel keeps track of whether the function we're running
// completed successfully and the errors it may have had. It is
// VERY IMPORTANT that the errCh is buffered to at least 1 so that
// it doesn't block when finishing.
doneCh := make(chan struct{})
errCh := make(chan error, 1)
// Build our wrapped result
wrapped := &Wrapped{
ErrCh: errCh,
errCh: errCh,
cancelCh: cancelCh,
}
// Start the function
go func() {
log.Printf("[DEBUG] signalwrapper: executing wrapped function")
err := f(cancelCh)
// Close the done channel _before_ sending the error in case
// the error channel read blocks (it shouldn't) to avoid interrupts
// doing anything.
close(doneCh)
// Mark completion
log.Printf("[DEBUG] signalwrapper: wrapped function execution ended")
wrapped.done(err)
}()
// Goroutine to track interrupts and make sure we do at-most-once
// delivery of an interrupt since we're using a channel.
go func() {
// Clean up after this since this is the only function that
// reads signals.
defer signal.Stop(sigCh)
select {
case <-doneCh:
// Everything happened naturally
case <-sigCh:
log.Printf("[DEBUG] signalwrapper: signal received, cancelling wrapped function")
// Stop the function. Goroutine since we don't care about
// the result and we'd like to end this goroutine as soon
// as possible to avoid any more signals coming in.
go wrapped.Cancel()
}
}()
return wrapped
}
// Wrapped is the return value of wrapping a function. This has channels
// that can be used to wait for a result as well as functions to help with
// different behaviors.
type Wrapped struct {
// Set and consumed by user
// ErrCh is the channel to listen for real-time events on the wrapped
// function. A nil error sent means the execution completed without error.
// This is an exactly once delivery channel.
ErrCh <-chan error
// Set by creator
errCh chan<- error
cancelCh chan<- struct{}
// Set automatically
once sync.Once
cancelCond *sync.Cond
cancelLock *sync.Mutex
resultErr error
resultSet bool
}
// Cancel stops the running function and blocks until it returns. The
// resulting value is returned.
//
// It is safe to call this multiple times. This will return the resulting
// error value each time.
func (w *Wrapped) Cancel() error {
w.once.Do(w.init)
w.cancelLock.Lock()
// If we have a result set, return that
if w.resultSet {
w.cancelLock.Unlock()
return w.resultErr
}
// If we have a cancel channel, close it to signal and set it to
// nil so we never do that again.
if w.cancelCh != nil {
close(w.cancelCh)
w.cancelCh = nil
}
// Wait for the result to be set
defer w.cancelLock.Unlock()
w.cancelCond.Wait()
return w.resultErr
}
// Wait waits for the completion of the wrapped function and returns the result.
//
// This can be called multiple times safely.
func (w *Wrapped) Wait() error {
w.once.Do(w.init)
w.cancelLock.Lock()
defer w.cancelLock.Unlock()
// If we don't have a result yet, wait for that
if !w.resultSet {
w.cancelCond.Wait()
}
// Return the result
return w.resultErr
}
// done marks this wrapped function as done with the resulting value.
// This must only be called once.
func (w *Wrapped) done(err error) {
w.once.Do(w.init)
w.cancelLock.Lock()
// Set the result
w.resultErr = err
w.resultSet = true
// Notify any waiters
w.cancelCond.Broadcast()
// Unlock since the next call can be blocking
w.cancelLock.Unlock()
// Notify any channel listeners
w.errCh <- err
}
func (w *Wrapped) init() {
// Create the condition variable
var m sync.Mutex
w.cancelCond = sync.NewCond(&m)
w.cancelLock = &m
}