diff --git a/CHANGELOG.md b/CHANGELOG.md index 13b7444ebe..f5a168ad99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ## Unreleased ### Added + - [#3977](https://github.com/thanos-io/thanos/pull/3977) Expose exemplars for `http_request_duration_seconds` histogram if tracing is enabled. - [#3903](https://github.com/thanos-io/thanos/pull/3903) Store: Returning custom grpc code when reaching series/chunk limits. - [3919](https://github.com/thanos-io/thanos/pull/3919) Allow to disable automatically setting CORS headers using `--web.disable-cors` flag in each component that exposes an API. @@ -24,35 +25,42 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3960](https://github.com/thanos-io/thanos/pull/3960) fix deduplication of equal alerts with different labels ### Changed + - [#3948](https://github.com/thanos-io/thanos/pull/3948) Receiver: Adjust `http_request_duration_seconds` buckets for low latency requests. - [#3856](https://github.com/thanos-io/thanos/pull/3856) Mixin: _breaking :warning:_ Introduce flexible multi-cluster/namespace mode for alerts and dashboards. Removes jobPrefix config option. Removes `namespace` by default. ### Removed -## [v0.19.0 - ](https://github.com/thanos-io/thanos/tree/release-0.19) +## [v0.19.0-rc.2](https://github.com/thanos-io/thanos/releases/tag/v0.19.0-rc.2) - 2021.03.24 ### Added -- [#3862](https://github.com/thanos-io/thanos/pull/3862) Sidecar, Store, Query, Ruler, Receiver, Query-Frontend: Added request logging for gRPC and HTTP in the server side. -- [#3740](https://github.com/thanos-io/thanos/pull/3740) Query: Added `--query.default-step` flag to set default step.
-- [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks -- [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request. -- [#3792](https://github.com/thanos-io/thanos/pull/3792) Receiver: Added `--tsdb.allow-overlapping-blocks` flag to allow overlapping tsdb blocks and enable vertical compaction -- [#3031](https://github.com/thanos-io/thanos/pull/3031) Compact/Sidecar/other writers: added `--hash-func`. If some function has been specified, writers calculate hashes using that function of each file in a block before uploading them. If those hashes exist in the `meta.json` file then Compact does not download the files if they already exist on disk and with the same hash. This also means that the data directory passed to Thanos Compact is only *cleared once at boot* or *if everything succeeds*. So, if you, for example, use persistent volumes on k8s and your Thanos Compact crashes or fails to make an iteration properly then the last downloaded files are not wiped from the disk. The directories that were created the last time are only wiped again after a successful iteration or if the previously picked up blocks have disappeared. -- [#3686](https://github.com/thanos-io/thanos/pull/3686) Query: Added federated metric metadata support. -- [#3846](https://github.com/thanos-io/thanos/pull/3846) Query: Added federated exemplars API support. +- [#3700](https://github.com/thanos-io/thanos/pull/3700) Compact/Web: Make old bucket viewer UI work with vanilla Prometheus blocks. +- [#3657](https://github.com/thanos-io/thanos/pull/3657) *: It's now possible to configure HTTP transport options for S3 client. 
+- [#3752](https://github.com/thanos-io/thanos/pull/3752) Compact/Store: Added `--block-meta-fetch-concurrency` allowing to configure the number of goroutines for block metadata synchronization. +- [#3723](https://github.com/thanos-io/thanos/pull/3723) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request. +- [#3579](https://github.com/thanos-io/thanos/pull/3579) Cache: Added in-memory cache for caching bucket. +- [#3792](https://github.com/thanos-io/thanos/pull/3792) Receiver: Added `--tsdb.allow-overlapping-blocks` flag to allow overlapping tsdb blocks and enable vertical compaction. +- [#3740](https://github.com/thanos-io/thanos/pull/3740) Query: Added `--query.default-step` flag to set default step. Useful when your tenant scrape interval is stable and far from the default UI's 1s. +- [#3686](https://github.com/thanos-io/thanos/pull/3686) Query/Sidecar: Added metric metadata API support. You can now configure your Querier to fetch Prometheus metrics metadata from leaf Prometheus-es! +- [#3031](https://github.com/thanos-io/thanos/pull/3031) Compact/Sidecar/Receive/Rule: Added `--hash-func`. If a hash function is specified, writers calculate hashes using that function for each file in a block before uploading them. If those hashes exist in the `meta.json` file, Compact does not re-download files that already exist on disk with the same hash. This also means that the data directory passed to Thanos Compact is only *cleared once at boot* or *if everything succeeds*. So if you, for example, use persistent volumes on k8s and your Thanos Compact crashes or fails to complete an iteration, the last downloaded files are not wiped from disk. The directories created last time are only wiped again after a successful iteration or if the previously picked-up blocks have disappeared.
### Fixed -- [#3773](https://github.com/thanos-io/thanos/pull/3773) Compact: Pad compaction planner size check -- [#3795](https://github.com/thanos-io/thanos/pull/3795) s3: A truncated "get object" response is reported as error. +- [#3705](https://github.com/thanos-io/thanos/pull/3705) Store: Fix race condition leading to failing queries or possibly incorrect query results. +- [#3661](https://github.com/thanos-io/thanos/pull/3661) Compact: Deletion-mark.json is now deleted last, which otherwise could in theory lead to store gateway load or query errors for such an in-deletion block. +- [#3760](https://github.com/thanos-io/thanos/pull/3760) Store: Fix panic caused by a race condition happening on concurrent index-header reader usage and unload, when `--store.enable-index-header-lazy-reader` is enabled. +- [#3759](https://github.com/thanos-io/thanos/pull/3759) Store: Fix panic caused by a race condition happening on concurrent index-header lazy load and unload, when `--store.enable-index-header-lazy-reader` is enabled. +- [#3773](https://github.com/thanos-io/thanos/pull/3773) Compact: Fixed compaction planner size check, making sure we don't create too large blocks. - [#3814](https://github.com/thanos-io/thanos/pull/3814) Store: Decreased memory utilisation while fetching block's chunks. - [#3815](https://github.com/thanos-io/thanos/pull/3815) Receive: Improve handling of empty time series from clients +- [#3795](https://github.com/thanos-io/thanos/pull/3795) s3: A truncated "get object" response is reported as an error. +- [#3899](https://github.com/thanos-io/thanos/pull/3899) Receive: Correct the inference of client gRPC configuration. +- [#3943](https://github.com/thanos-io/thanos/pull/3943) Receive: Fixed memory regression introduced in v0.17.0. ### Changed -- [#3705](https://github.com/thanos-io/thanos/pull/3705) Store: Fix race condition leading to failing queries or possibly incorrect query results.
-- [#3854](https://github.com/thanos-io/thanos/pull/3854) Mixin: Remove assumed metrics. Use `thanos_info` instead of `kube_pod_info` for dashboard selectors. +- [#3804](https://github.com/thanos-io/thanos/pull/3804) Ruler, Receive, Querier: Updated Prometheus dependency. TSDB characteristics might have changed. ## [v0.18.0](https://github.com/thanos-io/thanos/releases/tag/v0.18.0) - 2021.01.27 @@ -74,8 +82,6 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3567](https://github.com/thanos-io/thanos/pull/3567) Mixin: Reintroduce `thanos_objstore_bucket_operation_failures_total` alert. - [#3527](https://github.com/thanos-io/thanos/pull/3527) Query Frontend: Fix query_range behavior when start/end times are the same -- [#3760](https://github.com/thanos-io/thanos/pull/3760) Store: Fix panic caused by a race condition happening on concurrent index-header reader usage and unload, when `--store.enable-index-header-lazy-reader` is enabled. -- [#3759](https://github.com/thanos-io/thanos/pull/3759) Store: Fix panic caused by a race condition happening on concurrent index-header lazy load and unload, when `--store.enable-index-header-lazy-reader` is enabled. - [#3560](https://github.com/thanos-io/thanos/pull/3560) Query Frontend: Allow separate label cache - [#3672](https://github.com/thanos-io/thanos/pull/3672) Rule: Prevent crashing due to `no such host error` when using `dnssrv+` or `dnssrvnoa+`. - [#3461](https://github.com/thanos-io/thanos/pull/3461) Compact, Shipper, Store: Fixed panic when no external labels are set in block metadata. 
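The storage.md and s3.go hunks that follow lower the default S3 `part_size` from 134217728 bytes (128 MiB) to 67108864 bytes (64 MiB). As the new NOTE on `Config.PartSize` warns, S3 multipart uploads are capped at 10,000 parts, so the configured part size bounds the largest object one multipart upload can produce. A quick sketch of that arithmetic (standalone illustration, not code from the patch):

```go
package main

import "fmt"

func main() {
	// S3's multipart upload API allows at most 10,000 parts per object, so
	// max object size = part_size * 10,000 when every part is full-sized.
	const maxParts = 10000
	for _, partSize := range []uint64{128 << 20, 64 << 20} { // Old and new defaults.
		fmt.Printf("part_size=%d -> max multipart object ~%d GiB\n", partSize, partSize*maxParts>>30)
	}
}
```

Even the smaller 64 MiB default keeps the ceiling around 625 GiB — far above typical block file sizes — while roughly halving the buffer the minio client may allocate per upload.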
diff --git a/docs/storage.md b/docs/storage.md index 989767feb4..8b62d29067 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -92,7 +92,7 @@ config: trace: enable: false list_objects_version: "" - part_size: 134217728 + part_size: 67108864 sse_config: type: "" kms_key_id: "" diff --git a/go.mod b/go.mod index e4b866d860..9f349b14f2 100644 --- a/go.mod +++ b/go.mod @@ -77,6 +77,9 @@ replace ( github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab // Update to v1.1.1 to make sure windows CI pass. github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1 + + // TODO: Remove this: https://github.com/thanos-io/thanos/issues/3967. + github.com/minio/minio-go/v7 => github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6 // Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs. github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210315220929-1cba1741828b github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible diff --git a/go.sum b/go.sum index c7e3495567..d070c4160e 100644 --- a/go.sum +++ b/go.sum @@ -185,6 +185,8 @@ github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnweb github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6 h1:h9SZ0jmAKjtrZF6iZ77/jdXdHr+Usn29itI669SVRp4= +github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/casbin/casbin/v2 v2.1.2/go.mod 
h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= @@ -888,9 +890,6 @@ github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= github.com/minio/minio-go/v6 v6.0.44/go.mod h1:qD0lajrGW49lKZLtXKtCB4X/qkMf0a5tBvN2PaZg7Gg= github.com/minio/minio-go/v6 v6.0.56/go.mod h1:KQMM+/44DSlSGSQWSfRrAZ12FVMmpWNuX37i2AX0jfI= -github.com/minio/minio-go/v7 v7.0.2/go.mod h1:dJ80Mv2HeGkYLH1sqS/ksz07ON6csH3S6JUMSQ2zAns= -github.com/minio/minio-go/v7 v7.0.10 h1:1oUKe4EOPUEhw2qnPQaPsJ0lmVTYLFu03SiItauXs94= -github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 7b0c6c9084..23ee8c39b0 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -957,7 +957,9 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { ignoreDirs := []string{} for _, gr := range groups { - ignoreDirs = append(ignoreDirs, gr.Key()) + for _, grID := range gr.IDs() { + ignoreDirs = append(ignoreDirs, filepath.Join(gr.Key(), grID.String())) + } } if err := runutil.DeleteAll(c.compactDir, ignoreDirs...); err != nil { diff --git a/pkg/compactv2/compactor_test.go b/pkg/compactv2/compactor_test.go index c1af292d1b..e5c979a053 100644 --- a/pkg/compactv2/compactor_test.go +++ b/pkg/compactv2/compactor_test.go @@ -306,6 +306,97 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) { NumChunks: 2, }, }, + { + name: "1 blocks + delete modifier. For deletion request, full match is required. 
Delete the first two series", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "foo", Value: "bar"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "b", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "c", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "1"), + labels.MustNewMatcher(labels.MatchEqual, "b", "2"), + }, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "b", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "c", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"1\", b=\"2\"} [{0 20}]\nDeleted {a=\"1\", b=\"2\", foo=\"bar\"} [{0 20}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 3, + }, + }, + { + name: "1 blocks + delete modifier. 
Deletion request contains non-equal matchers.", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "foo", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "foo", Value: "bar"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}, {Name: "foo", Value: "baz"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "foo", Value: "bat"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + + // Label a is present but with an empty value. + {lset: labels.Labels{{Name: "a", Value: ""}, {Name: "foo", Value: "bat"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + // Series with unrelated labels. 
+ {lset: labels.Labels{{Name: "c", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "a", "1"), + labels.MustNewMatcher(labels.MatchRegexp, "foo", "^ba.$"), + }, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: ""}, {Name: "foo", Value: "bat"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "foo", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "c", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "foo", Value: "bat"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"2\", foo=\"bar\"} [{0 20}]\nDeleted {a=\"3\", foo=\"baz\"} [{0 20}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 36, + NumSeries: 6, + NumChunks: 12, + }, + }, } { t.Run(tcase.name, func(t *testing.T) { tmpDir, err := ioutil.TempDir("", "test-series-writer") diff --git a/pkg/compactv2/modifiers.go b/pkg/compactv2/modifiers.go index 9d521c0fd2..aef4ec065e 100644 --- a/pkg/compactv2/modifiers.go +++ b/pkg/compactv2/modifiers.go @@ -62,12 +62,9 @@ SeriesLoop: for _, deletions := range d.d.deletions { for _, m := range deletions.Matchers { v := lbls.Get(m.Name) - if v == "" { - continue - } // Only if all matchers in the deletion request are matched can we proceed to deletion. 
- if !m.Matches(v) { + if v == "" || !m.Matches(v) { continue DeletionsLoop } } diff --git a/pkg/objstore/s3/s3.go b/pkg/objstore/s3/s3.go index 2507eaea54..63373d6abb 100644 --- a/pkg/objstore/s3/s3.go +++ b/pkg/objstore/s3/s3.go @@ -66,9 +66,7 @@ var DefaultConfig = Config{ MaxIdleConnsPerHost: 100, MaxConnsPerHost: 0, }, - // Minimum file size after which an HTTP multipart request should be used to upload objects to storage. - // Set to 128 MiB as in the minio client. - PartSize: 1024 * 1024 * 128, + PartSize: 1024 * 1024 * 64, // 64MB. } // Config stores the configuration for s3 bucket. @@ -85,6 +83,7 @@ type Config struct { TraceConfig TraceConfig `yaml:"trace"` ListObjectsVersion string `yaml:"list_objects_version"` // PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize. + // NOTE we need to make sure this number does not produce more parts than 10 000. PartSize uint64 `yaml:"part_size"` SSEConfig SSEConfig `yaml:"sse_config"` } @@ -449,7 +448,6 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { size = -1 } - // partSize cannot be larger than object size. partSize := b.partSize if size < int64(partSize) { partSize = 0 diff --git a/pkg/objstore/s3/s3_e2e_test.go b/pkg/objstore/s3/s3_e2e_test.go new file mode 100644 index 0000000000..e837b9baa5 --- /dev/null +++ b/pkg/objstore/s3/s3_e2e_test.go @@ -0,0 +1,55 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package s3_test + +import ( + "bytes" + "context" + "strings" + "testing" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/go-kit/kit/log" + "github.com/thanos-io/thanos/pkg/objstore/s3" + "github.com/thanos-io/thanos/test/e2e/e2ethanos" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +// Regression benchmark for https://github.com/thanos-io/thanos/issues/3917. 
+func BenchmarkUpload(b *testing.B) { + b.ReportAllocs() + ctx := context.Background() + + s, err := e2e.NewScenario("e2e_bench_minio_client") + testutil.Ok(b, err) + b.Cleanup(e2ethanos.CleanScenario(b, s)) + + const bucket = "test" + m := e2edb.NewMinio(8080, bucket) + testutil.Ok(b, s.StartAndWaitReady(m)) + + bkt, err := s3.NewBucketWithConfig(log.NewNopLogger(), s3.Config{ + Bucket: bucket, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + Endpoint: m.HTTPEndpoint(), + Insecure: true, + }, "test-feed") + testutil.Ok(b, err) + + buf := bytes.Buffer{} + buf.Grow(1024 * 1024 * 100) // 100MB. + word := "abcdefghij" + for i := 0; i < buf.Cap()/len(word); i++ { + _, _ = buf.WriteString(word) + } + str := buf.String() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + testutil.Ok(b, bkt.Upload(ctx, "test", strings.NewReader(str))) + } +} diff --git a/pkg/objstore/s3/s3_test.go b/pkg/objstore/s3/s3_test.go index 2cac107cae..b084473e26 100644 --- a/pkg/objstore/s3/s3_test.go +++ b/pkg/objstore/s3/s3_test.go @@ -208,7 +208,7 @@ http_config: cfg, err := parseConfig(input) testutil.Ok(t, err) - testutil.Assert(t, cfg.PartSize == 1024*1024*128, "when part size not set it should default to 128MiB") + testutil.Assert(t, cfg.PartSize == 1024*1024*64, "when part size not set it should default to 64MiB") input2 := []byte(`bucket: "bucket-name" endpoint: "s3-endpoint" diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index f634e5b8ab..21e7161754 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -280,6 +280,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { span, ctx := tracing.StartSpan(r.Context(), "receive_http") defer span.Finish() + // TODO(bwplotka): Optimize readAll https://github.com/thanos-io/thanos/pull/3334/files.
compressed, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -293,6 +294,9 @@ return } + // NOTE: Due to zero-copy ZLabels, Labels used from WriteRequests keep memory + // from the whole request. Ensure that we always copy those when we want to + // store them for a longer time. var wreq prompb.WriteRequest if err := proto.Unmarshal(reqBuf, &wreq); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -313,7 +317,7 @@ tenant = h.options.DefaultTenantID } - // exit early if the request contained no data + // Exit early if the request contained no data. if len(wreq.Timeseries) == 0 { level.Info(h.logger).Log("msg", "empty timeseries from client", "tenant", tenant) return } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 4bcb314d2e..a806947061 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -7,9 +7,16 @@ import ( "bytes" "context" "fmt" + "io/ioutil" + "math" "math/rand" "net/http" "net/http/httptest" + "os" + "path/filepath" + "runtime" + "runtime/pprof" + "strings" "sync" "testing" "time" @@ -18,9 +25,12 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/runutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -175,13 +185,11 @@ func TestDetermineWriteErrorCause(t *testing.T) { } } -func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { - cfg := []HashringConfig{ - { - Hashring: "test", - }, - } - var
handlers []*Handler +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { + var ( + cfg = []HashringConfig{{Hashring: "test"}} + handlers []*Handler + ) // create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers // This removes the network from the tests and creates a more consistent testing harness. peers := &peerGroup{ @@ -511,7 +519,7 @@ func TestReceiveQuorum(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -850,7 +858,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) { // to see all requests completing all the time, since we're using local // network we are not expecting anything to go wrong with these. t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -957,3 +965,228 @@ type fakeRemoteWriteGRPCServer struct { func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { return f.h.RemoteWrite(ctx, in) } + +func BenchmarkHandlerReceiveHTTP(b *testing.B) { + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(b)) +} + +func TestHandlerReceiveHTTP(t *testing.T) { + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t)) +} + +// tsOverrideTenantStorage is storage that overrides timestamp to make it have consistent interval. 
+type tsOverrideTenantStorage struct { + TenantStorage + + interval int64 +} + +func (s *tsOverrideTenantStorage) TenantAppendable(tenant string) (Appendable, error) { + a, err := s.TenantStorage.TenantAppendable(tenant) + return &tsOverrideAppendable{Appendable: a, interval: s.interval}, err +} + +type tsOverrideAppendable struct { + Appendable + + interval int64 +} + +func (a *tsOverrideAppendable) Appender(ctx context.Context) (storage.Appender, error) { + ret, err := a.Appendable.Appender(ctx) + return &tsOverrideAppender{Appender: ret, interval: a.interval}, err +} + +type tsOverrideAppender struct { + storage.Appender + + interval int64 +} + +var cnt int64 + +func (a *tsOverrideAppender) Append(ref uint64, l labels.Labels, _ int64, v float64) (uint64, error) { + cnt += a.interval + return a.Appender.Append(ref, l, cnt, v) +} + +// serializeSeriesWithOneSample returns marshaled and compressed remote write requests as they would +// be sent to Thanos receive. +// It has one sample and allows passing multiple series, in the same manner as a typical Prometheus would batch them. +func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byte { + r := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(series))} + + for _, s := range series { + r.Timeseries = append(r.Timeseries, prompb.TimeSeries{ + Labels: s, + // Timestamp does not matter, it will be overridden.
+ Samples: []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}, + }) + } + body, err := proto.Marshal(r) + testutil.Ok(t, err) + return snappy.Encode(nil, body) +} + +func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { + dir, err := ioutil.TempDir("", "test_receive") + testutil.Ok(b, err) + defer func() { testutil.Ok(b, os.RemoveAll(dir)) }() + + handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1) + handler := handlers[0] + + reg := prometheus.NewRegistry() + + logger := log.NewNopLogger() + m := NewMultiTSDB( + dir, logger, reg, &tsdb.Options{ + MinBlockDuration: int64(2 * time.Hour / time.Millisecond), + MaxBlockDuration: int64(2 * time.Hour / time.Millisecond), + RetentionDuration: int64(6 * time.Hour / time.Millisecond), + NoLockfile: true, + StripeSize: 1, // Disable stripe pre allocation so we can have clear profiles. + }, + labels.FromStrings("replica", "01"), + "tenant_id", + nil, + false, + metadata.NoneFunc, + ) + defer func() { testutil.Ok(b, m.Close()) }() + handler.writer = NewWriter(logger, m) + + testutil.Ok(b, m.Flush()) + testutil.Ok(b, m.Open()) + + for _, tcase := range []struct { + name string + writeRequest []byte + }{ + { + name: "typical labels under 1KB, 500 of them", + writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + series := make([][]labelpb.ZLabel, 500) + for s := 0; s < len(series); s++ { + lbls := make([]labelpb.ZLabel, 10) + for i := 0; i < len(lbls); i++ { + // Label ~20B name, 50B value. 
+ lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + series[s] = lbls + } + return series + }()), + }, + { + name: "typical labels under 1KB, 5000 of them", + writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + series := make([][]labelpb.ZLabel, 5000) + for s := 0; s < len(series); s++ { + lbls := make([]labelpb.ZLabel, 10) + for i := 0; i < len(lbls); i++ { + // Label ~20B name, 50B value. + lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + series[s] = lbls + } + return series + }()), + }, + { + name: "extremely large label value 10MB, 10 of them", + writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + series := make([][]labelpb.ZLabel, 10) + for s := 0; s < len(series); s++ { + lbl := &strings.Builder{} + lbl.Grow(1024 * 1024 * 10) // 10MB. + word := "abcdefghij" + for i := 0; i < lbl.Cap()/len(word); i++ { + _, _ = lbl.WriteString(word) + } + series[s] = []labelpb.ZLabel{{Name: "__name__", Value: lbl.String()}} + } + return series + }()), + }, + } { + b.Run(tcase.name, func(b testutil.TB) { + handler.options.DefaultTenantID = fmt.Sprintf("%v-ok", tcase.name) + handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: 1} + + // It takes time to create new tenant, wait for it. 
+ { + app, err := m.TenantAppendable(handler.options.DefaultTenantID) + testutil.Ok(b, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(ctx) + return err + })) + } + + b.Run("OK", func(b testutil.TB) { + n := b.N() + b.ResetTimer() + for i := 0; i < n; i++ { + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) + } + }) + + handler.options.DefaultTenantID = fmt.Sprintf("%v-conflicting", tcase.name) + handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: -1} // Timestamp can't go down, which will cause conflict error. + + // It takes time to create new tenant, wait for it. + { + app, err := m.TenantAppendable(handler.options.DefaultTenantID) + testutil.Ok(b, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(ctx) + return err + })) + } + + // First request should be fine, since we don't change timestamp, rest is wrong. 
+ r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) + + b.Run("conflict errors", func(b testutil.TB) { + n := b.N() + b.ResetTimer() + for i := 0; i < n; i++ { + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusConflict, r.Code, "%v", i) + } + }) + }) + } + + runtime.GC() + // Take snapshot at the end to reveal how much memory we keep in TSDB. + testutil.Ok(b, Heap("../../")) + +} + +func Heap(dir string) (err error) { + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return err + } + + f, err := os.Create(filepath.Join(dir, "mem.pprof")) + if err != nil { + return err + } + defer runutil.CloseWithErrCapture(&err, f, "close") + return pprof.WriteHeapProfile(f) +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index ed76a1610e..d58ac35f95 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -61,13 +62,12 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR var errs errutil.MultiError for _, t := range wreq.Timeseries { - lset := make(labels.Labels, len(t.Labels)) - for j := range t.Labels { - lset[j] = labels.Label{ - Name: t.Labels[j].Name, - Value: t.Labels[j].Value, - } - } + // Copy labels so we allocate memory only for labels, nothing else. 
+		labelpb.ReAllocZLabelsStrings(&t.Labels)
+
+		// TODO(bwplotka): Use improvement https://github.com/prometheus/prometheus/pull/8600, so we do that only when
+		// we need it (when we store labels for longer).
+		lset := labelpb.ZLabelsToPromLabels(t.Labels)
 
 		// Append as many valid samples as possible, but keep track of the errors.
 		for _, s := range t.Samples {
diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go
index 97171bce82..f726eb25c3 100644
--- a/pkg/runutil/runutil.go
+++ b/pkg/runutil/runutil.go
@@ -55,6 +55,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strings"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -162,6 +163,7 @@ func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ..
 
 // DeleteAll deletes all files and directories inside the given
 // dir except for the ignoreDirs directories.
+// NOTE: DeleteAll is not idempotent.
 func DeleteAll(dir string, ignoreDirs ...string) error {
 	entries, err := ioutil.ReadDir(dir)
 	if err != nil {
@@ -169,6 +171,7 @@ func DeleteAll(dir string, ignoreDirs ...string) error {
 	}
 
 	var groupErrs errutil.MultiError
+	var matchingIgnores []string
 	for _, d := range entries {
 		if !d.IsDir() {
 			if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
@@ -177,22 +180,36 @@ func DeleteAll(dir string, ignoreDirs ...string) error {
 			continue
 		}
 
-		var found bool
-		for _, id := range ignoreDirs {
-			if id == d.Name() {
-				found = true
-				break
+		// ignoreDirs might be multi-directory paths.
+		matchingIgnores = matchingIgnores[:0]
+		ignore := false
+		for _, ignoreDir := range ignoreDirs {
+			id := strings.Split(ignoreDir, "/")
+			if id[0] == d.Name() {
+				if len(id) == 1 {
+					ignore = true
+					break
+				}
+				matchingIgnores = append(matchingIgnores, filepath.Join(id[1:]...))
 			}
 		}
 
-		if !found {
+		if ignore {
+			continue
+		}
+
+		if len(matchingIgnores) == 0 {
 			if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
 				groupErrs.Add(err)
 			}
+			continue
+		}
+		if err := DeleteAll(filepath.Join(dir, d.Name()), matchingIgnores...); err != nil {
+			groupErrs.Add(err)
 		}
 	}
 
-	if groupErrs != nil {
+	if groupErrs.Err() != nil {
 		return errors.Wrap(groupErrs.Err(), "delete file/dir")
 	}
 	return nil
diff --git a/pkg/runutil/runutil_test.go b/pkg/runutil/runutil_test.go
index 908a3398a3..a50efbd356 100644
--- a/pkg/runutil/runutil_test.go
+++ b/pkg/runutil/runutil_test.go
@@ -125,35 +125,43 @@ func TestCloseMoreThanOnce(t *testing.T) {
 	testutil.Equals(t, true, lc.WasCalled)
 }
 
-func TestClearsDirectoriesFilesProperly(t *testing.T) {
+func TestDeleteAll(t *testing.T) {
 	dir, err := ioutil.TempDir("", "example")
 	testutil.Ok(t, err)
 
 	t.Cleanup(func() {
-		os.RemoveAll(dir)
+		testutil.Ok(t, os.RemoveAll(dir))
 	})
 
-	f, err := os.Create(filepath.Join(dir, "test123"))
+	f, err := os.Create(filepath.Join(dir, "file1"))
 	testutil.Ok(t, err)
 	testutil.Ok(t, f.Close())
 
-	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8"), os.ModePerm))
-	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8"), os.ModePerm))
-	f, err = os.Create(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "a"), os.ModePerm))
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "b"), os.ModePerm))
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "c", "innerc"), os.ModePerm))
+	f, err = os.Create(filepath.Join(dir, "a", "file2"))
+	testutil.Ok(t, err)
+	testutil.Ok(t, f.Close())
+	f, err = os.Create(filepath.Join(dir, "c", "file3"))
 	testutil.Ok(t, err)
 	testutil.Ok(t, f.Close())
 
-	testutil.Ok(t, DeleteAll(dir, "01EHBQRN4RF0HSRR1772KW0TN9", "01EHBQRN4RF0HSRR1772KW0TN8"))
+	testutil.Ok(t, DeleteAll(dir, "file1", "a", filepath.Join("c", "innerc")))
 
-	_, err = os.Stat(filepath.Join(dir, "test123"))
+	// Deleted.
+	_, err = os.Stat(filepath.Join(dir, "file1"))
 	testutil.Assert(t, os.IsNotExist(err))
-
-	_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))
+	_, err = os.Stat(filepath.Join(dir, "b/"))
 	testutil.Assert(t, os.IsNotExist(err))
-
-	_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8/"))
+	_, err = os.Stat(filepath.Join(dir, "c", "file3"))
 	testutil.Assert(t, os.IsNotExist(err))
 
-	_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8/"))
+	// Exists.
+	_, err = os.Stat(filepath.Join(dir, "a", "file2"))
+	testutil.Ok(t, err)
+	_, err = os.Stat(filepath.Join(dir, "a/"))
+	testutil.Ok(t, err)
+	_, err = os.Stat(filepath.Join(dir, "c", "innerc"))
 	testutil.Ok(t, err)
 }
diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go
index 5712cd912f..99083f82ab 100644
--- a/pkg/store/labelpb/label.go
+++ b/pkg/store/labelpb/label.go
@@ -34,10 +34,21 @@ func ZLabelsFromPromLabels(lset labels.Labels) []ZLabel {
 
 // ZLabelsToPromLabels convert slice of labelpb.ZLabel to Prometheus labels in type unsafe manner.
 // It reuses the same memory. Caller should abort using passed []ZLabel.
+// NOTE: Use with care. ZLabels holds memory from the whole protobuf unmarshal, so the returned
+// Prometheus Labels will hold this memory as well.
 func ZLabelsToPromLabels(lset []ZLabel) labels.Labels {
 	return *(*labels.Labels)(unsafe.Pointer(&lset))
 }
 
+// ReAllocZLabelsStrings re-allocates all underlying string bytes, detaching them from the bigger memory pool.
+func ReAllocZLabelsStrings(lset *[]ZLabel) {
+	for j, l := range *lset {
+		// NOTE: This trick converts from string to bytes without copying, but copies when creating the new string.
+		(*lset)[j].Name = string(noAllocBytes(l.Name))
+		(*lset)[j].Value = string(noAllocBytes(l.Value))
+	}
+}
+
 // LabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel in type unsafe manner.
 // It reuses the same memory. Caller should abort using passed labels.Labels.
 func LabelsFromPromLabels(lset labels.Labels) []Label {
@@ -61,7 +72,7 @@ func ZLabelSetsToPromLabelSets(lss ...ZLabelSet) []labels.Labels {
 
 // ZLabel is a Label (also easily transformable to Prometheus labels.Labels) that can be unmarshalled from protobuf
 // reusing the same memory address for string bytes.
-// NOTE: While unmarshal use exactly same bytes that were allocated for protobuf, this will mean that *whole* protobuf
+// NOTE: While unmarshalling it uses exactly the same bytes that were allocated for the protobuf, this means that the *whole* protobuf
 // bytes will be not GC-ed as long as ZLabels are referenced somewhere. Use it carefully, only for short living
 // protobuf message processing.
 type ZLabel Label
diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go
index a6227dfd27..ee80b5ad47 100644
--- a/pkg/store/labelpb/label_test.go
+++ b/pkg/store/labelpb/label_test.go
@@ -174,6 +174,41 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) {
 	})
 }
 
+var ret labels.Labels
+
+func BenchmarkTransformWithAndWithoutCopy(b *testing.B) {
+	const (
+		fmtLbl = "%07daaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"
+		num    = 1000000
+	)
+
+	b.Run("ZLabelsToPromLabels", func(b *testing.B) {
+		b.ReportAllocs()
+		lbls := make([]ZLabel, num)
+		for i := 0; i < num; i++ {
+			lbls[i] = ZLabel{Name: fmt.Sprintf(fmtLbl, i), Value: fmt.Sprintf(fmtLbl, i)}
+		}
+		b.ResetTimer()
+
+		for i := 0; i < b.N; i++ {
+			ret = ZLabelsToPromLabels(lbls)
+		}
+	})
+	b.Run("ZLabelsToPromLabelsWithRealloc", func(b *testing.B) {
+		b.ReportAllocs()
+		lbls := make([]ZLabel, num)
+		for i := 0; i < num; i++ {
+			lbls[i] = ZLabel{Name: fmt.Sprintf(fmtLbl, i), Value: fmt.Sprintf(fmtLbl, i)}
+		}
+		b.ResetTimer()
+
+		for i := 0; i < b.N; i++ {
+			ReAllocZLabelsStrings(&lbls)
+			ret = ZLabelsToPromLabels(lbls)
+		}
+	})
+}
+
 func TestSortZLabelSets(t *testing.T) {
 	expectedResult := ZLabelSets{
 		{
diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go
index 1934af2393..57a9951353 100644
--- a/test/e2e/compact_test.go
+++ b/test/e2e/compact_test.go
@@ -539,8 +539,26 @@ func TestCompactWithStoreGateway(t *testing.T) {
 
 	// No replica label with overlaps should halt compactor. This test is sequential
 	// because we do not want two Thanos Compact instances deleting the same partially
-	// uploaded blocks and blocks with deletion marks.
+	// uploaded blocks and blocks with deletion marks. We also check that Thanos Compactor
+	// deletes directories inside a compaction group that do not belong there.
 	{
+		// Precreate a directory. It should be deleted.
+		// In a hypothetical scenario, the directory could be a left-over from
+		// a compaction that had crashed.
+		p := filepath.Join(s.SharedDir(), "data", "compact", "expect-to-halt", "compact")
+
+		testutil.Assert(t, len(blocksWithHashes) > 0)
+
+		m, err := block.DownloadMeta(ctx, logger, bkt, blocksWithHashes[0])
+		testutil.Ok(t, err)
+
+		randBlockDir := filepath.Join(p, compact.DefaultGroupKey(m.Thanos), "ITISAVERYRANDULIDFORTESTS0")
+		testutil.Ok(t, os.MkdirAll(randBlockDir, os.ModePerm))
+
+		f, err := os.Create(filepath.Join(randBlockDir, "index"))
+		testutil.Ok(t, err)
+		testutil.Ok(t, f.Close())
+
 		c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
 		testutil.Ok(t, err)
 		testutil.Ok(t, s.StartAndWaitReady(c))
@@ -578,6 +596,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
 		ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))
 
 		testutil.Ok(t, s.Stop(c))
+
+		_, err = os.Stat(randBlockDir)
+		testutil.NotOk(t, err)
+		testutil.Assert(t, os.IsNotExist(err))
 	}
 
 	// Sequential because we want to check that Thanos Compactor does not
diff --git a/test/e2e/e2ethanos/helpers.go b/test/e2e/e2ethanos/helpers.go
index 714bdbb468..0bcd21c912 100644
--- a/test/e2e/e2ethanos/helpers.go
+++ b/test/e2e/e2ethanos/helpers.go
@@ -16,7 +16,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/testutil"
 )
 
-func CleanScenario(t *testing.T, s *e2e.Scenario) func() {
+func CleanScenario(t testing.TB, s *e2e.Scenario) func() {
 	return func() {
 		// Make sure Clean can properly delete everything.
 		testutil.Ok(t, exec.Command("chmod", "-R", "777", s.SharedDir()).Run())