Skip to content

Commit

Permalink
add new --no-auto-ack option to sub command (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Aug 2, 2019
1 parent 40ebac5 commit fc4d15c
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 12 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@

# Changelog for rabtap

## v1.19 (2019-08-03)

* new option `--no-auto-ack` for `sub` command disables auto acknowledge when
messages are received by rabtap (#15)
* fix: termination with ctrl+c in `tap` and `sub` commands now works reliably
* Simplified code

## v1.18 (2019-07-05)

* fix: tap: allow colons in exchange names by escaping them (`exchange\\:with\\:colon`).
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ See [below](#build-from-source) if you prefer to compile from source.
## Usage

```
rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
Usage:
rabtap -h|--help
Expand All @@ -99,7 +99,7 @@ Usage:
[--filter EXPR]
[--omit-empty] [--show-default] [--by-connection] [-knv]
rabtap pub [--uri URI] EXCHANGE [FILE] [--routingkey=KEY] [-jkv]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [-jkvn]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--no-auto-ack] [-jkvn]
rabtap exchange create EXCHANGE [--uri URI] [--type TYPE] [-adkv]
rabtap exchange rm EXCHANGE [--uri URI] [-kv]
rabtap queue create QUEUE [--uri URI] [-adkv]
Expand Down Expand Up @@ -133,6 +133,9 @@ Options:
metadata and body (as-is) are saved separately.
-k, --insecure allow insecure TLS connections (no certificate check).
-n, --no-color don't colorize output (also environment variable NO_COLOR)
--no-auto-ack disable auto-ack in subscribe mode. This will lead to
unacked messages on the broker which will be requeued
when the channel is closed.
-o, --omit-empty don't show echanges without bindings in info command.
--reason=REASON reason why the connection was closed
[default: closed by rabtap].
Expand Down
4 changes: 3 additions & 1 deletion cmd/rabtap/cmd_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type CmdSubscribeArg struct {
queue string
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
AutoAck bool
}

// cmdSub subscribes to messages from the given queue
Expand All @@ -26,7 +27,8 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error {
// this channel is used to decouple message receiving threads
// with the main thread, which does the actual message processing
messageChannel := make(rabtap.TapChannel)
subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, false, cmd.tlsConfig, log)
config := rabtap.AmqpSubscriberConfig{Exclusive: false, AutoAck: cmd.AutoAck}
subscriber := rabtap.NewAmqpSubscriber(config, cmd.amqpURI, cmd.tlsConfig, log)
defer subscriber.Close()
go subscriber.EstablishSubscription(cmd.queue, messageChannel)
return messageReceiveLoop(ctx, messageChannel, cmd.messageReceiveFunc)
Expand Down
9 changes: 7 additions & 2 deletions cmd/rabtap/command_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var RabtapAppVersion = "(version not specified)"

const (
// note: usage is interpreted by docopt - this is code.
usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
Usage:
rabtap -h|--help
Expand All @@ -26,7 +26,7 @@ Usage:
[--filter EXPR]
[--omit-empty] [--show-default] [--by-connection] [-knv]
rabtap pub [--uri URI] EXCHANGE [FILE] [--routingkey=KEY] [-jkv]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [-jkvn]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--no-auto-ack] [-jkvn]
rabtap exchange create EXCHANGE [--uri URI] [--type TYPE] [-adkv]
rabtap exchange rm EXCHANGE [--uri URI] [-kv]
rabtap queue create QUEUE [--uri URI] [-adkv]
Expand Down Expand Up @@ -60,6 +60,9 @@ Options:
metadata and body (as-is) are saved separately.
-k, --insecure allow insecure TLS connections (no certificate check).
-n, --no-color don't colorize output (also environment variable NO_COLOR)
--no-auto-ack disable auto-ack in subscribe mode. This will lead to
unacked messages on the broker which will be requeued
when the channel is closed.
-o, --omit-empty don't show echanges without bindings in info command.
--reason=REASON reason why the connection was closed
[default: closed by rabtap].
Expand Down Expand Up @@ -145,6 +148,7 @@ type CommandLineArgs struct {
PubExchange string // pub mode: exchange to publish to
PubRoutingKey string // pub mode: routing key, defaults to ""
PubFile *string // pub mode: file to send
AutoAck bool // sub mode: auto ack enabled
QueueName string // queue create, remove, bind, sub
QueueBindingKey string // queue bind
ExchangeName string // exchange name create, remove or queue bind
Expand Down Expand Up @@ -245,6 +249,7 @@ func parseSubCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
result := CommandLineArgs{
Cmd: SubCmd,
commonArgs: parseCommonArgs(args),
AutoAck: !args["--no-auto-ack"].(bool),
QueueName: args["QUEUE"].(string),
}
var err error
Expand Down
1 change: 1 addition & 0 deletions cmd/rabtap/command_line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func TestCliSubCmdSaveToDir(t *testing.T) {
assert.Equal(t, "queuename", args.QueueName)
assert.Equal(t, "uri", args.AmqpURI)
assert.Equal(t, "dir", *args.SaveDir)
assert.True(t, args.AutoAck)
}

func TestCliCreateQueue(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions cmd/rabtap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func startCmdSubscribe(ctx context.Context, args CommandLineArgs) {
err := cmdSubscribe(ctx, CmdSubscribeArg{
amqpURI: args.AmqpURI,
queue: args.QueueName,
AutoAck: args.AutoAck,
tlsConfig: getTLSConfig(args.InsecureTLS),
messageReceiveFunc: messageReceiveFunc})
failOnError(err, "error subscribing messages", os.Exit)
Expand Down
16 changes: 11 additions & 5 deletions pkg/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,25 @@ import (
"github.com/streadway/amqp"
)

// AmqpSubscriberConfig stores configuration of the subscriber
type AmqpSubscriberConfig struct {
Exclusive bool
AutoAck bool
}

// AmqpSubscriber allows to tap to subscribe to queues
type AmqpSubscriber struct {
config AmqpSubscriberConfig
connection *AmqpConnector
logger logrus.StdLogger
exclusive bool
}

// NewAmqpSubscriber returns a new AmqpSubscriber object associated with the
// RabbitMQ broker denoted by the uri parameter.
func NewAmqpSubscriber(uri string, exclusive bool, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpSubscriber {
func NewAmqpSubscriber(config AmqpSubscriberConfig, uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpSubscriber {
return &AmqpSubscriber{
config: config,
connection: NewAmqpConnector(uri, tlsConfig, logger),
exclusive: exclusive,
logger: logger}
}

Expand Down Expand Up @@ -119,8 +125,8 @@ func (s *AmqpSubscriber) consumeMessages(conn *amqp.Connection,
msgs, err := ch.Consume(
queueName,
"__rabtap-consumer-"+uuid.Must(uuid.NewRandom()).String()[:8], // TODO param
true, // auto-ack
s.exclusive,
s.config.AutoAck,
s.config.Exclusive,
false, // no-local - unsupported
false, // wait
nil, // args
Expand Down
3 changes: 2 additions & 1 deletion pkg/subscribe_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func TestSubscribe(t *testing.T) {

finishChan := make(chan int)

subscriber := NewAmqpSubscriber(testcommon.IntegrationURIFromEnv(), false, &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags))
config := AmqpSubscriberConfig{Exclusive: false, AutoAck: true}
subscriber := NewAmqpSubscriber(config, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags))
defer subscriber.Close()
resultChannel := make(TapChannel)
go subscriber.EstablishSubscription(queueName, resultChannel)
Expand Down
3 changes: 2 additions & 1 deletion pkg/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type AmqpTap struct {
// NewAmqpTap returns a new AmqpTap object associated with the RabbitMQ
// broker denoted by the uri parameter.
func NewAmqpTap(uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpTap {
config := AmqpSubscriberConfig{Exclusive: true, AutoAck: true}
return &AmqpTap{
AmqpSubscriber: NewAmqpSubscriber(uri, true /* exclusive */, tlsConfig, logger)}
AmqpSubscriber: NewAmqpSubscriber(config, uri, tlsConfig, logger)}
}

func getTapExchangeNameForExchange(exchange, postfix string) string {
Expand Down

0 comments on commit fc4d15c

Please sign in to comment.