Skip to content

Commit

Permalink
Adding a feature for queueing delayed tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
do3cc committed Aug 13, 2012
1 parent f285438 commit 2264b99
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 10 deletions.
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -38,6 +38,7 @@
'collective.testcaselayer',
'simplejson'
],
extras_require={'plone4_test': 'five.intid'},
entry_points={
'z3c.autoinclude.plugin': 'target=plone',
},
Expand Down
4 changes: 4 additions & 0 deletions src/plone/app/async/interfaces.py
Expand Up @@ -21,6 +21,10 @@ def getQueues():
def queueJob(func, context, *args, **kwargs):
"""Queue a job."""

def queueJobWithDelay(begin_by, begin_after, func, context, *args,
**kwargs):
"""Queue a job with a delay. See zc.async for more information"""

def queueSerialJobs(*job_infos):
"""Queue several jobs, to be run serially
Expand Down
33 changes: 23 additions & 10 deletions src/plone/app/async/service.py
Expand Up @@ -30,9 +30,10 @@ def _getAuthenticatedUser():
return acl_users.getPhysicalPath(), user.getId()


def _executeAsUser(context_path, portal_path, uf_path, user_id, func, *args, **kwargs):
def _executeAsUser(context_path, portal_path, uf_path, user_id, func, *args,
**kwargs):
"""Reconstruct environment and execute func."""
transaction = Zope2.zpublisher_transactions_manager # Supports isDoomed
transaction = Zope2.zpublisher_transactions_manager # Supports isDoomed
transaction.begin()
app = Zope2.app()
result = None
Expand Down Expand Up @@ -99,7 +100,8 @@ def getQueues(self):
self._conn.onCloseCallback(self.__init__)
return self._conn.root()[KEY]

def queueJobInQueue(self, queue, quota_names, func, context, *args, **kwargs):
def queueJobInQueue(self, queue, quota_names, func, context, *args,
**kwargs):
"""Queue a job in the specified queue."""
portal = getUtility(ISiteRoot)
portal_path = portal.getPhysicalPath()
Expand All @@ -117,37 +119,48 @@ def queueJobInQueue(self, queue, quota_names, func, context, *args, **kwargs):
def queueJob(self, func, context, *args, **kwargs):
"""Queue a job in the default queue."""
queue = self.getQueues()['']
return self.queueJobInQueue(queue, ('default',), func, context, *args, **kwargs)
return self.queueJobInQueue(queue, ('default',), func, context, *args,
**kwargs)

def _queueJobsInQueue(self, queue, quota_names, job_infos, serialize=True):
def queueJobWithDelay(self, begin_by, begin_after, func, context, *args,
**kwargs):
queue = self.getQueues()['']
jobs = [(func, context, args, kwargs)]
return self._queueJobsInQueue(queue, ('default',), jobs, True,
begin_by, begin_after)

def _queueJobsInQueue(self, queue, quota_names, job_infos, serialize=True,
begin_by=None, begin_after=None):
"""Queue multiple jobs in the specified queue."""
portal = getUtility(ISiteRoot)
portal_path = portal.getPhysicalPath()
uf_path, user_id = _getAuthenticatedUser()
scheduled = []
for (func, context, args, kwargs) in job_infos:
context_path = context.getPhysicalPath()
job = Job(_executeAsUser, context_path, portal_path, uf_path, user_id,
func, *args, **kwargs)
job = Job(_executeAsUser, context_path, portal_path, uf_path,
user_id, func, *args, **kwargs)
scheduled.append(job)
if serialize:
job = serial(*scheduled)
else:
job = parallel(*scheduled)
if quota_names:
job.quota_names = quota_names
job = queue.put(job)
job = queue.put(job, begin_by=begin_by, begin_after=begin_after)
job.addCallbacks(success=job_success_callback,
failure=job_failure_callback)
return job

def queueSerialJobsInQueue(self, queue, quota_names, *job_infos):
"""Queue serial jobs in the specified queue."""
return self._queueJobsInQueue(queue, quota_names, job_infos, serialize=True)
return self._queueJobsInQueue(queue, quota_names, job_infos,
serialize=True)

def queueParallelJobsInQueue(self, queue, quota_names, *job_infos):
"""Queue parallel jobs in the specified queue."""
return self._queueJobsInQueue(queue, quota_names, job_infos, serialize=False)
return self._queueJobsInQueue(queue, quota_names, job_infos,
serialize=False)

def queueSerialJobs(self, *job_infos):
"""Queue serial jobs in the default queue."""
Expand Down
13 changes: 13 additions & 0 deletions src/plone/app/async/tests/test_simplejob.py
@@ -1,3 +1,5 @@
import datetime
import pytz
import transaction
from zope.component import getUtility
from zc.async.testing import wait_for_result
Expand Down Expand Up @@ -183,6 +185,17 @@ def test_job_as_anonymous(self):
# not accessible by anon
self.assertEqual(wait_for_result(job), 0)

def test_job_with_delay(self):
before = datetime.datetime.now(pytz.UTC)
job = self.async.queueJobWithDelay(
None, before + datetime.timedelta(seconds=1),
searchForDocument, self.folder, self.folder.getId())
transaction.commit()
wait_for_result(job)
after = datetime.datetime.now(pytz.UTC)
self.assertTrue((after - before).seconds >= 1)



def test_suite():
from unittest import TestSuite, makeSuite
Expand Down

0 comments on commit 2264b99

Please sign in to comment.