Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fill controller caches on startup #26518

Merged
merged 1 commit into from
Jun 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/controller/persistentvolume/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ const createProvisionedPVInterval = 10 * time.Second
type PersistentVolumeController struct {
volumeController *framework.Controller
volumeControllerStopCh chan struct{}
volumeSource cache.ListerWatcher
claimController *framework.Controller
claimControllerStopCh chan struct{}
claimSource cache.ListerWatcher
kubeClient clientset.Interface
eventRecorder record.EventRecorder
cloud cloudprovider.Interface
Expand Down
73 changes: 41 additions & 32 deletions pkg/controller/persistentvolume/controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func NewPersistentVolumeController(
},
}
}
controller.volumeSource = volumeSource

if claimSource == nil {
claimSource = &cache.ListWatch{
Expand All @@ -100,6 +101,7 @@ func NewPersistentVolumeController(
},
}
}
controller.claimSource = claimSource

_, controller.volumeController = framework.NewIndexerInformer(
volumeSource,
Expand All @@ -125,6 +127,40 @@ func NewPersistentVolumeController(
return controller
}

// initalizeCaches fills all controller caches with initial data from etcd in
// order to have the caches already filled when first addClaim/addVolume to
// perform initial synchronization of the controller.
func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) {
volumeListObj, err := volumeSource.List(api.ListOptions{})
if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
volumeList, ok := volumeListObj.(*api.List)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %+v", volumeListObj)
return
}
for _, volume := range volumeList.Items {
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
}

claimListObj, err := claimSource.List(api.ListOptions{})
if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
claimList, ok := claimListObj.(*api.List)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", volumeListObj)
return
}
for _, claim := range claimList.Items {
storeObjectUpdate(ctrl.claims, claim, "claim")
}
glog.V(4).Infof("controller initialized")
}

// addVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
Expand All @@ -138,10 +174,6 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
return
}

if !ctrl.isFullySynced() {
return
}

pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
Expand Down Expand Up @@ -171,10 +203,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
return
}

if !ctrl.isFullySynced() {
return
}

newVolume, ok := newObj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
Expand All @@ -196,10 +224,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
_ = ctrl.volumes.store.Delete(obj)

if !ctrl.isFullySynced() {
return
}

var volume *api.PersistentVolume
var ok bool
volume, ok = obj.(*api.PersistentVolume)
Expand All @@ -220,6 +244,8 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
return
}

glog.V(4).Infof("volume %q deleted", volume.Name)

if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists {
if claim, ok := claimObj.(*api.PersistentVolumeClaim); ok && claim != nil {
// sync the claim when its volume is deleted. Explicitly syncing the
Expand Down Expand Up @@ -254,10 +280,6 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
return
}

if !ctrl.isFullySynced() {
return
}

claim, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
Expand Down Expand Up @@ -287,10 +309,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
return
}

if !ctrl.isFullySynced() {
return
}

newClaim, ok := newObj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
Expand All @@ -312,10 +330,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
_ = ctrl.claims.Delete(obj)

if !ctrl.isFullySynced() {
return
}

var volume *api.PersistentVolume
var claim *api.PersistentVolumeClaim
var ok bool
Expand All @@ -337,6 +351,7 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
if !ok || claim == nil {
return
}
glog.V(4).Infof("claim %q deleted", claimToClaimKey(claim))

if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
if volume, ok = pvObj.(*api.PersistentVolume); ok {
Expand Down Expand Up @@ -365,6 +380,8 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
func (ctrl *PersistentVolumeController) Run() {
glog.V(4).Infof("starting PersistentVolumeController")

ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)

if ctrl.volumeControllerStopCh == nil {
ctrl.volumeControllerStopCh = make(chan struct{})
go ctrl.volumeController.Run(ctrl.volumeControllerStopCh)
Expand All @@ -383,14 +400,6 @@ func (ctrl *PersistentVolumeController) Stop() {
close(ctrl.claimControllerStopCh)
}

// isFullySynced returns true, if both volume and claim caches are fully loaded
// after startup.
// We do not want to process events with not fully loaded caches - e.g. we might
// recycle/delete PVs that don't have corresponding claim in the cache yet.
func (ctrl *PersistentVolumeController) isFullySynced() bool {
return ctrl.volumeController.HasSynced() && ctrl.claimController.HasSynced()
}

// Stateless functions

func hasAnnotation(obj api.ObjectMeta, ann string) bool {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/persistentvolume/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ func TestControllerSync(t *testing.T) {
go ctrl.Run()

// Wait for the controller to pass initial sync.
for !ctrl.isFullySynced() {
for !ctrl.volumeController.HasSynced() || !ctrl.claimController.HasSynced() {
time.Sleep(10 * time.Millisecond)
}
glog.V(4).Infof("controller synced, starting test")

count := reactor.getChangeCount()

Expand Down