From 69311b0e13728f42e956112629830546f995d002 Mon Sep 17 00:00:00 2001 From: Jan Delgado Date: Mon, 2 Apr 2018 00:30:54 +0200 Subject: [PATCH] add new tests --- cmd/main/broker_info.go | 24 +++++++++--------- cmd/main/cmd_publish.go | 7 +++--- cmd/main/cmd_subscribe.go | 6 +---- cmd/main/cmd_tap.go | 9 +++---- cmd/main/main.go | 9 ++++--- cmd/main/main_test.go | 14 +++++++++-- cmd/main/subscribe_test.go | 49 +++++++++++++++++++++++++++++-------- pkg/tap_integration_test.go | 2 +- 8 files changed, 78 insertions(+), 42 deletions(-) diff --git a/cmd/main/broker_info.go b/cmd/main/broker_info.go index 43d7a76..e307c78 100644 --- a/cmd/main/broker_info.go +++ b/cmd/main/broker_info.go @@ -17,33 +17,33 @@ type BrokerInfo struct { func NewBrokerInfo(client *rabtap.RabbitHTTPClient) (BrokerInfo, error) { var err error - var s BrokerInfo + var bi BrokerInfo // collect infos from rabtap.RabbitMQ API - s.Overview, err = client.GetOverview() + bi.Overview, err = client.GetOverview() if err != nil { - return s, err + return bi, err } - s.Exchanges, err = client.GetExchanges() + bi.Exchanges, err = client.GetExchanges() if err != nil { - return s, err + return bi, err } - s.Bindings, err = client.GetBindings() + bi.Bindings, err = client.GetBindings() if err != nil { - return s, err + return bi, err } - s.Queues, err = client.GetQueues() + bi.Queues, err = client.GetQueues() if err != nil { - return s, err + return bi, err } - s.Consumers, err = client.GetConsumers() + bi.Consumers, err = client.GetConsumers() if err != nil { - return s, err + return bi, err } - return s, nil + return bi, nil } diff --git a/cmd/main/cmd_publish.go b/cmd/main/cmd_publish.go index 61a5ff1..880a03e 100644 --- a/cmd/main/cmd_publish.go +++ b/cmd/main/cmd_publish.go @@ -45,10 +45,12 @@ func publishMessage(publishChannel rabtap.PublishChannel, } // readSingleMessageFromRawFile reads a single messages from the given io.Reader -// which is typically stdin or a file. On subsequent calls, it returns io.EOF. +// which is typically stdin or a file. If reading from stdin, CTRL+D (linux) +// or CTRL+Z (Win) on an empty line terminates the reader. func readSingleMessageFromRawFile(reader io.Reader) (amqp.Publishing, error) { buf := new(bytes.Buffer) - if numRead, err := buf.ReadFrom(reader); err != nil { + numRead, err := buf.ReadFrom(reader) + if err != nil { return amqp.Publishing{}, err } else if numRead == 0 { return amqp.Publishing{}, io.EOF @@ -102,7 +104,6 @@ func publishMessageStream(publishChannel rabtap.PublishChannel, // cmdPublish reads messages with the provied readNextMessageFunc and // publishes the messages to the given exchange. func cmdPublish(cmd CmdPublishArg) { - log.Debugf("publishing message(s) to exchange %s with routingkey %s", cmd.exchange, cmd.routingKey) publisher := rabtap.NewAmqpPublish(cmd.amqpURI, cmd.tlsConfig, log) diff --git a/cmd/main/cmd_subscribe.go b/cmd/main/cmd_subscribe.go index 1db533e..4ab72be 100644 --- a/cmd/main/cmd_subscribe.go +++ b/cmd/main/cmd_subscribe.go @@ -7,7 +7,6 @@ package main import ( "crypto/tls" "os" - "os/signal" "github.com/jandelgado/rabtap/pkg" ) @@ -23,17 +22,14 @@ type CmdSubscribeArg struct { // cmdSub subscribes to messages from the given queue func cmdSubscribe(cmd CmdSubscribeArg) { - + log.Debugf("cmdSub: subscribing to queue %s", cmd.queue) // this channel is used to decouple message receiving threads // with the main thread, which does the actual message processing - log.Debugf("cmdSub: subscribing to queue %s", cmd.queue) - messageChannel := make(rabtap.TapChannel) subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, cmd.tlsConfig, log) defer subscriber.Close() go subscriber.EstablishSubscription(cmd.queue, messageChannel) - signal.Notify(cmd.signalChannel, os.Interrupt) messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel) log.Debug("cmdSub: cmd_subscribe ending") } diff --git a/cmd/main/cmd_tap.go b/cmd/main/cmd_tap.go index 10c46f7..46b8fd5 100644 --- a/cmd/main/cmd_tap.go +++ b/cmd/main/cmd_tap.go @@ -7,7 +7,6 @@ package main import ( "crypto/tls" "os" - "os/signal" "github.com/jandelgado/rabtap/pkg" ) @@ -15,7 +14,9 @@ import ( func tapCmdShutdownFunc(taps []*rabtap.AmqpTap) { log.Info("rabtap tap threads shutting down ...") for _, tap := range taps { - tap.Close() + if err := tap.Close(); err != nil { + log.Errorf("error closing tap: %v", err) + } } } @@ -23,15 +24,11 @@ func tapCmdShutdownFunc(taps []*rabtap.AmqpTap) { // messages. func cmdTap(tapConfig []rabtap.TapConfiguration, tlsConfig *tls.Config, messageReceiveFunc MessageReceiveFunc, signalChannel chan os.Signal) { - // this channel is used to decouple message receiving threads // with the main thread, which does the actual message processing tapMessageChannel := make(rabtap.TapChannel) taps := establishTaps(tapMessageChannel, tapConfig, tlsConfig) defer tapCmdShutdownFunc(taps) - - signal.Notify(signalChannel, os.Interrupt) - messageReceiveLoop(tapMessageChannel, messageReceiveFunc, signalChannel) } diff --git a/cmd/main/main.go b/cmd/main/main.go index 18d0467..ad8ab1b 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -5,6 +5,7 @@ package main import ( "crypto/tls" "os" + "os/signal" "github.com/jandelgado/rabtap/pkg" "github.com/sirupsen/logrus" @@ -35,9 +36,9 @@ func getTLSConfig(insecureTLS bool) *tls.Config { return &tls.Config{InsecureSkipVerify: insecureTLS} } -func startCmdInfo(args CommandLineArgs) { +func startCmdInfo(args CommandLineArgs, title string) { cmdInfo(CmdInfoArg{ - rootNode: args.APIURI, + rootNode: title, client: rabtap.NewRabbitHTTPClient(args.APIURI, getTLSConfig(args.InsecureTLS)), printBrokerInfoConfig: PrintBrokerInfoConfig{ ShowStats: args.ShowStats, @@ -66,6 +67,7 @@ func startCmdPublish(args CommandLineArgs) { func startCmdSubscribe(args CommandLineArgs) { // signalChannel receives ctrl+C/interrput signal signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, os.Interrupt) // messageReceiveFunc receives the tapped messages, prints // and optionally saves them. messageReceiveFunc := createMessageReceiveFunc( @@ -81,6 +83,7 @@ func startCmdSubscribe(args CommandLineArgs) { func startCmdTap(args CommandLineArgs) { signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, os.Interrupt) messageReceiveFunc := createMessageReceiveFunc( NewColorableWriter(os.Stdout), args.JSONFormat, args.SaveDir, args.NoColor) @@ -100,7 +103,7 @@ func main() { switch args.Cmd { case InfoCmd: - startCmdInfo(args) + startCmdInfo(args, args.APIURI) case SubCmd: startCmdSubscribe(args) case PubCmd: diff --git a/cmd/main/main_test.go b/cmd/main/main_test.go index 936ce5f..66079e7 100644 --- a/cmd/main/main_test.go +++ b/cmd/main/main_test.go @@ -1,13 +1,12 @@ // Copyright (C) 2017 Jan Delgado -// +build integration - package main import ( "errors" "testing" + "github.com/jandelgado/rabtap/pkg/testcommon" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) @@ -45,3 +44,14 @@ func TestFailOnError(t *testing.T) { assert.False(t, exitFuncCalled) } + +func ExamplestartCmdInfo() { + mock := testcommon.NewRabbitAPIMock(testcommon.MockModeEmpty) + defer mock.Close() + + args := CommandLineArgs{APIURI: mock.URL, commonArgs: commonArgs{NoColor: true}} + startCmdInfo(args, "http://rootnode") + + // Output: + // http://rootnode (broker ver=3.6.9, mgmt ver=3.6.9, cluster=rabbit@08f57d1fe8ab) +} diff --git a/cmd/main/subscribe_test.go b/cmd/main/subscribe_test.go index 3a006b5..1d9e228 100644 --- a/cmd/main/subscribe_test.go +++ b/cmd/main/subscribe_test.go @@ -3,11 +3,15 @@ package main import ( + "bytes" "io/ioutil" "os" + "strings" "testing" + rabtap "github.com/jandelgado/rabtap/pkg" "github.com/streadway/amqp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -16,19 +20,44 @@ func TestCreateMessageReceiveFuncRawToFile(t *testing.T) { require.Nil(t, err) defer os.RemoveAll(testDir) - out := os.Stdout - rcvFunc := createMessageReceiveFunc(out, false, &testDir, false) - _ = rcvFunc(&amqp.Delivery{}) + var b bytes.Buffer + rcvFunc := createMessageReceiveFunc(&b, false, &testDir, false) + _ = rcvFunc(&amqp.Delivery{Body: []byte("Testmessage")}) - // TODO + assert.True(t, strings.Contains(b.String(), "Testmessage")) + + // TODO make created filename predicatable and check written file } func TestCreateMessageReceiveFuncJSONToFile(t *testing.T) { - // testDir, err := ioutil.TempDir("", "") - // require.Nil(t, err) - // defer os.RemoveAll(testDir) + testDir, err := ioutil.TempDir("", "") + require.Nil(t, err) + defer os.RemoveAll(testDir) + + var b bytes.Buffer + rcvFunc := createMessageReceiveFunc(&b, true, &testDir, false) + _ = rcvFunc(&amqp.Delivery{Body: []byte("Testmessage")}) + + assert.True(t, strings.Contains(b.String(), "\"Body\": \"VGVzdG1lc3NhZ2U=\"")) + + // TODO make created filename predicatable and check written file +} - // out := os.Stdout - // rcvFunc := createMessageReceiveFunc(out, true, &testDir, false) - // err = rcvFunc(&amqp.Delivery{}) +func TestMessageReceiveLoop(t *testing.T) { + messageChan := make(rabtap.TapChannel) + signalChannel := make(chan os.Signal) + done := make(chan bool) + received := 0 + + receiveFunc := func(*amqp.Delivery) error { + received = received + 1 + done <- true + return nil + } + go messageReceiveLoop(messageChan, receiveFunc, signalChannel) + + messageChan <- &rabtap.TapMessage{} + <-done // TODO add timeout + signalChannel <- os.Interrupt // terminates go routine + assert.Equal(t, 1, received) } diff --git a/pkg/tap_integration_test.go b/pkg/tap_integration_test.go index 5db6fe9..d737781 100644 --- a/pkg/tap_integration_test.go +++ b/pkg/tap_integration_test.go @@ -248,7 +248,7 @@ func TestIntegrationCloseTap(t *testing.T) { assert.Nil(t, err) assert.False(t, tap.Connected()) - // try to close again + // try to close again, should not block & return error err = tap.Close() assert.NotNil(t, err) assert.False(t, tap.Connected())