Skip to content

Commit

Permalink
improve termination behaviour in tap and sub commands (#17)
Browse files Browse the repository at this point in the history
and minor refactorings
  • Loading branch information
jandelgado committed Aug 2, 2019
1 parent 6117de0 commit 40ebac5
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 78 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ See [below](#build-from-source) if you prefer to compile from source.
## Usage

```
rabtap - RabbitMQ wire tap.
rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
Usage:
rabtap -h|--help
Expand Down Expand Up @@ -125,7 +125,8 @@ Options:
--by-connection output of info command starts with connections.
--consumers include consumers and connections in output of info command.
-d, --durable create durable exchange/queue.
--filter EXPR Filter for info command to filter queues (see README.md)
--filter EXPR Predicate for info command to filter queues [default: true]
(see README.md for details)
-h, --help print this help.
-j, --json print/save/publish message metadata and body to a
single JSON file. JSON body is base64 encoded. Otherwise
Expand Down
8 changes: 4 additions & 4 deletions cmd/rabtap/cmd_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package main
// subscribe cli command handler

import (
"context"
"crypto/tls"
"os"

rabtap "github.com/jandelgado/rabtap/pkg"
)
Expand All @@ -17,17 +17,17 @@ type CmdSubscribeArg struct {
queue string
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
signalChannel chan os.Signal
}

// cmdSub subscribes to messages from the given queue
func cmdSubscribe(cmd CmdSubscribeArg) error {
func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error {
log.Debugf("cmdSub: subscribing to queue %s", cmd.queue)

// this channel is used to decouple message receiving threads
// with the main thread, which does the actual message processing
messageChannel := make(rabtap.TapChannel)
subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, false, cmd.tlsConfig, log)
defer subscriber.Close()
go subscriber.EstablishSubscription(cmd.queue, messageChannel)
return messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel)
return messageReceiveLoop(ctx, messageChannel, cmd.messageReceiveFunc)
}
21 changes: 11 additions & 10 deletions cmd/rabtap/cmd_subscribe_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ package main
// integration test

import (
"context"
"crypto/tls"
"io"
"os"
"testing"
"time"

Expand All @@ -22,23 +22,25 @@ import (

func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
done := make(chan bool)
go func() {
cmdSubscribe(CmdSubscribeArg{
// we expect cmdSubscribe to return
cmdSubscribe(ctx, CmdSubscribeArg{
amqpURI: "invalid uri",
queue: "queue",
tlsConfig: &tls.Config{},
messageReceiveFunc: func(rabtap.TapMessage) error { return nil },
signalChannel: make(chan os.Signal, 1)})
})
done <- true
}()

// test if our tap received the message
select {
case <-done:
case <-time.After(time.Second * 2):
assert.Fail(t, "cmdSubscribe did not fail on initial connection error")
}
cancel()
}

func TestCmdSub(t *testing.T) {
Expand All @@ -58,9 +60,9 @@ func TestCmdSub(t *testing.T) {
return nil
}

// signalChannel receives ctrl+C/interrput signal
signalChannel := make(chan os.Signal, 1)
ctx, cancel := context.WithCancel(context.Background())

// creat exchange
cmdExchangeCreate(CmdExchangeCreateArg{amqpURI: amqpURI,
exchange: testExchange, exchangeType: "fanout",
durable: false, tlsConfig: tlsConfig})
Expand All @@ -73,12 +75,11 @@ func TestCmdSub(t *testing.T) {
defer cmdQueueRemove(amqpURI, testQueue, tlsConfig)

// subscribe to testQueue
go cmdSubscribe(CmdSubscribeArg{
go cmdSubscribe(ctx, CmdSubscribeArg{
amqpURI: amqpURI,
queue: testQueue,
tlsConfig: tlsConfig,
messageReceiveFunc: receiveFunc,
signalChannel: signalChannel})
messageReceiveFunc: receiveFunc})

time.Sleep(time.Second * 1)

Expand Down Expand Up @@ -107,7 +108,7 @@ func TestCmdSub(t *testing.T) {
case <-time.After(time.Second * 2):
assert.Fail(t, "did not receive message within expected time")
}
signalChannel <- os.Interrupt
cancel() // stop cmdSubscribe()

cmdQueueUnbindFromExchange(amqpURI, testQueue, testKey, testExchange, tlsConfig)
// TODO check that queue is unbound
Expand Down
9 changes: 5 additions & 4 deletions cmd/rabtap/cmd_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
package main

import (
"context"
"crypto/tls"
"os"

rabtap "github.com/jandelgado/rabtap/pkg"
)
Expand All @@ -23,14 +23,15 @@ func tapCmdShutdownFunc(taps []*rabtap.AmqpTap) {

// cmdTap taps to the given exchanges and displays or saves the received
// messages.
func cmdTap(tapConfig []rabtap.TapConfiguration, tlsConfig *tls.Config,
messageReceiveFunc MessageReceiveFunc, signalChannel chan os.Signal) {
func cmdTap(ctx context.Context, tapConfig []rabtap.TapConfiguration, tlsConfig *tls.Config,
messageReceiveFunc MessageReceiveFunc) {

// this channel is used to decouple message receiving threads
// with the main thread, which does the actual message processing
tapMessageChannel := make(rabtap.TapChannel)
taps := establishTaps(tapMessageChannel, tapConfig, tlsConfig)
defer tapCmdShutdownFunc(taps)
err := messageReceiveLoop(tapMessageChannel, messageReceiveFunc, signalChannel)
err := messageReceiveLoop(ctx, tapMessageChannel, messageReceiveFunc)
if err != nil {
log.Errorf("tap failed with %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/rabtap/cmd_tap_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package main

import (
"context"
"crypto/tls"
"os"
"testing"
"time"

Expand Down Expand Up @@ -38,9 +38,9 @@ func TestCmdTap(t *testing.T) {
tapConfig := []rabtap.TapConfiguration{
{AmqpURI: testcommon.IntegrationURIFromEnv(),
Exchanges: exchangeConfig}}
// signalChannel receives ctrl+C/interrput signal
signalChannel := make(chan os.Signal, 1)
go cmdTap(tapConfig, &tls.Config{}, receiveFunc, signalChannel)

ctx, cancel := context.WithCancel(context.Background())
go cmdTap(ctx, tapConfig, &tls.Config{}, receiveFunc)

time.Sleep(time.Second * 1)
err := ch.Publish(
Expand All @@ -61,5 +61,5 @@ func TestCmdTap(t *testing.T) {
case <-time.After(time.Second * 2):
assert.Fail(t, "did not receive message within expected time")
}
signalChannel <- os.Interrupt
cancel() // stop cmdTap()
}
12 changes: 5 additions & 7 deletions cmd/rabtap/command_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var RabtapAppVersion = "(version not specified)"

const (
// note: usage is interpreted by docopt - this is code.
usage = `rabtap - RabbitMQ wire tap.
usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
Usage:
rabtap -h|--help
Expand Down Expand Up @@ -52,7 +52,8 @@ Options:
--by-connection output of info command starts with connections.
--consumers include consumers and connections in output of info command.
-d, --durable create durable exchange/queue.
--filter EXPR Filter for info command to filter queues (see README.md)
--filter EXPR Predicate for info command to filter queues [default: true]
(see README.md for details)
-h, --help print this help.
-j, --json print/save/publish message metadata and body to a
single JSON file. JSON body is base64 encoded. Otherwise
Expand Down Expand Up @@ -151,7 +152,7 @@ type CommandLineArgs struct {
ShowConsumers bool // info mode: also show consumer
ShowByConnection bool // info mode: show by connection
ShowStats bool // info mode: also show statistics
QueueFilter *string // info mode: optional filter for queues
QueueFilter string // info mode: optional filter predicate
OmitEmptyExchanges bool // info mode: do not show exchanges wo/ bindings
Durable bool // queue create, exchange create
Autodelete bool // queue create, exchange create
Expand Down Expand Up @@ -210,16 +211,13 @@ func parseInfoCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
result := CommandLineArgs{
Cmd: InfoCmd,
commonArgs: parseCommonArgs(args),
QueueFilter: args["--filter"].(string),
OmitEmptyExchanges: args["--omit-empty"].(bool),
ShowConsumers: args["--consumers"].(bool),
ShowStats: args["--stats"].(bool),
ShowDefaultExchange: args["--show-default"].(bool),
ShowByConnection: args["--by-connection"].(bool)}

if args["--filter"] != nil {
filter := args["--filter"].(string)
result.QueueFilter = &filter
}
var err error
if result.APIURI, err = parseAPIURI(args); err != nil {
return result, err
Expand Down
4 changes: 2 additions & 2 deletions cmd/rabtap/command_line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ func TestCliInfoCmd(t *testing.T) {
assert.Equal(t, 0, len(args.TapConfig))
assert.Equal(t, InfoCmd, args.Cmd)
assert.Equal(t, "APIURI", args.APIURI)
assert.Equal(t, "true", args.QueueFilter)
assert.False(t, args.Verbose)
assert.False(t, args.ShowStats)
assert.False(t, args.ShowConsumers)
assert.False(t, args.ShowByConnection)
assert.False(t, args.InsecureTLS)
assert.False(t, args.NoColor)
assert.Nil(t, args.QueueFilter)
assert.False(t, args.OmitEmptyExchanges)
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func TestCliInfoCmdAllOptionsAreSet(t *testing.T) {
assert.Equal(t, "APIURI", args.APIURI)
assert.Nil(t, args.SaveDir)
assert.False(t, args.Verbose)
assert.Equal(t, "EXPR", *args.QueueFilter)
assert.Equal(t, "EXPR", args.QueueFilter)
assert.True(t, args.ShowStats)
assert.True(t, args.ShowConsumers)
assert.True(t, args.ShowDefaultExchange)
Expand Down
74 changes: 41 additions & 33 deletions cmd/rabtap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
package main

import (
"context"
"crypto/tls"
"fmt"
"net/url"
"os"
"os/signal"
Expand Down Expand Up @@ -42,16 +44,9 @@ func getTLSConfig(insecureTLS bool) *tls.Config {
return &tls.Config{InsecureSkipVerify: insecureTLS}
}

func createFilterPredicate(expr *string) (Predicate, error) {
if expr != nil {
return NewPredicateExpression(*expr)
}
return NewPredicateExpression("true")
}

func startCmdInfo(args CommandLineArgs, title string) {
queueFilter, err := createFilterPredicate(args.QueueFilter)
failOnError(err, "invalid queue filter predicate", os.Exit)
queueFilter, err := NewPredicateExpression(args.QueueFilter)
failOnError(err, fmt.Sprintf("invalid queue filter predicate '%s'", args.QueueFilter), os.Exit)
apiURL, err := url.Parse(args.APIURI)
failOnError(err, "invalid api url", os.Exit)
cmdInfo(CmdInfoArg{
Expand Down Expand Up @@ -85,51 +80,36 @@ func startCmdPublish(args CommandLineArgs) {
failOnError(err, "error publishing message", os.Exit)
}

func startCmdSubscribe(args CommandLineArgs) {
// signalChannel receives ctrl+C/interrput signal
signalChannel := make(chan os.Signal, 5)
signal.Notify(signalChannel, os.Interrupt)
func startCmdSubscribe(ctx context.Context, args CommandLineArgs) {
messageReceiveFunc := createMessageReceiveFunc(
NewColorableWriter(os.Stdout), args.JSONFormat,
args.SaveDir, args.NoColor)
err := cmdSubscribe(CmdSubscribeArg{
err := cmdSubscribe(ctx, CmdSubscribeArg{
amqpURI: args.AmqpURI,
queue: args.QueueName,
tlsConfig: getTLSConfig(args.InsecureTLS),
messageReceiveFunc: messageReceiveFunc,
signalChannel: signalChannel})
messageReceiveFunc: messageReceiveFunc})
failOnError(err, "error subscribing messages", os.Exit)
}

func startCmdTap(args CommandLineArgs) {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt)
func startCmdTap(ctx context.Context, args CommandLineArgs) {
messageReceiveFunc := createMessageReceiveFunc(
NewColorableWriter(os.Stdout), args.JSONFormat,
args.SaveDir, args.NoColor)
cmdTap(args.TapConfig, getTLSConfig(args.InsecureTLS),
messageReceiveFunc, signalChannel)
cmdTap(ctx, args.TapConfig, getTLSConfig(args.InsecureTLS),
messageReceiveFunc)
}

func main() {
args, err := ParseCommandLineArgs(os.Args[1:])
if err != nil {
log.Fatal(err)
}
initLogging(args.Verbose)
log.Debugf("parsed cli-args: %+v", args)

tlsConfig := getTLSConfig(args.InsecureTLS)

func dispatchCmd(ctx context.Context, args CommandLineArgs, tlsConfig *tls.Config) {
switch args.Cmd {
case InfoCmd:
startCmdInfo(args, args.APIURI)
case SubCmd:
startCmdSubscribe(args)
startCmdSubscribe(ctx, args)
case PubCmd:
startCmdPublish(args)
case TapCmd:
startCmdTap(args)
startCmdTap(ctx, args)
case ExchangeCreateCmd:
cmdExchangeCreate(CmdExchangeCreateArg{amqpURI: args.AmqpURI,
exchange: args.ExchangeName, exchangeType: args.ExchangeType,
Expand All @@ -156,3 +136,31 @@ func main() {
args.CloseReason, tlsConfig)
}
}

func main() {
args, err := ParseCommandLineArgs(os.Args[1:])
if err != nil {
log.Fatal(err)
}

initLogging(args.Verbose)
tlsConfig := getTLSConfig(args.InsecureTLS)

// translate ^C (Interrput) in ctx.Done()
ctx, cancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
defer func() {
signal.Stop(c)
cancel()
}()
go func() {
select {
case <-c:
cancel()
case <-ctx.Done():
}
}()

dispatchCmd(ctx, args, tlsConfig)
}
2 changes: 1 addition & 1 deletion cmd/rabtap/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Example_startCmdInfo() {
mock := testcommon.NewRabbitAPIMock(testcommon.MockModeEmpty)
defer mock.Close()

args := CommandLineArgs{APIURI: mock.URL, commonArgs: commonArgs{NoColor: true}}
args, _ := ParseCommandLineArgs([]string{"info", "--api", mock.URL, "--no-color"})
startCmdInfo(args, "http://rootnode")

// Output:
Expand Down

0 comments on commit 40ebac5

Please sign in to comment.