From 40ebac5be8247de6422bf05509af62c8174dc58b Mon Sep 17 00:00:00 2001 From: jandelgado Date: Fri, 2 Aug 2019 18:30:41 +0200 Subject: [PATCH] improve termination behaviour in tap and sub commands (#17) and minor refactorings --- README.md | 5 +- cmd/rabtap/cmd_subscribe.go | 8 +-- cmd/rabtap/cmd_subscribe_integration_test.go | 21 +++--- cmd/rabtap/cmd_tap.go | 9 +-- cmd/rabtap/cmd_tap_integration_test.go | 10 +-- cmd/rabtap/command_line.go | 12 ++-- cmd/rabtap/command_line_test.go | 4 +- cmd/rabtap/main.go | 74 +++++++++++--------- cmd/rabtap/main_test.go | 2 +- cmd/rabtap/subscribe.go | 13 ++-- cmd/rabtap/subscribe_test.go | 9 +-- 11 files changed, 89 insertions(+), 78 deletions(-) diff --git a/README.md b/README.md index 931708b..85c7575 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ See [below](#build-from-source) if you prefer to compile from source. ## Usage ``` -rabtap - RabbitMQ wire tap. +rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: rabtap -h|--help @@ -125,7 +125,8 @@ Options: --by-connection output of info command starts with connections. --consumers include consumers and connections in output of info command. -d, --durable create durable exchange/queue. - --filter EXPR Filter for info command to filter queues (see README.md) + --filter EXPR Predicate for info command to filter queues [default: true] + (see README.md for details) -h, --help print this help. -j, --json print/save/publish message metadata and body to a single JSON file. JSON body is base64 encoded. Otherwise diff --git a/cmd/rabtap/cmd_subscribe.go b/cmd/rabtap/cmd_subscribe.go index 0a5a7d4..2633beb 100644 --- a/cmd/rabtap/cmd_subscribe.go +++ b/cmd/rabtap/cmd_subscribe.go @@ -5,8 +5,8 @@ package main // subscribe cli command handler import ( + "context" "crypto/tls" - "os" rabtap "github.com/jandelgado/rabtap/pkg" ) @@ -17,17 +17,17 @@ type CmdSubscribeArg struct { queue string tlsConfig *tls.Config messageReceiveFunc MessageReceiveFunc - signalChannel chan os.Signal } // cmdSub subscribes to messages from the given queue -func cmdSubscribe(cmd CmdSubscribeArg) error { +func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error { log.Debugf("cmdSub: subscribing to queue %s", cmd.queue) + // this channel is used to decouple message receiving threads // with the main thread, which does the actual message processing messageChannel := make(rabtap.TapChannel) subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, false, cmd.tlsConfig, log) defer subscriber.Close() go subscriber.EstablishSubscription(cmd.queue, messageChannel) - return messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel) + return messageReceiveLoop(ctx, messageChannel, cmd.messageReceiveFunc) } diff --git a/cmd/rabtap/cmd_subscribe_integration_test.go b/cmd/rabtap/cmd_subscribe_integration_test.go index c366891..d6ae16e 100644 --- a/cmd/rabtap/cmd_subscribe_integration_test.go +++ b/cmd/rabtap/cmd_subscribe_integration_test.go @@ -8,9 +8,9 @@ package main // integration test import ( + "context" "crypto/tls" "io" - "os" "testing" "time" @@ -22,23 +22,25 @@ import ( func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) done := make(chan bool) go func() { - cmdSubscribe(CmdSubscribeArg{ + // we expect cmdSubscribe to return + cmdSubscribe(ctx, CmdSubscribeArg{ amqpURI: "invalid uri", queue: "queue", tlsConfig: &tls.Config{}, messageReceiveFunc: func(rabtap.TapMessage) error { return nil }, - signalChannel: make(chan os.Signal, 1)}) + }) done <- true }() - // test if our tap received the message select { case <-done: case <-time.After(time.Second * 2): assert.Fail(t, "cmdSubscribe did not fail on initial connection error") } + cancel() } func TestCmdSub(t *testing.T) { @@ -58,9 +60,9 @@ func TestCmdSub(t *testing.T) { return nil } - // signalChannel receives ctrl+C/interrput signal - signalChannel := make(chan os.Signal, 1) + ctx, cancel := context.WithCancel(context.Background()) + // creat exchange cmdExchangeCreate(CmdExchangeCreateArg{amqpURI: amqpURI, exchange: testExchange, exchangeType: "fanout", durable: false, tlsConfig: tlsConfig}) @@ -73,12 +75,11 @@ func TestCmdSub(t *testing.T) { defer cmdQueueRemove(amqpURI, testQueue, tlsConfig) // subscribe to testQueue - go cmdSubscribe(CmdSubscribeArg{ + go cmdSubscribe(ctx, CmdSubscribeArg{ amqpURI: amqpURI, queue: testQueue, tlsConfig: tlsConfig, - messageReceiveFunc: receiveFunc, - signalChannel: signalChannel}) + messageReceiveFunc: receiveFunc}) time.Sleep(time.Second * 1) @@ -107,7 +108,7 @@ func TestCmdSub(t *testing.T) { case <-time.After(time.Second * 2): assert.Fail(t, "did not receive message within expected time") } - signalChannel <- os.Interrupt + cancel() // stop cmdSubscribe() cmdQueueUnbindFromExchange(amqpURI, testQueue, testKey, testExchange, tlsConfig) // TODO check that queue is unbound diff --git a/cmd/rabtap/cmd_tap.go b/cmd/rabtap/cmd_tap.go index 934c699..52abe85 100644 --- a/cmd/rabtap/cmd_tap.go +++ b/cmd/rabtap/cmd_tap.go @@ -3,8 +3,8 @@ package main import ( + "context" "crypto/tls" - "os" rabtap "github.com/jandelgado/rabtap/pkg" ) @@ -23,14 +23,15 @@ func tapCmdShutdownFunc(taps []*rabtap.AmqpTap) { // cmdTap taps to the given exchanges and displays or saves the received // messages. -func cmdTap(tapConfig []rabtap.TapConfiguration, tlsConfig *tls.Config, - messageReceiveFunc MessageReceiveFunc, signalChannel chan os.Signal) { +func cmdTap(ctx context.Context, tapConfig []rabtap.TapConfiguration, tlsConfig *tls.Config, + messageReceiveFunc MessageReceiveFunc) { + // this channel is used to decouple message receiving threads // with the main thread, which does the actual message processing tapMessageChannel := make(rabtap.TapChannel) taps := establishTaps(tapMessageChannel, tapConfig, tlsConfig) defer tapCmdShutdownFunc(taps) - err := messageReceiveLoop(tapMessageChannel, messageReceiveFunc, signalChannel) + err := messageReceiveLoop(ctx, tapMessageChannel, messageReceiveFunc) if err != nil { log.Errorf("tap failed with %v", err) } diff --git a/cmd/rabtap/cmd_tap_integration_test.go b/cmd/rabtap/cmd_tap_integration_test.go index 16cca72..597c98a 100644 --- a/cmd/rabtap/cmd_tap_integration_test.go +++ b/cmd/rabtap/cmd_tap_integration_test.go @@ -5,8 +5,8 @@ package main import ( + "context" "crypto/tls" - "os" "testing" "time" @@ -38,9 +38,9 @@ func TestCmdTap(t *testing.T) { tapConfig := []rabtap.TapConfiguration{ {AmqpURI: testcommon.IntegrationURIFromEnv(), Exchanges: exchangeConfig}} - // signalChannel receives ctrl+C/interrput signal - signalChannel := make(chan os.Signal, 1) - go cmdTap(tapConfig, &tls.Config{}, receiveFunc, signalChannel) + + ctx, cancel := context.WithCancel(context.Background()) + go cmdTap(ctx, tapConfig, &tls.Config{}, receiveFunc) time.Sleep(time.Second * 1) err := ch.Publish( @@ -61,5 +61,5 @@ func TestCmdTap(t *testing.T) { case <-time.After(time.Second * 2): assert.Fail(t, "did not receive message within expected time") } - signalChannel <- os.Interrupt + cancel() // stop cmdTap() } diff --git a/cmd/rabtap/command_line.go b/cmd/rabtap/command_line.go index a71ecd7..5ecfcc2 100644 --- a/cmd/rabtap/command_line.go +++ b/cmd/rabtap/command_line.go @@ -16,7 +16,7 @@ var RabtapAppVersion = "(version not specified)" const ( // note: usage is interpreted by docopt - this is code. - usage = `rabtap - RabbitMQ wire tap. + usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: rabtap -h|--help @@ -52,7 +52,8 @@ Options: --by-connection output of info command starts with connections. --consumers include consumers and connections in output of info command. -d, --durable create durable exchange/queue. - --filter EXPR Filter for info command to filter queues (see README.md) + --filter EXPR Predicate for info command to filter queues [default: true] + (see README.md for details) -h, --help print this help. -j, --json print/save/publish message metadata and body to a single JSON file. JSON body is base64 encoded. Otherwise @@ -151,7 +152,7 @@ type CommandLineArgs struct { ShowConsumers bool // info mode: also show consumer ShowByConnection bool // info mode: show by connection ShowStats bool // info mode: also show statistics - QueueFilter *string // info mode: optional filter for queues + QueueFilter string // info mode: optional filter predicate OmitEmptyExchanges bool // info mode: do not show exchanges wo/ bindings Durable bool // queue create, exchange create Autodelete bool // queue create, exchange create @@ -210,16 +211,13 @@ func parseInfoCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { result := CommandLineArgs{ Cmd: InfoCmd, commonArgs: parseCommonArgs(args), + QueueFilter: args["--filter"].(string), OmitEmptyExchanges: args["--omit-empty"].(bool), ShowConsumers: args["--consumers"].(bool), ShowStats: args["--stats"].(bool), ShowDefaultExchange: args["--show-default"].(bool), ShowByConnection: args["--by-connection"].(bool)} - if args["--filter"] != nil { - filter := args["--filter"].(string) - result.QueueFilter = &filter - } var err error if result.APIURI, err = parseAPIURI(args); err != nil { return result, err diff --git a/cmd/rabtap/command_line_test.go b/cmd/rabtap/command_line_test.go index 5cf5a30..ecd5f54 100644 --- a/cmd/rabtap/command_line_test.go +++ b/cmd/rabtap/command_line_test.go @@ -145,13 +145,13 @@ func TestCliInfoCmd(t *testing.T) { assert.Equal(t, 0, len(args.TapConfig)) assert.Equal(t, InfoCmd, args.Cmd) assert.Equal(t, "APIURI", args.APIURI) + assert.Equal(t, "true", args.QueueFilter) assert.False(t, args.Verbose) assert.False(t, args.ShowStats) assert.False(t, args.ShowConsumers) assert.False(t, args.ShowByConnection) assert.False(t, args.InsecureTLS) assert.False(t, args.NoColor) - assert.Nil(t, args.QueueFilter) assert.False(t, args.OmitEmptyExchanges) } @@ -193,7 +193,7 @@ func TestCliInfoCmdAllOptionsAreSet(t *testing.T) { assert.Equal(t, "APIURI", args.APIURI) assert.Nil(t, args.SaveDir) assert.False(t, args.Verbose) - assert.Equal(t, "EXPR", *args.QueueFilter) + assert.Equal(t, "EXPR", args.QueueFilter) assert.True(t, args.ShowStats) assert.True(t, args.ShowConsumers) assert.True(t, args.ShowDefaultExchange) diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index 30ab308..67ec91d 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -3,7 +3,9 @@ package main import ( + "context" "crypto/tls" + "fmt" "net/url" "os" "os/signal" @@ -42,16 +44,9 @@ func getTLSConfig(insecureTLS bool) *tls.Config { return &tls.Config{InsecureSkipVerify: insecureTLS} } -func createFilterPredicate(expr *string) (Predicate, error) { - if expr != nil { - return NewPredicateExpression(*expr) - } - return NewPredicateExpression("true") -} - func startCmdInfo(args CommandLineArgs, title string) { - queueFilter, err := createFilterPredicate(args.QueueFilter) - failOnError(err, "invalid queue filter predicate", os.Exit) + queueFilter, err := NewPredicateExpression(args.QueueFilter) + failOnError(err, fmt.Sprintf("invalid queue filter predicate '%s'", args.QueueFilter), os.Exit) apiURL, err := url.Parse(args.APIURI) failOnError(err, "invalid api url", os.Exit) cmdInfo(CmdInfoArg{ @@ -85,51 +80,36 @@ func startCmdPublish(args CommandLineArgs) { failOnError(err, "error publishing message", os.Exit) } -func startCmdSubscribe(args CommandLineArgs) { - // signalChannel receives ctrl+C/interrput signal - signalChannel := make(chan os.Signal, 5) - signal.Notify(signalChannel, os.Interrupt) +func startCmdSubscribe(ctx context.Context, args CommandLineArgs) { messageReceiveFunc := createMessageReceiveFunc( NewColorableWriter(os.Stdout), args.JSONFormat, args.SaveDir, args.NoColor) - err := cmdSubscribe(CmdSubscribeArg{ + err := cmdSubscribe(ctx, CmdSubscribeArg{ amqpURI: args.AmqpURI, queue: args.QueueName, tlsConfig: getTLSConfig(args.InsecureTLS), - messageReceiveFunc: messageReceiveFunc, - signalChannel: signalChannel}) + messageReceiveFunc: messageReceiveFunc}) failOnError(err, "error subscribing messages", os.Exit) } -func startCmdTap(args CommandLineArgs) { - signalChannel := make(chan os.Signal, 1) - signal.Notify(signalChannel, os.Interrupt) +func startCmdTap(ctx context.Context, args CommandLineArgs) { messageReceiveFunc := createMessageReceiveFunc( NewColorableWriter(os.Stdout), args.JSONFormat, args.SaveDir, args.NoColor) - cmdTap(args.TapConfig, getTLSConfig(args.InsecureTLS), - messageReceiveFunc, signalChannel) + cmdTap(ctx, args.TapConfig, getTLSConfig(args.InsecureTLS), + messageReceiveFunc) } -func main() { - args, err := ParseCommandLineArgs(os.Args[1:]) - if err != nil { - log.Fatal(err) - } - initLogging(args.Verbose) - log.Debugf("parsed cli-args: %+v", args) - - tlsConfig := getTLSConfig(args.InsecureTLS) - +func dispatchCmd(ctx context.Context, args CommandLineArgs, tlsConfig *tls.Config) { switch args.Cmd { case InfoCmd: startCmdInfo(args, args.APIURI) case SubCmd: - startCmdSubscribe(args) + startCmdSubscribe(ctx, args) case PubCmd: startCmdPublish(args) case TapCmd: - startCmdTap(args) + startCmdTap(ctx, args) case ExchangeCreateCmd: cmdExchangeCreate(CmdExchangeCreateArg{amqpURI: args.AmqpURI, exchange: args.ExchangeName, exchangeType: args.ExchangeType, @@ -156,3 +136,31 @@ func main() { args.CloseReason, tlsConfig) } } + +func main() { + args, err := ParseCommandLineArgs(os.Args[1:]) + if err != nil { + log.Fatal(err) + } + + initLogging(args.Verbose) + tlsConfig := getTLSConfig(args.InsecureTLS) + + // translate ^C (Interrput) in ctx.Done() + ctx, cancel := context.WithCancel(context.Background()) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + defer func() { + signal.Stop(c) + cancel() + }() + go func() { + select { + case <-c: + cancel() + case <-ctx.Done(): + } + }() + + dispatchCmd(ctx, args, tlsConfig) +} diff --git a/cmd/rabtap/main_test.go b/cmd/rabtap/main_test.go index a058429..51c51ec 100644 --- a/cmd/rabtap/main_test.go +++ b/cmd/rabtap/main_test.go @@ -49,7 +49,7 @@ func Example_startCmdInfo() { mock := testcommon.NewRabbitAPIMock(testcommon.MockModeEmpty) defer mock.Close() - args := CommandLineArgs{APIURI: mock.URL, commonArgs: commonArgs{NoColor: true}} + args, _ := ParseCommandLineArgs([]string{"info", "--api", mock.URL, "--no-color"}) startCmdInfo(args, "http://rootnode") // Output: diff --git a/cmd/rabtap/subscribe.go b/cmd/rabtap/subscribe.go index b87378b..5830fee 100644 --- a/cmd/rabtap/subscribe.go +++ b/cmd/rabtap/subscribe.go @@ -5,9 +5,9 @@ package main // common functionality to subscribe to queues. import ( + "context" "fmt" "io" - "os" "path" "time" @@ -17,11 +17,15 @@ import ( // MessageReceiveFunc processes receiced messages from a tap. type MessageReceiveFunc func(rabtap.TapMessage) error -func messageReceiveLoop(messageChan rabtap.TapChannel, - messageReceiveFunc MessageReceiveFunc, signalChannel chan os.Signal) error { +func messageReceiveLoop(ctx context.Context, messageChan rabtap.TapChannel, + messageReceiveFunc MessageReceiveFunc) error { for { select { + case <-ctx.Done(): + log.Debugf("subscribe: cancel") + return nil + case message, more := <-messageChan: if !more { log.Debug("subscribe: messageReceiveLoop: channel closed.") @@ -37,9 +41,6 @@ func messageReceiveLoop(messageChan rabtap.TapChannel, if err := messageReceiveFunc(message); err != nil { log.Error(err) } - case <-signalChannel: - log.Debugf("subscribe: caught signal!") - return nil } } } diff --git a/cmd/rabtap/subscribe_test.go b/cmd/rabtap/subscribe_test.go index c10c051..f5e2b9c 100644 --- a/cmd/rabtap/subscribe_test.go +++ b/cmd/rabtap/subscribe_test.go @@ -4,6 +4,7 @@ package main import ( "bytes" + "context" "io/ioutil" "os" "strings" @@ -49,8 +50,8 @@ func TestCreateMessageReceiveFuncJSONToFile(t *testing.T) { } func TestMessageReceiveLoop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) messageChan := make(rabtap.TapChannel) - signalChannel := make(chan os.Signal) done := make(chan bool) received := 0 @@ -59,10 +60,10 @@ func TestMessageReceiveLoop(t *testing.T) { done <- true return nil } - go messageReceiveLoop(messageChan, receiveFunc, signalChannel) + go messageReceiveLoop(ctx, messageChan, receiveFunc) messageChan <- rabtap.TapMessage{} - <-done // TODO add timeout - signalChannel <- os.Interrupt // terminates go routine + <-done // TODO add timeout + cancel() assert.Equal(t, 1, received) }