Skip to content

Commit

Permalink
Fix panic on concurrent index-header lazy reader usage and unload (#3760
Browse files Browse the repository at this point in the history
)

* Fix panic on concurrent index-header lazy reader usage and unload

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Addressed review comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Addressed review comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Feb 2, 2021
1 parent 7f048e4 commit a24acb6
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -36,6 +36,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3527](https://github.com/thanos-io/thanos/pull/3527) Query Frontend: Fix query_range behavior when start/end times are the same
- [#3560](https://github.com/thanos-io/thanos/pull/3560) query-frontend: Allow separate label cache
- [#3672](https://github.com/thanos-io/thanos/pull/3672) rule: prevent rule crash from no such host error when using `dnssrv+` or `dnssrvnoa+`.
- [#3760](https://github.com/thanos-io/thanos/pull/3760) Store: Fix panic caused by a race condition happening on concurrent index-header reader usage and unload, when `--store.enable-index-header-lazy-reader` is enabled.

### Changed

Expand Down
36 changes: 27 additions & 9 deletions pkg/block/indexheader/lazy_binary_reader.go
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
)

// errNotIdle is the sentinel error returned by unloadIfIdleSince when the
// reader has been used after the given timestamp and is left loaded.
var errNotIdle = errors.New("the reader is not idle")

// LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader.
type LazyBinaryReaderMetrics struct {
loadCount prometheus.Counter
Expand Down Expand Up @@ -133,7 +137,8 @@ func (r *LazyBinaryReader) Close() error {
defer r.onClosed(r)
}

return r.unload()
// Unload without checking if idle.
return r.unloadIfIdleSince(0)
}

// IndexVersion implements Reader.
Expand Down Expand Up @@ -245,19 +250,22 @@ func (r *LazyBinaryReader) load() error {
return nil
}

// unload closes the underlying BinaryReader. Calling this function on an already unloaded reader is a no-op.
func (r *LazyBinaryReader) unload() error {
// Always update the used timestamp so that the pool will not call unload() again until the next
// idle timeout is hit.
r.usedAt.Store(time.Now().UnixNano())

// unloadIfIdleSince closes the underlying BinaryReader if the reader has been idle since the given time (as unix nano). If ts is 0,
// the check on the last usage is skipped. Calling this function on an already unloaded reader is a no-op.
func (r *LazyBinaryReader) unloadIfIdleSince(ts int64) error {
r.readerMx.Lock()
defer r.readerMx.Unlock()

// Nothing to do if already unloaded.
if r.reader == nil {
return nil
}

// Do not unload if the reader is not idle.
if ts > 0 && r.usedAt.Load() > ts {
return errNotIdle
}

r.metrics.unloadCount.Inc()
if err := r.reader.Close(); err != nil {
r.metrics.unloadFailedCount.Inc()
Expand All @@ -268,6 +276,16 @@ func (r *LazyBinaryReader) unload() error {
return nil
}

func (r *LazyBinaryReader) lastUsedAt() int64 {
return r.usedAt.Load()
// isIdleSince reports whether the reader is currently loaded and its last
// recorded usage (as unix nano) is not after ts.
func (r *LazyBinaryReader) isIdleSince(ts int64) bool {
	// Used after ts: not idle, and there is no need to take the lock.
	if lastUsed := r.usedAt.Load(); lastUsed > ts {
		return false
	}

	// Only a loaded reader counts as idle; an unloaded one has nothing to release.
	r.readerMx.RLock()
	defer r.readerMx.RUnlock()

	return r.reader != nil
}
49 changes: 49 additions & 0 deletions pkg/block/indexheader/lazy_binary_reader_test.go
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -168,3 +169,51 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) {
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
}
}

func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) {
ctx := context.Background()

tmpDir, err := ioutil.TempDir("", "test-indexheader")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()

// Create block.
blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String())))

m := NewLazyBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil)
testutil.Ok(t, err)
testutil.Assert(t, r.reader == nil)

// Should lazy load the index upon first usage.
labelNames, err := r.LabelNames()
testutil.Ok(t, err)
testutil.Equals(t, []string{"a"}, labelNames)
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))

// Try to unload but not idle since enough time.
testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano()))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))

// Try to unload and idle since enough time.
testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano()))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
}
20 changes: 7 additions & 13 deletions pkg/block/indexheader/reader_pool.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -98,29 +99,22 @@ func (p *ReaderPool) Close() {
}

func (p *ReaderPool) closeIdleReaders() {
for _, r := range p.getIdleReaders() {
// Closing an already closed reader is a no-op, so we close it and just update
// the last timestamp on success. If it is still idle the next time this
// function is called, we'll try to close it again and it will just be a no-op.
//
// Due to concurrency, the current implementation may close a reader which was
// used between when the list of idle readers was computed and now. This is
// an edge case we're willing to accept, to not further complicate the logic.
if err := r.unload(); err != nil {
idleTimeoutAgo := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano()

for _, r := range p.getIdleReadersSince(idleTimeoutAgo) {
if err := r.unloadIfIdleSince(idleTimeoutAgo); err != nil && !errors.Is(err, errNotIdle) {
level.Warn(p.logger).Log("msg", "failed to close idle index-header reader", "err", err)
}
}
}

func (p *ReaderPool) getIdleReaders() []*LazyBinaryReader {
func (p *ReaderPool) getIdleReadersSince(ts int64) []*LazyBinaryReader {
p.lazyReadersMx.Lock()
defer p.lazyReadersMx.Unlock()

var idle []*LazyBinaryReader
threshold := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano()

for r := range p.lazyReaders {
if r.lastUsedAt() < threshold {
if r.isIdleSince(ts) {
idle = append(idle, r)
}
}
Expand Down

0 comments on commit a24acb6

Please sign in to comment.