Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use OomScoreAdj in kubelet for newer docker api #21741

Merged
merged 3 commits into from Apr 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/kubelet/container/runtime.go
Expand Up @@ -55,8 +55,10 @@ type Runtime interface {

// Version returns the version information of the container runtime.
Version() (Version, error)
// APIVersion returns the API version information of the container
// runtime. This may be different from the runtime engine's version.

// APIVersion returns the cached API version information of the container
// runtime. Implementation is expected to update this cache periodically.
// This may be different from the runtime engine's version.
// TODO(random-liu): We should fold this into Version()
APIVersion() (Version, error)
// Status returns error if the runtime is unhealthy; nil otherwise.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/dockertools/fake_docker_client.go
Expand Up @@ -402,6 +402,8 @@ func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.A
}

// Version returns a pointer to the fake client's VersionInfo together with
// whatever popError yields for the "version" call. The client's lock is held
// for the duration of the call.
func (f *FakeDockerClient) Version() (*docker.Env, error) {
	f.Lock()
	defer f.Unlock()

	info := &f.VersionInfo
	injected := f.popError("version")
	return info, injected
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kubelet/dockertools/fake_manager.go
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/oom"
Expand Down Expand Up @@ -52,6 +53,9 @@ func NewFakeDockerManager(
burst, containerLogsDir, osInterface, networkPlugin, runtimeHelper, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false, imageBackOff, false, false, true)
dm.dockerPuller = &FakeDockerPuller{}
dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) {
return dm.getVersionInfo()
})
return dm
}

Expand Down
127 changes: 112 additions & 15 deletions pkg/kubelet/dockertools/manager.go
Expand Up @@ -46,6 +46,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/qos"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
Expand All @@ -62,6 +63,8 @@ const (

minimumDockerAPIVersion = "1.20"

dockerv110APIVersion = "1.21"

// ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified)
// we want to be able to treat SRV lookup names like _dns._udp.kube-dns.default.svc as relative,
// hence, setting ndots to be 5.
Expand Down Expand Up @@ -156,6 +159,9 @@ type DockerManager struct {
// A false value means the kubelet just backs off from setting it,
// it might already be true.
configureHairpinMode bool

// The api version cache of docker daemon.
versionCache *cache.VersionCache
}

// A subset of the pod.Manager interface extracted for testing purposes.
Expand Down Expand Up @@ -247,6 +253,15 @@ func NewDockerManager(
optf(dm)
}

// initialize versionCache with an updater
dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) {
return dm.getVersionInfo()
})
// update version cache periodically.
if dm.machineInfo != nil {
dm.versionCache.UpdateCachePeriodly(dm.machineInfo.MachineID)
}

return dm
}

Expand Down Expand Up @@ -503,7 +518,8 @@ func (dm *DockerManager) runContainer(
ipcMode string,
utsMode string,
pidMode string,
restartCount int) (kubecontainer.ContainerID, error) {
restartCount int,
oomScoreAdj int) (kubecontainer.ContainerID, error) {

dockerName := KubeletContainerName{
PodFullName: kubecontainer.GetPodFullName(pod),
Expand Down Expand Up @@ -584,6 +600,14 @@ func (dm *DockerManager) runContainer(
SecurityOpt: securityOpts,
}

// If current api version is newer than docker 1.10 requested, set OomScoreAdj to HostConfig
result, err := dm.checkDockerAPIVersion(dockerv110APIVersion)
if err != nil {
glog.Errorf("Failed to check docker api version: %v", err)
} else if result >= 0 {
hc.OomScoreAdj = oomScoreAdj
}

if dm.cpuCFSQuota {
// if cpuLimit.Amount is nil, then the appropriate default value is returned to allow full usage of cpu resource.
cpuQuota, cpuPeriod := milliCPUToQuota(cpuLimit.MilliValue())
Expand Down Expand Up @@ -1429,17 +1453,7 @@ func (dm *DockerManager) applyOOMScoreAdj(container *api.Container, containerInf
}
return err
}
// Set OOM score of the container based on the priority of the container.
// Processes in lower-priority pods should be killed first if the system runs out of memory.
// The main pod infrastructure container is considered high priority, since if it is killed the
// whole pod will die.
// TODO: Cache this value.
var oomScoreAdj int
if containerInfo.Name == PodInfraContainerName {
oomScoreAdj = qos.PodInfraOOMAdj
} else {
oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity))
}
oomScoreAdj := dm.calculateOomScoreAdj(container)
if err = dm.oomAdjuster.ApplyOOMScoreAdjContainer(cgroupName, oomScoreAdj, 5); err != nil {
if err == os.ErrNotExist {
// Container exited. We cannot do anything about it. Ignore this error.
Expand Down Expand Up @@ -1473,7 +1487,10 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
if usesHostNetwork(pod) {
utsMode = namespaceModeHost
}
id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount)

oomScoreAdj := dm.calculateOomScoreAdj(container)

id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount, oomScoreAdj)
if err != nil {
return kubecontainer.ContainerID{}, fmt.Errorf("runContainer: %v", err)
}
Expand Down Expand Up @@ -1512,9 +1529,12 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
return kubecontainer.ContainerID{}, fmt.Errorf("can't get init PID for container %q", id)
}

if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil {
return kubecontainer.ContainerID{}, fmt.Errorf("failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name)
// Check if current docker version is higher than 1.10. Otherwise, we have to apply OOMScoreAdj instead of using docker API.
err = dm.applyOOMScoreAdjIfNeeded(container, containerInfo)
if err != nil {
return kubecontainer.ContainerID{}, err
}

// The addNDotsOption call appends the ndots option to the resolv.conf file generated by docker.
// This resolv.conf file is shared by all containers of the same pod, and needs to be modified only once per pod.
// we modify it when the pause container is created since it is the first container created in the pod since it holds
Expand All @@ -1529,6 +1549,69 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
return id, err
}

// applyOOMScoreAdjIfNeeded applies the OOM score adjustment through the legacy
// applyOOMScoreAdj path, but only when the docker API version is older than
// dockerv110APIVersion. For an equal or newer API version it does nothing here;
// runContainer sets HostConfig.OomScoreAdj directly in that case.
//
// It returns an error if the API version cannot be checked or if the legacy
// adjustment fails.
func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, containerInfo *docker.Container) error {
	// Compare current API version with expected api version.
	result, err := dm.checkDockerAPIVersion(dockerv110APIVersion)
	if err != nil {
		return fmt.Errorf("Failed to check docker api version: %v", err)
	}
	// Equal or newer API version: nothing to do via the legacy path.
	if result >= 0 {
		return nil
	}
	// Current api version is older than OOMScoreAdj requested, use the old way.
	if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil {
		// Argument order matches the verbs: container name for %q, error for %v.
		return fmt.Errorf("Failed to apply oom-score-adj to container %q- %v", containerInfo.Name, err)
	}
	return nil
}

// calculateOomScoreAdj picks the OOM score adjustment for a container.
//
// Processes in lower-priority pods should be killed first if the system runs
// out of memory. The pod infrastructure container is treated as high priority,
// since the whole pod dies if it is killed, so it always gets
// qos.PodInfraOOMAdj. Every other container gets the value computed by
// qos.GetContainerOOMScoreAdjust from the container and the machine's memory
// capacity.
func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int {
	if container.Name == PodInfraContainerName {
		return qos.PodInfraOOMAdj
	}
	memoryCapacity := int64(dm.machineInfo.MemoryCapacity)
	return qos.GetContainerOOMScoreAdjust(container, memoryCapacity)
}

// getCachedVersionInfo gets cached version info of docker runtime, keyed by the
// machine ID. It returns the API version first, then the daemon version.
//
// If the cache has no usable entry (lookup error or a nil version), the cache
// is refreshed once and read again, so the caller sees the freshly fetched
// versions instead of the stale miss. An error is returned when the versions
// are still unavailable after the refresh; in that case both versions are nil.
func (dm *DockerManager) getCachedVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) {
	key := dm.machineInfo.MachineID

	apiVersion, daemonVersion, err := dm.versionCache.Get(key)
	if err == nil && apiVersion != nil && daemonVersion != nil {
		return apiVersion, daemonVersion, nil
	}
	if err != nil {
		glog.Errorf("Failed to get cached docker api version %v ", err)
	}

	// Cache miss or incomplete entry: refresh, then re-read so this call
	// returns the updated values rather than the pre-refresh ones.
	dm.versionCache.Update(key)
	apiVersion, daemonVersion, err = dm.versionCache.Get(key)
	if err != nil {
		return nil, nil, err
	}
	if apiVersion == nil || daemonVersion == nil {
		// Guard callers against invoking methods on a nil Version.
		return nil, nil, fmt.Errorf("docker version info is unavailable after refreshing the version cache")
	}
	return apiVersion, daemonVersion, nil
}

// checkDockerAPIVersion compares the cached docker API version with
// expectedVersion and returns the comparison result:
//
//	 1: current version is newer than expectedVersion
//	-1: current version is older than expectedVersion
//	 0: same version
//
// A zero result accompanied by a non-nil error means the cached version could
// not be obtained or the comparison itself failed.
func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) (int, error) {
	current, _, lookupErr := dm.getCachedVersionInfo()
	if lookupErr != nil {
		return 0, lookupErr
	}

	cmp, cmpErr := current.Compare(expectedVersion)
	if cmpErr == nil {
		return cmp, nil
	}
	return 0, fmt.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v",
		current, expectedVersion, cmpErr)
}

func addNDotsOption(resolvFilePath string) error {
if len(resolvFilePath) == 0 {
glog.Errorf("ResolvConfPath is empty.")
Expand Down Expand Up @@ -2112,3 +2195,17 @@ func (dm *DockerManager) GetPodStatus(uid types.UID, name, namespace string) (*k
podStatus.ContainerStatuses = containerStatuses
return podStatus, nil
}

// getVersionInfo queries the docker runtime for its versions and returns the
// API version followed by the daemon version. If either query fails, both
// returned versions are nil and the failing query's error is returned.
func (dm *DockerManager) getVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) {
	api, apiErr := dm.APIVersion()
	if apiErr != nil {
		return nil, nil, apiErr
	}

	daemon, daemonErr := dm.Version()
	if daemonErr != nil {
		return nil, nil, daemonErr
	}

	return api, daemon, nil
}
2 changes: 1 addition & 1 deletion pkg/kubelet/dockertools/manager_test.go
Expand Up @@ -1933,7 +1933,7 @@ func TestGetPodStatusNoSuchContainer(t *testing.T) {
},
}})

fakeDocker.Errors = map[string]error{"inspect": &docker.NoSuchContainer{}}
fakeDocker.InjectErrors(map[string]error{"inspect": &docker.NoSuchContainer{}})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@resouer Thanks for the cleanup! :)

runSyncPod(t, dm, fakeDocker, pod, nil, false)

// Verify that we will try to start new containers even if the inspections
Expand Down
80 changes: 80 additions & 0 deletions pkg/kubelet/util/cache/version_cache.go
@@ -0,0 +1,80 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"

kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/wait"
)

// VersionCache stores container runtime version information in a map keyed by
// a caller-supplied string, and knows how to refresh an entry via its updater.
type VersionCache struct {
// lock guards cache; Update takes it for writing and Get for reading.
lock sync.RWMutex
// cache maps a key to the versions most recently stored for it by Update.
cache map[string]versionInfo
// updater is called by Update to fetch the API version and the daemon
// version (in that order) before the result is stored in cache.
updater func() (kubecontainer.Version, kubecontainer.Version, error)
}

// versionInfo caches api version and daemon version.
// Update builds it positionally, so the field order (API version first,
// daemon version second) must be kept.
type versionInfo struct {
// apiVersion is the API version of the container runtime.
apiVersion kubecontainer.Version
// version is the daemon version of the container runtime.
version kubecontainer.Version
}

// maxVersionCacheEntries is declared as 1000.
// NOTE(review): nothing in the code visible here references this constant, and
// the cache map is not size-limited by it — confirm whether a bound was intended.
const maxVersionCacheEntries = 1000

// NewVersionCache returns an empty VersionCache that uses f as its updater.
// f is expected to return the API version first and the daemon version second.
func NewVersionCache(f func() (kubecontainer.Version, kubecontainer.Version, error)) *VersionCache {
	vc := new(VersionCache)
	vc.cache = make(map[string]versionInfo)
	vc.updater = f
	return vc
}

// Update calls the updater and stores the versions it returns under key
// (a unique string, e.g. the machine ID). If the updater fails, the error is
// logged and the existing cache contents are left untouched. The updater runs
// before the write lock is taken.
func (c *VersionCache) Update(key string) {
	api, daemon, fetchErr := c.updater()
	if fetchErr != nil {
		glog.Errorf("Fail to get version info from container runtime: %v", fetchErr)
		return
	}

	entry := versionInfo{apiVersion: api, version: daemon}

	c.lock.Lock()
	defer c.lock.Unlock()
	c.cache[key] = entry
}

// Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key.
// It returns apiVersion first and followed by daemon version.
// If there is no entry for key, both versions are nil and an error naming the
// key is returned.
func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	value, ok := c.cache[key]
	if !ok {
		// The format string needs a verb for key; without one the message
		// ends in "%!(EXTRA string=...)".
		return nil, nil, fmt.Errorf("Failed to get version info from cache by key: %q", key)
	}
	return value.apiVersion, value.version, nil
}

// UpdateCachePeriodly starts a goroutine that hands a closure calling
// Update(key) to wait.Until with a one-minute period and wait.NeverStop as the
// stop channel. It returns immediately and offers no handle for stopping the
// goroutine.
func (c *VersionCache) UpdateCachePeriodly(key string) {
	const refreshPeriod = 1 * time.Minute
	refresh := func() {
		c.Update(key)
	}
	go wait.Until(refresh, refreshPeriod, wait.NeverStop)
}