Skip to content

Commit

Permalink
support older RabbitMQ API
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Aug 26, 2019
1 parent b7ef463 commit c7479c6
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 8 deletions.
8 changes: 4 additions & 4 deletions cmd/rabtap/cmd_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Example_cmdInfoByExchangeInTextFormat() {
// │ │ ├── some_consumer (consumer user='guest', prefetch=0, chan='172.17.0.1:40874 -> 172.17.0.2:5672 (1)')
// │ │ │ └── '172.17.0.1:40874 -> 172.17.0.2:5672' (connection client='https://github.com/streadway/amqp', host='172.17.0.2:5672', peer='172.17.0.1:40874')
// │ │ └── another_consumer w/ faulty channel (consumer user='', prefetch=0, chan='')
// │ └── direct-q2 (queue, key='direct-q2', running, [D])
// │ └── direct-q2 (queue, key='direct-q2', running, [D|DLX])
// ├── test-fanout (exchange, type 'fanout', [D])
// │ ├── fanout-q1 (queue, idle since 2017-05-25 19:14:32, [D])
// │ └── fanout-q2 (queue, idle since 2017-05-25 19:14:32, [D])
Expand Down Expand Up @@ -153,9 +153,9 @@ const expectedResultDotByExchange = `digraph broker {
"boundqueue_direct-q1" [shape="record"; label="{ { Q | direct-q1 } | { D | | |<dlx> DLX } }"];
"boundqueue_direct-q1":dlx -> "exchange_mydlx";
"boundqueue_direct-q2" [shape="record"; label="{ { Q | direct-q2 } | { D | | | } }"];
"boundqueue_direct-q2" [shape="record"; label="{ { Q | direct-q2 } | { D | | |<dlx> DLX } }"];
"boundqueue_direct-q2":dlx -> "exchange_mydlx";
"exchange_test-fanout" [shape="record"; label="{ { E | test-fanout } |fanout | { D | | } }"];
"exchange_test-fanout" -> "boundqueue_fanout-q1" [fontsize=10; headport=n; label=""];
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestCmdInfoByExchangeInDotFormat(t *testing.T) {
out: os.Stdout})
}
result := testcommon.CaptureOutput(testfunc)
// fmt.Print(result)
// fmt.Print(result)
assert.Equal(t, strings.Trim(expectedResultDotByExchange, " \n"),
strings.Trim(result, " \n"))
}
Expand Down
57 changes: 55 additions & 2 deletions pkg/rabbitmq_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type BrokerInfo struct {
Queues []RabbitQueue
Consumers []RabbitConsumer
Bindings []RabbitBinding
Policies []RabbitPolicy
}

// Overview returns the /overview resource of the RabbitMQ REST API
Expand Down Expand Up @@ -130,6 +131,26 @@ func (s RabbitHTTPClient) Bindings() ([]RabbitBinding, error) {
return *res.(*[]RabbitBinding), err
}

// Policies returns the /policies resource of the RabbitMQ REST API
func (s RabbitHTTPClient) Policies() ([]RabbitPolicy, error) {
res, err := s.getResource(httpRequest{"policies", reflect.TypeOf([]RabbitPolicy{})})
return *res.(*[]RabbitPolicy), err
}

func fixQueueEffectivePolicyDefinitions(policies []RabbitPolicy, queues []RabbitQueue) {
// create EffectivePolicyDefinitions in RabbitQueues array since older API
// versions do not provide this attribute.
for qi, q := range queues {
if q.Policy == nil {
continue
}
if i := FindPolicyByName(policies, q.Vhost, *q.Policy); i != -1 {
policy := policies[i]
queues[qi].EffectivePolicyDefinition = policy.Definition
}
}
}

// BrokerInfo gets all resources of the broker in parallel
// TODO use a ctx to for timeout/cancellation
func (s RabbitHTTPClient) BrokerInfo() (BrokerInfo, error) {
Expand All @@ -141,7 +162,17 @@ func (s RabbitHTTPClient) BrokerInfo() (BrokerInfo, error) {
g.Go(func() (err error) { r.Queues, err = s.Queues(); return })
g.Go(func() (err error) { r.Consumers, err = s.Consumers(); return })
g.Go(func() (err error) { r.Bindings, err = s.Bindings(); return })
return r, g.Wait()
g.Go(func() (err error) { r.Policies, err = s.Policies(); return })
err := g.Wait()

if err != nil {
return r, err
}

// make older RabbitMQ API version compatbile (pre 3.7)
fixQueueEffectivePolicyDefinitions(r.Policies, r.Queues)

return r, nil
}

// CloseConnection closes a connection by DELETING the associated resource
Expand Down Expand Up @@ -205,6 +236,18 @@ func FindBindingsForExchange(exchange RabbitExchange, bindings []RabbitBinding)
// return -1
// }

// FindPolicyByName searches in the policies array for a policy with the given
// name and vhost. index is returned or -1 if nothing is found.
func FindPolicyByName(policies []RabbitPolicy,
vhost, policyName string) int {
for i, policy := range policies {
if policy.Name == policyName && policy.Vhost == vhost {
return i
}
}
return -1
}

// FindConnectionByName searches in the connections array for a connection with the given
// name and vhost. index is returned or -1 if nothing is found.
func FindConnectionByName(conns []RabbitConnection,
Expand Down Expand Up @@ -473,7 +516,7 @@ type RabbitQueue struct {
// RecoverableSlaves interface{} `json:"recoverable_slaves"`
Consumers int `json:"consumers"`
// ExclusiveConsumerTag interface{} `json:"exclusive_consumer_tag"`
// Policy interface{} `json:"policy"`
Policy *string `json:"policy,omitempty"`
// ConsumerUtilisation interface{} `json:"consumer_utilisation"`
// TODO use cusom marshaller and parese into time.Time
IdleSince string `json:"idle_since"`
Expand Down Expand Up @@ -530,6 +573,16 @@ type RabbitExchange struct {
} `json:"message_stats,omitempty"`
}

// RabbitPolicy models the /policies resource of the rabbitmq http api
type RabbitPolicy struct {
Vhost string `json:"vhost"`
Name string `json:"name"`
Pattern string `json:"pattern"`
ApplyTo string `json:"apply-to"`
Definition map[string]string `json:"definition"`
Priority int `json:"priority"`
}

// ChannelDetails model channel_details in RabbitConsumer
type ChannelDetails struct {
PeerHost string `json:"peer_host"`
Expand Down
16 changes: 16 additions & 0 deletions pkg/rabbitmq_rest_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,22 @@ func TestFindQueueByNameNotFound(t *testing.T) {
assert.Equal(t, -1, FindQueueByName(queues, "/", "not-available"))
}

func TestFindPolicyByName(t *testing.T) {
policies := []RabbitPolicy{
{Name: "pol0", Vhost: "vhost"},
{Name: "pol1", Vhost: "vhost"},
}
assert.Equal(t, 1, FindPolicyByName(policies, "vhost", "pol1"))
}

func TestFindPolicyByNameNotFound(t *testing.T) {
policies := []RabbitPolicy{
{Name: "pol0", Vhost: "vhost"},
{Name: "pol1", Vhost: "vhost"},
}
assert.Equal(t, -1, FindPolicyByName(policies, "vhost", "not-available"))
}

func TestFindConnectionByName(t *testing.T) {
conns := []RabbitConnection{
{Name: "c1", Vhost: "vhost"},
Expand Down
23 changes: 21 additions & 2 deletions pkg/testcommon/rabbitmq_rest_api_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func mockStdGetHandler(w http.ResponseWriter, r *http.Request) {
result = overviewResult
case "/consumers":
result = consumerResult
case "/policies":
result = policyResult
case "/channels":
result = channelResult
case "/connections":
Expand Down Expand Up @@ -411,6 +413,9 @@ const (
`

// result of GET /api/queues
// Queue direct-q1: policy = DLX and EffectivePolicyDefitions set
// Queue direct-q2: policy = DLX and EffectivePolicyDefitions not set
// Other queues: policy = null, EffectivePolicyDefinitions not set
queueResult = `
[
{
Expand Down Expand Up @@ -486,7 +491,7 @@ const (
"recoverable_slaves": null,
"consumers": 4,
"exclusive_consumer_tag": null,
"policy": null,
"policy": "DLX",
"consumer_utilisation": null,
"memory": 29840
},
Expand Down Expand Up @@ -560,7 +565,7 @@ const (
"recoverable_slaves": null,
"consumers": 0,
"exclusive_consumer_tag": null,
"policy": null,
"policy": "DLX",
"consumer_utilisation": null,
"memory": 29840
},
Expand Down Expand Up @@ -1207,6 +1212,20 @@ const (
}
]`

policyResult = `
[
{
"pattern" : "myqueue",
"priority" : 0,
"name" : "DLX",
"definition" : {
"dead-letter-exchange" : "mydlx"
},
"vhost" : "/",
"apply-to" : "queues"
}
]
`
channelResult = `
[
Expand Down

0 comments on commit c7479c6

Please sign in to comment.