forked from thanos-io/thanos
/
ingest.go
102 lines (86 loc) · 3.55 KB
/
ingest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main
import (
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// This test will:
// 1. Spin up a number of prometheus+thanos sidecars.
// 2. Spin up a number of metrics producers for each scraper.
// 3. Run a thanos-query in front of all the scrapers & expose it.
func testIngest(logger log.Logger, opts *opts) error {
// Create k8s client.
k8sConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: *opts.kubeConfig},
&clientcmd.ConfigOverrides{CurrentContext: *opts.cluster},
).ClientConfig()
if err != nil {
return errors.Wrap(err, "failed to create client config for cluster")
}
k8sClient, err := kubernetes.NewForConfig(k8sConfig)
if err != nil {
return errors.Wrap(err, "failed to create client set")
}
// Remove any resources in the cluster.
if err := cleanCluster(logger, k8sClient); err != nil {
return err
}
// Create resources.
if err := bootstrapIngestCluster(logger, opts, k8sClient); err != nil {
return err
}
return nil
}
func bootstrapIngestCluster(logger log.Logger, opts *opts, k8sClient *kubernetes.Clientset) error {
// Create namespaces.
if err := createNamespaces(logger, k8sClient); err != nil {
return errors.Wrap(err, "failed to create namespaces")
}
// Create admin role for prometheus.
crb, sa := createAdminRole()
if _, err := k8sClient.CoreV1().ServiceAccounts(promNamespace).Create(sa); err != nil {
return errors.Wrap(err, "failed to create monitoring service account")
}
if _, err := k8sClient.RbacV1().ClusterRoleBindings().Create(crb); err != nil {
return errors.Wrap(err, "failed to create clusterrolebinding")
}
// Create headless services for thanos gossip members.
if _, err := k8sClient.CoreV1().Services(thanosNamespace).Create(createThanosGossipService(thanosNamespace)); err != nil {
return errors.Wrap(err, "failed to create headless service for thanos gossip")
}
if _, err := k8sClient.CoreV1().Services(promNamespace).Create(createThanosGossipService(promNamespace)); err != nil {
return errors.Wrap(err, "failed to create headless service for thanos gossip")
}
cfg, err := createPrometheusConfig("^loadgen-$(MON_ID)-.*$")
if err != nil {
return err
}
if _, err := k8sClient.CoreV1().ConfigMaps(promNamespace).Create(cfg); err != nil {
return errors.Wrap(err, "failed to create prometheus configmap")
}
for i := 0; i < *opts.numPrometheus; i++ {
name := fmt.Sprintf("mon-%d", i)
loadgenName := fmt.Sprintf("loadgen-%s", name)
level.Info(logger).Log("msg", "Creating loadgen", "name", loadgenName)
if _, err := k8sClient.AppsV1().ReplicaSets(loadgenNamespace).Create(createLoadgen(loadgenName, int32(*opts.numLoadgen))); err != nil {
return errors.Wrapf(err, "failed to create loadgen (%s)", loadgenName)
}
level.Info(logger).Log("msg", "Creating prometheus", "name", name)
if _, err := k8sClient.AppsV1().StatefulSets(promNamespace).Create(createPrometheus(opts, name, "")); err != nil {
return errors.Wrapf(err, "failed to create prometheus (%s)", name)
}
}
// Create thanos query layer.
level.Info(logger).Log("msg", "Creating thanos query layer")
svc, pod := createThanosQuery(opts)
if _, err := k8sClient.CoreV1().Services(thanosNamespace).Create(svc); err != nil {
return errors.Wrap(err, "failed to create thanos query service")
}
if _, err := k8sClient.CoreV1().Pods(thanosNamespace).Create(pod); err != nil {
return errors.Wrap(err, "failed to create thanos query pod")
}
return nil
}