forked from vsco/jsonconsul
-
Notifications
You must be signed in to change notification settings - Fork 0
/
export_watch.go
226 lines (181 loc) · 5.44 KB
/
export_watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// Adapted from
// https://github.com/hashicorp/envconsul/blob/master/runner.go
package jsonconsul
import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"strings"
"sync"
"time"
dep "github.com/hashicorp/consul-template/dependency"
"github.com/hashicorp/consul-template/watch"
"github.com/hashicorp/consul/api"
)
func (c *JsonExport) RunWatcher() {
runner, err := NewRunner(c, false)
if err != nil {
log.Fatal(err)
}
runner.Start()
}
type Runner struct {
sync.RWMutex
// Prefix is the KeyPrefixDependency associated with this Runner.
Prefix *dep.StoreKeyPrefix
// ErrCh and DoneCh are channels where errors and finish notifications occur.
ErrCh chan error
DoneCh chan struct{}
// ExitCh is a channel for parent processes to read exit status values from
// the child processes.
ExitCh chan int
// config is the Config that created this Runner. It is used internally to
// construct other objects and pass data.
config *JsonExport
// client is the consul/api client.
client *api.Client
// once indicates the runner should get data exactly one time and then stop.
once bool
// minTimer and maxTimer are used for quiescence.
minTimer, maxTimer <-chan time.Time
// outStream and errStream are the io.Writer streams where the runner will
// write information.
outStream, errStream io.Writer
// watcher is the watcher this runner is using.
watcher *watch.Watcher
// data is the latest representation of the data from Consul.
data map[string][]*dep.KeyPair
// killSignal is the signal to send to kill the process.
killSignal os.Signal
}
// NewRunner accepts a JsonExport, and boolean value for once mode.
func NewRunner(config *JsonExport, once bool) (*Runner, error) {
var err error
log.Printf("[INFO] (runner) creating new runner (once: %v)\n", once)
runner := &Runner{
config: config,
once: once,
}
s := strings.TrimPrefix(config.Prefix, "/")
runner.Prefix, err = dep.ParseStoreKeyPrefix(s)
if err != nil {
return nil, err
}
if err := runner.init(); err != nil {
return nil, err
}
return runner, nil
}
// Start creates a new runner and begins watching dependencies and quiescence
// timers. This is the main event loop and will block until finished.
func (r *Runner) Start() {
var exitCh <-chan int
log.Println("[INFO] (runner) starting")
// Add the dependencies to the watcher
r.watcher.Add(r.Prefix)
for {
select {
case data := <-r.watcher.DataCh:
r.Receive(data.Dependency, data.Data)
// Drain all views that have data
OUTER:
for {
select {
case data = <-r.watcher.DataCh:
r.Receive(data.Dependency, data.Data)
default:
break OUTER
}
}
case <-r.minTimer:
log.Println("[INFO] (runner) quiescence minTimer fired")
r.minTimer, r.maxTimer = nil, nil
case <-r.maxTimer:
log.Println("[INFO] (runner) quiescence maxTimer fired")
r.minTimer, r.maxTimer = nil, nil
case err := <-r.watcher.ErrCh:
// Intentionally do not send the error back up to the runner. Eventually,
// once Consul API implements errwrap and multierror, we can check the
// "type" of error and conditionally alert back.
//
// if err.Contains(Something) {
// errCh <- err
// }
log.Println("[ERR] (runner) watcher reported error: ", err)
case <-r.watcher.FinishCh:
log.Println("[INFO] (runner) watcher reported finish")
return
case code := <-exitCh:
r.ExitCh <- code
case <-r.DoneCh:
log.Println("[INFO] (runner) received finish")
return
}
// If we got this far, that means we got new data or one of the timers
// fired, so attempt to re-process the environment.
r.Run()
}
}
// Stop halts the execution of this runner and its subprocesses.
func (r *Runner) Stop() {
log.Println("[INFO] (runner) stopping")
r.watcher.Stop()
close(r.DoneCh)
}
// Receive accepts data from Consul and maps that data to the prefix.
func (r *Runner) Receive(d dep.Dependency, data interface{}) {
r.Lock()
defer r.Unlock()
r.data[d.HashCode()] = data.([]*dep.KeyPair)
}
// Run executes and manages the child process with the correct environment. The
// current enviornment is also copied into the child process environment.
func (r *Runner) Run() {
log.Printf("[INFO] (runner) running")
// TODO: Just call the app to consul again. This should
// probably be updated to actually receive the values that we
// got but MVP.
r.config.Run()
}
// init creates the Runner's underlying data structures and returns an error if
// any problems occur.
func (r *Runner) init() error {
// Print the final config for debugging
result, err := json.MarshalIndent(r.config, "", " ")
if err != nil {
return err
}
log.Printf("[DEBUG] (runner) final config (tokens suppressed):\n\n%s\n\n", result)
r.client = client
// Create the watcher
watcher, err := newWatcher(r.config, client, r.once)
if err != nil {
return fmt.Errorf("runner: %s", err)
}
r.watcher = watcher
r.data = make(map[string][]*dep.KeyPair)
r.outStream = os.Stdout
r.errStream = os.Stderr
r.ErrCh = make(chan error)
r.DoneCh = make(chan struct{})
r.ExitCh = make(chan int, 1)
return nil
}
// newWatcher creates a new watcher.
func newWatcher(config *JsonExport, client *api.Client, once bool) (*watch.Watcher, error) {
log.Println("[INFO] (runner) creating Watcher")
clientSet := dep.NewClientSet()
if err := clientSet.Add(client); err != nil {
return nil, err
}
watcher, err := watch.NewWatcher(&watch.WatcherConfig{
Clients: clientSet,
Once: once,
})
if err != nil {
return nil, err
}
return watcher, err
}