Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Expose bucket index operation duration histogram #2725

Merged
merged 7 commits into from Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cacheutil/memcached_client.go
Expand Up @@ -466,7 +466,7 @@ func (c *memcachedClient) resolveAddrs() error {

// If some of the dns resolution fails, log the error.
if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for storeAPIs", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
servers := c.dnsProvider.Addresses()
Expand Down
2 changes: 2 additions & 0 deletions pkg/objstore/objstore.go
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/runutil"
)

Expand Down Expand Up @@ -257,6 +258,7 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric
ConstLabels: prometheus.Labels{"bucket": name},
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
}, []string{"operation"}),

lastSuccessfulUploadTime: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_objstore_bucket_last_successful_upload_time",
Help: "Second timestamp of the last successful upload to the bucket.",
Expand Down
41 changes: 34 additions & 7 deletions pkg/store/bucket.go
Expand Up @@ -111,6 +111,9 @@ type bucketStoreMetrics struct {
cachedPostingsCompressionTimeSeconds *prometheus.CounterVec
cachedPostingsOriginalSizeBytes prometheus.Counter
cachedPostingsCompressedSizeBytes prometheus.Counter

seriesLookupDuration prometheus.Histogram
postingsLookupDuration prometheus.Histogram
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Expand Down Expand Up @@ -221,6 +224,18 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Help: "Compressed size of postings stored into cache.",
})

m.seriesLookupDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_bucket_store_index_series_lookup_duration_seconds",
Help: "Time it takes to lookup series from a bucket to respond a query. It also includes the cache fetch and store operations.",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

m.postingsLookupDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have just a label? It will be much easier to aggregate, plus we already do that for other statistics, WDYT?

I think the buckets can also be adjusted for the timeout. BTW, what "lookup" means is quite vague from this help text 🤔 Maybe we can be more explicit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bwplotka The first version was using a single histogram with labels 8a46f33. Then I decided to change it, just to be consistent with what we already have in here.

BTW What lookup means is quite vague by this help 🤔 Maybe we can be explicit more.

I'm gonna change the name as you suggested and try to clarify the description.

I think also bucket can be adjusted for timeout.

I've put the boundary at the query timeout. Which timeout value is used for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bwplotka friendly ping.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok for now 👍

Name: "thanos_bucket_store_index_postings_lookup_duration_seconds",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Name: "thanos_bucket_store_index_postings_lookup_duration_seconds",
Name: "thanos_bucket_store_index_cachable_fetches_duration_seconds",

I would add "cachable" to highlight that it includes the cache (:

Help: "Time it takes to lookup postings from a bucket to respond a query. It also includes the cache fetch and store operations.",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

return &m
}

Expand Down Expand Up @@ -473,7 +488,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
lset := labels.FromMap(meta.Thanos.Labels)
h := lset.Hash()

indexHeaderReader, err := indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling)
indexHeaderReader, err := indexheader.NewBinaryReader(
ctx,
s.logger,
s.bkt,
s.dir,
meta.ULID,
s.postingOffsetsInMemSampling,
)
if err != nil {
return errors.Wrap(err, "create index header reader")
}
Expand All @@ -486,14 +508,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
b, err := newBucketBlock(
ctx,
log.With(s.logger, "block", meta.ULID),
s.metrics,
meta,
s.bkt,
dir,
s.indexCache,
s.chunkPool,
indexHeaderReader,
s.partitioner,
s.metrics.seriesRefetches,
s.enablePostingsCompression,
)
if err != nil {
Expand Down Expand Up @@ -1258,6 +1280,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
// state for the block on local disk.
type bucketBlock struct {
logger log.Logger
metrics *bucketStoreMetrics
bkt objstore.BucketReader
meta *metadata.Meta
dir string
Expand All @@ -1272,8 +1295,6 @@ type bucketBlock struct {

partitioner partitioner

seriesRefetches prometheus.Counter

enablePostingsCompression bool

// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
Expand All @@ -1284,26 +1305,26 @@ type bucketBlock struct {
func newBucketBlock(
ctx context.Context,
logger log.Logger,
metrics *bucketStoreMetrics,
meta *metadata.Meta,
bkt objstore.BucketReader,
dir string,
indexCache storecache.IndexCache,
chunkPool pool.BytesPool,
indexHeadReader indexheader.Reader,
p partitioner,
seriesRefetches prometheus.Counter,
enablePostingsCompression bool,
) (b *bucketBlock, err error) {
b = &bucketBlock{
logger: logger,
metrics: metrics,
bkt: bkt,
indexCache: indexCache,
chunkPool: chunkPool,
dir: dir,
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
seriesRefetches: seriesRefetches,
enablePostingsCompression: enablePostingsCompression,
}

Expand Down Expand Up @@ -1610,6 +1631,9 @@ type postingPtr struct {
// It returns one postings for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
start := time.Now()
defer r.block.metrics.postingsLookupDuration.Observe(time.Since(start).Seconds())
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved

var ptrs []postingPtr

output := make([]index.Postings, len(keys))
Expand Down Expand Up @@ -1827,6 +1851,9 @@ func (it *bigEndianPostings) length() int {
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
start := time.Now()
defer r.block.metrics.seriesLookupDuration.Observe(float64(time.Since(start)))

// Load series from cache, overwriting the list of ids to preload
// with the missing ones.
fromCache, ids := r.block.indexCache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids)
Expand Down Expand Up @@ -1877,7 +1904,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc
}

// Inefficient, but should be rare.
r.block.seriesRefetches.Inc()
r.block.metrics.seriesRefetches.Inc()
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)

// Fetch plus to get the size of next one if exists.
Expand Down
37 changes: 20 additions & 17 deletions pkg/store/bucket_test.go
Expand Up @@ -30,15 +30,15 @@ import (
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/encoding"

"go.uber.org/atomic"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand All @@ -52,7 +52,6 @@ import (
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"go.uber.org/atomic"
)

var emptyRelabelConfig = make([]*relabel.Config, 0)
Expand Down Expand Up @@ -209,7 +208,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
},
}

b, err := newBucketBlock(context.Background(), log.NewNopLogger(), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, true)
b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, true)
testutil.Ok(t, err)

cases := []struct {
Expand Down Expand Up @@ -921,10 +920,10 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
ULID: ulid.MustNew(1, nil),
},
},
bkt: bkt,
seriesRefetches: s.seriesRefetches,
logger: log.NewNopLogger(),
indexCache: noopCache{},
bkt: bkt,
logger: log.NewNopLogger(),
metrics: s,
indexCache: noopCache{},
}

buf := encoding.Encbuf{}
Expand Down Expand Up @@ -1130,6 +1129,7 @@ func benchmarkExpandedPostings(
t.Run(c.name, func(t testutil.TB) {
b := &bucketBlock{
logger: log.NewNopLogger(),
metrics: newBucketStoreMetrics(nil),
indexHeaderReader: r,
indexCache: noopCache{},
bkt: bkt,
Expand Down Expand Up @@ -1228,15 +1228,16 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String())))

m := newBucketStoreMetrics(nil)
b := &bucketBlock{
indexCache: noopCache{},
logger: logger,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
seriesRefetches: promauto.With(nil).NewCounter(prometheus.CounterOpts{}),
indexCache: noopCache{},
logger: logger,
metrics: m,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
blocks = append(blocks, b)
}
Expand Down Expand Up @@ -1289,7 +1290,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request

for _, b := range blocks {
// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series.
testutil.Equals(t, 0.0, promtest.ToFloat64(b.seriesRefetches))
testutil.Equals(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
}
}
}
Expand Down Expand Up @@ -1393,6 +1394,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
b1 = &bucketBlock{
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
Expand Down Expand Up @@ -1431,6 +1433,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
b2 = &bucketBlock{
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
Expand Down