From bc1252d8c259952a190199888f14a1232f4f9c31 Mon Sep 17 00:00:00 2001 From: "kevin.qiao" Date: Sun, 26 Feb 2023 16:31:31 +0800 Subject: [PATCH 1/2] support dynamic flags --- apis/apps/v1alpha1/nebulacluster.go | 12 ++-- apis/apps/v1alpha1/nebulacluster_common.go | 35 ++++++----- .../v1alpha1/nebulacluster_componentter.go | 2 +- apis/apps/v1alpha1/nebulacluster_graphd.go | 13 ++-- apis/apps/v1alpha1/nebulacluster_metad.go | 13 ++-- apis/apps/v1alpha1/nebulacluster_storaged.go | 13 ++-- apis/apps/v1alpha1/template.go | 33 ++++++++++ charts/nebula-cluster/values.yaml | 8 +-- pkg/annotation/annotation.go | 2 + pkg/controller/component/graphd_cluster.go | 14 +++-- pkg/controller/component/helper.go | 62 ++++++++++++++++--- pkg/controller/component/metad_cluster.go | 21 +++++-- pkg/controller/component/storaged_cluster.go | 16 ++++- .../nebularestore/nebula_restore_manager.go | 14 ++--- pkg/util/http/http.go | 56 +++++++++++++++++ pkg/util/maputil/map.go | 24 +++++++ 16 files changed, 265 insertions(+), 73 deletions(-) create mode 100644 pkg/util/http/http.go diff --git a/apis/apps/v1alpha1/nebulacluster.go b/apis/apps/v1alpha1/nebulacluster.go index b6e8d5ff..190af4e1 100644 --- a/apis/apps/v1alpha1/nebulacluster.go +++ b/apis/apps/v1alpha1/nebulacluster.go @@ -52,12 +52,16 @@ func (nc *NebulaCluster) GetMetadThriftConnAddress() string { return nc.MetadComponent().GetConnAddress(MetadPortNameThrift) } -func (nc *NebulaCluster) GetMetadEndpoints() []string { - return nc.MetadComponent().GetHeadlessConnAddresses(MetadPortNameThrift) +func (nc *NebulaCluster) GetMetadEndpoints(portName string) []string { + return nc.MetadComponent().GetEndpoints(portName) } -func (nc *NebulaCluster) GetStoragedEndpoints() []string { - return nc.StoragedComponent().GetHeadlessConnAddresses(StoragedPortNameThrift) +func (nc *NebulaCluster) GetStoragedEndpoints(portName string) []string { + return nc.StoragedComponent().GetEndpoints(portName) +} + +func (nc *NebulaCluster) GetGraphdEndpoints(portName string) []string { + return nc.GraphdComponent().GetEndpoints(portName) } func (nc *NebulaCluster) GetClusterName() string { diff --git a/apis/apps/v1alpha1/nebulacluster_common.go b/apis/apps/v1alpha1/nebulacluster_common.go index 29e0b904..012a48f7 100644 --- a/apis/apps/v1alpha1/nebulacluster_common.go +++ b/apis/apps/v1alpha1/nebulacluster_common.go @@ -34,7 +34,9 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + "github.com/vesoft-inc/nebula-operator/pkg/annotation" "github.com/vesoft-inc/nebula-operator/pkg/label" + "github.com/vesoft-inc/nebula-operator/pkg/util/codec" ) const ( @@ -91,19 +93,12 @@ func getPort(ports []corev1.ContainerPort, portName string) int32 { return 0 } -func getConnAddress(serviceFQDN string, port int32) string { - return joinHostPort(serviceFQDN, port) -} - -func getHeadlessConnAddresses(connAddress, componentName string, replicas int32, isHeadless bool) []string { - if isHeadless { - addresses := make([]string, 0, replicas) - for i := int32(0); i < replicas; i++ { - addresses = append(addresses, fmt.Sprintf("%s.%s", getPodName(componentName, i), connAddress)) - } - return addresses +func getConnAddresses(connAddress, componentName string, replicas int32) []string { + addresses := make([]string, 0, replicas) + for i := int32(0); i < replicas; i++ { + addresses = append(addresses, fmt.Sprintf("%s.%s", getPodName(componentName, i), connAddress)) } - return []string{connAddress} + return addresses } func getKubernetesClusterDomain() string { @@ -157,7 +152,7 @@ func getResources(res *corev1.ResourceRequirements) *corev1.ResourceRequirements return res } -func getConfigKey(componentType string) string { +func getCmKey(componentType string) string { return fmt.Sprintf("nebula-%s.conf", componentType) } @@ -288,7 +283,7 @@ func generateContainers(c NebulaClusterComponentter, cm *corev1.ConfigMap) []cor } } - metadAddress := strings.Join(nc.GetMetadEndpoints(), ",") + metadAddress := strings.Join(nc.GetMetadEndpoints(MetadPortNameThrift), ",") cmd = append(cmd, fmt.Sprintf("exec /usr/local/nebula/bin/nebula-%s", componentType)+ fmt.Sprintf(" --flagfile=/usr/local/nebula/etc/nebula-%s.conf", componentType)+ " --meta_server_addrs="+metadAddress+ @@ -362,7 +357,7 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab nc := c.GetNebulaCluster() - configKey := getConfigKey(componentType) + cmKey := getCmKey(componentType) containers := generateContainers(c, cm) volumes := c.GenerateVolumes() if cm != nil { @@ -374,8 +369,8 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab Name: c.GetName(), }, Items: []corev1.KeyToPath{{ - Key: configKey, - Path: configKey, + Key: cmKey, + Path: cmKey, }}, }, }, @@ -410,12 +405,18 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab return nil, err } + apply, err := codec.Encode(c.GetConfig()) + if err != nil { + return nil, err + } + mergeLabels := mergeStringMaps(true, componentLabel, c.GetPodLabels()) replicas := c.GetReplicas() sts := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: c.GetName(), Namespace: namespace, + Annotations: map[string]string{annotation.AnnLastAppliedFlagsKey: apply}, Labels: componentLabel, OwnerReferences: c.GenerateOwnerReferences(), }, diff --git a/apis/apps/v1alpha1/nebulacluster_componentter.go b/apis/apps/v1alpha1/nebulacluster_componentter.go index d1bb344c..1605a207 100644 --- a/apis/apps/v1alpha1/nebulacluster_componentter.go +++ b/apis/apps/v1alpha1/nebulacluster_componentter.go @@ -53,7 +53,7 @@ type NebulaClusterComponentter interface { GetPodFQDN(int32) string GetPort(string) int32 GetConnAddress(string) string - GetHeadlessConnAddresses(string) []string + GetEndpoints(string) []string IsReady() bool diff --git a/apis/apps/v1alpha1/nebulacluster_graphd.go b/apis/apps/v1alpha1/nebulacluster_graphd.go index 245e4368..fdf4e71d 100644 --- a/apis/apps/v1alpha1/nebulacluster_graphd.go +++ b/apis/apps/v1alpha1/nebulacluster_graphd.go @@ -71,7 +71,7 @@ func (c *graphdComponent) GetConfig() map[string]string { } func (c *graphdComponent) GetConfigMapKey() string { - return getConfigKey(c.Type().String()) + return getCmKey(c.Type().String()) } func (c *graphdComponent) GetResources() *corev1.ResourceRequirements { @@ -183,15 +183,14 @@ func (c *graphdComponent) GetPort(portName string) int32 { } func (c *graphdComponent) GetConnAddress(portName string) string { - return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName)) + return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName)) } -func (c *graphdComponent) GetHeadlessConnAddresses(portName string) []string { - return getHeadlessConnAddresses( +func (c *graphdComponent) GetEndpoints(portName string) []string { + return getConnAddresses( c.GetConnAddress(portName), c.GetName(), - c.GetReplicas(), - c.IsHeadlessService()) + c.GetReplicas()) } func (c *graphdComponent) IsReady() bool { @@ -292,7 +291,7 @@ func (c *graphdComponent) GenerateService() *corev1.Service { func (c *graphdComponent) GenerateConfigMap() *corev1.ConfigMap { cm := generateConfigMap(c) - configKey := getConfigKey(c.Type().String()) + configKey := getCmKey(c.Type().String()) cm.Data[configKey] = GraphdConfigTemplate return cm } diff --git a/apis/apps/v1alpha1/nebulacluster_metad.go b/apis/apps/v1alpha1/nebulacluster_metad.go index 2d1cadf6..a811feeb 100644 --- a/apis/apps/v1alpha1/nebulacluster_metad.go +++ b/apis/apps/v1alpha1/nebulacluster_metad.go @@ -71,7 +71,7 @@ func (c *metadComponent) GetConfig() map[string]string { } func (c *metadComponent) GetConfigMapKey() string { - return getConfigKey(c.Type().String()) + return getCmKey(c.Type().String()) } func (c *metadComponent) GetResources() *corev1.ResourceRequirements { @@ -197,15 +197,14 @@ func (c *metadComponent) GetPort(portName string) int32 { } func (c *metadComponent) GetConnAddress(portName string) string { - return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName)) + return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName)) } -func (c *metadComponent) GetHeadlessConnAddresses(portName string) []string { - return getHeadlessConnAddresses( +func (c *metadComponent) GetEndpoints(portName string) []string { + return getConnAddresses( c.GetConnAddress(portName), c.GetName(), - c.GetReplicas(), - c.IsHeadlessService()) + c.GetReplicas()) } func (c *metadComponent) IsReady() bool { @@ -367,7 +366,7 @@ func (c *metadComponent) GenerateService() *corev1.Service { func (c *metadComponent) GenerateConfigMap() *corev1.ConfigMap { cm := generateConfigMap(c) - configKey := getConfigKey(c.Type().String()) + configKey := getCmKey(c.Type().String()) cm.Data[configKey] = MetadhConfigTemplate return cm } diff --git a/apis/apps/v1alpha1/nebulacluster_storaged.go b/apis/apps/v1alpha1/nebulacluster_storaged.go index 9b9dfb34..b1410478 100644 --- a/apis/apps/v1alpha1/nebulacluster_storaged.go +++ b/apis/apps/v1alpha1/nebulacluster_storaged.go @@ -73,7 +73,7 @@ func (c *storagedComponent) GetConfig() map[string]string { } func (c *storagedComponent) GetConfigMapKey() string { - return getConfigKey(c.Type().String()) + return getCmKey(c.Type().String()) } func (c *storagedComponent) GetResources() *corev1.ResourceRequirements { @@ -202,15 +202,14 @@ func (c *storagedComponent) GetPort(portName string) int32 { } func (c *storagedComponent) GetConnAddress(portName string) string { - return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName)) + return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName)) } -func (c *storagedComponent) GetHeadlessConnAddresses(portName string) []string { - return getHeadlessConnAddresses( +func (c *storagedComponent) GetEndpoints(portName string) []string { + return getConnAddresses( c.GetConnAddress(portName), c.GetName(), - c.GetReplicas(), - c.IsHeadlessService()) + c.GetReplicas()) } func (c *storagedComponent) IsReady() bool { @@ -351,7 +350,7 @@ func (c *storagedComponent) GenerateService() *corev1.Service { func (c *storagedComponent) GenerateConfigMap() *corev1.ConfigMap { cm := generateConfigMap(c) - configKey := getConfigKey(c.Type().String()) + configKey := getCmKey(c.Type().String()) cm.Data[configKey] = StoragedConfigTemplate return cm } diff --git a/apis/apps/v1alpha1/template.go b/apis/apps/v1alpha1/template.go index 8cd1c0a3..83f329d2 100644 --- a/apis/apps/v1alpha1/template.go +++ b/apis/apps/v1alpha1/template.go @@ -16,6 +16,37 @@ limitations under the License. package v1alpha1 +var DynamicFlags = map[string]string{ + "minloglevel": "0", + "v": "0", + "accept_partial_success": "false", + "session_reclaim_interval_secs": "60", + "max_allowed_query_size": "4194304", + "system_memory_high_watermark_ratio": "0.8", + "ng_black_box_file_lifetime_seconds": "1800", + "memory_tracker_limit_ratio": "0.8", + "memory_tracker_untracked_reserved_memory_mb": "50", + "memory_tracker_detail_log": "false", + "memory_tracker_detail_log_interval_ms": "60000", + "memory_purge_enabled": "true", + "memory_purge_interval_seconds": "10", + "heartbeat_interval_secs": "10", + "raft_heartbeat_interval_secs": "30", + "raft_rpc_timeout_ms": "500", + "wal_ttl": "14400", + "query_concurrently": "true", + "auto_remove_invalid_space": "true", + "num_io_threads": "16", + "num_worker_threads": "0", + "max_concurrent_subtasks": "10", + "snapshot_part_rate_limit": "10485760", + "snapshot_batch_size": "1048576", + "rebuild_index_part_rate_limit": "4194304", + "rocksdb_db_options": "{}", + "rocksdb_column_family_options": `{"write_buffer_size":"67108864","max_write_buffer_number":"4","max_bytes_for_level_base":"268435456"}`, + "rocksdb_block_based_table_options": `{"block_size":"8192"}`, +} + const ( // nolint: revive GraphdConfigTemplate = ` @@ -240,6 +271,8 @@ const ( # Root data path, here should be only single path for metad --data_path=data/meta +# !!! Minimum reserved bytes of data path +--minimum_reserved_bytes=268435456 ########## Misc ######### # The default number of parts when a space is created --default_parts_num=10 diff --git a/charts/nebula-cluster/values.yaml b/charts/nebula-cluster/values.yaml index 23b0e690..459f2f71 100644 --- a/charts/nebula-cluster/values.yaml +++ b/charts/nebula-cluster/values.yaml @@ -24,7 +24,7 @@ nebula: limits: cpu: "1" memory: "500Mi" - logStorage: "1Gi" + logStorage: "500Mi" podLabels: {} podAnnotations: {} nodeSelector: {} @@ -47,8 +47,8 @@ nebula: limits: cpu: "1" memory: "1Gi" - logStorage: "1Gi" - dataStorage: "10Gi" + logStorage: "500Mi" + dataStorage: "2Gi" license: {} podLabels: {} podAnnotations: {} @@ -72,7 +72,7 @@ nebula: limits: cpu: "1" memory: "1Gi" - logStorage: "1Gi" + logStorage: "500Mi" dataStorage: "10Gi" enableAutoBalance: false podLabels: {} diff --git a/pkg/annotation/annotation.go b/pkg/annotation/annotation.go index c8991961..069ef42b 100644 --- a/pkg/annotation/annotation.go +++ b/pkg/annotation/annotation.go @@ -26,6 +26,8 @@ const ( AnnLastSyncTimestampKey = "nebula-graph.io/sync-timestamp" // AnnHaModeKey is annotation key to indicate whether in ha mode AnnHaModeKey = "nebula-graph.io/ha-mode" + // AnnLastAppliedFlagsKey is annotation key to indicate the last applied custom flags + AnnLastAppliedFlagsKey = "nebula-graph.io/last-applied-flags" // AnnLastAppliedConfigKey is annotation key to indicate the last applied configuration AnnLastAppliedConfigKey = "nebula-graph.io/last-applied-configuration" // AnnPodSchedulingKey is pod scheduling annotation key, it represents whether the pod is scheduling diff --git a/pkg/controller/component/graphd_cluster.go b/pkg/controller/component/graphd_cluster.go index fe96f96e..12dbe86d 100644 --- a/pkg/controller/component/graphd_cluster.go +++ b/pkg/controller/component/graphd_cluster.go @@ -82,7 +82,7 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error { notExist := apierrors.IsNotFound(err) oldWorkload := oldWorkloadTemp.DeepCopy() - cm, cmHash, err := c.syncGraphdConfigMap(nc) + cm, cmHash, e, err := c.syncGraphdConfigMap(nc) if err != nil { return err } @@ -121,6 +121,13 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error { } } + if nc.GraphdComponent().IsReady() { + endpoints := nc.GetGraphdEndpoints(v1alpha1.GraphdPortNameHTTP) + if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations(), e); err != nil { + return fmt.Errorf("update graphd cluster %s dynamic flags failed: %v", newWorkload.GetName(), err) + } + } + return extender.UpdateWorkload(c.clientSet.Workload(), newWorkload, oldWorkload) } @@ -165,13 +172,12 @@ func (c *graphdCluster) syncGraphdService(nc *v1alpha1.NebulaCluster) error { return syncService(newSvc, c.clientSet.Service()) } -func (c *graphdCluster) syncGraphdConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, error) { +func (c *graphdCluster) syncGraphdConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, bool, error) { return syncConfigMap( nc.GraphdComponent(), c.clientSet.ConfigMap(), v1alpha1.GraphdConfigTemplate, - nc.GraphdComponent().GetConfigMapKey(), - ) + nc.GraphdComponent().GetConfigMapKey()) } type FakeGraphdCluster struct { diff --git a/pkg/controller/component/helper.go b/pkg/controller/component/helper.go index b81eabbe..71d8d1ae 100644 --- a/pkg/controller/component/helper.go +++ b/pkg/controller/component/helper.go @@ -19,6 +19,7 @@ package component import ( "encoding/json" "fmt" + "k8s.io/klog/v2" "strings" appsv1 "k8s.io/api/apps/v1" @@ -32,10 +33,12 @@ import ( "github.com/vesoft-inc/nebula-operator/pkg/annotation" "github.com/vesoft-inc/nebula-operator/pkg/kube" "github.com/vesoft-inc/nebula-operator/pkg/label" + "github.com/vesoft-inc/nebula-operator/pkg/util/codec" "github.com/vesoft-inc/nebula-operator/pkg/util/config" "github.com/vesoft-inc/nebula-operator/pkg/util/errors" "github.com/vesoft-inc/nebula-operator/pkg/util/extender" "github.com/vesoft-inc/nebula-operator/pkg/util/hash" + httputil "github.com/vesoft-inc/nebula-operator/pkg/util/http" "github.com/vesoft-inc/nebula-operator/pkg/util/maputil" ) @@ -152,19 +155,64 @@ func syncConfigMap( component v1alpha1.NebulaClusterComponentter, cmClient kube.ConfigMap, template, - fileName string) (*corev1.ConfigMap, string, error) { + cmKey string) (*corev1.ConfigMap, string, bool, error) { cmHash := hash.Hash(template) cm := component.GenerateConfigMap() - if component.GetConfig() != nil { - customConf := config.AppendCustomConfig(template, component.GetConfig()) - cm.Data[fileName] = customConf - cmHash = hash.Hash(customConf) + cfg := component.GetConfig() + // If not all the custom flags in the dynamic flags map, + // then persist these flags to configmap and trigger rolling update + var e bool + if cfg != nil { + e = maputil.AllKeysExist(cfg, v1alpha1.DynamicFlags) + if !e { + customConf := config.AppendCustomConfig(template, component.GetConfig()) + cm.Data[cmKey] = customConf + cmHash = hash.Hash(customConf) + } } if err := cmClient.CreateOrUpdateConfigMap(cm); err != nil { - return nil, "", err + return nil, "", e, err } - return cm, cmHash, nil + return cm, cmHash, e, nil +} + +func updateDynamicFlags(endpoints []string, newAnnotations, oldAnnotations map[string]string, exists bool) error { + newFlags := newAnnotations[annotation.AnnLastAppliedFlagsKey] + if exists && newFlags != "{}" { + for _, endpoint := range endpoints { + url := fmt.Sprintf("http://%s/flags", endpoint) + if _, err := httputil.PutRequest(url, []byte(newFlags)); err != nil { + return err + } + } + klog.Info("update dynamic flags successfully") + } + + apply, ok := oldAnnotations[annotation.AnnLastAppliedFlagsKey] + if ok { + if apply != "{}" && newFlags == "{}" { + oldFlags := map[string]string{} + if err := json.Unmarshal([]byte(apply), &oldFlags); err != nil { + return err + } + if maputil.AllKeysExist(oldFlags, v1alpha1.DynamicFlags) { + for _, endpoint := range endpoints { + url := fmt.Sprintf("http://%s/flags", endpoint) + maputil.ResetMap(oldFlags, v1alpha1.DynamicFlags) + b, err := codec.Encode(oldFlags) + if err != nil { + return err + } + if _, err := httputil.PutRequest(url, []byte(b)); err != nil { + return err + } + } + klog.Info("reset dynamic flags successfully") + } + } + } + return nil } func getContainerImage( diff --git a/pkg/controller/component/metad_cluster.go b/pkg/controller/component/metad_cluster.go index 6ffd79a3..f0b166c6 100644 --- a/pkg/controller/component/metad_cluster.go +++ b/pkg/controller/component/metad_cluster.go @@ -94,7 +94,7 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error { notExist := apierrors.IsNotFound(err) oldWorkload := oldWorkloadTemp.DeepCopy() - cm, cmHash, err := c.syncMetadConfigMap(nc) + cm, cmHash, e, err := c.syncMetadConfigMap(nc) if err != nil { return err } @@ -133,8 +133,15 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error { } } - if err := c.setVersion(nc); err != nil { - return err + if nc.MetadComponent().IsReady() { + if err := c.setVersion(nc); err != nil { + return err + } + + endpoints := nc.GetMetadEndpoints(v1alpha1.MetadPortNameHTTP) + if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations(), e); err != nil { + return fmt.Errorf("update metad cluster %s dynamic flags failed: %v", newWorkload.GetName(), err) + } } return extender.UpdateWorkload(c.clientSet.Workload(), newWorkload, oldWorkload) @@ -160,8 +167,12 @@ func (c *metadCluster) syncNebulaClusterStatus(nc *v1alpha1.NebulaCluster, oldWo return syncComponentStatus(nc.MetadComponent(), &nc.Status.Metad, oldWorkload) } -func (c *metadCluster) syncMetadConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, error) { - return syncConfigMap(nc.MetadComponent(), c.clientSet.ConfigMap(), v1alpha1.MetadhConfigTemplate, nc.MetadComponent().GetConfigMapKey()) +func (c *metadCluster) syncMetadConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, bool, error) { + return syncConfigMap( + nc.MetadComponent(), + c.clientSet.ConfigMap(), + v1alpha1.MetadhConfigTemplate, + nc.MetadComponent().GetConfigMapKey()) } func (c *metadCluster) setVersion(nc *v1alpha1.NebulaCluster) error { diff --git a/pkg/controller/component/storaged_cluster.go b/pkg/controller/component/storaged_cluster.go index 3065aaa0..dfb819bd 100644 --- a/pkg/controller/component/storaged_cluster.go +++ b/pkg/controller/component/storaged_cluster.go @@ -97,7 +97,7 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error notExist := apierrors.IsNotFound(err) oldWorkload := oldWorkloadTemp.DeepCopy() - cm, cmHash, err := c.syncStoragedConfigMap(nc) + cm, cmHash, e, err := c.syncStoragedConfigMap(nc) if err != nil { return err } @@ -161,6 +161,13 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error } } + if nc.StoragedComponent().IsReady() { + endpoints := nc.GetStoragedEndpoints(v1alpha1.StoragedPortNameHTTP) + if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations(), e); err != nil { + return fmt.Errorf("update storaged cluster %s dynamic flags failed: %v", newWorkload.GetName(), err) + } + } + return extender.UpdateWorkload(c.clientSet.Workload(), newWorkload, oldWorkload) } @@ -188,8 +195,11 @@ func (c *storagedCluster) syncNebulaClusterStatus( return syncComponentStatus(nc.StoragedComponent(), &nc.Status.Storaged.ComponentStatus, oldWorkload) } -func (c *storagedCluster) syncStoragedConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, error) { - return syncConfigMap(nc.StoragedComponent(), c.clientSet.ConfigMap(), v1alpha1.StoragedConfigTemplate, +func (c *storagedCluster) syncStoragedConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, bool, error) { + return syncConfigMap( + nc.StoragedComponent(), + c.clientSet.ConfigMap(), + v1alpha1.StoragedConfigTemplate, nc.StoragedComponent().GetConfigMapKey()) } diff --git a/pkg/controller/nebularestore/nebula_restore_manager.go b/pkg/controller/nebularestore/nebula_restore_manager.go index 1d511192..0f7355d3 100644 --- a/pkg/controller/nebularestore/nebula_restore_manager.go +++ b/pkg/controller/nebularestore/nebula_restore_manager.go @@ -173,11 +173,11 @@ func (rm *restoreManager) syncRestoreProcess(rt *v1alpha1.NebulaRestore) error { } if !condition.IsRestoreMetadComplete(rt) { - if !rm.endpointsConnected(restored.GetMetadEndpoints()) { + if !rm.endpointsConnected(restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift)) { return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for metad init agent are connected", ns, restoredName) } - if err := rm.restore.downloadMetaData(restored.GetMetadEndpoints()); err != nil { + if err := rm.restore.downloadMetaData(restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift)); err != nil { klog.Errorf("download metad files failed: %v", err) return err } @@ -192,8 +192,8 @@ func (rm *restoreManager) syncRestoreProcess(rt *v1alpha1.NebulaRestore) error { return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for metad running", ns, restoredName) } - hostPairs := rm.restore.genHostPairs(rm.restore.bakMetas[0], restored.GetStoragedEndpoints()) - restoreResp, err := rm.restore.restoreMeta(rm.restore.bakMetas[0], hostPairs, restored.GetMetadEndpoints()) + hostPairs := rm.restore.genHostPairs(rm.restore.bakMetas[0], restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift)) + restoreResp, err := rm.restore.restoreMeta(rm.restore.bakMetas[0], hostPairs, restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift)) if err != nil { klog.Errorf("restore metad data [%s/%s] failed, error: %v", ns, restoredName, err) return err @@ -223,7 +223,7 @@ func (rm *restoreManager) syncRestoreProcess(rt *v1alpha1.NebulaRestore) error { } if !condition.IsRestoreStoragedComplete(rt) { - if !rm.endpointsConnected(restored.GetStoragedEndpoints()) { + if !rm.endpointsConnected(restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift)) { return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for storaged init agent are connected", ns, restoredName) } @@ -235,7 +235,7 @@ func (rm *restoreManager) syncRestoreProcess(rt *v1alpha1.NebulaRestore) error { klog.Infof("restoring [%s/%s] in stage1, download storaged files successfully", ns, restoredName) - if err := rm.restore.playBackStorageData(restored.GetMetadEndpoints(), rm.restore.storageHosts); err != nil { + if err := rm.restore.playBackStorageData(restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift), rm.restore.storageHosts); err != nil { return err } @@ -266,7 +266,7 @@ func (rm *restoreManager) syncRestoreProcess(rt *v1alpha1.NebulaRestore) error { return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for cluster ready", ns, restoredName) } - if !rm.endpointsConnected(restored.GetStoragedEndpoints()) { + if !rm.endpointsConnected(restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift)) { return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for storaged sidecar agent are connected", ns, restoredName) } diff --git a/pkg/util/http/http.go b/pkg/util/http/http.go new file mode 100644 index 00000000..1b2bf65a --- /dev/null +++ b/pkg/util/http/http.go @@ -0,0 +1,56 @@ +package http + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "time" + + "k8s.io/klog/v2" +) + +func GetRequest(url string) (body []byte, err error) { + return request(url, "GET", nil) +} + +func PutRequest(url string, jsonData []byte) ([]byte, error) { + return request(url, "PUT", jsonData) +} + +func request(url, method string, jsonData []byte) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + client := http.Client{} + resp, err := client.Do(req) + if resp == nil { + return nil, fmt.Errorf("get %s response body is empty", url) + } + if err != nil { + return nil, fmt.Errorf("request %s failed: %v", url, err) + } + + defer func() { + if err := resp.Body.Close(); err != nil { + klog.Error(err) + } + }() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("io read error: %v", err) + } + if resp.StatusCode != 200 { + return nil, fmt.Errorf("unexpected status code %d, details: %s", resp.StatusCode, string(body)) + } + + return body, nil +} diff --git a/pkg/util/maputil/map.go b/pkg/util/maputil/map.go index f30e6ebd..7b4b3f8b 100644 --- a/pkg/util/maputil/map.go +++ b/pkg/util/maputil/map.go @@ -28,3 +28,27 @@ func IsSubMap(first, second map[string]string) bool { } return true } + +// AllKeysExist indicates whether all the first map keys in the second one +func AllKeysExist(first, second map[string]string) bool { + for k := range first { + if second == nil { + return false + } + if _, ok := second[k]; !ok { + return false + } + } + return true +} + +func ResetMap(first, second map[string]string) { + for k := range first { + if second == nil { + return + } + if v, ok := second[k]; ok { + first[k] = v + } + } +} From 5ece711be09e432217c0dcf46b55fd2c47dcd7fa Mon Sep 17 00:00:00 2001 From: "kevin.qiao" Date: Sun, 26 Feb 2023 21:01:14 +0800 Subject: [PATCH 2/2] reset first --- pkg/controller/component/graphd_cluster.go | 2 +- pkg/controller/component/helper.go | 24 +++++++++++--------- pkg/controller/component/metad_cluster.go | 2 +- pkg/controller/component/storaged_cluster.go | 2 +- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pkg/controller/component/graphd_cluster.go b/pkg/controller/component/graphd_cluster.go index 12dbe86d..a5650fa5 100644 --- a/pkg/controller/component/graphd_cluster.go +++ b/pkg/controller/component/graphd_cluster.go @@ -82,7 +82,7 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error { notExist := apierrors.IsNotFound(err) oldWorkload := oldWorkloadTemp.DeepCopy() - cm, cmHash, e, err := c.syncGraphdConfigMap(nc) + cm, cmHash, e, err := c.syncGraphdConfigMap(nc.DeepCopy()) if err != nil { return err } diff --git a/pkg/controller/component/helper.go b/pkg/controller/component/helper.go index 71d8d1ae..2a9da0d7 100644 --- a/pkg/controller/component/helper.go +++ b/pkg/controller/component/helper.go @@ -165,7 +165,7 @@ func syncConfigMap( if cfg != nil { e = maputil.AllKeysExist(cfg, v1alpha1.DynamicFlags) if !e { - customConf := config.AppendCustomConfig(template, component.GetConfig()) + customConf := config.AppendCustomConfig(template, cfg) cm.Data[cmKey] = customConf cmHash = hash.Hash(customConf) } @@ -179,16 +179,6 @@ func syncConfigMap( func updateDynamicFlags(endpoints []string, newAnnotations, oldAnnotations map[string]string, exists bool) error { newFlags := newAnnotations[annotation.AnnLastAppliedFlagsKey] - if exists && newFlags != "{}" { - for _, endpoint := range endpoints { - url := fmt.Sprintf("http://%s/flags", endpoint) - if _, err := httputil.PutRequest(url, []byte(newFlags)); err != nil { - return err - } - } - klog.Info("update dynamic flags successfully") - } - apply, ok := oldAnnotations[annotation.AnnLastAppliedFlagsKey] if ok { if apply != "{}" && newFlags == "{}" { @@ -210,8 +200,20 @@ func updateDynamicFlags(endpoints []string, newAnnotations, oldAnnotations map[s } klog.Info("reset dynamic flags successfully") } + return nil } } + + if exists && newFlags != "{}" { + for _, endpoint := range endpoints { + url := fmt.Sprintf("http://%s/flags", endpoint) + if _, err := httputil.PutRequest(url, []byte(newFlags)); err != nil { + return err + } + } + klog.Info("update dynamic flags successfully") + } + return nil } diff --git a/pkg/controller/component/metad_cluster.go b/pkg/controller/component/metad_cluster.go index f0b166c6..1553bbae 100644 --- a/pkg/controller/component/metad_cluster.go +++ b/pkg/controller/component/metad_cluster.go @@ -94,7 +94,7 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error { notExist := apierrors.IsNotFound(err) oldWorkload := oldWorkloadTemp.DeepCopy() - cm, cmHash, e, err := c.syncMetadConfigMap(nc) + cm, cmHash, e, err := c.syncMetadConfigMap(nc.DeepCopy()) if err != nil { return err } diff --git a/pkg/controller/component/storaged_cluster.go b/pkg/controller/component/storaged_cluster.go index dfb819bd..32a0c6dc 100644 --- a/pkg/controller/component/storaged_cluster.go +++ b/pkg/controller/component/storaged_cluster.go @@ -97,7 +97,7 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error notExist := apierrors.IsNotFound(err) oldWorkload := oldWorkloadTemp.DeepCopy() - cm, cmHash, e, err := c.syncStoragedConfigMap(nc) + cm, cmHash, e, err := c.syncStoragedConfigMap(nc.DeepCopy()) if err != nil { return err }