199 changes: 97 additions & 102 deletions pkg/kubelet/cm/dra/manager.go
@@ -66,112 +66,107 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
for i := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)

// Resource is already prepared, add pod UID to it
if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
// We delay checkpointing of this change until this call
// returns successfully. It is OK to do this because we
// will only return successfully from this call if the
// checkpoint has succeeded. That means if the kubelet is
// ever restarted before this checkpoint succeeds, the pod
// whose resources are being prepared would never have
// started, so it's OK (actually correct) to not include it
// in the cache.
claimInfo.addPodReference(pod.UID)
continue
}

// Query claim object from the API server
resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
context.TODO(),
claimName,
metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
}

// Check if pod is in the ReservedFor for the claim
if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
pod.Name, pod.UID, claimName, resourceClaim.UID)
}

// Grab the allocation.resourceHandles. If there are no
// allocation.resourceHandles, create a single resourceHandle with no
// content. This will trigger processing of this claim by a single
// kubelet plugin whose name matches resourceClaim.Status.DriverName.
resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
if len(resourceHandles) == 0 {
resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
}

// Create a claimInfo object to store the relevant claim info.
claimInfo := newClaimInfo(
resourceClaim.Status.DriverName,
resourceClaim.Spec.ResourceClassName,
resourceClaim.UID,
resourceClaim.Name,
resourceClaim.Namespace,
sets.New(string(pod.UID)),
)

// Walk through each resourceHandle
for _, resourceHandle := range resourceHandles {
// If no DriverName is provided in the resourceHandle, we
// use the DriverName from the status
pluginName := resourceHandle.DriverName
if pluginName == "" {
pluginName = resourceClaim.Status.DriverName
}

// Call NodePrepareResource RPC for each resourceHandle
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
}
response, err := client.NodePrepareResource(
context.Background(),
resourceClaim.Namespace,
resourceClaim.UID,
resourceClaim.Name,
resourceHandle.Data)
if err != nil {
return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
}
klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)

// Add the CDI Devices returned by NodePrepareResource to
// the claimInfo object.
err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
if err != nil {
return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
}

// TODO: We (re)add the claimInfo object to the cache and
// sync it to the checkpoint *after* the
// NodePrepareResource call has completed. This will cause
// issues if the kubelet gets restarted between
// NodePrepareResource and syncToCheckpoint. It will result
// in not calling NodeUnprepareResource for this claim
// because no claimInfo will be synced back to the cache
// for it after the restart. We need to resolve this issue
// before moving to beta.
m.cache.add(claimInfo)

// Checkpoint to reduce redundant calls to
// NodePrepareResource() after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
}
}
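
For context on the loop above: a pod lists its claims under pod.Spec.ResourceClaims, and containers reference those entries by name through resources.claims; resourceclaim.Name then resolves each entry to the name of the ResourceClaim object to fetch. A minimal, self-contained sketch, assuming the core/v1 API of this release; the names "gpu", "app", and "example-gpu-claim" are purely illustrative:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/dynamic-resource-allocation/resourceclaim"
        "k8s.io/utils/pointer"
    )

    func main() {
        // Illustrative pod: one pod-level ResourceClaim entry backed by an existing
        // ResourceClaim object, referenced by name from the container's resources.claims.
        pod := &v1.Pod{
            Spec: v1.PodSpec{
                ResourceClaims: []v1.PodResourceClaim{{
                    Name:   "gpu",
                    Source: v1.ClaimSource{ResourceClaimName: pointer.String("example-gpu-claim")},
                }},
                Containers: []v1.Container{{
                    Name: "app",
                    Resources: v1.ResourceRequirements{
                        Claims: []v1.ResourceClaim{{Name: "gpu"}},
                    },
                }},
            },
        }

        // resourceclaim.Name resolves the pod-level entry to the ResourceClaim object name;
        // for template-based entries it returns the generated per-pod name instead.
        fmt.Println(resourceclaim.Name(pod, &pod.Spec.ResourceClaims[0])) // example-gpu-claim
    }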
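
And a minimal sketch of the call pattern, as a fragment that assumes a *ManagerImpl named m (built via NewManagerImpl) and the pod being synced are in scope; the real kubelet call site is outside this hunk:

    // Prepare every ResourceClaim referenced by the pod before its containers are started.
    // On success the CDI devices returned by the plugins are recorded in claimInfo objects
    // and persisted to the checkpoint file, as done in the method above.
    if err := m.PrepareResources(pod); err != nil {
        return fmt.Errorf("failed to prepare dynamic resources for pod %s: %v", pod.Name, err)
    }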