diff --git a/Makefile b/Makefile index a631f7c..052ec2e 100644 --- a/Makefile +++ b/Makefile @@ -54,3 +54,21 @@ filelicense: install-addlicense $(ADDLICENSE_BIN) -y $(shell date +"%Y") -c "sealyun." -f hack/LICENSE ./$$file ; \ fi \ done + + +DEEPCOPY_BIN = $(shell pwd)/bin/deepcopy-gen +install-deepcopy: ## check license if not exist install go-lint tools + $(call go-get-tool,$(DEEPCOPY_BIN),k8s.io/code-generator/cmd/deepcopy-gen@latest) + +HEAD_FILE := hack/boilerplate.go.txt +INPUT_DIR := github.com/sealyun/endpoints-operator/api/network/v1beta1 +deepcopy:install-deepcopy + $(DEEPCOPY_BIN) \ + --input-dirs="$(INPUT_DIR)" \ + -O zz_generated.deepcopy \ + --go-header-file "$(HEAD_FILE)" \ + --output-base "${GOPATH}/src" + +# Generate manifests e.g. CRD, RBAC etc. +manifests: + controller-gen crd:trivialVersions=true rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd diff --git a/api/network/v1beta1/doc.go b/api/network/v1beta1/doc.go new file mode 100644 index 0000000..ce872ac --- /dev/null +++ b/api/network/v1beta1/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2022 cuisongliu@qq.com. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by api-gen. EDIT THIS FILE! + +package v1beta1 + +// +k8s:deepcopy-gen=package,register +// +k8s:protobuf-gen=package +// +k8s:openapi-gen=false +// +k8s:defaulter-gen=TypeMeta + +// +groupName=sealyun.com diff --git a/api/network/v1beta1/register.go b/api/network/v1beta1/register.go new file mode 100644 index 0000000..fcf5edd --- /dev/null +++ b/api/network/v1beta1/register.go @@ -0,0 +1,66 @@ +/* +Copyright 2022 cuisongliu@qq.com. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by register-gen. DO NOT EDIT. + +package v1beta1 + +import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// GroupName specifies the group name used to register the objects. +const GroupName = "sealyun.com" + +// GroupVersion specifies the group and the version used to register the objects. +var GroupVersion = v1.GroupVersion{Group: GroupName, Version: "v1beta1"} + +// SchemeGroupVersion is group version used to register these objects +// Deprecated: use GroupVersion instead. +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1beta1"} + +// Resource takes an unqualified resource and returns a Group qualified GroupResource +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +var ( + // localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes. + SchemeBuilder runtime.SchemeBuilder + localSchemeBuilder = &SchemeBuilder + // Depreciated: use Install instead + AddToScheme = localSchemeBuilder.AddToScheme + Install = localSchemeBuilder.AddToScheme +) + +func init() { + // We only register manually written functions here. The registration of the + // generated functions takes place in the generated files. The separation + // makes the code compile even when the generated files are missing. + localSchemeBuilder.Register(addKnownTypes) +} + +// Adds the list of known types to Scheme. +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &ClusterEndpoint{}, + &ClusterEndpointList{}, + ) + // AddToGroupVersion allows the serialization of client types like ListOptions. + v1.AddToGroupVersion(scheme, SchemeGroupVersion) + return nil +} diff --git a/api/network/v1beta1/types.go b/api/network/v1beta1/types.go new file mode 100644 index 0000000..63cfbe1 --- /dev/null +++ b/api/network/v1beta1/types.go @@ -0,0 +1,137 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by api-gen. EDIT THIS FILE! + +package v1beta1 + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ServicePort contains information on service's port. +type ServicePort struct { + // The name of this port within the service. This must be a DNS_LABEL. + // All ports within a ServiceSpec must have unique names. When considering + // the endpoints for a Service, this must match the 'name' field in the + // EndpointPort. + // Optional if only one ServicePort is defined on this service. + // +optional + Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"` + + // The IP protocol for this port. Supports "TCP", "UDP", and "SCTP". + // Default is TCP. + // +optional + Protocol v1.Protocol `json:"protocol,omitempty" protobuf:"bytes,2,opt,name=protocol,casttype=Protocol"` + + // The port that will be exposed by this service. + Port int32 `json:"port" protobuf:"varint,3,opt,name=port"` + + // Number or name of the port to access on the pods targeted by the service. + // Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + // If this is a string, it will be looked up as a named port in the + // target Pod's container ports. If this is not specified, the value + // of the 'port' field is used (an identity map). + TargetPort int32 `json:"targetPort" protobuf:"varint,4,opt,name=targetPort"` +} + +// ClusterEndpointSpec defines the desired state of ClusterEndpoint +type ClusterEndpointSpec struct { + ClusterIP string `json:"clusterIP" protobuf:"bytes,1,opt,name=clusterIP"` + + Ports []ServicePort `json:"ports,omitempty" patchStrategy:"merge" patchMergeKey:"port" protobuf:"bytes,2,rep,name=ports"` + Hosts []string `json:"hosts,omitempty" patchStrategy:"merge" patchMergeKey:"host" protobuf:"bytes,3,rep,name=hosts"` + // How often (in seconds) to perform the probe. + // Default to 10 seconds. Minimum value is 1. + // +optional + PeriodSeconds int32 `json:"periodSeconds,omitempty" protobuf:"varint,4,opt,name=periodSeconds"` + Probes []v1.Handler `json:"probes,omitempty" protobuf:"bytes,5,rep,name=probes"` +} + +type Phase string + +// These are the valid phases of node. +const ( + // Pending means the node has been created/added by the system. + Pending Phase = "Pending" + // Healthy means the cluster service is healthy. + Healthy Phase = "Healthy" + // UnHealthy means the cluster service is not healthy. + UnHealthy Phase = "UnHealthy" +) + +type ConditionType string + +const ( + ProbesReady ConditionType = "ProbesReady" + SyncServiceReady ConditionType = "SyncServiceReady" + SyncEndpointReady ConditionType = "SyncEndpointReady" + Initialized ConditionType = "Initialized" + Ready ConditionType = "Ready" +) + +type Condition struct { + Type ConditionType `json:"type" protobuf:"bytes,1,opt,name=type,casttype=ConditionType"` + // Status is the status of the condition. One of True, False, Unknown. + Status v1.ConditionStatus `json:"status" protobuf:"bytes,2,opt,name=status,casttype=ConditionStatus"` + // LastHeartbeatTime is the last time this condition was updated. + // +optional + LastHeartbeatTime metav1.Time `json:"lastHeartbeatTime,omitempty" protobuf:"bytes,3,opt,name=lastHeartbeatTime"` + // LastTransitionTime is the last time the condition changed from one status to another. + // +optional + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty" protobuf:"bytes,4,opt,name=lastTransitionTime"` + // Reason is a (brief) reason for the condition's last status change. + // +optional + Reason string `json:"reason,omitempty" protobuf:"bytes,5,opt,name=reason"` + // Message is a human-readable message indicating details about the last status change. + // +optional + Message string `json:"message,omitempty" protobuf:"bytes,6,opt,name=message"` +} + +// ClusterEndpointStatus defines the observed state of ClusterEndpoint +type ClusterEndpointStatus struct { + // Phase is the recently observed lifecycle phase of the cluster endpoints. + Phase Phase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=Phase"` + // Conditions contains the different condition statuses for this workspace. + Conditions []Condition `json:"conditions" protobuf:"bytes,3,rep,name=conditions"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:shortName=cep +// +kubebuilder:printcolumn:name="Age",type=date,description="The creation date",JSONPath=`.metadata.creationTimestamp`,priority=0 +// +kubebuilder:printcolumn:name="Status",type=string,description="The status",JSONPath=`.status.phase`,priority=0 +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ClusterEndpoint is the Schema for the tests API +type ClusterEndpoint struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + Spec ClusterEndpointSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"` + Status ClusterEndpointStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"` +} + +// +kubebuilder:object:root=true +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ClusterEndpointList contains a list of ClusterEndpoint +type ClusterEndpointList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []ClusterEndpoint `json:"items" protobuf:"bytes,2,opt,name=items"` +} diff --git a/api/network/v1beta1/zz_generated.deepcopy.go b/api/network/v1beta1/zz_generated.deepcopy.go new file mode 100644 index 0000000..8e6a28c --- /dev/null +++ b/api/network/v1beta1/zz_generated.deepcopy.go @@ -0,0 +1,176 @@ +// +build !ignore_autogenerated + +/* +Copyright 2022 cuisongliu@qq.com. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1beta1 + +import ( + v1 "k8s.io/api/core/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterEndpoint) DeepCopyInto(out *ClusterEndpoint) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterEndpoint. +func (in *ClusterEndpoint) DeepCopy() *ClusterEndpoint { + if in == nil { + return nil + } + out := new(ClusterEndpoint) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ClusterEndpoint) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterEndpointList) DeepCopyInto(out *ClusterEndpointList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ClusterEndpoint, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterEndpointList. +func (in *ClusterEndpointList) DeepCopy() *ClusterEndpointList { + if in == nil { + return nil + } + out := new(ClusterEndpointList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ClusterEndpointList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterEndpointSpec) DeepCopyInto(out *ClusterEndpointSpec) { + *out = *in + if in.Ports != nil { + in, out := &in.Ports, &out.Ports + *out = make([]ServicePort, len(*in)) + copy(*out, *in) + } + if in.Hosts != nil { + in, out := &in.Hosts, &out.Hosts + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Probes != nil { + in, out := &in.Probes, &out.Probes + *out = make([]v1.Handler, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterEndpointSpec. +func (in *ClusterEndpointSpec) DeepCopy() *ClusterEndpointSpec { + if in == nil { + return nil + } + out := new(ClusterEndpointSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterEndpointStatus) DeepCopyInto(out *ClusterEndpointStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterEndpointStatus. +func (in *ClusterEndpointStatus) DeepCopy() *ClusterEndpointStatus { + if in == nil { + return nil + } + out := new(ClusterEndpointStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Condition) DeepCopyInto(out *Condition) { + *out = *in + in.LastHeartbeatTime.DeepCopyInto(&out.LastHeartbeatTime) + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Condition. +func (in *Condition) DeepCopy() *Condition { + if in == nil { + return nil + } + out := new(Condition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ServicePort) DeepCopyInto(out *ServicePort) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServicePort. +func (in *ServicePort) DeepCopy() *ServicePort { + if in == nil { + return nil + } + out := new(ServicePort) + in.DeepCopyInto(out) + return out +} diff --git a/cmd/app/server.go b/cmd/app/server.go index 9494346..95b76de 100644 --- a/cmd/app/server.go +++ b/cmd/app/server.go @@ -101,7 +101,7 @@ func run(s *options.Options, stopCh <-chan struct{}) error { } controllers.Install(scheme) - clusterReconciler := &controllers.ServiceReconciler{} + clusterReconciler := &controllers.Reconciler{} if err = clusterReconciler.SetupWithManager(mgr); err != nil { klog.Fatal("Unable to create cluster controller ", err) diff --git a/config/crd/sealyun.com_clusterendpoints.yaml b/config/crd/sealyun.com_clusterendpoints.yaml new file mode 100644 index 0000000..f3150c8 --- /dev/null +++ b/config/crd/sealyun.com_clusterendpoints.yaml @@ -0,0 +1,252 @@ +# Copyright © 2022 sealyun. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: (devel) + creationTimestamp: null + name: clusterendpoints.sealyun.com +spec: + group: sealyun.com + names: + kind: ClusterEndpoint + listKind: ClusterEndpointList + plural: clusterendpoints + shortNames: + - cep + singular: clusterendpoint + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: The creation date + jsonPath: .metadata.creationTimestamp + name: Age + type: date + - description: The status + jsonPath: .status.phase + name: Status + type: string + name: v1beta1 + schema: + openAPIV3Schema: + description: ClusterEndpoint is the Schema for the tests API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: ClusterEndpointSpec defines the desired state of ClusterEndpoint + properties: + clusterIP: + type: string + hosts: + items: + type: string + type: array + periodSeconds: + description: How often (in seconds) to perform the probe. Default + to 10 seconds. Minimum value is 1. + format: int32 + type: integer + ports: + items: + description: ServicePort contains information on service's port. + properties: + name: + description: The name of this port within the service. This + must be a DNS_LABEL. All ports within a ServiceSpec must have + unique names. When considering the endpoints for a Service, + this must match the 'name' field in the EndpointPort. Optional + if only one ServicePort is defined on this service. + type: string + port: + description: The port that will be exposed by this service. + format: int32 + type: integer + protocol: + description: The IP protocol for this port. Supports "TCP", + "UDP", and "SCTP". Default is TCP. + type: string + targetPort: + description: Number or name of the port to access on the pods + targeted by the service. Number must be in the range 1 to + 65535. Name must be an IANA_SVC_NAME. If this is a string, + it will be looked up as a named port in the target Pod's container + ports. If this is not specified, the value of the 'port' field + is used (an identity map). + format: int32 + type: integer + required: + - port + - targetPort + type: object + type: array + probes: + items: + description: 'Handler defines a specific action that should be taken + TODO: pass structured data to these actions, and document that + data here.' + properties: + exec: + description: One and only one of the following should be specified. + Exec specifies the action to take. + properties: + command: + description: Command is the command line to execute inside + the container, the working directory for the command is + root ('/') in the container's filesystem. The command + is simply exec'd, it is not run inside a shell, so traditional + shell instructions ('|', etc) won't work. To use a shell, + you need to explicitly call out to that shell. Exit status + of 0 is treated as live/healthy and non-zero is unhealthy. + items: + type: string + type: array + type: object + httpGet: + description: HTTPGet specifies the http request to perform. + properties: + host: + description: Host name to connect to, defaults to the pod + IP. You probably want to set "Host" in httpHeaders instead. + type: string + httpHeaders: + description: Custom headers to set in the request. HTTP + allows repeated headers. + items: + description: HTTPHeader describes a custom header to be + used in HTTP probes + properties: + name: + description: The header field name + type: string + value: + description: The header field value + type: string + required: + - name + - value + type: object + type: array + path: + description: Path to access on the HTTP server. + type: string + port: + anyOf: + - type: integer + - type: string + description: Name or number of the port to access on the + container. Number must be in the range 1 to 65535. Name + must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true + scheme: + description: Scheme to use for connecting to the host. Defaults + to HTTP. + type: string + required: + - port + type: object + tcpSocket: + description: 'TCPSocket specifies an action involving a TCP + port. TCP hooks not yet supported TODO: implement a realistic + TCP lifecycle hook' + properties: + host: + description: 'Optional: Host name to connect to, defaults + to the pod IP.' + type: string + port: + anyOf: + - type: integer + - type: string + description: Number or name of the port to access on the + container. Number must be in the range 1 to 65535. Name + must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true + required: + - port + type: object + type: object + type: array + required: + - clusterIP + type: object + status: + description: ClusterEndpointStatus defines the observed state of ClusterEndpoint + properties: + conditions: + description: Conditions contains the different condition statuses + for this workspace. + items: + properties: + lastHeartbeatTime: + description: LastHeartbeatTime is the last time this condition + was updated. + format: date-time + type: string + lastTransitionTime: + description: LastTransitionTime is the last time the condition + changed from one status to another. + format: date-time + type: string + message: + description: Message is a human-readable message indicating + details about the last status change. + type: string + reason: + description: Reason is a (brief) reason for the condition's + last status change. + type: string + status: + description: Status is the status of the condition. One of True, + False, Unknown. + type: string + type: + type: string + required: + - status + - type + type: object + type: array + phase: + description: Phase is the recently observed lifecycle phase of the + cluster endpoints. + type: string + required: + - conditions + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/config/demo/dmz-kube.yaml b/config/demo/dmz-kube.yaml new file mode 100644 index 0000000..e9623f8 --- /dev/null +++ b/config/demo/dmz-kube.yaml @@ -0,0 +1,28 @@ +# Copyright © 2022 sealyun. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sealyun.com/v1beta1 +kind: ClusterEndpoint +metadata: + name: dmz-kube +spec: + clusterIP: 10.96.0.100 + periodSeconds: 10 +# hosts: +# - 10.0.112.251 + ports: + - name: https + port: 6443 + protocol: TCP + targetPort: 6443 diff --git a/controllers/controller.go b/controllers/controller.go new file mode 100644 index 0000000..b0bc9f2 --- /dev/null +++ b/controllers/controller.go @@ -0,0 +1,322 @@ +/* +Copyright 2022 cuisongliu@qq.com. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "github.com/go-logr/logr" + "github.com/sealyun/endpoints-operator/api/network/v1beta1" + "github.com/sealyun/endpoints-operator/library/controller" + "github.com/sealyun/endpoints-operator/library/convert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" + "time" +) + +const ( + controllerName = "cluster_endpoints_controller" +) + +// Reconciler reconciles a Service object +type Reconciler struct { + client.Client + Logger logr.Logger + Recorder record.EventRecorder + cache cache.Cache + scheme *runtime.Scheme + LoopCount int +} + +func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { + rootCtx := context.Background() + r.Logger.V(4).Info("start reconcile for ceps") + ceps := &v1beta1.ClusterEndpoint{} + ctr := controller.Controller{ + Client: r.Client, + Eventer: r.Recorder, + Operator: r, + Gvk: schema.GroupVersionKind{ + Group: v1beta1.GroupVersion.Group, + Version: v1beta1.GroupVersion.Version, + Kind: "ClusterEndpoint", + }, + FinalizerName: "sealyun.com/cluster-endpoints.finalizers", + } + ceps.APIVersion = ctr.Gvk.GroupVersion().String() + ceps.Kind = ctr.Gvk.Kind + return ctr.Run(rootCtx, req, ceps) +} + +func (c *Reconciler) SetupWithManager(mgr ctrl.Manager) error { + if c.Client == nil { + c.Client = mgr.GetClient() + } + if c.Logger == nil { + c.Logger = log.Log.WithName(controllerName) + } + if c.Recorder == nil { + c.Recorder = mgr.GetEventRecorderFor(controllerName) + } + c.scheme = mgr.GetScheme() + c.cache = mgr.GetCache() + c.Logger.V(4).Info("init reconcile controller service") + owner := &handler.EnqueueRequestForOwner{OwnerType: &v1beta1.ClusterEndpoint{}, IsController: false} + return ctrl.NewControllerManagedBy(mgr).WithEventFilter(&ResourceChangedPredicate{}). + Watches(&source.Kind{Type: &corev1.Service{}}, owner). + For(&v1beta1.ClusterEndpoint{}).Complete(c) +} + +func (c *Reconciler) Update(ctx context.Context, req ctrl.Request, gvk schema.GroupVersionKind, obj runtime.Object) (ctrl.Result, error) { + c.Logger.V(4).Info("update reconcile controller service", "request", req) + cep := &v1beta1.ClusterEndpoint{} + err := convert.JsonConvert(obj, cep) + if err != nil { + return ctrl.Result{Requeue: true}, err + } + return c.UpdateStatus(ctx, req, cep) +} + +func (c *Reconciler) UpdateStatus(ctx context.Context, req ctrl.Request, cep *v1beta1.ClusterEndpoint) (ctrl.Result, error) { + initializedCondition := v1beta1.Condition{ + Type: v1beta1.Initialized, + Status: corev1.ConditionTrue, + Reason: string(v1beta1.Initialized), + Message: "cluster endpoints has been initialized", + LastHeartbeatTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + } + cep.Status.Phase = v1beta1.Pending + if !isConditionTrue(cep, v1beta1.Initialized) { + c.updateCondition(cep, initializedCondition) + } + + c.syncService(ctx, cep) + c.syncEndpoint(ctx, cep) + + c.Logger.V(4).Info("update finished reconcile controller service", "request", req) + c.syncFinalStatus(cep) + err := c.updateStatus(ctx, req.NamespacedName, &cep.Status) + if err != nil { + c.Recorder.Eventf(cep, corev1.EventTypeWarning, "SyncStatus", "Sync status %s is error: %v", cep.Name, err) + } + sec := time.Duration(cep.Spec.PeriodSeconds) * time.Second + return ctrl.Result{RequeueAfter: sec}, nil +} + +func (c *Reconciler) syncService(ctx context.Context, cep *v1beta1.ClusterEndpoint) { + serviceCondition := v1beta1.Condition{ + Type: v1beta1.SyncServiceReady, + Status: corev1.ConditionTrue, + LastHeartbeatTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: string(v1beta1.SyncServiceReady), + Message: "sync service successfully", + } + + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + svc := &corev1.Service{} + svc.SetName(cep.Name) + svc.SetNamespace(cep.Namespace) + _, err := controllerutil.CreateOrUpdate(ctx, c.Client, svc, func() error { + svc.Labels = map[string]string{} + if err := controllerutil.SetControllerReference(cep, svc, c.scheme); err != nil { + return err + } + svc.Spec.ClusterIP = cep.Spec.ClusterIP + svc.Spec.Type = corev1.ServiceTypeClusterIP + svc.Spec.SessionAffinity = corev1.ServiceAffinityNone + svc.Spec.Ports = convertServicePorts(cep.Spec.Ports) + return nil + }) + return err + }); err != nil { + serviceCondition.LastHeartbeatTime = metav1.Now() + serviceCondition.Status = corev1.ConditionFalse + serviceCondition.Reason = "ServiceSyncError" + serviceCondition.Message = err.Error() + c.updateCondition(cep, serviceCondition) + c.Logger.V(4).Info("error updating service", "name", cep.Name, "msg", err.Error()) + return + } + if !isConditionTrue(cep, v1beta1.SyncServiceReady) { + c.updateCondition(cep, serviceCondition) + } +} +func (c *Reconciler) syncEndpoint(ctx context.Context, cep *v1beta1.ClusterEndpoint) { + endpointCondition := v1beta1.Condition{ + Type: v1beta1.SyncEndpointReady, + Status: corev1.ConditionTrue, + LastHeartbeatTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: string(v1beta1.SyncEndpointReady), + Message: "sync endpoint successfully", + } + + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + ep := &corev1.Endpoints{} + ep.SetName(cep.Name) + ep.SetNamespace(cep.Namespace) + _, err := controllerutil.CreateOrUpdate(ctx, c.Client, ep, func() error { + ep.Labels = map[string]string{} + ep.Subsets = make([]corev1.EndpointSubset, 0) + hosts := convertAddress(cep.Spec.Hosts) + if len(hosts) != 0 { + es := corev1.EndpointSubset{ + Addresses: hosts, + Ports: convertPorts(cep.Spec.Ports), + } + ep.Subsets = append(ep.Subsets, es) + } + return nil + }) + return err + }); err != nil { + endpointCondition.LastHeartbeatTime = metav1.Now() + endpointCondition.Status = corev1.ConditionFalse + endpointCondition.Reason = "EndpointSyncError" + endpointCondition.Message = err.Error() + c.updateCondition(cep, endpointCondition) + c.Logger.V(4).Info("error updating endpoint", "name", cep.Name, "msg", err.Error()) + return + } + if !isConditionTrue(cep, v1beta1.SyncEndpointReady) { + c.updateCondition(cep, endpointCondition) + } +} +func (c *Reconciler) updateStatus(ctx context.Context, nn types.NamespacedName, status *v1beta1.ClusterEndpointStatus) error { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + original := &v1beta1.ClusterEndpoint{} + if err := c.Get(ctx, nn, original); err != nil { + return err + } + original.Status = *status + if err := c.Client.Status().Update(ctx, original); err != nil { + return err + } + return nil + }); err != nil { + return err + } + return nil +} +func (c *Reconciler) syncFinalStatus(cep *v1beta1.ClusterEndpoint) { + clusterReadyCondition := v1beta1.Condition{ + Type: v1beta1.Ready, + Status: corev1.ConditionTrue, + LastHeartbeatTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: string(v1beta1.Ready), + Message: "ClusterEndpoint is available now", + } + if isConditionsTrue(cep) { + cep.Status.Phase = v1beta1.Healthy + } else { + clusterReadyCondition.LastHeartbeatTime = metav1.Now() + clusterReadyCondition.Status = corev1.ConditionFalse + clusterReadyCondition.Reason = "Not" + string(v1beta1.Ready) + clusterReadyCondition.Message = "ClusterEndpoint is not available now" + cep.Status.Phase = v1beta1.UnHealthy + } + c.updateCondition(cep, clusterReadyCondition) +} + +// updateCondition updates condition in cluster conditions using giving condition +// adds condition if not existed +func (c *Reconciler) updateCondition(cep *v1beta1.ClusterEndpoint, condition v1beta1.Condition) { + if cep.Status.Conditions == nil { + cep.Status.Conditions = make([]v1beta1.Condition, 0) + } + hasCondition := false + for i, cond := range cep.Status.Conditions { + if cond.Type == condition.Type { + hasCondition = true + if cond.Reason != condition.Reason || cond.Status != condition.Status { + cep.Status.Conditions[i] = condition + } + } + } + if !hasCondition { + cep.Status.Conditions = append(cep.Status.Conditions, condition) + } +} +func (c *Reconciler) deleteCondition(cep *v1beta1.ClusterEndpoint, conditionType v1beta1.ConditionType) { + if cep.Status.Conditions == nil { + cep.Status.Conditions = make([]v1beta1.Condition, 0) + } + newConditions := make([]v1beta1.Condition, 0) + for _, cond := range cep.Status.Conditions { + if cond.Type == conditionType { + continue + } + newConditions = append(newConditions, cond) + } + cep.Status.Conditions = newConditions +} + +func convertAddress(addresses []string) []corev1.EndpointAddress { + eas := make([]corev1.EndpointAddress, 0) + for _, s := range addresses { + eas = append(eas, corev1.EndpointAddress{ + IP: s, + }) + } + return eas +} + +func convertPorts(sps []v1beta1.ServicePort) []corev1.EndpointPort { + s := make([]corev1.EndpointPort, 0) + for _, sp := range sps { + endPoint := corev1.EndpointPort{ + Name: sp.Name, + Port: sp.TargetPort, + Protocol: sp.Protocol, + } + s = append(s, endPoint) + } + return s +} + +func convertServicePorts(sps []v1beta1.ServicePort) []corev1.ServicePort { + s := make([]corev1.ServicePort, 0) + for _, sp := range sps { + endPoint := corev1.ServicePort{ + Name: sp.Name, + Port: sp.Port, + Protocol: sp.Protocol, + TargetPort: intstr.FromInt(int(sp.TargetPort)), + } + s = append(s, endPoint) + } + return s +} + +func (c *Reconciler) Delete(ctx context.Context, req ctrl.Request, gvk schema.GroupVersionKind, obj runtime.Object) error { + return nil +} diff --git a/controllers/healthy.go b/controllers/healthy.go index f6dcadd..15e147a 100644 --- a/controllers/healthy.go +++ b/controllers/healthy.go @@ -18,5 +18,5 @@ package controllers const ( annotationServer = "sealyun.com/server" - annotationHTTP = "sealyun.com/http" //interval=5s,port=6443,schema=https:,path=/healthz + annotationProb = "sealyun.com/prob" //type=httpGet,interval=5s,port=6443,schema=https,path=/healthz;type=tcpSocket,interval=5s,port=6443,host=10.0.0.1 ) diff --git a/controllers/helper.go b/controllers/helper.go new file mode 100644 index 0000000..36b3fdc --- /dev/null +++ b/controllers/helper.go @@ -0,0 +1,42 @@ +/* +Copyright 2020 KubeSphere Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "github.com/sealyun/endpoints-operator/api/network/v1beta1" + v1 "k8s.io/api/core/v1" +) + +func isConditionTrue(ce *v1beta1.ClusterEndpoint, conditionType v1beta1.ConditionType) bool { + for _, condition := range ce.Status.Conditions { + if condition.Type == conditionType && condition.Status == v1.ConditionTrue { + return true + } + } + return false +} +func isConditionsTrue(ce *v1beta1.ClusterEndpoint) bool { + if len(ce.Status.Conditions) == 0 { + return false + } + for _, condition := range ce.Status.Conditions { + if condition.Status != v1.ConditionTrue { + return false + } + } + return true +} diff --git a/controllers/install.go b/controllers/install.go index e098d0d..e982f99 100644 --- a/controllers/install.go +++ b/controllers/install.go @@ -17,6 +17,7 @@ limitations under the License. package controllers import ( + "github.com/sealyun/endpoints-operator/api/network/v1beta1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" k8sruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -24,4 +25,5 @@ import ( func Install(scheme *runtime.Scheme) { k8sruntime.Must(v1.AddToScheme(scheme)) + k8sruntime.Must(v1beta1.Install(scheme)) } diff --git a/controllers/predicate.go b/controllers/predicate.go new file mode 100644 index 0000000..5e04060 --- /dev/null +++ b/controllers/predicate.go @@ -0,0 +1,42 @@ +// Copyright © 2022 sealyun. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +type ResourceChangedPredicate struct { + predicate.Funcs +} + +func (rl *ResourceChangedPredicate) Update(e event.UpdateEvent) bool { + return true +} + +func (rl *ResourceChangedPredicate) Create(e event.CreateEvent) bool { + return true +} + +// Delete returns true if the Delete event should be processed +func (rl *ResourceChangedPredicate) Delete(e event.DeleteEvent) bool { + return true +} + +// Generic returns true if the Generic event should be processed +func (rl *ResourceChangedPredicate) Generic(e event.GenericEvent) bool { + return true +} diff --git a/controllers/probe.go b/controllers/run_probe.go similarity index 99% rename from controllers/probe.go rename to controllers/run_probe.go index 1ce3318..a37972d 100644 --- a/controllers/probe.go +++ b/controllers/run_probe.go @@ -51,6 +51,7 @@ func newProber() *prober { tcp: tcpprobe.New(), } } + func (pb *prober) runProbe(p *v1.Probe) (probe.Result, string, error) { timeout := time.Duration(p.TimeoutSeconds) * time.Second if p.Exec != nil { diff --git a/controllers/service_controller.go b/controllers/service_controller.go deleted file mode 100644 index 559dcfd..0000000 --- a/controllers/service_controller.go +++ /dev/null @@ -1,179 +0,0 @@ -/* -Copyright 2022 cuisongliu@qq.com. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "context" - "github.com/sealyun/endpoints-operator/library/controller" - "github.com/sealyun/endpoints-operator/library/convert" - "github.com/sealyun/endpoints-operator/library/hash" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/log" - "strings" - - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" - - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -const ( - controllerName = "service_controller" -) - -// ServiceReconciler reconciles a Service object -type ServiceReconciler struct { - client.Client - Logger logr.Logger - Recorder record.EventRecorder - cache cache.Cache - LoopCount int -} - -func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { - rootCtx := context.Background() - r.Logger.V(4).Info("start reconcile for service") - service := &corev1.Service{} - ctr := controller.Controller{ - Client: r.Client, - Eventer: r.Recorder, - Operator: r, - Gvk: schema.GroupVersionKind{ - Group: corev1.SchemeGroupVersion.Group, - Version: corev1.SchemeGroupVersion.Version, - Kind: "Service", - }, - FinalizerName: "sealyun.com/endpoints-operator.finalizers", - } - service.APIVersion = ctr.Gvk.GroupVersion().String() - service.Kind = ctr.Gvk.Kind - return ctr.Run(rootCtx, req, service) -} - -func (c *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { - if c.Client == nil { - c.Client = mgr.GetClient() - } - if c.Logger == nil { - c.Logger = log.Log.WithName(controllerName) - } - if c.Recorder == nil { - c.Recorder = mgr.GetEventRecorderFor(controllerName) - } - c.cache = mgr.GetCache() - c.Logger.V(4).Info("init reconcile controller service") - return ctrl.NewControllerManagedBy(mgr). - For(&corev1.Service{}).WithEventFilter(predicate.Funcs{ - CreateFunc: func(event event.CreateEvent) bool { - return c.processService(event.Meta) - }, - DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - return c.processService(deleteEvent.Meta) - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { - if c.processService(updateEvent.MetaOld) { - if newService, ok := updateEvent.ObjectNew.(*corev1.Service); ok { - if oldService, ok := updateEvent.ObjectOld.(*corev1.Service); ok { - if newService.DeletionTimestamp != nil { - return true - } - newhash := hash.Hash(newService.Annotations) - oldhash := hash.Hash(oldService.Annotations) - if newhash != oldhash { - return true - } - } - } - } - return false - }, - }).Complete(c) -} - -func (r *ServiceReconciler) processService(meta metav1.Object) bool { - if meta.GetAnnotations() != nil && meta.GetAnnotations()[annotationServer] != "" { - return true - } - return false -} - -func (c *ServiceReconciler) Update(ctx context.Context, req ctrl.Request, gvk schema.GroupVersionKind, obj runtime.Object) (ctrl.Result, error) { - c.Logger.V(4).Info("update reconcile controller service", "request", req) - svc := &corev1.Service{} - err := convert.JsonConvert(obj, svc) - if err != nil { - return ctrl.Result{Requeue: true}, err - } - ep := &corev1.Endpoints{} - ep.SetName(svc.Name) - ep.SetNamespace(svc.Namespace) - annotationServers := svc.Annotations[annotationServer] - ports := svc.Spec.Ports - _, err = controllerutil.CreateOrUpdate(ctx, c.Client, ep, func() error { - ep.Labels = map[string]string{} - - ep.Subsets = make([]corev1.EndpointSubset, 0) - es := corev1.EndpointSubset{ - Addresses: convertAddress(annotationServers), - Ports: convertPorts(ports), - } - ep.Subsets = append(ep.Subsets, es) - return nil - }) - if err != nil { - c.Logger.Error(err, "endpoint create or update failed") - return ctrl.Result{Requeue: true}, err - } - c.Logger.V(4).Info("update finished reconcile controller service", "request", req) - return ctrl.Result{}, nil -} - -func convertAddress(address string) []corev1.EndpointAddress { - servers := strings.Split(address, ",") - eas := make([]corev1.EndpointAddress, 0) - for _, s := range servers { - eas = append(eas, corev1.EndpointAddress{ - IP: s, - }) - } - return eas -} - -func convertPorts(sps []corev1.ServicePort) []corev1.EndpointPort { - s := make([]corev1.EndpointPort, 0) - for _, sp := range sps { - endPoint := corev1.EndpointPort{ - Name: sp.Name, - Port: sp.TargetPort.IntVal, - Protocol: sp.Protocol, - } - s = append(s, endPoint) - } - return s -} - -func (c *ServiceReconciler) Delete(ctx context.Context, req ctrl.Request, gvk schema.GroupVersionKind, obj runtime.Object) error { - return nil -} diff --git a/library/controller/controller.go b/library/controller/controller.go index 197ef19..1b4f399 100644 --- a/library/controller/controller.go +++ b/library/controller/controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" "strings" ) @@ -73,16 +74,11 @@ func (r *Controller) Run(ctx context.Context, req ctrl.Request, obj runtime.Obje // The object is not being deleted, so if it does not have our finalizer, // then lets add the finalizer and update the object. This is equivalent // registering our finalizer. - if !tools.ContainsString(ustructObj.GetFinalizers(), r.FinalizerName) { - finalizers := ustructObj.GetFinalizers() - finalizers = append(finalizers, r.FinalizerName) - ustructObj.SetFinalizers(finalizers) - if err = r.setFinalizers(ctx, req, obj, finalizers); err != nil { - r.Eventer.Eventf(obj, corev1.EventTypeWarning, "FailedUpdate", "Update %s: %v", lowerKind, err) - //如果修改失败重新放入队列 - r.Logger.Error(err, "unable to set finalizer", "finalizer", r.FinalizerName) - return ctrl.Result{Requeue: true}, err - } + if err = controllerutil.AddFinalizerWithError(ustructObj, r.FinalizerName); err != nil { + r.Eventer.Eventf(obj, corev1.EventTypeWarning, "FailedUpdate", "Update %s: %v", lowerKind, err) + //如果修改失败重新放入队列 + r.Logger.Error(err, "unable to set finalizer", "finalizer", r.FinalizerName) + return ctrl.Result{Requeue: true}, err } return r.Operator.Update(ctx, req, r.Gvk, ustructObj) } else { @@ -102,10 +98,7 @@ func (r *Controller) Run(ctx context.Context, req ctrl.Request, obj runtime.Obje return ctrl.Result{Requeue: true}, err } // remove our finalizer from the list and update it. - finalizers := ustructObj.GetFinalizers() - finalizers = tools.RemoveString(finalizers, r.FinalizerName) - ustructObj.SetFinalizers(finalizers) - if err = r.setFinalizers(ctx, req, obj, finalizers); err != nil { + if err = controllerutil.RemoveFinalizerWithError(ustructObj, r.FinalizerName); err != nil { r.Eventer.Eventf(obj, corev1.EventTypeWarning, "FailedDelete", "Deleted %s: %v", lowerKind, err) r.Logger.Error(err, "failed set finalizer the resource", "err", err.Error()) return ctrl.Result{Requeue: true}, err