Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Jun 6, 2023
1 parent fc17c8a commit e52cd4e
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 2 deletions.
4 changes: 4 additions & 0 deletions bin/ci/before_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ $CTL add_vhost /
$CTL add_user guest guest
$CTL set_permissions -p / guest ".*" ".*" ".*"

$CTL add_user policymaker policymaker
$CTL set_user_tags policymaker "policymaker"
$CTL set_permissions -p / policymaker ".*" ".*" ".*"

# Reduce retention policy for faster publishing of stats
$CTL eval 'supervisor2:terminate_child(rabbit_mgmt_sup_sup, rabbit_mgmt_sup), application:set_env(rabbitmq_management, sample_retention_policies, [{global, [{605, 1}]}, {basic, [{605, 1}]}, {detailed, [{10, 1}]}]), rabbit_mgmt_sup_sup:start_child().'
$CTL eval 'supervisor2:terminate_child(rabbit_mgmt_agent_sup_sup, rabbit_mgmt_agent_sup), application:set_env(rabbitmq_management_agent, sample_retention_policies, [{global, [{605, 1}]}, {basic, [{605, 1}]}, {detailed, [{10, 1}]}]), rabbit_mgmt_agent_sup_sup:start_child().'
Expand Down
49 changes: 49 additions & 0 deletions connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ type ConnectionInfo struct {
ConnectedAt uint64 `json:"connected_at,omitempty"`
}

// Connection of a specific user. This provides just enough information
// to the monitoring tools.
type UserConnectionInfo struct {
// Connection name
Name string `json:"name"`
// Node the client is connected to
Node string `json:"node"`
// Username
User string `json:"user"`
// Virtual host
Vhost string `json:"vhost"`
}

//
// GET /api/connections
//
Expand All @@ -99,6 +112,24 @@ func (c *Client) ListConnections() (rec []ConnectionInfo, err error) {
return rec, nil
}

//
// GET /api/connections/username/{username}
//

// ListConnections returns a list of client connections to target node.
func (c *Client) ListConnectionsOfUser(username string) (rec []UserConnectionInfo, err error) {
req, err := newGETRequest(c, "connections/username/"+url.PathEscape(username))
if err != nil {
return []UserConnectionInfo{}, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return []UserConnectionInfo{}, err
}

return rec, nil
}

//
// GET /api/connections/{name}
//
Expand Down Expand Up @@ -134,3 +165,21 @@ func (c *Client) CloseConnection(name string) (res *http.Response, err error) {

return res, nil
}

//
// DELETE /api/connections/username/{username}
//

// CloseConnection closes a connection.
func (c *Client) CloseAllConnectionsOfUser(username string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "connections/username/"+url.PathEscape(username), nil)
if err != nil {
return nil, err
}

if res, err = executeRequest(c, req); err != nil {
return nil, err
}

return res, nil
}
169 changes: 167 additions & 2 deletions rabbithole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbithole

import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
Expand Down Expand Up @@ -55,7 +56,12 @@ func FindUserByName(sl []UserInfo, name string) (u UserInfo) {
}

func openConnection(vhost string) *amqp.Connection {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/" + url.QueryEscape(vhost))
return openConnectionWithCredentials(vhost, "guest", "guest")
}

func openConnectionWithCredentials(vhost string, username string, password string) *amqp.Connection {
uri := fmt.Sprintf("amqp://%s:%s@localhost:5672/%s", username, password, url.QueryEscape(vhost))
conn, err := amqp.Dial(uri)
Ω(err).Should(BeNil())

if err != nil {
Expand Down Expand Up @@ -178,6 +184,118 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
})
})

// rabbitmq/rabbitmq-server#8482, rabbitmq/rabbitmq-server#5319
Context("DELETE /api/connections/username/{username} invoked by an administrator", func() {
It("closes the connection", func() {
// first close all connections
xs, _ := rmqc.ListConnections()
for _, c := range xs {
rmqc.CloseConnection(c.Name)
}
u := "policymaker"

Eventually(func(g Gomega) []UserConnectionInfo {
xs, err := rmqc.ListConnectionsOfUser(u)
Ω(err).Should(BeNil())

return xs
}).Should(BeEmpty())

conn := openConnectionWithCredentials("/", u, u)
defer conn.Close()

Eventually(func(g Gomega) []UserConnectionInfo {
xs, err := rmqc.ListConnectionsOfUser(u)
Ω(err).Should(BeNil())

return xs
}).ShouldNot(BeEmpty())

closeEvents := make(chan *amqp.Error)
conn.NotifyClose(closeEvents)

_, err := rmqc.CloseAllConnectionsOfUser(u)
Ω(err).Should(BeNil())

evt := <-closeEvents
Ω(evt).ShouldNot(BeNil())
Ω(evt.Code).Should(Equal(320))
Ω(evt.Reason).Should(Equal("CONNECTION_FORCED - Closed via management plugin"))
// server-initiated
Ω(evt.Server).Should(Equal(true))
})
})

// rabbitmq/rabbitmq-server#8482, rabbitmq/rabbitmq-server#5319
Context("DELETE /api/connections/username/{username} invoked by a non-privileged user, case 1", func() {
It("closes the connection", func() {
// first close all connections as an administrative user
xs, _ := rmqc.ListConnections()
for _, c := range xs {
rmqc.CloseConnection(c.Name)
}
u := "policymaker"

// an HTTP API client that uses policymaker-level permissions
alt_rmqc, _ := NewClient("http://127.0.0.1:15672", u, u)

Eventually(func(g Gomega) []ConnectionInfo {
xs, err := alt_rmqc.ListConnections()
Ω(err).Should(BeNil())

return xs
}).Should(BeEmpty())

conn := openConnectionWithCredentials("/", u, u)
defer conn.Close()

// the user can list their own connections
Eventually(func(g Gomega) []UserConnectionInfo {
xs, err := alt_rmqc.ListConnectionsOfUser(u)
Ω(err).Should(BeNil())

return xs
}).ShouldNot(BeEmpty())

closeEvents := make(chan *amqp.Error)
conn.NotifyClose(closeEvents)

// the user can close their own connections
_, err := alt_rmqc.CloseAllConnectionsOfUser(u)
Ω(err).Should(BeNil())

evt := <-closeEvents
Ω(evt).ShouldNot(BeNil())
Ω(evt.Code).Should(Equal(320))
Ω(evt.Reason).Should(Equal("CONNECTION_FORCED - Closed via management plugin"))
// server-initiated
Ω(evt.Server).Should(Equal(true))
})
})

// rabbitmq/rabbitmq-server#8482, rabbitmq/rabbitmq-server#5319
Context("DELETE /api/connections/username/{username} invoked by a non-privileged user, case 2", func() {
It("fails with insufficient permissions", func() {
u := "policymaker"

// an HTTP API client that uses policymaker-level permissions
alt_rmqc, _ := NewClient("http://127.0.0.1:15672", u, u)

conn := openConnection("/")
defer conn.Close()

// the user cannot list connections of the default administrative user
_, err := alt_rmqc.ListConnectionsOfUser("guest")
Ω(err).Should(HaveOccurred())
Ω(err.Error()).Should(Equal("Error: API responded with a 401 Unauthorized"))

// the user cannot close connections of the default administrative user
_, err = alt_rmqc.CloseAllConnectionsOfUser("guest")
Ω(err).Should(HaveOccurred())
Ω(err.Error()).Should(Equal("Error: API responded with a 401 Unauthorized"))
})
})

Context("EnabledProtocols", func() {
It("returns a list of enabled protocols", func() {
xs, err := rmqc.EnabledProtocols()
Expand Down Expand Up @@ -288,7 +406,6 @@ var _ = Describe("RabbitMQ HTTP API client", func() {

Context("GET /connections when there are active connections", func() {
It("returns decoded response", func() {
// this really should be tested with > 1 connection and channel. MK.
conn := openConnection("/")
defer conn.Close()

Expand Down Expand Up @@ -331,6 +448,54 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
})
})

Context("GET /connections/username/{username} when there are active connections", func() {
It("returns decoded response", func() {
conn := openConnectionWithCredentials("/", "policymaker", "policymaker")
defer conn.Close()

conn2 := openConnection("/")
defer conn2.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

ch2, err := conn2.Channel()
Ω(err).Should(BeNil())
defer ch2.Close()

ch3, err := conn2.Channel()
Ω(err).Should(BeNil())
defer ch3.Close()

Eventually(func(g Gomega) []UserConnectionInfo {
xs, err := rmqc.ListConnectionsOfUser("guest")
Ω(err).Should(BeNil())

return xs
}).ShouldNot(BeEmpty())

Eventually(func(g Gomega) []UserConnectionInfo {
xs, err := rmqc.ListConnectionsOfUser("policymaker")
Ω(err).Should(BeNil())

return xs
}).ShouldNot(BeEmpty())

xs, err := rmqc.ListConnectionsOfUser("guest")
Ω(err).Should(BeNil())

info := xs[0]
Ω(info.Name).ShouldNot(BeNil())
Ω(info.User).ShouldNot(BeEmpty())
Ω(info.Vhost).Should(Equal("/"))

// administrative users can list connections of any user
_, err = rmqc.ListConnectionsOfUser("policymaker")
Ω(err).Should(BeNil())
})
})

Context("GET /channels when there are active connections with open channels", func() {
It("returns decoded response", func() {
cs, _ := rmqc.ListConnections()
Expand Down

0 comments on commit e52cd4e

Please sign in to comment.