Skip to content

Commit

Permalink
Merge pull request #1153 from memphisdev/producer_consumer_gc_config
Browse files Browse the repository at this point in the history
cluster configuration for producers and consumers gc
  • Loading branch information
daniel-davidd committed Jul 23, 2023
2 parents d473459 + 532205a commit bac3308
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 31 deletions.
1 change: 1 addition & 0 deletions conf/debug/memphis-0.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ k8s_namespace: "memphis"
logs_retention_days: 7
tiered_storage_upload_interval_seconds: 8
dls_retention_hours: 3
gc_producer_consumer_retention_hours: 3
# ui_host: ""
# rest_gw_host: ""
# broker_host: ""
Expand Down
1 change: 1 addition & 0 deletions conf/debug/memphis-1.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ k8s_namespace: "memphis"
logs_retention_days: 7
tiered_storage_upload_interval_seconds: 8
dls_retention_hours: 3
gc_producer_consumer_retention_hours: 3
# ui_host: ""
# rest_gw_host: ""
# broker_host: ""
Expand Down
1 change: 1 addition & 0 deletions conf/debug/memphis-2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ k8s_namespace: "memphis"
logs_retention_days: 7
tiered_storage_upload_interval_seconds: 8
dls_retention_hours: 3
gc_producer_consumer_retention_hours: 3
# ui_host: ""
# rest_gw_host: ""
# broker_host: ""
Expand Down
1 change: 1 addition & 0 deletions conf/debug/stand-alone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rest_gw_port: 4444
logs_retention_days: 7
tiered_storage_upload_interval_seconds: 8
dls_retention_hours: 3
gc_producer_consumer_retention_hours: 3
# ui_host: ""
# rest_gw_host: ""
# broker_host: ""
Expand Down
15 changes: 8 additions & 7 deletions models/configurations.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
package models

type EditClusterConfigSchema struct {
DlsRetention int `json:"dls_retention" binding:"required"`
LogsRetention int `json:"logs_retention" binding:"required"`
BrokerHost string `json:"broker_host"`
UiHost string `json:"ui_host"`
RestGWHost string `json:"rest_gw_host"`
TSTimeSec int `json:"tiered_storage_time_sec"`
MaxMsgSizeMb int `json:"max_msg_size_mb"`
DlsRetention int `json:"dls_retention" binding:"required"`
LogsRetention int `json:"logs_retention" binding:"required"`
BrokerHost string `json:"broker_host"`
UiHost string `json:"ui_host"`
RestGWHost string `json:"rest_gw_host"`
TSTimeSec int `json:"tiered_storage_time_sec"`
MaxMsgSizeMb int `json:"max_msg_size_mb"`
GCProducersConsumersRetentionHours int `json:"gc_producer_consumer_retention_hours"`
}

type GlobalConfigurationsUpdate struct {
Expand Down
2 changes: 1 addition & 1 deletion server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (s *Server) RemoveOldDlsMsgs() {
func (s *Server) RemoveOldProducersAndConsumers() {
ticker := time.NewTicker(15 * time.Minute)
for range ticker.C {
timeInterval := time.Now().Add(time.Duration(time.Hour * -2))
timeInterval := time.Now().Add(time.Duration(time.Hour * -time.Duration(s.opts.GCProducersConsumersRetentionHours)))
deletedCGs, err := db.DeleteOldProducersAndConsumers(timeInterval)
if err != nil {
serv.Errorf("RemoveOldProducersAndConsumers at DeleteOldProducersAndConsumers : %v", err.Error())
Expand Down
38 changes: 24 additions & 14 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,14 @@ func (ch ConfigurationsHandler) EditClusterConfig(c *gin.Context) {
return
}
}
if ch.S.opts.GCProducersConsumersRetentionHours != body.GCProducersConsumersRetentionHours {
err := changeGCProducersConsumersRetentionHours(body.GCProducersConsumersRetentionHours, user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]EditConfigurations at changeGCProducersConsumersRetentionHours: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
}
if ch.S.opts.LogsRetentionDays != body.LogsRetention {
err := changeLogsRetention(body.LogsRetention)
if err != nil {
Expand Down Expand Up @@ -1057,13 +1065,14 @@ func (ch ConfigurationsHandler) EditClusterConfig(c *gin.Context) {
}

c.IndentedJSON(200, gin.H{
"dls_retention": body.DlsRetention,
"logs_retention": body.LogsRetention,
"broker_host": brokerHost,
"ui_host": uiHost,
"rest_gw_host": restGWHost,
"tiered_storage_time_sec": body.TSTimeSec,
"max_msg_size_mb": int32(body.MaxMsgSizeMb),
"dls_retention": body.DlsRetention,
"logs_retention": body.LogsRetention,
"broker_host": brokerHost,
"ui_host": uiHost,
"rest_gw_host": restGWHost,
"tiered_storage_time_sec": body.TSTimeSec,
"max_msg_size_mb": int32(body.MaxMsgSizeMb),
"gc_producer_consumer_retention_hours": body.GCProducersConsumersRetentionHours,
})
}

Expand All @@ -1081,13 +1090,14 @@ func (ch ConfigurationsHandler) GetClusterConfig(c *gin.Context) {
analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-enter-cluster-config-page")
}
c.IndentedJSON(200, gin.H{
"dls_retention": ch.S.opts.DlsRetentionHours[user.TenantName],
"logs_retention": ch.S.opts.LogsRetentionDays,
"broker_host": ch.S.opts.BrokerHost,
"ui_host": ch.S.opts.UiHost,
"rest_gw_host": ch.S.opts.RestGwHost,
"tiered_storage_time_sec": ch.S.opts.TieredStorageUploadIntervalSec,
"max_msg_size_mb": ch.S.opts.MaxPayload / 1024 / 1024,
"dls_retention": ch.S.opts.DlsRetentionHours[user.TenantName],
"logs_retention": ch.S.opts.LogsRetentionDays,
"broker_host": ch.S.opts.BrokerHost,
"ui_host": ch.S.opts.UiHost,
"rest_gw_host": ch.S.opts.RestGwHost,
"tiered_storage_time_sec": ch.S.opts.TieredStorageUploadIntervalSec,
"max_msg_size_mb": ch.S.opts.MaxPayload / 1024 / 1024,
"gc_producer_consumer_retention_hours": ch.S.opts.GCProducersConsumersRetentionHours,
})
}

Expand Down
8 changes: 8 additions & 0 deletions server/memphis_handlers_configurations.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ func changeDlsRetention(dlsRetention int, tenantName string) error {
return nil
}

func changeGCProducersConsumersRetentionHours(retention int, tenantName string) error {
err := db.UpsertConfiguration("gc_producer_consumer_retention_hours", strconv.Itoa(retention), tenantName)
if err != nil {
return err
}
return nil
}

func changeLogsRetention(logsRetention int) error {
err := db.UpsertConfiguration("logs_retention", strconv.Itoa(logsRetention), serv.MemphisGlobalAccountString())
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,9 @@ func GetMemphisOpts(opts *Options) (*Options, error) {
case "max_msg_size_mb":
v, _ := strconv.Atoi(conf.Value)
opts.MaxPayload = int32(v * 1024 * 1024)
case "gc_producer_consumer_retention_hours":
v, _ := strconv.Atoi(conf.Value)
opts.GCProducersConsumersRetentionHours = v
}
}

Expand Down
30 changes: 21 additions & 9 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,16 @@ type Options struct {
LameDuckGracePeriod time.Duration `json:"-"`

// memphis options
UiPort int `json:"-"`
RestGwPort int `json:"-"`
K8sNamespace string `json:"-"`
LogsRetentionDays int `json:"-"`
TieredStorageUploadIntervalSec int `json:"-"`
DlsRetentionHours map[string]int `json:"-"`
UiHost string `json:"-"`
RestGwHost string `json:"-"`
BrokerHost string `json:"-"`
UiPort int `json:"-"`
RestGwPort int `json:"-"`
K8sNamespace string `json:"-"`
LogsRetentionDays int `json:"-"`
TieredStorageUploadIntervalSec int `json:"-"`
DlsRetentionHours map[string]int `json:"-"`
GCProducersConsumersRetentionHours int `json:"-"`
UiHost string `json:"-"`
RestGwHost string `json:"-"`
BrokerHost string `json:"-"`

// MaxTracedMsgLen is the maximum printable length for traced messages.
MaxTracedMsgLen int `json:"-"`
Expand Down Expand Up @@ -1443,6 +1444,14 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
}
o.DlsRetentionHours = make(map[string]int)
o.DlsRetentionHours[conf.MemphisGlobalAccountName] = value
case "gc_producer_consumer_retention_hours":
value := int(v.(int64))
if value < 1 || value > 48 {
*errors = append(*errors, &configErr{tk, "error gc_producer_consumer_retention_hours config: has to be positive and not more than 48"})
return
}
o.GCProducersConsumersRetentionHours = value

case "ui_host":
value := v.(string)
if value == _EMPTY_ {
Expand Down Expand Up @@ -4745,6 +4754,9 @@ func setBaselineOptions(opts *Options) {
if opts.LogsRetentionDays == 0 {
opts.LogsRetentionDays = 7
}
if opts.GCProducersConsumersRetentionHours == 0 {
opts.GCProducersConsumersRetentionHours = 2
}
if opts.TieredStorageUploadIntervalSec == 0 {
opts.TieredStorageUploadIntervalSec = DEFAULT_TIERED_STORAGE_UPLOAD_INTERVAL_SEC
}
Expand Down
12 changes: 12 additions & 0 deletions server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,16 @@ type dlsRetentionHoursOption struct {
newValue map[string]int
}

type GCProducersConsumersRetentionHoursOption struct {
noopOption
newValue int
}

func (o *GCProducersConsumersRetentionHoursOption) Apply(server *Server) {
// no need to update anything since it happens on the edit cluster configuration endpoint
server.Noticef("Reloaded: gc_producer_consumer_retention_hours = %d", o.newValue)
}

// Apply the setting by updating the server info and each client.
func (o *dlsRetentionHoursOption) Apply(server *Server) {
// no need to update anything since it happens on the edit cluster configuration endpoint
Expand Down Expand Up @@ -1452,6 +1462,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &brokerHostOption{newValue: newValue.(string)})
case "restgwhost":
diffOpts = append(diffOpts, &restGwOption{newValue: newValue.(string)})
case "gcproducersconsumersretentionhours":
diffOpts = append(diffOpts, &GCProducersConsumersRetentionHoursOption{newValue: newValue.(int)})
default:
// TODO(ik): Implement String() on those options to have a nice print.
// %v is difficult to figure what's what, %+v print private fields and
Expand Down
10 changes: 10 additions & 0 deletions ui_src/src/domain/administration/clusterConfiguration/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ function ClusterConfiguration() {
unit={'h'}
onChanges={(e) => handleChange('dls_retention', e)}
/>
<SliderRow
title="DISCONNECTED PRODUCERS AND CONSUMERS RETENTION"
desc="Amount of hours to retain inactive producerd and consumers"
value={formFields?.gc_producer_consumer_retention_hours}
img={DeadLetterInHours}
min={1}
max={48}
unit={'h'}
onChanges={(e) => handleChange('gc_producer_consumer_retention_hours', e)}
/>
<SliderRow
title="MAX MESSAGE SIZE"
desc="Maximum message size (payload + headers) in megabytes"
Expand Down

0 comments on commit bac3308

Please sign in to comment.