/
vector.go
130 lines (112 loc) · 3.86 KB
/
vector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package reaper
import (
"bytes"
"context"
"text/template"
"github.com/go-logr/logr"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
api "github.com/k8ssandra/k8ssandra-operator/apis/reaper/v1alpha1"
telemetryapi "github.com/k8ssandra/k8ssandra-operator/apis/telemetry/v1alpha1"
reaperpkg "github.com/k8ssandra/k8ssandra-operator/pkg/reaper"
"github.com/k8ssandra/k8ssandra-operator/pkg/reconciliation"
"github.com/k8ssandra/k8ssandra-operator/pkg/telemetry"
"github.com/k8ssandra/k8ssandra-operator/pkg/vector"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// reconcileVectorConfigMap brings the Vector agent ConfigMap associated with
// the given Reaper and datacenter into its desired state.
//
// When Vector is enabled in the Reaper telemetry spec, the Vector TOML is
// rendered, wrapped in a ConfigMap owned by the Reaper resource, and handed to
// reconciliation.ReconcileObject. When Vector is disabled, any existing
// ConfigMap with the expected name is deleted instead.
//
// It returns a result with RequeueAfter set to r.DefaultDelay when the object
// reconciliation asks for a requeue, an error when any step fails, and an
// empty result otherwise.
func (r *ReaperReconciler) reconcileVectorConfigMap(
	ctx context.Context,
	reaper api.Reaper,
	actualDc *cassdcapi.CassandraDatacenter,
	remoteClient client.Client,
	dcLogger logr.Logger,
) (ctrl.Result, error) {
	ns := reaper.Namespace
	key := client.ObjectKey{
		Namespace: ns,
		Name:      reaperpkg.VectorAgentConfigMapName(actualDc.Spec.ClusterName, actualDc.DatacenterName()),
	}

	// Vector disabled: make sure no stale ConfigMap is left behind.
	if !reaper.Spec.Telemetry.IsVectorEnabled() {
		if err := deleteConfigMapIfExists(ctx, remoteClient, key, dcLogger); err != nil {
			return ctrl.Result{}, err
		}
		dcLogger.Info("Reaper Vector Agent ConfigMap reconciliation complete")
		return ctrl.Result{}, nil
	}

	// Vector enabled: render the TOML content and build the desired ConfigMap.
	tomlContent, err := CreateVectorToml(reaper.Spec.Telemetry)
	if err != nil {
		return ctrl.Result{}, err
	}

	desired := reaperpkg.CreateVectorConfigMap(ns, tomlContent, *actualDc)
	if err := ctrl.SetControllerReference(&reaper, desired, r.Scheme); err != nil {
		dcLogger.Error(err, "Failed to set controller reference on new Reaper Vector ConfigMap", "ConfigMap", key)
		return ctrl.Result{}, err
	}

	outcome := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desired)
	if outcome.IsError() {
		return ctrl.Result{}, outcome.GetError()
	}
	if outcome.IsRequeue() {
		return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
	}

	dcLogger.Info("Reaper Vector Agent ConfigMap reconciliation complete")
	return ctrl.Result{}, nil
}
// deleteConfigMapIfExists deletes the ConfigMap identified by configMapKey
// through remoteClient.
//
// A NotFound error from either the lookup or the delete is treated as success,
// since the desired end state (no ConfigMap) already holds. Any other error is
// logged with the ConfigMap key and returned to the caller.
func deleteConfigMapIfExists(ctx context.Context, remoteClient client.Client, configMapKey client.ObjectKey, logger logr.Logger) error {
	configMap := &corev1.ConfigMap{}
	if err := remoteClient.Get(ctx, configMapKey, configMap); err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		// logr expects alternating key/value pairs after the message.
		logger.Error(err, "Failed to get ConfigMap", "ConfigMap", configMapKey)
		return err
	}
	if err := remoteClient.Delete(ctx, configMap); err != nil {
		// The object may have been removed between the Get and the Delete.
		if errors.IsNotFound(err) {
			return nil
		}
		logger.Error(err, "Failed to delete ConfigMap", "ConfigMap", configMapKey)
		return err
	}
	return nil
}
// CreateVectorToml renders the Vector agent TOML configuration used to scrape
// Reaper metrics.
//
// The configuration always declares a "reaper_metrics" prometheus_scrape source
// pointing at http://localhost:<reaperpkg.MetricsPort>/prometheusMetrics. The
// sink section defaults to a console sink emitting JSON on stdout; when
// telemetrySpec.Vector.Components is set, the section is produced by
// telemetry.BuildCustomVectorToml instead. The scrape interval defaults to
// vector.DefaultScrapeInterval and is overridden by
// telemetrySpec.Vector.ScrapeInterval (converted to whole seconds) when set.
//
// telemetrySpec and its Vector field are accessed without nil checks, so
// callers are expected to verify that Vector is enabled before calling.
//
// It returns the rendered TOML, or an error if the template cannot be parsed
// or executed.
func CreateVectorToml(telemetrySpec *telemetryapi.TelemetrySpec) (string, error) {
	vectorConfigToml := `
[sinks.console]
type = "console"
inputs = [ "reaper_metrics" ]
target = "stdout"
[sinks.console.encoding]
codec = "json"`
	if telemetrySpec.Vector.Components != nil {
		// Vector components are provided in the Telemetry spec, build the Vector sink config from them
		vectorConfigToml = telemetry.BuildCustomVectorToml(telemetrySpec)
	}

	var scrapeInterval int32 = vector.DefaultScrapeInterval
	if telemetrySpec.Vector.ScrapeInterval != nil {
		scrapeInterval = int32(telemetrySpec.Vector.ScrapeInterval.Seconds())
	}

	config := vector.VectorConfig{
		Sinks:          vectorConfigToml,
		ScrapePort:     reaperpkg.MetricsPort,
		ScrapeInterval: scrapeInterval,
	}

	vectorTomlTemplate := `
data_dir = "/var/lib/vector"
[api]
enabled = false
[sources.reaper_metrics]
type = "prometheus_scrape"
endpoints = [ "http://localhost:{{ .ScrapePort }}/prometheusMetrics" ]
scrape_interval_secs = {{ .ScrapeInterval }}
{{ .Sinks }}`

	// Surface template failures through the error return instead of panicking,
	// so the reconciler can report them and retry rather than crash.
	t, err := template.New("toml").Parse(vectorTomlTemplate)
	if err != nil {
		return "", err
	}

	var vectorToml bytes.Buffer
	if err := t.Execute(&vectorToml, config); err != nil {
		return "", err
	}
	return vectorToml.String(), nil
}