Skip to content

Commit

Permalink
Dynamic flags (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaByte875 committed Feb 27, 2023
1 parent 02d11a9 commit 427670b
Show file tree
Hide file tree
Showing 16 changed files with 267 additions and 73 deletions.
12 changes: 8 additions & 4 deletions apis/apps/v1alpha1/nebulacluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,16 @@ func (nc *NebulaCluster) GetMetadThriftConnAddress() string {
return nc.MetadComponent().GetConnAddress(MetadPortNameThrift)
}

func (nc *NebulaCluster) GetMetadEndpoints() []string {
return nc.MetadComponent().GetHeadlessConnAddresses(MetadPortNameThrift)
func (nc *NebulaCluster) GetMetadEndpoints(portName string) []string {
return nc.MetadComponent().GetEndpoints(portName)
}

func (nc *NebulaCluster) GetStoragedEndpoints() []string {
return nc.StoragedComponent().GetHeadlessConnAddresses(StoragedPortNameThrift)
func (nc *NebulaCluster) GetStoragedEndpoints(portName string) []string {
return nc.StoragedComponent().GetEndpoints(portName)
}

func (nc *NebulaCluster) GetGraphdEndpoints(portName string) []string {
return nc.GraphdComponent().GetEndpoints(portName)
}

func (nc *NebulaCluster) GetClusterName() string {
Expand Down
35 changes: 18 additions & 17 deletions apis/apps/v1alpha1/nebulacluster_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/vesoft-inc/nebula-operator/pkg/annotation"
"github.com/vesoft-inc/nebula-operator/pkg/label"
"github.com/vesoft-inc/nebula-operator/pkg/util/codec"
)

const (
Expand Down Expand Up @@ -91,19 +93,12 @@ func getPort(ports []corev1.ContainerPort, portName string) int32 {
return 0
}

func getConnAddress(serviceFQDN string, port int32) string {
return joinHostPort(serviceFQDN, port)
}

func getHeadlessConnAddresses(connAddress, componentName string, replicas int32, isHeadless bool) []string {
if isHeadless {
addresses := make([]string, 0, replicas)
for i := int32(0); i < replicas; i++ {
addresses = append(addresses, fmt.Sprintf("%s.%s", getPodName(componentName, i), connAddress))
}
return addresses
func getConnAddresses(connAddress, componentName string, replicas int32) []string {
addresses := make([]string, 0, replicas)
for i := int32(0); i < replicas; i++ {
addresses = append(addresses, fmt.Sprintf("%s.%s", getPodName(componentName, i), connAddress))
}
return []string{connAddress}
return addresses
}

func getKubernetesClusterDomain() string {
Expand Down Expand Up @@ -157,7 +152,7 @@ func getResources(res *corev1.ResourceRequirements) *corev1.ResourceRequirements
return res
}

func getConfigKey(componentType string) string {
func getCmKey(componentType string) string {
return fmt.Sprintf("nebula-%s.conf", componentType)
}

Expand Down Expand Up @@ -288,7 +283,7 @@ func generateContainers(c NebulaClusterComponentter, cm *corev1.ConfigMap) []cor
}
}

metadAddress := strings.Join(nc.GetMetadEndpoints(), ",")
metadAddress := strings.Join(nc.GetMetadEndpoints(MetadPortNameThrift), ",")
cmd = append(cmd, fmt.Sprintf("exec /usr/local/nebula/bin/nebula-%s", componentType)+
fmt.Sprintf(" --flagfile=/usr/local/nebula/etc/nebula-%s.conf", componentType)+
" --meta_server_addrs="+metadAddress+
Expand Down Expand Up @@ -362,7 +357,7 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab

nc := c.GetNebulaCluster()

configKey := getConfigKey(componentType)
cmKey := getCmKey(componentType)
containers := generateContainers(c, cm)
volumes := c.GenerateVolumes()
if cm != nil {
Expand All @@ -374,8 +369,8 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab
Name: c.GetName(),
},
Items: []corev1.KeyToPath{{
Key: configKey,
Path: configKey,
Key: cmKey,
Path: cmKey,
}},
},
},
Expand Down Expand Up @@ -410,12 +405,18 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab
return nil, err
}

apply, err := codec.Encode(c.GetConfig())
if err != nil {
return nil, err
}

mergeLabels := mergeStringMaps(true, componentLabel, c.GetPodLabels())
replicas := c.GetReplicas()
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: c.GetName(),
Namespace: namespace,
Annotations: map[string]string{annotation.AnnLastAppliedFlagsKey: apply},
Labels: componentLabel,
OwnerReferences: c.GenerateOwnerReferences(),
},
Expand Down
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/nebulacluster_componentter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type NebulaClusterComponentter interface {
GetPodFQDN(int32) string
GetPort(string) int32
GetConnAddress(string) string
GetHeadlessConnAddresses(string) []string
GetEndpoints(string) []string

IsReady() bool

Expand Down
13 changes: 6 additions & 7 deletions apis/apps/v1alpha1/nebulacluster_graphd.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *graphdComponent) GetConfig() map[string]string {
}

func (c *graphdComponent) GetConfigMapKey() string {
return getConfigKey(c.Type().String())
return getCmKey(c.Type().String())
}

func (c *graphdComponent) GetResources() *corev1.ResourceRequirements {
Expand Down Expand Up @@ -183,15 +183,14 @@ func (c *graphdComponent) GetPort(portName string) int32 {
}

func (c *graphdComponent) GetConnAddress(portName string) string {
return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName))
return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName))
}

func (c *graphdComponent) GetHeadlessConnAddresses(portName string) []string {
return getHeadlessConnAddresses(
func (c *graphdComponent) GetEndpoints(portName string) []string {
return getConnAddresses(
c.GetConnAddress(portName),
c.GetName(),
c.GetReplicas(),
c.IsHeadlessService())
c.GetReplicas())
}

func (c *graphdComponent) IsReady() bool {
Expand Down Expand Up @@ -292,7 +291,7 @@ func (c *graphdComponent) GenerateService() *corev1.Service {

func (c *graphdComponent) GenerateConfigMap() *corev1.ConfigMap {
cm := generateConfigMap(c)
configKey := getConfigKey(c.Type().String())
configKey := getCmKey(c.Type().String())
cm.Data[configKey] = GraphdConfigTemplate
return cm
}
Expand Down
13 changes: 6 additions & 7 deletions apis/apps/v1alpha1/nebulacluster_metad.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *metadComponent) GetConfig() map[string]string {
}

func (c *metadComponent) GetConfigMapKey() string {
return getConfigKey(c.Type().String())
return getCmKey(c.Type().String())
}

func (c *metadComponent) GetResources() *corev1.ResourceRequirements {
Expand Down Expand Up @@ -197,15 +197,14 @@ func (c *metadComponent) GetPort(portName string) int32 {
}

func (c *metadComponent) GetConnAddress(portName string) string {
return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName))
return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName))
}

func (c *metadComponent) GetHeadlessConnAddresses(portName string) []string {
return getHeadlessConnAddresses(
func (c *metadComponent) GetEndpoints(portName string) []string {
return getConnAddresses(
c.GetConnAddress(portName),
c.GetName(),
c.GetReplicas(),
c.IsHeadlessService())
c.GetReplicas())
}

func (c *metadComponent) IsReady() bool {
Expand Down Expand Up @@ -367,7 +366,7 @@ func (c *metadComponent) GenerateService() *corev1.Service {

func (c *metadComponent) GenerateConfigMap() *corev1.ConfigMap {
cm := generateConfigMap(c)
configKey := getConfigKey(c.Type().String())
configKey := getCmKey(c.Type().String())
cm.Data[configKey] = MetadhConfigTemplate
return cm
}
Expand Down
13 changes: 6 additions & 7 deletions apis/apps/v1alpha1/nebulacluster_storaged.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *storagedComponent) GetConfig() map[string]string {
}

func (c *storagedComponent) GetConfigMapKey() string {
return getConfigKey(c.Type().String())
return getCmKey(c.Type().String())
}

func (c *storagedComponent) GetResources() *corev1.ResourceRequirements {
Expand Down Expand Up @@ -202,15 +202,14 @@ func (c *storagedComponent) GetPort(portName string) int32 {
}

func (c *storagedComponent) GetConnAddress(portName string) string {
return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName))
return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName))
}

func (c *storagedComponent) GetHeadlessConnAddresses(portName string) []string {
return getHeadlessConnAddresses(
func (c *storagedComponent) GetEndpoints(portName string) []string {
return getConnAddresses(
c.GetConnAddress(portName),
c.GetName(),
c.GetReplicas(),
c.IsHeadlessService())
c.GetReplicas())
}

func (c *storagedComponent) IsReady() bool {
Expand Down Expand Up @@ -351,7 +350,7 @@ func (c *storagedComponent) GenerateService() *corev1.Service {

func (c *storagedComponent) GenerateConfigMap() *corev1.ConfigMap {
cm := generateConfigMap(c)
configKey := getConfigKey(c.Type().String())
configKey := getCmKey(c.Type().String())
cm.Data[configKey] = StoragedConfigTemplate
return cm
}
Expand Down
33 changes: 33 additions & 0 deletions apis/apps/v1alpha1/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,37 @@ limitations under the License.

package v1alpha1

var DynamicFlags = map[string]string{
"minloglevel": "0",
"v": "0",
"accept_partial_success": "false",
"session_reclaim_interval_secs": "60",
"max_allowed_query_size": "4194304",
"system_memory_high_watermark_ratio": "0.8",
"ng_black_box_file_lifetime_seconds": "1800",
"memory_tracker_limit_ratio": "0.8",
"memory_tracker_untracked_reserved_memory_mb": "50",
"memory_tracker_detail_log": "false",
"memory_tracker_detail_log_interval_ms": "60000",
"memory_purge_enabled": "true",
"memory_purge_interval_seconds": "10",
"heartbeat_interval_secs": "10",
"raft_heartbeat_interval_secs": "30",
"raft_rpc_timeout_ms": "500",
"wal_ttl": "14400",
"query_concurrently": "true",
"auto_remove_invalid_space": "true",
"num_io_threads": "16",
"num_worker_threads": "0",
"max_concurrent_subtasks": "10",
"snapshot_part_rate_limit": "10485760",
"snapshot_batch_size": "1048576",
"rebuild_index_part_rate_limit": "4194304",
"rocksdb_db_options": "{}",
"rocksdb_column_family_options": `{"write_buffer_size":"67108864","max_write_buffer_number":"4","max_bytes_for_level_base":"268435456"}`,
"rocksdb_block_based_table_options": `{"block_size":"8192"}`,
}

const (
// nolint: revive
GraphdConfigTemplate = `
Expand Down Expand Up @@ -240,6 +271,8 @@ const (
# Root data path, here should be only single path for metad
--data_path=data/meta
# !!! Minimum reserved bytes of data path
--minimum_reserved_bytes=268435456
########## Misc #########
# The default number of parts when a space is created
--default_parts_num=10
Expand Down
8 changes: 4 additions & 4 deletions charts/nebula-cluster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ nebula:
limits:
cpu: "1"
memory: "500Mi"
logStorage: "1Gi"
logStorage: "500Mi"
podLabels: {}
podAnnotations: {}
nodeSelector: {}
Expand All @@ -47,8 +47,8 @@ nebula:
limits:
cpu: "1"
memory: "1Gi"
logStorage: "1Gi"
dataStorage: "10Gi"
logStorage: "500Mi"
dataStorage: "2Gi"
license: {}
podLabels: {}
podAnnotations: {}
Expand All @@ -72,7 +72,7 @@ nebula:
limits:
cpu: "1"
memory: "1Gi"
logStorage: "1Gi"
logStorage: "500Mi"
dataStorage: "10Gi"
enableAutoBalance: false
podLabels: {}
Expand Down
2 changes: 2 additions & 0 deletions pkg/annotation/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
AnnLastSyncTimestampKey = "nebula-graph.io/sync-timestamp"
// AnnHaModeKey is annotation key to indicate whether in ha mode
AnnHaModeKey = "nebula-graph.io/ha-mode"
// AnnLastAppliedFlagsKey is annotation key to indicate the last applied custom flags
AnnLastAppliedFlagsKey = "nebula-graph.io/last-applied-flags"
// AnnLastAppliedConfigKey is annotation key to indicate the last applied configuration
AnnLastAppliedConfigKey = "nebula-graph.io/last-applied-configuration"
// AnnPodSchedulingKey is pod scheduling annotation key, it represents whether the pod is scheduling
Expand Down
14 changes: 10 additions & 4 deletions pkg/controller/component/graphd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
notExist := apierrors.IsNotFound(err)
oldWorkload := oldWorkloadTemp.DeepCopy()

cm, cmHash, err := c.syncGraphdConfigMap(nc)
cm, cmHash, e, err := c.syncGraphdConfigMap(nc.DeepCopy())
if err != nil {
return err
}
Expand Down Expand Up @@ -121,6 +121,13 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
}
}

if nc.GraphdComponent().IsReady() {
endpoints := nc.GetGraphdEndpoints(v1alpha1.GraphdPortNameHTTP)
if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations(), e); err != nil {
return fmt.Errorf("update graphd cluster %s dynamic flags failed: %v", newWorkload.GetName(), err)
}
}

return extender.UpdateWorkload(c.clientSet.Workload(), newWorkload, oldWorkload)
}

Expand Down Expand Up @@ -165,13 +172,12 @@ func (c *graphdCluster) syncGraphdService(nc *v1alpha1.NebulaCluster) error {
return syncService(newSvc, c.clientSet.Service())
}

func (c *graphdCluster) syncGraphdConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, error) {
func (c *graphdCluster) syncGraphdConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, bool, error) {
return syncConfigMap(
nc.GraphdComponent(),
c.clientSet.ConfigMap(),
v1alpha1.GraphdConfigTemplate,
nc.GraphdComponent().GetConfigMapKey(),
)
nc.GraphdComponent().GetConfigMapKey())
}

type FakeGraphdCluster struct {
Expand Down
Loading

0 comments on commit 427670b

Please sign in to comment.