Skip to content

Commit

Permalink
Address comments, delete non-CG/block dirs
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Sep 4, 2020
1 parent 8ef2eec commit 761a456
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 9 deletions.
46 changes: 46 additions & 0 deletions cmd/thanos/downsample.go
Expand Up @@ -5,6 +5,7 @@ package main

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"time"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
Expand Down Expand Up @@ -151,6 +153,46 @@ func RunDownsample(
return nil
}

// removeAllNonMetaDirs removes all files and all subdirectories under the
// given dir that do not correspond to any of the given metas. This is needed
// in the case when the downsampling process resumes without restarting the
// whole process but the blocks do not exist in the remote object storage
// anymore.
//
// Removal errors are accumulated and returned together; the walk always
// visits every entry.
func removeAllNonMetaDirs(metas map[ulid.ULID]*metadata.Meta, dir string) error {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return errors.Wrap(err, "read dir")
	}

	// Precompute the set of expected directory names once so every entry is
	// checked in O(1) instead of rescanning all metas per entry. Keeping the
	// comparison on the canonical String() form preserves the original
	// exact-name matching. (Also avoids shadowing the ulid package in the loop.)
	expected := make(map[string]struct{}, len(metas))
	for id := range metas {
		expected[id.String()] = struct{}{}
	}

	var groupErrs terrors.MultiError
	for _, d := range entries {
		// Plain files are never expected here; remove them unconditionally.
		if !d.IsDir() {
			if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
				groupErrs.Add(err)
			}
			continue
		}

		if _, ok := expected[d.Name()]; ok {
			continue
		}
		if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil {
			groupErrs.Add(err)
		}
	}

	if groupErrs != nil {
		return errors.Wrap(groupErrs, "delete file/dir")
	}
	return nil
}

func downsampleBucket(
ctx context.Context,
logger log.Logger,
Expand All @@ -170,6 +212,10 @@ func downsampleBucket(
}
}()

if err := removeAllNonMetaDirs(metas, dir); err != nil {
level.Warn(logger).Log("msg", "failed deleting potentially outdated directories/files, some disk space usage might have leaked. Continuing", "err", err, "dir", dir)
}

// mapping from a hash over all source IDs to blocks. We don't need to downsample a block
// if a downsampled version with the same hash already exists.
sources5m := map[ulid.ULID]struct{}{}
Expand Down
51 changes: 51 additions & 0 deletions cmd/thanos/downsample_test.go
@@ -0,0 +1,51 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package main

import (
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/oklog/ulid"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestDownsampleClearsDirectoriesFiles verifies that removeAllNonMetaDirs
// deletes every plain file and every directory whose name is not among the
// given metas, while directories matching a meta are kept.
func TestDownsampleClearsDirectoriesFiles(t *testing.T) {
	dir, err := ioutil.TempDir("", "example")
	testutil.Ok(t, err)

	t.Cleanup(func() {
		os.RemoveAll(dir)
	})

	// touch creates an empty file at the given path.
	touch := func(name string) {
		file, cerr := os.Create(name)
		testutil.Ok(t, cerr)
		testutil.Ok(t, file.Close())
	}

	// Layout: a stray file, a meta-backed dir, a stale dir, and a plain file
	// whose name matches a meta (files are removed regardless).
	touch(filepath.Join(dir, "test123"))
	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8"), os.ModePerm))
	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8"), os.ModePerm))
	touch(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9"))

	testutil.Ok(t, removeAllNonMetaDirs(map[ulid.ULID]*metadata.Meta{
		ulid.MustParse("01EHBQRN4RF0HSRR1772KW0TN8"): {},
		ulid.MustParse("01EHBQRN4RF0HSRR1772KW0TN9"): {},
	}, dir))

	// Everything except the meta-backed directory must be gone.
	for _, gone := range []string{
		"test123",
		"01EHBQRN4RF0HSRR1772KW0TN9",
		"01EHBQRN4RF0HSRR1772KW1TN8/",
	} {
		_, err = os.Stat(filepath.Join(dir, gone))
		testutil.Assert(t, os.IsNotExist(err))
	}

	_, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8/"))
	testutil.Ok(t, err)
}
2 changes: 1 addition & 1 deletion pkg/block/metadata/hash.go
Expand Up @@ -102,4 +102,4 @@ func CalculateHashDir(dir string, hf HashFunc, ignore ...string) (map[string]Obj

// NominalIgnoredFiles is a list of files that you want to typically ignore
// when calculating the hashes of a TSDB block.
var NominalIgnoredFiles = []string{DeletionMarkFilename, MetaFilename}
var NominalIgnoredFiles = []string{MetaFilename}
45 changes: 45 additions & 0 deletions pkg/compact/compact.go
Expand Up @@ -893,6 +893,47 @@ func NewBucketCompactor(
}, nil
}

// removeAllNonCGDirs removes all subdirectories of the given directory that
// are not part of the given compaction groups, and all plain files. This is
// needed in the case when the compaction process resumes without restarting
// the whole process but the blocks do not exist in the remote object storage
// anymore.
//
// Removal errors are accumulated and returned together; the walk always
// visits every entry.
func removeAllNonCGDirs(groups []*Group, compactDir string) error {
	entries, err := ioutil.ReadDir(compactDir)
	if err != nil {
		return errors.Wrap(err, "read dir")
	}

	// Precompute the set of valid group keys once so each directory entry is
	// checked in O(1) instead of rescanning all groups per entry.
	keys := make(map[string]struct{}, len(groups))
	for _, gr := range groups {
		keys[gr.Key()] = struct{}{}
	}

	var groupErrs terrors.MultiError
	for _, d := range entries {
		// Plain files are never expected under compactDir; remove them
		// unconditionally.
		if !d.IsDir() {
			if err := os.RemoveAll(filepath.Join(compactDir, d.Name())); err != nil {
				groupErrs.Add(err)
			}
			continue
		}

		if _, ok := keys[d.Name()]; ok {
			continue
		}
		if err := os.RemoveAll(filepath.Join(compactDir, d.Name())); err != nil {
			groupErrs.Add(err)
		}
	}

	if groupErrs != nil {
		return errors.Wrap(groupErrs, "delete file/dir")
	}
	return nil
}

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
defer func() {
Expand Down Expand Up @@ -966,6 +1007,10 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
return errors.Wrap(err, "build compaction groups")
}

if err := removeAllNonCGDirs(groups, c.compactDir); err != nil {
level.Warn(c.logger).Log("msg", "failed deleting non-compaction group directories/files, some disk space usage might have leaked. Continuing", "err", err, "dir", c.compactDir)
}

level.Info(c.logger).Log("msg", "start of compactions")

// Send all groups found during this pass to the compaction workers.
Expand Down
8 changes: 5 additions & 3 deletions pkg/shipper/shipper.go
Expand Up @@ -114,6 +114,7 @@ func New(
source: source,
allowOutOfOrderUploads: allowOutOfOrderUploads,
uploadCompacted: uploadCompacted,
hashFunc: hashFunc,
}
}

Expand Down Expand Up @@ -332,6 +333,8 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
// sync uploads the block if not exists in remote storage.
// TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error.
func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
var err error

level.Info(s.logger).Log("msg", "upload new block", "id", meta.ULID)

// We hard-link the files into a temporary upload directory so we are not affected
Expand Down Expand Up @@ -360,11 +363,10 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
meta.Thanos.Labels = lset.Map()
}
meta.Thanos.Source = s.source
hashes, err := metadata.CalculateHashDir(updir, metadata.SHA256Func, metadata.NominalIgnoredFiles...)
if err != nil {

if meta.Thanos.Hashes, err = metadata.CalculateHashDir(updir, s.hashFunc, metadata.NominalIgnoredFiles...); err != nil {
return errors.Wrapf(err, "calculating hashes in %s directory", updir)
}
meta.Thanos.Hashes = hashes

if err := metadata.Write(s.logger, updir, meta); err != nil {
return errors.Wrap(err, "write meta file")
Expand Down
6 changes: 1 addition & 5 deletions pkg/verifier/safe_delete.go
Expand Up @@ -63,11 +63,7 @@ func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objs
// Download the TSDB block.
dir := filepath.Join(tempdir, id.String())

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
if err != nil {
return errors.Wrapf(err, "download %s meta", id)
}
if err := block.Download(ctx, logger, bkt, id, meta.Thanos.Hashes, dir); err != nil {
if err := block.Download(ctx, logger, bkt, id, nil, dir); err != nil {
return errors.Wrap(err, "download from source")
}

Expand Down

0 comments on commit 761a456

Please sign in to comment.