Skip to content

Commit

Permalink
fix interface
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Oct 5, 2023
1 parent a6b5376 commit 7501e88
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 42 deletions.
24 changes: 12 additions & 12 deletions pkg/store/cache/filter_cache.go
Expand Up @@ -28,49 +28,49 @@ func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredInd

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
c.cache.StorePostings(blockID, l, v)
c.cache.StorePostings(blockID, l, v, tenant)
}
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
return c.cache.FetchMultiPostings(ctx, blockID, keys)
return c.cache.FetchMultiPostings(ctx, blockID, keys, tenant)
}
return nil, keys
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
c.cache.StoreExpandedPostings(blockID, matchers, v)
c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
}
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
return c.cache.FetchExpandedPostings(ctx, blockID, matchers)
return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant)
}
return nil, false
}

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
c.cache.StoreSeries(blockID, id, v)
c.cache.StoreSeries(blockID, id, v, tenant)
}
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
return c.cache.FetchMultiSeries(ctx, blockID, ids)
return c.cache.FetchMultiSeries(ctx, blockID, ids, tenant)
}
return nil, ids
}
Expand Down
62 changes: 32 additions & 30 deletions pkg/store/cache/filter_cache_test.go
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/tenancy"
)

func TestFilterCache(t *testing.T) {
Expand Down Expand Up @@ -46,19 +48,19 @@ func TestFilterCache(t *testing.T) {
{
name: "empty enabled items",
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
Expand All @@ -67,19 +69,19 @@ func TestFilterCache(t *testing.T) {
name: "all enabled items",
enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Assert(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
Expand All @@ -88,18 +90,18 @@ func TestFilterCache(t *testing.T) {
name: "only enable postings",
enabledItems: []string{cacheTypePostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, false, hit)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(misses))
testutil.Equals(t, 0, len(seriesHit))
},
Expand All @@ -108,19 +110,19 @@ func TestFilterCache(t *testing.T) {
name: "only enable expanded postings",
enabledItems: []string{cacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(misses))
testutil.Equals(t, 0, len(seriesHit))
},
Expand All @@ -129,18 +131,18 @@ func TestFilterCache(t *testing.T) {
name: "only enable series",
enabledItems: []string{cacheTypeSeries},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, false, hit)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
Expand Down

0 comments on commit 7501e88

Please sign in to comment.