Skip to content

Commit

Permalink
Using clientset to read the Custom Resource (#1992)
Browse files Browse the repository at this point in the history
Adding clientset to read the resource
  • Loading branch information
cniackz committed Feb 29, 2024
1 parent 9ccfa57 commit 4b5381b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 16 deletions.
61 changes: 45 additions & 16 deletions pkg/controller/job-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/minio/minio-go/v7/pkg/set"
"k8s.io/apimachinery/pkg/api/meta"

clientset "github.com/minio/operator/pkg/client/clientset/versioned"
jobinformers "github.com/minio/operator/pkg/client/informers/externalversions/job.min.io/v1alpha1"
joblisters "github.com/minio/operator/pkg/client/listers/job.min.io/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -31,15 +33,7 @@ type JobController struct {
statefulSetLister appslisters.StatefulSetLister
recorder record.EventRecorder
workqueue workqueue.RateLimitingInterface
}

// JobControllerInterface is an interface for the controller with the methods supported by it.
type JobControllerInterface interface {
WorkQueue() workqueue.RateLimitingInterface
KeyFunc() cache.KeyFunc
HasSynced() cache.InformerSynced
SyncHandler(ctx context.Context, name, namespace string) error
HandleObject(obj metav1.Object)
minioClientSet clientset.Interface
}

// runWorker is a long-running function that will continually call the
Expand Down Expand Up @@ -83,13 +77,31 @@ func (c *JobController) processNextJobWorkItem() bool {
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
klog.V(2).Infof("Key from workqueue: %s", key)

c.SyncHandler(key)
// Run the syncHandler, passing it the namespace/name string of the tenant.
result, err := c.SyncHandler(key)
switch {
case err != nil:
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.workqueue.Forget(obj)
c.workqueue.AddAfter(key, result.RequeueAfter)
case result.Requeue:
c.workqueue.AddRateLimited(key)
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
}

// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.V(4).Infof("Successfully synced '%s'", key)
return nil
}

Expand Down Expand Up @@ -131,6 +143,7 @@ func NewJobController(
statefulSetLister appslisters.StatefulSetLister,
recorder record.EventRecorder,
workqueue workqueue.RateLimitingInterface,
minioClientSet clientset.Interface,
) *JobController {
controller := &JobController{
namespacesToWatch: namespacesToWatch,
Expand All @@ -140,6 +153,7 @@ func NewJobController(
statefulSetLister: statefulSetLister,
recorder: recorder,
workqueue: workqueue,
minioClientSet: minioClientSet,
}

// Set up an event handler for when resources change
Expand Down Expand Up @@ -182,8 +196,23 @@ func (c *JobController) HandleObject(obj metav1.Object) {
// SyncHandler compares the current Job state with the desired, and attempts to
// converge the two. It then updates the Status block of the Job resource
// with the current status of the resource.
func (c *JobController) SyncHandler(key string) error {
klog.Info("Job Controller Loop!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

return nil
func (c *JobController) SyncHandler(key string) (Result, error) {
// Convert the namespace/name string into a distinct namespace and name
if key == "" {
runtime.HandleError(fmt.Errorf("Invalid resource key: %s", key))
return WrapResult(Result{}, nil)
}
namespace, tenantName := key2NamespaceName(key)
jobCR, err := c.minioClientSet.JobV1alpha1().MinIOJobs(namespace).Get(context.Background(), tenantName, metav1.GetOptions{})
if err != nil {
return WrapResult(Result{RequeueAfter: time.Second * 5}, nil)
}
// Loop through the different supported operations.
for _, val := range jobCR.Spec.Commands {
operation := val.Operation
if operation == "make-bucket" {
// TODO: Initiate a job to create the bucket(s) if they do not exist and if the Tenant is prepared for it.
}
}
return WrapResult(Result{}, err)
}
1 change: 1 addition & 0 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func NewController(
statefulSetInformer.Lister(),
recorder,
queue.NewNamedRateLimitingQueue(MinIOControllerRateLimiter(), "Tenants"),
minioClientSet,
),
},
}
Expand Down

0 comments on commit 4b5381b

Please sign in to comment.