Skip to content

Commit

Permalink
refactor processNextItem for Tenant/HealthCheck/MinioJob (#2022)
Browse files Browse the repository at this point in the history
refactor next item

refactor next item

Co-authored-by: guozhi.li <guozhi.li@daocloud.io>
  • Loading branch information
jiuker and guozhi.li committed Mar 8, 2024
1 parent 3afe8f1 commit 306afd6
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 198 deletions.
69 changes: 1 addition & 68 deletions pkg/controller/job-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,77 +45,10 @@ type JobController struct {
// workqueue.
func (c *JobController) runJobWorker() {
defer runtime.HandleCrash()
for c.processNextJobWorkItem() {
for processNextItem(c.workqueue, c.SyncHandler) {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *JobController) processNextJobWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
processItem := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}

// Run the syncHandler, passing it the namespace/name string of the tenant.
result, err := c.SyncHandler(key)
switch {
case err != nil:
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.workqueue.Forget(obj)
c.workqueue.AddAfter(key, result.RequeueAfter)
case result.Requeue:
c.workqueue.AddRateLimited(key)
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
}

// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
return nil
}

if err := processItem(obj); err != nil {
runtime.HandleError(err)
return true
}
return true
}

func (c *JobController) enqueueJob(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
Expand Down
130 changes: 64 additions & 66 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (c *Controller) Stop() {
// workqueue.
func (c *Controller) runWorker() {
defer runtime.HandleCrash()
for c.processNextWorkItem() {
for processNextItem(c.workqueue, c.syncHandler) {
}
}

Expand All @@ -684,74 +684,10 @@ func (c *Controller) runWorker() {
// healthCheckQueue.
func (c *Controller) runHealthCheckWorker() {
defer runtime.HandleCrash()
for c.processNextHealthCheckItem() {
for processNextItem(c.healthCheckQueue, c.syncHealthCheckHandler) {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
processItem := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
klog.V(2).Infof("Key from workqueue: %s", key)

result, err := c.syncHandler(key)
switch {
case err != nil:
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.workqueue.Forget(obj)
c.workqueue.AddAfter(key, result.RequeueAfter)
case result.Requeue:
c.workqueue.AddRateLimited(key)
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.V(4).Infof("Successfully synced '%s'", key)
}
return nil
}

if err := processItem(obj); err != nil {
runtime.HandleError(err)
return true
}
return true
}

const slashSeparator = "/"

func key2NamespaceName(key string) (namespace, name string) {
Expand Down Expand Up @@ -1499,3 +1435,65 @@ type patchAnnotation struct {
Path string `json:"path"`
Value string `json:"value"`
}

func processNextItem(workqueue queue.RateLimitingInterface, syncer func(key string) (Result, error)) bool {
obj, shutdown := workqueue.Get()
if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
processItem := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
klog.V(2).Infof("Key from workqueue: %s", key)

result, err := syncer(key)
switch {
case err != nil:
workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
workqueue.Forget(obj)
workqueue.AddAfter(key, result.RequeueAfter)
case result.Requeue:
workqueue.AddRateLimited(key)
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
workqueue.Forget(obj)
klog.V(4).Infof("Successfully synced '%s'", key)
}
return nil
}

if err := processItem(obj); err != nil {
runtime.HandleError(err)
return true
}
return true
}
64 changes: 0 additions & 64 deletions pkg/controller/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,70 +262,6 @@ type HealthResult struct {
WriteQuorumDrives int
}

// processNextHealthCheckItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextHealthCheckItem() bool {
obj, shutdown := c.healthCheckQueue.Get()
if shutdown {
return false
}

// We wrap this block in a func so we can defer c.healthCheckQueue.Done.
processItem := func(obj interface{}) error {
// We call Done here so the healthCheckQueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the healthCheckQueue and attempted again after a back-off
// period.
defer c.healthCheckQueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the healthCheckQueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// healthCheckQueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// healthCheckQueue.
if key, ok = obj.(string); !ok {
// As the item in the healthCheckQueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.healthCheckQueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in healthCheckQueue but got %#v", obj))
return nil
}
klog.V(2).Infof("Key from healthCheckQueue: %s", key)

result, err := c.syncHealthCheckHandler(key)
switch {
case err != nil:
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error checking health check '%s': %s", key, err.Error())
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.workqueue.Forget(obj)
c.workqueue.AddAfter(key, result.RequeueAfter)
case result.Requeue:
c.workqueue.AddRateLimited(key)
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.V(4).Infof("Successfully health checked '%s'", key)
}
return nil
}

if err := processItem(obj); err != nil {
runtime.HandleError(err)
return true
}
return true
}

// syncHealthCheckHandler acts on work items from the healthCheckQueue
func (c *Controller) syncHealthCheckHandler(key string) (Result, error) {
// Convert the namespace/name string into a distinct namespace and name
Expand Down

0 comments on commit 306afd6

Please sign in to comment.