-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
collect.go
180 lines (149 loc) · 6.67 KB
/
collect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package exporter
import (
"context"
"errors"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)
// Collect send collected metrics values to the prometheus chan
// Collect implements prometheus.Collector: it scrapes RethinkDB system stats,
// pushes the resulting samples onto ch, and finishes with two meta-metrics
// describing the scrape itself (error count and latency in seconds).
func (e *RethinkdbExporter) Collect(ch chan<- prometheus.Metric) {
	began := time.Now()
	scrapeCtx := context.TODO() // TODO: add scrape timeout

	errCount := e.collectRethinkStats(scrapeCtx, ch)

	took := time.Since(began)
	ch <- prometheus.MustNewConstMetric(e.metrics.scrapeErrors, prometheus.GaugeValue, float64(errCount))
	ch <- prometheus.MustNewConstMetric(e.metrics.scrapeLatency, prometheus.GaugeValue, took.Seconds())
	log.Debug().Dur("duration", took).Msg("collect finished")
}
// collectRethinkStats queries the rethinkdb system stats table and emits one
// set of metrics per stat row onto ch. It returns the number of errors
// encountered; individual row failures are logged and counted but do not
// abort the scrape, while query/cursor failures end it early.
func (e *RethinkdbExporter) collectRethinkStats(ctx context.Context, ch chan<- prometheus.Metric) int {
	errcount := 0

	cur, err := r.DB(r.SystemDatabase).Table(r.StatsSystemTable).Run(e.rconn, r.RunOpts{Context: ctx})
	if err != nil {
		log.Error().Err(err).Msg("failed to query system stats table")
		errcount++
		return errcount
	}
	defer func() {
		if err := cur.Close(); err != nil {
			log.Warn().Err(err).Msg("error while closing cursor")
		}
	}()

	if cur.Err() != nil {
		log.Error().Err(cur.Err()).Msg("query error from cursor")
		errcount++
		return errcount
	}

	// Per-table row-count lookups run concurrently; wg collects their errors.
	wg := &errgroup.Group{}

	var stat stat
	for cur.Next(&stat) {
		if err := e.processStat(ctx, stat, wg, ch); err != nil {
			log.Warn().Err(err).Msg("error while processing stat")
			errcount++
		}
	}
	// BUG FIX: cur.Next returns false both at end-of-data and on error.
	// The original only checked cur.Err() while Next was still returning
	// true, so an error that terminated the iteration was silently dropped.
	if cur.Err() != nil {
		log.Error().Err(cur.Err()).Msg("query error from cursor")
		errcount++
	}

	if err := wg.Wait(); err != nil {
		log.Warn().Err(err).Msg("error while processing stat")
		errcount++
	}

	return errcount
}
// stat mirrors one row of rethinkdb's system stats table. The compound ID
// starts with the stat's scope ("cluster", "server", "table" or
// "table_server" — see processStat), optionally followed by object UUIDs.
type stat struct {
	ID            []string      `rethinkdb:"id"`
	Server        string        `rethinkdb:"server"`
	Database      string        `rethinkdb:"db"`
	Table         string        `rethinkdb:"table"`
	QueryEngine   queryEngine   `rethinkdb:"query_engine"`
	StorageEngine storageEngine `rethinkdb:"storage_engine"`
}
// queryEngine holds the query-engine counters of a stats row: connection
// count plus per-second query and document read/write rates.
type queryEngine struct {
	ClientConnections float64 `rethinkdb:"client_connections"`
	QPS               float64 `rethinkdb:"queries_per_sec"`
	ReadDocsPerSec    float64 `rethinkdb:"read_docs_per_sec"`
	WrittenDocsPerSec float64 `rethinkdb:"written_docs_per_sec"`
}
// storageEngine holds the storage-engine counters of a stats row: cache
// usage, disk I/O rates, and on-disk data size (all in bytes / bytes per
// second). Only present on table_server-scoped rows — see processTableServerStat.
type storageEngine struct {
	Cache struct {
		InUseBytes float64 `rethinkdb:"in_use_bytes"`
	} `rethinkdb:"cache"`
	Disk struct {
		ReadBytesPerSec    float64 `rethinkdb:"read_bytes_per_sec"`
		WrittenBytesPerSec float64 `rethinkdb:"written_bytes_per_sec"`
		SpaceUsage         struct {
			DataBytes float64 `rethinkdb:"data_bytes"`
		} `rethinkdb:"space_usage"`
	} `rethinkdb:"disk"`
}
// info captures the doc_count_estimates field of a table's Info() document;
// the per-shard estimates are summed to produce the rows-count metric.
type info struct {
	DocCountEstimates []float64 `rethinkdb:"doc_count_estimates"`
}
// processStat dispatches one stats row to the handler matching its scope,
// which is encoded as the first element of the row's compound id. A row with
// an empty or unrecognized id yields an error instead of metrics.
func (e *RethinkdbExporter) processStat(ctx context.Context, stat stat, wg *errgroup.Group, ch chan<- prometheus.Metric) error {
	if len(stat.ID) == 0 {
		return errors.New("unexpected empty stat id")
	}

	scope := stat.ID[0]
	switch scope {
	case "cluster":
		e.processClusterStat(stat, ch)
	case "server":
		e.processServerStat(stat, ch)
	case "table":
		e.processTableStat(ctx, stat, wg, ch)
	case "table_server":
		e.processTableServerStat(stat, ch)
	default:
		return fmt.Errorf("unexpected stat id: '%v'", scope)
	}

	return nil
}
// processClusterStat emits the cluster-wide gauges: total client connections
// and documents read/written per second across the whole cluster.
func (e *RethinkdbExporter) processClusterStat(stat stat, ch chan<- prometheus.Metric) {
	qe := stat.QueryEngine
	ch <- prometheus.MustNewConstMetric(e.metrics.clusterClientConnections, prometheus.GaugeValue, qe.ClientConnections)
	ch <- prometheus.MustNewConstMetric(e.metrics.clusterDocsPerSecond, prometheus.GaugeValue, qe.ReadDocsPerSec, readOperation)
	ch <- prometheus.MustNewConstMetric(e.metrics.clusterDocsPerSecond, prometheus.GaugeValue, qe.WrittenDocsPerSec, writtenOperation)
}
// processServerStat emits the per-server gauges, labeled by server name:
// client connections, documents read/written per second, and queries per
// second.
func (e *RethinkdbExporter) processServerStat(stat stat, ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(e.metrics.serverClientConnections, prometheus.GaugeValue, stat.QueryEngine.ClientConnections, stat.Server)
	ch <- prometheus.MustNewConstMetric(e.metrics.serverDocsPerSecond, prometheus.GaugeValue, stat.QueryEngine.ReadDocsPerSec, stat.Server, readOperation)
	ch <- prometheus.MustNewConstMetric(e.metrics.serverDocsPerSecond, prometheus.GaugeValue, stat.QueryEngine.WrittenDocsPerSec, stat.Server, writtenOperation)
	// BUG FIX: the queries-per-second gauge was previously fed
	// ReadDocsPerSec (a copy-paste from the line above); report the
	// queries_per_sec value this metric actually describes.
	ch <- prometheus.MustNewConstMetric(e.metrics.serverQueriesPerSecond, prometheus.GaugeValue, stat.QueryEngine.QPS, stat.Server)
}
// processTableStat emits per-table read/write throughput gauges and, when
// the optional tableRowsCount metric is enabled, schedules an asynchronous
// table Info() query on wg to report the estimated row count.
func (e *RethinkdbExporter) processTableStat(ctx context.Context, stat stat, wg *errgroup.Group, ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(e.metrics.tableDocsPerSecond, prometheus.GaugeValue, stat.QueryEngine.ReadDocsPerSec, stat.Database, stat.Table, readOperation)
	ch <- prometheus.MustNewConstMetric(e.metrics.tableDocsPerSecond, prometheus.GaugeValue, stat.QueryEngine.WrittenDocsPerSec, stat.Database, stat.Table, writtenOperation)

	if e.metrics.tableRowsCount == nil {
		return
	}

	// Copy the labels so the goroutine does not capture the caller's
	// loop-reused stat value.
	dbName := stat.Database
	tableName := stat.Table
	wg.Go(func() error {
		var tableInfo info
		err := r.DB(dbName).Table(tableName).Info().ReadOne(&tableInfo, e.rconn, r.RunOpts{Context: ctx})
		if err != nil {
			log.Warn().Err(err).Str("db", dbName).Str("table", tableName).Msg("failed to get table info")
			return err
		}
		// Sum the per-shard estimates. The original loop variable was
		// named e, shadowing the exporter receiver, and applied a
		// redundant float64 conversion to an already-float64 value.
		sum := 0.0
		for _, estimate := range tableInfo.DocCountEstimates {
			sum += estimate
		}
		ch <- prometheus.MustNewConstMetric(e.metrics.tableRowsCount, prometheus.GaugeValue, sum, dbName, tableName)
		return nil
	})
}
// processTableServerStat emits the per-replica gauges, labeled by database,
// table and server: document read/write rates, cache bytes in use, disk I/O
// rates, and on-disk data size.
func (e *RethinkdbExporter) processTableServerStat(stat stat, ch chan<- prometheus.Metric) {
	db, table, server := stat.Database, stat.Table, stat.Server
	qe, se := stat.QueryEngine, stat.StorageEngine

	ch <- prometheus.MustNewConstMetric(e.metrics.tableReplicaDocsPerSecond, prometheus.GaugeValue, qe.ReadDocsPerSec, db, table, server, readOperation)
	ch <- prometheus.MustNewConstMetric(e.metrics.tableReplicaDocsPerSecond, prometheus.GaugeValue, qe.WrittenDocsPerSec, db, table, server, writtenOperation)
	ch <- prometheus.MustNewConstMetric(e.metrics.tableReplicaCacheBytes, prometheus.GaugeValue, se.Cache.InUseBytes, db, table, server)
	ch <- prometheus.MustNewConstMetric(e.metrics.tableReplicaIO, prometheus.GaugeValue, se.Disk.ReadBytesPerSec, db, table, server, readOperation)
	ch <- prometheus.MustNewConstMetric(e.metrics.tableReplicaIO, prometheus.GaugeValue, se.Disk.WrittenBytesPerSec, db, table, server, writtenOperation)
	ch <- prometheus.MustNewConstMetric(e.metrics.tableReplicaDataBytes, prometheus.GaugeValue, se.Disk.SpaceUsage.DataBytes, db, table, server)
}