-
Notifications
You must be signed in to change notification settings - Fork 82
/
workerbroker.go
411 lines (382 loc) · 13.6 KB
/
workerbroker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
// Copyright 2019 PayPal Inc.
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lib
import (
"errors"
"os"
"os/signal"
"sync"
"syscall"
"github.com/paypal/hera/utility"
"github.com/paypal/hera/utility/logger"
)
// HeraWorkerType defines the possible worker type
type HeraWorkerType int

// constants for HeraWorkerType
const (
	wtypeRW     HeraWorkerType = iota // read-write workers
	wtypeRO                           // read-only workers @TODO
	wtypeStdBy                        // standby workers; sized only when TAF is enabled (see init)
	wtypeTotalCount                   // sentinel: count of worker types, not a real type
)
// WorkerPoolCfg is a configuration structure to keep setups for each type of worker pool
type WorkerPoolCfg struct {
	maxWorkerCnt int // each instance of a worker type has the same max worker count.
	instCnt      int // number of instances (e.g. standbys) in a type of worker pool.
}
// WorkerBroker is managing the workers, starting the worker pools, and restarting workers when needed
type WorkerBroker struct {
	//
	// array of maps with each map representing one shard of deployment.
	// inside each map,
	//   key   = HeraWorkerType,
	//   value = array of WorkerPools, with each pool corresponds to one instance
	//           of that HeraWorkerType. wtypeStdBy has multiples instances known
	//           as multiple standbys. RW will have two instances for primary/failover.
	// workerpools [shard](map[HeraWorkerType][inst]*WorkerPool)
	//
	workerpools []map[HeraWorkerType][]*WorkerPool
	// per-shard pool sizing, parallel to workerpools; filled once by init().
	poolCfgs []map[HeraWorkerType]*WorkerPoolCfg
	//
	// a pid->workerclient map to maintain active workers. when an worker exits either
	// through recycle or unexpectedly, we receive a sigchld event and will trace down
	// and restart the stopped workers.
	//
	pidworkermap map[int32]*WorkerClient
	// guards pidworkermap (written by AddPidToWorkermap and the SIGCHLD reaper goroutine)
	lock sync.Mutex
	//
	// loaded from cfg once and used later.
	//
	maxShardSize int
	// used to signal when the signal handler method finishes
	stopped chan struct{}
}
var sBrokerInstance *WorkerBroker
var once sync.Once

// GetWorkerBrokerInstance returns the singleton broker instance where different request handler goroutines can use to get a free worker
func GetWorkerBrokerInstance() *WorkerBroker {
	// One-shot initialization: on failure the singleton stays nil and the
	// caller (main()) is expected to bail out — there is no retry.
	once.Do(func() {
		broker := &WorkerBroker{}
		if initErr := broker.init(); initErr == nil {
			sBrokerInstance = broker
		}
	})
	return sBrokerInstance
}
/**
 * private method to set up different worker pools
 *
 * @TODO pull types and sizes from config
 */
func (broker *WorkerBroker) init() error {
	broker.stopped = make(chan struct{})
	// sharding disabled (or zero shards configured) collapses to a single shard
	broker.maxShardSize = GetConfig().NumOfShards
	if (broker.maxShardSize == 0) || !(GetConfig().EnableSharding) {
		broker.maxShardSize = 1
	}
	//
	// MAX_NUM_STANDBY = 10
	//
	maxStndbySize := GetConfig().NumStdbyDbs
	if maxStndbySize > 10 {
		maxStndbySize = 10
	}
	// blocks until the opscfg channel publishes the initial worker count
	MaxWorkerSize := <-GetConfig().NumWorkersCh()
	if logger.GetLogger().V(logger.Info) {
		logger.GetLogger().Log(logger.Info, "num_standby_dbs", maxStndbySize, "max_worker", MaxWorkerSize)
	}
	//
	// initialize workerpools and poolCfgs for all shards.
	//
	broker.workerpools = make([](map[HeraWorkerType][]*WorkerPool), broker.maxShardSize)
	broker.poolCfgs = make([](map[HeraWorkerType]*WorkerPoolCfg), broker.maxShardSize)
	var workercnt int
	for s := 0; s < broker.maxShardSize; s++ {
		//
		// setup broker configuration, inst and worker size can be loaded from cdb
		//
		broker.poolCfgs[s] = make(map[HeraWorkerType]*WorkerPoolCfg, wtypeTotalCount)
		//
		// read-only pool gets an instance only when read workers are configured
		broker.poolCfgs[s][wtypeRO] = new(WorkerPoolCfg)
		broker.poolCfgs[s][wtypeRO].maxWorkerCnt = GetNumRWorkers(s)
		if broker.poolCfgs[s][wtypeRO].maxWorkerCnt > 0 {
			broker.poolCfgs[s][wtypeRO].instCnt = 1
		}
		broker.poolCfgs[s][wtypeRW] = new(WorkerPoolCfg)
		broker.poolCfgs[s][wtypeRW].maxWorkerCnt = GetNumWWorkers(s)
		broker.poolCfgs[s][wtypeRW].instCnt = 1
		// standby pool is only populated when TAF is enabled; it mirrors the
		// write-worker count
		broker.poolCfgs[s][wtypeStdBy] = new(WorkerPoolCfg)
		if GetConfig().EnableTAF {
			broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = GetNumWWorkers(s)
			broker.poolCfgs[s][wtypeStdBy].instCnt = 1
		} else {
			broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = 0
			broker.poolCfgs[s][wtypeStdBy].instCnt = 0
		}
		//
		// populate worker pools with attached workerclients base on poolcfg template
		//
		broker.workerpools[s] = make(map[HeraWorkerType][]*WorkerPool, wtypeTotalCount)
		for t := 0; t < int(wtypeTotalCount); t++ {
			poolcfg := broker.poolCfgs[s][HeraWorkerType(t)]
			if logger.GetLogger().V(logger.Verbose) {
				logger.GetLogger().Log(logger.Verbose, "init pool ", poolcfg)
			}
			workercnt += (poolcfg.instCnt * poolcfg.maxWorkerCnt)
			broker.workerpools[s][HeraWorkerType(t)] = make([]*WorkerPool, poolcfg.instCnt)
			for i := 0; i < poolcfg.instCnt; i++ {
				broker.workerpools[s][HeraWorkerType(t)][i] = &WorkerPool{}
			}
		}
	}
	if logger.GetLogger().V(logger.Debug) {
		logger.GetLogger().Log(logger.Debug, "workercnt ", workercnt)
	}
	//
	// start worker monitor thread that will reap and restart any defuncted workers
	//
	// NOTE: the monitor goroutine itself is started later by RestartWorkerPool ->
	// startWorkerMonitor; here we only pre-size the pid map it uses.
	broker.pidworkermap = make(map[int32]*WorkerClient, workercnt)
	return nil
}
// RestartWorkerPool (re)starts all the worker pools.
// workerpool.init calls statelog.init, which in turn calls back GetWorkerBrokerInstance;
// this causes a deadlock during workerbroker initialization since golang lock is not
// reentrant, so workerpool.Init was taken out of broker.init and is called here instead.
func (broker *WorkerBroker) RestartWorkerPool(_moduleName string) error {
	for shard := 0; shard < broker.maxShardSize; shard++ {
		for wType := HeraWorkerType(0); wType < wtypeTotalCount; wType++ {
			cfg := broker.poolCfgs[shard][wType]
			for inst := 0; inst < cfg.instCnt; inst++ {
				if err := broker.workerpools[shard][wType][inst].Init(wType, cfg.maxWorkerCnt, inst, shard, _moduleName); err != nil {
					if logger.GetLogger().V(logger.Alert) {
						logger.GetLogger().Log(logger.Alert, "failed to start workerpool", err)
					}
					return err
				}
			}
		}
	}
	// all pools are up; start the reaper/monitor goroutine
	return broker.startWorkerMonitor()
}
// GetWorkerPoolCfgs returns the per-shard worker pool configuration maps.
func (broker *WorkerBroker) GetWorkerPoolCfgs() []map[HeraWorkerType]*WorkerPoolCfg {
	return broker.poolCfgs
}
// GetWorkerPool get the worker pool object for the type and id
// ids holds optional parameters.
//   ids[0] == instance id; ids[1] == shard id.
// if a particular id is not set, it defaults to 0.
// Returns an error (never panics) when the requested pool does not exist.
// TODO: interchange sid <--> instId since instId is not yet used
func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) {
	//
	// default sid and instId to 0 if user doesnot bother to send one
	//
	var instID int
	var sid int
	if len(ids) > 0 {
		instID = ids[0]
	}
	if len(ids) > 1 {
		sid = ids[1]
	}
	//
	// full bounds checks: the previous version only nil/len-checked the first
	// entry of each level and then indexed with the caller-supplied sid/instID,
	// which panicked on an out-of-range (or negative) id.
	//
	if sid < 0 || sid >= len(broker.workerpools) {
		return nil, errors.New("uninitialized worker pool")
	}
	// a nil or missing map entry yields a nil slice, caught by the len check below
	pools := broker.workerpools[sid][wType]
	if instID < 0 || instID >= len(pools) || pools[instID] == nil {
		return nil, errors.New("uninitialized worker pool")
	}
	return pools[instID], nil
}
// AddPidToWorkermap registers a started worker under its OS pid in the
// pid -> worker map, so the SIGCHLD reaper can later find and restart it.
func (broker *WorkerBroker) AddPidToWorkermap(worker *WorkerClient, pid int) {
	broker.lock.Lock()
	defer broker.lock.Unlock()
	broker.pidworkermap[int32(pid)] = worker
	if log := logger.GetLogger(); log.V(logger.Verbose) {
		log.Log(logger.Verbose, "Added", pid, ", pwmap:", broker.pidworkermap)
	}
}
// startWorkerMonitor installs a goroutine handling SIGCHLD and SIGTERM:
// on SIGCHLD it reaps defunct worker pids and restarts the matching workers
// through their pool; on SIGTERM it terminates and reaps every worker, then
// closes broker.stopped and exits. It also reacts to dynamic worker-count
// changes published on the opscfg channel.
func (broker *WorkerBroker) startWorkerMonitor() (err error) {
	//
	// set up sig channel for worker exiting event
	//
	schannel := make(chan os.Signal, 1)
	signal.Notify(schannel, syscall.SIGCHLD, syscall.SIGTERM)
	go func(sigchannel chan os.Signal) {
		//
		// forever loop to react on worker exit event or opscfg worker change
		//
		cfgWorkerChange := GetConfig().NumWorkersCh()
		for {
			select {
			case <-cfgWorkerChange:
				broker.changeMaxWorkers()
			//
			// Block until a signal is received.
			//
			case sig := <-sigchannel: // "sig", not "signal": the latter shadowed the os/signal package
				switch sig {
				case syscall.SIGCHLD:
					if logger.GetLogger().V(logger.Verbose) {
						logger.GetLogger().Log(logger.Verbose, "worker exit signal:", sig)
					}
					//
					// no one have called waitpid on recycled or self-retired workers.
					// we can get all the pids in this call. double the size in case we
					// get none-hera defunct processes. +1 in case racing casue mapsize=0.
					//
					var arraySize = 2*len(broker.pidworkermap) + 1
					var defunctPids = make([]int32, arraySize)
					if logger.GetLogger().V(logger.Verbose) {
						logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap)
					}
					// arraySize is always >= 1 here; kept as a defensive guard
					if arraySize > 0 {
						utility.ReapDefunctPids(defunctPids)
					}
					if logger.GetLogger().V(logger.Info) {
						logger.GetLogger().Log(logger.Info, "exited worker", defunctPids)
					}
					broker.lock.Lock()
					for i := 0; i < arraySize; i++ {
						//
						// last valid entry in stoppedpids is followed by one or more zeros.
						//
						if defunctPids[i] == 0 {
							break
						}
						var workerclient = broker.pidworkermap[defunctPids[i]]
						if workerclient != nil {
							delete(broker.pidworkermap, defunctPids[i])
							pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
							if err != nil {
								if logger.GetLogger().V(logger.Alert) {
									logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
								}
							} else {
								//
								// a worker could be terminated while serving a request.
								// in these cases, doRead() in workerclient will get an
								// EOF and exit. doSession() in coordinator will get the
								// worker outCh closed event and exit, at which point
								// coordinator itself calls returnworker to set connstate
								// from assign to idle.
								// no need to publish the following event again.
								//
								//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
								//	GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
								//}
								if logger.GetLogger().V(logger.Debug) {
									logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
								}
								workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
								pool.RestartWorker(workerclient)
							}
						} else {
							if logger.GetLogger().V(logger.Alert) {
								logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found")
							}
						}
					}
					broker.lock.Unlock()
				case syscall.SIGTERM:
					if logger.GetLogger().V(logger.Debug) {
						logger.GetLogger().Log(logger.Debug, "Got SIGTERM")
					}
					// terminate every tracked worker in parallel and wait for
					// each process to be reaped before signaling shutdown
					var wg sync.WaitGroup
					wg.Add(len(broker.pidworkermap))
					for pid, worker := range broker.pidworkermap {
						go func(w *WorkerClient) {
							w.Terminate()
						}(worker)
						go func(p int) {
							proc, err := os.FindProcess(p)
							if err == nil {
								proc.Wait()
								if logger.GetLogger().V(logger.Debug) {
									logger.GetLogger().Log(logger.Debug, "pid =", p, " worker process reaped")
								}
							}
							wg.Done()
						}(int(pid))
					}
					if logger.GetLogger().V(logger.Debug) {
						logger.GetLogger().Log(logger.Debug, "Waiting for workers to exit")
					}
					wg.Wait()
					if logger.GetLogger().V(logger.Debug) {
						logger.GetLogger().Log(logger.Debug, "Mux done")
					}
					// signal the main loop to exit
					close(broker.stopped)
					return
				} // switch sig
			} // select
		} // for
	}(schannel)
	return nil
}
/*
resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
the number of workers changed
*/
func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) {
	// NOTE(review): the cfg entry is always written on shard 0 regardless of
	// shardID — presumably opscfg applies one count to every shard; confirm.
	broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers
	pool, err := broker.GetWorkerPool(wType, 0, shardID)
	if err != nil {
		if logger.GetLogger().V(logger.Alert) {
			// fixed log text: was "Can't pool of type" (missing the verb)
			logger.GetLogger().Log(logger.Alert, "Can't get pool of type", wType, ", shard", shardID, ",error:", err)
		}
	} else {
		pool.Resize(maxWorkers)
	}
}
/*
changeMaxWorkers reacts to a dynamic configuration change by calling resizePool()
for every affected pool on every shard.
*/
func (broker *WorkerBroker) changeMaxWorkers() {
	writeCnt := GetNumWWorkers(0)
	readCnt := GetNumRWorkers(0)
	shards := GetConfig().NumOfShards
	for shard := 0; shard < shards; shard++ {
		broker.resizePool(wtypeRW, writeCnt, shard)
		if readCnt != 0 {
			broker.resizePool(wtypeRO, readCnt, shard)
		}
		// if TAF enabled, handle stdby as well (sized like the write pool)
		if GetConfig().EnableTAF {
			broker.resizePool(wtypeStdBy, writeCnt, shard)
		}
		if GetConfig().EnableWhitelistTest {
			// only resize shard 0
			break
		}
	}
}
// Stopped exposes the channel that the signal-handler goroutine closes once
// shutdown is complete; the main mux routine waits on it before exiting.
func (broker *WorkerBroker) Stopped() <-chan struct{} {
	return broker.stopped
}