Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache: various index cache client improvements #6374

Merged
merged 3 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2345,7 +2345,7 @@ type postingPtr struct {
}

// fetchPostings fill postings requested by posting groups.
// It returns one postings for each key, in the same order.
// It returns one posting for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) {
var closeFns []func()
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type IndexCache interface {
}

type cacheKey struct {
block ulid.ULID
block string
key interface{}
}

Expand Down Expand Up @@ -79,9 +79,9 @@ func (c cacheKey) string() string {
// which would end up in wrong query results.
lbl := c.key.(cacheKeyPostings)
lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value))
return "P:" + c.block.String() + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:])
return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:])
case cacheKeySeries:
return "S:" + c.block.String() + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
default:
return ""
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/store/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ func TestCacheKey_string(t *testing.T) {
t.Parallel()

uid := ulid.MustNew(1, nil)
ulidString := uid.String()

tests := map[string]struct {
key cacheKey
expected string
}{
"should stringify postings cache key": {
key: cacheKey{uid, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})},
key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})},
expected: func() string {
hash := blake2b.Sum256([]byte("foo:bar"))
encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:])
Expand All @@ -41,7 +42,7 @@ func TestCacheKey_string(t *testing.T) {
}(),
},
"should stringify series cache key": {
key: cacheKey{uid, cacheKeySeries(12345)},
key: cacheKey{ulidString, cacheKeySeries(12345)},
expected: fmt.Sprintf("S:%s:12345", uid.String()),
},
}
Expand All @@ -58,6 +59,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
t.Parallel()

uid := ulid.MustNew(1, nil)
ulidString := uid.String()

tests := map[string]struct {
keys []cacheKey
Expand All @@ -66,14 +68,14 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
"should guarantee reasonably short key length for postings": {
expectedLen: 72,
keys: []cacheKey{
{uid, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})},
{uid, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})},
{ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})},
{ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})},
},
},
"should guarantee reasonably short key length for series": {
expectedLen: 49,
keys: []cacheKey{
{uid, cacheKeySeries(math.MaxUint64)},
{ulidString, cacheKeySeries(math.MaxUint64)},
},
},
}
Expand All @@ -89,7 +91,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {

func BenchmarkCacheKey_string_Postings(b *testing.B) {
uid := ulid.MustNew(1, nil)
key := cacheKey{uid, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}
key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -99,7 +101,7 @@ func BenchmarkCacheKey_string_Postings(b *testing.B) {

func BenchmarkCacheKey_string_Series(b *testing.B) {
uid := ulid.MustNew(1, nil)
key := cacheKey{uid, cacheKeySeries(math.MaxUint64)}
key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64)}

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
10 changes: 6 additions & 4 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,16 +290,17 @@ func copyToKey(l labels.Label) cacheKeyPostings {
// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
c.set(cacheTypePostings, cacheKey{block: blockID, key: copyToKey(l)}, v)
c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v)
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
hits = map[labels.Label][]byte{}

blockIDKey := blockID.String()
for _, key := range keys {
if b, ok := c.get(cacheTypePostings, cacheKey{blockID, cacheKeyPostings(key)}); ok {
if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key)}); ok {
hits[key] = b
continue
}
Expand All @@ -313,16 +314,17 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.
// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.set(cacheTypeSeries, cacheKey{blockID, cacheKeySeries(id)}, v)
c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id)}, v)
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
hits = map[storage.SeriesRef][]byte{}

blockIDKey := blockID.String()
for _, id := range ids {
if b, ok := c.get(cacheTypeSeries, cacheKey{blockID, cacheKeySeries(id)}); ok {
if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id)}); ok {
hits[id] = b
continue
}
Expand Down
46 changes: 12 additions & 34 deletions pkg/store/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
key := cacheKey{blockID, cacheKeyPostings(l)}.string()
key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
Expand All @@ -75,16 +75,12 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []
// and returns a map containing cache hits, along with a list of missing keys.
// In case of error, it logs and return an empty cache hits map.
func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
// Build the cache keys, while keeping a map between input label and the cache key
// so that we can easily reverse it back after the GetMulti().
keys := make([]string, 0, len(lbls))
keysMapping := map[labels.Label]string{}

blockIDKey := blockID.String()
for _, lbl := range lbls {
key := cacheKey{blockID, cacheKeyPostings(lbl)}.string()

key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string()
keys = append(keys, key)
keysMapping[lbl] = key
}

// Fetch the keys from memcached in a single request.
Expand All @@ -96,18 +92,11 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.

// Construct the resulting hits map and list of missing keys. We iterate on the input
// list of labels to be able to easily create the list of ones in a single iteration.
hits = map[labels.Label][]byte{}

for _, lbl := range lbls {
key, ok := keysMapping[lbl]
if !ok {
level.Error(c.logger).Log("msg", "keys mapping inconsistency found in memcached index cache client", "type", "postings", "label", lbl.Name+":"+lbl.Value)
continue
}

hits = make(map[labels.Label][]byte, len(results))
for i, lbl := range lbls {
// Check if the key has been found in memcached. If not, we add it to the list
// of missing keys.
value, ok := results[key]
value, ok := results[keys[i]]
if !ok {
misses = append(misses, lbl)
continue
Expand All @@ -124,7 +113,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
key := cacheKey{blockID, cacheKeySeries(id)}.string()
key := cacheKey{blockID.String(), cacheKeySeries(id)}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err)
Expand All @@ -135,16 +124,12 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef,
// and returns a map containing cache hits, along with a list of missing IDs.
// In case of error, it logs and return an empty cache hits map.
func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
// Build the cache keys, while keeping a map between input id and the cache key
// so that we can easily reverse it back after the GetMulti().
keys := make([]string, 0, len(ids))
keysMapping := map[storage.SeriesRef]string{}

blockIDKey := blockID.String()
for _, id := range ids {
key := cacheKey{blockID, cacheKeySeries(id)}.string()

key := cacheKey{blockIDKey, cacheKeySeries(id)}.string()
keys = append(keys, key)
keysMapping[id] = key
}

// Fetch the keys from memcached in a single request.
Expand All @@ -156,18 +141,11 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL

// Construct the resulting hits map and list of missing keys. We iterate on the input
// list of ids to be able to easily create the list of ones in a single iteration.
hits = map[storage.SeriesRef][]byte{}

for _, id := range ids {
key, ok := keysMapping[id]
if !ok {
level.Error(c.logger).Log("msg", "keys mapping inconsistency found in memcached index cache client", "type", "series", "id", id)
continue
}

hits = make(map[storage.SeriesRef][]byte, len(results))
for i, id := range ids {
// Check if the key has been found in memcached. If not, we add it to the list
// of missing keys.
value, ok := results[key]
value, ok := results[keys[i]]
if !ok {
misses = append(misses, id)
continue
Expand Down
Loading