Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrated scheduler files binder.go binder_test.go to structured logging #105858

Merged
merged 1 commit into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
88 changes: 41 additions & 47 deletions pkg/scheduler/framework/plugins/volumebinding/binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,9 @@ func NewVolumeBinder(
// returned.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
podVolumes = &PodVolumes{}
podName := getPodName(pod)

// Warning: Below log needs high verbosity as it can be printed several times (#60933).
klog.V(5).Infof("FindPodVolumes for pod %q, node %q", podName, node.Name)
klog.V(5).InfoS("FindPodVolumes", "pod", klog.KObj(pod), "node", klog.KObj(node))

// Initialize to true for pods that don't have volumes. These
// booleans get translated into reason strings when the function
Expand Down Expand Up @@ -315,7 +314,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*

// Check PV node affinity on bound volumes
if len(boundClaims) > 0 {
boundVolumesSatisfied, boundPVsFound, err = b.checkBoundClaims(boundClaims, node, podName)
boundVolumesSatisfied, boundPVsFound, err = b.checkBoundClaims(boundClaims, node, pod)
if err != nil {
return
}
Expand Down Expand Up @@ -371,32 +370,31 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
// 2. Update the pvcCache with the new PVCs with annotations set
// 3. Update PodVolumes again with cached API updates for PVs and PVCs.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) {
podName := getPodName(assumedPod)

klog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)
klog.V(4).InfoS("AssumePodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName))
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
if err != nil {
metrics.VolumeSchedulingStageFailed.WithLabelValues("assume").Inc()
}
}()

if allBound := b.arePodVolumesBound(assumedPod); allBound {
klog.V(4).Infof("AssumePodVolumes for pod %q, node %q: all PVCs bound and nothing to do", podName, nodeName)
klog.V(4).InfoS("AssumePodVolumes: all PVCs bound and nothing to do", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName))
return true, nil
}

// Assume PV
newBindings := []*BindingInfo{}
for _, binding := range podVolumes.StaticBindings {
newPV, dirty, err := pvutil.GetBindVolumeToClaim(binding.pv, binding.pvc)
klog.V(5).Infof("AssumePodVolumes: GetBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v",
podName,
binding.pv.Name,
binding.pvc.Name,
newPV,
dirty,
err)
klog.V(5).InfoS("AssumePodVolumes: GetBindVolumeToClaim",
"pod", klog.KObj(assumedPod),
"PV", klog.KObj(binding.pv),
"PVC", klog.KObj(binding.pvc),
"newPV", klog.KObj(newPV),
"dirty", dirty,
)
if err != nil {
klog.ErrorS(err, "AssumePodVolumes: fail to GetBindVolumeToClaim")
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
b.revertAssumedPVs(newBindings)
return false, err
}
Expand Down Expand Up @@ -443,8 +441,7 @@ func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) {
// makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound
// by the PV controller.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) {
podName := getPodName(assumedPod)
klog.V(4).Infof("BindPodVolumes for pod %q, node %q", podName, assumedPod.Spec.NodeName)
klog.V(4).InfoS("BindPodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", assumedPod.Spec.NodeName))

defer func() {
if err != nil {
Expand All @@ -456,7 +453,7 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes
claimsToProvision := podVolumes.DynamicProvisions

// Start API operations
err = b.bindAPIUpdate(podName, bindings, claimsToProvision)
err = b.bindAPIUpdate(assumedPod, bindings, claimsToProvision)
if err != nil {
return err
}
Expand All @@ -480,7 +477,8 @@ func getPVCName(pvc *v1.PersistentVolumeClaim) string {
}

// bindAPIUpdate makes the API update for those PVs/PVCs.
func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
func (b *volumeBinder) bindAPIUpdate(pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
podName := getPodName(pod)
if bindings == nil {
return fmt.Errorf("failed to get cached bindings for pod %q", podName)
}
Expand Down Expand Up @@ -510,16 +508,15 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*BindingInfo, cl
// Do the actual prebinding. Let the PV controller take care of the rest
// There is no API rollback if the actual binding fails
for _, binding = range bindings {
klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
klog.V(5).InfoS("bindAPIUpdate: binding PV to PVC", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
// TODO: does it hurt if we make an api call and nothing needs to be updated?
claimKey := getPVCName(binding.pvc)
klog.V(2).Infof("claim %q bound to volume %q", claimKey, binding.pv.Name)
klog.V(2).InfoS("Claim bound to volume", "PVC", klog.KObj(binding.pvc), "PV", klog.KObj(binding.pv))
newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(context.TODO(), binding.pv, metav1.UpdateOptions{})
if err != nil {
klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", binding.pv.Name, claimKey, err)
klog.V(4).InfoS("Updating PersistentVolume: binding to claim failed", "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), "err", err)
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
return err
}
klog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", binding.pv.Name, claimKey)
klog.V(4).InfoS("Updating PersistentVolume: bound to claim", "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
// Save updated object from apiserver for later checking.
binding.pv = newPV
lastProcessedBinding++
Expand All @@ -528,7 +525,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*BindingInfo, cl
// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
// PV controller is expected to signal back by removing related annotations if actual provisioning fails
for i, claim = range claimsToProvision {
klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
klog.V(5).InfoS("Updating claims objects to trigger volume provisioning", "pod", klog.KObj(pod), "PVC", klog.KObj(claim))
newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{})
if err != nil {
return err
Expand Down Expand Up @@ -572,7 +569,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
csiNode, err := b.csiNodeLister.Get(node.Name)
if err != nil {
// TODO: return the error once CSINode is created by default
klog.V(4).Infof("Could not get a CSINode object for the node %q: %v", node.Name, err)
klog.V(4).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
}

// Check for any conditions that might require scheduling retry
Expand All @@ -584,7 +581,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
if apierrors.IsNotFound(err) {
return false, fmt.Errorf("pod does not exist any more: %w", err)
}
klog.Errorf("failed to get pod %s/%s from the lister: %v", pod.Namespace, pod.Name, err)
klog.ErrorS(err, "Failed to get pod from the lister", "pod", klog.KObj(pod))
}

for _, binding := range bindings {
Expand Down Expand Up @@ -680,7 +677,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
}

// All pvs and pvcs that we operated on are bound
klog.V(4).Infof("All PVCs for pod %q are bound", podName)
klog.V(4).InfoS("All PVCs for pod are bound", "pod", klog.KObj(pod))
return true, nil
}

Expand Down Expand Up @@ -728,12 +725,12 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste

fullyBound := b.isPVCFullyBound(pvc)
if fullyBound {
klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvc.Spec.VolumeName)
klog.V(5).InfoS("PVC is fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName))
} else {
if pvc.Spec.VolumeName != "" {
klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvc.Spec.VolumeName)
klog.V(5).InfoS("PVC is not fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName))
} else {
klog.V(5).Infof("PVC %q is not bound", pvcKey)
klog.V(5).InfoS("PVC is not bound", "PVC", klog.KObj(pvc))
}
}
return fullyBound, pvc, nil
Expand Down Expand Up @@ -790,11 +787,11 @@ func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
return boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate, nil
}

func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, podName string) (bool, bool, error) {
func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, pod *v1.Pod) (bool, bool, error) {
csiNode, err := b.csiNodeLister.Get(node.Name)
if err != nil {
// TODO: return the error once CSINode is created by default
klog.V(4).Infof("Could not get a CSINode object for the node %q: %v", node.Name, err)
klog.V(4).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
}

for _, pvc := range claims {
Expand All @@ -814,20 +811,19 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node

err = volumeutil.CheckNodeAffinity(pv, node.Labels)
if err != nil {
klog.V(4).Infof("PersistentVolume %q, Node %q mismatch for Pod %q: %v", pvName, node.Name, podName, err)
klog.V(4).InfoS("PersistentVolume and node mismatch for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod), "err", err)
return false, true, nil
}
klog.V(5).Infof("PersistentVolume %q, Node %q matches for Pod %q", pvName, node.Name, podName)
klog.V(5).InfoS("PersistentVolume and node matches for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod))
}

klog.V(4).Infof("All bound volumes for Pod %q match with Node %q", podName, node.Name)
klog.V(4).InfoS("All bound volumes for pod match with node", "pod", klog.KObj(pod), "node", klog.KObj(node))
return true, true, nil
}

// findMatchingVolumes tries to find matching volumes for given claims,
// and return unbound claims for further provision.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, bindings []*BindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
podName := getPodName(pod)
// Sort all the claims by increasing size request to get the smallest fits
sort.Sort(byPVCSize(claimsToBind))

Expand All @@ -839,15 +835,14 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
// Get storage class name from each PVC
storageClassName := storagehelpers.GetPersistentVolumeClaimClass(pvc)
allPVs := b.pvCache.ListPVs(storageClassName)
pvcName := getPVCName(pvc)

// Find a matching PV
pv, err := pvutil.FindMatchingVolume(pvc, allPVs, node, chosenPVs, true)
if err != nil {
return false, nil, nil, err
}
if pv == nil {
klog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, pvcName, node.Name)
klog.V(4).InfoS("No matching volumes for pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc), "node", klog.KObj(node))
unboundClaims = append(unboundClaims, pvc)
foundMatches = false
continue
Expand All @@ -856,11 +851,11 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
// matching PV needs to be excluded so we don't select it again
chosenPVs[pv.Name] = pv
bindings = append(bindings, &BindingInfo{pv: pv, pvc: pvc})
klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", pv.Name, pvcName, node.Name, podName)
klog.V(5).InfoS("Found matching PV for PVC for pod", "PV", klog.KObj(pv), "PVC", klog.KObj(pvc), "node", klog.KObj(node), "pod", klog.KObj(pod))
}

if foundMatches {
klog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name)
klog.V(4).InfoS("Found matching volumes for pod", "pod", klog.KObj(pod), "node", klog.KObj(node))
}

return
Expand All @@ -870,7 +865,6 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
// findMatchingVolumes, and do not have matching volumes for binding), and return true
// if all of the claims are eligible for dynamic provision.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
podName := getPodName(pod)
dynamicProvisions = []*v1.PersistentVolumeClaim{}

// We return early with provisionedClaims == nil if a check
Expand All @@ -888,13 +882,13 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
}
provisioner := class.Provisioner
if provisioner == "" || provisioner == pvutil.NotSupportedProvisioner {
klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName)
klog.V(4).InfoS("Storage class of claim does not support dynamic provisioning", "storageClassName", className, "PVC", klog.KObj(claim))
return false, true, nil, nil
}

// Check if the node can satisfy the topology requirement in the class
if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
klog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, pvcName)
klog.V(4).InfoS("Node cannot satisfy provisioning topology requirements of claim", "node", klog.KObj(node), "PVC", klog.KObj(claim))
return false, true, nil, nil
}

Expand All @@ -911,7 +905,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
dynamicProvisions = append(dynamicProvisions, claim)

}
klog.V(4).Infof("Provisioning for %d claims of pod %q that has no matching volumes on node %q ...", len(claimsToProvision), podName, node.Name)
klog.V(4).InfoS("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node))

return true, true, dynamicProvisions, nil
}
Expand Down Expand Up @@ -977,8 +971,8 @@ func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.Persisten

// TODO (?): this doesn't give any information about which pools where considered and why
// they had to be rejected. Log that above? But that might be a lot of log output...
klog.V(4).Infof("Node %q has no accessible CSIStorageCapacity with enough capacity for PVC %s/%s of size %d and storage class %q",
node.Name, claim.Namespace, claim.Name, sizeInBytes, storageClass.Name)
klog.V(4).InfoS("Node has no accessible CSIStorageCapacity with enough capacity for PVC",
"node", klog.KObj(node), "PVC", klog.KObj(claim), "size", sizeInBytes, "storageClass", klog.KObj(storageClass))
return false, nil
}

Expand All @@ -1000,7 +994,7 @@ func (b *volumeBinder) nodeHasAccess(node *v1.Node, capacity *storagev1beta1.CSI
selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology)
if err != nil {
// This should never happen because NodeTopology must be valid.
klog.Errorf("unexpected error converting %+v to a label selector: %v", capacity.NodeTopology, err)
klog.ErrorS(err, "Unexpected error converting to a label selector", "nodeTopology", capacity.NodeTopology)
return false
}
return selector.Matches(labels.Set(node.Labels))
Expand Down
8 changes: 5 additions & 3 deletions pkg/scheduler/framework/plugins/volumebinding/binder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package volumebinding
import (
"context"
"fmt"
"os"
"reflect"
"sort"
"testing"
Expand Down Expand Up @@ -187,7 +188,8 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}, csiStorageCapacity ...b
informerFactory.Start(stopCh)
for v, synced := range informerFactory.WaitForCacheSync(stopCh) {
if !synced {
klog.Fatalf("Error syncing informer for %v", v)
klog.ErrorS(nil, "Error syncing informer", "informer", v)
os.Exit(1)
}
}

Expand Down Expand Up @@ -1546,7 +1548,7 @@ func TestBindAPIUpdate(t *testing.T) {
testEnv.assumeVolumes(t, "node1", pod, scenario.bindings, scenario.provisionedPVCs)

// Execute
err := testEnv.internalBinder.bindAPIUpdate(pod.Name, scenario.bindings, scenario.provisionedPVCs)
err := testEnv.internalBinder.bindAPIUpdate(pod, scenario.bindings, scenario.provisionedPVCs)

// Validate
if !scenario.shouldFail && err != nil {
Expand Down Expand Up @@ -2087,7 +2089,7 @@ func TestBindPodVolumes(t *testing.T) {
go func(scenario scenarioType) {
time.Sleep(5 * time.Second)
// Sleep a while to run after bindAPIUpdate in BindPodVolumes
klog.V(5).Infof("Running delay function")
klog.V(5).InfoS("Running delay function")
scenario.delayFunc(t, testEnv, pod, scenario.initPVs, scenario.initPVCs)
}(scenario)
}
Expand Down