diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 6bf9f93f408..7a93e25a022 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -5,7 +5,6 @@ package main import ( "context" - "io/ioutil" "os" "path/filepath" "time" @@ -19,7 +18,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" - terrors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -158,39 +156,11 @@ func RunDownsample( // the downsampling process resumes without restarting the whole process // but the blocks do not exist in the remote object storage anymore. func removeAllNonMetaDirs(metas map[ulid.ULID]*metadata.Meta, dir string) error { - entries, err := ioutil.ReadDir(dir) - if err != nil { - return errors.Wrap(err, "read dir") + ignoreDirs := []string{} + for ulid := range metas { + ignoreDirs = append(ignoreDirs, ulid.String()) } - var groupErrs terrors.MultiError - - for _, d := range entries { - if !d.IsDir() { - if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil { - groupErrs.Add(err) - } - continue - } - - var found bool - for ulid := range metas { - if ulid.String() == d.Name() { - found = true - break - } - } - - if !found { - if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil { - groupErrs.Add(err) - } - } - } - - if groupErrs != nil { - return errors.Wrap(groupErrs, "delete file/dir") - } - return nil + return runutil.DeleteAllExceptDirs(dir, ignoreDirs) } func downsampleBucket( diff --git a/cmd/thanos/downsample_test.go b/cmd/thanos/downsample_test.go index cb903ec4749..7d8c4a90093 100644 --- a/cmd/thanos/downsample_test.go +++ b/cmd/thanos/downsample_test.go @@ -2,50 +2,3 @@ // Licensed under the Apache License 2.0. package main - -import ( - "io/ioutil" - "os" - "path/filepath" - "testing" - - "github.com/oklog/ulid" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/testutil" -) - -func TestDownsampleClearsDirectoriesFiles(t *testing.T) { - dir, err := ioutil.TempDir("", "example") - testutil.Ok(t, err) - - t.Cleanup(func() { - os.RemoveAll(dir) - }) - - f, err := os.Create(filepath.Join(dir, "test123")) - testutil.Ok(t, err) - testutil.Ok(t, f.Close()) - - testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8"), os.ModePerm)) - testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8"), os.ModePerm)) - f, err = os.Create(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9")) - testutil.Ok(t, err) - testutil.Ok(t, f.Close()) - - testutil.Ok(t, removeAllNonMetaDirs(map[ulid.ULID]*metadata.Meta{ - ulid.MustParse("01EHBQRN4RF0HSRR1772KW0TN8"): {}, - ulid.MustParse("01EHBQRN4RF0HSRR1772KW0TN9"): {}, - }, dir)) - - _, err = os.Stat(filepath.Join(dir, "test123")) - testutil.Assert(t, os.IsNotExist(err)) - - _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9")) - testutil.Assert(t, os.IsNotExist(err)) - - _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8/")) - testutil.Assert(t, os.IsNotExist(err)) - - _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8/")) - testutil.Ok(t, err) -} diff --git a/pkg/block/metadata/hash.go b/pkg/block/metadata/hash.go index e86889790a5..c8dc34ea97a 100644 --- a/pkg/block/metadata/hash.go +++ b/pkg/block/metadata/hash.go @@ -65,7 +65,7 @@ func CalculateHash(p string, hf HashFunc) (ObjectHash, error) { // Ignores the given files. func CalculateHashDir(dir string, hf HashFunc, ignore ...string) (map[string]ObjectHash, error) { if hf == NoneFunc { - return make(map[string]ObjectHash), nil + return nil, nil } ret := map[string]ObjectHash{} if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 1f728457dbd..239091f8b26 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" ) type ResolutionLevel int64 @@ -899,39 +900,11 @@ func NewBucketCompactor( // restarting the whole process but the blocks do not exist in the remote // object storage anymore. func removeAllNonCGDirs(groups []*Group, compactDir string) error { - entries, err := ioutil.ReadDir(compactDir) - if err != nil { - return errors.Wrap(err, "read dir") + ignoreDirs := []string{} + for _, gr := range groups { + ignoreDirs = append(ignoreDirs, gr.Key()) } - var groupErrs terrors.MultiError - - for _, d := range entries { - if !d.IsDir() { - if err := os.RemoveAll(filepath.Join(compactDir, d.Name())); err != nil { - groupErrs.Add(err) - } - continue - } - - var found bool - for _, gr := range groups { - if gr.Key() == d.Name() { - found = true - break - } - } - - if !found { - if err := os.RemoveAll(filepath.Join(compactDir, d.Name())); err != nil { - groupErrs.Add(err) - } - } - } - - if groupErrs != nil { - return errors.Wrap(groupErrs, "delete file/dir") - } - return nil + return runutil.DeleteAllExceptDirs(compactDir, ignoreDirs) } // Compact runs compaction over bucket. diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 5b3e85940bf..93d1c96cc0b 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -54,6 +54,7 @@ import ( "io" "io/ioutil" "os" + "path/filepath" "time" "github.com/go-kit/kit/log" @@ -157,3 +158,41 @@ func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a .. *err = merr.Err() } + +// DeleteAllExceptDirs deletes all files and directories inside the given +// dir except for the ignoreDirs directories. +func DeleteAllExceptDirs(dir string, ignoreDirs []string) error { + entries, err := ioutil.ReadDir(dir) + if err != nil { + return errors.Wrap(err, "read dir") + } + var groupErrs tsdberrors.MultiError + + for _, d := range entries { + if !d.IsDir() { + if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil { + groupErrs.Add(err) + } + continue + } + + var found bool + for _, id := range ignoreDirs { + if id == d.Name() { + found = true + break + } + } + + if !found { + if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil { + groupErrs.Add(err) + } + } + } + + if groupErrs != nil { + return errors.Wrap(groupErrs, "delete file/dir") + } + return nil +} diff --git a/pkg/runutil/runutil_test.go b/pkg/runutil/runutil_test.go index 7cad147b80f..47338871af3 100644 --- a/pkg/runutil/runutil_test.go +++ b/pkg/runutil/runutil_test.go @@ -5,7 +5,9 @@ package runutil import ( "io" + "io/ioutil" "os" + "path/filepath" "strings" "testing" @@ -122,3 +124,36 @@ func TestCloseMoreThanOnce(t *testing.T) { CloseWithLogOnErr(lc, r, "should be called") testutil.Equals(t, true, lc.WasCalled) } + +func TestClearsDirectoriesFilesProperly(t *testing.T) { + dir, err := ioutil.TempDir("", "example") + testutil.Ok(t, err) + + t.Cleanup(func() { + os.RemoveAll(dir) + }) + + f, err := os.Create(filepath.Join(dir, "test123")) + testutil.Ok(t, err) + testutil.Ok(t, f.Close()) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8"), os.ModePerm)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8"), os.ModePerm)) + f, err = os.Create(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9")) + testutil.Ok(t, err) + testutil.Ok(t, f.Close()) + + testutil.Ok(t, DeleteAllExceptDirs(dir, []string{"01EHBQRN4RF0HSRR1772KW0TN8", "01EHBQRN4RF0HSRR1772KW0TN9"})) + + _, err = os.Stat(filepath.Join(dir, "test123")) + testutil.Assert(t, os.IsNotExist(err)) + + _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9")) + testutil.Assert(t, os.IsNotExist(err)) + + _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8/")) + testutil.Assert(t, os.IsNotExist(err)) + + _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8/")) + testutil.Ok(t, err) +} diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 78a5bc306e7..77981aa4837 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { }() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.SHA256Func) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.NoneFunc) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -219,7 +219,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2)) - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.SHA256Func) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.NoneFunc) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var (