diff --git a/cmd/kes/server.go b/cmd/kes/server.go index 979c08f9..b78d5cc1 100644 --- a/cmd/kes/server.go +++ b/cmd/kes/server.go @@ -206,12 +206,12 @@ func server(args []string) { if err != nil { stdlog.Fatalf("Error: %v", err) } - manager := &key.Manager{ - CacheExpiryAny: config.Cache.Expiry.Any.Value(), - CacheExpiryUnused: config.Cache.Expiry.Unused.Value(), - CacheExpiryOffline: config.Cache.Expiry.Offline.Value(), - Store: store, - } + var cache = key.NewCache(store, &key.CacheConfig{ + Expiry: config.Cache.Expiry.Any.Value(), + ExpiryUnused: config.Cache.Expiry.Unused.Value(), + ExpiryOffline: config.Cache.Expiry.Offline.Value(), + }) + defer cache.Stop() for _, k := range config.Keys { bytes, err := sioutil.Random(key.Size) @@ -238,7 +238,7 @@ func server(args []string) { Addr: config.Address.Value(), Handler: xhttp.NewServerMux(&xhttp.ServerConfig{ Version: version, - Manager: manager, + Store: cache, Roles: roles, Proxy: proxy, AuditLog: auditLog, @@ -303,7 +303,7 @@ func server(args []string) { } }() go certificate.ReloadAfter(ctx, 5*time.Minute) // 5min is a quite reasonable reload interval - go key.LogStoreStatus(ctx, manager.Store, 1*time.Minute, errorLog.Log()) + go key.LogStoreStatus(ctx, cache, 1*time.Minute, errorLog.Log()) // The following code prints a server startup message similar to: // diff --git a/internal/http/handler.go b/internal/http/handler.go index 9abc119f..9438dbeb 100644 --- a/internal/http/handler.go +++ b/internal/http/handler.go @@ -123,7 +123,7 @@ func handleVersion(version string) http.HandlerFunc { // handleStatus returns a handler function that returns status // information, like server version and server up-time, as JSON // object to the client. -func handleStatus(version string, manager *key.Manager, log *xlog.Target) http.HandlerFunc { +func handleStatus(version string, store key.Store, log *xlog.Target) http.HandlerFunc { type Status struct { Version string `json:"version"` UpTime time.Duration `json:"uptime"` @@ -135,7 +135,7 @@ func handleStatus(version string, manager *key.Manager, log *xlog.Target) http.H } var startTime = time.Now() return func(w http.ResponseWriter, r *http.Request) { - kmsState, err := manager.Store.Status(r.Context()) + kmsState, err := store.Status(r.Context()) if err != nil { kmsState = key.StoreState{ State: key.StoreUnreachable, @@ -162,7 +162,7 @@ func handleStatus(version string, manager *key.Manager, log *xlog.Target) http.H // It infers the name of the new Secret from the request URL - in // particular from the URL's path base. // See: https://golang.org/pkg/path/#Base -func handleCreateKey(manager *key.Manager) http.HandlerFunc { +func handleCreateKey(store key.Store) http.HandlerFunc { var ErrInvalidKeyName = kes.NewError(http.StatusBadRequest, "invalid key name") return func(w http.ResponseWriter, r *http.Request) { @@ -178,7 +178,7 @@ func handleCreateKey(manager *key.Manager) http.HandlerFunc { return } - if err := manager.Create(r.Context(), name, key.New(bytes)); err != nil { + if err := store.Create(r.Context(), name, key.New(bytes)); err != nil { Error(w, err) } w.WriteHeader(http.StatusOK) @@ -192,7 +192,7 @@ func handleCreateKey(manager *key.Manager) http.HandlerFunc { // It infers the name of the new Secret from the request URL - in // particular from the URL's path base. // See: https://golang.org/pkg/path/#Base -func handleImportKey(manager *key.Manager) http.HandlerFunc { +func handleImportKey(store key.Store) http.HandlerFunc { var ( ErrInvalidKeyName = kes.NewError(http.StatusBadRequest, "invalid key name") ErrInvalidJSON = kes.NewError(http.StatusBadRequest, "invalid json") @@ -220,7 +220,7 @@ func handleImportKey(manager *key.Manager) http.HandlerFunc { return } - if err := manager.Create(r.Context(), name, key.New(req.Bytes)); err != nil { + if err := store.Create(r.Context(), name, key.New(req.Bytes)); err != nil { Error(w, err) return } @@ -228,7 +228,7 @@ func handleImportKey(manager *key.Manager) http.HandlerFunc { } } -func handleDeleteKey(manager *key.Manager) http.HandlerFunc { +func handleDeleteKey(store key.Store) http.HandlerFunc { var ErrInvalidKeyName = kes.NewError(http.StatusBadRequest, "invalid key name") return func(w http.ResponseWriter, r *http.Request) { @@ -237,7 +237,7 @@ func handleDeleteKey(manager *key.Manager) http.HandlerFunc { Error(w, ErrInvalidKeyName) return } - if err := manager.Delete(r.Context(), name); err != nil { + if err := store.Delete(r.Context(), name); err != nil { Error(w, err) return } @@ -257,7 +257,7 @@ func handleDeleteKey(manager *key.Manager) http.HandlerFunc { // returned http.HandlerFunc will authenticate but not encrypt // the context value. The client has to provide the same // context value again for decryption. -func handleGenerateKey(manager *key.Manager) http.HandlerFunc { +func handleGenerateKey(store key.Store) http.HandlerFunc { var ( ErrInvalidJSON = kes.NewError(http.StatusBadRequest, "invalid json") ErrInvalidKeyName = kes.NewError(http.StatusBadRequest, "invalid key name") @@ -281,7 +281,7 @@ func handleGenerateKey(manager *key.Manager) http.HandlerFunc { Error(w, ErrInvalidKeyName) return } - secret, err := manager.Get(r.Context(), name) + secret, err := store.Get(r.Context(), name) if err != nil { Error(w, err) return @@ -316,7 +316,7 @@ func handleGenerateKey(manager *key.Manager) http.HandlerFunc { // returned http.HandlerFunc will authenticate but not encrypt // the context value. The client has to provide the same // context value again for decryption. -func handleEncryptKey(manager *key.Manager) http.HandlerFunc { +func handleEncryptKey(store key.Store) http.HandlerFunc { var ( ErrInvalidJSON = kes.NewError(http.StatusBadRequest, "invalid json") ErrInvalidKeyName = kes.NewError(http.StatusBadRequest, "invalid key name") @@ -340,7 +340,7 @@ func handleEncryptKey(manager *key.Manager) http.HandlerFunc { Error(w, ErrInvalidKeyName) return } - secret, err := manager.Get(r.Context(), name) + secret, err := store.Get(r.Context(), name) if err != nil { Error(w, err) return @@ -363,7 +363,7 @@ func handleEncryptKey(manager *key.Manager) http.HandlerFunc { // If the client has provided a context value during // encryption / key generation then the client has to provide // the same context value again. -func handleDecryptKey(manager *key.Manager) http.HandlerFunc { +func handleDecryptKey(store key.Store) http.HandlerFunc { var ( ErrInvalidJSON = kes.NewError(http.StatusBadRequest, "invalid json") ErrInvalidKeyName = kes.NewError(http.StatusBadRequest, "invalid key name") @@ -387,7 +387,7 @@ func handleDecryptKey(manager *key.Manager) http.HandlerFunc { Error(w, ErrInvalidKeyName) return } - secret, err := manager.Get(r.Context(), name) + secret, err := store.Get(r.Context(), name) if err != nil { Error(w, err) return @@ -413,14 +413,14 @@ func handleDecryptKey(manager *key.Manager) http.HandlerFunc { // The client is expected to check for an error trailer // and only consider the listing complete if it receives // no such trailer. -func handleListKeys(manager *key.Manager) http.HandlerFunc { +func handleListKeys(store key.Store) http.HandlerFunc { type Response struct { Name string } return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Trailer", "Status,Error") - iterator, err := manager.List(r.Context()) + iterator, err := store.List(r.Context()) if err != nil { Error(w, err) return diff --git a/internal/http/server.go b/internal/http/server.go index 8facea29..075ffdcb 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -21,10 +21,9 @@ type ServerConfig struct { // If empty, it defaults to v0.0.0-dev. Version string - // Manager is the key manager that fetches - // keys from a key store and stores them - // in a local in-memory cache. - Manager *key.Manager + // Store is the key store holding the cryptographic + // keys. + Store key.Store // Roles is the authorization system that // contains identities and the associated @@ -60,7 +59,7 @@ type ServerConfig struct { func NewServerMux(config *ServerConfig) *http.ServeMux { var ( version = config.Version - manager = config.Manager + store = config.Store roles = config.Roles proxy = config.Proxy auditLog = config.AuditLog @@ -73,13 +72,13 @@ func NewServerMux(config *ServerConfig) *http.ServeMux { const MaxBody = 1 << 20 var mux = http.NewServeMux() - mux.Handle("/v1/key/create/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/create/*", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleCreateKey(manager))))))))))) - mux.Handle("/v1/key/import/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/import/*", limitRequestBody(MaxBody, tlsProxy(proxy, enforcePolicies(roles, handleImportKey(manager))))))))))) - mux.Handle("/v1/key/delete/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodDelete, validatePath("/v1/key/delete/*", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleDeleteKey(manager))))))))))) - mux.Handle("/v1/key/generate/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/generate/*", limitRequestBody(MaxBody, tlsProxy(proxy, enforcePolicies(roles, handleGenerateKey(manager))))))))))) - mux.Handle("/v1/key/encrypt/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/encrypt/*", limitRequestBody(MaxBody/2, tlsProxy(proxy, enforcePolicies(roles, handleEncryptKey(manager))))))))))) - mux.Handle("/v1/key/decrypt/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/decrypt/*", limitRequestBody(MaxBody, tlsProxy(proxy, enforcePolicies(roles, handleDecryptKey(manager))))))))))) - mux.Handle("/v1/key/list/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodGet, validatePath("/v1/key/list/*", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleListKeys(manager))))))))))) + mux.Handle("/v1/key/create/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/create/*", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleCreateKey(store))))))))))) + mux.Handle("/v1/key/import/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/import/*", limitRequestBody(MaxBody, tlsProxy(proxy, enforcePolicies(roles, handleImportKey(store))))))))))) + mux.Handle("/v1/key/delete/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodDelete, validatePath("/v1/key/delete/*", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleDeleteKey(store))))))))))) + mux.Handle("/v1/key/generate/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/generate/*", limitRequestBody(MaxBody, tlsProxy(proxy, enforcePolicies(roles, handleGenerateKey(store))))))))))) + mux.Handle("/v1/key/encrypt/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/encrypt/*", limitRequestBody(MaxBody/2, tlsProxy(proxy, enforcePolicies(roles, handleEncryptKey(store))))))))))) + mux.Handle("/v1/key/decrypt/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/key/decrypt/*", limitRequestBody(MaxBody, tlsProxy(proxy, enforcePolicies(roles, handleDecryptKey(store))))))))))) + mux.Handle("/v1/key/list/", timeout(15*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodGet, validatePath("/v1/key/list/*", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleListKeys(store))))))))))) mux.Handle("/v1/policy/write/", timeout(10*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodPost, validatePath("/v1/policy/write/*", limitRequestBody(MaxBody, tlsProxy(proxy, enforcePolicies(roles, handleWritePolicy(roles))))))))))) mux.Handle("/v1/policy/read/", timeout(10*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodGet, validatePath("/v1/policy/read/*", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleReadPolicy(roles))))))))))) @@ -93,7 +92,7 @@ func NewServerMux(config *ServerConfig) *http.ServeMux { mux.Handle("/v1/log/audit/trace", metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodGet, validatePath("/v1/log/audit/trace", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleTraceAuditLog(auditLog)))))))))) mux.Handle("/v1/log/error/trace", metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodGet, validatePath("/v1/log/error/trace", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleTraceErrorLog(errorLog)))))))))) - mux.Handle("/v1/status", timeout(10*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodGet, validatePath("/v1/status", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleStatus(version, manager, errorLog))))))))))) + mux.Handle("/v1/status", timeout(10*time.Second, metrics.Count(metrics.Latency(audit(auditLog.Log(), roles, requireMethod(http.MethodGet, validatePath("/v1/status", limitRequestBody(0, tlsProxy(proxy, enforcePolicies(roles, handleStatus(version, store, errorLog))))))))))) // Scrapping /v1/metrics should not change the metrics itself. // Further, scrapping /v1/metrics should, by default, not produce diff --git a/internal/key/cache.go b/internal/key/cache.go index 6fac8d41..b113103f 100644 --- a/internal/key/cache.go +++ b/internal/key/cache.go @@ -1,22 +1,185 @@ +// Copyright 2022 - MinIO, Inc. All rights reserved. +// Use of this source code is governed by the AGPLv3 +// license that can be found in the LICENSE file. + package key import ( "context" + "errors" + "net/http" "sync" "sync/atomic" "time" + + "github.com/minio/kes" +) + +// Typed errors that are returned to the client. +// The errors are generic on purpose to not leak +// any (potentially sensitive) information. +var ( + errCreateKey = kes.NewError(http.StatusBadGateway, "bad gateway: failed to create key") + errGetKey = kes.NewError(http.StatusBadGateway, "bad gateway: failed to access key") + errDeleteKey = kes.NewError(http.StatusBadGateway, "bad gateway: failed to delete key") + errListKey = kes.NewError(http.StatusBadGateway, "bad gateway: failed to list keys") ) -type cache struct { - lock sync.RWMutex - store map[string]*entry +// CacheConfig is a structure containing Cache +// configuration options. +type CacheConfig struct { + // Expiry is the time period keys remain, at + // most, in the cache. + Expiry time.Duration + + // ExpiryUnused is the time period keys remain + // in the cache even though they are not used. + // + // A key that is used before one ExpiryUnused + // interval elapses is marked as used again and + // remains in the cache. + ExpiryUnused time.Duration + + // ExpiryOffline is the time keys remain in the + // offline cache, if enabled. + // + // The offline cache is only used when the Store + // is not available and ExpiryOffline > 0. + // + // The offline cache, if enabled, gets cleared + // whenever the Store becomes available again. + ExpiryOffline time.Duration +} + +// NewCache returns a new Cache that caches keys +// from the Store in memory. +// +// A Cache removes cache entries when they expiry. +// Stop the cache to release associated resources. +func NewCache(store Store, config *CacheConfig) *Cache { + var ctx, cancel = context.WithCancel(context.Background()) + + var c = &Cache{ + Store: store, + cache: map[string]*cacheEntry{}, + offlineCache: map[string]*cacheEntry{}, + ctx: ctx, + cancel: cancel, + } + c.gc(config.Expiry) + c.gcUnused(config.ExpiryUnused) + + if config.ExpiryOffline > 0 { + c.gcOffline(config.ExpiryUnused) + c.watchOfflineStatus(10 * time.Second) + } + return c +} + +// Cache is a Store that caches keys from an underlying +// Store in memory. +type Cache struct { + Store Store + + lock sync.RWMutex + cache map[string]*cacheEntry + offlineCache map[string]*cacheEntry + + // Controls whether the offline cache is used: + // - 0: Offline cache is disabled + // - 1: Offline cache is enabled + // + // Concurrently modified when checking the Store + // status. + // By default, not in use + useOfflineCache uint32 + + ctx context.Context + cancel context.CancelFunc +} + +var _ Store = (*Cache)(nil) // compiler check + +type cacheEntry struct { + Key Key + + used uint32 } -func (c *cache) Get(name string) (Key, bool) { +// Status returns the current state of the Store. +func (c *Cache) Status(ctx context.Context) (StoreState, error) { return c.Store.Status(ctx) } + +// Create stors the givem key at the Store if and +// only if no entry with the given name exists. +// +// If such an entry already exists, Create returns +// kes.ErrKeyExists. +func (c *Cache) Create(ctx context.Context, name string, key Key) error { + switch err := c.Store.Create(ctx, name, key); { + case err == nil: + return nil + case errors.Is(err, kes.ErrKeyExists): + return kes.ErrKeyExists + default: + return errCreateKey + } +} + +// Get returns the key associated with the given name. +// If noc such entry exists, Get returns kes.ErrKeyNotFound. +func (c *Cache) Get(ctx context.Context, name string) (Key, error) { + if key, ok := c.lookup(c.cache, name); ok { + return key, nil + } + if atomic.LoadUint32(&c.useOfflineCache) == 1 { + if key, ok := c.lookup(c.offlineCache, name); ok { + return key, nil + } + } + switch key, err := c.Store.Get(ctx, name); { + case err == nil: + return c.insertOrRefresh(c.cache, name, key), nil + case errors.Is(err, kes.ErrKeyNotFound): + return Key{}, kes.ErrKeyNotFound + default: + return Key{}, errGetKey + } +} + +// Delete deletes the key associated with the given name. +func (c *Cache) Delete(ctx context.Context, name string) error { + if err := c.Store.Delete(ctx, name); err != nil && !errors.Is(err, kes.ErrKeyNotFound) { + return errDeleteKey + } + + c.lock.Lock() + defer c.lock.Unlock() + delete(c.cache, name) + delete(c.offlineCache, name) + return nil +} + +// List returns a new Iterator over the Store. +func (c *Cache) List(ctx context.Context) (Iterator, error) { + i, err := c.Store.List(ctx) + if err != nil { + return nil, errListKey + } + return i, nil +} + +// Stop stops all background tasks performed by the +// Cache. +func (c *Cache) Stop() { c.cancel() } + +// lookup returns the key associated with name in the +// cache. It returns an empty Key and false if there +// is no such entry in the cache. +func (c *Cache) lookup(cache map[string]*cacheEntry, name string) (Key, bool) { c.lock.RLock() defer c.lock.RUnlock() - entry, ok := c.store[name] + entry, ok := cache[name] if !ok { return Key{}, false } @@ -24,43 +187,30 @@ func (c *cache) Get(name string) (Key, bool) { return entry.Key, true } -func (c *cache) CompareAndSwap(name string, key Key) Key { +// insert inserts the given name / key pair into the +// cache if and only if no such entry exists. Otherwise +// it marks the existing entry as used. +func (c *Cache) insertOrRefresh(cache map[string]*cacheEntry, name string, key Key) Key { c.lock.Lock() defer c.lock.Unlock() - if entry, ok := c.store[name]; ok { + if entry, ok := cache[name]; ok { atomic.StoreUint32(&entry.used, 1) return entry.Key } - if c.store == nil { - c.store = map[string]*entry{} - } - c.store[name] = &entry{ + cache[name] = &cacheEntry{ Key: key, used: 1, } return key } -func (c *cache) Delete(name string) { - c.lock.Lock() - defer c.lock.Unlock() - - delete(c.store, name) -} - -type entry struct { - Key Key - - used uint32 -} - -// StartGC spawns a new go-routine that clears +// gc spawns a new go-routine that clears // the cache repeatedly in t intervals. // -// If t == 0, StartGC does nothing. -func (c *cache) StartGC(ctx context.Context, t time.Duration) { +// If t == 0, gc does nothing. +func (c *Cache) gc(t time.Duration) { if t == 0 { return } @@ -69,18 +219,18 @@ func (c *cache) StartGC(ctx context.Context, t time.Duration) { defer ticker.Stop() for { select { - case <-ctx.Done(): + case <-c.ctx.Done(): return case <-ticker.C: c.lock.Lock() - c.store = map[string]*entry{} + c.cache = map[string]*cacheEntry{} c.lock.Unlock() } } }() } -// StartUnusedGC spawns a new go-routine that: +// gcUnused spawns a new go-routine that: // 1. Removes all entries that are marked // as not recently used. // 2. Marks all remaining entries as not @@ -93,8 +243,8 @@ func (c *cache) StartGC(ctx context.Context, t time.Duration) { // them unused. Therefore, if unused cache entries // should survive x seconds, you should set t = x/2. // -// If t == 0, StartUnusedGC does nothing. -func (c *cache) StartUnusedGC(ctx context.Context, t time.Duration) { +// If t == 0, gcUnused does nothing. +func (c *Cache) gcUnused(t time.Duration) { if t == 0 { return } @@ -103,13 +253,13 @@ func (c *cache) StartUnusedGC(ctx context.Context, t time.Duration) { defer ticker.Stop() for { select { - case <-ctx.Done(): + case <-c.ctx.Done(): return case <-ticker.C: var names []string c.lock.RLock() - for name, entry := range c.store { + for name, entry := range c.cache { // We check whether Used == 1. If so, // we mark it as "to delete on next iteration // if not used in between" by setting it to 0. @@ -126,10 +276,79 @@ func (c *cache) StartUnusedGC(ctx context.Context, t time.Duration) { // Now delete all "expired" entries. c.lock.Lock() for _, name := range names { - delete(c.store, name) + delete(c.cache, name) } c.lock.Unlock() } } }() } + +// gc spawns a new go-routine that clears +// the offlineCache repeatedly in t intervals. +// +// If t == 0, gc does nothing. +func (c *Cache) gcOffline(t time.Duration) { + if t == 0 { + return + } + go func() { + ticker := time.NewTicker(t) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.lock.Lock() + c.offlineCache = map[string]*cacheEntry{} + c.lock.Unlock() + } + } + }() +} + +// watchOfflineStatus fetches the Store status in +// t intervals. Once the store becomes unavailable, +// it enables the offline cache. Once the store +// becomes available again, it disables the offline +// cache again. +func (c *Cache) watchOfflineStatus(t time.Duration) { + if t == 0 { + return + } + + const ( + Online = 0 + Offline = 1 + ) + go func() { + ticker := time.NewTicker(t) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // When the Store status changes from available to + // unreachable, we load the general cache into the + // offline cache. + // Once the Store becomes available again, we clear + // both caches and start with a clean state. + state, err := c.Store.Status(c.ctx) + if err != nil || state.State != StoreAvailable { + if atomic.CompareAndSwapUint32(&c.useOfflineCache, Online, Offline) { + c.lock.Lock() + c.offlineCache, c.cache = c.cache, map[string]*cacheEntry{} + c.lock.Unlock() + + } + } else if atomic.CompareAndSwapUint32(&c.useOfflineCache, Offline, Online) { + c.lock.Lock() + c.offlineCache, c.cache = map[string]*cacheEntry{}, map[string]*cacheEntry{} + c.lock.Unlock() + } + case <-c.ctx.Done(): + return + } + } + }() +} diff --git a/internal/key/key_test.go b/internal/key/key_test.go index e5dd3a34..f321db0d 100644 --- a/internal/key/key_test.go +++ b/internal/key/key_test.go @@ -1,3 +1,7 @@ +// Copyright 2022 - MinIO, Inc. All rights reserved. +// Use of this source code is governed by the AGPLv3 +// license that can be found in the LICENSE file. + package key import ( diff --git a/internal/key/manager.go b/internal/key/manager.go deleted file mode 100644 index 5542b887..00000000 --- a/internal/key/manager.go +++ /dev/null @@ -1,220 +0,0 @@ -package key - -import ( - "context" - "errors" - "net/http" - "sync" - "sync/atomic" - "time" - - "github.com/minio/kes" -) - -// Typed errors that are returned to the client. -// The errors are generic on purpose to not leak -// any (potentially sensitive) information. -var ( - errCreateKey = kes.NewError(http.StatusBadGateway, "bad gateway: failed to create key") - errGetKey = kes.NewError(http.StatusBadGateway, "bad gateway: failed to access key") - errDeleteKey = kes.NewError(http.StatusBadGateway, "bad gateway: failed to delete key") - errListKey = kes.NewError(http.StatusBadGateway, "bad gateway: failed to list keys") -) - -// Manager is a key manager that fetches keys from a key store -// and caches them in a local in-memory cache. -// -// It runs a garbage collection that periodically removes keys -// from the cache such that they have to be fetched from the key -// store again. -type Manager struct { - // Store is the key store that persists cryptographic - // keys. The key manager will fetch the key from it if - // the key isn't in its cache. - Store Store - - // CacheExpiryAny is the time period keys remain - at - // most - in the key manager cache. - // - // The key manager will clear the entire cache whenever - // this time period elapses and will start a new time - // interval such that the cache get cleared periodically. - CacheExpiryAny time.Duration - - // CacheExpiryUnused is the time keys remain in the cache - // even though they are not used. - // - // A key that is used before one interval elapses is - // marked as used again and remains in the cache. - CacheExpiryUnused time.Duration - - // CacheExpiryOffline is the time keys remain in the - // offline cache. - // - // The offline cache is only used when the Store is - // not available and CacheExpiryOffline > 0. - // - // The offline cache, if used, gets cleared whenever - // the Store becomes available again. - CacheExpiryOffline time.Duration - - // CacheContext is the context that controls the cache - // garbage collection. Once its Done() channel returns, - // the garbage collection stops. - CacheContext context.Context - - once sync.Once // Start the GC only once - cache cache - offlineCache cache - - // Controls whether the offline cache is used: - // - 0: Offline cache is disabled - // - 1: Offline cache is enabled - // - // Concurrently modified when checking the Store - // status. - // By default, don't use the offline cache - useOfflineCache uint32 -} - -// Create stores the given key at the key store. -// -// If an entry with the same name exists, Create -// returns kes.ErrKeyExists. -func (m *Manager) Create(ctx context.Context, name string, key Key) error { - switch err := m.Store.Create(ctx, name, key); { - case err == nil: - return nil - case errors.Is(err, kes.ErrKeyExists): - return kes.ErrKeyExists - default: - return errCreateKey - } -} - -// Get returns the key with the given name. -// -// If no key with the given name exists, -// Get returns kes.ErrKeyNotFound. -// -// Get tries to find the key in its cache -// first and fetches the key only from the -// key store if it's not in the cache. -func (m *Manager) Get(ctx context.Context, name string) (Key, error) { - m.once.Do(m.startGC) - if key, ok := m.cache.Get(name); ok { - return key, nil - } - if atomic.LoadUint32(&m.useOfflineCache) == 1 { - if key, ok := m.offlineCache.Get(name); ok { - return key, nil - } - } - switch key, err := m.Store.Get(ctx, name); { - case err == nil: - return m.cache.CompareAndSwap(name, key), nil - case errors.Is(err, kes.ErrKeyNotFound): - return Key{}, kes.ErrKeyNotFound - default: - return Key{}, errGetKey - } -} - -// Delete deletes the key with the given name -// at the key store. -// -// Delete does not return an error if no key -// with this name exists. -func (m *Manager) Delete(ctx context.Context, name string) error { - m.once.Do(m.startGC) - m.cache.Delete(name) - m.offlineCache.Delete(name) - - switch err := m.Store.Delete(ctx, name); { - case err == nil: - return nil - case errors.Is(err, kes.ErrKeyNotFound): - return nil - default: - return errDeleteKey - } -} - -// List returns an iterator over all keys at the -// key store. -// -// The returned iterator may or may not reflect any -// concurrent changes to the key store - i.e. creates -// or deletes. Further, it does not provide any ordering -// guarantees. -func (m *Manager) List(ctx context.Context) (Iterator, error) { - iter, err := m.Store.List(ctx) - if err != nil { - return nil, errListKey - } - return iter, nil -} - -func (m *Manager) startGC() { - var ctx = m.CacheContext - if ctx == nil { - ctx = context.Background() - } - m.cache.StartGC(ctx, m.CacheExpiryAny) - - // Actually, we also don't run the unused GC if CacheExpiryUnused/2 == 0, - // not if CacheExpiryUnused == 0. - // However, that can only happen if CacheExpiryUnused is 1ns - which is - // anyway an unreasonable value for the expiry. - m.cache.StartUnusedGC(ctx, m.CacheExpiryUnused/2) - - m.offlineCache.StartGC(ctx, m.CacheExpiryOffline) - m.watchStatus(ctx, 10*time.Second) -} - -func (m *Manager) watchStatus(ctx context.Context, t time.Duration) { - if t == 0 { - return - } - - const ( - Online = 0 - Offline = 1 - ) - go func() { - ticker := time.NewTicker(t) - defer ticker.Stop() - for { - select { - case <-ticker.C: - // When the Store status changes from available to - // unreachable, we load the general cache into the - // offline cache. - // Once the Store becomes available again, we clear - // both caches and start with a clean state. - state, err := m.Store.Status(ctx) - if err != nil || state.State != StoreAvailable { - if atomic.CompareAndSwapUint32(&m.useOfflineCache, Online, Offline) { - m.cache.lock.Lock() - m.offlineCache.lock.Lock() - - m.offlineCache.store, m.cache.store = m.cache.store, map[string]*entry{} - - m.cache.lock.Unlock() - m.offlineCache.lock.Unlock() - } - } else if atomic.CompareAndSwapUint32(&m.useOfflineCache, Offline, Online) { - m.cache.lock.Lock() - m.offlineCache.lock.Lock() - - m.offlineCache.store, m.cache.store = map[string]*entry{}, map[string]*entry{} - - m.cache.lock.Unlock() - m.offlineCache.lock.Unlock() - } - case <-ctx.Done(): - return - } - } - }() -} diff --git a/internal/key/store.go b/internal/key/store.go index 524f4fcf..274a57a6 100644 --- a/internal/key/store.go +++ b/internal/key/store.go @@ -1,3 +1,7 @@ +// Copyright 2022 - MinIO, Inc. All rights reserved. +// Use of this source code is governed by the AGPLv3 +// license that can be found in the LICENSE file. + package key import ( @@ -29,7 +33,7 @@ type Store interface { // Create stores the given key at the key store if // and only if no entry with the given name exists. // - // If no such entry exists, Create returns kes.ErrKeyExists. + // If such entry exists, Create returns kes.ErrKeyExists. Create(ctx context.Context, name string, key Key) error // Delete deletes the key associated with the given name diff --git a/kestest/server.go b/kestest/server.go index 92ce8633..79250dbf 100644 --- a/kestest/server.go +++ b/kestest/server.go @@ -116,11 +116,10 @@ func (s *Server) start() { var serverCert = issueCertificate("kestest: server", s.caCertificate, s.caPrivateKey, x509.ExtKeyUsageServerAuth) s.server = httptest.NewUnstartedServer(xhttp.NewServerMux(&xhttp.ServerConfig{ Version: "v0.0.0-dev", - Manager: &key.Manager{ - CacheExpiryAny: 30 * time.Second, - CacheExpiryUnused: 5 * time.Second, - Store: &mem.Store{}, - }, + Store: key.NewCache(&mem.Store{}, &key.CacheConfig{ + Expiry: 30 * time.Second, + ExpiryUnused: 5 * time.Second, + }), Roles: s.policies.roles, Proxy: nil, AuditLog: auditLog,