/
stats.go
67 lines (59 loc) · 2.45 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Copyright 2022 V Kontakte LLC
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
package main
import (
"fmt"
"strconv"
"github.com/vkcom/statshouse/internal/metajournal"
"github.com/vkcom/statshouse/internal/agent"
"github.com/vkcom/statshouse/internal/receiver"
)
// statsHandler bundles the components whose counters are exported by
// handleStats as string key/value pairs.
type statsHandler struct {
// receiversUDP are the UDP receivers; their packet, byte and batch counters
// are summed (and, when there is more than one receiver, also reported per index).
receiversUDP []*receiver.UDP
// receiverRPC supplies the OK/error RPC call counters.
receiverRPC *receiver.RPCReceiver
// sh2 is the agent whose Shards are walked for queue sizes and alive state.
sh2 *agent.Agent
// metricsStorage supplies the value reported as the journal version.
metricsStorage *metajournal.MetricsStorage
}
// handleStats writes the handler's current counters into stats, every value
// rendered as a string.
//
// Written keys:
//   - statshouse_udp_recv_{packets,bytes,batches_ok,batches_err}: sums over all
//     UDP receivers;
//   - statshouse_udp_recv_<i>_{packets,bytes,batches_ok,batches_err}: the same
//     counters for receiver i, only when more than one UDP receiver exists;
//   - statshouse_rpc_recv_calls_{ok,err}: RPC receiver call counters;
//   - statshouse_journal_version: version reported by the metrics storage;
//   - statshouse_queue_size_{disk,memory}_<i> and statshouse_shard_alive_<i>:
//     per-shard values taken from the agent.
//
// stats must be a non-nil map; existing entries under the same keys are
// overwritten.
func (h statsHandler) handleStats(stats map[string]string) {
	// Per-receiver keys would only duplicate the totals for a single receiver,
	// so they are emitted only when there are several.
	perReceiver := len(h.receiversUDP) > 1

	var (
		totalPackets    uint64
		totalBytes      uint64
		totalBatchesOK  uint64
		totalBatchesErr uint64
	)
	for idx, udp := range h.receiversUDP {
		nPackets := udp.StatPacketsTotal()
		nBytes := udp.StatBytesTotal()
		nBatchesOK := udp.StatBatchesTotalOK()
		nBatchesErr := udp.StatBatchesTotalErr()

		totalPackets += nPackets
		totalBytes += nBytes
		totalBatchesOK += nBatchesOK
		totalBatchesErr += nBatchesErr

		if !perReceiver {
			continue
		}
		stats[fmt.Sprintf("statshouse_udp_recv_%d_packets", idx)] = strconv.FormatUint(nPackets, 10)
		stats[fmt.Sprintf("statshouse_udp_recv_%d_bytes", idx)] = strconv.FormatUint(nBytes, 10)
		stats[fmt.Sprintf("statshouse_udp_recv_%d_batches_ok", idx)] = strconv.FormatUint(nBatchesOK, 10)
		stats[fmt.Sprintf("statshouse_udp_recv_%d_batches_err", idx)] = strconv.FormatUint(nBatchesErr, 10)
	}

	// Aggregated UDP counters are always reported, even with zero receivers.
	stats["statshouse_udp_recv_packets"] = strconv.FormatUint(totalPackets, 10)
	stats["statshouse_udp_recv_bytes"] = strconv.FormatUint(totalBytes, 10)
	stats["statshouse_udp_recv_batches_ok"] = strconv.FormatUint(totalBatchesOK, 10)
	stats["statshouse_udp_recv_batches_err"] = strconv.FormatUint(totalBatchesErr, 10)

	rpcOK := h.receiverRPC.StatCallsTotalOK.Load()
	rpcErr := h.receiverRPC.StatCallsTotalErr.Load()
	stats["statshouse_rpc_recv_calls_ok"] = strconv.FormatUint(rpcOK, 10)
	stats["statshouse_rpc_recv_calls_err"] = strconv.FormatUint(rpcErr, 10)

	journalVersion := h.metricsStorage.Version()
	stats["statshouse_journal_version"] = strconv.FormatInt(journalVersion, 10)

	for idx, shard := range h.sh2.Shards {
		diskKey := fmt.Sprintf("statshouse_queue_size_disk_%d", idx)
		stats[diskKey] = fmt.Sprintf("%d", shard.HistoricBucketsDataSizeDisk())

		memoryKey := fmt.Sprintf("statshouse_queue_size_memory_%d", idx)
		stats[memoryKey] = fmt.Sprintf("%d", shard.HistoricBucketsDataSizeMemory())

		aliveKey := fmt.Sprintf("statshouse_shard_alive_%d", idx)
		stats[aliveKey] = fmt.Sprintf("%v", shard.IsAlive())
	}
}