From 645fce25013c4e5173b1322b700f9d06c33a686b Mon Sep 17 00:00:00 2001 From: whg517 Date: Fri, 24 May 2024 16:41:47 +0800 Subject: [PATCH] feat: add basic reconciler (#44) --- pkg/reconciler/cluster.go | 148 ++++++++++++++++++++++++++++++++++ pkg/reconciler/deployment.go | 74 +++++++++++++++++ pkg/reconciler/reconciler.go | 63 +++++++++++++++ pkg/reconciler/resource.go | 98 ++++++++++++++++++++++ pkg/reconciler/result.go | 31 +++++++ pkg/reconciler/service.go | 29 +++++++ pkg/reconciler/statefulset.go | 69 ++++++++++++++++ pkg/util/code_test.go | 77 ++++++++++++++++++ 8 files changed, 589 insertions(+) create mode 100644 pkg/reconciler/cluster.go create mode 100644 pkg/reconciler/deployment.go create mode 100644 pkg/reconciler/reconciler.go create mode 100644 pkg/reconciler/resource.go create mode 100644 pkg/reconciler/result.go create mode 100644 pkg/reconciler/service.go create mode 100644 pkg/reconciler/statefulset.go create mode 100644 pkg/util/code_test.go diff --git a/pkg/reconciler/cluster.go b/pkg/reconciler/cluster.go new file mode 100644 index 0000000..3b2882f --- /dev/null +++ b/pkg/reconciler/cluster.go @@ -0,0 +1,148 @@ +package reconciler + +import ( + "context" + "reflect" + + apiv1alpha1 "github.com/zncdatadev/operator-go/pkg/apis/commons/v1alpha1" + "github.com/zncdatadev/operator-go/pkg/builder" + "github.com/zncdatadev/operator-go/pkg/client" + ctrl "sigs.k8s.io/controller-runtime" +) + +var ( + logger = ctrl.Log.WithName("reconciler") +) + +type ClusterReconciler interface { + Reconciler + GetClusterOperation() *apiv1alpha1.ClusterOperationSpec + GetResources() []Reconciler + AddResource(resource Reconciler) + RegisterResources(ctx context.Context) error +} + +type BaseClusterReconciler[T AnySpec] struct { + BaseReconciler[T] + resources []Reconciler +} + +func NewBaseClusterReconciler[T AnySpec]( + client *client.Client, + options builder.Options, + spec T, +) *BaseClusterReconciler[T] { + return &BaseClusterReconciler[T]{ + BaseReconciler: BaseReconciler[T]{ + Client: client, + Options: options, + Spec: spec, + }, + } +} + +func (r *BaseClusterReconciler[T]) GetResources() []Reconciler { + return r.resources +} + +func (r *BaseClusterReconciler[T]) AddResource(resource Reconciler) { + r.resources = append(r.resources, resource) +} + +func (r *BaseClusterReconciler[T]) RegisterResources(ctx context.Context) error { + panic("unimplemented") +} + +func (r *BaseClusterReconciler[T]) Ready(ctx context.Context) Result { + for _, resource := range r.resources { + if result := resource.Ready(ctx); result.RequeueOrNot() { + return result + } + } + return NewResult(false, 0, nil) +} + +func (r *BaseClusterReconciler[T]) Reconcile(ctx context.Context) Result { + for _, resource := range r.resources { + result := resource.Reconcile(ctx) + if result.RequeueOrNot() { + return result + } + } + return NewResult(false, 0, nil) +} + +type RoleReconciler interface { + ClusterReconciler +} + +var _ RoleReconciler = &BaseRoleReconciler[AnySpec]{} + +type BaseRoleReconciler[T AnySpec] struct { + BaseClusterReconciler[T] + Options *builder.RoleOptions +} + +// MergeRoleGroupSpec +// merge right to left, if field of right not exist in left, add it to left. +// else skip it. +// merge will modify left, so left must be a pointer. +func (b *BaseRoleReconciler[T]) MergeRoleGroupSpec(roleGroup any) { + leftValue := reflect.ValueOf(roleGroup) + rightValue := reflect.ValueOf(b.Spec) + + if leftValue.Kind() == reflect.Ptr { + leftValue = leftValue.Elem() + } else { + panic("roleGroup is not a pointer") + } + + if rightValue.Kind() == reflect.Ptr { + rightValue = rightValue.Elem() + } + + for i := 0; i < rightValue.NumField(); i++ { + rightField := rightValue.Field(i) + + if rightField.IsZero() { + continue + } + rightFieldName := rightValue.Type().Field(i).Name + leftField := leftValue.FieldByName(rightFieldName) + + // if field exist in left, add it to left + if leftField.IsValid() && leftField.IsZero() { + leftValue.Set(rightField) + logger.V(5).Info("Merge role group", "field", rightFieldName, "value", rightField) + } + } +} + +func (b *BaseRoleReconciler[T]) GetClusterOperation() *apiv1alpha1.ClusterOperationSpec { + return b.Options.GetClusterOperation() +} + +func (b *BaseRoleReconciler[T]) Ready(ctx context.Context) Result { + for _, resource := range b.resources { + if result := resource.Ready(ctx); result.RequeueOrNot() { + return result + } + } + return NewResult(false, 0, nil) +} + +func NewBaseRoleReconciler[T AnySpec]( + client *client.Client, + roleOptions *builder.RoleOptions, + spec T, +) *BaseRoleReconciler[T] { + + return &BaseRoleReconciler[T]{ + BaseClusterReconciler: *NewBaseClusterReconciler[T]( + client, + roleOptions, + spec, + ), + Options: roleOptions, + } +} diff --git a/pkg/reconciler/deployment.go b/pkg/reconciler/deployment.go new file mode 100644 index 0000000..e9c4780 --- /dev/null +++ b/pkg/reconciler/deployment.go @@ -0,0 +1,74 @@ +package reconciler + +import ( + "context" + + "github.com/zncdatadev/operator-go/pkg/builder" + "github.com/zncdatadev/operator-go/pkg/client" + appv1 "k8s.io/api/apps/v1" +) + +var _ ResourceReconciler[builder.DeploymentBuilder] = &DeploymentReconciler{} + +type DeploymentReconciler struct { + GenericResourceReconciler[builder.DeploymentBuilder] + Options *builder.RoleGroupOptions +} + +// getReplicas returns the number of replicas for the role group. +// handle cluster operation stopped state. +func (r *DeploymentReconciler) getReplicas() *int32 { + clusterOperations := r.Options.GetClusterOperation() + if clusterOperations != nil && clusterOperations.Stopped { + logger.Info("Cluster operation stopped, set replicas to 0") + zero := int32(0) + return &zero + } + return nil +} + +func (r *DeploymentReconciler) Reconcile(ctx context.Context) Result { + resourceBuilder := r.GetBuilder() + replicas := r.getReplicas() + if replicas != nil { + resourceBuilder.SetReplicas(replicas) + } + resource, err := resourceBuilder.Build(ctx) + + if err != nil { + return NewResult(true, 0, err) + } + return r.ResourceReconcile(ctx, resource) +} + +func (r *DeploymentReconciler) Ready(ctx context.Context) Result { + + obj := appv1.Deployment{ + ObjectMeta: r.GetObjectMeta(), + } + logger.V(1).Info("Checking deployment ready", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Client.Get(ctx, &obj); err != nil { + return NewResult(true, 0, err) + } + if obj.Status.ReadyReplicas == *obj.Spec.Replicas { + logger.Info("Deployment is ready", "namespace", obj.Namespace, "name", obj.Name, "replicas", *obj.Spec.Replicas, "readyReplicas", obj.Status.ReadyReplicas) + return NewResult(false, 0, nil) + } + logger.Info("Deployment is not ready", "namespace", obj.Namespace, "name", obj.Name, "replicas", *obj.Spec.Replicas, "readyReplicas", obj.Status.ReadyReplicas) + return NewResult(false, 5, nil) +} + +func NewDeploymentReconciler( + client *client.Client, + options *builder.RoleGroupOptions, + deployBuilder builder.DeploymentBuilder, +) *DeploymentReconciler { + return &DeploymentReconciler{ + GenericResourceReconciler: *NewGenericResourceReconciler[builder.DeploymentBuilder]( + client, + options, + deployBuilder, + ), + Options: options, + } +} diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go new file mode 100644 index 0000000..f8ba8b7 --- /dev/null +++ b/pkg/reconciler/reconciler.go @@ -0,0 +1,63 @@ +package reconciler + +import ( + "context" + + "github.com/zncdatadev/operator-go/pkg/builder" + "github.com/zncdatadev/operator-go/pkg/client" + "k8s.io/apimachinery/pkg/runtime" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type AnySpec any + +type Reconciler interface { + GetName() string + GetNamespace() string + GetClient() *client.Client + GetCtrlClient() ctrlclient.Client + GetCtrlScheme() *runtime.Scheme + Reconcile(ctx context.Context) Result + Ready(ctx context.Context) Result +} + +var _ Reconciler = &BaseReconciler[AnySpec]{} + +type BaseReconciler[T AnySpec] struct { + // Do not use ptr, to avoid other packages to modify the client + Client *client.Client + Options builder.Options + Spec T +} + +func (b *BaseReconciler[T]) GetClient() *client.Client { + return b.Client +} + +func (b *BaseReconciler[T]) GetName() string { + return b.Options.GetFullName() +} + +func (b *BaseReconciler[T]) GetNamespace() string { + return b.Options.GetNamespace() +} + +func (b *BaseReconciler[T]) GetCtrlClient() ctrlclient.Client { + return b.Client.GetCtrlClient() +} + +func (b *BaseReconciler[T]) GetCtrlScheme() *runtime.Scheme { + return b.Client.GetCtrlScheme() +} + +func (b *BaseReconciler[T]) Ready(ctx context.Context) Result { + panic("unimplemented") +} + +func (b *BaseReconciler[T]) Reconcile(ctx context.Context) Result { + panic("unimplemented") +} + +func (b *BaseReconciler[T]) GetSpec() T { + return b.Spec +} diff --git a/pkg/reconciler/resource.go b/pkg/reconciler/resource.go new file mode 100644 index 0000000..07d5125 --- /dev/null +++ b/pkg/reconciler/resource.go @@ -0,0 +1,98 @@ +package reconciler + +import ( + "context" + "time" + + "github.com/zncdatadev/operator-go/pkg/builder" + "github.com/zncdatadev/operator-go/pkg/client" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + resourceLogger = ctrl.Log.WithName("reconciler").WithName("resource") +) + +type ResourceReconciler[B builder.Builder] interface { + Reconciler + GetObjectMeta() metav1.ObjectMeta + GetBuilder() B + ResourceReconcile(ctx context.Context, resource ctrlclient.Object) Result +} + +var _ ResourceReconciler[builder.Builder] = &GenericResourceReconciler[builder.Builder]{} + +type GenericResourceReconciler[B builder.Builder] struct { + BaseReconciler[AnySpec] + Builder B +} + +func NewGenericResourceReconciler[B builder.Builder]( + client *client.Client, + options builder.Options, + builder B, +) *GenericResourceReconciler[B] { + return &GenericResourceReconciler[B]{ + BaseReconciler: BaseReconciler[AnySpec]{ + Client: client, + Options: options, + Spec: nil, + }, + Builder: builder, + } +} + +func (r *GenericResourceReconciler[b]) GetObjectMeta() metav1.ObjectMeta { + return r.Builder.GetObjectMeta() +} + +func (r *GenericResourceReconciler[B]) GetBuilder() B { + return r.Builder +} + +func (r *GenericResourceReconciler[B]) ResourceReconcile(ctx context.Context, resource ctrlclient.Object) Result { + + if mutation, err := r.Client.CreateOrUpdate(ctx, resource); err != nil { + resourceLogger.Error(err, "Failed to create or update resource", "name", resource.GetName(), "namespace", resource.GetNamespace(), "cluster", r.Options.GetClusterName()) + return NewResult(true, 0, err) + } else if mutation { + resourceLogger.Info("Resource created or updated", "name", resource.GetName(), "namespace", resource.GetNamespace(), "cluster", r.Options.GetClusterName()) + return NewResult(true, time.Second, nil) + } + return NewResult(false, 0, nil) +} + +func (r *GenericResourceReconciler[B]) Reconcile(ctx context.Context) Result { + resource, err := r.GetBuilder().Build(ctx) + + if err != nil { + return NewResult(true, 0, err) + } + return r.ResourceReconcile(ctx, resource) +} + +func (r *GenericResourceReconciler[B]) Ready(ctx context.Context) Result { + return NewResult(false, 0, nil) +} + +type SimpleResourceReconciler[B builder.Builder] struct { + GenericResourceReconciler[B] +} + +// NewSimpleResourceReconciler creates a new resource reconciler with a simple builder +// that does not require a spec, and can not use the spec. +func NewSimpleResourceReconciler[B builder.Builder]( + client *client.Client, + clusterOptions builder.Options, + builder B, +) *SimpleResourceReconciler[B] { + return &SimpleResourceReconciler[B]{ + GenericResourceReconciler: *NewGenericResourceReconciler[B]( + client, + clusterOptions, + builder, + ), + } +} diff --git a/pkg/reconciler/result.go b/pkg/reconciler/result.go new file mode 100644 index 0000000..2230923 --- /dev/null +++ b/pkg/reconciler/result.go @@ -0,0 +1,31 @@ +package reconciler + +import ( + "time" + + ctrl "sigs.k8s.io/controller-runtime" +) + +type Result struct { + requeue bool + requeueAfter time.Duration + error error +} + +func NewResult(requeue bool, requeueAfter time.Duration, err error) Result { + return Result{requeue: requeue, requeueAfter: requeueAfter, error: err} +} + +func (r *Result) RequeueOrNot() bool { + if r.requeue || r.requeueAfter > 0 { + return true + } + return false +} + +func (r *Result) Result() (ctrl.Result, error) { + if r.RequeueOrNot() { + return ctrl.Result{Requeue: r.requeue, RequeueAfter: r.requeueAfter}, r.error + } + return ctrl.Result{}, r.error +} diff --git a/pkg/reconciler/service.go b/pkg/reconciler/service.go new file mode 100644 index 0000000..4229f8b --- /dev/null +++ b/pkg/reconciler/service.go @@ -0,0 +1,29 @@ +package reconciler + +import ( + "github.com/zncdatadev/operator-go/pkg/builder" + "github.com/zncdatadev/operator-go/pkg/client" +) + +var _ ResourceReconciler[builder.ServiceBuilder] = &GenericServiceReconciler{} + +type GenericServiceReconciler struct { + GenericResourceReconciler[builder.ServiceBuilder] +} + +func NewServiceReconciler( + client *client.Client, + options builder.Options, +) *GenericServiceReconciler { + svcBuilder := builder.NewServiceBuilder( + client, + options, + ) + return &GenericServiceReconciler{ + GenericResourceReconciler: *NewGenericResourceReconciler[builder.ServiceBuilder]( + client, + options, + svcBuilder, + ), + } +} diff --git a/pkg/reconciler/statefulset.go b/pkg/reconciler/statefulset.go new file mode 100644 index 0000000..ee6d79f --- /dev/null +++ b/pkg/reconciler/statefulset.go @@ -0,0 +1,69 @@ +package reconciler + +import ( + "context" + + "github.com/zncdatadev/operator-go/pkg/builder" + "github.com/zncdatadev/operator-go/pkg/client" + appv1 "k8s.io/api/apps/v1" +) + +var _ ResourceReconciler[builder.StatefulSetBuilder] = &StatefulSetReconciler{} + +type StatefulSetReconciler struct { + GenericResourceReconciler[builder.StatefulSetBuilder] + Options *builder.RoleGroupOptions +} + +// getReplicas returns the number of replicas for the role group. +// handle cluster operation stopped state. +func (r *StatefulSetReconciler) getReplicas() *int32 { + clusterOptions := r.Options.GetClusterOperation() + if clusterOptions != nil && clusterOptions.Stopped { + logger.Info("Cluster operation stopped, set replicas to 0") + zero := int32(0) + return &zero + } + return nil +} + +func (r *StatefulSetReconciler) Reconcile(ctx context.Context) Result { + resourceBuilder := r.GetBuilder() + resourceBuilder.SetReplicas(r.getReplicas()) + resource, err := resourceBuilder.Build(ctx) + + if err != nil { + return NewResult(true, 0, err) + } + return r.ResourceReconcile(ctx, resource) +} + +func (r *StatefulSetReconciler) Ready(ctx context.Context) Result { + obj := appv1.StatefulSet{ + ObjectMeta: r.GetObjectMeta(), + } + logger.V(1).Info("Checking statefulset ready", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Client.Get(ctx, &obj); err != nil { + return NewResult(true, 0, err) + } + if obj.Status.ReadyReplicas == *obj.Spec.Replicas { + logger.Info("StatefulSet is ready", "namespace", obj.Namespace, "name", obj.Name, "replicas", *obj.Spec.Replicas, "readyReplicas", obj.Status.ReadyReplicas) + return NewResult(false, 0, nil) + } + logger.Info("StatefulSet is not ready", "namespace", obj.Namespace, "name", obj.Name, "replicas", *obj.Spec.Replicas, "readyReplicas", obj.Status.ReadyReplicas) + return NewResult(false, 5, nil) +} + +func NewStatefulSetReconciler( + client *client.Client, + options *builder.RoleGroupOptions, + stsBuilder builder.StatefulSetBuilder, +) *StatefulSetReconciler { + return &StatefulSetReconciler{ + GenericResourceReconciler: *NewGenericResourceReconciler[builder.StatefulSetBuilder]( + client, + options, + stsBuilder, + ), + } +} diff --git a/pkg/util/code_test.go b/pkg/util/code_test.go new file mode 100644 index 0000000..9b290e7 --- /dev/null +++ b/pkg/util/code_test.go @@ -0,0 +1,77 @@ +package util + +import "testing" + +func TestIndentTabToSpaces(t *testing.T) { + tests := []struct { + name string + code string + spaces int + expected string + }{ + { + name: "Converts tabs to 4 spaces", + code: "\tfunc main() {}", + spaces: 4, + expected: " func main() {}", + }, + { + name: "Converts tabs to 2 spaces", + code: "\tfunc main() {}", + spaces: 2, + expected: " func main() {}", + }, + { + name: "Ignores spaces", + code: " func main() {}", + spaces: 4, + expected: " func main() {}", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := IndentTabToSpaces(tt.code, tt.spaces) + if result != tt.expected { + t.Errorf("Expected '%s', got '%s'", tt.expected, result) + } + }) + } +} + +func TestIndentSpacesToTab(t *testing.T) { + tests := []struct { + name string + code string + spaces int + expected string + }{ + { + name: "Converts 4 spaces to tabs", + code: " func main() {}", + spaces: 4, + expected: "\tfunc main() {}", + }, + { + name: "Converts 2 spaces to tabs", + code: " func main() {}", + spaces: 2, + expected: "\tfunc main() {}", + }, + { + name: "Ignores tabs", + code: "\tfunc main() {}", + spaces: 4, + expected: "\tfunc main() {}", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := IndentSpacesToTab(tt.code, tt.spaces) + if result != tt.expected { + t.Errorf("Expected '%s', got '%s'", tt.expected, result) + } + }) + } +}