save some state in PreFilter, update it in AddPod, RemovePod and use that in Filter
boenn committed Jul 21, 2021
1 parent 39e693a commit 0b024f7
Showing 2 changed files with 157 additions and 49 deletions.
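
The commit follows the scheduler framework's standard state-passing pattern: PreFilter computes per-pod data once and writes it to the CycleState, AddPod/RemovePod adjust that data while preemption candidates are evaluated, and Filter reads it back for every node. Below is a minimal sketch of that pattern, assuming only the framework interfaces; the Plugin type, demoStateKey, demoState, and keysFor helper are illustrative and not part of this commit:

	// Illustrative sketch only; demoState, demoStateKey, and keysFor are hypothetical.
	const demoStateKey = "PreFilterDemo"

	type demoState struct{ keys sets.String }

	func (s *demoState) Clone() framework.StateData {
		// Deep-copy so the scheduler can mutate a clone during preemption evaluation.
		return &demoState{keys: sets.NewString(s.keys.UnsortedList()...)}
	}

	func (pl *Plugin) PreFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod) *framework.Status {
		cs.Write(demoStateKey, &demoState{keys: keysFor(pod)}) // computed once per pod
		return nil
	}

	func (pl *Plugin) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
		c, err := cs.Read(demoStateKey) // read back for every candidate node
		if err != nil {
			return framework.AsStatus(err)
		}
		state := c.(*demoState)
		_ = state // combine the per-pod data with nodeInfo here
		return nil
	}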
@@ -18,6 +18,7 @@ package volumerestrictions

import (
"context"
"fmt"
"strings"
"sync/atomic"

@@ -49,6 +50,7 @@ var _ framework.EnqueueExtensions = &VolumeRestrictions{}
const Name = names.VolumeRestrictions

const (
preFilterStateKey = "PreFilter" + Name
// ErrReasonDiskConflict is used for NoDiskConflict predicate error.
ErrReasonDiskConflict = "node(s) had no available disk"
// ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode.
@@ -125,101 +127,141 @@ func haveOverlap(a1, a2 []string) bool {

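// PreFilter invoked at the prefilter extension point.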
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
if pl.enableReadWriteOncePod {
return pl.isReadWriteOncePodAccessModeConflict(pod)
return pl.isReadWriteOncePodAccessModeConflict(cycleState, pod)
}
return framework.NewStatus(framework.Success)
}

// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode.
// This access mode restricts volume access to a single pod on a single node. Since only a single pod can
// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable.
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) *framework.Status {
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
nodeInfos, err := pl.nodeInfoLister.NodeInfos().List()
if err != nil {
return framework.NewStatus(framework.Error, "error while getting node info")
}
var pvcKeys []string
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if apierrors.IsNotFound(err) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return framework.AsStatus(err)
}

if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
continue
}

key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName
pvcKeys = append(pvcKeys, key)
pvcKeys, status := pl.GetPvcKeys(pod)
if status != nil {
return status
}

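	// conflicts counts ReadWriteOncePod PVCs found already in use on a node;
	// cancel() aborts the parallel node scan once a verdict is reached.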
ctx, cancel := context.WithCancel(context.Background())
var conflicts uint32

	var preemptStatus *framework.Status
processNode := func(i int) {
nodeInfo := nodeInfos[i]
		pvcKeysLength := len(pvcKeys)
for _, key := range pvcKeys {
for key := range pvcKeys {
refCount := nodeInfo.PVCRefCounts[key]
if refCount > 1 {
atomic.AddUint32(&conflicts, 1)
cancel()
} else if refCount == 1 {
if canBePreempt(nodeInfo, key) {
					pvcKeysLength--
}
}
}
		// The node can satisfy all of the pod's PVCs, possibly by preempting their current users.
		if pvcKeysLength == 0 {
			preemptStatus = framework.NewStatus(framework.Unschedulable, InfoReasonReadWriteOncePodPreempt)
cancel()
}
}
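	// Evaluate every node in parallel.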
pl.parallelizer.Until(ctx, len(nodeInfos), processNode)
// Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet.
if conflicts > 0 {
		preemptStatus = framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
}

	return preemptStatus
	// Store the computed PVC keys in the cycle state for Filter, AddPod, and RemovePod.
p := &preFilterState{
pvck: pvcKeys,
}
cycleState.Write(preFilterStateKey, p)
return nil
}

func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
	return pl
}

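// AddPod updates the prefilter state to account for a pod being hypothetically
// added to the node while podToSchedule is being scheduled.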
func (pl *VolumeRestrictions) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}

	pvcKeys, status := pl.GetPvcKeys(podInfoToAdd.Pod)
if status != nil {
return status
}
state.updateWithPod(pvcKeys, 1)
return nil
}

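// RemovePod updates the prefilter state to account for a pod being
// hypothetically removed from the node (e.g. as a preemption victim).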
func (pl *VolumeRestrictions) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}
	pvcKeys, status := pl.GetPvcKeys(podInfoToRemove.Pod)
if status != nil {
return status
}
state.updateWithPod(pvcKeys, -1)
return nil
}

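// GetPvcKeys returns a namespace/name key for every PVC referenced by the
// pod whose access modes include ReadWriteOncePod.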
func (pl *VolumeRestrictions) GetPvcKeys(pod *v1.Pod) (sets.String, *framework.Status) {
var pvcKeys []string
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return nil, framework.AsStatus(err)
}

if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
continue
}

key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName
pvcKeys = append(pvcKeys, key)
}
return sets.NewString(pvcKeys...), nil
}

// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the volumes it requests, and those that
// are already mounted. If there is already a volume mounted on that node, another pod that uses the same volume
// can't be scheduled there.
// are already mounted. If there is already a volume mounted on that node,
// another pod can only use the same volume by preempting the pod currently using it.
// This is GCE, Amazon EBS, ISCSI and Ceph RBD specific for now:
// - GCE PD allows multiple mounts as long as they're all read-only
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image, and the image is read-only
// - ISCSI forbids if any two pods share at least same IQN and ISCSI volume is read-only
func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}
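	// Check the in-tree volume types for disk conflicts first.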
for i := range pod.Spec.Volumes {
v := &pod.Spec.Volumes[i]
		// Fast path if there are no conflict-checking targets.
if v.GCEPersistentDisk == nil && v.AWSElasticBlockStore == nil && v.RBD == nil && v.ISCSI == nil {
continue
}

for _, ev := range nodeInfo.Pods {
if isVolumeConflict(v, ev.Pod) {
return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
}
}
}
	// Check preemption: the pod fits only if every ReadWriteOncePod PVC it
	// requests could be freed by preempting pods on this node.
	if state.pvck.Len() > 0 {
		pvcKeyLen := state.pvck.Len()
		for key := range state.pvck {
			if canBePreempt(nodeInfo, key) {
				pvcKeyLen--
			}
		}
		if pvcKeyLen == 0 {
			return framework.NewStatus(framework.Unschedulable, InfoReasonReadWriteOncePodPreempt)
		}
	}
return nil
}

@@ -269,3 +311,37 @@ func canBePreempt(nodeInfo *framework.NodeInfo, key string) bool {
}
return false
}

// preFilterState computed at PreFilter and used at Filter.
type preFilterState struct {
	// pvck holds a namespace/name key for each PVC used by the pod whose
	// access modes include ReadWriteOncePod.
pvck sets.String
}

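// Clone returns a deep copy so that changes made during preemption
// evaluation do not leak back into the original state.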
func (s *preFilterState) Clone() framework.StateData {
	return &preFilterState{pvck: sets.NewString(s.pvck.UnsortedList()...)}
}

func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
		return nil, fmt.Errorf("error reading %q from cycleState: %w", preFilterStateKey, err)
}

s, ok := c.(*preFilterState)
if !ok {
		return nil, fmt.Errorf("%+v convert to volumerestrictions.preFilterState error", c)
}
return s, nil
}

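// updateWithPod adjusts the tracked PVC keys when a pod is hypothetically
// added (multiplier == 1) or removed during preemption evaluation.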
func (s *preFilterState) updateWithPod(pvcKeys sets.String, multiplier int64) {
	if multiplier == 1 {
		// The pod is being added: track its PVC keys as well.
		s.pvck = pvcKeys.Union(s.pvck)
	} else {
		// The pod is being removed: keep only the keys both sets share.
		s.pvck = pvcKeys.Intersection(s.pvck)
	}
}
@@ -18,7 +18,9 @@ package volumerestrictions

import (
	"context"
	"reflect"
	"strings"
	"testing"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
@@ -74,7 +76,12 @@ func TestGCEDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPlugin(t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
state := framework.NewCycleState()
preFilterState := &preFilterState{
pvck: sets.NewString(),
}
state.Write(preFilterStateKey, preFilterState)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@@ -122,7 +129,12 @@ func TestAWSDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPlugin(t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
state := framework.NewCycleState()
preFilterState := &preFilterState{
pvck: sets.NewString(),
}
state.Write(preFilterStateKey, preFilterState)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@@ -176,7 +188,12 @@ func TestRBDDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPlugin(t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
state := framework.NewCycleState()
preFilterState := &preFilterState{
pvck: sets.NewString(),
}
state.Write(preFilterStateKey, preFilterState)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@@ -230,7 +247,12 @@ func TestISCSIDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPlugin(t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
state := framework.NewCycleState()
preFilterState := &preFilterState{
pvck: sets.NewString(),
}
state.Write(preFilterStateKey, preFilterState)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@@ -410,9 +432,19 @@ func TestAccessModeConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPluginWithListers(t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod)
gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus)
state := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), state, test.pod)
if !preFilterStatus.IsSuccess() {
if !strings.Contains(preFilterStatus.Message(), test.wantStatus.Message()) {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
} else {
nodeInfo := framework.NewNodeInfo(test.existingPods...)
nodeInfo.SetNode(node)
filterStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, nodeInfo)
if !reflect.DeepEqual(filterStatus, test.wantStatus) {
t.Errorf("status does not match: %+v, want: %+v", filterStatus, test.wantStatus)
}
}
})
}
