Skip to content

Commit

Permalink
Fixed Downsampling process; Fixed runutil.CloseAndCaptureErr (#1070)
Browse files Browse the repository at this point in the history
* runutil. Simplified CloseWithErrCapture.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Fixed Downsampling process; Fixed runutil.CloseAndCaptureErr

Fixes #1065

Root cause:
* runutil defered capture error function was not passing error properly so unit tests were passing, event though there was bug
* streamed block write index cache requires index file which was not closed (saved) properly yet. Closers need to be closed to perform this.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Apr 23, 2019
1 parent 8b818b0 commit da70cb0
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 59 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased


### Fixed

- [#1070](https://github.com/improbable-eng/thanos/pull/1070) Downsampling works back again. Deferred closer errors are now properly captured.


## [v0.4.0-rc.0](https://github.com/improbable-eng/thanos/releases/tag/v0.4.0-rc.0) - 2019.04.18

:warning: **IMPORTANT** :warning: This is the last release that supports gossip. From Thanos v0.5.0, gossip will be completely removed.
Expand Down
12 changes: 6 additions & 6 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
if err != nil {
return stats, errors.Wrap(err, "open index file")
}
defer runutil.CloseWithErrCapture(logger, &err, r, "gather index issue file reader")
defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader")

p, err := r.Postings(index.AllPostingsKey())
if err != nil {
Expand Down Expand Up @@ -460,33 +460,33 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
if err != nil {
return resid, errors.Wrap(err, "open block")
}
defer runutil.CloseWithErrCapture(logger, &err, b, "repair block reader")
defer runutil.CloseWithErrCapture(&err, b, "repair block reader")

indexr, err := b.Index()
if err != nil {
return resid, errors.Wrap(err, "open index")
}
defer runutil.CloseWithErrCapture(logger, &err, indexr, "repair index reader")
defer runutil.CloseWithErrCapture(&err, indexr, "repair index reader")

chunkr, err := b.Chunks()
if err != nil {
return resid, errors.Wrap(err, "open chunks")
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "repair chunk reader")
defer runutil.CloseWithErrCapture(&err, chunkr, "repair chunk reader")

resdir := filepath.Join(dir, resid.String())

chunkw, err := chunks.NewWriter(filepath.Join(resdir, ChunksDirname))
if err != nil {
return resid, errors.Wrap(err, "open chunk writer")
}
defer runutil.CloseWithErrCapture(logger, &err, chunkw, "repair chunk writer")
defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk writer")

indexw, err := index.NewWriter(filepath.Join(resdir, IndexFilename))
if err != nil {
return resid, errors.Wrap(err, "open index writer")
}
defer runutil.CloseWithErrCapture(logger, &err, indexw, "repair index writer")
defer runutil.CloseWithErrCapture(&err, indexw, "repair index writer")

// TODO(fabxc): adapt so we properly handle the version once we update to an upstream
// that has multiple.
Expand Down
7 changes: 4 additions & 3 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func Downsample(
if err != nil {
return id, errors.Wrap(err, "open index reader")
}
defer runutil.CloseWithErrCapture(logger, &err, indexr, "downsample index reader")
defer runutil.CloseWithErrCapture(&err, indexr, "downsample index reader")

chunkr, err := b.Chunks()
if err != nil {
return id, errors.Wrap(err, "open chunk reader")
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")
defer runutil.CloseWithErrCapture(&err, chunkr, "downsample chunk reader")

// Generate new block id.
uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))
Expand Down Expand Up @@ -81,12 +81,13 @@ func Downsample(
if err != nil {
return id, errors.Wrap(err, "get streamed block writer")
}
defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer")
defer runutil.CloseWithErrCapture(&err, streamedBlockWriter, "close stream block writer")

postings, err := indexr.Postings(index.AllPostingsKey())
if err != nil {
return id, errors.Wrap(err, "get all postings list")
}

var (
aggrChunks []*AggrChunk
all []sample
Expand Down
3 changes: 3 additions & 0 deletions pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta
id, err := Downsample(log.NewNopLogger(), meta, mb, dir, resolution)
testutil.Ok(t, err)

_, err = metadata.Read(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

exp := map[uint64]map[AggrType][]sample{}
got := map[uint64]map[AggrType][]sample{}

Expand Down
52 changes: 25 additions & 27 deletions pkg/compact/downsample/streamed_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,26 +167,20 @@ func (w *streamedBlockWriter) Close() error {
if w.finalized {
return nil
}

var merr tsdb.MultiError
w.finalized = true

// Finalise data block only if there wasn't any internal errors.
if !w.ignoreFinalize {
merr.Add(w.finalize())
}
merr := tsdb.MultiError{}

for _, cl := range w.closers {
merr.Add(cl.Close())
if w.ignoreFinalize {
// Close open file descriptors anyway.
for _, cl := range w.closers {
merr.Add(cl.Close())
}
return merr.Err()
}

return errors.Wrap(merr.Err(), "close closers")
}
// Finalize saves prepared index and metadata to corresponding files.

// finalize saves prepared index and meta data to corresponding files.
// It is called on Close. Even if an error happened outside of StreamWriter, it will finalize the block anyway,
// so it's a caller's responsibility to remove the block's directory.
func (w *streamedBlockWriter) finalize() error {
if err := w.writeLabelSets(); err != nil {
return errors.Wrap(err, "write label sets")
}
Expand All @@ -195,7 +189,15 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "write mem postings")
}

if err := w.writeIndexCache(); err != nil {
for _, cl := range w.closers {
merr.Add(cl.Close())
}

if err := block.WriteIndexCache(
w.logger,
filepath.Join(w.blockDir, block.IndexFilename),
filepath.Join(w.blockDir, block.IndexCacheFilename),
); err != nil {
return errors.Wrap(err, "write index cache")
}

Expand All @@ -207,8 +209,14 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "sync blockDir")
}

if err := merr.Err(); err != nil {
return errors.Wrap(err, "finalize")
}

// No error, claim success.

level.Info(w.logger).Log(
"msg", "write downsampled block",
"msg", "finalized downsampled block",
"mint", w.meta.MinTime,
"maxt", w.meta.MaxTime,
"ulid", w.meta.ULID,
Expand All @@ -224,7 +232,7 @@ func (w *streamedBlockWriter) syncDir() (err error) {
return errors.Wrap(err, "open temporary block blockDir")
}

defer runutil.CloseWithErrCapture(w.logger, &err, df, "close temporary block blockDir")
defer runutil.CloseWithErrCapture(&err, df, "close temporary block blockDir")

if err := fileutil.Fsync(df); err != nil {
return errors.Wrap(err, "sync temporary blockDir")
Expand Down Expand Up @@ -257,16 +265,6 @@ func (w *streamedBlockWriter) writeMemPostings() error {
return nil
}

func (w *streamedBlockWriter) writeIndexCache() error {
indexFile := filepath.Join(w.blockDir, block.IndexFilename)
indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename)
if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil {
return errors.Wrap(err, "write index cache")
}

return nil
}

// writeMetaFile writes meta file.
func (w *streamedBlockWriter) writeMetaFile() error {
w.meta.Version = metadata.MetaVersion1
Expand Down
30 changes: 9 additions & 21 deletions pkg/runutil/runutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
// For capturing error, use CloseWithErrCapture:
//
// var err error
// defer runutil.CloseWithErrCapture(logger, &err, closer, "log format message")
// defer runutil.CloseWithErrCapture(&err, closer, "log format message")
//
// // ...
//
Expand All @@ -49,6 +49,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
)

// Repeat executes f every interval seconds until stopc is closed.
Expand Down Expand Up @@ -107,26 +108,13 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...
level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...)))
}

// CloseWithErrCapture runs function and on error tries to return error by argument.
// If error is already there we assume that error has higher priority and we just log the function error.
func CloseWithErrCapture(logger log.Logger, err *error, closer io.Closer, format string, a ...interface{}) {
closeErr := closer.Close()
if closeErr == nil {
return
}

if *err == nil {
err = &closeErr
return
}
// CloseWithErrCapture runs function and on error return error by argument including the given error (usually
// from caller function).
func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) {
merr := tsdb.MultiError{}

// There is already an error, let's log this one.
if logger == nil {
logger = log.NewLogfmtLogger(os.Stderr)
}
merr.Add(*err)
merr.Add(errors.Wrapf(closer.Close(), format, a...))

level.Warn(logger).Log(
"msg", "detected best effort close error that was preempted from the more important one",
"err", errors.Wrap(closeErr, fmt.Sprintf(format, a...)),
)
*err = merr.Err()
}
70 changes: 70 additions & 0 deletions pkg/runutil/runutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package runutil

import (
"github.com/pkg/errors"
"io"
"testing"
)

type testCloser struct {
err error
}

func (c testCloser) Close() error {
return c.err
}

func TestCloseWithErrCapture(t *testing.T) {
for _, tcase := range []struct{
err error
closer io.Closer

expectedErrStr string
}{
{
err: nil,
closer: testCloser{err:nil},
expectedErrStr: "",
},
{
err: errors.New("test"),
closer: testCloser{err:nil},
expectedErrStr: "test",
},
{
err: nil,
closer: testCloser{err:errors.New("test")},
expectedErrStr: "close: test",
},
{
err: errors.New("test"),
closer: testCloser{err:errors.New("test")},
expectedErrStr: "2 errors: test; close: test",
},
}{
if ok := t.Run("", func(t *testing.T) {
ret := tcase.err
CloseWithErrCapture(&ret, tcase.closer, "close")

if tcase.expectedErrStr == "" {
if ret != nil {
t.Error("Expected error to be nil")
t.Fail()
}
} else {
if ret == nil {
t.Error("Expected error to be not nil")
t.Fail()
}

if tcase.expectedErrStr != ret.Error() {
t.Errorf("%s != %s", tcase.expectedErrStr, ret.Error())
t.Fail()
}
}

}); !ok {
return
}
}
}
4 changes: 2 additions & 2 deletions pkg/testutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (p *Prometheus) SetConfig(s string) (err error) {
if err != nil {
return err
}
defer runutil.CloseWithErrCapture(nil, &err, f, "prometheus config")
defer runutil.CloseWithErrCapture(&err, f, "prometheus config")

_, err = f.Write([]byte(s))
return err
Expand Down Expand Up @@ -302,7 +302,7 @@ func createBlock(
if err != nil {
return id, errors.Wrap(err, "create head block")
}
defer runutil.CloseWithErrCapture(log.NewNopLogger(), &err, h, "TSDB Head")
defer runutil.CloseWithErrCapture(&err, h, "TSDB Head")

var g errgroup.Group
var timeStepSize = (maxt - mint) / int64(numSamples+1)
Expand Down

0 comments on commit da70cb0

Please sign in to comment.