// Copyright 2019 PayPal Inc.
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lib

import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/paypal/hera/cal"
"github.com/paypal/hera/utility/logger"
)
var bcklgEvtPrefix = [wtypeTotalCount]string{
"BKLG", "BKLG_R", "BKLG_S"}
var bcklgTimeoutEvtPrefix = [wtypeTotalCount]string{
"bklg", "bklg_r", "bklg_s"}
var poolNamePrefix = [wtypeTotalCount]string{
"write", "readonly", "standby"}
// WorkerPool represents a pool of workers of the same kind.
// The implementation uses a C++-ish mutex/condition variable/queue rather than a Golang-ish channel + timer,
// because the policy for handing out workers is LIFO (for better cache usage) while channels are FIFO.
type WorkerPool struct {
//
// a locking condition used to wait/notify when the pool empties/replenishes.
// this lock and the worker queue are meant to be private member fields.
//
poolCond *sync.Cond
activeQ Queue // a queue of active worker clients ready to serve traffic
ShardID int // the shard the workers are connected to
Type HeraWorkerType // the worker type, e.g. write or read
InstID int
// interval for checking on worker lifespan
lifeSpanCheckInterval int
currentSize int // the number of workers in the pool
desiredSize int // the desired number of workers in the pool, usually equal to currentSize, different for a
// brief period when the pool is dynamically resized
moduleName string // basically the application name as it comes from the command line
// the number of workers not in INIT state, maintained atomically
numHealthyWorkers int32
//
// number of requests in the backlog. we could publish the state event while holding
// the lock, but the status update after the publishing call inside the state log is
// not under the lock, so use an atomic to synchronize this counter.
//
backlogCnt int32
//
// caller receives a ticket when checking out a workerclient. caller returns the
// workerclient together with the ticket to ensure the workerclient is returned
// by the same caller, and only once.
//
checkoutTickets map[interface{}]string
//
// adaptive queue manager to decide on long/short timeouts and saturation recovery.
//
aqmanager *adaptiveQueueManager
// the actual list of workers
workers []*WorkerClient
// Throttler for the worker lifecycle
thr Throttler
}
// Init creates the pool by creating the workers and making all the initializations
func (pool *WorkerPool) Init(wType HeraWorkerType, size int, instID int, shardID int, moduleName string) error {
pool.Type = wType
pool.activeQ = NewQueue()
//pool.poolCond = &sync.Cond{L: &sync.Mutex{}}
pool.poolCond = sync.NewCond(&sync.Mutex{})
pool.lifeSpanCheckInterval = GetConfig().lifeSpanCheckInterval
pool.InstID = instID
pool.ShardID = shardID
pool.currentSize = 0
pool.desiredSize = size
pool.moduleName = moduleName
pool.workers = make([]*WorkerClient, size)
pool.thr = NewThrottler(uint32(GetConfig().MaxDbConnectsPerSec), fmt.Sprintf("%d_%d_%d", wType, shardID, instID))
for i := 0; i < size; i++ {
go pool.spawnWorker(i)
pool.currentSize++
}
pool.checkoutTickets = make(map[interface{}]string)
pool.aqmanager = &adaptiveQueueManager{}
err := pool.aqmanager.init(pool)
go pool.checkWorkerLifespan()
return err
}
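//
// A minimal usage sketch of the pool lifecycle (illustrative only, not part of the
// implementation; it assumes the configuration and state log are already initialized,
// and wtypeRW is assumed here as the write worker type constant):
//
//	pool := &WorkerPool{}
//	// e.g. a write pool of 8 workers for instance 0, shard 0
//	if err := pool.Init(wtypeRW, 8, 0, 0, "myapp"); err != nil {
//		// handle the initialization failure
//	}
//	worker, ticket, err := pool.GetWorker(0 /*sqlhash*/)
//	if err == nil {
//		// ... dispatch requests to the worker ...
//		pool.ReturnWorker(worker, ticket)
//	}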
// spawnWorker starts a worker and spawns a routine waiting for the "ready" message
func (pool *WorkerPool) spawnWorker(wid int) error {
worker := NewWorker(wid, pool.Type, pool.InstID, pool.ShardID, pool.moduleName, pool.thr)
worker.setState(wsSchd)
millis := rand.Intn(GetConfig().RandomStartMs)
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, wid, "randomized start ms", millis)
}
time.Sleep(time.Millisecond * time.Duration(millis))
initLimit := pool.desiredSize * GetConfig().InitLimitPct / 100
for {
initCnt := GetStateLog().GetWorkerCountForPool(wsInit, pool.ShardID, pool.Type, pool.InstID)
if initCnt <= initLimit {
break
}
millis := rand.Intn(3000)
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start", wid)
}
time.Sleep(time.Millisecond * time.Duration(millis))
}
er := worker.StartWorker()
if er != nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "failed starting worker: ", er)
}
pool.poolCond.L.Lock()
pool.currentSize--
pool.poolCond.L.Unlock()
return er
}
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "worker started type ", pool.Type, " id", worker.ID, " instid", pool.InstID, " shardid", pool.ShardID)
}
//
// after establishing the uds connection with the worker, it will be added to the active queue
//
// oracle connect errors show up in attach worker
worker.AttachToWorker() // does not return
return nil
}
// RestartWorker is called after a worker exits, to perform the necessary cleanup and start a new worker.
// In the rare situation where the pool needs to be downsized, a new worker is not started.
func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error) {
if worker == nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "WorkerReady nil, size=", pool.activeQ.Len(), "type=", pool.Type)
}
return nil
}
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "RestartWorker(): ", pool.Type, pool.desiredSize, pool.currentSize, worker.pid, worker.ID)
}
pool.poolCond.L.Lock()
//
// release the terminated workerclient (and the fd inside) if we have not done it yet.
//
delete(pool.checkoutTickets, worker)
pool.aqmanager.unregisterDispatchedWorker(worker)
if worker.ID >= pool.desiredSize /*we resize by terminating worker with higher ID*/ {
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Pool type=", pool.Type, ", worker=", worker.pid, "exited, new one not started because pool was resized:", pool.currentSize, "->", pool.desiredSize)
}
pool.currentSize--
if pool.desiredSize == pool.currentSize {
//
// let the state log reset the worker size
//
GetStateLog().PublishStateEvent(StateEvent{eType: WorkerResizeEvt, shardID: pool.ShardID, wType: pool.Type, instID: pool.InstID, newWSize: pool.currentSize})
}
pool.activeQ.Remove(worker)
pool.poolCond.L.Unlock()
return
}
pool.activeQ.Remove(worker)
pool.poolCond.L.Unlock()
go pool.spawnWorker(worker.ID)
return nil
}
// WorkerReady is called after the worker has started and become available. It puts the worker into the internal list
// of workers as well as into the list of available workers.
func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error) {
if worker == nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "WorkerReady nil, size=", pool.activeQ.Len(), "type=", pool.Type)
}
return nil
}
pool.poolCond.L.Lock()
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Pool::WorkerReady", worker.pid, worker.Type, worker.instID)
}
pool.activeQ.Push(worker)
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "poolsize(ready)", pool.activeQ.Len(), " type ", pool.Type, " instance ", pool.InstID)
}
pool.workers[worker.ID] = worker
pool.poolCond.L.Unlock()
//
// notify one waiting agent on the availability of a new worker in the pool
//
pool.poolCond.Signal()
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "poolsize (after signal)", pool.activeQ.Len(), " type ", pool.Type)
}
return nil
}
// GetWorker gets an active worker if available; otherwise the request is backlogged with a timeout.
//
// @param sqlhash to check for soft eviction against a blacklist of slow queries.
// if GetWorker needs to examine the incoming sql, there does not seem to be a more elegant
// way to do this than to pass in the sqlhash as a parameter.
// @param timeoutMs[0] timeout in milliseconds. defaults to the adaptive queue timeout.
func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error) {
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Pool::GetWorker(start) type:", pool.Type, ", instance:", pool.InstID, ", active: ", pool.activeQ.Len(), "healthy:", pool.GetHealthyWorkersCount())
}
defer func() {
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Pool::GetWorker(end) type:", pool.Type, ", instance:", pool.InstID, ", active: ", pool.activeQ.Len(), "healthy:", pool.GetHealthyWorkersCount())
}
}()
pool.poolCond.L.Lock()
var workerclient = pool.getActiveWorker()
for workerclient == nil {
if pool.GetHealthyWorkersCount() == 0 {
msg := fmt.Sprintf("REJECT_DB_DOWN_%s%d", poolNamePrefix[pool.Type], pool.InstID)
e := cal.NewCalEvent(cal.EventTypeWarning, msg, cal.TransOK, "")
e.AddDataInt("sql_hash", int64(uint32(sqlhash)))
e.Completed()
pool.poolCond.L.Unlock()
return nil, "", ErrRejectDbDown
}
timeout, longTo := pool.aqmanager.getBacklogTimeout()
if len(timeoutMs) > 0 {
timeout = timeoutMs[0]
}
if timeout == 0 {
// no bklg events!
pool.poolCond.L.Unlock()
return nil, "", errors.New("no worker available")
}
//
// check if we need to evict sql with hash=sqlhash.
//
if pool.aqmanager.shouldSoftEvict(sqlhash) {
pool.poolCond.L.Unlock()
if logger.GetLogger().V(logger.Warning) {
logger.GetLogger().Log(logger.Warning, "soft sql eviction, sql_hash=", uint32(sqlhash))
}
e := cal.NewCalEvent("SOFT_EVICTION", fmt.Sprint(uint32(sqlhash)), cal.TransOK, "")
e.Completed()
return nil, "", ErrSaturationSoftSQLEviction
}
//
// c++ has a REJECT_DB_DOWN check which is mostly an attempt to prevent backlog
// overflow. but bouncer's connection check should have done that already.
// as a result, we do not implement REJECT_DB_DOWN in golang.
//
// the client connection cannot get an active worker. put it in the backlog
//
blgsize := atomic.LoadInt32(&(pool.backlogCnt))
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "add to backlog. type:", pool.Type, ", instance:", pool.InstID, " timeout:", timeout, ", blgsize:", blgsize)
}
if blgsize == 0 {
pool.aqmanager.lastEmptyTimeMs = (time.Now().UnixNano() / int64(time.Millisecond))
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "setlastempty(enter)", pool.aqmanager.lastEmptyTimeMs)
}
}
atomic.AddInt32(&(pool.backlogCnt), 1)
GetStateLog().PublishStateEvent(StateEvent{eType: ConnStateEvt, shardID: pool.ShardID, wType: pool.Type, instID: pool.InstID, oldCState: Idle, newCState: Backlog})
//
// go refused to add a wait timeout (https://github.com/golang/go/issues/9578).
// the wakeup channel returns the time this thread spent in the backlog doghouse.
//
wakeupchann := make(chan int64)
go func(cond *sync.Cond) {
startTime := time.Now().UnixNano()
//
// if caller times out and goes away, we need to unlock after waking up.
//
cond.Wait()
cond.L.Unlock()
//
// if the backlog times out on this channel, no one will be listening on the other
// end, which would block a write to wakeupchann. to avoid a dangling thread, use a
// non-blocking send. if notify wakes up such a thread, another thread in the backlog
// will need one more notify to wake up even if there is already a free worker in the
// pool. workers are returned consistently, so we are not worried about this little
// delay. cond.Broadcast may resolve the delay in this corner case, but the cost is
// letting all waiting threads race for one free worker, with one winner and the rest
// looping back into wait.
//
select {
case wakeupchann <- ((time.Now().UnixNano() - startTime) / int64(time.Millisecond)):
default:
}
close(wakeupchann)
}(pool.poolCond)
select {
case <-time.After(time.Millisecond * time.Duration(timeout)):
//
// lock to protect accessing clearAllEvictedSqlhash
//
pool.poolCond.L.Lock()
pool.resetIfLastBacklogEntry("timeout")
pool.decBacklogCnt()
pool.poolCond.L.Unlock()
//
// backlog timeout. change connstate to idle, and return an error.
// the caller will close the client connection, which takes connstate
// further from idle to close.
//
GetStateLog().PublishStateEvent(StateEvent{eType: ConnStateEvt, shardID: pool.ShardID, wType: pool.Type, instID: pool.InstID, oldCState: Backlog, newCState: Idle})
//
// log a backlog timeout event.
//
msg := fmt.Sprintf("timeout %d no idle child & req %s%d backlog timed out, close client connection", timeout, poolNamePrefix[pool.Type], pool.InstID)
var ename string
if longTo {
if GetConfig().EnableSharding {
ename = fmt.Sprintf("%s%d_shd%d_timeout", bcklgTimeoutEvtPrefix[pool.Type], pool.InstID, pool.ShardID)
} else {
ename = fmt.Sprintf("%s%d_timeout", bcklgTimeoutEvtPrefix[pool.Type], pool.InstID)
}
} else {
if GetConfig().EnableSharding {
ename = fmt.Sprintf("%s%d_shd%d_eviction", bcklgTimeoutEvtPrefix[pool.Type], pool.InstID, pool.ShardID)
} else {
ename = fmt.Sprintf("%s%d_eviction", bcklgTimeoutEvtPrefix[pool.Type], pool.InstID)
}
}
e := cal.NewCalEvent(cal.EventTypeWarning, ename, cal.TransOK, msg)
e.Completed()
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "backlog timeout. type:", pool.Type, ", instance:", pool.InstID)
}
//
// we are bailing out. but the waiting routine is still sleeping.
//
pool.poolCond.Signal() // try to jostle the waiting routine free
if longTo {
return nil, "", ErrBklgTimeout
}
return nil, "", ErrBklgEviction
case sleepingtime := <-wakeupchann:
pool.poolCond.L.Lock() // relock after wakeup routine unlocks on its exit
//
// log a backlog wakeup event.
//
var etype string
if GetConfig().EnableSharding {
etype = fmt.Sprintf("%s%d_shd%d", bcklgEvtPrefix[pool.Type], pool.InstID, pool.ShardID)
} else {
etype = fmt.Sprintf("%s%d", bcklgEvtPrefix[pool.Type], pool.InstID)
}
if longTo {
etype += "_long"
} else {
etype += "_short"
}
ename := fmt.Sprintf("%d", (sleepingtime / GetConfig().BacklogTimeoutUnit))
e := cal.NewCalEvent(etype, ename, cal.TransOK, strconv.Itoa(int(sleepingtime)))
e.Completed()
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "exiting backlog. type:", pool.Type, ", instance:", pool.InstID)
}
workerclient = pool.getActiveWorker()
//
// we still have the lock. if other connections also woke up but lost the
// race to acquire the lock, the backlog stats still have them counted.
// if the backlog count is 1:
//   if workerclient != nil, we are the only one racing and will be the last one
//   exiting the backlog
//   if workerclient == nil, we did not win the race and are going back to the backlog
//
if workerclient != nil {
pool.resetIfLastBacklogEntry("acquire")
}
//
// decrement even if workerclient is nil, since we re-add to backlogCnt when going back to the top.
//
pool.decBacklogCnt()
//
// a connection was woken up from the backlog; reset the backlog and idle counts.
// it is possible some other thread races ahead and gets the worker
// just returned. if that happens, we get a nil worker and go
// back into the backlog at the top of the for loop.
//
GetStateLog().PublishStateEvent(StateEvent{eType: ConnStateEvt, shardID: pool.ShardID, wType: pool.Type, instID: pool.InstID, oldCState: Backlog, newCState: Idle})
}
}
ticket := fmt.Sprintf("%d", rand.Uint64())
//
// error causes coordinator to disconnect external client
//
if pool.aqmanager.alreadyDispatched(ticket, workerclient) {
msg := fmt.Sprintf("pid=%d;pooltype=%d", workerclient.pid, pool.Type)
e := cal.NewCalEvent(cal.EventTypeWarning, "double_dispatch", cal.TransOK, msg)
e.Completed()
pool.poolCond.L.Unlock()
return nil, "", errors.New("double_dispatch")
}
pool.checkoutTickets[workerclient] = ticket
pool.aqmanager.registerDispatchedWorker(ticket, workerclient)
pool.poolCond.L.Unlock()
GetStateLog().PublishStateEvent(StateEvent{eType: ConnStateEvt, shardID: pool.ShardID, wType: pool.Type, instID: pool.InstID, oldCState: Idle, newCState: Assign})
// another best effort, in case ReturnWorker() lost the race
wchLen := len(workerclient.channel())
// drain the channel if data arrived late
if wchLen > 0 {
workerclient.DrainResponseChannel(0 /*no wait to minimize the latency*/)
}
return workerclient, ticket, nil
}
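//
// The backlog wait above emulates a sync.Cond wait with a timeout, which the standard
// library does not provide (https://github.com/golang/go/issues/9578). A distilled,
// pool-independent sketch of the same pattern (illustrative only; like the code above,
// it relies on sync.Mutex not tracking ownership, so the helper goroutine can unlock
// a mutex locked by the caller):
//
//	// waitWithTimeout must be called with cond.L held; it returns holding cond.L
//	// again, reporting whether it was woken by a signal rather than by the timeout.
//	func waitWithTimeout(cond *sync.Cond, d time.Duration) bool {
//		woken := make(chan struct{})
//		go func() {
//			cond.Wait()     // unlocks cond.L, sleeps, relocks on wakeup
//			cond.L.Unlock() // the caller may have timed out and gone away
//			select {
//			case woken <- struct{}{}:
//			default: // a timed-out caller is not listening; avoid blocking forever
//			}
//		}()
//		select {
//		case <-woken:
//			cond.L.Lock()
//			return true
//		case <-time.After(d):
//			cond.L.Lock()
//			cond.Signal() // jostle the helper goroutine free, as GetWorker does
//			return false
//		}
//	}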
// ReturnWorker puts the worker back into the list of available workers. It is usually called after a coordinator
// used it for requests and no longer needs it.
// If the pool is about to be downsized, or if the worker's lifetime has expired, the worker is
// terminated instead of being put back in the available list.
func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err error) {
now := time.Now().Unix()
//
// has to lock before checking QUCE. otherwise: we check and pass QUCE, someone else locks,
// we block, someone else sets QUCE to prevent the worker from being returned, someone else
// unlocks, we lock, and, having already passed the QUCE check, we return the worker by mistake.
//
pool.poolCond.L.Lock()
if (worker == nil) || (worker.Status == wsQuce) {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "trying to return an invalid worker (bailing), size=", pool.activeQ.Len(), "type=", pool.Type, ", instance:", pool.InstID)
}
pool.poolCond.L.Unlock()
return nil
}
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Pool::ReturnWorker(start)", worker.pid, worker.Type, worker.instID, "healthy:", pool.GetHealthyWorkersCount())
}
if (len(ticket) == 0) || (pool.checkoutTickets[worker] != ticket) {
msg := fmt.Sprintf("pid=%d;pooltype=%d", worker.pid, pool.Type)
e := cal.NewCalEvent(cal.EventTypeWarning, "rtrn_worker_using_wrong_ticket", cal.TransOK, msg)
e.Completed()
pool.poolCond.L.Unlock()
return errors.New("returning a worker using wrong ticket")
}
delete(pool.checkoutTickets, worker)
pool.aqmanager.unregisterDispatchedWorker(worker)
if (worker.channel() != nil) && (len(worker.channel()) > 0) {
e := cal.NewCalEvent(cal.EventTypeWarning, "rtrn_worker_having_unprocessed_msg", cal.TransOK, strconv.Itoa(len(worker.channel())))
e.Completed()
worker.DrainResponseChannel(time.Microsecond * 10)
}
worker.setState(wsAcpt)
if (pool.desiredSize < pool.currentSize) && (worker.ID >= pool.desiredSize) {
go func(w *WorkerClient) {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "Pool resized, terminate worker: pid =", worker.pid, ", pool_type =", worker.Type, ", inst =", worker.instID)
}
w.Terminate()
}(worker)
//pool.currentSize-- // restartworker actually does the size reduction.
pool.poolCond.L.Unlock()
return nil
}
skipRecycle := false
// check for the lifespan
if (worker.exitTime != 0) && (worker.exitTime <= now) {
if pool.GetHealthyWorkersCount() == int32(pool.desiredSize) {
//
// reset exit time to prevent checkWorkerLifespan from terminating this worker again.
//
worker.exitTime = 0
go func(w *WorkerClient) {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "Lifespan exceeded, terminate worker: pid =", worker.pid, ", pool_type =", worker.Type, ", inst =", worker.instID)
}
w.Terminate()
}(worker)
pool.poolCond.L.Unlock()
return nil
} else {
skipRecycle = true
}
}
//
// check for max requests which can change at runtime.
//
maxReqs := GetMaxRequestsPerChild()
if (maxReqs > (worker.maxReqCount + worker.maxReqCount/4)) ||
(maxReqs < (worker.maxReqCount - worker.maxReqCount/4)) {
if maxReqs >= 4 {
worker.maxReqCount = maxReqs - uint32(rand.Intn(int(maxReqs/4)))
}
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "Max requests change pickedup pid =", worker.pid, "cnt", worker.reqCount, "max", worker.maxReqCount)
}
}
if worker.maxReqCount != 0 {
//worker.reqCount++ // count in dorequest for each statement instead of for each session.
if worker.reqCount >= worker.maxReqCount {
if pool.GetHealthyWorkersCount() == int32(pool.desiredSize) {
go func(w *WorkerClient) {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "Max requests exceeded, terminate worker: pid =", worker.pid, ", pool_type =", worker.Type, ", inst =", worker.instID, "cnt", worker.reqCount, "max", worker.maxReqCount)
}
w.Terminate()
}(worker)
pool.poolCond.L.Unlock()
return nil
} else {
skipRecycle = true
}
}
}
if skipRecycle {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:=", pool.desiredSize)
}
calMsg := fmt.Sprintf("Recycle(worker_pid)=%d, module_name=%s,shard_id=%d", worker.pid, worker.moduleName, worker.shardID)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","ReturnWorker", cal.TransOK, calMsg)
evt.Completed()
}
var pstatus = false
if GetConfig().LifoScheduler {
pstatus = pool.activeQ.PushFront(worker)
} else {
pstatus = pool.activeQ.Push(worker)
}
blgsize := atomic.LoadInt32(&(pool.backlogCnt))
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "poolsize (after return)", pool.activeQ.Len(), " type ", pool.Type, ", instance:", pool.InstID, ", pushstatus:", pstatus, ", bklg:", blgsize, worker.pid)
}
pool.poolCond.L.Unlock()
//
// notify one waiting agent on the availability of a new worker in the pool
//
pool.poolCond.Signal()
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Pool::ReturnWorker(end after signal)", pool.activeQ.Len(), " type ", pool.Type, "healthy:", pool.GetHealthyWorkersCount(), worker.pid)
}
return nil
}
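//
// The checkout ticket guards against stale or duplicate returns: the ticket is deleted
// on the first successful return, so a second return of the same worker, or a return
// with an old ticket after the worker has been re-dispatched, is rejected. Illustrative only:
//
//	worker, ticket, _ := pool.GetWorker(0 /*sqlhash*/)
//	pool.ReturnWorker(worker, ticket) // ok: the ticket matches checkoutTickets[worker]
//	pool.ReturnWorker(worker, ticket) // error: the ticket was deleted on the first return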
/**
* caller has lock
*/
func (pool *WorkerPool) getActiveWorker() (worker *WorkerClient) {
var workerclient *WorkerClient
var cnt = pool.activeQ.Len()
for cnt > 0 {
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "poolsize (before get)", pool.activeQ.Len(), " type ", pool.Type, ", instance:", pool.InstID)
}
workerclient = pool.activeQ.Poll().(*WorkerClient)
if workerclient.Status > wsInit {
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Pool::SelectWorker", workerclient.pid, workerclient.Type, pool.InstID)
}
return workerclient
}
cnt--
pool.activeQ.Push(workerclient) // put it at the end of queue
}
return nil
}
// Resize resizes the worker pool when the corresponding dynamic configuration changes.
// When the size is increased, the increase is immediate, spawning the necessary number of new workers.
// When the size is decreased, it removes the workers whose ID is greater than or equal to the new size. If
// the workers to be removed are free, they are terminated immediately; otherwise the termination is delayed
// until the worker is eventually given back via ReturnWorker.
func (pool *WorkerPool) Resize(newSize int) {
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Resizing pool:", pool.Type, pool.currentSize, "->", newSize)
}
pool.poolCond.L.Lock()
defer pool.poolCond.L.Unlock()
if newSize == pool.desiredSize {
return
}
pool.desiredSize = newSize
if pool.desiredSize > pool.currentSize {
// worker increase
//
workers := make([]*WorkerClient, pool.desiredSize)
copy(workers, pool.workers)
pool.workers = workers
// let the state log reset the worker size
//
GetStateLog().PublishStateEvent(StateEvent{eType: WorkerResizeEvt, shardID: pool.ShardID, wType: pool.Type, instID: pool.InstID, newWSize: newSize})
for i := pool.currentSize; i < newSize; i++ {
go pool.spawnWorker(i)
}
pool.currentSize = pool.desiredSize
} else {
// remove the idle/free workers now. busy workers with ID >= pool.desiredSize are terminated in ReturnWorker
remove := func(item interface{}) bool {
worker := item.(*WorkerClient)
if worker.ID >= pool.desiredSize {
// run in a goroutine so it doesn't block
go func(w *WorkerClient) {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "Pool resized, terminate worker: pid =", worker.pid, ", pool_type =", worker.Type, ", inst =", worker.instID)
}
w.Terminate()
}(worker)
return true
}
return false
}
pool.activeQ.ForEachRemove(remove)
}
}
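//
// Resize semantics, illustrative only: downsizing a pool of 10 to 6 marks workers with
// ID 6..9 for removal; the idle ones are terminated immediately by the remove closure
// above, the busy ones later, when ReturnWorker sees worker.ID >= desiredSize.
//
//	pool.Resize(6)  // workers 6..9 terminated now (idle) or on return (busy)
//	pool.Resize(10) // workers 6..9 spawned again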
// Healthy checks if the number of workers connected to the database is at least 20% of the pool size;
// for example, a pool of size 50 counts as healthy when 10 or more workers are connected.
func (pool *WorkerPool) Healthy() bool {
pool.poolCond.L.Lock()
size := pool.currentSize
pool.poolCond.L.Unlock()
numHealthyWorkers := atomic.LoadInt32(&(pool.numHealthyWorkers))
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Healthy check pool type =", pool.Type, ", id =", pool.InstID, ", healthy = ", numHealthyWorkers, ", size =", size)
}
return (numHealthyWorkers * 100) >= (int32(size) * 20)
}
// IncHealthyWorkers is called to increment the number of workers connected to the database
func (pool *WorkerPool) IncHealthyWorkers() {
atomic.AddInt32(&(pool.numHealthyWorkers), 1)
}
// DecHealthyWorkers is called to decrement the number of workers connected to the database
func (pool *WorkerPool) DecHealthyWorkers() {
atomic.AddInt32(&(pool.numHealthyWorkers), -1)
}
// GetHealthyWorkersCount returns the number of workers connected to the database
func (pool *WorkerPool) GetHealthyWorkersCount() int32 {
return atomic.LoadInt32(&(pool.numHealthyWorkers))
}
// RacMaint is called when RAC maintenance is needed. It marks the workers for restart, spreading
// the restarts over an interval in order to avoid a connection storm to the database.
func (pool *WorkerPool) RacMaint(racReq racAct) {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "Rac maint processing, shard =", pool.ShardID, ", inst =", racReq.instID, ", time=", racReq.tm)
}
now := time.Now().Unix()
window := GetConfig().RacRestartWindow
dbUname := ""
cnt := 0
pool.poolCond.L.Lock()
for i := 0; i < pool.currentSize; i++ {
if (pool.workers[i] != nil) && (racReq.instID == 0 || pool.workers[i].racID == racReq.instID) && (pool.workers[i].startTime < int64(racReq.tm)) {
statusTime := now
// if the requested time is in the past, the restart starts from now;
// if it is in the future, restart times are set starting from it
if now < int64(racReq.tm) {
statusTime = int64(racReq.tm)
}
if racReq.delay {
pool.workers[i].exitTime = statusTime + int64(window*i/pool.currentSize)
} else {
pool.workers[i].exitTime = statusTime
}
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize)
}
cnt++
if len(dbUname) == 0 {
dbUname = pool.workers[i].dbUname
}
}
}
pool.poolCond.L.Unlock()
// TODO: C++ worker logs one event for each worker, in the worker, so
// we keep the same. Think about changing it
for i := 0; i < cnt; i++ {
evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "")
evt.Completed()
evt = cal.NewCalEvent("DB_UNAME", dbUname, cal.TransOK, "")
evt.Completed()
}
}
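//
// With racReq.delay set, the exit times are spread evenly across the restart window.
// For example (illustrative numbers), with window = 240s and currentSize = 8, worker i
// gets exitTime = statusTime + 240*i/8, i.e. the workers restart 0s, 30s, 60s, ..., 210s
// after statusTime, so the pool reconnects gradually instead of all at once.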
// checkWorkerLifespan is called periodically to check whether any worker's lifetime has expired, and terminates it
func (pool *WorkerPool) checkWorkerLifespan() {
var skipcnt uint32
var cutofftime uint32
for {
if skipcnt < 90 {
skipcnt = skipcnt + 1
} else {
skipcnt = 0
//
// pick the bigger of the two idle timeouts
//
idleto := uint32(GetTrIdleTimeoutMs())
dummy := uint32(GetIdleTimeoutMs())
if dummy > idleto {
idleto = dummy
}
//
// terminate a worker if it has stayed dispatched for more than 3 times the idle timeout.
//
idleto = 3 * idleto
//
// worker.sqlStartTimeMs is measured since the start of mux.
//
muxnow := uint32((time.Now().UnixNano() - GetStateLog().GetStartTime()) / int64(time.Millisecond))
cutofftime = muxnow - idleto
}
var workers []*WorkerClient
now := time.Now().Unix()
pool.poolCond.L.Lock()
for i := 0; i < pool.currentSize; i++ {
if (pool.workers[i] != nil) && (pool.workers[i].exitTime != 0) && (pool.workers[i].exitTime <= now) {
if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize*GetConfig().MaxDesiredHealthyWorkerPct/100)) { // Should it be a config value
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
}
calMsg := fmt.Sprintf("module_name=%s,shard_id=%d", pool.moduleName, pool.ShardID)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","checkWorkerLifespan", cal.TransOK, calMsg)
evt.Completed()
break
}
if pool.activeQ.Remove(pool.workers[i]) {
workers = append(workers, pool.workers[i])
//
// reset exit time to prevent ReturnWorker from terminating this worker again.
//
pool.workers[i].exitTime = 0
if len(workers) > pool.desiredSize*10/100 { // Should it be a config value
break // recycle at most ~10% of the workers at a time
}
} else {
if GetConfig().EnableDanglingWorkerRecovery {
//
// if disabled, ignore (the worker is in use; it will be checked when freed).
// otherwise, check every 15 min whether worker.sqlStartTimeMs is older
// than 3 x the idle timeout, to catch a dangling worker not returned by the coordinator
//
if skipcnt == 0 {
stime := atomic.LoadUint32(&(pool.workers[i].sqlStartTimeMs))
//
// could be worker is dispatched but coordinator has not set stime yet.
//
if stime != 0 {
if stime < cutofftime {
workers = append(workers, pool.workers[i])
pool.workers[i].exitTime = 0
evt := cal.NewCalEvent(cal.EventTypeWarning, "terminate_dispatched_worker", cal.TransOK, fmt.Sprintf("%d", pool.workers[i].pid))
evt.Completed()
}
}
}
}
}
}
}
pool.poolCond.L.Unlock()
for _, w := range workers {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID ,"HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
}
w.Terminate()
}
time.Sleep(time.Duration(pool.lifeSpanCheckInterval) * time.Second)
}
}
/**
* check to see if backlog will become empty
* @param loc who is calling us.
*/
func (pool *WorkerPool) resetIfLastBacklogEntry(loc string) {
blgsize := atomic.LoadInt32(&(pool.backlogCnt))
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "resetIfLastBacklogEntry blgsize", blgsize, loc)
}
if blgsize == 1 {
now := time.Now().UnixNano() / int64(time.Millisecond)
duration := now - pool.aqmanager.lastEmptyTimeMs
var ename string
if GetConfig().EnableSharding {
ename = fmt.Sprintf("aq%s%d_shd%d", bcklgTimeoutEvtPrefix[pool.Type], pool.InstID, pool.ShardID)
} else {
ename = fmt.Sprintf("aq%s%d", bcklgTimeoutEvtPrefix[pool.Type], pool.InstID)
}
evt := cal.NewCalEvent("QUEUE", ename, cal.TransOK, fmt.Sprintf("%d", duration))
evt.AddDataStr("stime", fmt.Sprintf("%d&etime=%d %s", pool.aqmanager.lastEmptyTimeMs, now, loc))
evt.Completed()
pool.aqmanager.lastEmptyTimeMs = now
pool.aqmanager.clearAllEvictedSqlhash()
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "setlastempty(exit)", loc, pool.aqmanager.lastEmptyTimeMs)
}
}
}
func (pool *WorkerPool) decBacklogCnt() {
if atomic.LoadInt32(&(pool.backlogCnt)) > 0 {
atomic.AddInt32(&(pool.backlogCnt), -1)
} else {
logger.GetLogger().Log(logger.Warning, "invalid backlogCnt (acquire)")
e := cal.NewCalEvent(cal.EventTypeWarning, "negative bcklgCnt", cal.TransOK, "")
e.Completed()
atomic.StoreInt32(&(pool.backlogCnt), 0)
}
}