forked from open-falcon/falcon-plus
/
sender.go
73 lines (58 loc) · 1.76 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package sender
import (
"log"
pfc "github.com/niean/goperfcounter"
cmodel "github.com/open-falcon/falcon-plus/common/model"
nlist "github.com/toolkits/container/list"
nproc "github.com/toolkits/proc"
backend "github.com/open-falcon/falcon-plus/common/backend_pool"
"github.com/open-falcon/falcon-plus/modules/gateway/g"
)
// DefaultSendQueueMaxSize is the size limit handed to the limited list that
// backs SenderQueue: 1,024,000 entries (102.4 x 10,000).
const DefaultSendQueueMaxSize = 1024000
// Package-level state shared by the sender. Everything except SenderQueue is
// filled in by initConnPools from the gateway configuration.
var (
	// SenderQueue is the size-limited list that Push2SendQueue pushes
	// received items onto; its limit is DefaultSendQueueMaxSize.
	SenderQueue = nlist.NewSafeListLimited(DefaultSendQueueMaxSize)

	// SenderConnPools holds the JSON-RPC connection pools created by
	// initConnPools; it is nil until that function has run.
	SenderConnPools *backend.SafeRpcConnPools

	// TransferMap maps each configured transfer hostname to its address.
	TransferMap = map[string]string{}

	// TransferHostnames lists the configured transfer hostnames.
	TransferHostnames = []string{}

	// TransferSendCnt holds one counter per transfer hostname.
	// NOTE(review): presumably counts successful sends - confirm against
	// the code that updates it.
	TransferSendCnt = map[string]*nproc.SCounterQps{}

	// TransferSendFailCnt holds one counter per transfer hostname.
	// NOTE(review): presumably counts failed sends - confirm against the
	// code that updates it.
	TransferSendFailCnt = map[string]*nproc.SCounterQps{}
)
// Start brings the sender up: it initializes the connection pools, then
// launches the send tasks and the sender cron (both defined elsewhere in this
// package), and finally logs that start-up completed.
func Start() {
	// The steps run strictly in this order; the pools are set up first.
	bootSteps := []func(){
		initConnPools,
		startSendTasks,
		startSenderCron,
	}
	for _, step := range bootSteps {
		step()
	}

	log.Println("send.Start, ok")
}
// Push2SendQueue feeds received items into SenderQueue.
//
// Each item is first reported to the receive-side trace and filter hooks
// under its primary key, then pushed onto the front of SenderQueue. When the
// queue rejects the push, the "SendDrop" meter is incremented by one and the
// item is not retried.
func Push2SendQueue(items []*cmodel.MetaData) {
	for i := range items {
		entry := items[i]
		key := entry.PK()

		// Receive-side statistics.
		g.RecvDataTrace.Trace(key, entry)
		g.RecvDataFilter.Filter(key, entry.Value, entry)

		if SenderQueue.PushFront(entry) {
			continue
		}

		// Drop statistics: the queue did not accept the item.
		pfc.Meter("SendDrop", 1)
	}
}
// initConnPools fills the transfer-related package globals from the gateway
// configuration and then creates the JSON-RPC connection pools.
//
// For every hostname/address pair in cfg.Transfer.Cluster it appends the
// hostname to TransferHostnames, stores the pair in TransferMap, and creates
// the host's entries in TransferSendCnt and TransferSendFailCnt, each counter
// being named "<hostname>:<address>". The collected addresses, together with
// the configured connection limits and timeouts, are passed to
// backend.CreateSafeJsonrpcConnPools and the result is stored in
// SenderConnPools.
func initConnPools() {
	cfg := g.Config()

	// A single pass over the cluster populates every per-host structure;
	// all of them are keyed by hostname, so visiting order does not matter.
	clusterAddrs := make([]string, 0)
	for host, endpoint := range cfg.Transfer.Cluster {
		TransferHostnames = append(TransferHostnames, host)
		clusterAddrs = append(clusterAddrs, endpoint)
		TransferMap[host] = endpoint

		counterName := host + ":" + endpoint
		TransferSendCnt[host] = nproc.NewSCounterQps(counterName)
		TransferSendFailCnt[host] = nproc.NewSCounterQps(counterName)
	}

	// Pool parameters, converted to int as the pool constructor is called
	// with int arguments.
	maxConns := int(cfg.Transfer.MaxConns)
	maxIdle := int(cfg.Transfer.MaxIdle)
	connTimeout := int(cfg.Transfer.ConnTimeout)
	callTimeout := int(cfg.Transfer.CallTimeout)

	SenderConnPools = backend.CreateSafeJsonrpcConnPools(
		maxConns, maxIdle, connTimeout, callTimeout, clusterAddrs)
}