Skip to content

Commit

Permalink
Release message by reservation_id
Browse files Browse the repository at this point in the history
Add wrapper "queues" to the list of queues response
Add wrapper "queue" to the Queue.info response
Change peek endpoint
Add wrapper "message" to the get_message_by_id
Add create and update queue
  • Loading branch information
sunloverz authored and featalion committed Mar 25, 2015
1 parent 9f0afea commit 76b33ff
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 5 deletions.
55 changes: 51 additions & 4 deletions iron_mq.py
Expand Up @@ -141,10 +141,10 @@ def reserve(self, max=None, timeout=None):
def get_message_by_id(self, message_id):
url = "queues/%s/messages/%s" % (self.name, message_id)
response = self.client.get(url)
return response['body']
return response["body"]["message"]

def peek(self, max=None):
url = "queues/%s/messages/peek" % self.name
url = "queues/%s/messages" % self.name
if max is not None:
url = "%s?n=%s" % (url, max)

Expand All @@ -170,11 +170,20 @@ def touch(self, message_id, reservation_id = None):

return response['body']

def release(self, message_id, delay=0):
def release(self, message_id, delay=0, reservation_id = None):
"""Release locked message after specified time. If there is no message with such id on the queue.
Arguments:
message_id -- The ID of the message.
delay -- The time after which the message will be released.
reservation_id -- Reservation Id of the message. Reserved message could not be deleted without reservation Id.
"""
url = "queues/%s/messages/%s/release" % (self.name, message_id)
body = {}
if delay > 0:
body['delay'] = delay
if reservation_id is not None:
body["reservation_id"] = reservation_id
body = json.dumps(body)

response = self.client.post(url, body=body,
Expand Down Expand Up @@ -304,7 +313,7 @@ def queues(self, page=None, per_page=None):
url = "%s?%s" % (url, query)
result = self.client.get(url)

return [queue["name"] for queue in result["body"]]
return [queue["name"] for queue in result["body"]["queues"]]


def queue(self, queue_name):
Expand All @@ -316,6 +325,44 @@ def queue(self, queue_name):
return Queue(self, queue_name)


def create_queue(self, queue_name, message_expiration=None, type=None, push=None, alerts=None):
options = {}
if message_expiration is not None:
options["message_expiration"] = message_expiration
if type is not None:
options["type"] = type
if push is not None:
options["push"] = push
if alerts is not None:
options["alerts"] = alerts

queue = {"queue": options}
body = json.dumps(queue)
url = "queues/%s" % queue_name

response = self.client.put(url, body=body, headers={"Content-Type":"application/json"})
return response['body']


def update_queue(self, queue_name, message_expiration=None, type=None, push=None, alerts=None):
options = {}
if message_expiration is not None:
options["message_expiration"] = message_expiration
if type is not None:
options["type"] = type
if push is not None:
options["push"] = push
if alerts is not None:
options["alerts"] = alerts

queue = {"queue": options}
body = json.dumps(queue)
url = "queues/%s" % queue_name

response = self.client.request(url = url, method="PATCH", body=body,
headers={"Content-Type":"application/json"})
return response['body']

# DEPRECATED

def getQueues(self, page=None, project_id=None):
Expand Down
10 changes: 9 additions & 1 deletion test.py
Expand Up @@ -161,7 +161,15 @@ def test_getMessageById(self):
q = self.mq.queue("test_queue")
response_post = q.post(body)
message = q.get_message_by_id(response_post["ids"][0])
self.assertEqual(body, message["message"]["body"])
self.assertEqual(body, message["body"])

def test_peekMessages(self):
q = self.mq.queue("test_queue")
q.clear()
q.post("more")
q.post("and more")
response = q.peek(2)
self.assertEqual(2, len(response["messages"]))

if __name__ == '__main__':
unittest.main()

0 comments on commit 76b33ff

Please sign in to comment.