Skip to content

Commit

Permalink
refactor: seperate reoncile functions
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsxu committed Feb 17, 2022
1 parent bf8638b commit b7f1f6a
Showing 1 changed file with 69 additions and 39 deletions.
108 changes: 69 additions & 39 deletions controllers/zookeepercluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -44,6 +42,8 @@ type ZookeeperClusterReconciler struct {
Logger logr.Logger
}

type reconcileFunc func(ctx context.Context, zk *zookeeperv1alpha1.ZookeeperCluster) error

//+kubebuilder:rbac:groups=zookeeper.atmax.io,resources=zookeeperclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=zookeeper.atmax.io,resources=zookeeperclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=zookeeper.atmax.io,resources=zookeeperclusters/finalizers,verbs=update
Expand All @@ -59,7 +59,7 @@ type ZookeeperClusterReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Logger = ctrl_log.FromContext(ctx)
r.Logger.Info("Starting reconcile")
r.Logger.Info("Start reconcile")

zk := &zookeeperv1alpha1.ZookeeperCluster{}

Expand All @@ -68,7 +68,22 @@ func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, client.IgnoreNotFound(err)
}

r.Logger.Info("Creating headless service")
for _, fn := range []reconcileFunc{
r.reconcileHeadlessService,
r.reconcileStatefulSet,
r.reconcileClientService,
} {
if err := fn(ctx, zk); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

func (r *ZookeeperClusterReconciler) reconcileHeadlessService(ctx context.Context, zk *zookeeperv1alpha1.ZookeeperCluster) error {
r.Logger.Info("Reconciling headless service")

desiredServiceHeadless := r.createHeadlessService(zk)
actualServiceHeadless := &corev1.Service{}

Expand All @@ -78,18 +93,23 @@ func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
}, actualServiceHeadless); err != nil {
// Not Found. Create
if apierrors.IsNotFound(err) {
r.Logger.Info("Creating headless service")
if err = r.Create(ctx, desiredServiceHeadless); err != nil {
r.Logger.Error(err, "creating headless service")
return ctrl.Result{}, err
r.Logger.Error(err, "Creating headless service")
return err
}
return ctrl.Result{Requeue: true}, nil
return nil
}
return ctrl.Result{}, err
return err
} else {
// TODO: Found. Update
}
return nil
}

func (r *ZookeeperClusterReconciler) reconcileStatefulSet(ctx context.Context, zk *zookeeperv1alpha1.ZookeeperCluster) error {
r.Logger.Info("Reconciling statefulset")

r.Logger.Info("Creating statefulset")
desiredStatefulSet := r.createStatefulSet(zk)
actualStatefulSet := &appsv1.StatefulSet{}

Expand All @@ -99,18 +119,23 @@ func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
}, actualStatefulSet); err != nil {
// Not Found. Create
if apierrors.IsNotFound(err) {
r.Logger.Info("Creating statefulset")
if err = r.Create(ctx, desiredStatefulSet); err != nil {
r.Logger.Error(err, "creating statefulset")
return ctrl.Result{}, err
r.Logger.Error(err, "Creating statefulset")
return err
}
return ctrl.Result{Requeue: true}, nil
return nil
}
return ctrl.Result{}, err
return err
} else {
// TODO: Found. Update
}
return nil
}

func (r *ZookeeperClusterReconciler) reconcileClientService(ctx context.Context, zk *zookeeperv1alpha1.ZookeeperCluster) error {
r.Logger.Info("Reconciling client service")

r.Logger.Info("Creating client service")
desiredServiceClient := r.createService(zk)
actualServiceClient := &corev1.Service{}

Expand All @@ -119,42 +144,47 @@ func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
Name: desiredServiceClient.Name,
}, actualServiceClient); err != nil {
if apierrors.IsNotFound(err) {
r.Logger.Info("Creating client service")
if err = r.Create(ctx, desiredServiceClient); err != nil {
r.Logger.Error(err, "creating client service")
return ctrl.Result{}, err
r.Logger.Error(err, "Creating client service")
return err
}
return ctrl.Result{Requeue: true}, nil
return nil
}
return ctrl.Result{}, err
return err
} else {
// TODO: Found. Update
}
return nil
}

r.Logger.Info("Updating zk status")
pods := &corev1.PodList{}
if err := r.List(ctx, pods, &client.ListOptions{
func (r *ZookeeperClusterReconciler) reconcileZookeeperClusterStatus(ctx context.Context, zk *zookeeperv1alpha1.ZookeeperCluster) error {
r.Logger.Info("Reconciling ZookeeperCluster status")

actualServiceClient := &corev1.Service{}
if err := r.Get(ctx, types.NamespacedName{
Namespace: zk.Namespace,
Name: zk.Name,
}, actualServiceClient); err != nil {
return err
}

actualPods := &corev1.PodList{}
if err := r.List(ctx, actualPods, &client.ListOptions{
Namespace: zk.Namespace,
LabelSelector: labels.SelectorFromSet(map[string]string{"app": zk.Name}),
}); err != nil {
r.Logger.Error(err, "list pods")
return ctrl.Result{}, err
}

if err := r.Get(ctx, types.NamespacedName{
Namespace: desiredStatefulSet.Namespace,
Name: desiredStatefulSet.Name,
}, actualStatefulSet); err != nil {
r.Logger.Error(err, "get client service")
return ctrl.Result{}, err
r.Logger.Error(err, "list Pods")
return err
}

zk.Status.ReadyReplicas = int32(len(pods.Items))
if len(pods.Items) > 0 && len(pods.Items[0].Status.HostIP) > 0 {
zk.Status.Address = fmt.Sprintf("%s:%d", pods.Items[0].Status.HostIP, actualServiceClient.Spec.Ports[0].NodePort)
zk.Status.ReadyReplicas = int32(len(actualPods.Items))
if len(actualPods.Items) > 0 && len(actualPods.Items[0].Status.HostIP) > 0 {
zk.Status.Address = fmt.Sprintf("%s:%d", actualPods.Items[0].Status.HostIP, actualServiceClient.Spec.Ports[0].NodePort)
}

zk.Status.Nodes = make(map[string]string)
for _, pod := range pods.Items {
for _, pod := range actualPods.Items {
podIP := pod.Status.PodIP
if len(podIP) == 0 {
continue
Expand All @@ -170,15 +200,15 @@ func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req

if err := r.Status().Update(ctx, zk); err != nil {
r.Logger.Error(err, "update zk status")
return ctrl.Result{}, err
return err
}

if zk.Spec.Replicas == int32(len(pods.Items)) && zk.Spec.Replicas == int32(len(zk.Status.Nodes)) {
if zk.Spec.Replicas == int32(len(actualPods.Items)) && zk.Spec.Replicas == int32(len(zk.Status.Nodes)) {
r.Logger.Info("Stop reconciling")
return ctrl.Result{}, nil
return nil
}

return ctrl.Result{RequeueAfter: time.Second}, nil
return nil
}

func (r *ZookeeperClusterReconciler) createHeadlessService(zk *zookeeperv1alpha1.ZookeeperCluster) *corev1.Service {
Expand Down

0 comments on commit b7f1f6a

Please sign in to comment.