From be9fec9ad6ffcf3f525e72ba27c9fb3723a05d7c Mon Sep 17 00:00:00 2001 From: limingnihao Date: Tue, 20 Jul 2021 14:44:12 +0800 Subject: [PATCH] Add command topic message dispatch rate. (streamnative/pulsarctl#246) (#397) Add command topic Message Dispatch Rate: > Pulsarctl does not support 2-letter shorthand, so use the full length. - pulsarctl topics get-dispatch-rate [topic] - pulsarctl topics set-dispatch-rate [topic] --msg-dispatch-rate 1 --byte-dispatch-rate 2 --dispatch-rate-period 3 --relative-to-publish-rate - pulsarctl topics remove-dispatch-rate [topic] --- pulsaradmin/pkg/pulsar/topic.go | 26 ++++++++++++++++++++++++++ pulsaradmin/pkg/pulsar/utils/data.go | 7 +++++++ 2 files changed, 33 insertions(+) diff --git a/pulsaradmin/pkg/pulsar/topic.go b/pulsaradmin/pkg/pulsar/topic.go index a92e8a8d01..b59a034050 100644 --- a/pulsaradmin/pkg/pulsar/topic.go +++ b/pulsaradmin/pkg/pulsar/topic.go @@ -161,6 +161,15 @@ type Topics interface { // RemoveDelayedDelivery Remove the delayed delivery policy on a topic RemoveDelayedDelivery(utils.TopicName) error + + // GetDispatchRate Get message dispatch rate for a topic + GetDispatchRate(utils.TopicName) (*utils.DispatchRateData, error) + + // SetDispatchRate Set message dispatch rate for a topic + SetDispatchRate(utils.TopicName, utils.DispatchRateData) error + + // RemoveDispatchRate Remove message dispatch rate for a topic + RemoveDispatchRate(utils.TopicName) error } type topics struct { @@ -493,3 +502,20 @@ func (t *topics) RemoveDelayedDelivery(topic utils.TopicName) error { endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery") return t.pulsar.Client.Delete(endpoint) } + +func (t *topics) GetDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) { + var dispatchRateData utils.DispatchRateData + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate") + err := t.pulsar.Client.Get(endpoint, &dispatchRateData) + return &dispatchRateData, err +} + +func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData utils.DispatchRateData) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate") + return t.pulsar.Client.Post(endpoint, &dispatchRateData) +} + +func (t *topics) RemoveDispatchRate(topic utils.TopicName) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate") + return t.pulsar.Client.Delete(endpoint) +} diff --git a/pulsaradmin/pkg/pulsar/utils/data.go b/pulsaradmin/pkg/pulsar/utils/data.go index 95eb733072..e42c6fef9f 100644 --- a/pulsaradmin/pkg/pulsar/utils/data.go +++ b/pulsaradmin/pkg/pulsar/utils/data.go @@ -381,3 +381,10 @@ type DelayedDeliveryData struct { TickTime float64 `json:"tickTime"` Active bool `json:"active"` } + +type DispatchRateData struct { + DispatchThrottlingRateInMsg int64 `json:"dispatchThrottlingRateInMsg"` + DispatchThrottlingRateInByte int64 `json:"dispatchThrottlingRateInByte"` + RatePeriodInSecond int64 `json:"ratePeriodInSecond"` + RelativeToPublishRate bool `json:"relativeToPublishRate"` +}