Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add timestamp to message store API. Add TTL tests for memory store.

  • Loading branch information...
commit fe045201774d01a89475f7ffff3a109f22bc3293 1 parent 9fe55b3
@bbangert bbangert authored
View
1  prod-reqs.txt
@@ -25,4 +25,3 @@ zope.component==3.12
zope.deprecation==3.5
zope.event==3.5.1
zope.interface==3.8
-git+git://github.com/pycassa/pycassa.git
View
5 queuey/storage/__init__.py
@@ -114,7 +114,7 @@ def retrieve(consistency, application_name, queue_name, message_id,
"""
def push(consistency, application_name, queue_name, message,
- metadata=None, ttl=3600 * 24 * 3):
+ metadata=None, ttl=3600 * 24 * 3, timestamp=None):
"""Push a message onto the given queue
The queue is assumed to exist, and will be created if it does not
@@ -128,6 +128,9 @@ def push(consistency, application_name, queue_name, message,
:type metadata: dict
:param ttl: Time to Live in seconds for the message, after this
period the message should be unavilable
+ :param timestamp: The timestamp to use for the message, should be
+ a float of seconds since the epoch as time.time()
+ would return. Defaults to the current time.
:returns: The message id and timestamp as a tuple
:rtype: tuple
View
8 queuey/storage/cassandra.py
@@ -13,6 +13,7 @@
from queuey.storage import MessageQueueBackend
from queuey.storage import MetadataBackend
from queuey.storage import StorageUnavailable
+from queuey.storage.util import convert_time_to_uuid
ONE = pycassa.ConsistencyLevel.ONE
QUORUM = pycassa.ConsistencyLevel.QUORUM
@@ -193,10 +194,13 @@ def retrieve(self, consistency, application_name, queue_name, message_id,
return obj
def push(self, consistency, application_name, queue_name, message,
- metadata=None, ttl=60 * 60 * 24 * 3):
+ metadata=None, ttl=60 * 60 * 24 * 3, timestamp=None):
"""Push a message onto the queue"""
cl = self.cl or self._get_cl(consistency)
- now = uuid.uuid1()
+ if timestamp:
+ now = convert_time_to_uuid(timestamp, randomize=True)
+ else:
+ now = uuid.uuid1()
queue_name = '%s:%s' % (application_name, queue_name)
if metadata:
batch = pycassa.batch.Mutator(self.pool,
View
25 queuey/storage/memory.py
@@ -5,11 +5,11 @@
import uuid
import time
-from pycassa.util import convert_time_to_uuid
from zope.interface import implements
from queuey.storage import MessageQueueBackend
from queuey.storage import MetadataBackend
+from queuey.storage.util import convert_time_to_uuid
# Queue's keyed by applciation_name + queue_name
# Queues are just a list of Message objects
@@ -25,6 +25,10 @@ def __init__(self, id, body, ttl, **metadata):
self.body = body
self.metadata = metadata
self.ttl = None
+ self.expiration = None
+ if ttl:
+ now = (self.id.time - 0x01b21dd213814000L) / 1e7
+ self.expiration = now + ttl
class Application(object):
@@ -64,6 +68,7 @@ def retrieve_batch(self, consistency, application_name, queue_names,
queue_names = ['%s:%s' % (application_name, x) for x in queue_names]
results = []
+ now = time.time()
for queue_name in queue_names:
msgs = message_store[queue_name]
if not msgs:
@@ -94,7 +99,11 @@ def retrieve_batch(self, consistency, application_name, queue_names,
else:
start = 0
count = 0
+
for msg in msgs[start::order]:
+ if msg.expiration and now > msg.expiration:
+ msgs.remove(msg)
+ continue
count += 1
if limit and count > limit:
break
@@ -127,8 +136,14 @@ def retrieve(self, consistency, application_name, queue_name, message_id,
if msg.id == message_id:
found = msg
break
+
if not found:
return {}
+
+ if found.expiration and time.time() > found.expiration:
+ queue.remove(found)
+ return {}
+
obj = {
'message_id': found.id.hex,
'timestamp': (found.id.time - 0x01b21dd213814000L) / 1e7,
@@ -141,9 +156,13 @@ def retrieve(self, consistency, application_name, queue_name, message_id,
return obj
def push(self, consistency, application_name, queue_name, message,
- metadata=None, ttl=60 * 60 * 24 * 3):
+ metadata=None, ttl=60 * 60 * 24 * 3, timestamp=None):
"""Push a message onto the queue"""
- msg = Message(id=uuid.uuid1(), body=message, ttl=ttl)
+ if timestamp:
+ now = convert_time_to_uuid(timestamp, randomize=True)
+ else:
+ now = uuid.uuid1()
+ msg = Message(id=now, body=message, ttl=ttl)
if metadata:
msg.metadata = metadata
timestamp = (msg.id.time - 0x01b21dd213814000L) / 1e7
View
3  queuey/tests/storage.py
@@ -3,13 +3,10 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
import unittest
import uuid
-import os
from nose.tools import eq_
from nose.tools import raises
-import mock
-
class StorageTestMessageBase(unittest.TestCase):
def _makeOne(self, **kwargs):
View
25 queuey/tests/test_memory.py
@@ -1,6 +1,11 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
+import uuid
+import time
+
+from nose.tools import eq_
+
from queuey.tests.storage import StorageTestMessageBase
from queuey.tests.storage import StorageTestMetadataBase
@@ -10,6 +15,26 @@ def _makeOne(self, **kwargs):
from queuey.storage.memory import MemoryQueueBackend
return MemoryQueueBackend()
+ def test_ttl_in_batch(self):
+ backend = self._makeOne()
+ payload = 'a rather boring payload'
+ queue_name = uuid.uuid4().hex
+ past = time.time() - 10
+ backend.push('weak', 'myapp', queue_name, payload, ttl=5,
+ timestamp=past)[0]
+ existing = backend.retrieve_batch('weak', 'myapp', [queue_name])
+ eq_([], existing)
+
+ def test_ttl_in_retrieve(self):
+ backend = self._makeOne()
+ payload = 'a rather boring payload'
+ queue_name = uuid.uuid4().hex
+ past = time.time() - 10
+ msg = backend.push('weak', 'myapp', queue_name, payload, ttl=5,
+ timestamp=past)[0]
+ existing = backend.retrieve('weak', 'myapp', queue_name, msg)
+ eq_({}, existing)
+
class TestMemoryMetadata(StorageTestMetadataBase):
def _makeOne(self):
Please sign in to comment.
Something went wrong with that request. Please try again.