Skip to content

Commit

Permalink
feat: add tq.tasks() method
Browse files Browse the repository at this point in the history
  • Loading branch information
william-silversmith committed Oct 29, 2020
1 parent 9ac4bda commit 3eeedc2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
3 changes: 0 additions & 3 deletions taskqueue/aws_queue_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ def purge(self):
for task in tasks:
self.delete(task)
return self

def list(self):
return list(self)

def __iter__(self):
return iter(self._request(num_tasks=10, visibility_timeout=0))
Expand Down
20 changes: 19 additions & 1 deletion taskqueue/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def totalfn(iterator, total):
except TypeError:
return None

class UnsupportedProtocolError(BaseException):
pass

class QueueEmptyError(LookupError):
pass

Expand Down Expand Up @@ -88,7 +91,7 @@ def initialize_api(self, path, **kwargs):
elif path.protocol == 'fq':
return FileQueueAPI(path.path)
else:
raise ValueError('Unsupported protocol ' + str(self.path.protocol))
raise UnsupportedProtocolError('Unsupported protocol ' + str(self.path.protocol))

def check_monkey_patch_status(self):
import gevent.monkey
Expand Down Expand Up @@ -260,6 +263,21 @@ def purge(self):
self.wait()
return self

def tasks(self):
"""
Iterate over all tasks.
Can cause infinite loops on SQS and so is not
supported. You can use the api method directly
if you know what you're doing.
"""
if self.path.protocol == "sqs":
raise UnsupportedProtocolError("SQS could enter an infinite loop from this method.")

tsks = ( totask(task) for task in iter(self.api) )
if N >= 0:
return itertools.islice(tsks, 0, N)
return tsks

def poll(
self, lease_seconds=LEASE_SECONDS,
verbose=False, execute_args=[], execute_kwargs={},
Expand Down
23 changes: 23 additions & 0 deletions test/test_filequeue.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,26 @@ def test_renew():

assert ts(filename) >= int(time.time()) + 1
assert ident(filename) == identity

def test_enumerating_tasks():
tq = TaskQueue(FILE_QURL)
tq.purge()

for _ in range(10):
tq.insert(PrintTask('hello'))
tq.insert(PrintTask('world'))

lst = list(tq.tasks())

assert len(lst) == 20
hello = 0
world = 0
for task in lst:
hello += int(task.txt == "hello")
world += int(task.txt == "world")

assert hello == 10
assert world == 10



0 comments on commit 3eeedc2

Please sign in to comment.