Permalink
Browse files

Support for new Permissions functionality in RabbitMQ 1.6.0

  • Loading branch information...
1 parent 2e0181b commit 70b32f3d8b671cb8a24486ccab72331eafd9ae3c @clemesha-ooi clemesha-ooi committed Sep 17, 2009
Showing with 57 additions and 59 deletions.
  1. +38 −22 rabbitmqctl_service.py
  2. +19 −37 tests/test_service.py
@@ -94,38 +94,54 @@ def list_vhosts(self):
returnValue(response)
@inlineCallbacks
- def map_user_vhost(self, username, vhostpath):
- """allow access of user to vhost"""
- username, vhostpath = Binary(username), Binary(vhostpath)
- result = yield self.process.callRemote(self.nodename, self.module, "map_user_vhost", username, vhostpath)
- response = {"command":"map_user_vhost", "username":username.value, "vhostpath":vhostpath.value, "result":result.text}
+ def set_permissions(self, username, vhostpath, config_regex, write_regex, read_regex):
+ """set permission of a user to broker resources"""
+ username, vhostpath = Binary(username), Binary(vhostpath), Binary(config_regex), Binary(write_regex), Binary(read_regex)
+ result = yield self.process.callRemote(self.nodename, self.module, "set_permissions", username, \
+ vhostpath, config_regex, write_regex, read_regex)
+ response = {"command":"set_permissions", "username":username.value, "vhostpath":vhostpath.value, "result":result.text}
returnValue(response)
@inlineCallbacks
- def unmap_user_vhost(self, username, vhostpath):
- """deny access of user to vhost"""
+ def clear_permissions(self, username, vhostpath):
+ """clear user permissions"""
username, vhostpath = Binary(username), Binary(vhostpath)
- result = yield self.process.callRemote(self.nodename, self.module, "unmap_user_vhost", username, vhostpath)
- response = {"command":"unmap_user_vhost", "username":username.value, "vhostpath":vhostpath.value, "result":result.text}
+ result = yield self.process.callRemote(self.nodename, self.module, "clear_permissions", username, vhostpath)
+ response = {"command":"clear_permissions", "username":username.value, "vhostpath":vhostpath.value, "result":result.text}
returnValue(response)
@inlineCallbacks
- def list_user_vhosts(self, username):
- """list all vhosts for user"""
- username = Binary(username)
- result = yield self.process.callRemote(self.nodename, self.module, "list_user_vhosts", username)
- #XXX check for failure: (<Atom at 0x2883690, text 'error'>, (<Atom at 0x2883710, text 'no_such_user'>,
- vhosts = [vhost.value for vhost in result]
- response = {"command":"list_user_vhosts", "username":username.value, "result":vhosts}
+ def list_vhost_permissions(self, vhostpath=None):
+ """list all users permissions"""
+ if vhostpath is None:
+ vhostpath = "/"
+ vhostpath = Binary(vhostpath)
+ result = yield self.process.callRemote(self.nodename, self.module, "list_vhost_permissions", vhostpath)
+ result_all = {}
+ for v in result:
+ username = v[0].value
+ config_regex = v[1].value
+ write_regex = v[2].value
+ read_regex = v[3].value
+ result_all[(username, vhostpath.value)] = [config_regex, write_regex, read_regex]
+ response = {"command":"list_vhost_permissions", "vhostpath":vhostpath.value, "result":result_all}
returnValue(response)
@inlineCallbacks
- def list_vhost_users(self, vhostpath):
- """list all users in vhost"""
- vhostpath = Binary(vhostpath)
- result = yield self.process.callRemote(self.nodename, self.module, "list_vhost_users", vhostpath)
- users = [user.value for user in result]
- response = {"command":"list_vhost_users", "vhostpath":vhostpath.value, "result":users}
+ def list_user_permissions(self, username=None):
+ """list all users permissions"""
+ if username is None:
+ username = "guest"
+ username = Binary(username)
+ result = yield self.process.callRemote(self.nodename, self.module, "list_user_permissions", username)
+ result_all = {}
+ for v in result:
+ vhostpath = v[0].value
+ config_regex = v[1].value
+ write_regex = v[2].value
+ read_regex = v[3].value
+ result_all[(vhostpath, username.value)] = [config_regex, write_regex, read_regex]
+ response = {"command":"list_user_permissions", "vhostpath":username.value, "result":result_all}
returnValue(response)
@inlineCallbacks
View
@@ -45,20 +45,6 @@ def test_list_vhosts(self):
self.failUnless("/" in vhosts["result"])
@inlineCallbacks
- def test_list_user_vhosts(self):
- vhosts = yield self.service.list_user_vhosts("guest")
- self.failUnless(vhosts["username"] == "guest")
- self.failUnless(vhosts["command"] == "list_user_vhosts")
- self.failUnless("/" in vhosts["result"])
-
- @inlineCallbacks
- def test_list_vhost_users(self):
- users = yield self.service.list_vhost_users("/")
- self.failUnless(users["vhostpath"] == "/")
- self.failUnless(users["command"] == "list_vhost_users")
- self.failUnless("guest" in users["result"])
-
- @inlineCallbacks
def test_add_changepassword_delete_user(self):
"""New user test.
Add a new user, change new user's password, delete new user.
@@ -93,42 +79,38 @@ def test_add_delete_vhost(self):
self.failUnless(delete_vhost["result"] == "ok")
@inlineCallbacks
- def test_map_unmap_user_vhost(self):
- """Map and unmap user to vhost test.
- Add a new vhost, map and unmap user to vhost, delete new vhost.
- """
- add_vhost = yield self.service.add_vhost("test_vhost_path")
-
- map_user_vhost = yield self.service.map_user_vhost("guest", "test_vhost_path")
- self.failUnless(map_user_vhost["command"] == "map_user_vhost")
- self.failUnless(map_user_vhost["username"] == "guest")
- self.failUnless(map_user_vhost["vhostpath"] == "test_vhost_path")
- self.failUnless(map_user_vhost["result"] == "ok")
+ def test_list_vhost_permissions(self):
+ """Test list all vhost permissions"""
+ list_vhost_permissions = yield self.service.list_vhost_permissions()
+ self.failUnless(list_vhost_permissions["command"] == "list_vhost_permissions")
+ result = list_vhost_permissions["result"]
+ self.failUnless(result[('guest', '/')] == ['.*', '.*', '.*'])
- unmap_user_vhost = yield self.service.unmap_user_vhost("guest", "test_vhost_path")
- self.failUnless(unmap_user_vhost["command"] == "unmap_user_vhost")
- self.failUnless(unmap_user_vhost["username"] == "guest")
- self.failUnless(unmap_user_vhost["vhostpath"] == "test_vhost_path")
- self.failUnless(unmap_user_vhost["result"] == "ok")
-
- delete_vhost = yield self.service.delete_vhost("test_vhost_path")
+ @inlineCallbacks
+ def test_list_user_permissions(self):
+ """Test list all users permissions"""
+ list_user_permissions = yield self.service.list_user_permissions()
+ self.failUnless(list_user_permissions["command"] == "list_user_permissions")
+ result = list_user_permissions["result"]
+ self.failUnless(result[('/', 'guest')] == ['.*', '.*', '.*'])
@inlineCallbacks
def test_list_queues(self):
list_queues = yield self.service.list_queues()
- print list_queues
-
+ self.failUnless(list_queues["vhostpath"] == "/")
+ self.failUnless(list_queues["command"] == "list_queues")
+
@inlineCallbacks
def test_list_exchanges(self):
list_exchanges = yield self.service.list_exchanges()
- print list_exchanges
+ #print list_exchanges
@inlineCallbacks
def test_list_bindings(self):
list_bindings = yield self.service.list_bindings()
- print list_bindings
+ #print list_bindings
@inlineCallbacks
def test_list_connections(self):
list_connections = yield self.service.list_connections()
- print list_connections
+ #print list_connections

0 comments on commit 70b32f3

Please sign in to comment.