-
Notifications
You must be signed in to change notification settings - Fork 75
/
compactor.go
199 lines (181 loc) · 8.83 KB
/
compactor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package compactor
import (
"net"
"time"
"github.com/observatorium/observatorium/configuration_go/kubegen/cmdopt"
kghelpers "github.com/observatorium/observatorium/configuration_go/kubegen/helpers"
"github.com/observatorium/observatorium/configuration_go/kubegen/workload"
"github.com/observatorium/observatorium/configuration_go/schemas/log"
thanostime "github.com/observatorium/observatorium/configuration_go/schemas/thanos/time"
monv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)
const (
dataVolumeName string = "data"
defaultHTTPPort int = 10902
)
// CompactorOptions represents the options/flags for the compactor.
// See https://thanos.io/tip/components/compact.md/#flags for details.
type CompactorOptions struct {
BlockFilesConcurrency int `opt:"block-files-concurrency"`
BlockMetaFetchConcurrency int `opt:"block-meta-fetch-concurrency"`
BlockViewerGlobalSyncBlockInterval time.Duration `opt:"block-viewer.global.sync-block-interval"`
BlockViewerGlobalSyncBlockTimeout time.Duration `opt:"block-viewer.global.sync-block-timeout"`
BucketWebLabel string `opt:"bucket-web-label"`
CompactBlocksFetchConcurrency int `opt:"compact.blocks-fetch-concurrency"`
CompactCleanupInterval time.Duration `opt:"compact.cleanup-interval"`
CompactConcurrency int `opt:"compact.concurrency"`
CompactProgressInterval time.Duration `opt:"compact.progress-interval"`
ConsistencyDelay time.Duration `opt:"consistency-delay"`
DataDir string `opt:"data-dir"`
DeduplicationFunc string `opt:"deduplication.func"`
DeduplicationReplicaLabel string `opt:"deduplication.replica-label"`
DeleteDelay time.Duration `opt:"delete-delay"`
DownsampleConcurrency int `opt:"downsample.concurrency"`
DownsamplingDisable bool `opt:"downsampling.disable"`
HashFunc string `opt:"hash-func"`
HttpAddress *net.TCPAddr `opt:"http-address"`
HttpGracePeriod time.Duration `opt:"http-grace-period"`
HttpConfig string `opt:"http.config"`
LogFormat log.Format `opt:"log.format"`
LogLevel log.Level `opt:"log.level"`
MaxTime *thanostime.TimeOrDurationValue `opt:"max-time"`
MinTime *thanostime.TimeOrDurationValue `opt:"min-time"`
ObjstoreConfig string `opt:"objstore.config"`
ObjstoreConfigFile string `opt:"objstore.config-file"`
RetentionResolution1h time.Duration `opt:"retention.resolution-1h"`
RetentionResolution5m time.Duration `opt:"retention.resolution-5m"`
RetentionResolutionRaw time.Duration `opt:"retention.resolution-raw"`
SelectorRelabelConfig string `opt:"selector.relabel-config"`
SelectorRelabelConfigFile string `opt:"selector.relabel-config-file"`
TracingConfig string `opt:"tracing.config"`
TracingConfigFile string `opt:"tracing.config-file"`
Version bool `opt:"version,noval"`
Wait bool `opt:"wait,noval"`
WaitInterval time.Duration `opt:"wait-interval"`
WebDisable bool `opt:"web.disable"`
WebDisableCors bool `opt:"web.disable-cors"`
WebExternalPrefix string `opt:"web.external-prefix"`
WebPrefixHeader string `opt:"web.prefix-header"`
WebRoutePrefix string `opt:"web.route-prefix"`
// Extra options not officially supported by the compactor.
cmdopt.ExtraOpts
}
type CompactorStatefulSet struct {
options *CompactorOptions
workload.StatefulSetWorkload
}
func NewDefaultOptions() *CompactorOptions {
return &CompactorOptions{
ObjstoreConfig: "$(OBJSTORE_CONFIG)",
Wait: true,
LogLevel: "warn",
LogFormat: "logfmt",
DataDir: "/var/thanos/compactor",
RetentionResolutionRaw: time.Hour * 24 * 365,
DeleteDelay: time.Hour * 24 * 2,
CompactConcurrency: 1,
DownsampleConcurrency: 1,
DeduplicationReplicaLabel: "replica",
}
}
// NewCompactor returns a new compactor statefulset with default values.
// It allows generating the all the manifests for the compactor.
func NewCompactor(opts *CompactorOptions, namespace, imageTag string) *CompactorStatefulSet {
if opts == nil {
opts = NewDefaultOptions()
}
commonLabels := map[string]string{
workload.NameLabel: "thanos-compact",
workload.InstanceLabel: "observatorium",
workload.PartOfLabel: "observatorium",
workload.ComponentLabel: "database-compactor",
workload.VersionLabel: imageTag,
}
labelSelectors := map[string]string{
workload.NameLabel: commonLabels[workload.NameLabel],
workload.InstanceLabel: commonLabels[workload.InstanceLabel],
}
probePort := kghelpers.GetPortOrDefault(defaultHTTPPort, opts.HttpAddress)
ssWorkload := workload.StatefulSetWorkload{
Replicas: 1,
VolumeSize: "50Gi",
PodConfig: workload.PodConfig{
Image: "quay.io/thanos/thanos",
ImageTag: imageTag,
ImagePullPolicy: corev1.PullIfNotPresent,
Name: "observatorium-thanos-compact",
Namespace: namespace,
CommonLabels: commonLabels,
ContainerResources: kghelpers.NewResourcesRequirements("2", "3", "2000Mi", "3000Mi"),
Affinity: kghelpers.NewAntiAffinity(nil, labelSelectors),
EnableServiceMonitor: true,
LivenessProbe: kghelpers.NewProbe("/-/healthy", probePort, kghelpers.ProbeConfig{
FailureThreshold: 4,
PeriodSeconds: 30,
}),
ReadinessProbe: kghelpers.NewProbe("/-/ready", probePort, kghelpers.ProbeConfig{
FailureThreshold: 20,
PeriodSeconds: 5,
}),
TerminationGracePeriodSeconds: 120,
Env: []corev1.EnvVar{
kghelpers.NewEnvFromSecret("OBJSTORE_CONFIG", "objectStore-secret", "thanos.yaml"),
kghelpers.NewEnvFromField("HOST_IP_ADDRESS", "status.hostIP"),
},
ConfigMaps: make(map[string]map[string]string),
Secrets: make(map[string]map[string][]byte),
},
}
return &CompactorStatefulSet{
options: opts,
StatefulSetWorkload: ssWorkload,
}
}
// Manifests returns the manifests for the compactor.
// It includes the statefulset, the service, the service monitor, the service account and the config maps required by the containers.
func (c *CompactorStatefulSet) Objects() []runtime.Object {
container := c.makeContainer()
return c.StatefulSetWorkload.Objects(container)
}
func (c *CompactorStatefulSet) makeContainer() *workload.Container {
httpPort := kghelpers.GetPortOrDefault(defaultHTTPPort, c.options.HttpAddress)
kghelpers.CheckProbePort(httpPort, c.LivenessProbe)
kghelpers.CheckProbePort(httpPort, c.ReadinessProbe)
// Print warning if data directory is not specified.
if c.options.DataDir == "" {
panic("data directory is not specified for the statefulset.")
}
ret := c.ToContainer()
ret.Name = "thanos"
ret.Args = append([]string{"compact"}, cmdopt.GetOpts(c.options)...)
ret.Ports = []corev1.ContainerPort{
{
Name: "http",
ContainerPort: int32(httpPort),
Protocol: corev1.ProtocolTCP,
},
}
ret.ServicePorts = []corev1.ServicePort{
kghelpers.NewServicePort("http", httpPort, httpPort),
}
ret.MonitorPorts = []monv1.Endpoint{
{
Port: "http",
RelabelConfigs: kghelpers.GetDefaultServiceMonitorRelabelConfig(),
},
}
ret.VolumeClaims = append(ret.VolumeClaims, workload.PersistentVolumeClaim{
Name: dataVolumeName,
Size: c.VolumeSize,
Class: c.VolumeType,
})
ret.VolumeMounts = []corev1.VolumeMount{
{
Name: dataVolumeName,
MountPath: c.options.DataDir,
},
}
return ret
}