From 5e5bd64be65752ceca107ed1778ecc02eb59dcc5 Mon Sep 17 00:00:00 2001 From: limingnihao Date: Wed, 2 Jun 2021 09:35:46 +0800 Subject: [PATCH] Add command topic message ttl. (streamnative/pulsarctl#246) (#348) Add command topic message ttl: * pulsarctl topics get-message-ttl [topic] * pulsarctl topics set-message-ttl [topic] -t [seconds] * pulsarctl topics remove-message-ttl [topic] --- pulsaradmin/pkg/cli/client.go | 22 ++++++++++++++++++++++ pulsaradmin/pkg/pulsar/topic.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/pulsaradmin/pkg/cli/client.go b/pulsaradmin/pkg/cli/client.go index 465027b24e..2593e3c2a7 100644 --- a/pulsaradmin/pkg/cli/client.go +++ b/pulsaradmin/pkg/cli/client.go @@ -277,6 +277,28 @@ func (c *Client) PostWithMultiPart(endpoint string, in interface{}, body io.Read return nil } +func (c *Client) PostWithQueryParams(endpoint string, params map[string]string) error { + req, err := c.newRequest(http.MethodPost, endpoint) + if err != nil { + return err + } + if params != nil { + query := req.url.Query() + for k, v := range params { + query.Add(k, v) + } + req.params = query + } + // nolint + resp, err := checkSuccessful(c.doRequest(req)) + if err != nil { + return err + } + defer safeRespClose(resp) + + return nil +} + type request struct { method string contentType string diff --git a/pulsaradmin/pkg/pulsar/topic.go b/pulsaradmin/pkg/pulsar/topic.go index f5865a3431..761a93e045 100644 --- a/pulsaradmin/pkg/pulsar/topic.go +++ b/pulsaradmin/pkg/pulsar/topic.go @@ -98,6 +98,15 @@ type Topics interface { // CompactStatus checks the status of an ongoing compaction for a topic CompactStatus(utils.TopicName) (utils.LongRunningProcessStatus, error) + + // GetMessageTTL Get the message TTL for a topic + GetMessageTTL(utils.TopicName) (int, error) + + // SetMessageTTL Set the message TTL for a topic + SetMessageTTL(utils.TopicName, int) error + + // RemoveMessageTTL Remove the message TTL for a topic + RemoveMessageTTL(utils.TopicName) error } type topics struct { @@ -301,3 +310,26 @@ func (t *topics) CompactStatus(topic utils.TopicName) (utils.LongRunningProcessS err := t.pulsar.Client.Get(endpoint, &status) return status, err } + +func (t *topics) GetMessageTTL(topic utils.TopicName) (int, error) { + var ttl int + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL") + err := t.pulsar.Client.Get(endpoint, &ttl) + return ttl, err +} + +func (t *topics) SetMessageTTL(topic utils.TopicName, messageTTL int) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL") + var params = make(map[string]string) + params["messageTTL"] = strconv.Itoa(messageTTL) + err := t.pulsar.Client.PostWithQueryParams(endpoint, params) + return err +} + +func (t *topics) RemoveMessageTTL(topic utils.TopicName) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL") + var params = make(map[string]string) + params["messageTTL"] = strconv.Itoa(0) + err := t.pulsar.Client.DeleteWithQueryParams(endpoint, params) + return err +}