/
instrumented_inserter.go
74 lines (64 loc) · 2.28 KB
/
instrumented_inserter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package generic
import (
"context"
"github.com/lawrencejones/pgsink/internal/telem"
"github.com/lawrencejones/pgsink/pkg/changelog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opencensus.io/trace"
)
var (
sinkInsertDurationSeconds = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "pgsink_sink_insert_duration_seconds",
Help: "Distribution of time spent issuing inserts, by route (if applicable)",
Buckets: prometheus.ExponentialBuckets(0.125, 2, 12), // 0.125 -> 512s
},
[]string{"route"},
)
sinkInsertBatchSize = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "pgsink_sink_insert_batch_size",
Help: "Distribution of insert batch sizes",
Buckets: prometheus.ExponentialBuckets(1, 2, 13), // 1 -> 8192
},
[]string{"route"},
)
)
type instrumentedInserter struct {
Inserter
route string
durationSeconds, batchSize prometheus.ObserverVec
}
// NewInstrumentedInserter wraps an existing synchronous inserter, causing every insert to
// be logged, capture batch size and duration in metrics, and create new spans.
func NewInstrumentedInserter(route Route, i Inserter) Inserter {
labels := prometheus.Labels(map[string]string{"route": string(route)})
return &instrumentedInserter{
Inserter: i,
route: string(route),
durationSeconds: sinkInsertDurationSeconds.MustCurryWith(labels),
batchSize: sinkInsertBatchSize.MustCurryWith(labels),
}
}
func (i *instrumentedInserter) Insert(ctx context.Context, modifications []*changelog.Modification) (count int, lsn *uint64, err error) {
ctx, span, logger := telem.StartSpan(ctx, "pkg/sinks/generic.Inserter.Insert")
defer span.End()
batchSize := len(modifications)
span.AddAttributes(
trace.StringAttribute("route", i.route),
trace.Int64Attribute("batch_size", int64(batchSize)),
)
defer prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
logger.Log("event", "insert",
"route", i.route,
"duration", v,
"batch_size", batchSize,
"count", count,
"lsn", lsn,
"error", err)
i.durationSeconds.WithLabelValues().Observe(v)
i.batchSize.WithLabelValues().Observe(float64(batchSize))
})).ObserveDuration()
return i.Inserter.Insert(ctx, modifications)
}