Skip to content

Commit

Permalink
feat(platform): anywhere component upgrade (#2227)
Browse files Browse the repository at this point in the history
  • Loading branch information
wl-chen committed Feb 8, 2023
1 parent facc146 commit ec34187
Show file tree
Hide file tree
Showing 14 changed files with 658 additions and 350 deletions.
7 changes: 7 additions & 0 deletions api/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions api/platform/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ const (
AnywhereLocalizationsAnno = "tkestack.io/anywhere-localizations"
// AnywhereMachinesAnno contains base64 machines json data
AnywhereMachinesAnno = "tkestack.io/anywhere-machines"
// AnywhereUpgradeRetryComponentAnno describe curent retry component when upgrade failed
AnywhereUpgradeRetryComponentAnno = "tkestack.io/anywhere-upgrade-retry-component"
// AnywhereUpgradeRetryComponentAnno describe anywhere upgrade stats
AnywhereUpgradeStatsAnno = "tkestack.io/anywhere-upgrade-stats"
// ClusterNameLable contains related cluster's name for no-cluster resources
ClusterNameLable = "tkestack.io/cluster-name"
// HubAPIServerAnno describe hub cluster api server url
Expand Down Expand Up @@ -270,6 +274,9 @@ type ClusterStatus struct {
// AppVersion is the overall version of system components
// +optional
AppVersion string
// ComponentPhase is the status of components, contains "deployed", "pending-upgrade", "failed" status
// +optional
ComponentPhase ComponentPhase
}

// FinalizerName is the name identifying a finalizer during cluster lifecycle.
Expand Down Expand Up @@ -325,6 +332,18 @@ const (
ClusterDownscaling ClusterPhase = "Downscaling"
)

// ComponentPhase defines the phase of anywhere cluster component
type ComponentPhase string

const (
// ComponentDeployed is the normal phase of anywhere cluster component
ComponentDeployed ComponentPhase = "deployed"
// ComponentPendingUpgrade means the anywhere cluster component is upgrading
ComponentPendingUpgrade ComponentPhase = "pending-upgrade"
// ComponentFailed means the anywhere cluster component upgrade failed
ComponentFailed ComponentPhase = "failed"
)

// ClusterCondition contains details for the current condition of this cluster.
type ClusterCondition struct {
// Type is the type of the condition.
Expand Down
709 changes: 376 additions & 333 deletions api/platform/v1/generated.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions api/platform/v1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions api/platform/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ const (
AnywhereLocalizationsAnno = "tkestack.io/anywhere-localizations"
// AnywhereMachinesAnno contains base64 machines json data
AnywhereMachinesAnno = "tkestack.io/anywhere-machines"
// AnywhereUpgradeRetryComponentAnno describe curent retry component when upgrade failed
AnywhereUpgradeRetryComponentAnno = "tkestack.io/anywhere-upgrade-retry-component"
// AnywhereUpgradeRetryComponentAnno describe anywhere upgrade stats
AnywhereUpgradeStatsAnno = "tkestack.io/anywhere-upgrade-stats"
// ClusterNameLable contains related cluster's name for no-cluster resources
ClusterNameLable = "tkestack.io/cluster-name"
// HubAPIServerAnno describe hub cluster api server url
Expand Down Expand Up @@ -281,6 +285,9 @@ type ClusterStatus struct {
// AppVersion is the overall version of system components
// +optional
AppVersion string `json:"appVersion,omitempty" protobuf:"bytes,21,opt,name=appVersion"`
// ComponentPhase is the status of components, contains "deployed", "pending-upgrade", "failed" status
// +optional
ComponentPhase ComponentPhase `json:"componentPhase,omitempty" protobuf:"bytes,22,opt,name=componentPhase"`
}

// FinalizerName is the name identifying a finalizer during cluster lifecycle.
Expand Down Expand Up @@ -340,6 +347,18 @@ const (
ClusterDownscaling ClusterPhase = "Downscaling"
)

// ComponentPhase defines the phase of anywhere cluster component
type ComponentPhase string

const (
// ComponentDeployed is the normal phase of anywhere cluster component
ComponentDeployed ComponentPhase = "deployed"
// ComponentPendingUpgrade means the anywhere cluster component is upgrading
ComponentPendingUpgrade ComponentPhase = "pending-upgrade"
// ComponentFailed means the anywhere cluster component upgrade failed
ComponentFailed ComponentPhase = "failed"
)

// ClusterCondition contains details for the current condition of this cluster.
type ClusterCondition struct {
// Type is the type of the condition.
Expand Down
11 changes: 6 additions & 5 deletions api/platform/v1/types_swagger_doc_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/platform/v1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/tke-platform-controller/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func Run(cfg *config.Config, stopCh <-chan struct{}) error {
if cfg.ClusterController.IsCRDMode {
log.Info("tke platform controller start validate listening...")
serverMux.HandleFunc("/validate", webhook.Validate)
serverMux.HandleFunc("/mutate", webhook.Mutate)
}
handler := controller.BuildHandlerChain(serverMux, &cfg.Authorization, &cfg.Authentication, platform.Codecs)
if _, err := cfg.SecureServing.Serve(handler, 0, stopCh); err != nil {
Expand Down
136 changes: 136 additions & 0 deletions cmd/tke-platform-controller/app/webhook/mutate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package webhook

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

v1 "k8s.io/api/admission/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
platform "tkestack.io/tke/api/platform"
platformv1 "tkestack.io/tke/api/platform/v1"
clusterprovider "tkestack.io/tke/pkg/platform/provider/cluster"
"tkestack.io/tke/pkg/platform/types"
"tkestack.io/tke/pkg/util/log"
)

func Mutate(reponseWriter http.ResponseWriter, request *http.Request) {
var body []byte
var err error
if request.Body != nil {
if body, err = ioutil.ReadAll(request.Body); err != nil {
log.Errorf("request body read failed, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("request body read failed, err: %v", err), http.StatusBadRequest)
return
}
if len(body) == 0 {
log.Errorf("request body length 0")
http.Error(reponseWriter, "request body length 0", http.StatusBadRequest)
return
}
} else {
log.Errorf("request body nil")
http.Error(reponseWriter, "request body nil", http.StatusBadRequest)
return
}

contentType := request.Header.Get("Content-Type")
if contentType != "application/json" {
log.Errorf("Content-Type=%s, expect `application/json`", contentType)
http.Error(reponseWriter, fmt.Sprintf("Content-Type=%s, expect `application/json`", contentType), http.StatusUnsupportedMediaType)
return
}

admissionReview := v1.AdmissionReview{}
if _, _, err := deserializer.Decode(body, nil, &admissionReview); err != nil {
log.Errorf("decode request body to admission review failed, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("decode request body to admission review failed, err: %v", err), http.StatusBadRequest)
return
}

var admissionResponse *v1.AdmissionResponse
switch admissionReview.Request.Kind.Kind {
case "Cluster":
v1Cluster := platformv1.Cluster{}
if err := json.Unmarshal(admissionReview.Request.Object.Raw, &v1Cluster); err != nil {
log.Errorf("Can't unmarshal cluster, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't unmarshal cluster, err: %v", err), http.StatusInternalServerError)
return
}

cluster := platform.Cluster{}
if err = platformv1.Convert_v1_Cluster_To_platform_Cluster(&v1Cluster, &cluster, nil); err != nil {
log.Errorf("Can't convert v1cluster to cluster, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't convert v1cluster to cluster, err: %v", err), http.StatusInternalServerError)
return
}

if admissionReview.Request.Operation == v1.Update {
v1OldCluster := platformv1.Cluster{}
if err := json.Unmarshal(admissionReview.Request.OldObject.Raw, &v1OldCluster); err != nil {
log.Errorf("Can't unmarshal old cluster, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't unmarshal old cluster, err: %v", err), http.StatusInternalServerError)
return
}
oldCluster := platform.Cluster{}
if err = platformv1.Convert_v1_Cluster_To_platform_Cluster(&v1OldCluster, &oldCluster, nil); err != nil {
log.Errorf("Can't convert v1oldcluster to oldcluster, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't convert v1oldcluster to oldcluster, err: %v", err), http.StatusInternalServerError)
return
}
admissionResponse = MutateClusterUpdate(&cluster, &oldCluster)
}
default:
log.Errorf("Can't recognized request kind %v", admissionReview.Request.Kind)
http.Error(reponseWriter, fmt.Sprintf("Can't recognized request kind %v", admissionReview.Request.Kind), http.StatusBadRequest)
return
}

admissionReview.Response = admissionResponse
admissionReview.Response.UID = admissionReview.Request.UID

admissionReviewBytes, err := json.Marshal(admissionReview)
if err != nil {
log.Errorf("Can't encode response: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't encode response: %v", err), http.StatusInternalServerError)
return
}
if _, err := reponseWriter.Write(admissionReviewBytes); err != nil {
log.Errorf("Can't write response: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't write response: %v", err), http.StatusInternalServerError)
return
}
}

func MutateClusterUpdate(cluster *platform.Cluster, oldCluster *platform.Cluster) *v1.AdmissionResponse {
typeCluster := types.Cluster{
Cluster: cluster,
}
oldTypeCluster := types.Cluster{
Cluster: oldCluster,
}

errorList := field.ErrorList{}
p, err := clusterprovider.GetProvider(cluster.Spec.Type)
if err != nil {
errorList = append(errorList, field.NotFound(field.NewPath("spec").Child("type"), cluster.Spec.Type))
return transferErrorList(&errorList, fmt.Sprintf("cluster %s update mutate failed: %v", oldCluster.Name, errorList.ToAggregate().Errors()))
}
jsonPatchByte, errorList := p.MutateUpdate(&typeCluster, &oldTypeCluster)
if len(errorList) != 0 {
return transferErrorList(&errorList, fmt.Sprintf("cluster %s update mutate failed: %v", oldCluster.Name, errorList.ToAggregate().Errors()))
}
if jsonPatchByte == nil {
return &v1.AdmissionResponse{
Allowed: true,
}
}

patchType := v1.PatchTypeJSONPatch
return &v1.AdmissionResponse{
Allowed: true,
Patch: jsonPatchByte,
PatchType: &patchType,
}
}
23 changes: 18 additions & 5 deletions cmd/tke-platform-controller/app/webhook/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
platform "tkestack.io/tke/api/platform"
platformv1 "tkestack.io/tke/api/platform/v1"
"tkestack.io/tke/api/platform/validation"
clusterprovider "tkestack.io/tke/pkg/platform/provider/cluster"
"tkestack.io/tke/pkg/platform/types"
"tkestack.io/tke/pkg/util/log"
)
Expand All @@ -31,7 +32,6 @@ func init() {
func Validate(reponseWriter http.ResponseWriter, request *http.Request) {
var body []byte
var err error
log.Infof("receive validate request, request: %v", request)
if request.Body != nil {
if body, err = ioutil.ReadAll(request.Body); err != nil {
log.Errorf("request body read failed, err: %v", err)
Expand Down Expand Up @@ -84,10 +84,16 @@ func Validate(reponseWriter http.ResponseWriter, request *http.Request) {
admissionResponse = ValidateCluster(&cluster)
}
if admissionReview.Request.Operation == v1.Update {
v1OldCluster := platformv1.Cluster{}
if err := json.Unmarshal(admissionReview.Request.OldObject.Raw, &v1OldCluster); err != nil {
log.Errorf("Can't unmarshal old cluster, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't unmarshal old cluster, err: %v", err), http.StatusInternalServerError)
return
}
oldCluster := platform.Cluster{}
if err := json.Unmarshal(admissionReview.Request.Object.Raw, &oldCluster); err != nil {
log.Errorf("Can't unmarshal cluster, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't unmarshal cluster, err: %v", err), http.StatusInternalServerError)
if err = platformv1.Convert_v1_Cluster_To_platform_Cluster(&v1OldCluster, &oldCluster, nil); err != nil {
log.Errorf("Can't convert v1oldcluster to oldcluster, err: %v", err)
http.Error(reponseWriter, fmt.Sprintf("Can't convert v1oldcluster to oldcluster, err: %v", err), http.StatusInternalServerError)
return
}
admissionResponse = ValidateClusterUpdate(&cluster, &oldCluster)
Expand Down Expand Up @@ -134,7 +140,14 @@ func ValidateClusterUpdate(cluster *platform.Cluster, oldCluster *platform.Clust
oldTypeCluster := types.Cluster{
Cluster: oldCluster,
}
errorList := validation.ValidateClusterUpdate(&typeCluster, &oldTypeCluster)

errorList := field.ErrorList{}
p, err := clusterprovider.GetProvider(cluster.Spec.Type)
if err != nil {
errorList = append(errorList, field.NotFound(field.NewPath("spec").Child("type"), cluster.Spec.Type))
return transferErrorList(&errorList, fmt.Sprintf("cluster %s update validate failed: %v", oldCluster.Name, errorList.ToAggregate().Errors()))
}
errorList = append(errorList, p.ValidateUpdate(&typeCluster, &oldTypeCluster)...)
if len(errorList) == 0 {
return &v1.AdmissionResponse{
Allowed: true,
Expand Down
Loading

0 comments on commit ec34187

Please sign in to comment.