Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
- rabbitctl_service.py is now more full featured, including introspec…
Browse files Browse the repository at this point in the history
…tion

of all queues and exchanges. Responses now have a more standard format.
- webui.py more cleanup up, generalized.  Now returns json.
- add several examples of twotp usage in examples dir.
  • Loading branch information
clemesha-ooi committed Jun 25, 2009
1 parent 69fe44b commit 36db426
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 56 deletions.
37 changes: 37 additions & 0 deletions examples/add_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python
# Copyright (c) 2007-2009 Thomas Herve <therve@free.fr>.
# See LICENSE for details.

"""
Example emulating rabbitmqctl
"""

from twisted.internet import reactor

from twotp import Process, readCookie, buildNodeName
from twotp.term import Binary, Atom


def add_user(process, username, password):
def cb(resp):
print resp
reactor.stop()
def eb(error):
print "Got error", error
reactor.stop()
un, pw = Binary(username), Binary(password)
process.callRemote("rabbit", "rabbit_access_control", "add_user", un, pw).addCallback(cb).addErrback(eb)


if __name__ == "__main__":
import sys
if len(sys.argv) != 4:
print "USAGE: ./add_user.py COOKIE username password"
sys.exit(1)
cookie = sys.argv[1] #cookie = readCookie()
username = sys.argv[2]
password = sys.argv[3]
nodeName = buildNodeName("twotp-rabbit")
process = Process(nodeName, cookie)
reactor.callWhenRunning(add_user, process, username, password)
reactor.run()
15 changes: 9 additions & 6 deletions examples/rabbitmqctl.py → examples/delete_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# See LICENSE for details.

"""
Example emulating rabbitmqctl, just calling list_vhosts for now.
Example emulating rabbitmqctl
"""

from twisted.internet import reactor
Expand All @@ -12,22 +12,25 @@
from twotp.term import Binary, Atom


def testListVhost(process):
def delete_user(process, username):
def cb(resp):
print resp
reactor.stop()
def eb(error):
print "Got error", error
reactor.stop()
un, pw = Atom("guest"), Binary("pass2")
print un, pw
process.callRemote("rabbit", "rabbit_access_control", "delete_user", "guest").addCallback(cb).addErrback(eb)
un = Binary(username)
process.callRemote("rabbit", "rabbit_access_control", "delete_user", un).addCallback(cb).addErrback(eb)


if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print "USAGE: ./delete_user.py COOKIE username"
sys.exit(1)
cookie = sys.argv[1] #cookie = readCookie()
username = sys.argv[2]
nodeName = buildNodeName("twotp-rabbit")
process = Process(nodeName, cookie)
reactor.callWhenRunning(testListVhost, process)
reactor.callWhenRunning(delete_user, process, username)
reactor.run()
51 changes: 51 additions & 0 deletions examples/exchange_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python
"""
Get RabbitMQ exchange info.
"""

from twisted.internet import reactor

from twotp import Process, readCookie, buildNodeName
from twotp.term import Binary, Atom


items = ["name", "type", "durable", "auto_delete", "arguments"]
INFO_ARGS = [Atom(item) for item in items]
VHOST = Binary("/")

def exchange_info(process):

def cb(resp):
#print resp
print
for info in _exchange_info(resp):
print "\n", info, "\n"
reactor.stop()

def _exchange_info(resp):
allinfo = []
for v in resp:
# [(exch1, infodict1), (exch2, infodict2), ...]
allinfo.append((v[0][1][3].value,
{"name":v[0][1][3].value,
"type":v[1][1].text,
"durable":v[2][1].text == "true",
"auto_delete":v[3][1].text == "true",
"arguments":v[4][1]}))
return allinfo


def eb(error):
print "Got error", error
reactor.stop()

process.callRemote("rabbit", "rabbit_exchange", "info_all", VHOST, INFO_ARGS).addCallback(cb).addErrback(eb)


if __name__ == "__main__":
import sys
cookie = sys.argv[1] #cookie = readCookie()
nodeName = buildNodeName("twotp-rabbit")
process = Process(nodeName, cookie)
reactor.callWhenRunning(exchange_info, process)
reactor.run()
62 changes: 62 additions & 0 deletions examples/queue_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env python
"""
Get RabbitMQ queue info.
"""

from twisted.internet import reactor

from twotp import Process, readCookie, buildNodeName
from twotp.term import Binary, Atom


items = ["name", "durable", "auto_delete", "arguments", "pid",
"messages_ready", "messages_unacknowledged",
"messages_uncommitted", "messages", "acks_uncommitted",
"consumers", "transactions", "memory"]
INFO_ARGS = [Atom(item) for item in items]
VHOST = Binary("/")

def queue_info(process):

def cb(resp):
#print resp
print
for info in _queue_info(resp):
print "\n", info, "\n"
reactor.stop()

def _queue_info(resp):
allinfo = []
for queue in resp:
# [(qname1, infodict1), (qname2, infodict2), ...]
allinfo.append((queue[0][1][3].value,
{"name":queue[0][1][3].value,
"durable":queue[1][1].text == "true",
"auto_delete":queue[2][1].text == "true",
"arguments":queue[3][1],
"pid":queue[4][1].nodeName.text,
"messages_ready":queue[5][1],
"messages_unacknowledged":queue[6][1],
"messages_uncommitted":queue[7][1],
"messages":queue[8][1],
"acks_uncommitted":queue[9][1],
"memory":queue[10][1],
"transactions":queue[11][1],
"memory":queue[12][1]}))
return allinfo


def eb(error):
print "Got error", error
reactor.stop()

process.callRemote("rabbit", "rabbit_amqqueue", "info_all", VHOST, INFO_ARGS).addCallback(cb).addErrback(eb)


if __name__ == "__main__":
import sys
cookie = sys.argv[1] #cookie = readCookie()
nodeName = buildNodeName("twotp-rabbit")
process = Process(nodeName, cookie)
reactor.callWhenRunning(queue_info, process)
reactor.run()
2 changes: 1 addition & 1 deletion irabbitmqctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def list_vhost_users(vhostpath):
def list_queues(vhostpath=None, queueinfoitem=None):
"""list all queues"""

def list_exchanges(vhostpath=None, queueinfoitem=None):
def list_exchanges(vhostpath=None, exchangeinfoitem=None):
"""list all exchanges"""

def list_bindings(vhostpath=None):
Expand Down
141 changes: 122 additions & 19 deletions rabbitmqctl_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,67 +16,170 @@ class RabbitMQControlService(service.Service):
as well as the ability to do management
functions like add users and vhosts, etc.
The communication happens with TwOPT, which
The communication happens with Twotp, which
implements the Erlang node protocol for Twisted.
"""

implements(IRabbitMQControlService)

def __init__(self, process, nodename="rabbit"):
def __init__(self, process, nodename="rabbit", module="rabbit_access_control"):
self.process = process
self.nodename = nodename
self.module = module

@inlineCallbacks
def add_user(self, username, password):
"""add new user with given password"""
module = "rabbit_access_control"
result = yield self.process.callRemote(self.nodename, module, "add_user", username, password)
returnValue(result)
username, password = Binary(username), Binary(password)
result = yield self.process.callRemote(self.nodename, self.module, "add_user", username, password)
print result
response = {"command":"add_user", "username":username.value, "result":result.value}
returnValue(response)

@inlineCallbacks
def delete_user(self, username):
"""delete user"""
pass
username = Binary(username)
result = yield self.process.callRemote(self.nodename, self.module, "delete_user", username)
response = {"command":"delete_user", "username":username.value, "result":result.value}
returnValue(response)

@inlineCallbacks
def change_password(self, username, password):
"""change user password"""
pass
username, password = Binary(username), Binary(password)
result = yield self.process.callRemote(self.nodename, self.module, "change_password", username, password)
response = {"command":"change_password", "username":username.value, "result":result.value}
returnValue(response)

@inlineCallbacks
def list_users(self):
"""list all users"""
module = "rabbit_access_control"
users = yield self.process.callRemote(self.nodename, module, "list_users")
returnValue(users)
users = yield self.process.callRemote(self.nodename, self.module, "list_users")
users = [user.value for user in users]
response = {"command":"list_users", "count":len(users), "users":users}
returnValue(response)

@inlineCallbacks
def add_vhost(self, vhostpath):
"""add new vhost"""
pass
vhostpath = Binary(vhostpath)
result = yield self.process.callRemote(self.nodename, self.module, "add_vhost", vhostpath)
response = {"command":"add_vhost", "vhostpath":vhostpath.value, "result":result.value}
returnValue(response)

@inlineCallbacks
def delete_vhost(self, vhostpath):
"""delete vhost"""
pass
vhostpath = Binary(vhostpath)
result = yield self.process.callRemote(self.nodename, self.module, "delete_vhost", vhostpath)
response = {"command":"delete_vhost", "vhostpath":vhostpath.value, "result":result.value}
returnValue(response)

@inlineCallbacks
def list_vhosts(self):
"""list all vhosts"""
module = "rabbit_access_control"
vhosts = yield self.process.callRemote(self.nodename, module, "list_vhosts")
returnValue(vhosts)
vhosts = yield self.process.callRemote(self.nodename, self.module, "list_vhosts")
vhosts = [vhost.value for vhost in vhosts]
response = {"command":"list_vhosts", "count":len(vhosts), "vhosts":vhosts}
returnValue(response)

@inlineCallbacks
def map_user_vhost(self, username, vhostpath):
"""allow access of user to vhost"""
pass
username, vhostpath = Binary(username), Binary(vhostpath)
result = yield self.process.callRemote(self.nodename, self.module, "map_user_vhost", username, vhostpath)
response = {"command":"map_user_vhost", "username":username.value, "vhostpath":vhostpath.value, "result":result.value}
returnValue(response)

@inlineCallbacks
def unmap_user_vhost(self, username, vhostpath):
"""deny access of user to vhost"""
pass
username, vhostpath = Binary(username), Binary(vhostpath)
result = yield self.process.callRemote(self.nodename, self.module, "unmap_user_vhost", username, vhostpath)
response = {"command":"unmap_user_vhost", "username":username.value, "vhostpath":vhostpath.value, "result":result.value}
returnValue(response)

@inlineCallbacks
def list_user_vhosts(self, username):
"""list all vhosts for user"""
pass
username = Binary(username)
result = yield self.process.callRemote(self.nodename, self.module, "list_user_vhosts", username)
#XXX check for failure: (<Atom at 0x2883690, text 'error'>, (<Atom at 0x2883710, text 'no_such_user'>,
vhosts = [vhost.value for vhost in result]
response = {"command":"list_user_vhosts", "username":username.value, "vhost":vhosts}
returnValue(response)

@inlineCallbacks
def list_vhost_users(self, vhostpath):
"""list all users in vhost"""
pass
vhostpath = Binary(vhostpath)
result = yield self.process.callRemote(self.nodename, self.module, "list_vhost_users", vhostpath)
users = [user.value for user in result]
response = {"command":"list_vhost_users", "vhostpath":vhostpath.value, "users":users}
returnValue(response)

@inlineCallbacks
def list_queues(self, vhostpath=None, queueinfoitem=None):
"""list all queues"""
if vhostpath is None:
vhostpath = "/"
vhostpath = Binary(vhostpath)
if queueinfoitem is None:
infoitems = [Atom(item) for item in ["name", "durable", "auto_delete", "arguments", "pid",
"messages_ready", "messages_unacknowledged", "messages_uncommitted", "messages", "acks_uncommitted",
"consumers", "transactions", "memory"]]
result = yield self.process.callRemote(self.nodename, "rabbit_amqqueue", "info_all", vhostpath, infoitems)
info_all = []
for v in result:
info_all.append((v[0][1][3].value,
{"name":v[0][1][3].value,
"durable":v[1][1].text == "true",
"auto_delete":v[2][1].text == "true",
"arguments":v[3][1],
"pid":v[4][1].nodeName.text,
"messages_ready":v[5][1],
"messages_unacknowledged":v[6][1],
"messages_uncommitted":v[7][1],
"messages":v[8][1],
"acks_uncommitted":v[9][1],
"memory":v[10][1],
"transactions":v[11][1],
"memory":v[12][1]}))
response = {"command":"list_queues", "vhostpath":vhostpath.value, "info_all":info_all}
returnValue(response)

@inlineCallbacks
def list_exchanges(self, vhostpath=None, exchangeinfoitem=None):
"""list all exchanges"""
if vhostpath is None:
vhostpath = "/"
vhostpath = Binary(vhostpath)
if exchangeinfoitem is None:
infoitems = [Atom(item) for item in ["name", "type", "durable", "auto_delete", "arguments"]]
result = yield self.process.callRemote(self.nodename, "rabbit_exchange", "info_all", vhostpath, infoitems)
info_all = []
for v in result:
# [(exch1, infodict1), (exch2, infodict2), ...]
info_all.append((v[0][1][3].value,
{"name":v[0][1][3].value,
"type":v[1][1].text,
"durable":v[2][1].text == "true",
"auto_delete":v[3][1].text == "true",
"arguments":v[4][1]}))
response = {"command":"list_exchanges", "vhostpath":vhostpath.value, "info_all":info_all}
returnValue(response)

@inlineCallbacks
def list_bindings(self, vhostpath):
"""list all bindings"""
response = {"command":"list_bindings"}
returnValue(response)

@inlineCallbacks
def list_connections(self, connectioninfoitem=None):
"""list all connections"""
response = {"command":"list_connections"}
print "CIIIIIIIIIIIIII ", connectioninfoitem, response
returnValue(response)

Loading

0 comments on commit 36db426

Please sign in to comment.