Skip to content

Commit

Permalink
Merge pull request kubernetes#119307 from bart0sh/PR121-DRA-Unprepare…
Browse files Browse the repository at this point in the history
…Resources-Dont-Query-claim

DRA: don't query claims from API server
  • Loading branch information
k8s-ci-robot committed Jul 18, 2023
2 parents 6b0e66a + 0ec99fb commit 2381654
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 72 deletions.
17 changes: 10 additions & 7 deletions pkg/kubelet/cm/dra/claiminfo.go
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sync"

resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
Expand Down Expand Up @@ -80,14 +81,15 @@ type claimInfoCache struct {
claimInfo map[string]*ClaimInfo
}

func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string]) *ClaimInfo {
func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], resourceHandles []resourcev1alpha2.ResourceHandle) *ClaimInfo {
claimInfoState := state.ClaimInfoState{
DriverName: driverName,
ClassName: className,
ClaimUID: claimUID,
ClaimName: claimName,
Namespace: namespace,
PodUIDs: podUIDs,
DriverName: driverName,
ClassName: className,
ClaimUID: claimUID,
ClaimName: claimName,
Namespace: namespace,
PodUIDs: podUIDs,
ResourceHandles: resourceHandles,
}
claimInfo := ClaimInfo{
ClaimInfoState: claimInfoState,
Expand Down Expand Up @@ -120,6 +122,7 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
entry.ClaimName,
entry.Namespace,
entry.PodUIDs,
entry.ResourceHandles,
)
for pluginName, cdiDevices := range entry.CDIDevices {
err := info.addCDIDevices(pluginName, cdiDevices)
Expand Down
29 changes: 6 additions & 23 deletions pkg/kubelet/cm/dra/manager.go
Expand Up @@ -140,6 +140,7 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
resourceClaim.Name,
resourceClaim.Namespace,
sets.New(string(pod.UID)),
resourceHandles,
)

// Loop through all plugins and prepare for calling NodePrepareResources.
Expand Down Expand Up @@ -342,26 +343,8 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
continue
}

// Query claim object from the API server
resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
context.TODO(),
*claimName,
metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", *claimName, pod.Name, err)
}

// Grab the allocation.resourceHandles. If there are no
// allocation.resourceHandles, create a single resourceHandle with no
// content. This will trigger processing of this claim by a single
// kubelet plugin whose name matches resourceClaim.Status.DriverName.
resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
if len(resourceHandles) == 0 {
resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
}

// Loop through all plugins and prepare for calling NodeUnprepareResources.
for _, resourceHandle := range resourceHandles {
for _, resourceHandle := range claimInfo.ResourceHandles {
// If no DriverName is provided in the resourceHandle, we
// use the DriverName from the status
pluginName := resourceHandle.DriverName
Expand All @@ -370,14 +353,14 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
}

claim := &drapb.Claim{
Namespace: resourceClaim.Namespace,
Uid: string(resourceClaim.UID),
Name: resourceClaim.Name,
Namespace: claimInfo.Namespace,
Uid: string(claimInfo.ClaimUID),
Name: claimInfo.ClaimName,
ResourceHandle: resourceHandle.Data,
}
batches[pluginName] = append(batches[pluginName], claim)
}
claimInfos[resourceClaim.UID] = claimInfo
claimInfos[claimInfo.ClaimUID] = claimInfo
}

// Call NodeUnprepareResources for all claims in each batch.
Expand Down
56 changes: 18 additions & 38 deletions pkg/kubelet/cm/dra/manager_test.go
Expand Up @@ -714,38 +714,6 @@ func TestUnprepareResouces(t *testing.T) {
wantTimeout bool
wantResourceSkipped bool
}{
{
description: "failed to fetch resource claim",
driverName: driverName,
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "test-namespace",
UID: "test-reserved",
},
Spec: v1.PodSpec{
ResourceClaims: []v1.PodResourceClaim{
{
Name: "test-pod-claim-0",
Source: v1.ClaimSource{
ResourceClaimName: func() *string {
s := "test-pod-claim-0"
return &s
}(),
},
},
},
},
},
claimInfo: &ClaimInfo{
ClaimInfoState: state.ClaimInfoState{
DriverName: driverName,
ClaimName: "test-pod-claim-0",
Namespace: "test-namespace",
},
},
wantErr: true,
},
{
description: "plugin does not exist",
driverName: "this-plugin-does-not-exist",
Expand Down Expand Up @@ -774,6 +742,12 @@ func TestUnprepareResouces(t *testing.T) {
DriverName: driverName,
ClaimName: "another-claim-test",
Namespace: "test-namespace",
ResourceHandles: []resourcev1alpha2.ResourceHandle{
{
DriverName: driverName,
Data: "test data",
},
},
},
},
resourceClaim: &resourcev1alpha2.ResourceClaim{
Expand Down Expand Up @@ -899,6 +873,12 @@ func TestUnprepareResouces(t *testing.T) {
DriverName: driverName,
ClaimName: "test-pod-claim-2",
Namespace: "test-namespace",
ResourceHandles: []resourcev1alpha2.ResourceHandle{
{
DriverName: driverName,
Data: "test data",
},
},
},
},
resourceClaim: &resourcev1alpha2.ResourceClaim{
Expand Down Expand Up @@ -962,6 +942,12 @@ func TestUnprepareResouces(t *testing.T) {
DriverName: driverName,
ClaimName: "test-pod-claim-3",
Namespace: "test-namespace",
ResourceHandles: []resourcev1alpha2.ResourceHandle{
{
DriverName: driverName,
Data: "test data",
},
},
},
},
resourceClaim: &resourcev1alpha2.ResourceClaim{
Expand Down Expand Up @@ -1010,12 +996,6 @@ func TestUnprepareResouces(t *testing.T) {
cache: cache,
}

if test.resourceClaim != nil {
if _, err := fakeKubeClient.ResourceV1alpha2().ResourceClaims(test.pod.Namespace).Create(context.Background(), test.resourceClaim, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create ResourceClaim %s: %+v", test.resourceClaim.Name, err)
}
}

if test.claimInfo != nil {
manager.cache.add(test.claimInfo)
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/kubelet/cm/dra/state/checkpoint.go
Expand Up @@ -18,9 +18,14 @@ package state

import (
"encoding/json"
"fmt"
"hash/fnv"
"strings"

"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)

var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{}
Expand All @@ -34,9 +39,20 @@ type DRAManagerCheckpoint struct {
Checksum checksum.Checksum `json:"checksum"`
}

// DRAManagerCheckpointWithoutResourceHandles is the old layout of
// DRAManagerCheckpoint: its entries are stored as
// ClaimInfoStateListWithoutResourceHandles, i.e. without the ResourceHandles
// field. It is used to recompute the checksum of checkpoints written in that
// old format.
type DRAManagerCheckpointWithoutResourceHandles struct {
Version string `json:"version"`
Entries ClaimInfoStateListWithoutResourceHandles `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}

// ClaimInfoStateList is the list of claim info entries stored in a checkpoint.
type ClaimInfoStateList []ClaimInfoState

// ClaimInfoStateListWithoutResourceHandles is the list of claim info entries
// stored in a checkpoint in the old format, where each entry is a
// ClaimInfoStateWithoutResourceHandles.
// TODO: remove in Beta
type ClaimInfoStateListWithoutResourceHandles []ClaimInfoStateWithoutResourceHandles

// NewDRAManagerCheckpoint returns an instance of Checkpoint
func NewDRAManagerCheckpoint() *DRAManagerCheckpoint {
return &DRAManagerCheckpoint{
Expand All @@ -63,6 +79,44 @@ func (dc *DRAManagerCheckpoint) VerifyChecksum() error {
ck := dc.Checksum
dc.Checksum = 0
err := ck.Verify(dc)
if err == errors.ErrCorruptCheckpoint {
// Verify with old structs without ResourceHandles field
// TODO: remove in Beta
err = verifyChecksumWithoutResourceHandles(dc, ck)
}
dc.Checksum = ck
return err
}

// verifyChecksumWithoutResourceHandles checks checkSum against a copy of dc
// rebuilt in the old checkpoint layout, in which entries have no
// ResourceHandles field. It returns nil when the recomputed checksum equals
// checkSum and errors.ErrCorruptCheckpoint otherwise.
// TODO: remove in Beta.
func verifyChecksumWithoutResourceHandles(dc *DRAManagerCheckpoint, checkSum checksum.Checksum) error {
	// Copy every entry into its old-format counterpart, dropping ResourceHandles.
	legacyEntries := make(ClaimInfoStateListWithoutResourceHandles, len(dc.Entries))
	for i := range dc.Entries {
		current := &dc.Entries[i]
		legacyEntries[i] = ClaimInfoStateWithoutResourceHandles{
			DriverName: current.DriverName,
			ClassName:  current.ClassName,
			ClaimUID:   current.ClaimUID,
			ClaimName:  current.ClaimName,
			Namespace:  current.Namespace,
			PodUIDs:    current.PodUIDs,
			CDIDevices: current.CDIDevices,
		}
	}

	// The Checksum field is left at its zero value while hashing.
	legacy := &DRAManagerCheckpointWithoutResourceHandles{
		Version: checkpointVersion,
		Entries: legacyEntries,
	}

	// The dump text embeds type names, so the old-format type names are
	// rewritten to the current ones before hashing. Only the first occurrence
	// of each name is rewritten.
	// NOTE(review): the per-entry type ClaimInfoStateWithoutResourceHandles is
	// not rewritten here — confirm the dump never includes it, or that this
	// still matches checkpoints written by an older kubelet.
	renames := [][2]string{
		{"DRAManagerCheckpointWithoutResourceHandles", "DRAManagerCheckpoint"},
		{"ClaimInfoStateListWithoutResourceHandles", "ClaimInfoStateList"},
	}
	dumped := dump.ForHash(legacy)
	for _, rename := range renames {
		dumped = strings.Replace(dumped, rename[0], rename[1], 1)
	}

	// Calculate the checksum of the old-format checkpoint.
	hasher := fnv.New32a()
	fmt.Fprintf(hasher, "%v", dumped)
	if checksum.Checksum(hasher.Sum32()) == checkSum {
		return nil
	}
	return errors.ErrCorruptCheckpoint
}
30 changes: 30 additions & 0 deletions pkg/kubelet/cm/dra/state/state_checkpoint.go
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sync"

resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
Expand Down Expand Up @@ -54,6 +55,35 @@ type ClaimInfoState struct {
// PodUIDs is a set of pod UIDs that reference a resource
PodUIDs sets.Set[string]

// ResourceHandles is a list of opaque resource data for processing by a specific kubelet plugin
ResourceHandles []resourcev1alpha2.ResourceHandle

// CDIDevices is a map of DriverName --> CDI devices returned by the
// GRPC API call NodePrepareResource
CDIDevices map[string][]string
}

// ClaimInfoStateWithoutResourceHandles is an old implementation of the ClaimInfoState
// TODO: remove in Beta
type ClaimInfoStateWithoutResourceHandles struct {
// Name of the DRA driver
DriverName string

// ClassName is a resource class of the claim
ClassName string

// ClaimUID is a UID of the resource claim
ClaimUID types.UID

// ClaimName is a name of the resource claim
ClaimName string

// Namespace is a claim namespace
Namespace string

// PodUIDs is a set of pod UIDs that reference a resource
PodUIDs sets.Set[string]

// CDIDevices is a map of DriverName --> CDI devices returned by the
// GRPC API call NodePrepareResource
CDIDevices map[string][]string
Expand Down

0 comments on commit 2381654

Please sign in to comment.