forked from thanos-io/thanos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
responsiveness.go
189 lines (159 loc) · 5.44 KB
/
responsiveness.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
"cloud.google.com/go/storage"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/benchmark/pkg/tsdb"
"github.com/pkg/errors"
"google.golang.org/api/iterator"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// testStoreResponsiveness runs the store responsiveness benchmark.
//
// This test will:
// 1. Generate a large amount of historic metric data in a local temporary tsdb.
// 2. Clean the target cluster and bootstrap a thanos-store and a thanos-query in it.
// 3. After an interactive confirmation, replace the contents of the GCS bucket with the generated data.
// 4. Perform queries over the store & log the collected results.
//
// It returns nil without uploading or querying when the operator answers the
// confirmation prompt with anything other than "Y" or "y". The temporary
// directory is removed when the function returns.
func testStoreResponsiveness(logger log.Logger, opts *opts) error {
	tmpDir, err := ioutil.TempDir("", "thanos")
	if err != nil {
		return errors.Wrap(err, "failed to create temporary directory for holding tsdb data")
	}
	// Remove the whole temporary directory, not just the tsdb subdirectory,
	// so that nothing is left behind on disk.
	defer func() {
		if err := os.RemoveAll(tmpDir); err != nil {
			level.Error(logger).Log("msg", "failed to remove temporary directory", "dir", tmpDir, "err", err)
		}
	}()
	tsdbDir := filepath.Join(tmpDir, "tsdb")

	// Create local tsdb.
	level.Info(logger).Log("msg", "Writing historic timeseries", "num-timeseries", *opts.numTimeseries, "output-dir", tsdbDir)
	tsdbEndTime := time.Now()
	tsdbStartTime := tsdbEndTime.Add(-*opts.tsdbLength)
	if err := tsdb.CreateThanosTSDB(tsdb.Opts{
		OutputDir:      tsdbDir,
		NumTimeseries:  *opts.numTimeseries,
		StartTime:      tsdbStartTime,
		EndTime:        tsdbEndTime,
		SampleInterval: time.Second * 15,
		BlockLength:    *opts.blockLength,
	}); err != nil {
		return errors.Wrap(err, "failed to generate tsdb")
	}

	// Create k8s client.
	k8sConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		&clientcmd.ClientConfigLoadingRules{ExplicitPath: *opts.kubeConfig},
		&clientcmd.ConfigOverrides{CurrentContext: *opts.cluster},
	).ClientConfig()
	if err != nil {
		return errors.Wrap(err, "failed to create client config for cluster")
	}
	k8sClient, err := kubernetes.NewForConfig(k8sConfig)
	if err != nil {
		return errors.Wrap(err, "failed to create client set")
	}

	// Remove any resources in the cluster.
	if err := cleanCluster(logger, k8sClient); err != nil {
		return err
	}

	// Create resources for this cluster.
	if err := bootstrapStoreResponsivenessCluster(logger, opts, k8sClient); err != nil {
		return err
	}

	// Safety prompt: the upload step deletes every object in the bucket first.
	fmt.Printf("WARNING: this will delete all data in the bucket (%s). Do you want to continue? Y/n: ", *opts.bucket)
	var resp string
	if _, err := fmt.Scanln(&resp); err != nil {
		return errors.Wrap(err, "failed to confirm input")
	}
	if resp != "Y" && resp != "y" {
		return nil
	}

	// Upload TSDB.
	level.Info(logger).Log("msg", "Uploading timeseries to GCS")
	if err := pushToGCS(logger, opts, tsdbDir); err != nil {
		return errors.Wrap(err, "failed to upload data to gcs")
	}

	// Collect query information.
	results, err := getQueryTimes(logger, opts, k8sClient, "thanos-query", thanosNamespace, "querier-thanos", thanosHTTPPort)
	if err != nil {
		return err
	}
	level.Info(logger).Log("results", string(results))

	return nil
}
// bootstrapStoreResponsivenessCluster creates, in order, the resources the
// store responsiveness test needs: the namespaces, the thanos gossip service,
// the thanos store pod, and the thanos query service and pod. It stops at the
// first failure and returns that error wrapped with the step that failed.
func bootstrapStoreResponsivenessCluster(logger log.Logger, opts *opts, k8sClient *kubernetes.Clientset) error {
	// Namespaces come first.
	if err := createNamespaces(logger, k8sClient); err != nil {
		return errors.Wrap(err, "failed to create namespaces")
	}

	core := k8sClient.CoreV1()

	// Headless service through which thanos gossip members find each other.
	gossipSvc := createThanosGossipService(thanosNamespace)
	_, err := core.Services(thanosNamespace).Create(gossipSvc)
	if err != nil {
		return errors.Wrap(err, "failed to create headless service for thanos gossip")
	}

	// Thanos store pod.
	level.Info(logger).Log("msg", "Creating thanos store")
	storePod := createThanosStore(opts, "improbable-thanos-loadtest")
	_, err = core.Pods(thanosNamespace).Create(storePod)
	if err != nil {
		return errors.Wrap(err, "failed to create thanos store pod")
	}

	// Thanos query layer: a service plus the pod behind it.
	level.Info(logger).Log("msg", "Creating thanos query layer")
	querySvc, queryPod := createThanosQuery(opts)
	_, err = core.Services(thanosNamespace).Create(querySvc)
	if err != nil {
		return errors.Wrap(err, "failed to create thanos query service")
	}
	_, err = core.Pods(thanosNamespace).Create(queryPod)
	if err != nil {
		return errors.Wrap(err, "failed to create thanos query pod")
	}

	return nil
}
// pushToGCS replaces the contents of the GCS bucket named by opts.bucket with
// the files found under uploadDir.
//
// Every existing object in the bucket is deleted first; a failed deletion is
// logged as a warning and does not abort the run. Each regular file under
// uploadDir is then uploaded under its path relative to uploadDir, using "/"
// as the separator in the object name. The first listing, open, upload or
// close error aborts the walk and is returned.
func pushToGCS(logger log.Logger, opts *opts, uploadDir string) error {
	ctx := context.Background()

	client, err := storage.NewClient(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to create gcs client")
	}
	// Release the client's underlying connections once we are done.
	defer func() {
		if err := client.Close(); err != nil {
			level.Warn(logger).Log("msg", "failed to close gcs client", "error", err)
		}
	}()

	bkt := client.Bucket(*opts.bucket)

	// Empty the bucket.
	objIt := bkt.Objects(ctx, nil)
	for {
		obj, err := objIt.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return errors.Wrap(err, "failed to list objects in bucket")
		}

		level.Info(logger).Log("Deleting file", obj.Name)
		if err := bkt.Object(obj.Name).Delete(ctx); err != nil {
			// Deletion is best-effort: warn and move on to the next object.
			level.Warn(logger).Log("failed to delete file", obj.Name, "error", err)
		}
	}

	// Upload every regular file below uploadDir.
	return filepath.Walk(uploadDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}

		f, err := os.Open(path)
		if err != nil {
			return errors.Wrapf(err, "failed to open file (%s)", path)
		}
		// The file is only read, so a Close error carries no useful information.
		defer f.Close()

		// Object names are slash-separated regardless of the local OS.
		trimmedPath := filepath.ToSlash(strings.TrimPrefix(path, uploadDir+string(filepath.Separator)))
		level.Info(logger).Log("Uploading file", trimmedPath)

		// A per-object context lets a failed copy abort the upload instead of
		// leaving the writer dangling; cancelling after a successful Close is
		// a no-op.
		wctx, cancel := context.WithCancel(ctx)
		defer cancel()

		w := bkt.Object(trimmedPath).NewWriter(wctx)
		if _, err := io.Copy(w, f); err != nil {
			return errors.Wrapf(err, "failed to upload file (%s)", trimmedPath)
		}
		// Close reports whether the upload succeeded, so its error must be checked.
		if err := w.Close(); err != nil {
			return errors.Wrapf(err, "failed to close file (%s)", trimmedPath)
		}

		return nil
	})
}