Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client-go workqueue example #44320

Merged
merged 1 commit into from
Apr 21, 2017
Merged

Conversation

rmohr
Copy link
Contributor

@rmohr rmohr commented Apr 11, 2017

Demonstrates how to compose a controller out of cache.Controller,
cache.Indexer and a workqueue.

@k8s-ci-robot k8s-ci-robot added the cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. label Apr 11, 2017
@k8s-github-robot k8s-github-robot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. release-note-label-needed labels Apr 11, 2017
@k8s-reviewable
Copy link

This change is Reviewable

@k8s-ci-robot
Copy link
Contributor

Hi @rmohr. Thanks for your PR.

I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with @k8s-bot ok to test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.

@rmohr
Copy link
Contributor Author

rmohr commented Apr 11, 2017

Follow up for kubernetes/client-go#65.

/cc @mbohlool @caesarxuchao

glog.Info("Starting Pod controller")

go c.informer.Run(stopCh)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to wait for your caches to sync before running any worker. See for example here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I only have one cache, I don't think so. The key is dirtied for every entry anyway, right?

But maybe it would make the example more complete. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of caches does not matter. When a cluster restarts, controllers that don't wait for their caches to sync may end up doing all sorts of weird things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really, could you elaborate on that?

I think the informer has a well defined behaviour when you are using it's callback. So whenever something is added/updated/deleted in the underlying store, we dirty the key via the callbacks. This means that when we process the key in the controller, and fetch the item from the informer store, we get the object from this event or a newer version. Or do I miss something here?

But I totally agree, that when you have "secondary" caches, it is vital to sync.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I was thinking of a bug we had with a secondary cache. Yet, I think it still makes sense to have the check here for all caches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

// Tricky what to do in this situation. One thing we can do, is enqueueing it a few times to
// add some backoff delays on the invalid key. This way we avoid hotlooping
// on invalid keys.
if queue.NumRequeues(key) < 5 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see where the business logic of your controller exists. As a convention we use a separate function for it which surfaces an error at this level and then another function that is your error handling code which also facilitates requeues vs drops of objects. See #8 in https://github.com/kubernetes/community/blob/master/contributors/devel/controllers.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx. Should be addressed now. @Kargakis could you have another look?

}

runtime.HandleError(err)
glog.Infof("Dropping deployment %q out of the queue: %v", key, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are missing c.queue.Forget(key) here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
glog.Fatal("Timed out waiting for caches to sync")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of Fatal, use utilruntime.HandleError everywhere. You should also return if this path is accessed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
// Let the workers stop when we are done
defer c.queue.ShutDown()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing a defer utilruntime.HandleCrash()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@0xmichalis
Copy link
Contributor

/lgtm
/release-note-none

@k8s-ci-robot k8s-ci-robot added release-note-none Denotes a PR that doesn't merit a release note. lgtm "Looks good to me", indicates that a PR is ready to be merged. and removed release-note-label-needed labels Apr 11, 2017
Copy link
Member

@caesarxuchao caesarxuchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few nits. Thanks.

// create the workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// Bind the workqueue to a cache with the help of an informer. This way we make sure than whenever the cache
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/than/that

controller := NewController(queue, indexer, informer)

// We can now warm up the cache for initial synchronization
// Le's suppose that we knew about a pod mypod on our last run, so we add it to the cache
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Le/Let


// Bind the workqueue to a cache with the help of an informer. This way we make sure than whenever the cache
// is updated, the pod key is added to the workqueue. Note than when we finally process the item from the
// workqueue we might see a newer version of the Pod than the version which was responsible for triggering the update.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you wrap this line?

flag.StringVar(&master, "master", "", "master url")
flag.Parse()

// Create the connection
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/Create/creates


func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a line of comment for Forget and later at AddRateLimited?

@k8s-github-robot k8s-github-robot removed the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Apr 12, 2017
@rmohr
Copy link
Contributor Author

rmohr commented Apr 12, 2017

@caesarxuchao comments are updated 👍

@caesarxuchao
Copy link
Member

/lgtm
/approve

@k8s-ci-robot k8s-ci-robot added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Apr 13, 2017
@caesarxuchao
Copy link
Member

Thanks for the example, @rmohr !

@k8s-github-robot k8s-github-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Apr 13, 2017
@caesarxuchao
Copy link
Member

need to run hack/make-rules/../../hack/verify-bazel.sh

@caesarxuchao
Copy link
Member

@rmohr Could you run hack/update-bazel.sh and squash the commits?

@k8s-github-robot k8s-github-robot removed the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Apr 21, 2017
@rmohr
Copy link
Contributor Author

rmohr commented Apr 21, 2017

@caesarxuchao squashed and ran hack/update-bazel.sh

}
c.queue.Forget(key)
runtime.HandleError(err)
glog.Infof("Dropping deployment %q out of the queue: %v", key, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/deployment/pod/

}

if c.queue.NumRequeues(key) < 5 {
glog.Infof("Error syncing controller %v: %v", key, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/controller/pod/

return true
}

func (c *Controller) syncToStdout(key string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment here that this function is responsible for handling the business logic of the controller

Demonstrates how to compose a controller out of cache.Controller,
cache.Indexer and a workqueue.
@0xmichalis
Copy link
Contributor

/lgtm

@k8s-ci-robot k8s-ci-robot added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Apr 21, 2017
@k8s-github-robot
Copy link

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: caesarxuchao, kargakis, rmohr

Needs approval from an approver in each of these OWNERS Files:

You can indicate your approval by writing /approve in a comment
You can cancel your approval by writing /approve cancel in a comment

@k8s-github-robot
Copy link

Automatic merge from submit-queue

@k8s-github-robot k8s-github-robot merged commit ef50807 into kubernetes:master Apr 21, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. lgtm "Looks good to me", indicates that a PR is ready to be merged. release-note-none Denotes a PR that doesn't merit a release note. size/L Denotes a PR that changes 100-499 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants