-
Notifications
You must be signed in to change notification settings - Fork 1
/
plugin.go
104 lines (89 loc) · 3 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package prometheus
import (
"fmt"
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Plugin holds the Prometheus metrics of one pipeline: four pipeline-level
// counters plus per-processor counters and gauges keyed by processor name.
type Plugin struct {
	// Pipeline-level counters, created in NewPlugin.
	sentCounter prometheus.Counter
	receivedCounter prometheus.Counter
	sinkErrorsCounter prometheus.Counter
	sourceErrorsCounter prometheus.Counter
	// NOTE(review): writeOptions is not read or written anywhere in the
	// visible part of this file — confirm it is still needed.
	writeOptions influxdb3.WriteOptions
	// procCounters maps a processor name to its counters. The slice is read
	// positionally (indexes 0, 1 and 2) by the IncrementProcessor* methods.
	procCounters map[string][]prometheus.Counter
	// procExecutionTimeGauges maps a processor name to the gauge written by
	// SetProcessorExecutionTime.
	procExecutionTimeGauges map[string]prometheus.Gauge
	// groupName and pipelineId are copied from Config in NewPlugin; the
	// visible code does not use them afterwards.
	groupName string
	pipelineId int
}
// NewPlugin builds a Plugin from config and creates its four pipeline-level
// counters (sent, received, sink errors, source errors) with promauto.
//
// The per-processor maps start empty; RegisterProcessors fills them. The
// returned error is always nil in the current implementation.
func NewPlugin(config Config) (*Plugin, error) {
	// Counters are created in the same order as the struct fields they feed.
	sent := promauto.NewCounter(prometheus.CounterOpts{
		Name: "sent_messages",
		Help: "The total number of messages send to the sink plugin",
	})
	received := promauto.NewCounter(prometheus.CounterOpts{
		Name: "received_messages",
		Help: "The total number of messages received by source plugin",
	})
	sinkErrors := promauto.NewCounter(prometheus.CounterOpts{
		Name: "sink_errors",
		Help: "The total number of messages that were failed to be sent to sink",
	})
	sourceErrors := promauto.NewCounter(prometheus.CounterOpts{
		Name: "source_errors",
		Help: "The total number of errors from source connector",
	})

	return &Plugin{
		groupName:               config.GroupName,
		pipelineId:              config.PipelineId,
		sentCounter:             sent,
		receivedCounter:         received,
		sinkErrorsCounter:       sinkErrors,
		sourceErrorsCounter:     sourceErrors,
		procCounters:            make(map[string][]prometheus.Counter),
		procExecutionTimeGauges: make(map[string]prometheus.Gauge),
	}, nil
}
// IncrementReceivedCounter adds one to the pipeline's "received_messages"
// counter.
func (p *Plugin) IncrementReceivedCounter() {
	counter := p.receivedCounter
	counter.Inc()
}
// IncrementSentCounter adds one to the pipeline's "sent_messages" counter.
func (p *Plugin) IncrementSentCounter() {
	counter := p.sentCounter
	counter.Inc()
}
// IncrementSinkErrCounter adds one to the pipeline's "sink_errors" counter.
func (p *Plugin) IncrementSinkErrCounter() {
	counter := p.sinkErrorsCounter
	counter.Inc()
}
// IncrementSourceErrCounter adds one to the pipeline's "source_errors"
// counter.
func (p *Plugin) IncrementSourceErrCounter() {
	counter := p.sourceErrorsCounter
	counter.Inc()
}
// RegisterProcessors creates the per-processor metrics for every name in
// processors: a "<name>_execution_time" gauge and three counters named
// "<name>_dropped_messages", "<name>_received_messages" and
// "<name>_sent_messages".
//
// The counters are stored in the slot order that the IncrementProcessor*
// methods index into: 0 = dropped, 1 = received, 2 = sent.
//
// A name this Plugin has already registered is skipped, because promauto
// panics when the same metric is registered twice. This covers both repeated
// calls and duplicate names within one call.
func (p *Plugin) RegisterProcessors(processors []string) {
	for _, proc := range processors {
		if _, registered := p.procCounters[proc]; registered {
			continue
		}

		p.procExecutionTimeGauges[proc] = promauto.NewGauge(prometheus.GaugeOpts{
			Name: fmt.Sprintf("%s_execution_time", proc),
			Help: "Time taken to process message",
		})

		p.procCounters[proc] = []prometheus.Counter{
			// Index 0: read by IncrementProcessorDroppedMessages.
			promauto.NewCounter(prometheus.CounterOpts{
				Name: fmt.Sprintf("%s_dropped_messages", proc),
				Help: "Messages that were dropped, filtered by the processor",
			}),
			// Index 1: read by IncrementProcessorReceivedMessages.
			promauto.NewCounter(prometheus.CounterOpts{
				Name: fmt.Sprintf("%s_received_messages", proc),
				Help: "The total number of messages received by the processor",
			}),
			// Index 2: read by IncrementProcessorSentMessages.
			promauto.NewCounter(prometheus.CounterOpts{
				Name: fmt.Sprintf("%s_sent_messages", proc),
				Help: "The total number of messages send to the next sink/processor plugin",
			}),
		}
	}
}
// SetProcessorExecutionTime stores time, converted to float64, in the
// execution-time gauge registered for proc.
//
// If no gauge was registered for proc the call is a no-op; previously the
// nil gauge caused a panic. The unit of time is whatever the caller passes
// (NOTE(review): not visible in this file — confirm and document it).
func (p *Plugin) SetProcessorExecutionTime(proc string, time int64) {
	gauge, ok := p.procExecutionTimeGauges[proc]
	if !ok {
		return
	}
	gauge.Set(float64(time))
}
// IncrementProcessorDroppedMessages adds one to the counter held at index 0
// of the counters registered for proc (the "<proc>_dropped_messages"
// counter).
//
// If proc has no registered counters the call is a no-op; previously the
// empty slice caused an index-out-of-range panic.
func (p *Plugin) IncrementProcessorDroppedMessages(proc string) {
	if counters := p.procCounters[proc]; len(counters) > 0 {
		counters[0].Inc()
	}
}
// IncrementProcessorReceivedMessages adds one to the counter held at index 1
// of the counters registered for proc.
//
// NOTE(review): index 1 must be the processor's received-messages counter —
// confirm the slot order used by RegisterProcessors.
//
// If proc has no counter at that index the call is a no-op; previously an
// unregistered processor caused an index-out-of-range panic.
func (p *Plugin) IncrementProcessorReceivedMessages(proc string) {
	if counters := p.procCounters[proc]; len(counters) > 1 {
		counters[1].Inc()
	}
}
// IncrementProcessorSentMessages adds one to the counter held at index 2 of
// the counters registered for proc.
//
// NOTE(review): index 2 must be the processor's sent-messages counter —
// confirm the slot order used by RegisterProcessors.
//
// If proc has no counter at that index the call is a no-op; previously an
// unregistered processor caused an index-out-of-range panic.
func (p *Plugin) IncrementProcessorSentMessages(proc string) {
	if counters := p.procCounters[proc]; len(counters) > 2 {
		counters[2].Inc()
	}
}