Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/cache: update memcached dns on interval #278

Merged
merged 1 commit into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 10 additions & 7 deletions cmd/telemeter-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func main() {
Ratelimit: 4*time.Minute + 30*time.Second,
TTL: 10 * time.Minute,
MemcachedExpire: 24 * 60 * 60,
MemcachedInterval: 10,
}
cmd := &cobra.Command{
Short: "Aggregate federated metrics pushes",
Expand Down Expand Up @@ -128,6 +129,7 @@ func main() {
cmd.Flags().StringVar(&opt.TenantKey, "tenant-key", opt.TenantKey, "The JSON key in the bearer token whose value to use as the tenant ID.")
cmd.Flags().StringSliceVar(&opt.Memcacheds, "memcached", opt.Memcacheds, "One or more Memcached server addresses.")
cmd.Flags().Int32Var(&opt.MemcachedExpire, "memcached-expire", opt.MemcachedExpire, "Time after which keys stored in Memcached should expire, given in seconds.")
cmd.Flags().Int32Var(&opt.MemcachedInterval, "memcached-interval", opt.MemcachedInterval, "The interval at which to update the Memcached DNS, given in seconds; use 0 to disable.")

cmd.Flags().DurationVar(&opt.Ratelimit, "ratelimit", opt.Ratelimit, "The rate limit of metric uploads per cluster ID. Uploads happening more often than this limit will be rejected.")
cmd.Flags().DurationVar(&opt.TTL, "ttl", opt.TTL, "The TTL for metrics to be held in memory.")
Expand Down Expand Up @@ -180,12 +182,13 @@ type Options struct {

AuthorizeEndpoint string

OIDCIssuer string
ClientID string
ClientSecret string
TenantKey string
Memcacheds []string
MemcachedExpire int32
OIDCIssuer string
ClientID string
ClientSecret string
TenantKey string
Memcacheds []string
MemcachedExpire int32
MemcachedInterval int32

PartitionKey string
LabelFlag []string
Expand Down Expand Up @@ -513,7 +516,7 @@ func (o *Options) Run() error {
v2AuthorizeClient := authorizeClient

if len(o.Memcacheds) > 0 {
mc := memcached.New(o.MemcachedExpire, o.Memcacheds...)
mc := memcached.New(context.Background(), o.MemcachedInterval, o.MemcachedExpire, o.Memcacheds...)
l := log.With(o.Logger, "component", "cache")
v2AuthorizeClient.Transport = cache.NewRoundTripper(mc, tollbooth.ExtractToken, v2AuthorizeClient.Transport, l, prometheus.DefaultRegisterer)
}
Expand Down
46 changes: 37 additions & 9 deletions pkg/cache/memcached/memcached.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package memcached

import (
"context"
"crypto/sha256"
"fmt"
"sync"
"time"

"github.com/bradfitz/gomemcache/memcache"
"github.com/pkg/errors"
Expand All @@ -12,17 +15,38 @@ import (

// cache is a Cacher implemented on top of Memcached.
type cache struct {
*memcache.Client
client *memcache.Client
expiration int32
mu sync.RWMutex
}

// New creates a new Cache from a list of Memcached servers
// and key expiration time given in seconds.
func New(expiration int32, servers ...string) tcache.Cacher {
return &cache{
memcache.New(servers...),
expiration,
// New creates a new Cacher from a list of Memcached servers,
// a key expiration time given in seconds, a DNS refresh interval,
// and a context. The Cacher will continue to update the DNS entries
// for the Memcached servers every interval as long as the context is valid.
func New(ctx context.Context, interval, expiration int32, servers ...string) tcache.Cacher {
c := &cache{
client: memcache.New(servers...),
expiration: expiration,
}

if interval > 0 {
go func() {
t := time.NewTicker(time.Duration(interval) * time.Second)
for {
select {
case <-t.C:
c.mu.Lock()
c.client = memcache.New(servers...)
c.mu.Unlock()
case <-ctx.Done():
t.Stop()
return
}
}
}()
}
return c
}

// Get returns a value from Memcached.
Expand All @@ -31,7 +55,9 @@ func (c *cache) Get(key string) ([]byte, bool, error) {
if err != nil {
return nil, false, err
}
i, err := c.Client.Get(key)
c.mu.RLock()
defer c.mu.RUnlock()
i, err := c.client.Get(key)
if err != nil {
if err == memcache.ErrCacheMiss {
return nil, false, nil
Expand All @@ -53,7 +79,9 @@ func (c *cache) Set(key string, value []byte) error {
Value: value,
Expiration: c.expiration,
}
return c.Client.Set(&i)
c.mu.RLock()
defer c.mu.RUnlock()
return c.client.Set(&i)
}

// hashKey hashes the given key to ensure that it is less than 250 bytes,
Expand Down