diff --git a/pkg/kubelet/cm/device_plugin_handler.go b/pkg/kubelet/cm/device_plugin_handler.go index 8360fad9d177..8da350bc873f 100644 --- a/pkg/kubelet/cm/device_plugin_handler.go +++ b/pkg/kubelet/cm/device_plugin_handler.go @@ -17,7 +17,10 @@ limitations under the License. package cm import ( + "encoding/json" "fmt" + "io/ioutil" + "os" "sync" "github.com/golang/glog" @@ -108,7 +111,7 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi glog.V(2).Infof("Creating Device Plugin Handler") handler := &DevicePluginHandlerImpl{ allDevices: make(map[string]sets.String), - allocatedDevices: devicesInUse(), + allocatedDevices: make(map[string]podDevices), } deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) { @@ -140,6 +143,11 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi handler.devicePluginManager = mgr handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback + // Loads in allocatedDevices information from disk. + err = handler.readCheckpoint() + if err != nil { + glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err) + } return handler, nil } @@ -202,16 +210,13 @@ func (h *DevicePluginHandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, } ret = append(ret, resp) } + // Checkpoints device to container allocation information. + if err := h.writeCheckpoint(); err != nil { + return nil, err + } return ret, nil } -// devicesInUse returns a list of custom devices in use along with the -// respective pods that are using them. -func devicesInUse() map[string]podDevices { - // TODO: gets the initial state from checkpointing. - return make(map[string]podDevices) -} - // updateAllocatedDevices updates the list of GPUs in use. // It gets a list of active pods and then frees any GPUs that are bound to // terminated pods. Returns error on failure. 
@@ -229,3 +234,82 @@ func (h *DevicePluginHandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
 		podDevs.delete(podsToBeRemoved.List())
 	}
 }
+
+// checkpointEntry records that device DeviceID of resource ResourceName is
+// allocated to container ContainerName of pod PodUID.
+type checkpointEntry struct {
+	PodUID        string
+	ContainerName string
+	ResourceName  string
+	DeviceID      string
+}
+
+// checkpointData struct is used to store pod to device allocation information
+// in a checkpoint file.
+// TODO: add version control when we need to change checkpoint format.
+type checkpointData struct {
+	Entries []checkpointEntry
+}
+
+// writeCheckpoint flattens h.allocatedDevices into one checkpointEntry per
+// allocated device and writes the result, encoded as JSON, to the checkpoint
+// file reported by the device plugin manager. It returns an error if the
+// data cannot be encoded or the file cannot be written.
+func (h *DevicePluginHandlerImpl) writeCheckpoint() error {
+	filepath := h.devicePluginManager.CheckpointFile()
+	var data checkpointData
+	for resourceName, podDev := range h.allocatedDevices {
+		for podUID, conDev := range podDev {
+			for conName, devs := range conDev {
+				for _, devID := range devs.UnsortedList() {
+					data.Entries = append(data.Entries, checkpointEntry{
+						PodUID:        podUID,
+						ContainerName: conName,
+						ResourceName:  resourceName,
+						DeviceID:      devID,
+					})
+				}
+			}
+		}
+	}
+	dataJSON, err := json.Marshal(data)
+	if err != nil {
+		return fmt.Errorf("failed to marshal checkpoint data: %v", err)
+	}
+	if err := ioutil.WriteFile(filepath, dataJSON, 0644); err != nil {
+		return fmt.Errorf("failed to write checkpoint file %q: %v", filepath, err)
+	}
+	return nil
+}
+
+// readCheckpoint reads device to container allocation information from the
+// checkpoint file and populates h.allocatedDevices accordingly. A checkpoint
+// file that does not exist is not an error: there is nothing to restore and
+// h.allocatedDevices is left untouched. Any other read failure, or content
+// that is not valid checkpoint JSON, is returned as an error.
+func (h *DevicePluginHandlerImpl) readCheckpoint() error {
+	filepath := h.devicePluginManager.CheckpointFile()
+	content, err := ioutil.ReadFile(filepath)
+	if os.IsNotExist(err) {
+		// No checkpoint has been written yet. Decoding the empty content
+		// would only yield a spurious "unexpected end of JSON input" error.
+		glog.V(2).Infof("Checkpoint file %s does not exist, nothing to restore", filepath)
+		return nil
+	}
+	if err != nil {
+		return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err)
+	}
+	glog.V(2).Infof("Read checkpoint file %s\n", filepath)
+	var data checkpointData
+	if err := json.Unmarshal(content, &data); err != nil {
+		return fmt.Errorf("failed to unmarshal checkpoint data: %v", err)
+	}
+	for _, entry := range data.Entries {
+		glog.V(2).Infof("Get checkpoint entry: %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceID)
+		if h.allocatedDevices[entry.ResourceName] == nil {
+			h.allocatedDevices[entry.ResourceName] = make(podDevices)
+		}
+		h.allocatedDevices[entry.ResourceName].insert(entry.PodUID, entry.ContainerName, entry.DeviceID)
+	}
+	return nil
+}
diff --git a/pkg/kubelet/cm/device_plugin_handler_test.go b/pkg/kubelet/cm/device_plugin_handler_test.go
index e178cd9c7c89..e9df09ef2cc8 100644
--- a/pkg/kubelet/cm/device_plugin_handler_test.go
+++ b/pkg/kubelet/cm/device_plugin_handler_test.go
@@ -128,6 +128,41 @@ func (m *DevicePluginManagerTestStub) Stop() error {
 	return nil
 }
 
+func (m *DevicePluginManagerTestStub) CheckpointFile() string {
+	return "/tmp/device-plugin-checkpoint"
+}
+
+func TestCheckpoint(t *testing.T) {
+	resourceName1 := "domain1.com/resource1"
+	resourceName2 := "domain2.com/resource2"
+
+	m, err := NewDevicePluginManagerTestStub()
+	as := assert.New(t)
+	as.Nil(err)
+
+	testDevicePluginHandler := &DevicePluginHandlerImpl{
+		devicePluginManager: m,
+		allDevices:          make(map[string]sets.String),
+		allocatedDevices:    make(map[string]podDevices),
+	}
+	testDevicePluginHandler.allocatedDevices[resourceName1] = make(podDevices)
+	testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev1")
+	testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev2")
+	testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con2", "dev1")
+	testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod2", "con1", "dev1")
+	testDevicePluginHandler.allocatedDevices[resourceName2] = make(podDevices)
+	testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev3")
+	testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev4")
+
+	err = testDevicePluginHandler.writeCheckpoint()
+	as.Nil(err)
+	expected := testDevicePluginHandler.allocatedDevices
+	testDevicePluginHandler.allocatedDevices = make(map[string]podDevices)
+	err = testDevicePluginHandler.readCheckpoint()
+	as.Nil(err)
+	as.Equal(expected, testDevicePluginHandler.allocatedDevices)
+}
+
 func TestPodContainerDeviceAllocation(t *testing.T) {
 	flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
 	var logLevel string
diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/deviceplugin/manager.go
index 041d53842212..4aed8708c1af 100644
--- a/pkg/kubelet/deviceplugin/manager.go
+++ b/pkg/kubelet/deviceplugin/manager.go
@@ -63,7 +63,7 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)
 	}, nil
 }
 
-func removeContents(dir string) error {
+func (m *ManagerImpl) removeContents(dir string) error {
 	d, err := os.Open(dir)
 	if err != nil {
 		return err
@@ -74,8 +74,19 @@ func removeContents(dir string) error {
 		return err
 	}
 	for _, name := range names {
-		// TODO: skip checkpoint file and check for file type.
-		err = os.RemoveAll(filepath.Join(dir, name))
+		filePath := filepath.Join(dir, name)
+		if filePath == m.CheckpointFile() {
+			continue // the checkpoint file is never deleted by this cleanup
+		}
+		stat, err := os.Stat(filePath)
+		if err != nil {
+			glog.Errorf("Failed to stat file %v: %v", filePath, err)
+			continue
+		}
+		if stat.IsDir() {
+			continue // directories are left in place; only non-directory entries are removed
+		}
+		err = os.RemoveAll(filePath)
 		if err != nil {
 			return err
 		}
@@ -83,6 +94,11 @@ func removeContents(dir string) error {
 	return nil
 }
 
+// CheckpointFile returns the device plugin checkpoint file path: "kubelet_internal_checkpoint" under m.socketdir.
+func (m *ManagerImpl) CheckpointFile() string {
+	return filepath.Join(m.socketdir, "kubelet_internal_checkpoint")
+}
+
 // Start starts the Device Plugin Manager
 func (m *ManagerImpl) Start() error {
 	glog.V(2).Infof("Starting Device Plugin manager")
@@ -92,7 +108,7 @@ func (m *ManagerImpl) Start() error {
 
 	// Removes all stale sockets in m.socketdir. Device plugins can monitor
 	// this and use it as a signal to re-register with the new Kubelet.
-	if err := removeContents(m.socketdir); err != nil {
+	if err := m.removeContents(m.socketdir); err != nil {
 		glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
 	}
 
diff --git a/pkg/kubelet/deviceplugin/types.go b/pkg/kubelet/deviceplugin/types.go
index 3ce272816bb6..3a2eae6fee1a 100644
--- a/pkg/kubelet/deviceplugin/types.go
+++ b/pkg/kubelet/deviceplugin/types.go
@@ -29,6 +29,7 @@ type MonitorCallback func(resourceName string, added, updated, deleted []*plugin
 type Manager interface {
 	// Start starts the gRPC Registration service.
 	Start() error
+
 	// Devices is the map of devices that have registered themselves
 	// against the manager.
 	// The map key is the ResourceName of the device plugins.
@@ -40,6 +41,9 @@ type Manager interface {
 
 	// Stop stops the manager.
 	Stop() error
+
+	// CheckpointFile returns the checkpoint file path.
+	CheckpointFile() string
 }
 
 // TODO: evaluate whether we need these error definitions.