Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster configuration for producers and consumers gc #1153

Merged
merged 6 commits into from
Jul 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/debug/memphis-0.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ k8s_namespace: "memphis"
logs_retention_days: 7
tiered_storage_upload_interval_seconds: 8
dls_retention_hours: 3
gc_producer_consumer_retention_hours: 3
# ui_host: ""
# rest_gw_host: ""
# broker_host: ""
Expand Down
1 change: 1 addition & 0 deletions conf/debug/memphis-1.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ k8s_namespace: "memphis"
logs_retention_days: 7
tiered_storage_upload_interval_seconds: 8
dls_retention_hours: 3
gc_producer_consumer_retention_hours: 3
# ui_host: ""
# rest_gw_host: ""
# broker_host: ""
Expand Down
1 change: 1 addition & 0 deletions conf/debug/memphis-2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ k8s_namespace: "memphis"
logs_retention_days: 7
tiered_storage_upload_interval_seconds: 8
dls_retention_hours: 3
gc_producer_consumer_retention_hours: 3
# ui_host: ""
# rest_gw_host: ""
# broker_host: ""
Expand Down
1 change: 1 addition & 0 deletions conf/debug/stand-alone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rest_gw_port: 4444
logs_retention_days: 7
tiered_storage_upload_interval_seconds: 8
dls_retention_hours: 3
gc_producer_consumer_retention_hours: 3
# ui_host: ""
# rest_gw_host: ""
# broker_host: ""
Expand Down
15 changes: 8 additions & 7 deletions models/configurations.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
package models

type EditClusterConfigSchema struct {
DlsRetention int `json:"dls_retention" binding:"required"`
LogsRetention int `json:"logs_retention" binding:"required"`
BrokerHost string `json:"broker_host"`
UiHost string `json:"ui_host"`
RestGWHost string `json:"rest_gw_host"`
TSTimeSec int `json:"tiered_storage_time_sec"`
MaxMsgSizeMb int `json:"max_msg_size_mb"`
DlsRetention int `json:"dls_retention" binding:"required"`
LogsRetention int `json:"logs_retention" binding:"required"`
BrokerHost string `json:"broker_host"`
UiHost string `json:"ui_host"`
RestGWHost string `json:"rest_gw_host"`
TSTimeSec int `json:"tiered_storage_time_sec"`
MaxMsgSizeMb int `json:"max_msg_size_mb"`
GCProducersConsumersRetentionHours int `json:"gc_producer_consumer_retention_hours"`
}

type GlobalConfigurationsUpdate struct {
Expand Down
2 changes: 1 addition & 1 deletion server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (s *Server) RemoveOldDlsMsgs() {
func (s *Server) RemoveOldProducersAndConsumers() {
ticker := time.NewTicker(15 * time.Minute)
for range ticker.C {
timeInterval := time.Now().Add(time.Duration(time.Hour * -2))
timeInterval := time.Now().Add(time.Duration(time.Hour * -time.Duration(s.opts.GCProducersConsumersRetentionHours)))
deletedCGs, err := db.DeleteOldProducersAndConsumers(timeInterval)
if err != nil {
serv.Errorf("RemoveOldProducersAndConsumers at DeleteOldProducersAndConsumers : %v", err.Error())
Expand Down
38 changes: 24 additions & 14 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,14 @@ func (ch ConfigurationsHandler) EditClusterConfig(c *gin.Context) {
return
}
}
if ch.S.opts.GCProducersConsumersRetentionHours != body.GCProducersConsumersRetentionHours {
err := changeGCProducersConsumersRetentionHours(body.GCProducersConsumersRetentionHours, user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]EditConfigurations at changeGCProducersConsumersRetentionHours: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
}
if ch.S.opts.LogsRetentionDays != body.LogsRetention {
err := changeLogsRetention(body.LogsRetention)
if err != nil {
Expand Down Expand Up @@ -1057,13 +1065,14 @@ func (ch ConfigurationsHandler) EditClusterConfig(c *gin.Context) {
}

c.IndentedJSON(200, gin.H{
"dls_retention": body.DlsRetention,
"logs_retention": body.LogsRetention,
"broker_host": brokerHost,
"ui_host": uiHost,
"rest_gw_host": restGWHost,
"tiered_storage_time_sec": body.TSTimeSec,
"max_msg_size_mb": int32(body.MaxMsgSizeMb),
"dls_retention": body.DlsRetention,
"logs_retention": body.LogsRetention,
"broker_host": brokerHost,
"ui_host": uiHost,
"rest_gw_host": restGWHost,
"tiered_storage_time_sec": body.TSTimeSec,
"max_msg_size_mb": int32(body.MaxMsgSizeMb),
"gc_producer_consumer_retention_hours": body.GCProducersConsumersRetentionHours,
})
}

Expand All @@ -1081,13 +1090,14 @@ func (ch ConfigurationsHandler) GetClusterConfig(c *gin.Context) {
analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-enter-cluster-config-page")
}
c.IndentedJSON(200, gin.H{
"dls_retention": ch.S.opts.DlsRetentionHours[user.TenantName],
"logs_retention": ch.S.opts.LogsRetentionDays,
"broker_host": ch.S.opts.BrokerHost,
"ui_host": ch.S.opts.UiHost,
"rest_gw_host": ch.S.opts.RestGwHost,
"tiered_storage_time_sec": ch.S.opts.TieredStorageUploadIntervalSec,
"max_msg_size_mb": ch.S.opts.MaxPayload / 1024 / 1024,
"dls_retention": ch.S.opts.DlsRetentionHours[user.TenantName],
"logs_retention": ch.S.opts.LogsRetentionDays,
"broker_host": ch.S.opts.BrokerHost,
"ui_host": ch.S.opts.UiHost,
"rest_gw_host": ch.S.opts.RestGwHost,
"tiered_storage_time_sec": ch.S.opts.TieredStorageUploadIntervalSec,
"max_msg_size_mb": ch.S.opts.MaxPayload / 1024 / 1024,
"gc_producer_consumer_retention_hours": ch.S.opts.GCProducersConsumersRetentionHours,
})
}

Expand Down
8 changes: 8 additions & 0 deletions server/memphis_handlers_configurations.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ func changeDlsRetention(dlsRetention int, tenantName string) error {
return nil
}

func changeGCProducersConsumersRetentionHours(retention int, tenantName string) error {
err := db.UpsertConfiguration("gc_producer_consumer_retention_hours", strconv.Itoa(retention), tenantName)
if err != nil {
return err
}
return nil
}

func changeLogsRetention(logsRetention int) error {
err := db.UpsertConfiguration("logs_retention", strconv.Itoa(logsRetention), serv.MemphisGlobalAccountString())
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,9 @@ func GetMemphisOpts(opts *Options) (*Options, error) {
case "max_msg_size_mb":
v, _ := strconv.Atoi(conf.Value)
opts.MaxPayload = int32(v * 1024 * 1024)
case "gc_producer_consumer_retention_hours":
v, _ := strconv.Atoi(conf.Value)
opts.GCProducersConsumersRetentionHours = v
}
}

Expand Down
30 changes: 21 additions & 9 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,16 @@ type Options struct {
LameDuckGracePeriod time.Duration `json:"-"`

// memphis options
UiPort int `json:"-"`
RestGwPort int `json:"-"`
K8sNamespace string `json:"-"`
LogsRetentionDays int `json:"-"`
TieredStorageUploadIntervalSec int `json:"-"`
DlsRetentionHours map[string]int `json:"-"`
UiHost string `json:"-"`
RestGwHost string `json:"-"`
BrokerHost string `json:"-"`
UiPort int `json:"-"`
RestGwPort int `json:"-"`
K8sNamespace string `json:"-"`
LogsRetentionDays int `json:"-"`
TieredStorageUploadIntervalSec int `json:"-"`
DlsRetentionHours map[string]int `json:"-"`
GCProducersConsumersRetentionHours int `json:"-"`
UiHost string `json:"-"`
RestGwHost string `json:"-"`
BrokerHost string `json:"-"`

// MaxTracedMsgLen is the maximum printable length for traced messages.
MaxTracedMsgLen int `json:"-"`
Expand Down Expand Up @@ -1443,6 +1444,14 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
}
o.DlsRetentionHours = make(map[string]int)
o.DlsRetentionHours[conf.MemphisGlobalAccountName] = value
case "gc_producer_consumer_retention_hours":
value := int(v.(int64))
if value < 1 || value > 48 {
*errors = append(*errors, &configErr{tk, "error gc_producer_consumer_retention_hours config: has to be positive and not more than 48"})
return
}
o.GCProducersConsumersRetentionHours = value

case "ui_host":
value := v.(string)
if value == _EMPTY_ {
Expand Down Expand Up @@ -4745,6 +4754,9 @@ func setBaselineOptions(opts *Options) {
if opts.LogsRetentionDays == 0 {
opts.LogsRetentionDays = 7
}
if opts.GCProducersConsumersRetentionHours == 0 {
opts.GCProducersConsumersRetentionHours = 2
}
if opts.TieredStorageUploadIntervalSec == 0 {
opts.TieredStorageUploadIntervalSec = DEFAULT_TIERED_STORAGE_UPLOAD_INTERVAL_SEC
}
Expand Down
12 changes: 12 additions & 0 deletions server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,16 @@ type dlsRetentionHoursOption struct {
newValue map[string]int
}

type GCProducersConsumersRetentionHoursOption struct {
noopOption
newValue int
}

func (o *GCProducersConsumersRetentionHoursOption) Apply(server *Server) {
// no need to update anything since it happens on the edit cluster configuration endpoint
server.Noticef("Reloaded: gc_producer_consumer_retention_hours = %d", o.newValue)
}

// Apply the setting by updating the server info and each client.
func (o *dlsRetentionHoursOption) Apply(server *Server) {
// no need to update anything since it happens on the edit cluster configuration endpoint
Expand Down Expand Up @@ -1452,6 +1462,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &brokerHostOption{newValue: newValue.(string)})
case "restgwhost":
diffOpts = append(diffOpts, &restGwOption{newValue: newValue.(string)})
case "gcproducersconsumersretentionhours":
diffOpts = append(diffOpts, &GCProducersConsumersRetentionHoursOption{newValue: newValue.(int)})
default:
// TODO(ik): Implement String() on those options to have a nice print.
// %v is difficult to figure what's what, %+v print private fields and
Expand Down
10 changes: 10 additions & 0 deletions ui_src/src/domain/administration/clusterConfiguration/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ function ClusterConfiguration() {
unit={'h'}
onChanges={(e) => handleChange('dls_retention', e)}
/>
<SliderRow
title="DISCONNECTED PRODUCERS AND CONSUMERS RETENTION"
desc="Amount of hours to retain inactive producerd and consumers"
value={formFields?.gc_producer_consumer_retention_hours}
img={DeadLetterInHours}
min={1}
max={48}
unit={'h'}
onChanges={(e) => handleChange('gc_producer_consumer_retention_hours', e)}
/>
<SliderRow
title="MAX MESSAGE SIZE"
desc="Maximum message size (payload + headers) in megabytes"
Expand Down