Skip to content

Commit

Permalink
Add methods to add, replace, remove subscribers to a queue
Browse files Browse the repository at this point in the history
Fix tests and update documentation
  • Loading branch information
sunloverz authored and featalion committed Mar 25, 2015
1 parent 009187a commit ada1391
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 44 deletions.
36 changes: 33 additions & 3 deletions README.md
Expand Up @@ -217,13 +217,43 @@ Same as create queue
### Add subscribers to a push queue

```python
queue.add_subscribers(["http://endpoint1.com", "https://end.point.com/2"])
subscribers = [
{
'name': 'first',
'url': 'http://first.endpoint.xx/process',
'headers': {
'Content-Type': 'application/json'
}
},
{
'name': 'second',
'url': 'http://second.endpoint.xx/process',
'headers': {
'Content-Type': 'application/json'
}
}
]
queue.add_subscribers(*subscribers)
```

### Replace subscribers on a push queue

Sets list of subscribers to a queue. Older subscribers will be removed.

```python
subscribers = [
{
"name": "the_only",
"url": "http://my.over9k.host.com/push"
}
];
queue.replace_subscribers(*subscribers);
```

### Remove subscribers from a push queue
### Remove subscribers by a name from a push queue

```python
queue.remove_subscribers()
queue.remove_subscribers(*['first', 'second'])
```

### Get the push statuses of a message
Expand Down
50 changes: 24 additions & 26 deletions iron_mq.py
Expand Up @@ -249,34 +249,32 @@ def remove_alert(self, alert_id):
response = self.client.delete(url, body={}, headers={"Content-Type":"application/json"})
return response['body']

def add_subscribers(self, subscribers, type=None, retries=None, retries_delay=None, error_queue=None):
options = {}
options.update(self._prepare_subscribers(*subscribers))
if retries is not None:
options["retries"] = retries
if retries_delay is not None:
options["retries_delay"] = retries_delay
if error_queue is not None:
options["error_queue"] = error_queue
queue = {}
if type is not None:
queue["type"] = type
queue["push"] = options

body = json.dumps({"queue": queue})
url = "queues/%s" % self.name
def add_subscribers(self, *subscribers):
url = "queues/%s/subscribers" % self.name
body = json.dumps({'subscribers': subscribers})

response = self.client.put(url = url, body=body,
headers={"Content-Type":"application/json"})
return response['body']['queue']
response = self.client.post(url, body=body,
headers={"Content-Type":"application/json"})

def remove_subscribers(self):
body = json.dumps({"queue": {"push": {"subscribers": [{}]}}})
url = "queues/%s" % self.name
return response['body']

response = self.client.put(url = url, body=body,
headers={"Content-Type":"application/json"})
return response['body']['queue']
def remove_subscribers(self, *subscribers):
url = "queues/%s/subscribers" % self.name
body = json.dumps(self._prepare_subscribers(*subscribers))
print(body)
response = self.client.delete(url, body=body,
headers={"Content-Type":"application/json"})

return response['body']

def replace_subscribers(self, *subscribers):
url = "queues/%s/subscribers" % self.name
body = json.dumps({'subscribers': subscribers})

response = self.client.put(url, body=body,
headers={"Content-Type":"application/json"})

return response['body']

def get_message_push_statuses(self, message_id):
url = "queues/%s/messages/%s/subscribers" % (self.name, message_id)
Expand All @@ -297,7 +295,7 @@ def _prepare_alert_ids(self, *alert_ids):
return {'alerts': alerts}

def _prepare_subscribers(self, *subscribers):
subscrs = [{'url': ss} for ss in subscribers]
subscrs = [{'name': ss} for ss in subscribers]

return {'subscribers': subscrs}

Expand Down
37 changes: 22 additions & 15 deletions test.py
Expand Up @@ -5,10 +5,9 @@


class TestIronMQ(unittest.TestCase):

def setUp(self):
self.mq = IronMQ()
self.random_number = str(int(random.random() * 10**10))
self.random_number = str(int(random.random() * 10 ** 10))

def test_postMessage(self):
q = self.mq.queue("test_queue")
Expand Down Expand Up @@ -36,27 +35,34 @@ def test_infoShouldReturnAlerts(self):
def test_removeAlerts(self):
q = self.mq.queue("test_queue")
q.clear()
fixed_alerts = [{'type': 'fixed', 'direction': 'desc', 'trigger': 1000, 'queue': 'a_q'}, {'type': 'fixed', 'direction': 'asc', 'trigger': 10000, 'queue': 'a_q'}, {'type': 'progressive', 'direction': 'asc', 'trigger': 500, 'queue': 'a_q'}]
fixed_alerts = [{'type': 'fixed', 'direction': 'desc', 'trigger': 1000, 'queue': 'a_q'},
{'type': 'fixed', 'direction': 'asc', 'trigger': 10000, 'queue': 'a_q'},
{'type': 'progressive', 'direction': 'asc', 'trigger': 500, 'queue': 'a_q'}]
response = q.add_alerts(fixed_alerts)
self.assertEqual(len(response["alerts"]), len(fixed_alerts))
result = q.remove_alerts()
self.assertEqual(len(result["alerts"]), 1)

def test_addSubscribers(self):
q = self.mq.queue("test_queue%s" % time.time())
q.clear()
subscribers = [{"url": "http://mysterious-brook-1807.herokuapp.com/ironmq_push_1"}, {"url": "http://mysterious-brook-1807.herokuapp.com/ironmq_push_1"}]
response = q.add_subscribers(subscribers)
self.assertEqual(len(subscribers), len(response["push"]["subscribers"]))
queue_name = "test_queue%s" % time.time()
subscribers = [{'name': 'first', 'url': 'http://first.endpoint.xx/process' }]
self.mq.create_queue(queue_name, {'push': {'subscribers': subscribers}} )
q = self.mq.queue(queue_name)
response = q.add_subscribers(*[{'name': 'second', 'url': 'http://first.endpoint.xx/process'}])
self.assertEqual(response["msg"], "Updated")
info = q.info()
self.assertEqual(2, len(info['push']['subscribers']))

def test_removeSubscribers(self):
q = self.mq.queue("test_queue%s" % time.time())
q.clear()
subscribers = [{"url": "http://mysterious-brook-1807.herokuapp.com/ironmq_push_1"}, {"url": "http://mysterious-brook-1807.herokuapp.com/ironmq_push_1"}]
response = q.add_subscribers(subscribers)
self.assertEqual(len(subscribers), len(response["push"]["subscribers"]))
result = q.remove_subscribers()
self.assertEqual(len(result["push"]["subscribers"]), 1)
queue_name = "test_queue%s" % time.time()
subscribers = [{'name': 'first',
'url': 'http://first.endpoint.xx/process'},
{'name': 'second',
'url': 'http://second.endpoint.xx/process'}]
self.mq.create_queue(queue_name, {'push': {'subscribers': subscribers}} )
q = self.mq.queue(queue_name)
response = q.remove_subscribers(*['first', 'second'])
self.assertEqual(response["msg"], "Updated")

def test_getMessage(self):
msg = "%s" % time.time()
Expand Down Expand Up @@ -187,5 +193,6 @@ def test_peekMessages(self):
response = q.peek(2)
self.assertEqual(2, len(response["messages"]))


if __name__ == '__main__':
unittest.main()

0 comments on commit ada1391

Please sign in to comment.