Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve concurrency and cache for kubelet credential provider #102168

Merged
merged 1 commit into from Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -90,6 +90,7 @@ require (
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
Expand Down
67 changes: 53 additions & 14 deletions pkg/credentialprovider/plugin/plugin.go
Expand Up @@ -28,10 +28,13 @@ import (
"sync"
"time"

"golang.org/x/sync/singleflight"
adisky marked this conversation as resolved.
Show resolved Hide resolved

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
Expand All @@ -43,7 +46,8 @@ import (
)

const (
globalCacheKey = "global"
globalCacheKey = "global"
cachePurgeInterval = time.Minute * 15
)

var (
Expand Down Expand Up @@ -116,10 +120,14 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro
return nil, fmt.Errorf("invalid apiVersion: %q", provider.APIVersion)
}

clock := clock.RealClock{}
adisky marked this conversation as resolved.
Show resolved Hide resolved

return &pluginProvider{
clock: clock,
matchImages: provider.MatchImages,
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: clock}),
defaultCacheDuration: provider.DefaultCacheDuration.Duration,
lastCachePurge: clock.Now(),
plugin: &execPlugin{
name: provider.Name,
apiVersion: provider.APIVersion,
Expand All @@ -133,8 +141,12 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro

// pluginProvider is the plugin-based implementation of the DockerConfigProvider interface.
type pluginProvider struct {
clock clock.Clock

sync.Mutex

group singleflight.Group

// matchImages defines the matching image URLs this plugin should operate against.
// The plugin provider will not return any credentials for images that do not match
// against this list of match URLs.
Expand All @@ -149,6 +161,9 @@ type pluginProvider struct {

// plugin is the exec implementation of the credential providing plugin.
plugin Plugin

// lastCachePurge is the last time cache is cleaned for expired entries.
lastCachePurge time.Time
}

// cacheEntry is the cache object that will be stored in cache.Store.
Expand All @@ -165,12 +180,14 @@ func cacheKeyFunc(obj interface{}) (string, error) {
}

// cacheExpirationPolicy defines implements cache.ExpirationPolicy, determining expiration based on the expiresAt timestamp.
type cacheExpirationPolicy struct{}
type cacheExpirationPolicy struct {
clock clock.Clock
}

// IsExpired returns true if the current time is after cacheEntry.expiresAt, which is determined by the
// cache duration returned from the credential provider plugin response.
func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool {
return time.Now().After(entry.Obj.(*cacheEntry).expiresAt)
return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt)
}

// Provide returns a credentialprovider.DockerConfig based on the credentials returned
Expand All @@ -180,9 +197,6 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
return credentialprovider.DockerConfig{}
}

p.Lock()
defer p.Unlock()

cachedConfig, found, err := p.getCachedCredentials(image)
liggitt marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
klog.Errorf("Failed to get cached docker config: %v", err)
Expand All @@ -193,12 +207,27 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
return cachedConfig
}

response, err := p.plugin.ExecPlugin(context.Background(), image)
// ExecPlugin is wrapped in single flight to exec plugin once for concurrent same image request.
// The caveat here is we don't know cacheKeyType yet, so if cacheKeyType is registry/global and credentials saved in cache
adisky marked this conversation as resolved.
Show resolved Hide resolved
// on per registry/global basis then exec will be called for all requests if requests are made concurrently.
// foo.bar.registry
// foo.bar.registry/image1
// foo.bar.registry/image2
res, err, _ := p.group.Do(image, func() (interface{}, error) {
return p.plugin.ExecPlugin(context.Background(), image)
adisky marked this conversation as resolved.
Show resolved Hide resolved
adisky marked this conversation as resolved.
Show resolved Hide resolved
})

if err != nil {
klog.Errorf("Failed getting credential from external registry credential provider: %v", err)
return credentialprovider.DockerConfig{}
}

response, ok := res.(*credentialproviderapi.CredentialProviderResponse)
if !ok {
klog.Errorf("Invalid response type returned by external credential provider")
return credentialprovider.DockerConfig{}
}

var cacheKey string
switch cacheKeyType := response.CacheKeyType; cacheKeyType {
case credentialproviderapi.ImagePluginCacheKeyType:
Expand Down Expand Up @@ -232,10 +261,9 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
if p.defaultCacheDuration == 0 {
return dockerConfig
}

expiresAt = time.Now().Add(p.defaultCacheDuration)
expiresAt = p.clock.Now().Add(p.defaultCacheDuration)
} else {
expiresAt = time.Now().Add(response.CacheDuration.Duration)
expiresAt = p.clock.Now().Add(response.CacheDuration.Duration)
}

cachedEntry := &cacheEntry{
Expand Down Expand Up @@ -269,6 +297,16 @@ func (p *pluginProvider) isImageAllowed(image string) bool {

// getCachedCredentials returns a credentialprovider.DockerConfig if cached from the plugin.
func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) {
p.Lock()
if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) {
// NewExpirationCache purges expired entries when List() is called
// The expired entry in the cache is removed only when Get or List called on it.
// List() is called on some interval to remove those expired entries on which Get is never called.
_ = p.cache.List()
adisky marked this conversation as resolved.
Show resolved Hide resolved
p.lastCachePurge = p.clock.Now()
}
p.Unlock()

obj, found, err := p.cache.GetByKey(image)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -325,6 +363,8 @@ type execPlugin struct {
// The plugin is expected to receive the CredentialProviderRequest API via stdin from the kubelet and
// return CredentialProviderResponse via stdout.
func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) {
klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name)

authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image}
data, err := e.encodeRequest(authRequest)
if err != nil {
Expand Down Expand Up @@ -361,18 +401,17 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp
}

data = stdout.Bytes()

// check that the response apiVersion matches what is expected
gvk, err := json.DefaultMetaFactory.Interpret(data)
if err != nil {
return nil, fmt.Errorf("error reading GVK from response: %w", err)
}

if gvk.GroupVersion().String() != e.apiVersion {
return nil, errors.New("apiVersion from credential plugin response did not match")
return nil, fmt.Errorf("apiVersion from credential plugin response did not match expected apiVersion:%s, actual apiVersion:%s", e.apiVersion, gvk.GroupVersion().String())
}

response, err := e.decodeResponse(stdout.Bytes())
response, err := e.decodeResponse(data)
if err != nil {
// err is explicitly not wrapped since it may contain credentials in the response.
return nil, errors.New("error decoding credential provider plugin response from stdout")
Expand Down