Skip to content

Commit

Permalink
Add command topic message dispatch rate. (streamnative/pulsarctl#246) (
Browse files Browse the repository at this point in the history
…apache#397)

Add command topic Message Dispatch Rate:
> Pulsarctl does not support 2-letter shorthand, so use the full length.

- pulsarctl topics get-dispatch-rate [topic]
- pulsarctl topics set-dispatch-rate [topic] --msg-dispatch-rate 1 --byte-dispatch-rate 2 --dispatch-rate-period 3 --relative-to-publish-rate 
- pulsarctl topics remove-dispatch-rate [topic]
  • Loading branch information
limingnihao authored and tisonkun committed Aug 15, 2023
1 parent 5ed4992 commit be9fec9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
26 changes: 26 additions & 0 deletions pulsaradmin/pkg/pulsar/topic.go
Expand Up @@ -161,6 +161,15 @@ type Topics interface {

// RemoveDelayedDelivery Remove the delayed delivery policy on a topic
RemoveDelayedDelivery(utils.TopicName) error

// GetDispatchRate Get message dispatch rate for a topic
GetDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)

// SetDispatchRate Set message dispatch rate for a topic
SetDispatchRate(utils.TopicName, utils.DispatchRateData) error

// RemoveDispatchRate Remove message dispatch rate for a topic
RemoveDispatchRate(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -493,3 +502,20 @@ func (t *topics) RemoveDelayedDelivery(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
var dispatchRateData utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
err := t.pulsar.Client.Get(endpoint, &dispatchRateData)
return &dispatchRateData, err
}

func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData utils.DispatchRateData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
return t.pulsar.Client.Post(endpoint, &dispatchRateData)
}

func (t *topics) RemoveDispatchRate(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
return t.pulsar.Client.Delete(endpoint)
}
7 changes: 7 additions & 0 deletions pulsaradmin/pkg/pulsar/utils/data.go
Expand Up @@ -381,3 +381,10 @@ type DelayedDeliveryData struct {
TickTime float64 `json:"tickTime"`
Active bool `json:"active"`
}

type DispatchRateData struct {
DispatchThrottlingRateInMsg int64 `json:"dispatchThrottlingRateInMsg"`
DispatchThrottlingRateInByte int64 `json:"dispatchThrottlingRateInByte"`
RatePeriodInSecond int64 `json:"ratePeriodInSecond"`
RelativeToPublishRate bool `json:"relativeToPublishRate"`
}

0 comments on commit be9fec9

Please sign in to comment.