Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic flags #182

Merged
merged 2 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions apis/apps/v1alpha1/nebulacluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,16 @@ func (nc *NebulaCluster) GetMetadThriftConnAddress() string {
return nc.MetadComponent().GetConnAddress(MetadPortNameThrift)
}

func (nc *NebulaCluster) GetMetadEndpoints() []string {
return nc.MetadComponent().GetHeadlessConnAddresses(MetadPortNameThrift)
func (nc *NebulaCluster) GetMetadEndpoints(portName string) []string {
return nc.MetadComponent().GetEndpoints(portName)
}

func (nc *NebulaCluster) GetStoragedEndpoints() []string {
return nc.StoragedComponent().GetHeadlessConnAddresses(StoragedPortNameThrift)
func (nc *NebulaCluster) GetStoragedEndpoints(portName string) []string {
return nc.StoragedComponent().GetEndpoints(portName)
}

func (nc *NebulaCluster) GetGraphdEndpoints(portName string) []string {
return nc.GraphdComponent().GetEndpoints(portName)
}

func (nc *NebulaCluster) GetClusterName() string {
Expand Down
35 changes: 18 additions & 17 deletions apis/apps/v1alpha1/nebulacluster_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/vesoft-inc/nebula-operator/pkg/annotation"
"github.com/vesoft-inc/nebula-operator/pkg/label"
"github.com/vesoft-inc/nebula-operator/pkg/util/codec"
)

const (
Expand Down Expand Up @@ -91,19 +93,12 @@ func getPort(ports []corev1.ContainerPort, portName string) int32 {
return 0
}

func getConnAddress(serviceFQDN string, port int32) string {
return joinHostPort(serviceFQDN, port)
}

func getHeadlessConnAddresses(connAddress, componentName string, replicas int32, isHeadless bool) []string {
if isHeadless {
addresses := make([]string, 0, replicas)
for i := int32(0); i < replicas; i++ {
addresses = append(addresses, fmt.Sprintf("%s.%s", getPodName(componentName, i), connAddress))
}
return addresses
func getConnAddresses(connAddress, componentName string, replicas int32) []string {
addresses := make([]string, 0, replicas)
for i := int32(0); i < replicas; i++ {
addresses = append(addresses, fmt.Sprintf("%s.%s", getPodName(componentName, i), connAddress))
}
return []string{connAddress}
return addresses
}

func getKubernetesClusterDomain() string {
Expand Down Expand Up @@ -157,7 +152,7 @@ func getResources(res *corev1.ResourceRequirements) *corev1.ResourceRequirements
return res
}

func getConfigKey(componentType string) string {
func getCmKey(componentType string) string {
return fmt.Sprintf("nebula-%s.conf", componentType)
}

Expand Down Expand Up @@ -288,7 +283,7 @@ func generateContainers(c NebulaClusterComponentter, cm *corev1.ConfigMap) []cor
}
}

metadAddress := strings.Join(nc.GetMetadEndpoints(), ",")
metadAddress := strings.Join(nc.GetMetadEndpoints(MetadPortNameThrift), ",")
cmd = append(cmd, fmt.Sprintf("exec /usr/local/nebula/bin/nebula-%s", componentType)+
fmt.Sprintf(" --flagfile=/usr/local/nebula/etc/nebula-%s.conf", componentType)+
" --meta_server_addrs="+metadAddress+
Expand Down Expand Up @@ -362,7 +357,7 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab

nc := c.GetNebulaCluster()

configKey := getConfigKey(componentType)
cmKey := getCmKey(componentType)
containers := generateContainers(c, cm)
volumes := c.GenerateVolumes()
if cm != nil {
Expand All @@ -374,8 +369,8 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab
Name: c.GetName(),
},
Items: []corev1.KeyToPath{{
Key: configKey,
Path: configKey,
Key: cmKey,
Path: cmKey,
}},
},
},
Expand Down Expand Up @@ -410,12 +405,18 @@ func generateStatefulSet(c NebulaClusterComponentter, cm *corev1.ConfigMap, enab
return nil, err
}

apply, err := codec.Encode(c.GetConfig())
if err != nil {
return nil, err
}

mergeLabels := mergeStringMaps(true, componentLabel, c.GetPodLabels())
replicas := c.GetReplicas()
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: c.GetName(),
Namespace: namespace,
Annotations: map[string]string{annotation.AnnLastAppliedFlagsKey: apply},
Labels: componentLabel,
OwnerReferences: c.GenerateOwnerReferences(),
},
Expand Down
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/nebulacluster_componentter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type NebulaClusterComponentter interface {
GetPodFQDN(int32) string
GetPort(string) int32
GetConnAddress(string) string
GetHeadlessConnAddresses(string) []string
GetEndpoints(string) []string

IsReady() bool

Expand Down
13 changes: 6 additions & 7 deletions apis/apps/v1alpha1/nebulacluster_graphd.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *graphdComponent) GetConfig() map[string]string {
}

func (c *graphdComponent) GetConfigMapKey() string {
return getConfigKey(c.Type().String())
return getCmKey(c.Type().String())
}

func (c *graphdComponent) GetResources() *corev1.ResourceRequirements {
Expand Down Expand Up @@ -183,15 +183,14 @@ func (c *graphdComponent) GetPort(portName string) int32 {
}

func (c *graphdComponent) GetConnAddress(portName string) string {
return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName))
return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName))
}

func (c *graphdComponent) GetHeadlessConnAddresses(portName string) []string {
return getHeadlessConnAddresses(
func (c *graphdComponent) GetEndpoints(portName string) []string {
return getConnAddresses(
c.GetConnAddress(portName),
c.GetName(),
c.GetReplicas(),
c.IsHeadlessService())
c.GetReplicas())
}

func (c *graphdComponent) IsReady() bool {
Expand Down Expand Up @@ -292,7 +291,7 @@ func (c *graphdComponent) GenerateService() *corev1.Service {

func (c *graphdComponent) GenerateConfigMap() *corev1.ConfigMap {
cm := generateConfigMap(c)
configKey := getConfigKey(c.Type().String())
configKey := getCmKey(c.Type().String())
cm.Data[configKey] = GraphdConfigTemplate
return cm
}
Expand Down
13 changes: 6 additions & 7 deletions apis/apps/v1alpha1/nebulacluster_metad.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *metadComponent) GetConfig() map[string]string {
}

func (c *metadComponent) GetConfigMapKey() string {
return getConfigKey(c.Type().String())
return getCmKey(c.Type().String())
}

func (c *metadComponent) GetResources() *corev1.ResourceRequirements {
Expand Down Expand Up @@ -197,15 +197,14 @@ func (c *metadComponent) GetPort(portName string) int32 {
}

func (c *metadComponent) GetConnAddress(portName string) string {
return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName))
return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName))
}

func (c *metadComponent) GetHeadlessConnAddresses(portName string) []string {
return getHeadlessConnAddresses(
func (c *metadComponent) GetEndpoints(portName string) []string {
return getConnAddresses(
c.GetConnAddress(portName),
c.GetName(),
c.GetReplicas(),
c.IsHeadlessService())
c.GetReplicas())
}

func (c *metadComponent) IsReady() bool {
Expand Down Expand Up @@ -367,7 +366,7 @@ func (c *metadComponent) GenerateService() *corev1.Service {

func (c *metadComponent) GenerateConfigMap() *corev1.ConfigMap {
cm := generateConfigMap(c)
configKey := getConfigKey(c.Type().String())
configKey := getCmKey(c.Type().String())
cm.Data[configKey] = MetadhConfigTemplate
return cm
}
Expand Down
13 changes: 6 additions & 7 deletions apis/apps/v1alpha1/nebulacluster_storaged.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *storagedComponent) GetConfig() map[string]string {
}

func (c *storagedComponent) GetConfigMapKey() string {
return getConfigKey(c.Type().String())
return getCmKey(c.Type().String())
}

func (c *storagedComponent) GetResources() *corev1.ResourceRequirements {
Expand Down Expand Up @@ -202,15 +202,14 @@ func (c *storagedComponent) GetPort(portName string) int32 {
}

func (c *storagedComponent) GetConnAddress(portName string) string {
return getConnAddress(c.GetServiceFQDN(), c.GetPort(portName))
return joinHostPort(c.GetServiceFQDN(), c.GetPort(portName))
}

func (c *storagedComponent) GetHeadlessConnAddresses(portName string) []string {
return getHeadlessConnAddresses(
func (c *storagedComponent) GetEndpoints(portName string) []string {
return getConnAddresses(
c.GetConnAddress(portName),
c.GetName(),
c.GetReplicas(),
c.IsHeadlessService())
c.GetReplicas())
}

func (c *storagedComponent) IsReady() bool {
Expand Down Expand Up @@ -351,7 +350,7 @@ func (c *storagedComponent) GenerateService() *corev1.Service {

func (c *storagedComponent) GenerateConfigMap() *corev1.ConfigMap {
cm := generateConfigMap(c)
configKey := getConfigKey(c.Type().String())
configKey := getCmKey(c.Type().String())
cm.Data[configKey] = StoragedConfigTemplate
return cm
}
Expand Down
33 changes: 33 additions & 0 deletions apis/apps/v1alpha1/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,37 @@ limitations under the License.

package v1alpha1

var DynamicFlags = map[string]string{
"minloglevel": "0",
"v": "0",
"accept_partial_success": "false",
"session_reclaim_interval_secs": "60",
"max_allowed_query_size": "4194304",
"system_memory_high_watermark_ratio": "0.8",
"ng_black_box_file_lifetime_seconds": "1800",
"memory_tracker_limit_ratio": "0.8",
"memory_tracker_untracked_reserved_memory_mb": "50",
"memory_tracker_detail_log": "false",
"memory_tracker_detail_log_interval_ms": "60000",
"memory_purge_enabled": "true",
"memory_purge_interval_seconds": "10",
"heartbeat_interval_secs": "10",
"raft_heartbeat_interval_secs": "30",
"raft_rpc_timeout_ms": "500",
"wal_ttl": "14400",
"query_concurrently": "true",
"auto_remove_invalid_space": "true",
"num_io_threads": "16",
"num_worker_threads": "0",
"max_concurrent_subtasks": "10",
"snapshot_part_rate_limit": "10485760",
"snapshot_batch_size": "1048576",
"rebuild_index_part_rate_limit": "4194304",
"rocksdb_db_options": "{}",
"rocksdb_column_family_options": `{"write_buffer_size":"67108864","max_write_buffer_number":"4","max_bytes_for_level_base":"268435456"}`,
"rocksdb_block_based_table_options": `{"block_size":"8192"}`,
}

const (
// nolint: revive
GraphdConfigTemplate = `
Expand Down Expand Up @@ -240,6 +271,8 @@ const (
# Root data path, here should be only single path for metad
--data_path=data/meta

# !!! Minimum reserved bytes of data path
--minimum_reserved_bytes=268435456
########## Misc #########
# The default number of parts when a space is created
--default_parts_num=10
Expand Down
8 changes: 4 additions & 4 deletions charts/nebula-cluster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ nebula:
limits:
cpu: "1"
memory: "500Mi"
logStorage: "1Gi"
logStorage: "500Mi"
podLabels: {}
podAnnotations: {}
nodeSelector: {}
Expand All @@ -47,8 +47,8 @@ nebula:
limits:
cpu: "1"
memory: "1Gi"
logStorage: "1Gi"
dataStorage: "10Gi"
logStorage: "500Mi"
dataStorage: "2Gi"
license: {}
podLabels: {}
podAnnotations: {}
Expand All @@ -72,7 +72,7 @@ nebula:
limits:
cpu: "1"
memory: "1Gi"
logStorage: "1Gi"
logStorage: "500Mi"
dataStorage: "10Gi"
enableAutoBalance: false
podLabels: {}
Expand Down
2 changes: 2 additions & 0 deletions pkg/annotation/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
AnnLastSyncTimestampKey = "nebula-graph.io/sync-timestamp"
// AnnHaModeKey is annotation key to indicate whether in ha mode
AnnHaModeKey = "nebula-graph.io/ha-mode"
// AnnLastAppliedFlagsKey is annotation key to indicate the last applied custom flags
AnnLastAppliedFlagsKey = "nebula-graph.io/last-applied-flags"
// AnnLastAppliedConfigKey is annotation key to indicate the last applied configuration
AnnLastAppliedConfigKey = "nebula-graph.io/last-applied-configuration"
// AnnPodSchedulingKey is pod scheduling annotation key, it represents whether the pod is scheduling
Expand Down
14 changes: 10 additions & 4 deletions pkg/controller/component/graphd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
notExist := apierrors.IsNotFound(err)
oldWorkload := oldWorkloadTemp.DeepCopy()

cm, cmHash, err := c.syncGraphdConfigMap(nc)
cm, cmHash, e, err := c.syncGraphdConfigMap(nc.DeepCopy())
if err != nil {
return err
}
Expand Down Expand Up @@ -121,6 +121,13 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
}
}

if nc.GraphdComponent().IsReady() {
endpoints := nc.GetGraphdEndpoints(v1alpha1.GraphdPortNameHTTP)
if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations(), e); err != nil {
return fmt.Errorf("update graphd cluster %s dynamic flags failed: %v", newWorkload.GetName(), err)
}
}

return extender.UpdateWorkload(c.clientSet.Workload(), newWorkload, oldWorkload)
}

Expand Down Expand Up @@ -165,13 +172,12 @@ func (c *graphdCluster) syncGraphdService(nc *v1alpha1.NebulaCluster) error {
return syncService(newSvc, c.clientSet.Service())
}

func (c *graphdCluster) syncGraphdConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, error) {
func (c *graphdCluster) syncGraphdConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.ConfigMap, string, bool, error) {
return syncConfigMap(
nc.GraphdComponent(),
c.clientSet.ConfigMap(),
v1alpha1.GraphdConfigTemplate,
nc.GraphdComponent().GetConfigMapKey(),
)
nc.GraphdComponent().GetConfigMapKey())
}

type FakeGraphdCluster struct {
Expand Down
Loading