From accaf51e0483d07d84f05040abd180a0111ae44b Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 28 Sep 2021 15:56:34 +0800 Subject: [PATCH] feat: add inactive topic policies command for topic (#444) Signed-off-by: Zixuan Liu ### Changes background from #246, the PR implements the following commands: - `pulsarctl topics get-inactive-topic-policies --applied ` - `pulsarctl topics remove-inactive-topic-policies ` - `pulsarctl topics set-inactive-topic-policies --enable-delete-while-inactive --max-inactive-duration --delete-mode ` (cherry picked from commit db4dda44826d59c566f3d0b748103e732aa0f717) --- pkg/ctl/topic/get_inactive_topic.go | 64 +++++++++++ pkg/ctl/topic/inactive_topic_test.go | 78 ++++++++++++++ pkg/ctl/topic/remove_inactive_topic.go | 60 +++++++++++ pkg/ctl/topic/set_inactive_topic.go | 112 ++++++++++++++++++++ pkg/ctl/topic/topic.go | 3 + pkg/pulsar/topic.go | 28 +++++ pkg/pulsar/utils/inactive_topic_policies.go | 59 +++++++++++ 7 files changed, 404 insertions(+) create mode 100644 pkg/ctl/topic/get_inactive_topic.go create mode 100644 pkg/ctl/topic/inactive_topic_test.go create mode 100644 pkg/ctl/topic/remove_inactive_topic.go create mode 100644 pkg/ctl/topic/set_inactive_topic.go create mode 100644 pkg/pulsar/utils/inactive_topic_policies.go diff --git a/pkg/ctl/topic/get_inactive_topic.go b/pkg/ctl/topic/get_inactive_topic.go new file mode 100644 index 00000000..68ed722e --- /dev/null +++ b/pkg/ctl/topic/get_inactive_topic.go @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar/utils" +) + +func GetInactiveTopicCmd(vc *cmdutils.VerbCmd) { + var desc cmdutils.LongDescription + desc.CommandUsedFor = "Get the inactive topic policies on a topic" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + examples = append(examples, cmdutils.Example{ + Desc: desc.CommandUsedFor, + Command: "pulsarctl topics get-inactive-topic-policies [topic]", + }) + desc.CommandExamples = examples + + vc.SetDescription( + "get-inactive-topic-policies", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString()) + + var applied bool + vc.Command.Flags().BoolVarP(&applied, "applied", "", false, "Get the applied policy for the topic") + + vc.SetRunFuncWithNameArg(func() error { + return doGetInactiveTopic(vc, applied) + }, "the topic name is not specified or the topic name is specified more than one") +} + +func doGetInactiveTopic(vc *cmdutils.VerbCmd, applied bool) error { + topic, err := utils.GetTopicName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + response, err := admin.Topics().GetInactiveTopicPolicies(*topic, applied) + if err == nil { + cmdutils.PrintJSON(vc.Command.OutOrStdout(), &response) + } + + return err +} diff --git a/pkg/ctl/topic/inactive_topic_test.go b/pkg/ctl/topic/inactive_topic_test.go new file mode 100644 index 00000000..d7714784 --- /dev/null +++ b/pkg/ctl/topic/inactive_topic_test.go @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/streamnative/pulsarctl/pkg/pulsar/utils" + "github.com/streamnative/pulsarctl/pkg/test" + "github.com/stretchr/testify/assert" +) + +func TestInactiveTopicCmd(t *testing.T) { + topicName := fmt.Sprintf("persistent://public/default/test-inactive-topic-%s", + test.RandomSuffix()) + createArgs := []string{"create", topicName, "1"} + _, execErr, _, _ := TestTopicCommands(CreateTopicCmd, createArgs) + assert.Nil(t, execErr) + + <-time.After(5 * time.Second) + + setArgs := []string{"set-inactive-topic-policies", topicName, + "-e=true", + "-t", "1h", + "-m", "delete_when_no_subscriptions"} + out, execErr, _, _ := TestTopicCommands(SetInactiveTopicCmd, setArgs) + assert.Nil(t, execErr) + assert.Equal(t, out.String(), + fmt.Sprintf("Set inactive topic policies successfully for [%s]", topicName)) + + <-time.After(5 * time.Second) + + getArgs := []string{"get-inactive-topic-policies", topicName} + out, execErr, _, _ = TestTopicCommands(GetInactiveTopicCmd, getArgs) + assert.Nil(t, execErr) + + var inactiveTopic utils.InactiveTopicPolicies + err := json.Unmarshal(out.Bytes(), &inactiveTopic) + assert.NoError(t, err) + assert.Equal(t, true, inactiveTopic.DeleteWhileInactive) + assert.Equal(t, 3600, inactiveTopic.MaxInactiveDurationSeconds) + assert.Equal(t, "delete_when_no_subscriptions", inactiveTopic.InactiveTopicDeleteMode.String()) + + removeArgs := []string{"remove-inactive-topic-policies", topicName} + out, execErr, _, _ = TestTopicCommands(RemoveInactiveTopicCmd, removeArgs) + assert.Nil(t, execErr) + assert.Equal(t, out.String(), + fmt.Sprintf("Remove inactive topic policies successfully from [%s]", topicName)) + + <-time.After(5 * time.Second) + + out, execErr, _, _ = TestTopicCommands(GetInactiveTopicCmd, getArgs) + assert.Nil(t, execErr) + + err = json.Unmarshal(out.Bytes(), &inactiveTopic) + assert.NoError(t, err) + assert.Equal(t, false, inactiveTopic.DeleteWhileInactive) + assert.Equal(t, 0, inactiveTopic.MaxInactiveDurationSeconds) + assert.Nil(t, inactiveTopic.InactiveTopicDeleteMode) +} diff --git a/pkg/ctl/topic/remove_inactive_topic.go b/pkg/ctl/topic/remove_inactive_topic.go new file mode 100644 index 00000000..033c350d --- /dev/null +++ b/pkg/ctl/topic/remove_inactive_topic.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar/utils" +) + +func RemoveInactiveTopicCmd(vc *cmdutils.VerbCmd) { + var desc cmdutils.LongDescription + desc.CommandUsedFor = "Remove inactive topic policies from a topic" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + examples = append(examples, cmdutils.Example{ + Desc: desc.CommandUsedFor, + Command: "pulsarctl topics remove-inactive-topic-policies [topic]", + }) + desc.CommandExamples = examples + + vc.SetDescription( + "remove-inactive-topic-policies", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString()) + + vc.SetRunFuncWithNameArg(func() error { + return doRemoveInactiveTopic(vc) + }, "the topic name is not specified or the topic name is specified more than one") +} + +func doRemoveInactiveTopic(vc *cmdutils.VerbCmd) error { + topic, err := utils.GetTopicName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + err = admin.Topics().RemoveInactiveTopicPolicies(*topic) + if err == nil { + vc.Command.Printf("Remove inactive topic policies successfully from [%s]", topic.String()) + } + return err +} diff --git a/pkg/ctl/topic/set_inactive_topic.go b/pkg/ctl/topic/set_inactive_topic.go new file mode 100644 index 00000000..7cff3a98 --- /dev/null +++ b/pkg/ctl/topic/set_inactive_topic.go @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "github.com/streamnative/pulsarctl/pkg/cmdutils" + ctlutils "github.com/streamnative/pulsarctl/pkg/ctl/utils" + "github.com/streamnative/pulsarctl/pkg/pulsar/utils" +) + +type setInactiveTopicPoliciesArgs struct { + deleteWhileInactive bool + deleteInactiveTopicsMaxInactiveDuration string + inactiveTopicDeleteMode string +} + +func SetInactiveTopicCmd(vc *cmdutils.VerbCmd) { + var desc cmdutils.LongDescription + desc.CommandUsedFor = "Set the inactive topic policies on a topic" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + set := cmdutils.Example{ + Desc: desc.CommandUsedFor, + Command: "pulsarctl topics set-inactive-topic-policies [topic] \n" + + "\t--enable-delete-while-inactive true \n" + + "\t--max-inactive-duration 1h \n" + + "\t--delete-mode delete_when_no_subscriptions", + } + + examples = append(examples, set) + desc.CommandExamples = examples + + args := setInactiveTopicPoliciesArgs{} + + vc.Command.Flags().BoolVarP(&args.deleteWhileInactive, + "enable-delete-while-inactive", + "e", + false, + "Control whether deletion is enabled while inactive") + + vc.Command.Flags().StringVarP(&args.deleteInactiveTopicsMaxInactiveDuration, + "max-inactive-duration", + "t", + "", + "Max duration of topic inactivity in seconds, "+ + "topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)") + vc.Command.Flags().StringVarP(&args.inactiveTopicDeleteMode, + "delete-mode", + "m", + "", + "Mode of delete inactive topic, "+ + "Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]") + + _ = vc.Command.MarkFlagRequired("delete-mode") + _ = vc.Command.MarkFlagRequired("max-inactive-duration") + + vc.SetDescription( + "set-inactive-topic-policies", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString()) + + vc.SetRunFuncWithNameArg(func() error { + return doSetInactiveTopic(vc, args) + }, "the topic name is not specified or the topic name is specified more than one") +} + +func doSetInactiveTopic(vc *cmdutils.VerbCmd, args setInactiveTopicPoliciesArgs) error { + inactiveTopicDeleteMode, err := utils.ParseInactiveTopicDeleteMode(args.inactiveTopicDeleteMode) + if err != nil { + return err + } + + maxInactiveDuration, err := ctlutils.ParseRelativeTimeInSeconds(args.deleteInactiveTopicsMaxInactiveDuration) + if err != nil { + return err + } + + body := utils.NewInactiveTopicPolicies( + &inactiveTopicDeleteMode, + int(maxInactiveDuration.Seconds()), + args.deleteWhileInactive) + + topic, err := utils.GetTopicName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + err = admin.Topics().SetInactiveTopicPolicies(*topic, body) + if err == nil { + vc.Command.Printf("Set inactive topic policies successfully for [%s]", topic.String()) + } + + return err +} diff --git a/pkg/ctl/topic/topic.go b/pkg/ctl/topic/topic.go index 37f99cd6..5bf83ab7 100644 --- a/pkg/ctl/topic/topic.go +++ b/pkg/ctl/topic/topic.go @@ -90,6 +90,9 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { GetPublishRateCmd, SetPublishRateCmd, RemovePublishRateCmd, + GetInactiveTopicCmd, + SetInactiveTopicCmd, + RemoveInactiveTopicCmd, } cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...) diff --git a/pkg/pulsar/topic.go b/pkg/pulsar/topic.go index 8e780804..0bfd4bf7 100644 --- a/pkg/pulsar/topic.go +++ b/pkg/pulsar/topic.go @@ -216,6 +216,15 @@ type Topics interface { // RemoveBacklogQuota removes a backlog quota policy from a topic RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error + + // GetInactiveTopicPolicies gets the inactive topic policies on a topic + GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error) + + // RemoveInactiveTopicPolicies removes inactive topic policies from a topic + RemoveInactiveTopicPolicies(utils.TopicName) error + + // SetInactiveTopicPolicies sets the inactive topic policies on a topic + SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error } type topics struct { @@ -673,3 +682,22 @@ func (t *topics) RemoveBacklogQuota(topic utils.TopicName, backlogQuotaType util "backlogQuotaType": string(backlogQuotaType), }) } + +func (t *topics) GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error) { + var out utils.InactiveTopicPolicies + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies") + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &out, map[string]string{ + "applied": strconv.FormatBool(applied), + }, true) + return out, err +} + +func (t *topics) RemoveInactiveTopicPolicies(topic utils.TopicName) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies") + return t.pulsar.Client.Delete(endpoint) +} + +func (t *topics) SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies") + return t.pulsar.Client.Post(endpoint, data) +} diff --git a/pkg/pulsar/utils/inactive_topic_policies.go b/pkg/pulsar/utils/inactive_topic_policies.go new file mode 100644 index 00000000..05f81b66 --- /dev/null +++ b/pkg/pulsar/utils/inactive_topic_policies.go @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package utils + +import "github.com/pkg/errors" + +type InactiveTopicDeleteMode string + +const ( + // The topic can be deleted when no subscriptions and no active producers. + DeleteWhenNoSubscriptions InactiveTopicDeleteMode = "delete_when_no_subscriptions" + // The topic can be deleted when all subscriptions catchup and no active producers/consumers. + DeleteWhenSubscriptionsCaughtUp InactiveTopicDeleteMode = "delete_when_subscriptions_caught_up" +) + +func (i InactiveTopicDeleteMode) String() string { + return string(i) +} + +func ParseInactiveTopicDeleteMode(str string) (InactiveTopicDeleteMode, error) { + switch str { + case DeleteWhenNoSubscriptions.String(): + return DeleteWhenNoSubscriptions, nil + case DeleteWhenSubscriptionsCaughtUp.String(): + return DeleteWhenSubscriptionsCaughtUp, nil + default: + return "", errors.Errorf("cannot parse %s to InactiveTopicDeleteMode type", str) + } +} + +type InactiveTopicPolicies struct { + InactiveTopicDeleteMode *InactiveTopicDeleteMode `json:"inactiveTopicDeleteMode"` + MaxInactiveDurationSeconds int `json:"maxInactiveDurationSeconds"` + DeleteWhileInactive bool `json:"deleteWhileInactive"` +} + +func NewInactiveTopicPolicies(inactiveTopicDeleteMode *InactiveTopicDeleteMode, maxInactiveDurationSeconds int, + deleteWhileInactive bool) InactiveTopicPolicies { + return InactiveTopicPolicies{ + InactiveTopicDeleteMode: inactiveTopicDeleteMode, + MaxInactiveDurationSeconds: maxInactiveDurationSeconds, + DeleteWhileInactive: deleteWhileInactive, + } +}