From 76f54f33d5cb9047faac783ec1f79f94a8691cad Mon Sep 17 00:00:00 2001 From: Mariano Cano Date: Wed, 6 Apr 2022 12:05:10 -0700 Subject: [PATCH] Add collections interface and play around with collections. --- authority/admins.go | 13 ++++ authority/authority.go | 13 +++- authority/cache/cache.go | 89 ++++++++++++++++++++++++++++ authority/options.go | 9 +++ authority/provisioner/collection.go | 54 +++++++++++++++-- authority/provisioner/provisioner.go | 6 ++ authority/provisioners.go | 14 +++++ 7 files changed, 189 insertions(+), 9 deletions(-) create mode 100644 authority/cache/cache.go diff --git a/authority/admins.go b/authority/admins.go index b975297ac..5ac8ed65e 100644 --- a/authority/admins.go +++ b/authority/admins.go @@ -8,6 +8,19 @@ import ( "go.step.sm/linkedca" ) +// Admins is the interface used by the admin collection. +type Admins interface { + Store(adm *linkedca.Admin, prov provisioner.Interface) error + Update(id string, nu *linkedca.Admin) (*linkedca.Admin, error) + Remove(id string) error + LoadByID(id string) (*linkedca.Admin, bool) + LoadBySubProv(sub, provName string) (*linkedca.Admin, bool) + LoadByProvisioner(provName string) ([]*linkedca.Admin, bool) + Find(cursor string, limit int) ([]*linkedca.Admin, string) + SuperCount() int + SuperCountByProvisioner(provName string) int +} + // LoadAdminByID returns an *linkedca.Admin with the given ID. func (a *Authority) LoadAdminByID(id string) (*linkedca.Admin, bool) { a.adminMutex.RLock() diff --git a/authority/authority.go b/authority/authority.go index cc26635ec..a9cb6db49 100644 --- a/authority/authority.go +++ b/authority/authority.go @@ -15,6 +15,7 @@ import ( "github.com/smallstep/certificates/authority/admin" adminDBNosql "github.com/smallstep/certificates/authority/admin/db/nosql" "github.com/smallstep/certificates/authority/administrator" + "github.com/smallstep/certificates/authority/cache" "github.com/smallstep/certificates/authority/config" "github.com/smallstep/certificates/authority/provisioner" "github.com/smallstep/certificates/cas" @@ -35,8 +36,8 @@ import ( type Authority struct { config *config.Config keyManager kms.KeyManager - provisioners *provisioner.Collection - admins *administrator.Collection + provisioners Provisioners + admins Admins db db.AuthDB adminDB admin.DB templates *templates.Templates @@ -76,6 +77,7 @@ type Authority struct { getIdentityFunc provisioner.GetIdentityFunc authorizeRenewFunc provisioner.AuthorizeRenewFunc authorizeSSHRenewFunc provisioner.AuthorizeSSHRenewFunc + cachePool cache.Pool adminMutex sync.RWMutex } @@ -175,7 +177,7 @@ func (a *Authority) reloadAdminResources(ctx context.Context) error { } // Create provisioner collection. - provClxn := provisioner.NewCollection(provisionerConfig.Audiences) + provClxn := provisioner.NewCollection(provisionerConfig) for _, p := range provList { if err := p.Init(provisionerConfig); err != nil { return err @@ -502,6 +504,11 @@ func (a *Authority) init() error { } } + // Initialize the default cache pool. + if a.cachePool == nil { + a.cachePool = cache.DefaultPool() + } + provs, err := a.adminDB.GetProvisioners(context.Background()) if err != nil { return admin.WrapErrorISE(err, "error loading provisioners to initialize authority") diff --git a/authority/cache/cache.go b/authority/cache/cache.go new file mode 100644 index 000000000..4cd56d36c --- /dev/null +++ b/authority/cache/cache.go @@ -0,0 +1,89 @@ +package cache + +import ( + "context" + "errors" + "sync" +) + +var ErrNotFound = errors.New("not found") + +type Cache interface { + Get(context.Context, string) ([]byte, error) + Set(context.Context, string, []byte) error + Delete(context.Context, string) error +} + +type Getter interface { + Get(ctx context.Context, key string) ([]byte, error) +} + +// A GetterFunc implements Getter with a function. +type GetterFunc func(ctx context.Context, key string) ([]byte, error) + +func (f GetterFunc) Get(ctx context.Context, key string) ([]byte, error) { + return f(ctx, key) +} + +type Pool interface { + New(name string, getter Getter) Cache + Get(name string) (Cache, bool) +} + +func DefaultPool() Pool { + return &defaultPool{ + caches: make(map[string]Cache), + } +} + +type defaultPool struct { + mu sync.RWMutex + caches map[string]Cache +} + +func (p *defaultPool) New(name string, getter Getter) Cache { + c := &mapCache{ + m: new(sync.Map), + getter: getter, + } + p.mu.Lock() + p.caches[name] = c + p.mu.Unlock() + return c +} + +func (p *defaultPool) Get(name string) (Cache, bool) { + p.mu.RLock() + c, ok := p.caches[name] + p.mu.RUnlock() + return c, ok +} + +type mapCache struct { + name string + m *sync.Map + getter Getter +} + +func (m *mapCache) Get(ctx context.Context, key string) ([]byte, error) { + v, ok := m.m.Load(key) + if !ok { + b, err := m.getter.Get(ctx, key) + if err != nil { + return nil, err + } + m.m.Store(key, b) + return b, nil + } + return v.([]byte), nil +} + +func (m *mapCache) Set(ctx context.Context, key string, value []byte) error { + m.m.Store(key, value) + return nil +} + +func (m *mapCache) Delete(ctx context.Context, key string) error { + m.m.Delete(key) + return nil +} diff --git a/authority/options.go b/authority/options.go index 1c1545775..acfe773b4 100644 --- a/authority/options.go +++ b/authority/options.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "github.com/smallstep/certificates/authority/admin" + "github.com/smallstep/certificates/authority/cache" "github.com/smallstep/certificates/authority/config" "github.com/smallstep/certificates/authority/provisioner" "github.com/smallstep/certificates/cas" @@ -284,6 +285,14 @@ func WithX509Enforcers(ces ...provisioner.CertificateEnforcer) Option { } } +// WithCachePool is an options that allows to define a custom cache pool. +func WithCachePool(pool cache.Pool) Option { + return func(a *Authority) error { + a.cachePool = pool + return nil + } +} + func readCertificateBundle(pemCerts []byte) ([]*x509.Certificate, error) { var block *pem.Block var certs []*x509.Certificate diff --git a/authority/provisioner/collection.go b/authority/provisioner/collection.go index 8bbace5fc..69d5d8375 100644 --- a/authority/provisioner/collection.go +++ b/authority/provisioner/collection.go @@ -1,19 +1,25 @@ package provisioner import ( + "context" "crypto/sha1" "crypto/x509" "encoding/asn1" "encoding/binary" "encoding/hex" "fmt" + "log" "net/url" "sort" "strings" "sync" + "time" "github.com/smallstep/certificates/authority/admin" + "github.com/smallstep/certificates/authority/cache" "go.step.sm/crypto/jose" + "go.step.sm/linkedca" + "google.golang.org/protobuf/proto" ) // DefaultProvisionersLimit is the default limit for listing provisioners. @@ -33,6 +39,10 @@ func (p provisionerSlice) Len() int { return len(p) } func (p provisionerSlice) Less(i, j int) bool { return p[i].uid < p[j].uid } func (p provisionerSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func defaultContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), 5*time.Second) +} + // loadByTokenPayload is a payload used to extract the id used to load the // provisioner. type loadByTokenPayload struct { @@ -50,22 +60,53 @@ type Collection struct { byTokenID *sync.Map sorted provisionerSlice audiences Audiences + + // new + byIDCache cache.Cache + byNameCache cache.Cache } // NewCollection initializes a collection of provisioners. The given list of // audiences are the audiences used by the JWT provisioner. -func NewCollection(audiences Audiences) *Collection { +func NewCollection(config Config) *Collection { + byID := config.CachePool.New("provisioner_by_id", cache.GetterFunc(func(ctx context.Context, id string) ([]byte, error) { + p, err := config.AdminDB.GetProvisioner(ctx, id) + if err != nil { + return nil, err + } + return proto.Marshal(p) + })) + // byName maps a name with a provisioner id, we will manually fill this cache. + byName := config.CachePool.New("provisioner_by_name", cache.GetterFunc(func(ctx context.Context, name string) ([]byte, error) { + return nil, cache.ErrNotFound + })) + return &Collection{ - byID: new(sync.Map), - byKey: new(sync.Map), - byName: new(sync.Map), - byTokenID: new(sync.Map), - audiences: audiences, + byID: new(sync.Map), + byKey: new(sync.Map), + byName: new(sync.Map), + byTokenID: new(sync.Map), + audiences: config.Audiences, + byIDCache: byID, + byNameCache: byName, } } // Load a provisioner by the ID. func (c *Collection) Load(id string) (Interface, bool) { + ctx, cancel := defaultContext() + defer cancel() + + b, err := c.byIDCache.Get(ctx, id) + if err != nil { + return nil, false + } + + var p linkedca.Provisioner + if err := proto.Unmarshal(b, &p); err != nil { + return nil, false + } + log.Printf("Provisioner.Load(%s): %v", id, p) return loadProvisioner(c.byID, id) } @@ -180,6 +221,7 @@ func (c *Collection) LoadEncryptedKey(keyID string) (string, bool) { // Store adds a provisioner to the collection and enforces the uniqueness of // provisioner IDs. func (c *Collection) Store(p Interface) error { + // Store provisioner always in byID. ID must be unique. if _, loaded := c.byID.LoadOrStore(p.GetID(), p); loaded { return admin.NewError(admin.ErrorBadRequestType, diff --git a/authority/provisioner/provisioner.go b/authority/provisioner/provisioner.go index 7438ea17e..3a1aa4c7a 100644 --- a/authority/provisioner/provisioner.go +++ b/authority/provisioner/provisioner.go @@ -9,6 +9,8 @@ import ( "strings" "github.com/pkg/errors" + "github.com/smallstep/certificates/authority/admin" + "github.com/smallstep/certificates/authority/cache" "github.com/smallstep/certificates/db" "github.com/smallstep/certificates/errs" "golang.org/x/crypto/ssh" @@ -214,6 +216,8 @@ type Config struct { Audiences Audiences // DB is the interface to the authority DB client. DB db.AuthDB + // AdminDB is the interface to the administration DB client. + AdminDB admin.DB // SSHKeys are the root SSH public keys SSHKeys *SSHKeys // GetIdentityFunc is a function that returns an identity that will be @@ -225,6 +229,8 @@ type Config struct { // AuthorizeSSHRenewFunc is a function that returns nil if a given SSH // certificate can be renewed. AuthorizeSSHRenewFunc AuthorizeSSHRenewFunc + // CachePool is a type that allows to create new caches. + CachePool cache.Pool } type provisioner struct { diff --git a/authority/provisioners.go b/authority/provisioners.go index a6ac5aa85..475223389 100644 --- a/authority/provisioners.go +++ b/authority/provisioners.go @@ -21,6 +21,20 @@ import ( "gopkg.in/square/go-jose.v2/jwt" ) +// Provisioners is the interface used by the provisioners collection. +type Provisioners interface { + Load(id string) (provisioner.Interface, bool) + Store(p provisioner.Interface) error + Update(p provisioner.Interface) error + Remove(id string) error + LoadByName(name string) (provisioner.Interface, bool) + LoadByToken(token *jose.JSONWebToken, claims *jose.Claims) (provisioner.Interface, bool) + LoadByTokenID(tokenProvisionerID string) (provisioner.Interface, bool) + LoadByCertificate(cert *x509.Certificate) (provisioner.Interface, bool) + Find(cursor string, limit int) (provisioner.List, string) + LoadEncryptedKey(keyID string) (string, bool) +} + // GetEncryptedKey returns the JWE key corresponding to the given kid argument. func (a *Authority) GetEncryptedKey(kid string) (string, error) { a.adminMutex.RLock()