diff --git a/pulsaradmin/pkg/cli/client.go b/pulsaradmin/pkg/cli/client.go index 465027b24..2593e3c2a 100644 --- a/pulsaradmin/pkg/cli/client.go +++ b/pulsaradmin/pkg/cli/client.go @@ -277,6 +277,28 @@ func (c *Client) PostWithMultiPart(endpoint string, in interface{}, body io.Read return nil } +func (c *Client) PostWithQueryParams(endpoint string, params map[string]string) error { + req, err := c.newRequest(http.MethodPost, endpoint) + if err != nil { + return err + } + if params != nil { + query := req.url.Query() + for k, v := range params { + query.Add(k, v) + } + req.params = query + } + // nolint + resp, err := checkSuccessful(c.doRequest(req)) + if err != nil { + return err + } + defer safeRespClose(resp) + + return nil +} + type request struct { method string contentType string diff --git a/pulsaradmin/pkg/pulsar/topic.go b/pulsaradmin/pkg/pulsar/topic.go index f5865a343..761a93e04 100644 --- a/pulsaradmin/pkg/pulsar/topic.go +++ b/pulsaradmin/pkg/pulsar/topic.go @@ -98,6 +98,15 @@ type Topics interface { // CompactStatus checks the status of an ongoing compaction for a topic CompactStatus(utils.TopicName) (utils.LongRunningProcessStatus, error) + + // GetMessageTTL Get the message TTL for a topic + GetMessageTTL(utils.TopicName) (int, error) + + // SetMessageTTL Set the message TTL for a topic + SetMessageTTL(utils.TopicName, int) error + + // RemoveMessageTTL Remove the message TTL for a topic + RemoveMessageTTL(utils.TopicName) error } type topics struct { @@ -301,3 +310,26 @@ func (t *topics) CompactStatus(topic utils.TopicName) (utils.LongRunningProcessS err := t.pulsar.Client.Get(endpoint, &status) return status, err } + +func (t *topics) GetMessageTTL(topic utils.TopicName) (int, error) { + var ttl int + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL") + err := t.pulsar.Client.Get(endpoint, &ttl) + return ttl, err +} + +func (t *topics) SetMessageTTL(topic utils.TopicName, messageTTL int) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL") + var params = make(map[string]string) + params["messageTTL"] = strconv.Itoa(messageTTL) + err := t.pulsar.Client.PostWithQueryParams(endpoint, params) + return err +} + +func (t *topics) RemoveMessageTTL(topic utils.TopicName) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL") + var params = make(map[string]string) + params["messageTTL"] = strconv.Itoa(0) + err := t.pulsar.Client.DeleteWithQueryParams(endpoint, params) + return err +}