From 0b024f70cb171b6f183a8c58846fdf6030a029c9 Mon Sep 17 00:00:00 2001
From: boenn <13752080613@163.com>
Date: Wed, 21 Jul 2021 16:39:18 +0800
Subject: [PATCH] save some state in PreFilter, update it in AddPod, RemovePod
 and use that in Filter

---
 .../volumerestrictions/volume_restrictions.go | 182 +++++++++++++-----
 .../volume_restrictions_test.go               |  46 +++++-
 2 files changed, 177 insertions(+), 51 deletions(-)

diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go
index 8f7985135b65..0e45ecdd94a0 100644
--- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go
+++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go
@@ -18,8 +18,10 @@ package volumerestrictions
 
 import (
 	"context"
+	"fmt"
 	"strings"
 	"sync/atomic"
 
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/util/sets"
@@ -49,6 +51,7 @@ var _ framework.EnqueueExtensions = &VolumeRestrictions{}
 const Name = names.VolumeRestrictions
 
 const (
+	preFilterStateKey = "PreFilter" + Name
 	// ErrReasonDiskConflict is used for NoDiskConflict predicate error.
 	ErrReasonDiskConflict = "node(s) had no available disk"
 	// ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode.
@@ -125,7 +128,10 @@ func haveOverlap(a1, a2 []string) bool {
 
 func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
 	if pl.enableReadWriteOncePod {
-		return pl.isReadWriteOncePodAccessModeConflict(pod)
+		return pl.isReadWriteOncePodAccessModeConflict(cycleState, pod)
 	}
+	// Write an empty state so Filter, AddPod and RemovePod can still read it
+	// when the ReadWriteOncePod feature gate is disabled.
+	cycleState.Write(preFilterStateKey, &preFilterState{pvck: sets.NewString()})
 	return framework.NewStatus(framework.Success)
 }
@@ -133,93 +139,142 @@ func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framewo
 
 // isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode.
 // This access mode restricts volume access to a single pod on a single node. Since only a single pod can
 // use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable.
-func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) *framework.Status {
+func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
 	nodeInfos, err := pl.nodeInfoLister.NodeInfos().List()
 	if err != nil {
 		return framework.NewStatus(framework.Error, "error while getting node info")
 	}
 
-	var pvcKeys []string
-	for _, volume := range pod.Spec.Volumes {
-		if volume.PersistentVolumeClaim == nil {
-			continue
-		}
-
-		pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
-		if err != nil {
-			if apierrors.IsNotFound(err) {
-				return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
-			}
-			return framework.AsStatus(err)
-		}
-
-		if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
-			continue
-		}
-
-		key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName
-		pvcKeys = append(pvcKeys, key)
+	pvcKeys, status := pl.getPVCKeys(pod)
+	if status != nil {
+		return status
 	}
-
 	ctx, cancel := context.WithCancel(context.Background())
 	var conflicts uint32
-
-	var preemetStatus *framework.Status
 	processNode := func(i int) {
 		nodeInfo := nodeInfos[i]
-		pvcKeyslength := len(pvcKeys)
-		for _, key := range pvcKeys {
+		for key := range pvcKeys {
 			refCount := nodeInfo.PVCRefCounts[key]
 			if refCount > 1 {
 				atomic.AddUint32(&conflicts, 1)
 				cancel()
-			} else if refCount == 1 {
-				if canBePreempt(nodeInfo, key) {
-					pvcKeyslength--
-				}
 			}
 		}
-		// node can meet all pvc
-		if pvcKeyslength == 0 {
-			preemetStatus = framework.NewStatus(framework.Unschedulable, InfoReasonReadWriteOncePodPreempt)
-			cancel()
-		}
 	}
 	pl.parallelizer.Until(ctx, len(nodeInfos), processNode)
 
 	// Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet.
 	if conflicts > 0 {
-		preemetStatus = framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
+		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
 	}
-	return preemetStatus
+
+	// Save the ReadWriteOncePod PVC keys in the cycle state so that Filter,
+	// AddPod and RemovePod can reuse them.
+	p := &preFilterState{
+		pvck: pvcKeys,
+	}
+	cycleState.Write(preFilterStateKey, p)
+	return nil
 }
 
-func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
-	return nil
+// PreFilterExtensions returns the plugin itself, so that AddPod and RemovePod
+// are invoked to keep the pre-filter state up to date during preemption.
+func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
+	return pl
 }
 
+func (pl *VolumeRestrictions) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
+	state, err := getPreFilterState(cycleState)
+	if err != nil {
+		return framework.AsStatus(err)
+	}
+
+	// Account for the ReadWriteOncePod PVCs held by the pod being added.
+	pvcKeys, status := pl.getPVCKeys(podInfoToAdd.Pod)
+	if status != nil {
+		return status
+	}
+	state.updateWithPod(pvcKeys, 1)
+	return nil
+}
+
+func (pl *VolumeRestrictions) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
+	state, err := getPreFilterState(cycleState)
+	if err != nil {
+		return framework.AsStatus(err)
+	}
+	pvcKeys, status := pl.getPVCKeys(podInfoToRemove.Pod)
+	if status != nil {
+		return status
+	}
+	state.updateWithPod(pvcKeys, -1)
+	return nil
+}
+
+// getPVCKeys returns the namespace/name keys of every ReadWriteOncePod PVC
+// requested by the given pod.
+func (pl *VolumeRestrictions) getPVCKeys(pod *v1.Pod) (sets.String, *framework.Status) {
+	var pvcKeys []string
+	for _, volume := range pod.Spec.Volumes {
+		if volume.PersistentVolumeClaim == nil {
+			continue
+		}
+
+		pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
+		if err != nil {
+			if apierrors.IsNotFound(err) {
+				return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
+			}
+			return nil, framework.AsStatus(err)
+		}
+
+		if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
+			continue
+		}
+
+		key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName
+		pvcKeys = append(pvcKeys, key)
+	}
+	return sets.NewString(pvcKeys...), nil
+}
+
 // Filter invoked at the filter extension point.
 // It evaluates if a pod can fit due to the volumes it requests, and those that
-// are already mounted. If there is already a volume mounted on that node, another pod that uses the same volume
-// can't be scheduled there.
+// are already mounted. If a conflicting volume is already mounted on that node,
+// another pod using the same volume can only be scheduled there by preempting
+// the current user.
 // This is GCE, Amazon EBS, ISCSI and Ceph RBD specific for now:
 // - GCE PD allows multiple mounts as long as they're all read-only
 // - AWS EBS forbids any two pods mounting the same volume ID
 // - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image, and the image is read-only
 // - ISCSI forbids if any two pods share at least same IQN and ISCSI volume is read-only
-func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
+func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
+	state, err := getPreFilterState(cycleState)
+	if err != nil {
+		return framework.AsStatus(err)
+	}
 	for i := range pod.Spec.Volumes {
 		v := &pod.Spec.Volumes[i]
 		// fast path if there is no conflict checking targets.
 		if v.GCEPersistentDisk == nil && v.AWSElasticBlockStore == nil && v.RBD == nil && v.ISCSI == nil {
 			continue
 		}
-
 		for _, ev := range nodeInfo.Pods {
 			if isVolumeConflict(v, ev.Pod) {
 				return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
 			}
 		}
 	}
+
+	// The pod can be scheduled through preemption only if every
+	// ReadWriteOncePod PVC it needs can be freed on this node.
+	pvcKeyLen := state.pvck.Len()
+	for key := range state.pvck {
+		if canBePreempt(nodeInfo, key) {
+			pvcKeyLen--
+		}
+	}
+	if pvcKeyLen == 0 && state.pvck.Len() > 0 {
+		return framework.NewStatus(framework.Unschedulable, InfoReasonReadWriteOncePodPreempt)
+	}
 	return nil
 }
 
@@ -269,3 +324,42 @@ func canBePreempt(nodeInfo *framework.NodeInfo, key string) bool {
 	}
 	return false
 }
+
+// preFilterState computed at PreFilter and used at Filter.
+type preFilterState struct {
+	// pvck holds the namespace/name keys of the ReadWriteOncePod PVCs
+	// requested by the pod being scheduled.
+	pvck sets.String
+}
+
+func (s *preFilterState) Clone() framework.StateData {
+	if s == nil {
+		return nil
+	}
+	// AddPod and RemovePod mutate pvck, so Clone must return a deep copy.
+	return &preFilterState{pvck: sets.NewString(s.pvck.UnsortedList()...)}
+}
+
+func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
+	c, err := cycleState.Read(preFilterStateKey)
+	if err != nil {
+		// preFilterState doesn't exist, likely PreFilter wasn't invoked.
+		return nil, fmt.Errorf("error reading %q from cycleState: %w", preFilterStateKey, err)
+	}
+
+	s, ok := c.(*preFilterState)
+	if !ok {
+		return nil, fmt.Errorf("%+v convert to volumerestrictions.preFilterState error", c)
+	}
+	return s, nil
+}
+
+// updateWithPod adds the given PVC keys to the state (multiplier > 0) or
+// removes them from it (multiplier < 0).
+func (s *preFilterState) updateWithPod(pvcKeys sets.String, multiplier int64) {
+	if multiplier > 0 {
+		s.pvck = s.pvck.Union(pvcKeys)
+	} else {
+		s.pvck = s.pvck.Difference(pvcKeys)
+	}
+}
diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go
index 1a39cb4745c2..93c6ff634329 100644
--- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go
+++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go
@@ -18,7 +18,9 @@ package volumerestrictions
 
 import (
 	"context"
 	"reflect"
+	"strings"
 	"testing"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
@@ -74,7 +76,12 @@ func TestGCEDiskConflicts(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			p := newPlugin(t)
-			gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
+			state := framework.NewCycleState()
+			ps := &preFilterState{
+				pvck: sets.NewString(),
+			}
+			state.Write(preFilterStateKey, ps)
+			gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
 			if !reflect.DeepEqual(gotStatus, test.wantStatus) {
 				t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
 			}
@@ -122,7 +129,12 @@ func TestAWSDiskConflicts(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			p := newPlugin(t)
-			gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
+			state := framework.NewCycleState()
+			ps := &preFilterState{
+				pvck: sets.NewString(),
+			}
+			state.Write(preFilterStateKey, ps)
+			gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
 			if !reflect.DeepEqual(gotStatus, test.wantStatus) {
 				t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
 			}
@@ -176,7 +188,12 @@ func TestRBDDiskConflicts(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			p := newPlugin(t)
-			gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
+			state := framework.NewCycleState()
+			ps := &preFilterState{
+				pvck: sets.NewString(),
+			}
+			state.Write(preFilterStateKey, ps)
+			gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
 			if !reflect.DeepEqual(gotStatus, test.wantStatus) {
 				t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
 			}
@@ -230,7 +247,12 @@ func TestISCSIDiskConflicts(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			p := newPlugin(t)
-			gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
+			state := framework.NewCycleState()
+			ps := &preFilterState{
+				pvck: sets.NewString(),
+			}
+			state.Write(preFilterStateKey, ps)
+			gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
 			if !reflect.DeepEqual(gotStatus, test.wantStatus) {
 				t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
 			}
@@ -410,9 +432,19 @@ func TestAccessModeConflicts(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			p := newPluginWithListers(t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod)
-			gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod)
-			if !reflect.DeepEqual(gotStatus, test.wantStatus) {
-				t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus)
+			state := framework.NewCycleState()
+			preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), state, test.pod)
+			if !preFilterStatus.IsSuccess() {
+				if test.wantStatus.IsSuccess() || !strings.Contains(preFilterStatus.Message(), test.wantStatus.Message()) {
+					t.Errorf("prefilter failed with status: %v", preFilterStatus)
+				}
+			} else {
+				nodeInfo := framework.NewNodeInfo(test.existingPods...)
+				nodeInfo.SetNode(node)
+				filterStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, nodeInfo)
+				if !reflect.DeepEqual(filterStatus, test.wantStatus) {
+					t.Errorf("status does not match: %+v, want: %+v", filterStatus, test.wantStatus)
+				}
 			}
 		})
 	}
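---
Note: the AddPod/RemovePod bookkeeping in this patch reduces to plain set
algebra on the saved PVC keys. The standalone sketch below is my illustration,
not part of the patch; it assumes only the k8s.io/apimachinery module and
mirrors the patch's names to show the intended updateWithPod semantics during
a preemption simulation.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// preFilterState mirrors the patch's cycle-state payload: the namespace/name
// keys of the ReadWriteOncePod PVCs requested by the pod being scheduled.
type preFilterState struct {
	pvck sets.String
}

// updateWithPod unions in the keys of a pod that is added back to a node and
// subtracts the keys of a pod that is removed as a preemption victim.
func (s *preFilterState) updateWithPod(pvcKeys sets.String, multiplier int64) {
	if multiplier > 0 {
		s.pvck = s.pvck.Union(pvcKeys)
	} else {
		s.pvck = s.pvck.Difference(pvcKeys)
	}
}

func main() {
	// The pod being scheduled needs two ReadWriteOncePod PVCs.
	state := &preFilterState{pvck: sets.NewString("ns/pvc-a", "ns/pvc-b")}

	// RemovePod: a victim holding pvc-a is removed during preemption simulation.
	state.updateWithPod(sets.NewString("ns/pvc-a"), -1)
	fmt.Println(state.pvck.List()) // [ns/pvc-b]

	// AddPod: the victim is added back when another node is evaluated.
	state.updateWithPod(sets.NewString("ns/pvc-a"), 1)
	fmt.Println(state.pvck.List()) // [ns/pvc-a ns/pvc-b]
}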