Skip to content
This repository has been archived by the owner on Apr 9, 2023. It is now read-only.

Commit

Permalink
Merge pull request #18 from plone/asyncevents
Browse files Browse the repository at this point in the history
Asyncevents
  • Loading branch information
bloodbare committed Nov 21, 2016
2 parents 2a30763 + 2048290 commit 7281729
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 43 deletions.
3 changes: 2 additions & 1 deletion src/plone.server/plone/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# -*- encoding: utf-8 -*-
from zope.i18nmessageid import MessageFactory

import collections


Expand All @@ -20,3 +19,5 @@
SCHEMA_CACHE = {}
PERMISSIONS_CACHE = {}
FACTORY_CACHE = {}


6 changes: 4 additions & 2 deletions src/plone.server/plone/server/api/portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from plone.server.json.interfaces import IResourceSerializeToJson
from zope.component import getMultiAdapter
from zope.lifecycleevent import ObjectAddedEvent
from zope.event import notify
from plone.server.events import notify


class DefaultGET(Service):
Expand Down Expand Up @@ -63,7 +63,9 @@ async def __call__(self):

site.install()

notify(ObjectAddedEvent(site, self.context, data['id']))
self.request._site_id = site.__name__

await notify(ObjectAddedEvent(site, self.context, site.__name__))

resp = {
'@type': 'Site',
Expand Down
5 changes: 3 additions & 2 deletions src/plone.server/plone/server/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@ def __call__(self):
for schema in iterSchemataForType(self.content.portal_type):
# create export of the cataloged fields
catalog = mergedTaggedValueDict(schema, CATALOG_KEY)
behavior = schema(self.content)
for field_name, field in getFields(schema).items():
kind_catalog = catalog.get(field_name, False)
if kind_catalog:
real_field = field.bind(self.content)
value = real_field.get(self.content)
real_field = field.bind(behavior)
value = real_field.get(behavior)
ident = schema.getName() + '-' + real_field.getName()
values[ident] = value

Expand Down
32 changes: 7 additions & 25 deletions src/plone.server/plone/server/catalog/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,13 @@ def __init__(self, site_id, loop):
self.site_id = site_id
self.loop = loop

def __call__(self, trns):
async def __call__(self, trns):
if not trns:
return
# Commits are run in sync thread so there is no asyncloop
search = getUtility(ICatalogUtility)
future = asyncio.run_coroutine_threadsafe(
search.remove(self.remove, self.site_id), self.loop)
future2 = asyncio.run_coroutine_threadsafe(
search.index(self.index, self.site_id), self.loop)

try:
result = future.result(30)
result = future2.result(30)
except asyncio.TimeoutError:
logger.info('The coroutine took too long, cancelling the task...')
future.cancel()
future2.cancel()
except Exception as exc:
logger.info('The coroutine raised an exception: {!r}'.format(exc))
else:
logger.info('The coroutine returned: {!r}'.format(result))
await search.remove(self.remove, self.site_id)
await search.index(self.index, self.site_id)

self.index = {}
self.remove = []
Expand Down Expand Up @@ -95,17 +81,13 @@ def add_object(obj, event):
hook.index[uid] = search.get_data(obj)


def add_index(obj, event):
async def add_index(obj, event):
search = queryUtility(ICatalogUtility)
if search is not None:
loop = asyncio.new_event_loop()
asyncio.run_coroutine_threadsafe(
search.create_index(obj.id), loop)
await search.create_index(obj.id)


def remove_index(obj, event):
async def remove_index(obj, event):
search = queryUtility(ICatalogUtility)
if search is not None:
loop = asyncio.new_event_loop()
asyncio.run_coroutine_threadsafe(
search.remove_index(obj.id), loop)
await search.remove_index(obj.id)
2 changes: 2 additions & 0 deletions src/plone.server/plone/server/configure.zcml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
<adapter factory=".languages.FI" />
<adapter factory=".languages.ENUS" />

<subscriber handler=".events.objectEventNotify" />

<adapter factory=".translation.GenericTranslation" />

<adapter factory=".renderers.RendererFormatJson" />
Expand Down
3 changes: 2 additions & 1 deletion src/plone.server/plone/server/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from plone.server.utils import Lazy
from plone.server.transactions import synccontext
from plone.server.transactions import get_current_request
from plone.server.utils import get_authenticated_user_id
from zope.annotation.interfaces import IAttributeAnnotatable
from zope.component import getUtility
from zope.component.factory import Factory
Expand Down Expand Up @@ -184,11 +185,11 @@ def createContentInContainer(container, type_, id_, request=None, **kw):
obj = factory()
obj.__name__ = id_
obj.__parent__ = container
container[id_] = obj
if 'id' not in kw:
kw['id'] = id_
for key, value in kw.items():
setattr(obj, key, value)
container[id_] = obj
return obj


Expand Down
38 changes: 38 additions & 0 deletions src/plone.server/plone/server/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@
from plone.server.interfaces import IObjectFinallyCreatedEvent
from zope.interface import implementer
from zope.interface.interfaces import ObjectEvent
from zope.event import subscribers as syncsubscribers
from zope.component._api import getSiteManager
from zope.component.interfaces import ComponentLookupError
from zope.component.interfaces import IObjectEvent
from zope.component._declaration import adapter
_zone = tzlocal()

asyncsubscribers = []


@implementer(IObjectFinallyCreatedEvent)
class ObjectFinallyCreatedEvent(ObjectEvent):
Expand All @@ -24,3 +31,34 @@ def modified_object(obj, event):
"""Set the modification date of an object."""
now = datetime.now(tz=_zone)
obj.modification_date = now


async def notify(event):
"""Notify all subscribers of ``event``."""
for subscriber in syncsubscribers:
subscriber(event)
for subscriber in asyncsubscribers:
await subscriber(event)

async def dispatch(*event):
try:
sitemanager = getSiteManager()
except ComponentLookupError:
# Oh blast, no site manager. This should *never* happen!
return []

return await sitemanager.adapters.asubscribers(event, None)


@adapter(IObjectEvent)
async def objectEventNotify(event):
"""Dispatch ObjectEvents to interested adapters."""
try:
sitemanager = getSiteManager()
except ComponentLookupError:
# Oh blast, no site manager. This should *never* happen!
return []

return await sitemanager.adapters.asubscribers((event.object, event), None)

asyncsubscribers.append(dispatch)
8 changes: 8 additions & 0 deletions src/plone.server/plone/server/metaconfigure.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
from zope.security.checker import defineChecker
from zope.security.checker import getCheckerForInstancesOf
from zope.security.checker import undefineChecker
from zope.security.zcml import Permission
from zope.component._declaration import adaptedBy
from zope.component.security import protectedFactory
from zope.component.security import securityAdapterFactory
from zope.component._compat import _BLANK
from zope.component.interface import provideInterface
from zope.component._api import getSiteManager

import json
import logging
Expand Down Expand Up @@ -293,3 +300,4 @@ def addOn(_context, name, title, handler):
'title': title,
'handler': handler
}

144 changes: 144 additions & 0 deletions src/plone.server/plone/server/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from zope.interface.adapter import BaseAdapterRegistry
from zope.interface.adapter import AdapterLookupBase
from zope.interface import providedBy

from transaction._transaction import Status
from transaction._transaction import Transaction
from transaction import interfaces
from transaction._compat import reraise

import asyncio
import sys

BaseAdapterRegistry._delegated = (
'lookup', 'queryMultiAdapter', 'lookup1', 'queryAdapter',
'adapter_hook', 'lookupAll', 'names',
'subscriptions', 'subscribers', 'asubscribers')


async def asubscribers(self, objects, provided):
subscriptions = self.subscriptions(map(providedBy, objects), provided)
if provided is None:
result = ()
for subscription in subscriptions:
if asyncio.iscoroutinefunction(subscription):
await subscription(*objects)
else:
result = []
for subscription in subscriptions:
if asyncio.iscoroutinefunction(subscription):
subscriber = await subscription(*objects)
if subscriber is not None:
result.append(subscriber)
return result


def subscribers(self, objects, provided):
subscriptions = self.subscriptions(map(providedBy, objects), provided)
if provided is None:
result = ()
for subscription in subscriptions:
if not asyncio.iscoroutinefunction(subscription):
subscription(*objects)
else:
result = []
for subscription in subscriptions:
if not asyncio.iscoroutinefunction(subscription):
subscriber = subscription(*objects)
if subscriber is not None:
result.append(subscriber)
return result

AdapterLookupBase.asubscribers = asubscribers
AdapterLookupBase.subscribers = subscribers




async def acommit(self):
""" See ITransaction.
"""
if self.status is Status.DOOMED:
raise interfaces.DoomedTransaction(
'transaction doomed, cannot commit')

if self._savepoint2index:
self._invalidate_all_savepoints()

if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return

await self._acallBeforeCommitHooks()

self._synchronizers.map(lambda s: s.beforeCompletion(self))
self.status = Status.COMMITTING

try:
self._commitResources()
self.status = Status.COMMITTED
except:
t = None
v = None
tb = None
try:
t, v, tb = self._saveAndGetCommitishError()
await self._acallAfterCommitHooks(status=False)
reraise(t, v, tb)
finally:
del t, v, tb
else:
self._free()
self._synchronizers.map(lambda s: s.afterCompletion(self))
await self._acallAfterCommitHooks(status=True)
self.log.debug("commit")


async def _acallBeforeCommitHooks(self):
# Call all hooks registered, allowing further registrations
# during processing. Note that calls to addBeforeCommitHook() may
# add additional hooks while hooks are running, and iterating over a
# growing list is well-defined in Python.
for hook, args, kws in self._before_commit:
if asyncio.iscoroutinefunction(hook):
await hook(*args, **kws)
else:
hook(*args, **kws)
self._before_commit = []


async def _acallAfterCommitHooks(self, status=True):
# Avoid to abort anything at the end if no hooks are registred.
if not self._after_commit:
return
# Call all hooks registered, allowing further registrations
# during processing. Note that calls to addAterCommitHook() may
# add additional hooks while hooks are running, and iterating over a
# growing list is well-defined in Python.
for hook, args, kws in self._after_commit:
# The first argument passed to the hook is a Boolean value,
# true if the commit succeeded, or false if the commit aborted.
try:
if asyncio.iscoroutinefunction(hook):
await hook(status, *args, **kws)
else:
hook(status, *args, **kws)
except:
# We need to catch the exceptions if we want all hooks
# to be called
self.log.error("Error in after commit hook exec in %s ",
hook, exc_info=sys.exc_info())
# The transaction is already committed. It must not have
# further effects after the commit.
for rm in self._resources:
try:
rm.abort(self)
except:
# XXX should we take further actions here ?
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
self._after_commit = []
self._before_commit = []

Transaction._acallBeforeCommitHooks = _acallBeforeCommitHooks
Transaction.acommit = acommit
Transaction._acallAfterCommitHooks = _acallAfterCommitHooks
1 change: 1 addition & 0 deletions src/plone.server/plone/server/server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from aiohttp import web
import plone.server.patch # noqa
from plone.server.factory import make_app

import argparse
Expand Down
1 change: 1 addition & 0 deletions src/plone.server/plone/server/testing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import plone.server.patch # noqa
from aiohttp.test_utils import make_mocked_request
from plone.server.browser import View
from plone.server.factory import IApplication
Expand Down
14 changes: 2 additions & 12 deletions src/plone.server/plone/server/traversal.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,7 @@ async def traverse(request, parent, path):
return parent, path

assert request is not None # could be used for permissions, etc
# dbo = None
# import pdb; pdb.set_trace()
# if IDataBase.providedBy(parent):
# # Look on the PersistentMapping from the DB
# dbo = parent
# parent = request.conn.root()

try:
if path[0].startswith('_'):
raise HTTPUnauthorized()
Expand All @@ -141,11 +136,6 @@ async def traverse(request, parent, path):
except KeyError:
return parent, path

# if dbo is not None:
# context._v_parent = dbo
# else:
# context._v_parent = parent

if IDataBase.providedBy(context):
if SHARED_CONNECTION:
request.conn = context.conn
Expand Down Expand Up @@ -201,7 +191,7 @@ async def handler(self, request):
await sync(request)(txn.abort)
else:
if SHARED_CONNECTION is False:
txn.commit()
await txn.acommit()
else:
await sync(request)(txn.commit)
except Unauthorized:
Expand Down

0 comments on commit 7281729

Please sign in to comment.