Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kubelet plugin manager #73891

Merged
merged 1 commit into from May 31, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -167,6 +167,7 @@ pkg/kubelet/dockershim/network/testing
pkg/kubelet/events
pkg/kubelet/lifecycle
pkg/kubelet/metrics
pkg/kubelet/pluginmanager/pluginwatcher
pkg/kubelet/pod/testing
pkg/kubelet/preemption
pkg/kubelet/prober
@@ -180,7 +181,6 @@ pkg/kubelet/status
pkg/kubelet/status/testing
pkg/kubelet/sysctl
pkg/kubelet/types
pkg/kubelet/util/pluginwatcher
pkg/kubemark
pkg/master
pkg/master/controller/crdregistration
@@ -21,8 +21,8 @@ set -o pipefail
KUBE_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../" && pwd -P)"
KUBELET_PLUGIN_REGISTRATION_V1ALPHA="${KUBE_ROOT}/pkg/kubelet/apis/pluginregistration/v1alpha1/"
KUBELET_PLUGIN_REGISTRATION_V1BETA="${KUBE_ROOT}/pkg/kubelet/apis/pluginregistration/v1beta1/"
KUBELET_EXAMPLE_PLUGIN_V1BETA1="${KUBE_ROOT}/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/"
KUBELET_EXAMPLE_PLUGIN_V1BETA2="${KUBE_ROOT}/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/"
KUBELET_EXAMPLE_PLUGIN_V1BETA1="${KUBE_ROOT}/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1/"
KUBELET_EXAMPLE_PLUGIN_V1BETA2="${KUBE_ROOT}/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2/"

source "${KUBE_ROOT}/hack/lib/protoc.sh"
kube::protoc::generate_proto "${KUBELET_PLUGIN_REGISTRATION_V1ALPHA}"
@@ -70,6 +70,8 @@ go_library(
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/oom:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pluginmanager:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/preemption:go_default_library",
"//pkg/kubelet/prober:go_default_library",
@@ -90,7 +92,6 @@ go_library(
"//pkg/kubelet/util:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/manager:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/kubelet/util/queue:go_default_library",
"//pkg/kubelet/util/sliceutils:go_default_library",
"//pkg/kubelet/volumemanager:go_default_library",
@@ -189,6 +190,7 @@ go_test(
"//pkg/kubelet/network/dns:go_default_library",
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pluginmanager:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/pod/testing:go_default_library",
"//pkg/kubelet/prober/results:go_default_library",
@@ -296,6 +298,7 @@ filegroup(
"//pkg/kubelet/nodestatus:all-srcs",
"//pkg/kubelet/oom:all-srcs",
"//pkg/kubelet/pleg:all-srcs",
"//pkg/kubelet/pluginmanager:all-srcs",
"//pkg/kubelet/pod:all-srcs",
"//pkg/kubelet/preemption:all-srcs",
"//pkg/kubelet/prober:all-srcs",
@@ -32,8 +32,8 @@ go_library(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@@ -28,8 +28,8 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"

"fmt"
@@ -100,7 +100,7 @@ type ContainerManager interface {
// GetPluginRegistrationHandler returns a plugin registration handler
// The pluginwatcher's Handlers allow to have a single module for handling
// registration.
GetPluginRegistrationHandler() pluginwatcher.PluginHandler
GetPluginRegistrationHandler() cache.PluginHandler

// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
@@ -52,10 +52,10 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
@@ -620,7 +620,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
return nil
}

func (cm *containerManagerImpl) GetPluginRegistrationHandler() pluginwatcher.PluginHandler {
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
return cm.deviceManager.GetWatcherHandler()
}

@@ -27,8 +27,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

@@ -80,7 +80,7 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
return c
}

func (cm *containerManagerStub) GetPluginRegistrationHandler() pluginwatcher.PluginHandler {
func (cm *containerManagerStub) GetPluginRegistrationHandler() cache.PluginHandler {
return nil
}

@@ -37,8 +37,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/util/mount"
)
@@ -140,7 +140,7 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
return cm.capacity
}

func (cm *containerManagerImpl) GetPluginRegistrationHandler() pluginwatcher.PluginHandler {
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
return nil
}

@@ -25,7 +25,7 @@ go_library(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/selinux:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -48,14 +48,17 @@ go_test(
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/kubelet/pluginmanager:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
@@ -42,7 +42,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/util/selinux"
)
@@ -242,15 +242,15 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
}

// GetWatcherHandler returns the plugin handler
func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler {
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
klog.Errorf("Failed to create deprecation file at %s", m.socketdir)
} else {
f.Close()
klog.V(4).Infof("created deprecation file %s", f.Name())
}

return watcher.PluginHandler(m)
return cache.PluginHandler(m)
}

// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
@@ -21,7 +21,7 @@ import (
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

@@ -59,7 +59,7 @@ func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
}

// GetWatcherHandler returns plugin watcher interface
func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler {
func (h *ManagerStub) GetWatcherHandler() cache.PluginHandler {
return nil
}

@@ -32,11 +32,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

@@ -51,6 +54,7 @@ func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error)
}
socketName = socketDir + "/server.sock"
pluginSocketName = socketDir + "/device-plugin.sock"
os.MkdirAll(socketDir, 0755)
return
}

@@ -68,17 +72,17 @@ func TestNewManagerImplStart(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(socketDir)
m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
cleanup(t, m, p, nil)
cleanup(t, m, p)
// Stop should tolerate being called more than once.
cleanup(t, m, p, nil)
cleanup(t, m, p)
}

func TestNewManagerImplStartProbeMode(t *testing.T) {
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)
m, _, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
cleanup(t, m, p, w)
m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
cleanup(t, m, p)
}

// Tests that the device plugin manager correctly handles registration and re-registration by
@@ -144,7 +148,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1, nil)
cleanup(t, m, p1)
}
}

@@ -165,7 +169,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
{ID: "Dev3", Health: pluginapi.Healthy},
}

m, ch, p1, w := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)
m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)

// Wait for the first callback to be issued.
select {
@@ -213,7 +217,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed")
p2.Stop()
p3.Stop()
cleanup(t, m, p1, w)
cleanup(t, m, p1)
}

func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) {
@@ -247,12 +251,21 @@ func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName
return p
}

func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher {
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName), "" /* deprecatedSockDir */)
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
w.Start()
func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) pluginmanager.PluginManager {
pluginManager := pluginmanager.NewPluginManager(
filepath.Dir(pluginSocketName), /* sockDir */
"", /* deprecatedSockDir */
&record.FakeRecorder{},
)

return w
runPluginManager(pluginManager)
pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
return pluginManager
}

func runPluginManager(pluginManager pluginmanager.PluginManager) {
sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
go pluginManager.Run(sourcesReady, wait.NeverStop)
}

func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) {
@@ -261,19 +274,16 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, soc
return m, updateChan, p
}

func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, *pluginwatcher.Watcher) {
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, pluginmanager.PluginManager) {
m, updateChan := setupDeviceManager(t, devs, callback, socketName)
w := setupPluginWatcher(pluginSocketName, m)
pm := setupPluginManager(t, pluginSocketName, m)
p := setupDevicePlugin(t, devs, pluginSocketName)
return m, updateChan, p, w
return m, updateChan, p, pm
}

func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) {
func cleanup(t *testing.T, m Manager, p *Stub) {
p.Stop()
m.Stop()
if w != nil {
require.NoError(t, w.Stop())
}
}

func TestUpdateCapacityAllocatable(t *testing.T) {
@@ -24,7 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

@@ -54,7 +54,7 @@ type Manager interface {
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
GetWatcherHandler() watcher.PluginHandler
GetWatcherHandler() cache.PluginHandler

// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.