Skip to content

Commit

Permalink
feat: add inactive topic policies command for topic (#444)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>

### Changes

background from #246,  the PR implements the following commands:

- `pulsarctl topics get-inactive-topic-policies <topic> --applied <bool>`
- `pulsarctl topics remove-inactive-topic-policies <topic>`
- `pulsarctl topics set-inactive-topic-policies <topic> --enable-delete-while-inactive <bool> --max-inactive-duration <string> --delete-mode <delete_when_no_subscriptions|delete_when_subscriptions_caught_up>`

(cherry picked from commit db4dda4)
  • Loading branch information
nodece committed Nov 12, 2021
1 parent 46e4fa9 commit accaf51
Show file tree
Hide file tree
Showing 7 changed files with 404 additions and 0 deletions.
64 changes: 64 additions & 0 deletions pkg/ctl/topic/get_inactive_topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func GetInactiveTopicCmd(vc *cmdutils.VerbCmd) {
var desc cmdutils.LongDescription
desc.CommandUsedFor = "Get the inactive topic policies on a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
examples = append(examples, cmdutils.Example{
Desc: desc.CommandUsedFor,
Command: "pulsarctl topics get-inactive-topic-policies [topic]",
})
desc.CommandExamples = examples

vc.SetDescription(
"get-inactive-topic-policies",
desc.CommandUsedFor,
desc.ToString(),
desc.ExampleToString())

var applied bool
vc.Command.Flags().BoolVarP(&applied, "applied", "", false, "Get the applied policy for the topic")

vc.SetRunFuncWithNameArg(func() error {
return doGetInactiveTopic(vc, applied)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doGetInactiveTopic(vc *cmdutils.VerbCmd, applied bool) error {
topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
response, err := admin.Topics().GetInactiveTopicPolicies(*topic, applied)
if err == nil {
cmdutils.PrintJSON(vc.Command.OutOrStdout(), &response)
}

return err
}
78 changes: 78 additions & 0 deletions pkg/ctl/topic/inactive_topic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"github.com/streamnative/pulsarctl/pkg/test"
"github.com/stretchr/testify/assert"
)

func TestInactiveTopicCmd(t *testing.T) {
topicName := fmt.Sprintf("persistent://public/default/test-inactive-topic-%s",
test.RandomSuffix())
createArgs := []string{"create", topicName, "1"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, createArgs)
assert.Nil(t, execErr)

<-time.After(5 * time.Second)

setArgs := []string{"set-inactive-topic-policies", topicName,
"-e=true",
"-t", "1h",
"-m", "delete_when_no_subscriptions"}
out, execErr, _, _ := TestTopicCommands(SetInactiveTopicCmd, setArgs)
assert.Nil(t, execErr)
assert.Equal(t, out.String(),
fmt.Sprintf("Set inactive topic policies successfully for [%s]", topicName))

<-time.After(5 * time.Second)

getArgs := []string{"get-inactive-topic-policies", topicName}
out, execErr, _, _ = TestTopicCommands(GetInactiveTopicCmd, getArgs)
assert.Nil(t, execErr)

var inactiveTopic utils.InactiveTopicPolicies
err := json.Unmarshal(out.Bytes(), &inactiveTopic)
assert.NoError(t, err)
assert.Equal(t, true, inactiveTopic.DeleteWhileInactive)
assert.Equal(t, 3600, inactiveTopic.MaxInactiveDurationSeconds)
assert.Equal(t, "delete_when_no_subscriptions", inactiveTopic.InactiveTopicDeleteMode.String())

removeArgs := []string{"remove-inactive-topic-policies", topicName}
out, execErr, _, _ = TestTopicCommands(RemoveInactiveTopicCmd, removeArgs)
assert.Nil(t, execErr)
assert.Equal(t, out.String(),
fmt.Sprintf("Remove inactive topic policies successfully from [%s]", topicName))

<-time.After(5 * time.Second)

out, execErr, _, _ = TestTopicCommands(GetInactiveTopicCmd, getArgs)
assert.Nil(t, execErr)

err = json.Unmarshal(out.Bytes(), &inactiveTopic)
assert.NoError(t, err)
assert.Equal(t, false, inactiveTopic.DeleteWhileInactive)
assert.Equal(t, 0, inactiveTopic.MaxInactiveDurationSeconds)
assert.Nil(t, inactiveTopic.InactiveTopicDeleteMode)
}
60 changes: 60 additions & 0 deletions pkg/ctl/topic/remove_inactive_topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func RemoveInactiveTopicCmd(vc *cmdutils.VerbCmd) {
var desc cmdutils.LongDescription
desc.CommandUsedFor = "Remove inactive topic policies from a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
examples = append(examples, cmdutils.Example{
Desc: desc.CommandUsedFor,
Command: "pulsarctl topics remove-inactive-topic-policies [topic]",
})
desc.CommandExamples = examples

vc.SetDescription(
"remove-inactive-topic-policies",
desc.CommandUsedFor,
desc.ToString(),
desc.ExampleToString())

vc.SetRunFuncWithNameArg(func() error {
return doRemoveInactiveTopic(vc)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doRemoveInactiveTopic(vc *cmdutils.VerbCmd) error {
topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
err = admin.Topics().RemoveInactiveTopicPolicies(*topic)
if err == nil {
vc.Command.Printf("Remove inactive topic policies successfully from [%s]", topic.String())
}
return err
}
112 changes: 112 additions & 0 deletions pkg/ctl/topic/set_inactive_topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
ctlutils "github.com/streamnative/pulsarctl/pkg/ctl/utils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

type setInactiveTopicPoliciesArgs struct {
deleteWhileInactive bool
deleteInactiveTopicsMaxInactiveDuration string
inactiveTopicDeleteMode string
}

func SetInactiveTopicCmd(vc *cmdutils.VerbCmd) {
var desc cmdutils.LongDescription
desc.CommandUsedFor = "Set the inactive topic policies on a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
set := cmdutils.Example{
Desc: desc.CommandUsedFor,
Command: "pulsarctl topics set-inactive-topic-policies [topic] \n" +
"\t--enable-delete-while-inactive true \n" +
"\t--max-inactive-duration 1h \n" +
"\t--delete-mode delete_when_no_subscriptions",
}

examples = append(examples, set)
desc.CommandExamples = examples

args := setInactiveTopicPoliciesArgs{}

vc.Command.Flags().BoolVarP(&args.deleteWhileInactive,
"enable-delete-while-inactive",
"e",
false,
"Control whether deletion is enabled while inactive")

vc.Command.Flags().StringVarP(&args.deleteInactiveTopicsMaxInactiveDuration,
"max-inactive-duration",
"t",
"",
"Max duration of topic inactivity in seconds, "+
"topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)")
vc.Command.Flags().StringVarP(&args.inactiveTopicDeleteMode,
"delete-mode",
"m",
"",
"Mode of delete inactive topic, "+
"Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]")

_ = vc.Command.MarkFlagRequired("delete-mode")
_ = vc.Command.MarkFlagRequired("max-inactive-duration")

vc.SetDescription(
"set-inactive-topic-policies",
desc.CommandUsedFor,
desc.ToString(),
desc.ExampleToString())

vc.SetRunFuncWithNameArg(func() error {
return doSetInactiveTopic(vc, args)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doSetInactiveTopic(vc *cmdutils.VerbCmd, args setInactiveTopicPoliciesArgs) error {
inactiveTopicDeleteMode, err := utils.ParseInactiveTopicDeleteMode(args.inactiveTopicDeleteMode)
if err != nil {
return err
}

maxInactiveDuration, err := ctlutils.ParseRelativeTimeInSeconds(args.deleteInactiveTopicsMaxInactiveDuration)
if err != nil {
return err
}

body := utils.NewInactiveTopicPolicies(
&inactiveTopicDeleteMode,
int(maxInactiveDuration.Seconds()),
args.deleteWhileInactive)

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
err = admin.Topics().SetInactiveTopicPolicies(*topic, body)
if err == nil {
vc.Command.Printf("Set inactive topic policies successfully for [%s]", topic.String())
}

return err
}
3 changes: 3 additions & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
GetPublishRateCmd,
SetPublishRateCmd,
RemovePublishRateCmd,
GetInactiveTopicCmd,
SetInactiveTopicCmd,
RemoveInactiveTopicCmd,
}

cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)
Expand Down
28 changes: 28 additions & 0 deletions pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,15 @@ type Topics interface {

// RemoveBacklogQuota removes a backlog quota policy from a topic
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error

// GetInactiveTopicPolicies gets the inactive topic policies on a topic
GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error)

// RemoveInactiveTopicPolicies removes inactive topic policies from a topic
RemoveInactiveTopicPolicies(utils.TopicName) error

// SetInactiveTopicPolicies sets the inactive topic policies on a topic
SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error
}

type topics struct {
Expand Down Expand Up @@ -673,3 +682,22 @@ func (t *topics) RemoveBacklogQuota(topic utils.TopicName, backlogQuotaType util
"backlogQuotaType": string(backlogQuotaType),
})
}

func (t *topics) GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error) {
var out utils.InactiveTopicPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &out, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
return out, err
}

func (t *topics) RemoveInactiveTopicPolicies(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
return t.pulsar.Client.Post(endpoint, data)
}

0 comments on commit accaf51

Please sign in to comment.