// dispatcher_caller.go
package geetplaban
import (
	"sync"
	"time"
)
// DispPatcherCaller drives a Dispatcher from a ticker: on every tick it asks
// the dispatcher to look for work unless the collector reports that the
// number of tracked emitted items has reached the configured maximum.
type DispPatcherCaller struct {
	// dis is the dispatcher that is prepared, polled and shut down by runCaller.
	dis Dispatcher
	// col is the collector handed to dis.Prepare and queried for the
	// emitted-tracked count on every tick.
	col Collector
	// ackr is stored at construction; it is not used by the methods in this file.
	ackr Acker
	// nanodelay is the tick interval in nanoseconds (always positive after
	// NewDispatcher has normalised it).
	nanodelay int64
	// ticker paces the calls to dis.LookForWork.
	ticker *time.Ticker
	// id identifies this caller in log output.
	id uint
	// max_unacked is the value of GetConfig().GetMaxUnacked() captured at
	// construction time. runCaller re-reads the configuration instead of
	// using this snapshot.
	max_unacked uint32
	// done is closed by Stop to make runCaller leave its loop. A stopped
	// time.Ticker does not close its channel, so the ticker alone cannot
	// signal termination.
	done chan struct{}
	// stopOnce guarantees done is closed at most once, keeping Stop safe to
	// call repeatedly.
	stopOnce sync.Once
}

// NewDispatcher builds a DispPatcherCaller that polls d every nanodelay
// nanoseconds. The receiver is not used; the method only acts as a constructor.
//
// nanodelay is normalised to a positive interval because time.NewTicker
// panics on a non-positive duration: negative values are replaced by their
// absolute value, the minimum int64 (whose negation overflows) becomes the
// maximum int64, and zero becomes one nanosecond.
func (dcaller *DispPatcherCaller) NewDispatcher(d Dispatcher, coll Collector,
	acker Acker, nanodelay int64, id uint) *DispPatcherCaller {
	const maxDelay = int64(1<<63 - 1)
	switch {
	case nanodelay == -maxDelay-1:
		// -MinInt64 overflows back to MinInt64, so clamp explicitly.
		nanodelay = maxDelay
	case nanodelay < 0:
		nanodelay = -nanodelay
	case nanodelay == 0:
		nanodelay = 1
	}

	return &DispPatcherCaller{
		dis:         d,
		col:         coll,
		ackr:        acker,
		nanodelay:   nanodelay,
		ticker:      time.NewTicker(time.Duration(nanodelay) * time.Nanosecond),
		id:          id,
		max_unacked: GetConfig().GetMaxUnacked(),
		done:        make(chan struct{}),
	}
}

// Stop halts the ticker and signals runCaller to shut the dispatcher down and
// return. It may be called more than once.
func (dcaller *DispPatcherCaller) Stop() {
	dcaller.ticker.Stop()
	dcaller.stopOnce.Do(func() {
		// done is nil when the value was not built by NewDispatcher; closing
		// a nil channel would panic, so skip it in that case.
		if dcaller.done != nil {
			close(dcaller.done)
		}
	})
}

// runCaller prepares the dispatcher and then, on every tick, calls
// LookForWork unless the collector's emitted-tracked count has reached the
// configured maximum. It returns after Stop has been called, calling the
// dispatcher's Shutdown on the way out. Because select picks randomly among
// ready cases, at most one already-buffered tick may still be handled after
// Stop.
func (dcaller *DispPatcherCaller) runCaller() {
	dcaller.dis.Prepare(dcaller.col, nil)
	// NOTE(review): a configured maximum above MaxInt32 wraps to a negative
	// limit in this conversion, which would suppress all work — confirm the
	// configuration range.
	maxUnacked := int32(GetConfig().GetMaxUnacked())
	for {
		select {
		case <-dcaller.done:
			dcaller.dis.Shutdown()
			return
		case <-dcaller.ticker.C:
			// Read the counter once so the logged value is the one compared.
			unacked := dcaller.col.getEmittedTracked()
			if unacked >= maxUnacked {
				LOG.Infof("MAX unacked id:%d unacked:%d", dcaller.id, unacked)
				continue
			}
			dcaller.dis.LookForWork()
		}
	}
}
// Fail reports a failed item. It first passes -1 to the collector's
// updEmittedTracked (presumably releasing one in-flight slot — confirm against
// the Collector implementation) and then forwards id to the dispatcher's Fail.
func (dcaller *DispPatcherCaller) Fail(id string) {
	const released = -1

	collector := dcaller.col
	collector.updEmittedTracked(released)

	dispatcher := dcaller.dis
	dispatcher.Fail(id)
}
// TimedOut reports an item that timed out. It first passes -1 to the
// collector's updEmittedTracked (presumably releasing one in-flight slot —
// confirm against the Collector implementation) and then forwards id to the
// dispatcher's TimedOut.
func (dcaller *DispPatcherCaller) TimedOut(id string) {
	const released = -1

	collector := dcaller.col
	collector.updEmittedTracked(released)

	dispatcher := dcaller.dis
	dispatcher.TimedOut(id)
}
// Ack reports an acknowledged item. It first passes -1 to the collector's
// updEmittedTracked (presumably releasing one in-flight slot — confirm against
// the Collector implementation) and then forwards id to the dispatcher's Ack.
func (dcaller *DispPatcherCaller) Ack(id string) {
	const released = -1

	collector := dcaller.col
	collector.updEmittedTracked(released)

	dispatcher := dcaller.dis
	dispatcher.Ack(id)
}