Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #102882: devicemanager: checkpoint: support pre-1.20 data #106295

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kubelet/cm/devicemanager/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ go_test(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/pluginregistration/v1:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
Expand Down
8 changes: 7 additions & 1 deletion pkg/kubelet/cm/devicemanager/checkpoint/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["checkpoint.go"],
srcs = [
"checkpoint.go",
"checkpointv1.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

Expand Down
17 changes: 11 additions & 6 deletions pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// DeviceManagerCheckpoint defines the operations to retrieve pod devices
type DeviceManagerCheckpoint interface {
checkpointmanager.Checkpoint
GetData() ([]PodDevicesEntry, map[string][]string)
GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string)
}

// DevicesPerNUMA represents device ids obtained from device plugin per NUMA node id
Expand Down Expand Up @@ -72,9 +72,13 @@ func (dev DevicesPerNUMA) Devices() sets.String {
return result
}

// New returns an instance of Checkpoint
func New(devEntries []PodDevicesEntry,
devices map[string][]string) DeviceManagerCheckpoint {
// New returns an instance of Checkpoint - must be an alias for the most recent version
func New(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return NewV2(devEntries, devices)
}

// NewV2 returns an instance of Checkpoint (k8s >= 1.20)
func NewV2(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return &Data{
Data: checkpointData{
PodDeviceEntries: devEntries,
Expand All @@ -99,7 +103,8 @@ func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data)
}

// GetData returns device entries and registered devices
func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) {
// GetDataInLatestFormat returns device entries and registered devices in the *most recent*
// checkpoint format, *not* in the original format stored on disk.
func (cp *Data) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) {
return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices
}
124 changes: 124 additions & 0 deletions pkg/kubelet/cm/devicemanager/checkpoint/checkpointv1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkpoint

import (
"encoding/json"
"hash/fnv"
"strings"

"github.com/davecgh/go-spew/spew"

"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)

// PodDevicesEntryV1 connects pod information to devices, without topology information (k8s <= 1.19)
type PodDevicesEntryV1 struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte
}

// checkpointDataV1 struct is used to store pod to device allocation information
// in a checkpoint file, without topology information (k8s <= 1.19)
type checkpointDataV1 struct {
PodDeviceEntries []PodDevicesEntryV1
RegisteredDevices map[string][]string
}

// checksum compute the checksum using the same algorithms (and data type names) k8s 1.19 used.
// We need this special code path to be able to correctly validate the checksum k8s 1.19 wrote.
// credits to https://github.com/kubernetes/kubernetes/pull/102717/commits/353f93895118d2ffa2d59a29a1fbc225160ea1d6
func (cp checkpointDataV1) checksum() checksum.Checksum {
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}

object := printer.Sprintf("%#v", cp)
object = strings.Replace(object, "checkpointDataV1", "checkpointData", 1)
object = strings.Replace(object, "PodDevicesEntryV1", "PodDevicesEntry", -1)
hash := fnv.New32a()
printer.Fprintf(hash, "%v", object)
return checksum.Checksum(hash.Sum32())
}

// DataV1 holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format
type DataV1 struct {
Data checkpointDataV1
Checksum checksum.Checksum
}

// NewV1 returns an instance of Checkpoint, in V1 (k8s <= 1.19) format.
// Users should avoid creating checkpoints in formats different than the most recent one,
// use the old formats only to validate existing checkpoint and convert them to most recent
// format. The only exception should be test code.
func NewV1(devEntries []PodDevicesEntryV1,
devices map[string][]string) DeviceManagerCheckpoint {
return &DataV1{
Data: checkpointDataV1{
PodDeviceEntries: devEntries,
RegisteredDevices: devices,
},
}
}

// MarshalCheckpoint is needed to implement the Checkpoint interface, but should not be called anymore
func (cp *DataV1) MarshalCheckpoint() ([]byte, error) {
klog.InfoS("Marshalling a device manager V1 checkpoint")
cp.Checksum = cp.Data.checksum()
return json.Marshal(*cp)
}

// UnmarshalCheckpoint returns unmarshalled data
func (cp *DataV1) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}

// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *DataV1) VerifyChecksum() error {
if cp.Checksum != cp.Data.checksum() {
return errors.ErrCorruptCheckpoint
}
return nil
}

// GetDataInLatestFormat returns device entries and registered devices in the *most recent*
// checkpoint format, *not* in the original format stored on disk.
func (cp *DataV1) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) {
var podDevs []PodDevicesEntry
for _, entryV1 := range cp.Data.PodDeviceEntries {
devsPerNuma := NewDevicesPerNUMA()
// no NUMA cell affinity was recorded. The only possible choice
// is to set all the devices affine to node 0.
devsPerNuma[0] = entryV1.DeviceIDs
podDevs = append(podDevs, PodDevicesEntry{
PodUID: entryV1.PodUID,
ContainerName: entryV1.ContainerName,
ResourceName: entryV1.ResourceName,
DeviceIDs: devsPerNuma,
AllocResp: entryV1.AllocResp,
})
}
return podDevs, cp.Data.RegisteredDevices
}
40 changes: 34 additions & 6 deletions pkg/kubelet/cm/devicemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,20 +585,32 @@ func (m *ManagerImpl) writeCheckpoint() error {
// Reads device to container allocation information from disk, and populates
// m.allocatedDevices accordingly.
func (m *ManagerImpl) readCheckpoint() error {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
// the vast majority of time we restore a compatible checkpoint, so we try
// the current version first. Trying to restore older format checkpoints is
// relevant only in the kubelet upgrade flow, which happens once in a
// (long) while.
cp, err := m.getCheckpointV2()
if err != nil {
if err == errors.ErrCheckpointNotFound {
klog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err)
return nil
}
return err

var errv1 error
// one last try: maybe it's a old format checkpoint?
cp, errv1 = m.getCheckpointV1()
if errv1 != nil {
klog.InfoS("Failed to read checkpoint V1 file", "err", errv1)
// intentionally return the parent error. We expect to restore V1 checkpoints
// a tiny fraction of time, so what matters most is the current checkpoint read error.
return err
}
klog.InfoS("Read data from a V1 checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint)
}

m.mutex.Lock()
defer m.mutex.Unlock()
podDevices, registeredDevs := cp.GetData()
podDevices, registeredDevs := cp.GetDataInLatestFormat()
m.podDevices.fromCheckpointData(podDevices)
m.allocatedDevices = m.podDevices.devices()
for resource := range registeredDevs {
Expand All @@ -611,6 +623,22 @@ func (m *ManagerImpl) readCheckpoint() error {
return nil
}

func (m *ManagerImpl) getCheckpointV2() (checkpoint.DeviceManagerCheckpoint, error) {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
return cp, err
}

func (m *ManagerImpl) getCheckpointV1() (checkpoint.DeviceManagerCheckpoint, error) {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntryV1, 0)
cp := checkpoint.NewV1(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
return cp, err
}

// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
activePods := m.activePods()
Expand Down
23 changes: 23 additions & 0 deletions pkg/kubelet/cm/devicemanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -1080,3 +1081,25 @@ func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]p
}
return res
}

const deviceManagerCheckpointFilename = "kubelet_internal_checkpoint"

var oldCheckpoint string = `{"Data":{"PodDeviceEntries":[{"PodUID":"13ac2284-0d19-44b7-b94f-055b032dba9b","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA3"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkEzX1RUWTEwEgEwGhwKCi9kZXYvdHR5MTASCi9kZXYvdHR5MTAaAnJ3"},{"PodUID":"86b9a017-c9ca-4069-815f-46ca3e53c1e4","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA4"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkE0X1RUWTExEgEwGhwKCi9kZXYvdHR5MTESCi9kZXYvdHR5MTEaAnJ3"}],"RegisteredDevices":{"example.com/deviceA":["DevA1","DevA2","DevA3","DevA4"]}},"Checksum":405612085}`

func TestReadPreNUMACheckpoint(t *testing.T) {
socketDir, socketName, _, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)

err = ioutil.WriteFile(filepath.Join(socketDir, deviceManagerCheckpointFilename), []byte(oldCheckpoint), 0644)
require.NoError(t, err)

topologyStore := topologymanager.NewFakeManager()
nodes := []cadvisorapi.Node{{Id: 0}}
m, err := newManagerImpl(socketName, nodes, topologyStore)
require.NoError(t, err)

// TODO: we should not calling private methods, but among the existing tests we do anyway
err = m.readCheckpoint()
require.NoError(t, err)
}
5 changes: 5 additions & 0 deletions test/e2e_node/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ go_library(
"resource_collector.go",
"util.go",
"util_sriov.go",
"util_sriov_linux.go",
"util_sriov_unsupported.go",
"util_xfs_linux.go",
"util_xfs_unsupported.go",
"utils_linux.go",
Expand Down Expand Up @@ -116,6 +118,7 @@ go_test(
"cpu_manager_test.go",
"critical_pod_test.go",
"density_test.go",
"device_manager_test.go",
"device_plugin_test.go",
"docker_test.go",
"dockershim_checkpoint_test.go",
Expand Down Expand Up @@ -156,10 +159,12 @@ go_test(
"//pkg/features:go_default_library",
"//pkg/kubelet:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library",
Expand Down