Skip to content

Commit

Permalink
new command 'queue purge' added'
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Feb 28, 2019
1 parent 2919196 commit 3106d38
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 10 deletions.
13 changes: 11 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ Usage:
rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri URI] [-kv]
rabtap queue unbind QUEUE from EXCHANGE --bindingkey=KEY [--uri URI] [-kv]
rabtap queue rm QUEUE [--uri URI] [-kv]
rabtap queue purge QUEUE [--uri URI] [-kv]
rabtap conn close CONNECTION [--reason=REASON] [--api APIURI] [-kv]
rabtap --version
Expand Down Expand Up @@ -150,6 +151,8 @@ Examples:
# use RABTAP_AMQPURI environment variable to specify broker instead of --uri
export RABTAP_AMQPURI=amqp://guest:guest@localhost:5672/
echo "Hello" | rabtap pub amq.topic -r "some.key"
rabtap sub JDQ
rabtap queue create JDQ
rabtap queue bind JDQ to amq.direct --bindingkey=key
rabtap queue rm JDQ
Expand All @@ -163,7 +166,7 @@ Examples:

### Basic commands

Rabtap understand the following commands:
Rabtap understands the following commands:

* `tap` - taps to an exchange and transparently receives messages sent to the
exchange, without affecting actual message delivery (using exchange-to-exchange
Expand All @@ -178,7 +181,7 @@ Rabtap understand the following commands:
(exclusive). If `--statistics` option is enabled, basic statistics are
included in the output. The `--filter` option allows to filter output. See
[filtering](#filtering-output-of-info-command) section for details.
* `queue` - create/bind/unbind/remove queue
* `queue` - create/bind/unbind/remove/purge queue
* `exchange` - create/remove exchange
* `connection` - close connections

Expand Down Expand Up @@ -405,6 +408,12 @@ http://localhost:15672/api (broker ver='3.7.8', mgmt ver='3.7.8', cluster='rabbi
└── amq.topic (exchange, type 'topic', [D])
```

Additionally use the `purge` command to remove all elements from a queue, e.g.

```
$ rabtap queue purge myqueue
```

## JSON message format

When using the `--json` option, messages are print/read as a stream of JSON
Expand Down
18 changes: 15 additions & 3 deletions cmd/rabtap/cmd_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"crypto/tls"
"os"

"github.com/jandelgado/rabtap/pkg"
rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -44,8 +44,21 @@ func cmdQueueRemove(amqpURI, queueName string, tlsConfig *tls.Config) {
}), "removing queue failed", os.Exit)
}

// cmdQueuePurge purges a queue, i.e. removes all queued elements
func cmdQueuePurge(amqpURI, queueName string, tlsConfig *tls.Config) {
failOnError(rabtap.SimpleAmqpConnector(amqpURI,
tlsConfig,
func(chn *amqp.Channel) error {
log.Debugf("purging queue %s", queueName)
num, err := rabtap.PurgeQueue(chn, queueName)
if err == nil {
log.Infof("purged %d elements from queue %s", num, queueName)
}
return err
}), "purge queue failed", os.Exit)
}

// cmdQueueBindToExchange binds a queue to an exchange
// TODO(JD) add ifUnused, ifEmpty parameters
func cmdQueueBindToExchange(amqpURI, queueName, key, exchangeName string,
tlsConfig *tls.Config) {

Expand All @@ -59,7 +72,6 @@ func cmdQueueBindToExchange(amqpURI, queueName, key, exchangeName string,
}

// cmdQueueUnbindFromExchange unbinds a queue from an exchange
// TODO(JD) add ifUnused, ifEmpty parameters
func cmdQueueUnbindFromExchange(amqpURI, queueName, key, exchangeName string,
tlsConfig *tls.Config) {

Expand Down
7 changes: 6 additions & 1 deletion cmd/rabtap/command_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"

docopt "github.com/docopt/docopt-go"
"github.com/jandelgado/rabtap/pkg"
rabtap "github.com/jandelgado/rabtap/pkg"
)

// RabtapAppVersion hold the application version and is set during link
Expand All @@ -33,6 +33,7 @@ Usage:
rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri URI] [-kv]
rabtap queue unbind QUEUE from EXCHANGE --bindingkey=KEY [--uri URI] [-kv]
rabtap queue rm QUEUE [--uri URI] [-kv]
rabtap queue purge QUEUE [--uri URI] [-kv]
rabtap conn close CONNECTION [--reason=REASON] [--api APIURI] [-kv]
rabtap --version
Expand Down Expand Up @@ -117,6 +118,8 @@ const (
QueueBindCmd
// QueueUnbindCmd unbinds a queue from an exchange
QueueUnbindCmd
// QueuePurgeCmd purges a queue
QueuePurgeCmd
// ConnCloseCmd closes a connection
ConnCloseCmd
)
Expand Down Expand Up @@ -280,6 +283,8 @@ func parseQueueCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
result.Cmd = QueueUnbindCmd
result.QueueBindingKey = args["--bindingkey"].(string)
result.ExchangeName = args["EXCHANGE"].(string)
} else if args["purge"].(bool) {
result.Cmd = QueuePurgeCmd
}
return result, nil
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/rabtap/command_line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,16 @@ func TestCliRemoveQueue(t *testing.T) {
assert.Equal(t, "uri", args.AmqpURI)
}

func TestCliPurgeQueue(t *testing.T) {
args, err := ParseCommandLineArgs(
[]string{"queue", "purge", "name", "--uri", "uri"})

assert.Nil(t, err)
assert.Equal(t, QueuePurgeCmd, args.Cmd)
assert.Equal(t, "name", args.QueueName)
assert.Equal(t, "uri", args.AmqpURI)
}

func TestCliUnbindQueue(t *testing.T) {
args, err := ParseCommandLineArgs(
[]string{"queue", "unbind", "queuename", "from", "exchangename",
Expand Down
4 changes: 3 additions & 1 deletion cmd/rabtap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
//"net/http"
//_ "net/http/pprof"

"github.com/jandelgado/rabtap/pkg"
rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -139,6 +139,8 @@ func main() {
autodelete: args.Autodelete, tlsConfig: tlsConfig})
case QueueRemoveCmd:
cmdQueueRemove(args.AmqpURI, args.QueueName, tlsConfig)
case QueuePurgeCmd:
cmdQueuePurge(args.AmqpURI, args.QueueName, tlsConfig)
case QueueBindCmd:
cmdQueueBindToExchange(args.AmqpURI, args.QueueName,
args.QueueBindingKey, args.ExchangeName, tlsConfig)
Expand Down
8 changes: 5 additions & 3 deletions pkg/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,24 @@ func CreateQueue(channel *amqp.Channel, queueName string,
// RemoveQueue removes a queue
func RemoveQueue(channel *amqp.Channel,
queueName string, ifUnused, ifEmpty bool) error {

_, err := channel.QueueDelete(queueName, ifUnused, ifEmpty, false /* wait*/)
return err
}

// PurgeQueue clears a queue. Returns number of elements purged
func PurgeQueue(channel *amqp.Channel, queueName string) (int, error) {
return channel.QueuePurge(queueName, false /* wait*/)
}

// BindQueueToExchange binds the given queue to the given exchange.
// TODO(JD) support for header based routing
func BindQueueToExchange(channel *amqp.Channel,
queueName, key, exchangeName string) error {

return channel.QueueBind(queueName, key, exchangeName, false /* wait */, nil)
}

// UnbindQueueFromExchange unbinds a queue from an exchange
func UnbindQueueFromExchange(channel *amqp.Channel,
queueName, key, exchangeName string) error {

return channel.QueueUnbind(queueName, key, exchangeName, nil)
}
24 changes: 24 additions & 0 deletions pkg/queue_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ func findBinding(queue, exchange, key string, bindings []RabbitBinding) int {
return -1
}

func TestIntegrationAmqpPurgeQueue(t *testing.T) {

const queueTestName = "purgetestqueue"
const exchangeTestName = "" // default exchange

// TODO empty queue before test in case it exisits

// create queue
conn, ch := testcommon.IntegrationTestConnection(t, "", "", 0, false)
defer conn.Close()
err := CreateQueue(ch, queueTestName, false, false, false)
assert.Nil(t, err)

// publish & purge 10 messages
const numMessages = 10
testcommon.PublishTestMessages(t, ch, numMessages, exchangeTestName, queueTestName, nil)
num, err := PurgeQueue(ch, queueTestName)
assert.Nil(t, err)
assert.Equal(t, numMessages, num)
// TODO additionally verifiy that queue is empty

// TODO remove queue
}

func TestIntegrationAmqpQueueCreateBindUnbindAndRemove(t *testing.T) {

// since in order to remove and unbind a queue we must create it first, we
Expand Down

0 comments on commit 3106d38

Please sign in to comment.