Skip to content

Commit

Permalink
Add command topic message ttl. (streamnative/pulsarctl#246) (apache#348)
Browse files Browse the repository at this point in the history
Add command topic message ttl:
* pulsarctl topics get-message-ttl [topic]
* pulsarctl topics set-message-ttl [topic] -t [seconds] 
* pulsarctl topics remove-message-ttl [topic]
  • Loading branch information
limingnihao authored and tisonkun committed Aug 15, 2023
1 parent 9da9310 commit 5e5bd64
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
22 changes: 22 additions & 0 deletions pulsaradmin/pkg/cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,28 @@ func (c *Client) PostWithMultiPart(endpoint string, in interface{}, body io.Read
return nil
}

func (c *Client) PostWithQueryParams(endpoint string, params map[string]string) error {
req, err := c.newRequest(http.MethodPost, endpoint)
if err != nil {
return err
}
if params != nil {
query := req.url.Query()
for k, v := range params {
query.Add(k, v)
}
req.params = query
}
// nolint
resp, err := checkSuccessful(c.doRequest(req))
if err != nil {
return err
}
defer safeRespClose(resp)

return nil
}

type request struct {
method string
contentType string
Expand Down
32 changes: 32 additions & 0 deletions pulsaradmin/pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ type Topics interface {

// CompactStatus checks the status of an ongoing compaction for a topic
CompactStatus(utils.TopicName) (utils.LongRunningProcessStatus, error)

// GetMessageTTL Get the message TTL for a topic
GetMessageTTL(utils.TopicName) (int, error)

// SetMessageTTL Set the message TTL for a topic
SetMessageTTL(utils.TopicName, int) error

// RemoveMessageTTL Remove the message TTL for a topic
RemoveMessageTTL(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -301,3 +310,26 @@ func (t *topics) CompactStatus(topic utils.TopicName) (utils.LongRunningProcessS
err := t.pulsar.Client.Get(endpoint, &status)
return status, err
}

func (t *topics) GetMessageTTL(topic utils.TopicName) (int, error) {
var ttl int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
err := t.pulsar.Client.Get(endpoint, &ttl)
return ttl, err
}

func (t *topics) SetMessageTTL(topic utils.TopicName, messageTTL int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
var params = make(map[string]string)
params["messageTTL"] = strconv.Itoa(messageTTL)
err := t.pulsar.Client.PostWithQueryParams(endpoint, params)
return err
}

func (t *topics) RemoveMessageTTL(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
var params = make(map[string]string)
params["messageTTL"] = strconv.Itoa(0)
err := t.pulsar.Client.DeleteWithQueryParams(endpoint, params)
return err
}

0 comments on commit 5e5bd64

Please sign in to comment.