diff --git a/README.md b/README.md index 20be590..e4e9545 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ and exchanges, inspect broker. * [Publish messages](#publish-messages) * [Messages consumer (subscribe)](#messages-consumer-subscribe) * [Poor mans shovel](#poor-mans-shovel) + * [Close connection](#close-connection) * [JSON message format](#json-message-format) * [Build from source](#build-from-source) * [Test data generator](#test-data-generator) @@ -56,6 +57,10 @@ Output of `rabtap info` command: ![info mode](doc/images/info.png) +Output of `rabtap info --stats` command, showing additional statistics: + +![info mode](doc/images/info-stats.png) + Output of rabtap running in `tap` mode, showing message meta data with unset attributes filtered out and the message body: @@ -85,6 +90,7 @@ Usage: rabtap queue create QUEUE [--uri URI] [-adkv] rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri URI] [-kv] rabtap queue rm QUEUE [--uri URI] [-kv] + rabtap conn close CONNECTION [--reason=REASON] [--api APIURI] [-kv] rabtap --version Examples: @@ -99,6 +105,11 @@ Examples: rabtap queue bind JDQ to amq.direct --bindingkey=key rabtap queue rm JDQ + # use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api + export RABTAP_APIURI=http://guest:guest@localhost:15672/api + raptap info + rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672" + Options: EXCHANGES comma-separated list of exchanges and binding keys, e.g. amq.topic:# or exchange1:key1,exchange2:key2. @@ -106,6 +117,7 @@ Options: FILE file to publish in pub mode. If omitted, stdin will be read. QUEUE name of a queue. + CONNECTION name of a connection. -a, --autodelete create auto delete exchange/queue. --api APIURI connect to given API server. If APIURI is omitted, the environment variable RABTAP_APIURI will be used. @@ -118,6 +130,8 @@ Options: metadata and body (as-is) are saved separately. -k, --insecure allow insecure TLS connections (no certificate check). -n, --no-color don't colorize output (also environment variable NO_COLOR) + --reason=REASON reason why the connection was closed + [default: closed by rabtap]. -r, --routingkey KEY routing key to use in publish mode. --saveto DIR also save messages and metadata to DIR. --show-default include default exchange in output info command. @@ -142,6 +156,7 @@ Rabtap understand the following commands: * `info` - show broker related info (exchanges, queues, bindings, stats) * `queue` - create/bind/remove queue * `exchange` - create/remove exhange +* `connection` - close connections ### Broker URI specification @@ -278,6 +293,27 @@ $ rabtap tap --uri amqp://broker1 my-topic-exchange:# --json | \ rabtap pub --uri amqp://broker2 amq.direct -r routingKey --json ``` +#### Close connection + +The `conn` command allows to close a connection. The name of the connection to +be closed is expected as parameter. Use the `info` command with the +`--consumers` option to find the connection associated with a queue. Example: + +``` +$ rabtap info +http://localhost:15672/api (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@ae1ad1477419') +└── Vhost / + ├── amq.direct (exchange, type 'direct', [D]) + : + └── test-topic (exchange, type 'topic', [AD]) + ├── test-q-test-topic-0 (queue, key='test-q-test-topic-0', running, []) + │ └── __rabtap-consumer-4823a3c0 (consumer user='guest', chan='172.17.0.1:59228 -> 172.17.0.2:5672 (1)') + │ └── '172.17.0.1:59228 -> 172.17.0.2:5672' (connection client='https://github.com/streadway/amqp', host='172.17.0.2:5672', peer='172.17.0.1:59228') + ├── test-q-test-topic-1 (queue, key='test-q-test-topic-1', running, []) + : +$ rabtap conn close '172.17.0.1:59228 -> 172.17.0.2:5672' +``` + ## JSON message format When using the `--json` option, messages are print/read as a stream of JSON diff --git a/cmd/main/cmd_conn.go b/cmd/main/cmd_conn.go new file mode 100644 index 0000000..76d5ec8 --- /dev/null +++ b/cmd/main/cmd_conn.go @@ -0,0 +1,16 @@ +package main + +import ( + "crypto/tls" + "fmt" + "os" + + rabtap "github.com/jandelgado/rabtap/pkg" +) + +func cmdConnClose(apiURI, connName, reason string, tlsConfig *tls.Config) error { + client := rabtap.NewRabbitHTTPClient(apiURI, tlsConfig) + err := client.CloseConnection(connName, reason) + failOnError(err, fmt.Sprintf("close connection '%s'", connName), os.Exit) + return err +} diff --git a/cmd/main/cmd_conn_test.go b/cmd/main/cmd_conn_test.go new file mode 100644 index 0000000..e4cf156 --- /dev/null +++ b/cmd/main/cmd_conn_test.go @@ -0,0 +1,80 @@ +// Copyright (C) 2017 Jan Delgado +// +build integration + +package main + +import ( + "crypto/tls" + "testing" + "time" + + rabtap "github.com/jandelgado/rabtap/pkg" + "github.com/jandelgado/rabtap/pkg/testcommon" + "github.com/streadway/amqp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func findClosedConnName(connectionsBefore []rabtap.RabbitConnection, + connectionsAfter []rabtap.RabbitConnection) string { + // given to lists of connections, find the first connection by name which + // is in the first, but not in the second list. + for _, ca := range connectionsAfter { + found := false + for _, cb := range connectionsBefore { + if ca.Name == cb.Name { + found = true + break + } + } + if !found { + return ca.Name + } + } + return "" +} + +func TestCmdCloseConnection(t *testing.T) { + + uri := testcommon.IntegrationAPIURIFromEnv() + client := rabtap.NewRabbitHTTPClient(uri, &tls.Config{}) + + // we can not get the name of a connection through the API of the AMQP client. So + // we figure out the connections name by comparing the list of active + // connection before and after we created our test connection. Therefore, + // make sure this test runs isolated on the broker. + connsBefore, err := client.Connections() + require.Nil(t, err) + + // start the test connection to be terminated + conn, _ := testcommon.IntegrationTestConnection(t, "", "", 0, false) + + // it takes a few seconds for the new connection to show up in the REST API + time.Sleep(time.Second * 5) + + connsAfter, err := client.Connections() + require.Nil(t, err) + + // we add a notification callback and expect the cb to be called + // when we close the connection via the API + errorChan := make(chan *amqp.Error) + conn.NotifyClose(errorChan) + + connToClose := findClosedConnName(connsBefore, connsAfter) + require.NotEqual(t, "", connToClose) + + // now close the newly created connection. TODO handle potential + // call to failOnError in cmdConnClose + err = cmdConnClose(uri, connToClose, "some reason", &tls.Config{}) + require.Nil(t, err) + + // ... and make sure it gets closed, notified by a message on the errorChan + connClosed := false + select { + case <-errorChan: + connClosed = true + case <-time.After(time.Second * 2): + assert.Fail(t, "did not receive notification within expected time") + } + assert.True(t, connClosed) +} diff --git a/cmd/main/cmd_subscribe_integration_test.go b/cmd/main/cmd_subscribe_integration_test.go index f102484..e3d4c60 100644 --- a/cmd/main/cmd_subscribe_integration_test.go +++ b/cmd/main/cmd_subscribe_integration_test.go @@ -4,7 +4,8 @@ package main -// cmd_{sub, queueCreate, queueBind, queueDelete} integration test +// cmd_{exchangeCreate, sub, queueCreate, queueBind, queueDelete} +// integration test import ( "crypto/tls" diff --git a/cmd/main/command_line.go b/cmd/main/command_line.go index 996dd2f..7db6943 100644 --- a/cmd/main/command_line.go +++ b/cmd/main/command_line.go @@ -29,6 +29,7 @@ Usage: rabtap queue create QUEUE [--uri URI] [-adkv] rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri URI] [-kv] rabtap queue rm QUEUE [--uri URI] [-kv] + rabtap conn close CONNECTION [--reason=REASON] [--api APIURI] [-kv] rabtap --version Examples: @@ -43,6 +44,11 @@ Examples: rabtap queue bind JDQ to amq.direct --bindingkey=key rabtap queue rm JDQ + # use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api + export RABTAP_APIURI=http://guest:guest@localhost:15672/api + raptap info + rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672" + Options: EXCHANGES comma-separated list of exchanges and binding keys, e.g. amq.topic:# or exchange1:key1,exchange2:key2. @@ -50,6 +56,7 @@ Options: FILE file to publish in pub mode. If omitted, stdin will be read. QUEUE name of a queue. + CONNECTION name of a connection. -a, --autodelete create auto delete exchange/queue. --api APIURI connect to given API server. If APIURI is omitted, the environment variable RABTAP_APIURI will be used. @@ -62,6 +69,8 @@ Options: metadata and body (as-is) are saved separately. -k, --insecure allow insecure TLS connections (no certificate check). -n, --no-color don't colorize output (also environment variable NO_COLOR) + --reason=REASON reason why the connection was closed + [default: closed by rabtap]. -r, --routingkey KEY routing key to use in publish mode. --saveto DIR also save messages and metadata to DIR. --show-default include default exchange in output info command. @@ -96,6 +105,8 @@ const ( QueueRemoveCmd // QueueBindCmd binds a queue to an exchange QueueBindCmd + // ConnCloseCmd closes a connection + ConnCloseCmd ) type commonArgs struct { @@ -128,6 +139,9 @@ type CommandLineArgs struct { Autodelete bool // queue create, exchange create SaveDir *string // save mode: optional directory to stores files to ShowDefaultExchange bool + + ConnName string // conn mode: name of connection + CloseReason string // conn mode: reason of close } // getAmqpURI returns the ith entry of amqpURIs array or the value @@ -153,6 +167,19 @@ func parseAmqpURI(args map[string]interface{}) (string, error) { return uri, nil } +func parseAPIURI(args map[string]interface{}) (string, error) { + var apiURI string + if args["--api"] != nil { + apiURI = args["--api"].(string) + } else { + apiURI = os.Getenv("RABTAP_APIURI") + } + if apiURI == "" { + return "", fmt.Errorf("--api omitted but RABTAP_APIURI not set in environment") + } + return apiURI, nil +} + func parseCommonArgs(args map[string]interface{}) commonArgs { return commonArgs{ Verbose: args["--verbose"].(bool), @@ -169,16 +196,26 @@ func parseInfoCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { ShowStats: args["--stats"].(bool), ShowDefaultExchange: args["--show-default"].(bool)} - var apiURI string - if args["--api"] != nil { - apiURI = args["--api"].(string) - } else { - apiURI = os.Getenv("RABTAP_APIURI") + var err error + if result.APIURI, err = parseAPIURI(args); err != nil { + return result, err } - if apiURI == "" { - return CommandLineArgs{}, fmt.Errorf("--api omitted but RABTAP_APIURI not set in environment") + return result, nil +} + +func parseConnCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { + result := CommandLineArgs{ + commonArgs: parseCommonArgs(args)} + + var err error + if result.APIURI, err = parseAPIURI(args); err != nil { + return result, err + } + if args["close"].(bool) { + result.Cmd = ConnCloseCmd + result.ConnName = args["CONNECTION"].(string) + result.CloseReason = args["--reason"].(string) } - result.APIURI = apiURI return result, nil } @@ -307,6 +344,8 @@ func ParseCommandLineArgs(cliArgs []string) (CommandLineArgs, error) { return parseQueueCmdArgs(args) } else if args["exchange"].(bool) { return parseExchangeCmdArgs(args) + } else if args["conn"].(bool) { + return parseConnCmdArgs(args) } return CommandLineArgs{}, fmt.Errorf("command missing") } diff --git a/cmd/main/command_line_test.go b/cmd/main/command_line_test.go index b9480d5..bab1146 100644 --- a/cmd/main/command_line_test.go +++ b/cmd/main/command_line_test.go @@ -364,6 +364,29 @@ func TestCliRemoveExchange(t *testing.T) { assert.Equal(t, "uri", args.AmqpURI) } +func TestCliCloseConnectionWithDefaultReason(t *testing.T) { + args, err := ParseCommandLineArgs( + []string{"conn", "close", "conn-name", "--api", "uri"}) + + assert.Nil(t, err) + assert.Equal(t, ConnCloseCmd, args.Cmd) + assert.Equal(t, "uri", args.APIURI) + assert.Equal(t, "conn-name", args.ConnName) + assert.Equal(t, "closed by rabtap", args.CloseReason) +} + +func TestCliCloseConnection(t *testing.T) { + args, err := ParseCommandLineArgs( + []string{"conn", "close", "conn-name", "--api", "uri", + "--reason", "reason"}) + + assert.Nil(t, err) + assert.Equal(t, ConnCloseCmd, args.Cmd) + assert.Equal(t, "uri", args.APIURI) + assert.Equal(t, "conn-name", args.ConnName) + assert.Equal(t, "reason", args.CloseReason) +} + func TestParseNoColorFromEnvironment(t *testing.T) { const key = "NO_COLOR" os.Setenv(key, "1") diff --git a/cmd/main/main.go b/cmd/main/main.go index acfbc53..2adefc5 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -126,5 +126,8 @@ func main() { case QueueBindCmd: cmdQueueBindToExchange(args.AmqpURI, args.QueueName, args.QueueBindingKey, args.ExchangeName, tlsConfig) + case ConnCloseCmd: + cmdConnClose(args.APIURI, args.ConnName, + args.CloseReason, tlsConfig) } } diff --git a/doc/images/info-stats.png b/doc/images/info-stats.png new file mode 100644 index 0000000..3fb0153 Binary files /dev/null and b/doc/images/info-stats.png differ diff --git a/pkg/amqp_connector.go b/pkg/amqp_connector.go index d7f13f6..f17fa91 100644 --- a/pkg/amqp_connector.go +++ b/pkg/amqp_connector.go @@ -93,6 +93,7 @@ func (s *AmqpConnector) Connected() bool { // Try to connect to the RabbitMQ server as long as it takes to establish a // connection. Will be interrupted by any message on the control channel. +// TODO fail on first errornous connection attempt, only re-connect later. func (s *AmqpConnector) redial() (*amqp.Connection, error) { s.connection = nil s.connected.Store(stateConnecting) diff --git a/pkg/rabbitmq_rest_client.go b/pkg/rabbitmq_rest_client.go index b37e332..e75ea07 100644 --- a/pkg/rabbitmq_rest_client.go +++ b/pkg/rabbitmq_rest_client.go @@ -7,6 +7,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "io/ioutil" "log" "net/http" ) @@ -382,6 +383,32 @@ func (s *RabbitHTTPClient) getResource(uri string, result interface{}) error { return nil } +func (s *RabbitHTTPClient) delResource(uri string) error { + client := &http.Client{} + + req, err := http.NewRequest("DELETE", uri, nil) + if err != nil { + return err + } + + // Fetch Request + resp, err := client.Do(req) + if err != nil { + return err + } + if resp.StatusCode != 200 && resp.StatusCode != 204 { + return errors.New(uri + " : " + resp.Status) + } + defer resp.Body.Close() + + _, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + return nil +} + // Connections returns the /connections ressource of the broker func (s *RabbitHTTPClient) Connections() ([]RabbitConnection, error) { var result []RabbitConnection @@ -431,3 +458,9 @@ func (s *RabbitHTTPClient) Bindings() ([]RabbitBinding, error) { err := s.getResource(s.uri+"/bindings", &result) return result, err } + +// CloseConnection closes a connection by DELETING the associated +// ressource +func (s *RabbitHTTPClient) CloseConnection(conn, reason string) error { + return s.delResource(s.uri + "/connections/" + conn) +} diff --git a/pkg/rabbitmq_rest_client_test.go b/pkg/rabbitmq_rest_client_test.go index 32ea9b2..be9e004 100644 --- a/pkg/rabbitmq_rest_client_test.go +++ b/pkg/rabbitmq_rest_client_test.go @@ -162,3 +162,25 @@ func TestRabbitClientGetConsumersChannelDetailsIsEmptyArray(t *testing.T) { assert.Equal(t, "another_consumer w/ faulty channel", consumer[1].ConsumerTag) assert.Equal(t, "", consumer[1].ChannelDetails.Name) } + +// test of DELETE /connections/conn to close a connection +func TestRabbitClientCloseExistingConnection(t *testing.T) { + + mock := testcommon.NewRabbitAPIMock(testcommon.MockModeStd) + defer mock.Close() + client := NewRabbitHTTPClient(mock.URL, &tls.Config{}) + + err := client.CloseConnection("172.17.0.1:40874 -> 172.17.0.2:5672", "reason") + assert.Nil(t, err) +} + +// test of DELETE /connections/conn to close a connection +func TestRabbitClientCloseNonExistingConnectionRaisesError(t *testing.T) { + + mock := testcommon.NewRabbitAPIMock(testcommon.MockModeStd) + defer mock.Close() + client := NewRabbitHTTPClient(mock.URL, &tls.Config{}) + + err := client.CloseConnection("DOES NOT EXIST", "reason") + assert.NotNil(t, err) +} diff --git a/pkg/testcommon/rabbitmq_rest_api_mock.go b/pkg/testcommon/rabbitmq_rest_api_mock.go index 1015591..7a062b7 100644 --- a/pkg/testcommon/rabbitmq_rest_api_mock.go +++ b/pkg/testcommon/rabbitmq_rest_api_mock.go @@ -20,7 +20,8 @@ const ( // NewRabbitAPIMock returns a mock server for the rabbitmq http managemet // API. It is used by the integration test. Only a very limited subset -// of resources is support (GET exchanges, bindings, queues, overviews). +// of resources is support (GET exchanges, bindings, queues, overviews, +// channels, connections; DELETE connections) // Usage: // mockServer := NewRabbitAPIMock(MockModeStd) // defer mockServer.Close() @@ -36,7 +37,6 @@ func NewRabbitAPIMock(mode MockMode) *httptest.Server { } func mockEmptyHandler(w http.ResponseWriter, r *http.Request) { - // TODO add test for GET var result string if r.URL.RequestURI() == "/overview" { result = overviewResult @@ -47,7 +47,17 @@ func mockEmptyHandler(w http.ResponseWriter, r *http.Request) { } func mockStdHandler(w http.ResponseWriter, r *http.Request) { - // TODO add test for GET + switch r.Method { + case "GET": + mockStdGetHandler(w, r) + case "DELETE": + mockStdDeleteHandler(w, r) + default: + w.WriteHeader(http.StatusBadRequest) + } +} + +func mockStdGetHandler(w http.ResponseWriter, r *http.Request) { result := "" switch r.URL.RequestURI() { case "/exchanges": @@ -66,10 +76,22 @@ func mockStdHandler(w http.ResponseWriter, r *http.Request) { result = connectionResult default: w.WriteHeader(http.StatusNotFound) + return } + w.WriteHeader(http.StatusOK) fmt.Fprint(w, result) } +func mockStdDeleteHandler(w http.ResponseWriter, r *http.Request) { + switch r.URL.RequestURI() { + case "/connections/172.17.0.1:40874%20-%3E%20172.17.0.2:5672": + w.WriteHeader(http.StatusNoContent) + default: + w.WriteHeader(http.StatusNotFound) + } + fmt.Fprint(w, "") +} + var bindingResult = ` [ { diff --git a/pkg/testcommon/test_common.go b/pkg/testcommon/test_common.go index 08f1b30..3c98594 100644 --- a/pkg/testcommon/test_common.go +++ b/pkg/testcommon/test_common.go @@ -38,7 +38,7 @@ func IntegrationQueueName(i int) string { return fmt.Sprintf("queue-%d", i) } -// IntegrationTestConnection creates connection to rabbitmq broker and set up +// IntegrationTestConnection creates connection to rabbitmq broker and sets up // optionally an exchange of the given type and bind given number of queues to // the exchange. The binding key will aways be the queue name. The queues are // named "queue-0" "queue-1" etc (see integrationQueueName() func). If