/
collector.go
103 lines (93 loc) · 2.89 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package internal
import (
"context"
"fmt"
"github.com/ppussar/mongodb_exporter/internal/logger"
"github.com/ppussar/mongodb_exporter/internal/wrapper"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/mgo.v2/bson"
"time"
)
// Collector queries one prometheus metric from mongoDB
type Collector struct {
desc *prometheus.Desc
config Metric
Mongo wrapper.IConnection
varTagValueNames []string
ErrorC chan error
}
var log = logger.GetInstance()
// NewCollector constructor
// initializes every descriptor and returns a pointer to the collector
func NewCollector(m Metric, con wrapper.IConnection, errorC chan error) *Collector {
varTagNames := make([]string, 0, len(m.TagAttributes))
varTagValues := make([]string, 0, len(m.TagAttributes))
for key, value := range m.TagAttributes {
varTagNames = append(varTagNames, key)
varTagValues = append(varTagValues, value)
}
return &Collector{
desc: prometheus.NewDesc(
m.Name,
m.Help,
varTagNames,
m.Tags,
),
config: m,
Mongo: con,
varTagValueNames: varTagValues,
ErrorC: errorC,
}
}
// Describe must be implemented by a prometheus collector
// It essentially writes all descriptors to the prometheus desc channel.
func (col *Collector) Describe(ch chan<- *prometheus.Desc) {
ch <- col.desc
}
// Collect implements required collect function for all prometheus collectors
func (col *Collector) Collect(ch chan<- prometheus.Metric) {
var err error
var cur wrapper.ICursor
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if len(col.config.Aggregate) != 0 {
cur, err = col.Mongo.Aggregate(ctx, col.config.Db, col.config.Collection, col.config.Aggregate)
} else if len(col.config.Find) != 0 {
cur, err = col.Mongo.Find(ctx, col.config.Db, col.config.Collection, col.config.Find)
} else {
log.Error(fmt.Sprintf("Nothing to do, check config of metric: %v", col))
}
if err != nil {
log.Error(fmt.Sprintf("Error during collect: %v", err))
col.ErrorC <- err
return
}
defer cur.Close(context.Background())
for cur.Next(ctx) {
var result bson.M
err := cur.Decode(&result)
if err != nil {
log.Error(fmt.Sprintf("Error during collect: %v", err))
col.ErrorC <- err
return
}
val := result[col.config.MetricsAttribute]
tagValues := col.extractVarTagsValues(result)
ch <- prometheus.MustNewConstMetric(col.desc, prometheus.GaugeValue, val.(float64), tagValues...)
}
if err := cur.Err(); err != nil {
log.Error(fmt.Sprintf("Error during collect: %v", err))
col.ErrorC <- err
}
}
func (col *Collector) extractVarTagsValues(result bson.M) []string {
tagValues := make([]string, len(col.varTagValueNames))
for i, tagName := range col.varTagValueNames {
tagValue := result[tagName]
tagValues[i] = tagValue.(string)
}
return tagValues
}
func (col *Collector) String() string {
return col.desc.String()
}