Skip to content

Commit

Permalink
add new tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Apr 1, 2018
1 parent 03c9916 commit 69311b0
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 42 deletions.
24 changes: 12 additions & 12 deletions cmd/main/broker_info.go
Expand Up @@ -17,33 +17,33 @@ type BrokerInfo struct {
func NewBrokerInfo(client *rabtap.RabbitHTTPClient) (BrokerInfo, error) {

var err error
var s BrokerInfo
var bi BrokerInfo

// collect infos from rabtap.RabbitMQ API
s.Overview, err = client.GetOverview()
bi.Overview, err = client.GetOverview()
if err != nil {
return s, err
return bi, err
}

s.Exchanges, err = client.GetExchanges()
bi.Exchanges, err = client.GetExchanges()
if err != nil {
return s, err
return bi, err
}

s.Bindings, err = client.GetBindings()
bi.Bindings, err = client.GetBindings()
if err != nil {
return s, err
return bi, err
}

s.Queues, err = client.GetQueues()
bi.Queues, err = client.GetQueues()
if err != nil {
return s, err
return bi, err
}

s.Consumers, err = client.GetConsumers()
bi.Consumers, err = client.GetConsumers()
if err != nil {
return s, err
return bi, err
}

return s, nil
return bi, nil
}
7 changes: 4 additions & 3 deletions cmd/main/cmd_publish.go
Expand Up @@ -45,10 +45,12 @@ func publishMessage(publishChannel rabtap.PublishChannel,
}

// readSingleMessageFromRawFile reads a single messages from the given io.Reader
// which is typically stdin or a file. On subsequent calls, it returns io.EOF.
// which is typically stdin or a file. If reading from stdin, CTRL+D (linux)
// or CTRL+Z (Win) on an empty line terminates the reader.
func readSingleMessageFromRawFile(reader io.Reader) (amqp.Publishing, error) {
buf := new(bytes.Buffer)
if numRead, err := buf.ReadFrom(reader); err != nil {
numRead, err := buf.ReadFrom(reader)
if err != nil {
return amqp.Publishing{}, err
} else if numRead == 0 {
return amqp.Publishing{}, io.EOF
Expand Down Expand Up @@ -102,7 +104,6 @@ func publishMessageStream(publishChannel rabtap.PublishChannel,
// cmdPublish reads messages with the provied readNextMessageFunc and
// publishes the messages to the given exchange.
func cmdPublish(cmd CmdPublishArg) {

log.Debugf("publishing message(s) to exchange %s with routingkey %s",
cmd.exchange, cmd.routingKey)
publisher := rabtap.NewAmqpPublish(cmd.amqpURI, cmd.tlsConfig, log)
Expand Down
6 changes: 1 addition & 5 deletions cmd/main/cmd_subscribe.go
Expand Up @@ -7,7 +7,6 @@ package main
import (
"crypto/tls"
"os"
"os/signal"

"github.com/jandelgado/rabtap/pkg"
)
Expand All @@ -23,17 +22,14 @@ type CmdSubscribeArg struct {

// cmdSub subscribes to messages from the given queue
func cmdSubscribe(cmd CmdSubscribeArg) {

log.Debugf("cmdSub: subscribing to queue %s", cmd.queue)
// this channel is used to decouple message receiving threads
// with the main thread, which does the actual message processing
log.Debugf("cmdSub: subscribing to queue %s", cmd.queue)

messageChannel := make(rabtap.TapChannel)
subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, cmd.tlsConfig, log)
defer subscriber.Close()
go subscriber.EstablishSubscription(cmd.queue, messageChannel)

signal.Notify(cmd.signalChannel, os.Interrupt)
messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel)
log.Debug("cmdSub: cmd_subscribe ending")
}
9 changes: 3 additions & 6 deletions cmd/main/cmd_tap.go
Expand Up @@ -7,31 +7,28 @@ package main
import (
"crypto/tls"
"os"
"os/signal"

"github.com/jandelgado/rabtap/pkg"
)

func tapCmdShutdownFunc(taps []*rabtap.AmqpTap) {
log.Info("rabtap tap threads shutting down ...")
for _, tap := range taps {
tap.Close()
if err := tap.Close(); err != nil {
log.Errorf("error closing tap: %v", err)
}
}
}

// cmdTap taps to the given exchanges and displays or saves the received
// messages.
func cmdTap(tapConfig []rabtap.TapConfiguration, tlsConfig *tls.Config,
messageReceiveFunc MessageReceiveFunc, signalChannel chan os.Signal) {

// this channel is used to decouple message receiving threads
// with the main thread, which does the actual message processing
tapMessageChannel := make(rabtap.TapChannel)
taps := establishTaps(tapMessageChannel, tapConfig, tlsConfig)
defer tapCmdShutdownFunc(taps)

signal.Notify(signalChannel, os.Interrupt)

messageReceiveLoop(tapMessageChannel, messageReceiveFunc, signalChannel)
}

Expand Down
9 changes: 6 additions & 3 deletions cmd/main/main.go
Expand Up @@ -5,6 +5,7 @@ package main
import (
"crypto/tls"
"os"
"os/signal"

"github.com/jandelgado/rabtap/pkg"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -35,9 +36,9 @@ func getTLSConfig(insecureTLS bool) *tls.Config {
return &tls.Config{InsecureSkipVerify: insecureTLS}
}

func startCmdInfo(args CommandLineArgs) {
func startCmdInfo(args CommandLineArgs, title string) {
cmdInfo(CmdInfoArg{
rootNode: args.APIURI,
rootNode: title,
client: rabtap.NewRabbitHTTPClient(args.APIURI, getTLSConfig(args.InsecureTLS)),
printBrokerInfoConfig: PrintBrokerInfoConfig{
ShowStats: args.ShowStats,
Expand Down Expand Up @@ -66,6 +67,7 @@ func startCmdPublish(args CommandLineArgs) {
func startCmdSubscribe(args CommandLineArgs) {
// signalChannel receives ctrl+C/interrput signal
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt)
// messageReceiveFunc receives the tapped messages, prints
// and optionally saves them.
messageReceiveFunc := createMessageReceiveFunc(
Expand All @@ -81,6 +83,7 @@ func startCmdSubscribe(args CommandLineArgs) {

func startCmdTap(args CommandLineArgs) {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt)
messageReceiveFunc := createMessageReceiveFunc(
NewColorableWriter(os.Stdout), args.JSONFormat,
args.SaveDir, args.NoColor)
Expand All @@ -100,7 +103,7 @@ func main() {

switch args.Cmd {
case InfoCmd:
startCmdInfo(args)
startCmdInfo(args, args.APIURI)
case SubCmd:
startCmdSubscribe(args)
case PubCmd:
Expand Down
14 changes: 12 additions & 2 deletions cmd/main/main_test.go
@@ -1,13 +1,12 @@
// Copyright (C) 2017 Jan Delgado

// +build integration

package main

import (
"errors"
"testing"

"github.com/jandelgado/rabtap/pkg/testcommon"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -45,3 +44,14 @@ func TestFailOnError(t *testing.T) {
assert.False(t, exitFuncCalled)

}

func ExamplestartCmdInfo() {
mock := testcommon.NewRabbitAPIMock(testcommon.MockModeEmpty)
defer mock.Close()

args := CommandLineArgs{APIURI: mock.URL, commonArgs: commonArgs{NoColor: true}}
startCmdInfo(args, "http://rootnode")

// Output:
// http://rootnode (broker ver=3.6.9, mgmt ver=3.6.9, cluster=rabbit@08f57d1fe8ab)
}
49 changes: 39 additions & 10 deletions cmd/main/subscribe_test.go
Expand Up @@ -3,11 +3,15 @@
package main

import (
"bytes"
"io/ioutil"
"os"
"strings"
"testing"

rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -16,19 +20,44 @@ func TestCreateMessageReceiveFuncRawToFile(t *testing.T) {
require.Nil(t, err)
defer os.RemoveAll(testDir)

out := os.Stdout
rcvFunc := createMessageReceiveFunc(out, false, &testDir, false)
_ = rcvFunc(&amqp.Delivery{})
var b bytes.Buffer
rcvFunc := createMessageReceiveFunc(&b, false, &testDir, false)
_ = rcvFunc(&amqp.Delivery{Body: []byte("Testmessage")})

// TODO
assert.True(t, strings.Contains(b.String(), "Testmessage"))

// TODO make created filename predicatable and check written file
}

func TestCreateMessageReceiveFuncJSONToFile(t *testing.T) {
// testDir, err := ioutil.TempDir("", "")
// require.Nil(t, err)
// defer os.RemoveAll(testDir)
testDir, err := ioutil.TempDir("", "")
require.Nil(t, err)
defer os.RemoveAll(testDir)

var b bytes.Buffer
rcvFunc := createMessageReceiveFunc(&b, true, &testDir, false)
_ = rcvFunc(&amqp.Delivery{Body: []byte("Testmessage")})

assert.True(t, strings.Contains(b.String(), "\"Body\": \"VGVzdG1lc3NhZ2U=\""))

// TODO make created filename predicatable and check written file
}

// out := os.Stdout
// rcvFunc := createMessageReceiveFunc(out, true, &testDir, false)
// err = rcvFunc(&amqp.Delivery{})
func TestMessageReceiveLoop(t *testing.T) {
messageChan := make(rabtap.TapChannel)
signalChannel := make(chan os.Signal)
done := make(chan bool)
received := 0

receiveFunc := func(*amqp.Delivery) error {
received = received + 1
done <- true
return nil
}
go messageReceiveLoop(messageChan, receiveFunc, signalChannel)

messageChan <- &rabtap.TapMessage{}
<-done // TODO add timeout
signalChannel <- os.Interrupt // terminates go routine
assert.Equal(t, 1, received)
}
2 changes: 1 addition & 1 deletion pkg/tap_integration_test.go
Expand Up @@ -248,7 +248,7 @@ func TestIntegrationCloseTap(t *testing.T) {
assert.Nil(t, err)
assert.False(t, tap.Connected())

// try to close again
// try to close again, should not block & return error
err = tap.Close()
assert.NotNil(t, err)
assert.False(t, tap.Connected())
Expand Down

0 comments on commit 69311b0

Please sign in to comment.