Skip to content

Commit

Permalink
added new 'conn' command with option to close connections
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed May 1, 2018
1 parent d90a54b commit eb379fc
Show file tree
Hide file tree
Showing 13 changed files with 289 additions and 13 deletions.
36 changes: 36 additions & 0 deletions README.md
Expand Up @@ -28,6 +28,7 @@ and exchanges, inspect broker.
* [Publish messages](#publish-messages)
* [Messages consumer (subscribe)](#messages-consumer-subscribe)
* [Poor mans shovel](#poor-mans-shovel)
* [Close connection](#close-connection)
* [JSON message format](#json-message-format)
* [Build from source](#build-from-source)
* [Test data generator](#test-data-generator)
Expand Down Expand Up @@ -56,6 +57,10 @@ Output of `rabtap info` command:

![info mode](doc/images/info.png)

Output of `rabtap info --stats` command, showing additional statistics:

![info mode](doc/images/info-stats.png)

Output of rabtap running in `tap` mode, showing message meta data
with unset attributes filtered out and the message body:

Expand Down Expand Up @@ -85,6 +90,7 @@ Usage:
rabtap queue create QUEUE [--uri URI] [-adkv]
rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri URI] [-kv]
rabtap queue rm QUEUE [--uri URI] [-kv]
rabtap conn close CONNECTION [--reason=REASON] [--api APIURI] [-kv]
rabtap --version
Examples:
Expand All @@ -99,13 +105,19 @@ Examples:
rabtap queue bind JDQ to amq.direct --bindingkey=key
rabtap queue rm JDQ
# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
export RABTAP_APIURI=http://guest:guest@localhost:15672/api
raptap info
rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672"
Options:
EXCHANGES comma-separated list of exchanges and binding keys,
e.g. amq.topic:# or exchange1:key1,exchange2:key2.
EXCHANGE name of an exchange, e.g. amq.direct.
FILE file to publish in pub mode. If omitted, stdin will
be read.
QUEUE name of a queue.
CONNECTION name of a connection.
-a, --autodelete create auto delete exchange/queue.
--api APIURI connect to given API server. If APIURI is omitted,
the environment variable RABTAP_APIURI will be used.
Expand All @@ -118,6 +130,8 @@ Options:
metadata and body (as-is) are saved separately.
-k, --insecure allow insecure TLS connections (no certificate check).
-n, --no-color don't colorize output (also environment variable NO_COLOR)
--reason=REASON reason why the connection was closed
[default: closed by rabtap].
-r, --routingkey KEY routing key to use in publish mode.
--saveto DIR also save messages and metadata to DIR.
--show-default include default exchange in output info command.
Expand All @@ -142,6 +156,7 @@ Rabtap understand the following commands:
* `info` - show broker related info (exchanges, queues, bindings, stats)
* `queue` - create/bind/remove queue
* `exchange` - create/remove exhange
* `connection` - close connections

### Broker URI specification

Expand Down Expand Up @@ -278,6 +293,27 @@ $ rabtap tap --uri amqp://broker1 my-topic-exchange:# --json | \
rabtap pub --uri amqp://broker2 amq.direct -r routingKey --json
```

#### Close connection

The `conn` command allows to close a connection. The name of the connection to
be closed is expected as parameter. Use the `info` command with the
`--consumers` option to find the connection associated with a queue. Example:

```
$ rabtap info
http://localhost:15672/api (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@ae1ad1477419')
└── Vhost /
├── amq.direct (exchange, type 'direct', [D])
:
└── test-topic (exchange, type 'topic', [AD])
├── test-q-test-topic-0 (queue, key='test-q-test-topic-0', running, [])
│ └── __rabtap-consumer-4823a3c0 (consumer user='guest', chan='172.17.0.1:59228 -> 172.17.0.2:5672 (1)')
│ └── '172.17.0.1:59228 -> 172.17.0.2:5672' (connection client='https://github.com/streadway/amqp', host='172.17.0.2:5672', peer='172.17.0.1:59228')
├── test-q-test-topic-1 (queue, key='test-q-test-topic-1', running, [])
:
$ rabtap conn close '172.17.0.1:59228 -> 172.17.0.2:5672'
```

## JSON message format

When using the `--json` option, messages are print/read as a stream of JSON
Expand Down
16 changes: 16 additions & 0 deletions cmd/main/cmd_conn.go
@@ -0,0 +1,16 @@
package main

import (
"crypto/tls"
"fmt"
"os"

rabtap "github.com/jandelgado/rabtap/pkg"
)

func cmdConnClose(apiURI, connName, reason string, tlsConfig *tls.Config) error {
client := rabtap.NewRabbitHTTPClient(apiURI, tlsConfig)
err := client.CloseConnection(connName, reason)
failOnError(err, fmt.Sprintf("close connection '%s'", connName), os.Exit)
return err
}
80 changes: 80 additions & 0 deletions cmd/main/cmd_conn_test.go
@@ -0,0 +1,80 @@
// Copyright (C) 2017 Jan Delgado
// +build integration

package main

import (
"crypto/tls"
"testing"
"time"

rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/jandelgado/rabtap/pkg/testcommon"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func findClosedConnName(connectionsBefore []rabtap.RabbitConnection,
connectionsAfter []rabtap.RabbitConnection) string {
// given to lists of connections, find the first connection by name which
// is in the first, but not in the second list.
for _, ca := range connectionsAfter {
found := false
for _, cb := range connectionsBefore {
if ca.Name == cb.Name {
found = true
break
}
}
if !found {
return ca.Name
}
}
return ""
}

func TestCmdCloseConnection(t *testing.T) {

uri := testcommon.IntegrationAPIURIFromEnv()
client := rabtap.NewRabbitHTTPClient(uri, &tls.Config{})

// we can not get the name of a connection through the API of the AMQP client. So
// we figure out the connections name by comparing the list of active
// connection before and after we created our test connection. Therefore,
// make sure this test runs isolated on the broker.
connsBefore, err := client.Connections()
require.Nil(t, err)

// start the test connection to be terminated
conn, _ := testcommon.IntegrationTestConnection(t, "", "", 0, false)

// it takes a few seconds for the new connection to show up in the REST API
time.Sleep(time.Second * 5)

connsAfter, err := client.Connections()
require.Nil(t, err)

// we add a notification callback and expect the cb to be called
// when we close the connection via the API
errorChan := make(chan *amqp.Error)
conn.NotifyClose(errorChan)

connToClose := findClosedConnName(connsBefore, connsAfter)
require.NotEqual(t, "", connToClose)

// now close the newly created connection. TODO handle potential
// call to failOnError in cmdConnClose
err = cmdConnClose(uri, connToClose, "some reason", &tls.Config{})
require.Nil(t, err)

// ... and make sure it gets closed, notified by a message on the errorChan
connClosed := false
select {
case <-errorChan:
connClosed = true
case <-time.After(time.Second * 2):
assert.Fail(t, "did not receive notification within expected time")
}
assert.True(t, connClosed)
}
3 changes: 2 additions & 1 deletion cmd/main/cmd_subscribe_integration_test.go
Expand Up @@ -4,7 +4,8 @@

package main

// cmd_{sub, queueCreate, queueBind, queueDelete} integration test
// cmd_{exchangeCreate, sub, queueCreate, queueBind, queueDelete}
// integration test

import (
"crypto/tls"
Expand Down
55 changes: 47 additions & 8 deletions cmd/main/command_line.go
Expand Up @@ -29,6 +29,7 @@ Usage:
rabtap queue create QUEUE [--uri URI] [-adkv]
rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri URI] [-kv]
rabtap queue rm QUEUE [--uri URI] [-kv]
rabtap conn close CONNECTION [--reason=REASON] [--api APIURI] [-kv]
rabtap --version
Examples:
Expand All @@ -43,13 +44,19 @@ Examples:
rabtap queue bind JDQ to amq.direct --bindingkey=key
rabtap queue rm JDQ
# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
export RABTAP_APIURI=http://guest:guest@localhost:15672/api
raptap info
rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672"
Options:
EXCHANGES comma-separated list of exchanges and binding keys,
e.g. amq.topic:# or exchange1:key1,exchange2:key2.
EXCHANGE name of an exchange, e.g. amq.direct.
FILE file to publish in pub mode. If omitted, stdin will
be read.
QUEUE name of a queue.
CONNECTION name of a connection.
-a, --autodelete create auto delete exchange/queue.
--api APIURI connect to given API server. If APIURI is omitted,
the environment variable RABTAP_APIURI will be used.
Expand All @@ -62,6 +69,8 @@ Options:
metadata and body (as-is) are saved separately.
-k, --insecure allow insecure TLS connections (no certificate check).
-n, --no-color don't colorize output (also environment variable NO_COLOR)
--reason=REASON reason why the connection was closed
[default: closed by rabtap].
-r, --routingkey KEY routing key to use in publish mode.
--saveto DIR also save messages and metadata to DIR.
--show-default include default exchange in output info command.
Expand Down Expand Up @@ -96,6 +105,8 @@ const (
QueueRemoveCmd
// QueueBindCmd binds a queue to an exchange
QueueBindCmd
// ConnCloseCmd closes a connection
ConnCloseCmd
)

type commonArgs struct {
Expand Down Expand Up @@ -128,6 +139,9 @@ type CommandLineArgs struct {
Autodelete bool // queue create, exchange create
SaveDir *string // save mode: optional directory to stores files to
ShowDefaultExchange bool

ConnName string // conn mode: name of connection
CloseReason string // conn mode: reason of close
}

// getAmqpURI returns the ith entry of amqpURIs array or the value
Expand All @@ -153,6 +167,19 @@ func parseAmqpURI(args map[string]interface{}) (string, error) {
return uri, nil
}

func parseAPIURI(args map[string]interface{}) (string, error) {
var apiURI string
if args["--api"] != nil {
apiURI = args["--api"].(string)
} else {
apiURI = os.Getenv("RABTAP_APIURI")
}
if apiURI == "" {
return "", fmt.Errorf("--api omitted but RABTAP_APIURI not set in environment")
}
return apiURI, nil
}

func parseCommonArgs(args map[string]interface{}) commonArgs {
return commonArgs{
Verbose: args["--verbose"].(bool),
Expand All @@ -169,16 +196,26 @@ func parseInfoCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
ShowStats: args["--stats"].(bool),
ShowDefaultExchange: args["--show-default"].(bool)}

var apiURI string
if args["--api"] != nil {
apiURI = args["--api"].(string)
} else {
apiURI = os.Getenv("RABTAP_APIURI")
var err error
if result.APIURI, err = parseAPIURI(args); err != nil {
return result, err
}
if apiURI == "" {
return CommandLineArgs{}, fmt.Errorf("--api omitted but RABTAP_APIURI not set in environment")
return result, nil
}

func parseConnCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
result := CommandLineArgs{
commonArgs: parseCommonArgs(args)}

var err error
if result.APIURI, err = parseAPIURI(args); err != nil {
return result, err
}
if args["close"].(bool) {
result.Cmd = ConnCloseCmd
result.ConnName = args["CONNECTION"].(string)
result.CloseReason = args["--reason"].(string)
}
result.APIURI = apiURI
return result, nil
}

Expand Down Expand Up @@ -307,6 +344,8 @@ func ParseCommandLineArgs(cliArgs []string) (CommandLineArgs, error) {
return parseQueueCmdArgs(args)
} else if args["exchange"].(bool) {
return parseExchangeCmdArgs(args)
} else if args["conn"].(bool) {
return parseConnCmdArgs(args)
}
return CommandLineArgs{}, fmt.Errorf("command missing")
}
23 changes: 23 additions & 0 deletions cmd/main/command_line_test.go
Expand Up @@ -364,6 +364,29 @@ func TestCliRemoveExchange(t *testing.T) {
assert.Equal(t, "uri", args.AmqpURI)
}

func TestCliCloseConnectionWithDefaultReason(t *testing.T) {
args, err := ParseCommandLineArgs(
[]string{"conn", "close", "conn-name", "--api", "uri"})

assert.Nil(t, err)
assert.Equal(t, ConnCloseCmd, args.Cmd)
assert.Equal(t, "uri", args.APIURI)
assert.Equal(t, "conn-name", args.ConnName)
assert.Equal(t, "closed by rabtap", args.CloseReason)
}

func TestCliCloseConnection(t *testing.T) {
args, err := ParseCommandLineArgs(
[]string{"conn", "close", "conn-name", "--api", "uri",
"--reason", "reason"})

assert.Nil(t, err)
assert.Equal(t, ConnCloseCmd, args.Cmd)
assert.Equal(t, "uri", args.APIURI)
assert.Equal(t, "conn-name", args.ConnName)
assert.Equal(t, "reason", args.CloseReason)
}

func TestParseNoColorFromEnvironment(t *testing.T) {
const key = "NO_COLOR"
os.Setenv(key, "1")
Expand Down
3 changes: 3 additions & 0 deletions cmd/main/main.go
Expand Up @@ -126,5 +126,8 @@ func main() {
case QueueBindCmd:
cmdQueueBindToExchange(args.AmqpURI, args.QueueName,
args.QueueBindingKey, args.ExchangeName, tlsConfig)
case ConnCloseCmd:
cmdConnClose(args.APIURI, args.ConnName,
args.CloseReason, tlsConfig)
}
}
Binary file added doc/images/info-stats.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions pkg/amqp_connector.go
Expand Up @@ -93,6 +93,7 @@ func (s *AmqpConnector) Connected() bool {

// Try to connect to the RabbitMQ server as long as it takes to establish a
// connection. Will be interrupted by any message on the control channel.
// TODO fail on first errornous connection attempt, only re-connect later.
func (s *AmqpConnector) redial() (*amqp.Connection, error) {
s.connection = nil
s.connected.Store(stateConnecting)
Expand Down

0 comments on commit eb379fc

Please sign in to comment.