Skip to content

Commit

Permalink
*: minor refactoring, fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Sep 5, 2020
1 parent 761a456 commit 95bf117
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 116 deletions.
38 changes: 4 additions & 34 deletions cmd/thanos/downsample.go
Expand Up @@ -5,7 +5,6 @@ package main

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"time"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
Expand Down Expand Up @@ -158,39 +156,11 @@ func RunDownsample(
// the downsampling process resumes without restarting the whole process
// but the blocks do not exist in the remote object storage anymore.
func removeAllNonMetaDirs(metas map[ulid.ULID]*metadata.Meta, dir string) error {
entries, err := ioutil.ReadDir(dir)
if err != nil {
return errors.Wrap(err, "read dir")
ignoreDirs := []string{}
for ulid := range metas {
ignoreDirs = append(ignoreDirs, ulid.String())
}
var groupErrs terrors.MultiError

for _, d := range entries {
if !d.IsDir() {
if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
groupErrs.Add(err)
}
continue
}

var found bool
for ulid := range metas {
if ulid.String() == d.Name() {
found = true
break
}
}

if !found {
if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
groupErrs.Add(err)
}
}
}

if groupErrs != nil {
return errors.Wrap(groupErrs, "delete file/dir")
}
return nil
return runutil.DeleteAllExceptDirs(dir, ignoreDirs)
}

func downsampleBucket(
Expand Down
47 changes: 0 additions & 47 deletions cmd/thanos/downsample_test.go
Expand Up @@ -2,50 +2,3 @@
// Licensed under the Apache License 2.0.

package main

import (
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/oklog/ulid"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestDownsampleClearsDirectoriesFiles(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
testutil.Ok(t, err)

t.Cleanup(func() {
os.RemoveAll(dir)
})

f, err := os.Create(filepath.Join(dir, "test123"))
testutil.Ok(t, err)
testutil.Ok(t, f.Close())

testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8"), os.ModePerm))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8"), os.ModePerm))
f, err = os.Create(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))
testutil.Ok(t, err)
testutil.Ok(t, f.Close())

testutil.Ok(t, removeAllNonMetaDirs(map[ulid.ULID]*metadata.Meta{
ulid.MustParse("01EHBQRN4RF0HSRR1772KW0TN8"): {},
ulid.MustParse("01EHBQRN4RF0HSRR1772KW0TN9"): {},
}, dir))

_, err = os.Stat(filepath.Join(dir, "test123"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8/"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8/"))
testutil.Ok(t, err)
}
2 changes: 1 addition & 1 deletion pkg/block/metadata/hash.go
Expand Up @@ -65,7 +65,7 @@ func CalculateHash(p string, hf HashFunc) (ObjectHash, error) {
// Ignores the given files.
func CalculateHashDir(dir string, hf HashFunc, ignore ...string) (map[string]ObjectHash, error) {
if hf == NoneFunc {
return make(map[string]ObjectHash), nil
return nil, nil
}
ret := map[string]ObjectHash{}
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
Expand Down
37 changes: 5 additions & 32 deletions pkg/compact/compact.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
)

type ResolutionLevel int64
Expand Down Expand Up @@ -899,39 +900,11 @@ func NewBucketCompactor(
// restarting the whole process but the blocks do not exist in the remote
// object storage anymore.
func removeAllNonCGDirs(groups []*Group, compactDir string) error {
entries, err := ioutil.ReadDir(compactDir)
if err != nil {
return errors.Wrap(err, "read dir")
ignoreDirs := []string{}
for _, gr := range groups {
ignoreDirs = append(ignoreDirs, gr.Key())
}
var groupErrs terrors.MultiError

for _, d := range entries {
if !d.IsDir() {
if err := os.RemoveAll(filepath.Join(compactDir, d.Name())); err != nil {
groupErrs.Add(err)
}
continue
}

var found bool
for _, gr := range groups {
if gr.Key() == d.Name() {
found = true
break
}
}

if !found {
if err := os.RemoveAll(filepath.Join(compactDir, d.Name())); err != nil {
groupErrs.Add(err)
}
}
}

if groupErrs != nil {
return errors.Wrap(groupErrs, "delete file/dir")
}
return nil
return runutil.DeleteAllExceptDirs(compactDir, ignoreDirs)
}

// Compact runs compaction over bucket.
Expand Down
39 changes: 39 additions & 0 deletions pkg/runutil/runutil.go
Expand Up @@ -54,6 +54,7 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -157,3 +158,41 @@ func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ..

*err = merr.Err()
}

// DeleteAllExceptDirs deletes all files and directories inside the given
// dir except for the ignoreDirs directories.
func DeleteAllExceptDirs(dir string, ignoreDirs []string) error {
entries, err := ioutil.ReadDir(dir)
if err != nil {
return errors.Wrap(err, "read dir")
}
var groupErrs tsdberrors.MultiError

for _, d := range entries {
if !d.IsDir() {
if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
groupErrs.Add(err)
}
continue
}

var found bool
for _, id := range ignoreDirs {
if id == d.Name() {
found = true
break
}
}

if !found {
if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
groupErrs.Add(err)
}
}
}

if groupErrs != nil {
return errors.Wrap(groupErrs, "delete file/dir")
}
return nil
}
35 changes: 35 additions & 0 deletions pkg/runutil/runutil_test.go
Expand Up @@ -5,7 +5,9 @@ package runutil

import (
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"

Expand Down Expand Up @@ -122,3 +124,36 @@ func TestCloseMoreThanOnce(t *testing.T) {
CloseWithLogOnErr(lc, r, "should be called")
testutil.Equals(t, true, lc.WasCalled)
}

func TestClearsDirectoriesFilesProperly(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
testutil.Ok(t, err)

t.Cleanup(func() {
os.RemoveAll(dir)
})

f, err := os.Create(filepath.Join(dir, "test123"))
testutil.Ok(t, err)
testutil.Ok(t, f.Close())

testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8"), os.ModePerm))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8"), os.ModePerm))
f, err = os.Create(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))
testutil.Ok(t, err)
testutil.Ok(t, f.Close())

testutil.Ok(t, DeleteAllExceptDirs(dir, []string{"01EHBQRN4RF0HSRR1772KW0TN8", "01EHBQRN4RF0HSRR1772KW0TN9"}))

_, err = os.Stat(filepath.Join(dir, "test123"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8/"))
testutil.Assert(t, os.IsNotExist(err))

_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8/"))
testutil.Ok(t, err)
}
4 changes: 2 additions & 2 deletions pkg/shipper/shipper_e2e_test.go
Expand Up @@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
}()

extLset := labels.FromStrings("prometheus", "prom-1")
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.SHA256Func)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.NoneFunc)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2))

shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.SHA256Func)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.NoneFunc)

// Create 10 new blocks. 9 of them (non compacted) should be actually uploaded.
var (
Expand Down

0 comments on commit 95bf117

Please sign in to comment.