From def93317b489ee499ca91b9e2e8c157aca358c3e Mon Sep 17 00:00:00 2001 From: Aditi Sharma Date: Tue, 18 May 2021 13:37:36 +0000 Subject: [PATCH] Kubelet Credential Provider Improve concurrency and cache for credential provider Removed lock from "Provide" as it can be called in parallel from image puller. To avoid execing for the same image concurrently wrapped exec in singleflight. Purging the cache for expried data with 15mins interval only when a request for credential is made. KEP:2133 Signed-off-by: Aditi Sharma --- go.mod | 1 + pkg/credentialprovider/plugin/plugin.go | 67 +++-- pkg/credentialprovider/plugin/plugin_test.go | 250 +++++++++++++++++-- vendor/modules.txt | 1 + 4 files changed, 285 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index 66d47846bbd2..aeb7d6e95561 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba diff --git a/pkg/credentialprovider/plugin/plugin.go b/pkg/credentialprovider/plugin/plugin.go index eee135b94237..30b5b0439c91 100644 --- a/pkg/credentialprovider/plugin/plugin.go +++ b/pkg/credentialprovider/plugin/plugin.go @@ -28,10 +28,13 @@ import ( "sync" "time" + "golang.org/x/sync/singleflight" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer/json" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider" @@ -43,7 +46,8 @@ import ( ) const ( - globalCacheKey = "global" + globalCacheKey = "global" + cachePurgeInterval = time.Minute * 15 ) var ( @@ -116,10 +120,14 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro return nil, fmt.Errorf("invalid apiVersion: %q", provider.APIVersion) } + clock := clock.RealClock{} + return &pluginProvider{ + clock: clock, matchImages: provider.MatchImages, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: clock}), defaultCacheDuration: provider.DefaultCacheDuration.Duration, + lastCachePurge: clock.Now(), plugin: &execPlugin{ name: provider.Name, apiVersion: provider.APIVersion, @@ -133,8 +141,12 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro // pluginProvider is the plugin-based implementation of the DockerConfigProvider interface. type pluginProvider struct { + clock clock.Clock + sync.Mutex + group singleflight.Group + // matchImages defines the matching image URLs this plugin should operate against. // The plugin provider will not return any credentials for images that do not match // against this list of match URLs. @@ -149,6 +161,9 @@ type pluginProvider struct { // plugin is the exec implementation of the credential providing plugin. plugin Plugin + + // lastCachePurge is the last time cache is cleaned for expired entries. + lastCachePurge time.Time } // cacheEntry is the cache object that will be stored in cache.Store. @@ -165,12 +180,14 @@ func cacheKeyFunc(obj interface{}) (string, error) { } // cacheExpirationPolicy defines implements cache.ExpirationPolicy, determining expiration based on the expiresAt timestamp. -type cacheExpirationPolicy struct{} +type cacheExpirationPolicy struct { + clock clock.Clock +} // IsExpired returns true if the current time is after cacheEntry.expiresAt, which is determined by the // cache duration returned from the credential provider plugin response. func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool { - return time.Now().After(entry.Obj.(*cacheEntry).expiresAt) + return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt) } // Provide returns a credentialprovider.DockerConfig based on the credentials returned @@ -180,9 +197,6 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig { return credentialprovider.DockerConfig{} } - p.Lock() - defer p.Unlock() - cachedConfig, found, err := p.getCachedCredentials(image) if err != nil { klog.Errorf("Failed to get cached docker config: %v", err) @@ -193,12 +207,27 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig { return cachedConfig } - response, err := p.plugin.ExecPlugin(context.Background(), image) + // ExecPlugin is wrapped in single flight to exec plugin once for concurrent same image request. + // The caveat here is we don't know cacheKeyType yet, so if cacheKeyType is registry/global and credentials saved in cache + // on per registry/global basis then exec will be called for all requests if requests are made concurrently. + // foo.bar.registry + // foo.bar.registry/image1 + // foo.bar.registry/image2 + res, err, _ := p.group.Do(image, func() (interface{}, error) { + return p.plugin.ExecPlugin(context.Background(), image) + }) + if err != nil { klog.Errorf("Failed getting credential from external registry credential provider: %v", err) return credentialprovider.DockerConfig{} } + response, ok := res.(*credentialproviderapi.CredentialProviderResponse) + if !ok { + klog.Errorf("Invalid response type returned by external credential provider") + return credentialprovider.DockerConfig{} + } + var cacheKey string switch cacheKeyType := response.CacheKeyType; cacheKeyType { case credentialproviderapi.ImagePluginCacheKeyType: @@ -232,10 +261,9 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig { if p.defaultCacheDuration == 0 { return dockerConfig } - - expiresAt = time.Now().Add(p.defaultCacheDuration) + expiresAt = p.clock.Now().Add(p.defaultCacheDuration) } else { - expiresAt = time.Now().Add(response.CacheDuration.Duration) + expiresAt = p.clock.Now().Add(response.CacheDuration.Duration) } cachedEntry := &cacheEntry{ @@ -269,6 +297,16 @@ func (p *pluginProvider) isImageAllowed(image string) bool { // getCachedCredentials returns a credentialprovider.DockerConfig if cached from the plugin. func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) { + p.Lock() + if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) { + // NewExpirationCache purges expired entries when List() is called + // The expired entry in the cache is removed only when Get or List called on it. + // List() is called on some interval to remove those expired entries on which Get is never called. + _ = p.cache.List() + p.lastCachePurge = p.clock.Now() + } + p.Unlock() + obj, found, err := p.cache.GetByKey(image) if err != nil { return nil, false, err @@ -325,6 +363,8 @@ type execPlugin struct { // The plugin is expected to receive the CredentialProviderRequest API via stdin from the kubelet and // return CredentialProviderResponse via stdout. func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) { + klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name) + authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image} data, err := e.encodeRequest(authRequest) if err != nil { @@ -361,7 +401,6 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp } data = stdout.Bytes() - // check that the response apiVersion matches what is expected gvk, err := json.DefaultMetaFactory.Interpret(data) if err != nil { @@ -369,10 +408,10 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp } if gvk.GroupVersion().String() != e.apiVersion { - return nil, errors.New("apiVersion from credential plugin response did not match") + return nil, fmt.Errorf("apiVersion from credential plugin response did not match expected apiVersion:%s, actual apiVersion:%s", e.apiVersion, gvk.GroupVersion().String()) } - response, err := e.decodeResponse(stdout.Bytes()) + response, err := e.decodeResponse(data) if err != nil { // err is explicitly not wrapped since it may contain credentials in the response. return nil, errors.New("error decoding credential provider plugin response from stdout") diff --git a/pkg/credentialprovider/plugin/plugin_test.go b/pkg/credentialprovider/plugin/plugin_test.go index 6c9a9358e56d..817ad486f088 100644 --- a/pkg/credentialprovider/plugin/plugin_test.go +++ b/pkg/credentialprovider/plugin/plugin_test.go @@ -18,12 +18,17 @@ package plugin import ( "context" + "fmt" "reflect" + "sync" "testing" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/tools/cache" credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider" credentialproviderv1alpha1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1" @@ -48,6 +53,7 @@ func (f *fakeExecPlugin) ExecPlugin(ctx context.Context, image string) (*credent } func Test_Provide(t *testing.T) { + tclock := clock.RealClock{} testcases := []struct { name string pluginProvider *pluginProvider @@ -57,8 +63,10 @@ func Test_Provide(t *testing.T) { { name: "exact image match, with Registry cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"test.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"test.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -80,8 +88,10 @@ func Test_Provide(t *testing.T) { { name: "exact image match, with Image cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"test.registry.io/foo/bar"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"test.registry.io/foo/bar"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -103,8 +113,10 @@ func Test_Provide(t *testing.T) { { name: "exact image match, with Global cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"test.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"test.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -126,8 +138,10 @@ func Test_Provide(t *testing.T) { { name: "wild card image match, with Registry cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"*.registry.io:8080"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io:8080"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -149,8 +163,10 @@ func Test_Provide(t *testing.T) { { name: "wild card image match, with Image cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"*.*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -172,8 +188,10 @@ func Test_Provide(t *testing.T) { { name: "wild card image match, with Global cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -195,7 +213,9 @@ func Test_Provide(t *testing.T) { } for _, testcase := range testcases { + testcase := testcase t.Run(testcase.name, func(t *testing.T) { + t.Parallel() dockerconfig := testcase.pluginProvider.Provide(testcase.image) if !reflect.DeepEqual(dockerconfig, testcase.dockerconfig) { t.Logf("actual docker config: %v", dockerconfig) @@ -206,6 +226,184 @@ func Test_Provide(t *testing.T) { } } +// This test calls Provide in parallel for different registries and images +// The purpose of this is to detect any race conditions while cache rw. +func Test_ProvideParallel(t *testing.T) { + tclock := clock.RealClock{} + + testcases := []struct { + name string + registry string + }{ + { + name: "provide for registry 1", + registry: "test1.registry.io", + }, + { + name: "provide for registry 2", + registry: "test2.registry.io", + }, + { + name: "provide for registry 3", + registry: "test3.registry.io", + }, + { + name: "provide for registry 4", + registry: "test4.registry.io", + }, + } + + pluginProvider := &pluginProvider{ + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"test1.registry.io", "test2.registry.io", "test3.registry.io", "test4.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), + plugin: &fakeExecPlugin{ + cacheDuration: time.Minute * 1, + cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType, + auth: map[string]credentialproviderapi.AuthConfig{ + "test.registry.io": { + Username: "user", + Password: "password", + }, + }, + }, + } + + dockerconfig := credentialprovider.DockerConfig{ + "test.registry.io": credentialprovider.DockerConfigEntry{ + Username: "user", + Password: "password", + }, + } + + for _, testcase := range testcases { + testcase := testcase + t.Run(testcase.name, func(t *testing.T) { + t.Parallel() + var wg sync.WaitGroup + wg.Add(5) + + for i := 0; i < 5; i++ { + go func(w *sync.WaitGroup) { + image := fmt.Sprintf(testcase.registry+"/%s", rand.String(5)) + dockerconfigResponse := pluginProvider.Provide(image) + if !reflect.DeepEqual(dockerconfigResponse, dockerconfig) { + t.Logf("actual docker config: %v", dockerconfigResponse) + t.Logf("expected docker config: %v", dockerconfig) + t.Error("unexpected docker config") + } + w.Done() + }(&wg) + } + wg.Wait() + + }) + } +} + +func Test_getCachedCredentials(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + p := &pluginProvider{ + clock: fakeClock, + lastCachePurge: fakeClock.Now(), + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: fakeClock}), + plugin: &fakeExecPlugin{}, + } + + testcases := []struct { + name string + step time.Duration + cacheEntry cacheEntry + expectedResponse credentialprovider.DockerConfig + keyLength int + getKey string + }{ + { + name: "It should return not expired credential", + step: 1 * time.Second, + keyLength: 1, + getKey: "image1", + expectedResponse: map[string]credentialprovider.DockerConfigEntry{ + "image1": { + Username: "user1", + Password: "pass1", + }, + }, + cacheEntry: cacheEntry{ + key: "image1", + expiresAt: fakeClock.Now().Add(1 * time.Minute), + credentials: map[string]credentialprovider.DockerConfigEntry{ + "image1": { + Username: "user1", + Password: "pass1", + }, + }, + }, + }, + + { + name: "It should not return expired credential", + step: 2 * time.Minute, + getKey: "image2", + keyLength: 1, + cacheEntry: cacheEntry{ + key: "image2", + expiresAt: fakeClock.Now(), + credentials: map[string]credentialprovider.DockerConfigEntry{ + "image2": { + Username: "user2", + Password: "pass2", + }, + }, + }, + }, + + { + name: "It should delete expired credential during purge", + step: 18 * time.Minute, + keyLength: 0, + // while get call for random, cache purge will be called and it will delete expired + // image3 credentials. We cannot use image3 as getKey here, as it will get deleted during + // get only, we will not be able verify the purge call. + getKey: "random", + cacheEntry: cacheEntry{ + key: "image3", + expiresAt: fakeClock.Now().Add(2 * time.Minute), + credentials: map[string]credentialprovider.DockerConfigEntry{ + "image3": { + Username: "user3", + Password: "pass3", + }, + }, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + p.cache.Add(&tc.cacheEntry) + fakeClock.Step(tc.step) + + // getCachedCredentials returns unexpired credentials. + res, _, err := p.getCachedCredentials(tc.getKey) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if !reflect.DeepEqual(res, tc.expectedResponse) { + t.Logf("response %v", res) + t.Logf("expected response %v", tc.expectedResponse) + t.Errorf("Unexpected response") + } + + // Listkeys returns all the keys present in cache including expired keys. + if len(p.cache.ListKeys()) != tc.keyLength { + t.Errorf("Unexpected cache key length") + } + }) + } +} + func Test_encodeRequest(t *testing.T) { testcases := []struct { name string @@ -316,9 +514,12 @@ func Test_decodeResponse(t *testing.T) { } func Test_RegistryCacheKeyType(t *testing.T) { + tclock := clock.RealClock{} pluginProvider := &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType, cacheDuration: time.Hour, @@ -366,9 +567,12 @@ func Test_RegistryCacheKeyType(t *testing.T) { } func Test_ImageCacheKeyType(t *testing.T) { + tclock := clock.RealClock{} pluginProvider := &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType, cacheDuration: time.Hour, @@ -416,9 +620,12 @@ func Test_ImageCacheKeyType(t *testing.T) { } func Test_GlobalCacheKeyType(t *testing.T) { + tclock := clock.RealClock{} pluginProvider := &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType, cacheDuration: time.Hour, @@ -466,9 +673,12 @@ func Test_GlobalCacheKeyType(t *testing.T) { } func Test_NoCacheResponse(t *testing.T) { + tclock := clock.RealClock{} pluginProvider := &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType, cacheDuration: 0, // no cache diff --git a/vendor/modules.txt b/vendor/modules.txt index bc07c60b8f43..9e4bad67add0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1023,6 +1023,7 @@ golang.org/x/oauth2/internal golang.org/x/oauth2/jws golang.org/x/oauth2/jwt # golang.org/x/sync v0.0.0-20210220032951-036812b2e83c => golang.org/x/sync v0.0.0-20210220032951-036812b2e83c +## explicit golang.org/x/sync/singleflight # golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 => golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 ## explicit