Skip to content
Permalink
Browse files

Merge pull request #72045 from cofyc/fix71928

Make volume binder resilient to races
  • Loading branch information...
k8s-ci-robot committed Jan 12, 2019
2 parents 171485f + 1a62f53 commit ccb1e1f26db172e23a438041da072aa5483ab65a
@@ -44,6 +44,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
@@ -92,6 +93,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
@@ -29,11 +29,13 @@ import (

"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
@@ -136,6 +138,7 @@ type volumeReactor struct {
fakeClaimWatch *watch.FakeWatcher
lock sync.Mutex
errors []reactorError
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
}

// reactorError is an error that is returned by test reactor (=simulated
@@ -189,11 +192,34 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj

// Store the updated object to appropriate places.
r.volumes[volume.Name] = volume
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(volume)
}
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("created volume %s", volume.Name)
return true, volume, nil

case action.Matches("create", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)

// check the claim does not exist
_, found := r.claims[claim.Name]
if found {
return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name)
}

// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(claim)
}
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("created claim %s", claim.Name)
return true, claim, nil

case action.Matches("update", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
@@ -206,6 +232,10 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
if storedVer != requestedVer {
return true, obj, versionConflictError
}
if reflect.DeepEqual(storedVolume, volume) {
klog.V(4).Infof("nothing updated volume %s", volume.Name)
return true, volume, nil
}
// Don't modify the existing object
volume = volume.DeepCopy()
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
@@ -214,6 +244,9 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
}

// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(volume)
}
r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
@@ -232,6 +265,10 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
if storedVer != requestedVer {
return true, obj, versionConflictError
}
if reflect.DeepEqual(storedClaim, claim) {
klog.V(4).Infof("nothing updated claim %s", claim.Name)
return true, claim, nil
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
@@ -240,6 +277,9 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
}

// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(claim)
}
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
@@ -251,18 +291,32 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
volume, found := r.volumes[name]
if found {
klog.V(4).Infof("GetVolume: found %s", volume.Name)
return true, volume, nil
return true, volume.DeepCopy(), nil
} else {
klog.V(4).Infof("GetVolume: volume %s not found", name)
return true, nil, fmt.Errorf("Cannot find volume %s", name)
}

case action.Matches("get", "persistentvolumeclaims"):
name := action.(core.GetAction).GetName()
claim, found := r.claims[name]
if found {
klog.V(4).Infof("GetClaim: found %s", claim.Name)
return true, claim.DeepCopy(), nil
} else {
klog.V(4).Infof("GetClaim: claim %s not found", name)
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
}

case action.Matches("delete", "persistentvolumes"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted volume %s", name)
_, found := r.volumes[name]
obj, found := r.volumes[name]
if found {
delete(r.volumes, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
@@ -272,9 +326,12 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
case action.Matches("delete", "persistentvolumeclaims"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted claim %s", name)
_, found := r.volumes[name]
obj, found := r.claims[name]
if found {
delete(r.claims, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
@@ -285,6 +342,36 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
return false, nil, nil
}

// Watch watches objects from the volumeReactor. Watch returns a channel which
// will push added / modified / deleted object.
func (r *volumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
r.lock.Lock()
defer r.lock.Unlock()

fakewatcher := watch.NewRaceFreeFake()

if _, exists := r.watchers[gvr]; !exists {
r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
}
r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
return fakewatcher, nil
}

func (r *volumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
watches := []*watch.RaceFreeFakeWatcher{}
if r.watchers[gvr] != nil {
if w := r.watchers[gvr][ns]; w != nil {
watches = append(watches, w...)
}
if ns != metav1.NamespaceAll {
if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
watches = append(watches, w...)
}
}
}
return watches
}

// injectReactError returns an error when the test requested given action to
// fail. nil is returned otherwise.
func (r *volumeReactor) injectReactError(action core.Action) error {
@@ -596,11 +683,14 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController,
fakeVolumeWatch: fakeVolumeWatch,
fakeClaimWatch: fakeClaimWatch,
errors: errors,
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
}
client.AddReactor("create", "persistentvolumes", reactor.React)
client.AddReactor("create", "persistentvolumeclaims", reactor.React)
client.AddReactor("update", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
client.AddReactor("get", "persistentvolumes", reactor.React)
client.AddReactor("get", "persistentvolumeclaims", reactor.React)
client.AddReactor("delete", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)

@@ -285,15 +285,16 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
return nil
}

func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
func (ctrl *PersistentVolumeController) isDelayBindingProvisioning(claim *v1.PersistentVolumeClaim) bool {
// When feature VolumeScheduling enabled,
// Scheduler signal to the PV controller to start dynamic
// provisioning by setting the "annSelectedNode" annotation
// in the PVC
if _, ok := claim.Annotations[annSelectedNode]; ok {
return false, nil
}
_, ok := claim.Annotations[annSelectedNode]
return ok
}

func (ctrl *PersistentVolumeController) isDelayBindingMode(claim *v1.PersistentVolumeClaim) (bool, error) {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
@@ -311,6 +312,18 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}

// shouldDelayBinding returns true if binding of claim should be delayed, false otherwise.
// If binding of claim should be delayed, only claims pbound by scheduler
func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
// If claim has already been assigned a node by scheduler for dynamic provisioning.
if ctrl.isDelayBindingProvisioning(claim) {
return false, nil
}

// If claim is in delay binding mode.
return ctrl.isDelayBindingMode(claim)
}

// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
@@ -42,6 +42,9 @@ type AssumeCache interface {
// Get the object by name
Get(objName string) (interface{}, error)

// Get the API object by name
GetAPIObj(objName string) (interface{}, error)

// List all the objects in the cache
List(indexObj interface{}) []interface{}
}
@@ -250,6 +253,17 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
return objInfo.latestObj, nil
}

func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()

objInfo, err := c.getObjInfo(objName)
if err != nil {
return nil, err
}
return objInfo.apiObj, nil
}

func (c *assumeCache) List(indexObj interface{}) []interface{} {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
@@ -297,7 +311,7 @@ func (c *assumeCache) Assume(obj interface{}) error {
}

if newVersion < storedVersion {
return fmt.Errorf("%v %q is out of sync", c.description, name)
return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
}

// Only update the cached object
@@ -325,6 +339,7 @@ type PVAssumeCache interface {
AssumeCache

GetPV(pvName string) (*v1.PersistentVolume, error)
GetAPIPV(pvName string) (*v1.PersistentVolume, error)
ListPVs(storageClassName string) []*v1.PersistentVolume
}

@@ -356,6 +371,18 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
return pv, nil
}

func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.GetAPIObj(pvName)
if err != nil {
return nil, err
}
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil, &errWrongType{"v1.PersistentVolume", obj}
}
return pv, nil
}

func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
objs := c.List(&v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
@@ -380,6 +407,7 @@ type PVCAssumeCache interface {
// GetPVC returns the PVC from the cache with given pvcKey.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}

type pvcAssumeCache struct {
@@ -402,3 +430,15 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error
}
return pvc, nil
}

func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.GetAPIObj(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
}
return pvc, nil
}
Oops, something went wrong.

0 comments on commit ccb1e1f

Please sign in to comment.
You can’t perform that action at this time.