From b4244a079f650e84fdefd281e061b3ab67f1d427 Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Tue, 26 Jan 2016 23:03:37 +0800 Subject: [PATCH 1/3] Implement OomScoreAdj in kubelet --- pkg/kubelet/dockertools/manager.go | 64 ++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 36d0e4295247..97d9419a6693 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -62,6 +62,8 @@ const ( minimumDockerAPIVersion = "1.20" + dockerv110APIVersion = "1.21" + // ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified) // we want to able to consider SRV lookup names like _dns._udp.kube-dns.default.svc to be considered relative. // hence, setting ndots to be 5. @@ -503,7 +505,8 @@ func (dm *DockerManager) runContainer( ipcMode string, utsMode string, pidMode string, - restartCount int) (kubecontainer.ContainerID, error) { + restartCount int, + oomScoreAdj int) (kubecontainer.ContainerID, error) { dockerName := KubeletContainerName{ PodFullName: kubecontainer.GetPodFullName(pod), @@ -584,6 +587,11 @@ func (dm *DockerManager) runContainer( SecurityOpt: securityOpts, } + // If current api version is newer than docker 1.10 requested, set OomScoreAdj to HostConfig + if dm.checkDockerAPIVersion(dockerv110APIVersion) >= 0 { + hc.OomScoreAdj = oomScoreAdj + } + if dm.cpuCFSQuota { // if cpuLimit.Amount is nil, then the appropriate default value is returned to allow full usage of cpu resource. cpuQuota, cpuPeriod := milliCPUToQuota(cpuLimit.MilliValue()) @@ -1473,7 +1481,20 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe if usesHostNetwork(pod) { utsMode = namespaceModeHost } - id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount) + + // Set OOM score of the container based on the priority of the container. + // Processes in lower-priority pods should be killed first if the system runs out of memory. + // The main pod infrastructure container is considered high priority, since if it is killed the + // whole pod will die. + var oomScoreAdj int + if container.Name == PodInfraContainerName { + oomScoreAdj = qos.PodInfraOOMAdj + } else { + oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity)) + + } + + id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount, oomScoreAdj) if err != nil { return kubecontainer.ContainerID{}, fmt.Errorf("runContainer: %v", err) } @@ -1512,9 +1533,12 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe return kubecontainer.ContainerID{}, fmt.Errorf("can't get init PID for container %q", id) } - if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil { - return kubecontainer.ContainerID{}, fmt.Errorf("failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name) + // Check if current docker version is higher than 1.10. Otherwise, we have to apply OOMScoreAdj instead of using docker API. + err = dm.applyOOMScoreAdjIfNeeded(container, containerInfo) + if err != nil { + return kubecontainer.ContainerID{}, err } + // The addNDotsOption call appends the ndots option to the resolv.conf file generated by docker. // This resolv.conf file is shared by all containers of the same pod, and needs to be modified only once per pod. // we modify it when the pause container is created since it is the first container created in the pod since it holds @@ -1529,6 +1553,38 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe return id, err } +func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, containerInfo *docker.Container) error { + // Compare current API version with expected api version + result := dm.checkDockerAPIVersion(dockerv110APIVersion) + // If current api version is older than OOMScoreAdj requested, use the old way. + if result < 0 { + if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil { + return fmt.Errorf("failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name) + } + } + + return nil +} + +// Check current docker API version against expected version. +// Return: +// 1 : newer than expected version +// -1: older than expected version +// 0 : same version +func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) int { + apiVersion, err := dm.APIVersion() + if err != nil { + glog.Errorf("failed to get current docker version - %v", err) + } + + result, err := apiVersion.Compare(expectedVersion) + if err != nil { + glog.Errorf("failed to compare current docker version %v with OOMScoreAdj supported Docker version %q - %v", + apiVersion, expectedVersion, err) + } + return result +} + func addNDotsOption(resolvFilePath string) error { if len(resolvFilePath) == 0 { glog.Errorf("ResolvConfPath is empty.") From f9e2f522b4b99359c23d6b15f80075d9c58a32bf Mon Sep 17 00:00:00 2001 From: harry Date: Fri, 26 Feb 2016 17:06:26 +0800 Subject: [PATCH 2/3] Add cache for api version Expose runtime interface --- pkg/kubelet/container/runtime.go | 6 +- pkg/kubelet/dockertools/manager.go | 86 ++++++++++++++++-------- pkg/kubelet/dockertools/version_cache.go | 62 +++++++++++++++++ 3 files changed, 125 insertions(+), 29 deletions(-) create mode 100644 pkg/kubelet/dockertools/version_cache.go diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 9fda09993cbe..1508dc1679b0 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -55,8 +55,10 @@ type Runtime interface { // Version returns the version information of the container runtime. Version() (Version, error) - // APIVersion returns the API version information of the container - // runtime. This may be different from the runtime engine's version. + + // APIVersion returns the cached API version information of the container + // runtime. Implementation is expected to update this cache periodically. + // This may be different from the runtime engine's version. // TODO(random-liu): We should fold this into Version() APIVersion() (Version, error) // Status returns error if the runtime is unhealthy; nil otherwise. diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 97d9419a6693..f97c56bba5ca 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/pkg/util/procfs" utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -158,6 +159,9 @@ type DockerManager struct { // A false value means the kubelet just backs off from setting it, // it might already be true. configureHairpinMode bool + + // The api version cache of docker daemon. + versionCache *VersionCache } // A subset of the pod.Manager interface extracted for testing purposes. @@ -235,6 +239,7 @@ func NewDockerManager( cpuCFSQuota: cpuCFSQuota, enableCustomMetrics: enableCustomMetrics, configureHairpinMode: hairpinMode, + versionCache: NewVersionCache(), } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) if serializeImagePulls { @@ -249,6 +254,21 @@ func NewDockerManager( optf(dm) } + apiVersion, err := dm.APIVersion() + if err != nil { + glog.Errorf("Failed to get api version from docker %v", err) + } + + daemonVersion, err := dm.Version() + if err != nil { + glog.Errorf("Failed to get daemon version from docker %v", err) + } + + // Update version cache periodically + go wait.Until(func() { + dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, daemonVersion) + }, 5*time.Second, wait.NeverStop) + return dm } @@ -1437,17 +1457,7 @@ func (dm *DockerManager) applyOOMScoreAdj(container *api.Container, containerInf } return err } - // Set OOM score of the container based on the priority of the container. - // Processes in lower-priority pods should be killed first if the system runs out of memory. - // The main pod infrastructure container is considered high priority, since if it is killed the - // whole pod will die. - // TODO: Cache this value. - var oomScoreAdj int - if containerInfo.Name == PodInfraContainerName { - oomScoreAdj = qos.PodInfraOOMAdj - } else { - oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity)) - } + oomScoreAdj := dm.calculateOomScoreAdj(container) if err = dm.oomAdjuster.ApplyOOMScoreAdjContainer(cgroupName, oomScoreAdj, 5); err != nil { if err == os.ErrNotExist { // Container exited. We cannot do anything about it. Ignore this error. @@ -1482,17 +1492,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe utsMode = namespaceModeHost } - // Set OOM score of the container based on the priority of the container. - // Processes in lower-priority pods should be killed first if the system runs out of memory. - // The main pod infrastructure container is considered high priority, since if it is killed the - // whole pod will die. - var oomScoreAdj int - if container.Name == PodInfraContainerName { - oomScoreAdj = qos.PodInfraOOMAdj - } else { - oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity)) - - } + oomScoreAdj := dm.calculateOomScoreAdj(container) id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount, oomScoreAdj) if err != nil { @@ -1566,20 +1566,52 @@ func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, cont return nil } -// Check current docker API version against expected version. +func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int { + // Set OOM score of the container based on the priority of the container. + // Processes in lower-priority pods should be killed first if the system runs out of memory. + // The main pod infrastructure container is considered high priority, since if it is killed the + // whole pod will die. + var oomScoreAdj int + if container.Name == PodInfraContainerName { + oomScoreAdj = qos.PodInfraOOMAdj + } else { + oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity)) + + } + + return oomScoreAdj +} + +// getCachedApiVersion gets cached api version of docker runtime. +func (dm *DockerManager) getCachedApiVersion() (kubecontainer.Version, error) { + apiVersion, _, err := dm.versionCache.Get(dm.machineInfo.MachineID) + if err != nil { + glog.Errorf("Failed to get cached docker api version %v ", err) + } + // If we got nil apiVersion, try to get api version directly. + if apiVersion == nil { + apiVersion, err = dm.APIVersion() + if err != nil { + glog.Errorf("Failed to get docker api version directly %v ", err) + } + dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, nil) + } + return apiVersion, err +} + +// checkDockerAPIVersion checks current docker API version against expected version. // Return: // 1 : newer than expected version // -1: older than expected version // 0 : same version func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) int { - apiVersion, err := dm.APIVersion() + apiVersion, err := dm.getCachedApiVersion() if err != nil { - glog.Errorf("failed to get current docker version - %v", err) + glog.Errorf("Failed to get cached docker api version %v ", err) } - result, err := apiVersion.Compare(expectedVersion) if err != nil { - glog.Errorf("failed to compare current docker version %v with OOMScoreAdj supported Docker version %q - %v", + glog.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v", apiVersion, expectedVersion, err) } return result diff --git a/pkg/kubelet/dockertools/version_cache.go b/pkg/kubelet/dockertools/version_cache.go new file mode 100644 index 000000000000..278e5e5c507e --- /dev/null +++ b/pkg/kubelet/dockertools/version_cache.go @@ -0,0 +1,62 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockertools + +import ( + "fmt" + "sync" + + "github.com/golang/groupcache/lru" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + +type VersionCache struct { + lock sync.RWMutex + cache *lru.Cache +} + +// versionInfo caches api version and daemon version. +type versionInfo struct { + apiVersion kubecontainer.Version + version kubecontainer.Version +} + +const maxVersionCacheEntries = 1000 + +func NewVersionCache() *VersionCache { + return &VersionCache{cache: lru.New(maxVersionCacheEntries)} +} + +// Update updates cached versionInfo by using a unique string (e.g. machineInfo) as the key. +func (c *VersionCache) Update(key string, apiVersion kubecontainer.Version, version kubecontainer.Version) { + c.lock.Lock() + defer c.lock.Unlock() + c.cache.Add(key, versionInfo{apiVersion, version}) +} + +// Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key. +// It returns apiVersion first and followed by daemon version. +func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) { + c.lock.RLock() + defer c.lock.RUnlock() + value, ok := c.cache.Get(key) + if !ok { + return nil, nil, fmt.Errorf("Failed to get version info from cache by key: ", key) + } + versions := value.(versionInfo) + return versions.apiVersion, versions.version, nil +} From c31ec5607ac9a47c4ddf9ecbaa4c4c5bc4857f84 Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Mon, 14 Mar 2016 16:35:49 +0800 Subject: [PATCH 3/3] Refactor version cache into kubelet util --- pkg/kubelet/dockertools/fake_docker_client.go | 2 + pkg/kubelet/dockertools/fake_manager.go | 4 + pkg/kubelet/dockertools/manager.go | 81 ++++++++++--------- pkg/kubelet/dockertools/manager_test.go | 2 +- .../cache}/version_cache.go | 44 +++++++--- 5 files changed, 83 insertions(+), 50 deletions(-) rename pkg/kubelet/{dockertools => util/cache}/version_cache.go (60%) diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 1a905cb90be3..742cc6368b42 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -402,6 +402,8 @@ func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.A } func (f *FakeDockerClient) Version() (*docker.Env, error) { + f.Lock() + defer f.Unlock() return &f.VersionInfo, f.popError("version") } diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 73984f70d32d..28fcffcd41e0 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/oom" @@ -52,6 +53,9 @@ func NewFakeDockerManager( burst, containerLogsDir, osInterface, networkPlugin, runtimeHelper, httpClient, &NativeExecHandler{}, fakeOOMAdjuster, fakeProcFs, false, imageBackOff, false, false, true) dm.dockerPuller = &FakeDockerPuller{} + dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) { + return dm.getVersionInfo() + }) return dm } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index f97c56bba5ca..6b9699ba7084 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -46,6 +46,7 @@ import ( proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/qos" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" @@ -55,7 +56,6 @@ import ( "k8s.io/kubernetes/pkg/util/procfs" utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilstrings "k8s.io/kubernetes/pkg/util/strings" - "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -161,7 +161,7 @@ type DockerManager struct { configureHairpinMode bool // The api version cache of docker daemon. - versionCache *VersionCache + versionCache *cache.VersionCache } // A subset of the pod.Manager interface extracted for testing purposes. @@ -239,7 +239,6 @@ func NewDockerManager( cpuCFSQuota: cpuCFSQuota, enableCustomMetrics: enableCustomMetrics, configureHairpinMode: hairpinMode, - versionCache: NewVersionCache(), } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) if serializeImagePulls { @@ -254,21 +253,15 @@ func NewDockerManager( optf(dm) } - apiVersion, err := dm.APIVersion() - if err != nil { - glog.Errorf("Failed to get api version from docker %v", err) + // initialize versionCache with a updater + dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) { + return dm.getVersionInfo() + }) + // update version cache periodically. + if dm.machineInfo != nil { + dm.versionCache.UpdateCachePeriodly(dm.machineInfo.MachineID) } - daemonVersion, err := dm.Version() - if err != nil { - glog.Errorf("Failed to get daemon version from docker %v", err) - } - - // Update version cache periodically - go wait.Until(func() { - dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, daemonVersion) - }, 5*time.Second, wait.NeverStop) - return dm } @@ -608,7 +601,10 @@ func (dm *DockerManager) runContainer( } // If current api version is newer than docker 1.10 requested, set OomScoreAdj to HostConfig - if dm.checkDockerAPIVersion(dockerv110APIVersion) >= 0 { + result, err := dm.checkDockerAPIVersion(dockerv110APIVersion) + if err != nil { + glog.Errorf("Failed to check docker api version: %v", err) + } else if result >= 0 { hc.OomScoreAdj = oomScoreAdj } @@ -1554,12 +1550,15 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe } func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, containerInfo *docker.Container) error { - // Compare current API version with expected api version - result := dm.checkDockerAPIVersion(dockerv110APIVersion) + // Compare current API version with expected api version. + result, err := dm.checkDockerAPIVersion(dockerv110APIVersion) + if err != nil { + return fmt.Errorf("Failed to check docker api version: %v", err) + } // If current api version is older than OOMScoreAdj requested, use the old way. if result < 0 { if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil { - return fmt.Errorf("failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name) + return fmt.Errorf("Failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name) } } @@ -1582,21 +1581,17 @@ func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int { return oomScoreAdj } -// getCachedApiVersion gets cached api version of docker runtime. -func (dm *DockerManager) getCachedApiVersion() (kubecontainer.Version, error) { - apiVersion, _, err := dm.versionCache.Get(dm.machineInfo.MachineID) +// getCachedVersionInfo gets cached version info of docker runtime. +func (dm *DockerManager) getCachedVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) { + apiVersion, daemonVersion, err := dm.versionCache.Get(dm.machineInfo.MachineID) if err != nil { glog.Errorf("Failed to get cached docker api version %v ", err) } - // If we got nil apiVersion, try to get api version directly. - if apiVersion == nil { - apiVersion, err = dm.APIVersion() - if err != nil { - glog.Errorf("Failed to get docker api version directly %v ", err) - } - dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, nil) + // If we got nil versions, try to update version info. + if apiVersion == nil || daemonVersion == nil { + dm.versionCache.Update(dm.machineInfo.MachineID) } - return apiVersion, err + return apiVersion, daemonVersion, err } // checkDockerAPIVersion checks current docker API version against expected version. @@ -1604,17 +1599,17 @@ func (dm *DockerManager) getCachedApiVersion() (kubecontainer.Version, error) { // 1 : newer than expected version // -1: older than expected version // 0 : same version -func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) int { - apiVersion, err := dm.getCachedApiVersion() +func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) (int, error) { + apiVersion, _, err := dm.getCachedVersionInfo() if err != nil { - glog.Errorf("Failed to get cached docker api version %v ", err) + return 0, err } result, err := apiVersion.Compare(expectedVersion) if err != nil { - glog.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v", + return 0, fmt.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v", apiVersion, expectedVersion, err) } - return result + return result, nil } func addNDotsOption(resolvFilePath string) error { @@ -2200,3 +2195,17 @@ func (dm *DockerManager) GetPodStatus(uid types.UID, name, namespace string) (*k podStatus.ContainerStatuses = containerStatuses return podStatus, nil } + +// getVersionInfo returns apiVersion & daemonVersion of docker runtime +func (dm *DockerManager) getVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) { + apiVersion, err := dm.APIVersion() + if err != nil { + return nil, nil, err + } + daemonVersion, err := dm.Version() + if err != nil { + return nil, nil, err + } + + return apiVersion, daemonVersion, nil +} diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index 089e60564985..0bf4997bbdd1 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -1933,7 +1933,7 @@ func TestGetPodStatusNoSuchContainer(t *testing.T) { }, }}) - fakeDocker.Errors = map[string]error{"inspect": &docker.NoSuchContainer{}} + fakeDocker.InjectErrors(map[string]error{"inspect": &docker.NoSuchContainer{}}) runSyncPod(t, dm, fakeDocker, pod, nil, false) // Verify that we will try to start new contrainers even if the inspections diff --git a/pkg/kubelet/dockertools/version_cache.go b/pkg/kubelet/util/cache/version_cache.go similarity index 60% rename from pkg/kubelet/dockertools/version_cache.go rename to pkg/kubelet/util/cache/version_cache.go index 278e5e5c507e..151889c05747 100644 --- a/pkg/kubelet/dockertools/version_cache.go +++ b/pkg/kubelet/util/cache/version_cache.go @@ -14,19 +14,23 @@ See the License for the specific language governing permissions and limitations under the License. */ -package dockertools +package cache import ( "fmt" "sync" + "time" + + "github.com/golang/glog" - "github.com/golang/groupcache/lru" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/wait" ) type VersionCache struct { - lock sync.RWMutex - cache *lru.Cache + lock sync.RWMutex + cache map[string]versionInfo + updater func() (kubecontainer.Version, kubecontainer.Version, error) } // versionInfo caches api version and daemon version. @@ -37,15 +41,24 @@ type versionInfo struct { const maxVersionCacheEntries = 1000 -func NewVersionCache() *VersionCache { - return &VersionCache{cache: lru.New(maxVersionCacheEntries)} +func NewVersionCache(f func() (kubecontainer.Version, kubecontainer.Version, error)) *VersionCache { + return &VersionCache{ + cache: map[string]versionInfo{}, + updater: f, + } } // Update updates cached versionInfo by using a unique string (e.g. machineInfo) as the key. -func (c *VersionCache) Update(key string, apiVersion kubecontainer.Version, version kubecontainer.Version) { - c.lock.Lock() - defer c.lock.Unlock() - c.cache.Add(key, versionInfo{apiVersion, version}) +func (c *VersionCache) Update(key string) { + apiVersion, daemonVersion, err := c.updater() + + if err != nil { + glog.Errorf("Fail to get version info from container runtime: %v", err) + } else { + c.lock.Lock() + defer c.lock.Unlock() + c.cache[key] = versionInfo{apiVersion, daemonVersion} + } } // Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key. @@ -53,10 +66,15 @@ func (c *VersionCache) Update(key string, apiVersion kubecontainer.Version, vers func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) { c.lock.RLock() defer c.lock.RUnlock() - value, ok := c.cache.Get(key) + value, ok := c.cache[key] if !ok { return nil, nil, fmt.Errorf("Failed to get version info from cache by key: ", key) } - versions := value.(versionInfo) - return versions.apiVersion, versions.version, nil + return value.apiVersion, value.version, nil +} + +func (c *VersionCache) UpdateCachePeriodly(key string) { + go wait.Until(func() { + c.Update(key) + }, 1*time.Minute, wait.NeverStop) }