Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deviceplugin checkpoint #51744

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 70 additions & 8 deletions pkg/kubelet/cm/device_plugin_handler.go
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package cm

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"

"github.com/golang/glog"
Expand Down Expand Up @@ -108,7 +111,7 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi
glog.V(2).Infof("Creating Device Plugin Handler")
handler := &DevicePluginHandlerImpl{
allDevices: make(map[string]sets.String),
allocatedDevices: devicesInUse(),
allocatedDevices: make(map[string]podDevices),
}

deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {
Expand Down Expand Up @@ -140,6 +143,11 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi

handler.devicePluginManager = mgr
handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback
// Loads in allocatedDevices information from disk.
err = handler.readCheckpoint()
if err != nil {
glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
}
return handler, nil
}

Expand Down Expand Up @@ -202,16 +210,13 @@ func (h *DevicePluginHandlerImpl) Allocate(pod *v1.Pod, container *v1.Container,
}
ret = append(ret, resp)
}
// Checkpoints device to container allocation information.
if err := h.writeCheckpoint(); err != nil {
return nil, err
}
return ret, nil
}

// devicesInUse produces the starting record of custom devices that are in
// use, together with the pods using them. The record is always empty for now.
func devicesInUse() map[string]podDevices {
	// TODO: gets the initial state from checkpointing.
	inUse := make(map[string]podDevices)
	return inUse
}

// updateAllocatedDevices updates the list of devices in use.
// It gets a list of active pods and then frees any devices that are bound to
// terminated pods.
Expand All @@ -229,3 +234,60 @@ func (h *DevicePluginHandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
podDevs.delete(podsToBeRemoved.List())
}
}

type checkpointEntry struct {
PodUID string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add json field tags here and in checkpointData.

ContainerName string
ResourceName string
DeviceID string
}

// checkpointData is the top-level document stored in the checkpoint file. It
// holds the pod to device allocation information as a flat list of entries.
//
// The JSON key is pinned with an explicit field tag so that renaming the Go
// field cannot silently change the on-disk format. The tag value equals the
// key encoding/json derived from the field name, so existing checkpoint files
// remain readable.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
	Entries []checkpointEntry `json:"Entries"`
}

// writeCheckpoint persists the device to container allocation information
// held in h.allocatedDevices to the checkpoint file reported by the device
// plugin manager.
//
// The nested resource -> pod -> container -> device set structure is flattened
// into one checkpointEntry per allocated device and written as JSON. The
// entries follow map iteration order, so their order in the file is not
// stable between calls. Returns the marshalling or file write error, if any.
func (h *DevicePluginHandlerImpl) writeCheckpoint() error {
	checkpointPath := h.devicePluginManager.CheckpointFile()

	// Left as a nil slice on purpose: entries are only appended, never
	// pre-allocated, so an empty allocation map encodes exactly as before.
	var snapshot checkpointData
	for resource, pods := range h.allocatedDevices {
		for uid, containers := range pods {
			for container, devices := range containers {
				for _, id := range devices.UnsortedList() {
					entry := checkpointEntry{
						PodUID:        uid,
						ContainerName: container,
						ResourceName:  resource,
						DeviceID:      id,
					}
					snapshot.Entries = append(snapshot.Entries, entry)
				}
			}
		}
	}

	encoded, err := json.Marshal(snapshot)
	if err != nil {
		return err
	}
	return ioutil.WriteFile(checkpointPath, encoded, 0644)
}

// readCheckpoint loads device to container allocation information from the
// checkpoint file reported by the device plugin manager and inserts every
// recorded entry into h.allocatedDevices.
//
// A missing checkpoint file is not treated as an error: there is simply
// nothing to restore, so the function returns nil and leaves
// h.allocatedDevices untouched. Any other read failure, and any content that
// cannot be decoded as checkpoint JSON, is returned as an error.
func (h *DevicePluginHandlerImpl) readCheckpoint() error {
	checkpointPath := h.devicePluginManager.CheckpointFile()
	content, err := ioutil.ReadFile(checkpointPath)
	if err != nil {
		if os.IsNotExist(err) {
			// Without this early return the nil content would be handed to
			// json.Unmarshal, which always fails on empty input and would
			// report a bogus "failed to unmarshal" error for a file that
			// was never there.
			glog.V(2).Infof("Checkpoint file %s does not exist, nothing to restore\n", checkpointPath)
			return nil
		}
		return fmt.Errorf("failed to read checkpoint file %q: %v", checkpointPath, err)
	}
	glog.V(2).Infof("Read checkpoint file %s\n", checkpointPath)

	var data checkpointData
	if err := json.Unmarshal(content, &data); err != nil {
		return fmt.Errorf("failed to unmarshal checkpoint data: %v", err)
	}
	for _, entry := range data.Entries {
		glog.V(2).Infof("Get checkpoint entry: %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceID)
		// Lazily create the per-resource map the first time a resource is seen.
		if h.allocatedDevices[entry.ResourceName] == nil {
			h.allocatedDevices[entry.ResourceName] = make(podDevices)
		}
		h.allocatedDevices[entry.ResourceName].insert(entry.PodUID, entry.ContainerName, entry.DeviceID)
	}
	return nil
}
35 changes: 35 additions & 0 deletions pkg/kubelet/cm/device_plugin_handler_test.go
Expand Up @@ -128,6 +128,41 @@ func (m *DevicePluginManagerTestStub) Stop() error {
return nil
}

// CheckpointFile returns the fixed checkpoint file path used by the device
// plugin manager test stub.
func (m *DevicePluginManagerTestStub) CheckpointFile() string {
	const stubCheckpointPath = "/tmp/device-plugin-checkpoint"
	return stubCheckpointPath
}

func TestCheckpoint(t *testing.T) {
resourceName1 := "domain1.com/resource1"
resourceName2 := "domain2.com/resource2"

m, err := NewDevicePluginManagerTestStub()
as := assert.New(t)
as.Nil(err)

testDevicePluginHandler := &DevicePluginHandlerImpl{
devicePluginManager: m,
allDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]podDevices),
}
testDevicePluginHandler.allocatedDevices[resourceName1] = make(podDevices)
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev1")
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev2")
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con2", "dev1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that it matters for this test but I found using dev1 again in this line and the next line a bit confusing at first.

testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod2", "con1", "dev1")
testDevicePluginHandler.allocatedDevices[resourceName2] = make(podDevices)
testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev3")
testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev4")

err = testDevicePluginHandler.writeCheckpoint()
as.Nil(err)
expected := testDevicePluginHandler.allocatedDevices
testDevicePluginHandler.allocatedDevices = make(map[string]podDevices)
err = testDevicePluginHandler.readCheckpoint()
as.Nil(err)
as.Equal(expected, testDevicePluginHandler.allocatedDevices)
}

func TestPodContainerDeviceAllocation(t *testing.T) {
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
var logLevel string
Expand Down
24 changes: 20 additions & 4 deletions pkg/kubelet/deviceplugin/manager.go
Expand Up @@ -63,7 +63,7 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)
}, nil
}

func removeContents(dir string) error {
func (m *ManagerImpl) removeContents(dir string) error {
d, err := os.Open(dir)
if err != nil {
return err
Expand All @@ -74,15 +74,31 @@ func removeContents(dir string) error {
return err
}
for _, name := range names {
// TODO: skip checkpoint file and check for file type.
err = os.RemoveAll(filepath.Join(dir, name))
filePath := filepath.Join(dir, name)
if filePath == m.CheckpointFile() {
continue
}
stat, err := os.Stat(filePath)
if err != nil {
glog.Errorf("Failed to stat file %v: %v", filePath, err)
continue
}
if stat.IsDir() {
continue
}
err = os.RemoveAll(filePath)
if err != nil {
return err
}
}
return nil
}

// CheckpointFile returns the path of the device plugin checkpoint file, which
// is a fixed file name joined onto the manager's socket directory.
func (m *ManagerImpl) CheckpointFile() string {
	const checkpointName = "kubelet_internal_checkpoint"
	return filepath.Join(m.socketdir, checkpointName)
}

// Start starts the Device Plugin Manager
func (m *ManagerImpl) Start() error {
glog.V(2).Infof("Starting Device Plugin manager")
Expand All @@ -92,7 +108,7 @@ func (m *ManagerImpl) Start() error {

// Removes all stale sockets in m.socketdir. Device plugins can monitor
// this and use it as a signal to re-register with the new Kubelet.
if err := removeContents(m.socketdir); err != nil {
if err := m.removeContents(m.socketdir); err != nil {
glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kubelet/deviceplugin/types.go
Expand Up @@ -29,6 +29,7 @@ type MonitorCallback func(resourceName string, added, updated, deleted []*plugin
type Manager interface {
// Start starts the gRPC Registration service.
Start() error

// Devices is the map of devices that have registered themselves
// against the manager.
// The map key is the ResourceName of the device plugins.
Expand All @@ -40,6 +41,9 @@ type Manager interface {

// Stop stops the manager.
Stop() error

// CheckpointFile returns the checkpoint file path.
CheckpointFile() string
}

// TODO: evaluate whether we need these error definitions.
Expand Down