Skip to content

Commit

Permalink
also consider DLX defined in queue's arguments attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado authored and Jan Delgado committed Aug 26, 2019
1 parent c7479c6 commit a044dd2
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 68 deletions.
14 changes: 7 additions & 7 deletions cmd/rabtap/cmd_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ func Example_cmdInfoByExchangeInTextFormat() {
// │ │ ├── some_consumer (consumer user='guest', prefetch=0, chan='172.17.0.1:40874 -> 172.17.0.2:5672 (1)')
// │ │ │ └── '172.17.0.1:40874 -> 172.17.0.2:5672' (connection client='https://github.com/streadway/amqp', host='172.17.0.2:5672', peer='172.17.0.1:40874')
// │ │ └── another_consumer w/ faulty channel (consumer user='', prefetch=0, chan='')
// │ └── direct-q2 (queue, key='direct-q2', running, [D|DLX])
// │ └── direct-q2 (queue, key='direct-q2', running, [D])
// ├── test-fanout (exchange, type 'fanout', [D])
// │ ├── fanout-q1 (queue, idle since 2017-05-25 19:14:32, [D])
// │ └── fanout-q2 (queue, idle since 2017-05-25 19:14:32, [D])
// ├── test-headers (exchange, type 'headers', [D|AD])
// │ ├── header-q1 (queue, key='headers-q1', idle since 2017-05-25 19:14:53, [D])
// │ └── header-q2 (queue, key='headers-q2', idle since 2017-05-25 19:14:47, [D])
// └── test-topic (exchange, type 'topic', [D])
// ├── topic-q1 (queue, key='topic-q1', idle since 2017-05-25 19:14:17, [D|AD|EX])
// ├── topic-q1 (queue, key='topic-q1', idle since 2017-05-25 19:14:17, [D|AD|EX|DLX])
// └── topic-q2 (queue, key='topic-q2', idle since 2017-05-25 19:14:21, [D])

}
Expand Down Expand Up @@ -153,9 +153,9 @@ const expectedResultDotByExchange = `digraph broker {
"boundqueue_direct-q1" [shape="record"; label="{ { Q | direct-q1 } | { D | | |<dlx> DLX } }"];
"boundqueue_direct-q1":dlx -> "exchange_mydlx";
"boundqueue_direct-q2" [shape="record"; label="{ { Q | direct-q2 } | { D | | |<dlx> DLX } }"];
"boundqueue_direct-q2" [shape="record"; label="{ { Q | direct-q2 } | { D | | | } }"];
"boundqueue_direct-q2":dlx -> "exchange_mydlx";
"exchange_test-fanout" [shape="record"; label="{ { E | test-fanout } |fanout | { D | | } }"];
"exchange_test-fanout" -> "boundqueue_fanout-q1" [fontsize=10; headport=n; label=""];
Expand Down Expand Up @@ -183,9 +183,9 @@ const expectedResultDotByExchange = `digraph broker {
"exchange_test-topic" -> "boundqueue_topic-q1" [fontsize=10; headport=n; label="topic-q1"];
"exchange_test-topic" -> "boundqueue_topic-q2" [fontsize=10; headport=n; label="topic-q2"];
"boundqueue_topic-q1" [shape="record"; label="{ { Q | topic-q1 } | { D | AD | EX | } }"];
"boundqueue_topic-q1" [shape="record"; label="{ { Q | topic-q1 } | { D | AD | EX |<dlx> DLX } }"];
"boundqueue_topic-q1":dlx -> "exchange_mydlx";
"boundqueue_topic-q2" [shape="record"; label="{ { Q | topic-q2 } | { D | | | } }"];
}`
Expand All @@ -211,7 +211,7 @@ func TestCmdInfoByExchangeInDotFormat(t *testing.T) {
out: os.Stdout})
}
result := testcommon.CaptureOutput(testfunc)
// fmt.Print(result)
// fmt.Print(result)
assert.Equal(t, strings.Trim(expectedResultDotByExchange, " \n"),
strings.Trim(result, " \n"))
}
Expand Down
83 changes: 35 additions & 48 deletions pkg/rabbitmq_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,6 @@ func (s RabbitHTTPClient) Policies() ([]RabbitPolicy, error) {
return *res.(*[]RabbitPolicy), err
}

func fixQueueEffectivePolicyDefinitions(policies []RabbitPolicy, queues []RabbitQueue) {
// create EffectivePolicyDefinitions in RabbitQueues array since older API
// versions do not provide this attribute.
for qi, q := range queues {
if q.Policy == nil {
continue
}
if i := FindPolicyByName(policies, q.Vhost, *q.Policy); i != -1 {
policy := policies[i]
queues[qi].EffectivePolicyDefinition = policy.Definition
}
}
}

// BrokerInfo gets all resources of the broker in parallel
// TODO use a ctx to for timeout/cancellation
func (s RabbitHTTPClient) BrokerInfo() (BrokerInfo, error) {
Expand All @@ -163,16 +149,7 @@ func (s RabbitHTTPClient) BrokerInfo() (BrokerInfo, error) {
g.Go(func() (err error) { r.Consumers, err = s.Consumers(); return })
g.Go(func() (err error) { r.Bindings, err = s.Bindings(); return })
g.Go(func() (err error) { r.Policies, err = s.Policies(); return })
err := g.Wait()

if err != nil {
return r, err
}

// make older RabbitMQ API version compatbile (pre 3.7)
fixQueueEffectivePolicyDefinitions(r.Policies, r.Queues)

return r, nil
return r, g.Wait()
}

// CloseConnection closes a connection by DELETING the associated resource
Expand Down Expand Up @@ -468,18 +445,17 @@ type RabbitQueue struct {
ReductionsDetails struct {
Rate float64 `json:"rate"`
} `json:"reductions_details"`
Reductions int `json:"reductions"`
Node string `json:"node"`
Arguments struct {
} `json:"arguments"`
Exclusive bool `json:"exclusive"`
AutoDelete bool `json:"auto_delete"`
Durable bool `json:"durable"`
EffectivePolicyDefinition map[string]string `json:"effective_policy_definition"`
Vhost string `json:"vhost"`
Name string `json:"name"`
MessageBytesPagedOut int `json:"message_bytes_paged_out"`
MessagesPagedOut int `json:"messages_paged_out"`
Reductions int `json:"reductions"`
Node string `json:"node"`
Arguments map[string]interface{} `json:"arguments,omitempty"`
Exclusive bool `json:"exclusive"`
AutoDelete bool `json:"auto_delete"`
Durable bool `json:"durable"`
EffectivePolicyDefinition map[string]interface{} `json:"effective_policy_definition"`
Vhost string `json:"vhost"`
Name string `json:"name"`
MessageBytesPagedOut int `json:"message_bytes_paged_out"`
MessagesPagedOut int `json:"messages_paged_out"`
BackingQueueStatus struct {
Mode string `json:"mode"`
Q1 int `json:"q1"`
Expand Down Expand Up @@ -523,18 +499,29 @@ type RabbitQueue struct {
Memory int `json:"memory"`
}

// HasDlx returns true if the given queue has an associated DLX
// HasDlx returns true if the given queue has an associated DLX, either defined
// by a policy or passed by argument during creation.
func (s RabbitQueue) HasDlx() bool {
_, hasDlx := s.EffectivePolicyDefinition["dead-letter-exchange"]
return hasDlx
if _, hasDlx := s.EffectivePolicyDefinition["dead-letter-exchange"]; hasDlx {
return true
}
if _, hasDlx := s.Arguments["x-dead-letter-exchange"]; hasDlx {
return true
}
return false
}

// Dlx returns the name of the associated DLX, "" if not configured.
// Since "" denotes also the default exchange, make sure to check existence
// with HasDlx()
// with HasDlx().
func (s RabbitQueue) Dlx() string {
dlx, _ := s.EffectivePolicyDefinition["dead-letter-exchange"]
return dlx
if dlx, hasDlx := s.EffectivePolicyDefinition["dead-letter-exchange"]; hasDlx {
return dlx.(string)
}
if dlx, hasDlx := s.Arguments["x-dead-letter-exchange"]; hasDlx {
return dlx.(string)
}
return ""
}

// RabbitBinding models the /bindings resource of the rabbitmq http api
Expand Down Expand Up @@ -575,12 +562,12 @@ type RabbitExchange struct {

// RabbitPolicy models the /policies resource of the rabbitmq http api
type RabbitPolicy struct {
Vhost string `json:"vhost"`
Name string `json:"name"`
Pattern string `json:"pattern"`
ApplyTo string `json:"apply-to"`
Definition map[string]string `json:"definition"`
Priority int `json:"priority"`
Vhost string `json:"vhost"`
Name string `json:"name"`
Pattern string `json:"pattern"`
ApplyTo string `json:"apply-to"`
Definition map[string]interface{} `json:"definition"`
Priority int `json:"priority"`
}

// ChannelDetails model channel_details in RabbitConsumer
Expand Down
21 changes: 15 additions & 6 deletions pkg/rabbitmq_rest_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,22 @@ func TestFindBindingsByExchangeReturnsMatchingBindings(t *testing.T) {
assert.Equal(t, "q3", foundBindings[1].Destination)
}

func TestQueueDlxAccessorMethods(t *testing.T) {
dlxPolicy := map[string]string{"dead-letter-exchange": "mydlx"}
qWithDlx := RabbitQueue{EffectivePolicyDefinition: dlxPolicy}
assert.True(t, qWithDlx.HasDlx())
assert.Equal(t, "mydlx", qWithDlx.Dlx())

func TestQueueDlxAccessorMethodsWhenNotDefined(t *testing.T) {
qWithoutDlx := RabbitQueue{}
assert.False(t, qWithoutDlx.HasDlx())
assert.Equal(t, "", qWithoutDlx.Dlx())
}

func TestQueueDlxAccessorMethodsWhenDefinedInArguments(t *testing.T) {
args := map[string]interface{}{"x-dead-letter-exchange": "mydlx", "test": 1234, "empty": nil}
qWithDlx := RabbitQueue{Arguments: args}
assert.True(t, qWithDlx.HasDlx())
assert.Equal(t, "mydlx", qWithDlx.Dlx())
}

func TestQueueDlxAccessorMethodsWhenDefinedInPolicy(t *testing.T) {
dlxPolicy := map[string]interface{}{"dead-letter-exchange": "mydlx", "test": 1234, "empty": nil}
qWithDlx := RabbitQueue{EffectivePolicyDefinition: dlxPolicy}
assert.True(t, qWithDlx.HasDlx())
assert.Equal(t, "mydlx", qWithDlx.Dlx())
}
19 changes: 12 additions & 7 deletions pkg/testcommon/rabbitmq_rest_api_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,10 @@ const (
`

// result of GET /api/queues
// Queue direct-q1: policy = DLX and EffectivePolicyDefitions set
// Queue direct-q2: policy = DLX and EffectivePolicyDefitions not set
// Other queues: policy = null, EffectivePolicyDefinitions not set
// DLX-policies:
// DLX yes: Queue direct-q1: EffectivePolicyDefitions set
// DLX yes: Queue topic-q1: EffectivePolicyDefinitions not set, Arguments set
// Other queues: None set
queueResult = `
[
{
Expand Down Expand Up @@ -443,7 +444,8 @@ const (
"auto_delete": false,
"durable": true,
"effective_policy_definition": {
"dead-letter-exchange": "mydlx"
"dead-letter-exchange": "mydlx",
"some-other-value": 1234
},
"vhost": "/",
"name": "direct-q1",
Expand Down Expand Up @@ -491,7 +493,6 @@ const (
"recoverable_slaves": null,
"consumers": 4,
"exclusive_consumer_tag": null,
"policy": "DLX",
"consumer_utilisation": null,
"memory": 29840
},
Expand Down Expand Up @@ -565,7 +566,6 @@ const (
"recoverable_slaves": null,
"consumers": 0,
"exclusive_consumer_tag": null,
"policy": "DLX",
"consumer_utilisation": null,
"memory": 29840
},
Expand Down Expand Up @@ -895,6 +895,10 @@ const (
"durable": true,
"vhost": "/",
"name": "topic-q1",
"arguments": {
"x-dead-letter-exchange": "mydlx",
"some-other-value": 1234
},
"message_bytes_paged_out": 0,
"messages_paged_out": 0,
"backing_queue_status": {
Expand Down Expand Up @@ -1219,7 +1223,8 @@ const (
"priority" : 0,
"name" : "DLX",
"definition" : {
"dead-letter-exchange" : "mydlx"
"dead-letter-exchange" : "mydlx",
"some-other-value": 1234
},
"vhost" : "/",
"apply-to" : "queues"
Expand Down

0 comments on commit a044dd2

Please sign in to comment.