Skip to content

Commit

Permalink
implement job setup/teardown in a Job subclass
Browse files Browse the repository at this point in the history
  • Loading branch information
davisagli committed Jan 19, 2012
1 parent 6d3d7cf commit 3d9b2c9
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 1 deletion.
4 changes: 3 additions & 1 deletion setup.py
Expand Up @@ -35,7 +35,9 @@
'zc.async',
'zc.monitor',
'zc.z3monitor',
'collective.testcaselayer'
'collective.testcaselayer',
'zope.keyreference',
'simplejson',
],
entry_points={
'z3c.autoinclude.plugin': 'target=plone',
Expand Down
3 changes: 3 additions & 0 deletions src/plone/app/async/__init__.py
@@ -1 +1,4 @@
# plone.app.async

from .job import Job
from .utils import queue
71 changes: 71 additions & 0 deletions src/plone/app/async/job.py
@@ -0,0 +1,71 @@
import threading
import zc.async.job
import Zope2
from AccessControl.SecurityManagement import noSecurityManager,\
newSecurityManager, getSecurityManager
from AccessControl.User import SpecialUser
from zope.site.hooks import getSite, setSite
from Products.CMFCore.utils import getToolByName


tldata = threading.local()


class Job(zc.async.job.Job):
# A job to be run in a Zope 2 context.
# Stores the current site and user when the job is created,
# and sets them back up while the job is run.

portal_path = None
uf_path = None
user_id = None

context_path = None
func_name = None

def _bind_and_call(self, *args, **kw):
im_self = tldata.app.unrestrictedTraverse(self.context_path)
func = getattr(im_self, self.func_name)
return func(*args, **kw)

def __init__(self, *args, **kwargs):
super(Job, self).__init__(*args, **kwargs)

# Instance methods cannot be directly pickled, so instead
# we store the context path and method name on the job
# and reconstitute the method in _bind_and_call
if hasattr(self.callable, 'im_self'):
im_self = self.callable.im_self
if hasattr(im_self, 'getPhysicalPath'):
self.context_path = im_self.getPhysicalPath()
self.func_name = self.callable.func_name
self.callable = self._bind_and_call

portal = getToolByName(getSite(), 'portal_url').getPortalObject()
self.portal_path = portal.getPhysicalPath()

user = getSecurityManager().getUser()
if isinstance(user, SpecialUser):
self.uf_path, user_id = (), None
else:
self.uf_path = user.aq_parent.getPhysicalPath()
self.user_id = user.getId()

def setUp(self):
tldata.app = app = Zope2.app()

portal = app.unrestrictedTraverse(self.portal_path, None)
old_site = getSite()
setSite(portal)

if self.uf_path:
acl_users = app.unrestrictedTraverse(self.uf_path, None)
user = acl_users.getUserById(self.user_id)
newSecurityManager(None, user)

return old_site

def tearDown(self, setup_info):
del tldata.app
noSecurityManager()
setSite(setup_info)
53 changes: 53 additions & 0 deletions src/plone/app/async/tests/test_job.py
@@ -0,0 +1,53 @@
import transaction
from zc.async.testing import wait_for_result
from Products.PloneTestCase.PloneTestCase import default_user
from plone.app.async.tests.base import AsyncTestCase


def addNumbers(x1, x2):
return x1 + x2


def doom():
doom.retries += 1
transaction.doom()


class TestJob(AsyncTestCase):

def test_add_job(self):
"""Tests adding a computational job and getting the result.
"""
from plone.app.async import Job, queue
job = queue(Job(addNumbers, 40, 2))
transaction.commit()
self.assertEqual(job.status, u'pending-status')
wait_for_result(job)
self.assertEqual(job.status, u'completed-status')
self.assertEqual(job.result, 42)

def test_add_persistent(self):
"""Adding a job that creates persistent objects.
"""
from plone.app.async import Job, queue
self.setRoles(['Manager'])
job = queue(Job(self.folder.invokeFactory, 'Document', 'anid',
title='atitle', description='adescr', text='abody'))
transaction.commit()
self.assertEqual(job.status, u'pending-status')
wait_for_result(job)
self.assertEqual(job.status, u'completed-status')
self.assertEqual(job.result, 'anid')
self.failUnless('anid' in self.folder.objectIds())
document = self.folder['anid']
self.assertEqual(document.Creator(), default_user)

def test_retry(self):
"""A job that causes a conflict while committing should be retried."""
doom.retries = 0
from plone.app.async import Job, queue
job = queue(Job(doom))
transaction.commit()
wait_for_result(job)
self.assertTrue(job.result.type is transaction.interfaces.DoomedTransaction)
self.assertEqual(5, doom.retries)
6 changes: 6 additions & 0 deletions src/plone/app/async/utils.py
Expand Up @@ -15,3 +15,9 @@ def wait_for_all_jobs(seconds=6, assert_successful=True):
if assert_successful:
assert not isinstance(job.result, Failure), str(job.result)
commit()


def queue(job):
service = component.getUtility(IAsyncService)
queue = service.getQueues()['']
return queue.put(job)

0 comments on commit 3d9b2c9

Please sign in to comment.