
Add work queues to PV controller #38615

Merged
merged 1 commit on Jan 2, 2017
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -405,7 +405,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
}
volumeController := persistentvolumecontroller.NewController(params)
volumeController.Run(stop)
go volumeController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

attachDetachController, attachDetachControllerErr :=
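The one-line change above matters because `Run(stop)` blocks until the stop channel is closed; without the `go` keyword, no controller listed after the volume controller would ever start. A minimal sketch of that startup pattern (the controller names are stand-ins; the real `StartControllers` wires up many more):

```go
package main

import (
	"fmt"
	"time"
)

// run blocks until stop is closed, like the controller's Run(stop) loop.
func run(started chan<- string, name string, stop <-chan struct{}) {
	started <- name
	<-stop // block until shutdown is requested
}

// startAll launches each controller in its own goroutine so a blocking
// Run cannot stall the controllers started after it.
func startAll(stop <-chan struct{}) []string {
	started := make(chan string, 2)
	go run(started, "volumeController", stop)
	go run(started, "attachDetachController", stop)
	// Both controllers report in even though both Run loops still block.
	return []string{<-started, <-started}
}

func main() {
	stop := make(chan struct{})
	fmt.Println(len(startAll(stop)), "controllers started")
	close(stop)
	time.Sleep(time.Millisecond) // let the goroutines observe stop
}
```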
3 changes: 3 additions & 0 deletions pkg/controller/volume/persistentvolume/BUILD
@@ -30,12 +30,15 @@ go_library(
"//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/goroutinemap:go_default_library",
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/wait:go_default_library",
"//pkg/util/workqueue:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
39 changes: 29 additions & 10 deletions pkg/controller/volume/persistentvolume/pv_controller.go
@@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/workqueue"
vol "k8s.io/kubernetes/pkg/volume"

"github.com/golang/glog"
@@ -146,8 +147,10 @@ const createProvisionedPVInterval = 10 * time.Second
// changes.
type PersistentVolumeController struct {
volumeController *cache.Controller
volumeInformer cache.Indexer
volumeSource cache.ListerWatcher
claimController *cache.Controller
claimInformer cache.Store
claimSource cache.ListerWatcher
classReflector *cache.Reflector
classSource cache.ListerWatcher
@@ -163,10 +166,34 @@ type PersistentVolumeController struct {
// must be cloned before any modification. These caches get updated both by
// "xxx added/updated/deleted" events from etcd and by the controller when
// it saves newer version to etcd.
// Why local cache: binding a volume to a claim generates 4 events, roughly
// in this order (depends on goroutine ordering):
// - volume.Spec update
// - volume.Status update
// - claim.Spec update
// - claim.Status update
// With these caches, the controller can check that it has already saved
// volume.Status and claim.Spec+Status and does not need to do anything
// when e.g. volume.Spec update event arrives before all the other events.
// Without this cache, it would see the old version of volume.Status and
// claim in the informers (they have not been updated from API server
// events yet) and it would try to fix these objects to be bound together.
// Any write to the API server would fail with a version conflict - these
// objects have already been written.
volumes persistentVolumeOrderedIndex
claims cache.Store
classes cache.Store

// Work queues of claims and volumes to process. Every queue should have
// exactly one worker thread, especially because syncClaim() is not
// reentrant: two concurrent syncClaims could bind two different claims to
// the same volume, or one claim to two volumes. The controller would
// recover from this (due to version conflicts in the API server and other
// checks in this controller), but the overall speed of a multi-worker
// controller would be lower than running a single worker per queue.
claimQueue *workqueue.Type
volumeQueue *workqueue.Type

// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
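For context, the single-worker requirement described in the comment above can be sketched with a plain stdlib channel standing in for `workqueue.Type` (the key format and the `syncClaim` stand-in are illustrative, not the controller's actual code):

```go
package main

import "fmt"

// processQueue drains keys with exactly one worker goroutine. A second
// concurrent worker could, per the comment above, bind two claims to
// the same volume, so each queue gets a single worker.
func processQueue(queue chan string) []string {
	done := make(chan []string)
	go func() { // the one and only worker for this queue
		var processed []string
		for key := range queue {
			processed = append(processed, key) // syncClaim(key) stand-in
		}
		done <- processed
	}()
	return <-done
}

func main() {
	queue := make(chan string, 2)
	queue <- "default/claim-a"
	queue <- "default/claim-b"
	close(queue)
	fmt.Println(processQueue(queue)) // keys processed strictly in order
}
```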

Expand Down Expand Up @@ -463,19 +490,11 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume)
// In both cases, the volume is Bound and the claim is Pending.
// Next syncClaim will fix it. To speed it up, we enqueue the claim
// into the controller, which results in syncClaim being called
// shortly (and in the right goroutine).
// shortly (and in the right worker goroutine).
// This speeds up binding of provisioned volumes - provisioner saves
// only the new PV and it expects that next syncClaim will bind the
// claim to it.
clone, err := api.Scheme.DeepCopy(claim)
if err != nil {
return fmt.Errorf("error cloning claim %q: %v", claimToClaimKey(claim), err)
}
glog.V(5).Infof("requeueing claim %q for faster syncClaim", claimToClaimKey(claim))
err = ctrl.claimController.Requeue(clone)
if err != nil {
return fmt.Errorf("error enqueing claim %q for faster sync: %v", claimToClaimKey(claim), err)
}
ctrl.claimQueue.Add(claimToClaimKey(claim))
Contributor commented:

Do you want to add it in a hot loop like this, or add it with a delay using the rate-limited or time-limited queue functions?

Member Author replied:

It's not a loop; it pokes claim processing from code that processes a volume.

return nil
} else if claim.Spec.VolumeName == volume.Name {
// Volume is bound to a claim properly, update status if necessary
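Note what the replacement buys: the queue holds `"namespace/name"` string keys rather than object copies, so the `DeepCopy`/`Requeue` dance from the deleted lines in the diff above is no longer needed. A sketch of the key form (the helper name here is illustrative; the real controller uses `claimToClaimKey`):

```go
package main

import "fmt"

// claimKey builds the "namespace/name" form used as a work-queue key,
// so enqueueing never needs to clone the claim object itself.
func claimKey(namespace, name string) string {
	return namespace + "/" + name
}

func main() {
	// syncVolume pokes the claim worker by key; the worker later looks
	// the claim up in its informer cache using the same key.
	fmt.Println("enqueued", claimKey("default", "my-claim"))
}
```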