From ba97fa2c4726252b66d3a9b438c94a7dda25be68 Mon Sep 17 00:00:00 2001 From: jandelgado Date: Fri, 2 Aug 2019 20:16:46 +0200 Subject: [PATCH] add new --no-auto-ack option to sub command (#18) implements (#15) --- CHANGELOG.md | 7 +++++++ README.md | 7 +++++-- cmd/rabtap/cmd_subscribe.go | 4 +++- cmd/rabtap/command_line.go | 9 +++++++-- cmd/rabtap/command_line_test.go | 1 + cmd/rabtap/main.go | 1 + pkg/subscribe.go | 16 +++++++++++----- pkg/subscribe_integration_test.go | 3 ++- pkg/tap.go | 3 ++- 9 files changed, 39 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cf5001..a177de0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,13 @@ # Changelog for rabtap +## v1.19 (2019-08-03) + +* new option `--no-auto-ack` for `sub` command disables auto acknowledge when + messages are received by rabtap (#15) +* fix: termination with ctrl+c in `tap` and `sub` commands now works reliably +* Simplified code + ## v1.18 (2019-07-05) * fix: tap: allow colons in exchange names by escaping them (`exchange\\:with\\:colon`). diff --git a/README.md b/README.md index 85c7575..8f411ae 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ See [below](#build-from-source) if you prefer to compile from source. ## Usage ``` -rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap +rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: rabtap -h|--help @@ -99,7 +99,7 @@ Usage: [--filter EXPR] [--omit-empty] [--show-default] [--by-connection] [-knv] rabtap pub [--uri URI] EXCHANGE [FILE] [--routingkey=KEY] [-jkv] - rabtap sub QUEUE [--uri URI] [--saveto=DIR] [-jkvn] + rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--no-auto-ack] [-jkvn] rabtap exchange create EXCHANGE [--uri URI] [--type TYPE] [-adkv] rabtap exchange rm EXCHANGE [--uri URI] [-kv] rabtap queue create QUEUE [--uri URI] [-adkv] @@ -133,6 +133,9 @@ Options: metadata and body (as-is) are saved separately. -k, --insecure allow insecure TLS connections (no certificate check). -n, --no-color don't colorize output (also environment variable NO_COLOR) + --no-auto-ack disable auto-ack in subscribe mode. This will lead to + unacked messages on the broker which will be requeued + when the channel is closed. -o, --omit-empty don't show echanges without bindings in info command. --reason=REASON reason why the connection was closed [default: closed by rabtap]. diff --git a/cmd/rabtap/cmd_subscribe.go b/cmd/rabtap/cmd_subscribe.go index 2633beb..a892d01 100644 --- a/cmd/rabtap/cmd_subscribe.go +++ b/cmd/rabtap/cmd_subscribe.go @@ -17,6 +17,7 @@ type CmdSubscribeArg struct { queue string tlsConfig *tls.Config messageReceiveFunc MessageReceiveFunc + AutoAck bool } // cmdSub subscribes to messages from the given queue @@ -26,7 +27,8 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error { // this channel is used to decouple message receiving threads // with the main thread, which does the actual message processing messageChannel := make(rabtap.TapChannel) - subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, false, cmd.tlsConfig, log) + config := rabtap.AmqpSubscriberConfig{Exclusive: false, AutoAck: cmd.AutoAck} + subscriber := rabtap.NewAmqpSubscriber(config, cmd.amqpURI, cmd.tlsConfig, log) defer subscriber.Close() go subscriber.EstablishSubscription(cmd.queue, messageChannel) return messageReceiveLoop(ctx, messageChannel, cmd.messageReceiveFunc) diff --git a/cmd/rabtap/command_line.go b/cmd/rabtap/command_line.go index 5ecfcc2..3e3de5d 100644 --- a/cmd/rabtap/command_line.go +++ b/cmd/rabtap/command_line.go @@ -16,7 +16,7 @@ var RabtapAppVersion = "(version not specified)" const ( // note: usage is interpreted by docopt - this is code. - usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap + usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: rabtap -h|--help @@ -26,7 +26,7 @@ Usage: [--filter EXPR] [--omit-empty] [--show-default] [--by-connection] [-knv] rabtap pub [--uri URI] EXCHANGE [FILE] [--routingkey=KEY] [-jkv] - rabtap sub QUEUE [--uri URI] [--saveto=DIR] [-jkvn] + rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--no-auto-ack] [-jkvn] rabtap exchange create EXCHANGE [--uri URI] [--type TYPE] [-adkv] rabtap exchange rm EXCHANGE [--uri URI] [-kv] rabtap queue create QUEUE [--uri URI] [-adkv] @@ -60,6 +60,9 @@ Options: metadata and body (as-is) are saved separately. -k, --insecure allow insecure TLS connections (no certificate check). -n, --no-color don't colorize output (also environment variable NO_COLOR) + --no-auto-ack disable auto-ack in subscribe mode. This will lead to + unacked messages on the broker which will be requeued + when the channel is closed. -o, --omit-empty don't show echanges without bindings in info command. --reason=REASON reason why the connection was closed [default: closed by rabtap]. @@ -145,6 +148,7 @@ type CommandLineArgs struct { PubExchange string // pub mode: exchange to publish to PubRoutingKey string // pub mode: routing key, defaults to "" PubFile *string // pub mode: file to send + AutoAck bool // sub mode: auto ack enabled QueueName string // queue create, remove, bind, sub QueueBindingKey string // queue bind ExchangeName string // exchange name create, remove or queue bind @@ -245,6 +249,7 @@ func parseSubCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { result := CommandLineArgs{ Cmd: SubCmd, commonArgs: parseCommonArgs(args), + AutoAck: !args["--no-auto-ack"].(bool), QueueName: args["QUEUE"].(string), } var err error diff --git a/cmd/rabtap/command_line_test.go b/cmd/rabtap/command_line_test.go index ecd5f54..2a7afc6 100644 --- a/cmd/rabtap/command_line_test.go +++ b/cmd/rabtap/command_line_test.go @@ -261,6 +261,7 @@ func TestCliSubCmdSaveToDir(t *testing.T) { assert.Equal(t, "queuename", args.QueueName) assert.Equal(t, "uri", args.AmqpURI) assert.Equal(t, "dir", *args.SaveDir) + assert.True(t, args.AutoAck) } func TestCliCreateQueue(t *testing.T) { diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index 67ec91d..e993c67 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -87,6 +87,7 @@ func startCmdSubscribe(ctx context.Context, args CommandLineArgs) { err := cmdSubscribe(ctx, CmdSubscribeArg{ amqpURI: args.AmqpURI, queue: args.QueueName, + AutoAck: args.AutoAck, tlsConfig: getTLSConfig(args.InsecureTLS), messageReceiveFunc: messageReceiveFunc}) failOnError(err, "error subscribing messages", os.Exit) diff --git a/pkg/subscribe.go b/pkg/subscribe.go index 1924a45..a04696d 100644 --- a/pkg/subscribe.go +++ b/pkg/subscribe.go @@ -11,19 +11,25 @@ import ( "github.com/streadway/amqp" ) +// AmqpSubscriberConfig stores configuration of the subscriber +type AmqpSubscriberConfig struct { + Exclusive bool + AutoAck bool +} + // AmqpSubscriber allows to tap to subscribe to queues type AmqpSubscriber struct { + config AmqpSubscriberConfig connection *AmqpConnector logger logrus.StdLogger - exclusive bool } // NewAmqpSubscriber returns a new AmqpSubscriber object associated with the // RabbitMQ broker denoted by the uri parameter. -func NewAmqpSubscriber(uri string, exclusive bool, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpSubscriber { +func NewAmqpSubscriber(config AmqpSubscriberConfig, uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpSubscriber { return &AmqpSubscriber{ + config: config, connection: NewAmqpConnector(uri, tlsConfig, logger), - exclusive: exclusive, logger: logger} } @@ -119,8 +125,8 @@ func (s *AmqpSubscriber) consumeMessages(conn *amqp.Connection, msgs, err := ch.Consume( queueName, "__rabtap-consumer-"+uuid.Must(uuid.NewRandom()).String()[:8], // TODO param - true, // auto-ack - s.exclusive, + s.config.AutoAck, + s.config.Exclusive, false, // no-local - unsupported false, // wait nil, // args diff --git a/pkg/subscribe_integration_test.go b/pkg/subscribe_integration_test.go index 34994df..0843bf7 100644 --- a/pkg/subscribe_integration_test.go +++ b/pkg/subscribe_integration_test.go @@ -30,7 +30,8 @@ func TestSubscribe(t *testing.T) { finishChan := make(chan int) - subscriber := NewAmqpSubscriber(testcommon.IntegrationURIFromEnv(), false, &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags)) + config := AmqpSubscriberConfig{Exclusive: false, AutoAck: true} + subscriber := NewAmqpSubscriber(config, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags)) defer subscriber.Close() resultChannel := make(TapChannel) go subscriber.EstablishSubscription(queueName, resultChannel) diff --git a/pkg/tap.go b/pkg/tap.go index e7e7151..06a388e 100644 --- a/pkg/tap.go +++ b/pkg/tap.go @@ -25,8 +25,9 @@ type AmqpTap struct { // NewAmqpTap returns a new AmqpTap object associated with the RabbitMQ // broker denoted by the uri parameter. func NewAmqpTap(uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpTap { + config := AmqpSubscriberConfig{Exclusive: true, AutoAck: true} return &AmqpTap{ - AmqpSubscriber: NewAmqpSubscriber(uri, true /* exclusive */, tlsConfig, logger)} + AmqpSubscriber: NewAmqpSubscriber(config, uri, tlsConfig, logger)} } func getTapExchangeNameForExchange(exchange, postfix string) string {