Skip to content

Commit

Permalink
Merge branch 'master' into omit-include-fields
Browse files Browse the repository at this point in the history
  • Loading branch information
vangheem committed Sep 25, 2017
2 parents 2425f8d + 042ff44 commit 314d0e0
Show file tree
Hide file tree
Showing 14 changed files with 342 additions and 49 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -5,6 +5,13 @@
that are returned in the payload
[vangheem]

- Limit the maximum size of a single cached object record to 5 MB
[vangheem]

- Optimize indexing for patch operations to only index changed data instead
of the full object
[vangheem]


1.3.16 (2017-09-21)
-------------------
Expand Down
2 changes: 1 addition & 1 deletion guillotina/api/content.py
Expand Up @@ -258,7 +258,7 @@ async def __call__(self):
str(e),
status=400)

await notify(ObjectModifiedEvent(self.context, data))
await notify(ObjectModifiedEvent(self.context, payload=data))

return Response(response={}, status=204)

Expand Down
63 changes: 46 additions & 17 deletions guillotina/catalog/catalog.py
Expand Up @@ -27,38 +27,44 @@
@implementer(ICatalogUtility)
class DefaultSearchUtility(object):

async def search(self, query):
async def search(self, container, query):
pass

async def get_by_uuid(self, uuid):
async def get_by_uuid(self, container, uuid):
pass

async def get_object_by_uuid(self, uuid):
async def get_object_by_uuid(self, container, uuid):
pass

async def get_by_type(self, doc_type, query={}):
async def get_by_type(self, container, doc_type, query={}):
pass

async def get_by_path(self, container, path, depth=-1, query={}, doc_type=None):
pass

async def get_folder_contents(self, obj):
async def get_folder_contents(self, container, parent_uid):
pass

async def index(self, datas):
async def index(self, container, datas):
"""
{uid: <dict>}
"""
pass

async def remove(self, uids):
async def update(self, container, datas):
    """Placeholder for updating already-indexed entries; does nothing here.

    ``datas`` is a mapping of the form ``{uid: <dict>}``.
    """
    return None

async def remove(self, container, uids):
"""
list of UIDs to remove from index
"""
pass

async def reindex_all_content(self, container):
""" For all Dexterity Content add a queue task that reindex the object
async def reindex_all_content(self, container, security=False):
""" For all content add a queue task that reindex the object
"""
pass

Expand All @@ -72,11 +78,11 @@ async def remove_catalog(self, container):
"""
pass

async def get_data(self, content):
async def get_data(self, content, indexes=None):
    """Collect the catalog data for ``content``.

    :param content: object for which an ``ICatalogDataAdapter`` is looked up.
    :param indexes: optional collection of index names, forwarded unchanged
        to the adapter (``None`` is forwarded as-is).
    :returns: dict with the values produced by the adapter, or an empty
        dict when no adapter is found for ``content``.
    """
    data = {}
    adapter = queryAdapter(content, ICatalogDataAdapter)
    if adapter:
        # Await the adapter exactly once, with ``indexes``.  An additional
        # argument-less call would compute the full data set first and make
        # the ``indexes`` filter pointless.
        data.update(await adapter(indexes))
    return data


Expand Down Expand Up @@ -119,24 +125,47 @@ def get_data(self, ob, iface, field_name):
value = getattr(ob, field_name, None)
return json_compatible(value)

async def __call__(self):
async def load_behavior(self, behavior):
    """Await ``behavior.load(create=False)`` for async behaviors only.

    Behaviors whose class does not implement ``IAsyncBehavior`` are left
    untouched.
    """
    # The check is made against the class; the original author noted that
    # ``providedBy`` did not seem to work here.
    is_async = IAsyncBehavior.implementedBy(behavior.__class__)
    if not is_async:
        return
    await behavior.load(create=False)

async def __call__(self, indexes=None):
    """Compute the index and metadata values for ``self.content``.

    :param indexes: optional collection of names that should be recomputed.
        ``None`` computes every index and metadata value.  Otherwise a value
        is computed only when its name (or, for accessor indexes, the
        ``field`` entry of its index data) is in ``indexes``, or when the
        name is a ``property`` on the content class.
    :returns: dict mapping index/metadata names to their values.
    """
    values = {}
    content_class = type(self.content)

    # For each schema of the content
    for schema in iter_schemata(self.content):
        behavior = schema(self.content)
        # Load the behavior lazily: at most once, and only when one of its
        # values is actually going to be read.
        loaded = False
        for index_name, index_data in merged_tagged_value_dict(schema, index.key).items():
            try:
                # Accessor indexes are "fake" indexes, not fields: they are
                # matched against ``indexes`` through their 'field' entry.
                if ('accessor' in index_data and (
                        indexes is None or index_data.get('field') in indexes)):
                    if not loaded:
                        await self.load_behavior(behavior)
                        loaded = True
                    values[index_name] = index_data['accessor'](behavior)
                elif (indexes is None or index_name in indexes or
                        isinstance(getattr(content_class, index_name, None), property)):
                    if not loaded:
                        await self.load_behavior(behavior)
                        loaded = True
                    # Properties are dynamic, so they are allowed to be
                    # reindexed every time regardless of ``indexes``.
                    values[index_name] = self.get_data(behavior, schema, index_name)
            except NoIndexField:
                # Deliberately skipped: nothing is stored for this index.
                pass
        for metadata_name in merged_tagged_value_list(schema, metadata.key):
            # Filter on the metadata name itself.  Checking the leftover
            # ``index_name`` loop variable here would apply the wrong
            # filter, or fail when no index had been iterated yet.
            if (indexes is not None and metadata_name not in indexes and
                    not isinstance(getattr(content_class, metadata_name, None), property)):
                # Properties are dynamic and are always recomputed; every
                # other metadata value not listed in ``indexes`` is skipped.
                continue
            if not loaded:
                await self.load_behavior(behavior)
                loaded = True
            values[metadata_name] = self.get_data(behavior, schema, metadata_name)

    return values
21 changes: 20 additions & 1 deletion guillotina/catalog/index.py
Expand Up @@ -19,6 +19,7 @@ class IndexFuture(object):
def __init__(self, container, request):
self.remove = []
self.index = {}
self.update = {}
self.container = container
self.request = request

Expand All @@ -30,8 +31,11 @@ async def __call__(self):
await search.remove(self.container, self.remove)
if len(self.index) > 0:
await search.index(self.container, self.index)
if len(self.update) > 0:
await search.update(self.container, self.update)

self.index = {}
self.update = {}
self.remove = []


Expand Down Expand Up @@ -80,9 +84,12 @@ def remove_object(obj, event):
fut = get_future()
if fut is None:
return

fut.remove.append((uid, type_name, content_path))
if uid in fut.index:
del fut.index[uid]
if uid in fut.update:
del fut.update[uid]


@configure.subscriber(for_=(IResource, IObjectAddedEvent))
Expand All @@ -100,7 +107,19 @@ async def add_object(obj, event):
return
search = queryUtility(ICatalogUtility)
if search:
fut.index[uid] = await search.get_data(obj)
if IObjectModifiedEvent.providedBy(event):
indexes = []
if event.payload and len(event.payload) > 0:
# get a list of potential indexes
for field_name in event.payload.keys():
if '.' in field_name:
for behavior_field_name in event.payload[field_name].keys():
indexes.append(behavior_field_name)
else:
indexes.append(field_name)
fut.update[uid] = await search.get_data(obj, indexes)
else:
fut.index[uid] = await search.get_data(obj)


@configure.subscriber(for_=(IContainer, IObjectAddedEvent))
Expand Down
2 changes: 2 additions & 0 deletions guillotina/db/cache/base.py
Expand Up @@ -6,6 +6,8 @@

class BaseCache:

max_cache_record_size = 1024 * 1024 * 5 # even 5mb is quite large...

def __init__(self, storage, transaction):
self._storage = storage
self._transaction = transaction
Expand Down
9 changes: 6 additions & 3 deletions guillotina/db/transaction.py
Expand Up @@ -232,7 +232,8 @@ async def get(self, oid, ignore_registered=False):
# ttl of zero means we want to provide a hard cache here
HARD_CACHE[oid] = result
else:
await self._cache.set(result, oid=oid)
if self._cache.max_cache_record_size > len(result['state']):
await self._cache.set(result, oid=oid)

return obj

Expand Down Expand Up @@ -337,7 +338,8 @@ async def get_child(self, container, key):
result = await self._manager._storage.get_child(self, container._p_oid, key)
if result is None:
return None
await self._cache.set(result, container=container, id=key)
if self._cache.max_cache_record_size > len(result['state']):
await self._cache.set(result, container=container, id=key)

obj = reader(result)
obj.__parent__ = container
Expand Down Expand Up @@ -366,7 +368,8 @@ async def get_annotation(self, base_obj, id):
result = await self._manager._storage.get_annotation(self, base_obj._p_oid, id)
if result is None:
raise KeyError(id)
await self._cache.set(result, container=base_obj, id=id, variant='annotation')
if self._cache.max_cache_record_size > len(result['state']):
await self._cache.set(result, container=base_obj, id=id, variant='annotation')
obj = reader(result)
obj.__of__ = base_obj._p_oid
obj._p_jar = self
Expand Down
16 changes: 9 additions & 7 deletions guillotina/factory/app.py
Expand Up @@ -23,14 +23,14 @@
from guillotina.interfaces.content import IContentNegotiation
from guillotina.request import Request
from guillotina.traversal import TraversalRouter
from guillotina.utils import lazy_apply
from guillotina.utils import resolve_dotted_name
from guillotina.utils import resolve_path
from guillotina.writable import check_writable_request

import aiohttp
import asyncio
import collections
import inspect
import json
import logging
import logging.config
Expand All @@ -57,10 +57,7 @@ def update_app_settings(settings):
def load_application(module, root, settings):
# includeme function
if hasattr(module, 'includeme'):
args = [root]
if len(inspect.signature(module.includeme).parameters) == 2:
args.append(settings)
module.includeme(*args)
lazy_apply(module.includeme, root, settings)
# app_settings
if hasattr(module, 'app_settings') and app_settings != module.app_settings:
update_app_settings(module.app_settings)
Expand Down Expand Up @@ -266,7 +263,11 @@ def make_app(config_file=None, settings=None, loop=None, server_app=None):

for utility in getAllUtilitiesRegisteredFor(IAsyncUtility):
# In case there are utilities that are already registered
ident = asyncio.ensure_future(utility.initialize(app=server_app), loop=loop)
if hasattr(utility, 'initialize'):
ident = asyncio.ensure_future(
lazy_apply(utility.initialize, app=server_app), loop=loop)
else:
logger.warn(f'No initialize method found on {utility} object')
root.add_async_task(utility, ident, {})

server_app.on_cleanup.append(close_utilities)
Expand All @@ -282,7 +283,8 @@ def make_app(config_file=None, settings=None, loop=None, server_app=None):

async def close_utilities(app):
    """Schedule ``finalize`` on the async utilities and finalize the databases.

    :param app: application object; ``app.loop`` is used to schedule the
        utility finalizers and ``app.router._root`` is iterated to find the
        databases.
    """
    for utility in getAllUtilitiesRegisteredFor(IAsyncUtility):
        # ``finalize`` is optional: only schedule it when the utility has
        # one, and only once.  An unguarded call would raise AttributeError
        # for utilities without the method.
        if hasattr(utility, 'finalize'):
            asyncio.ensure_future(lazy_apply(utility.finalize, app=app), loop=app.loop)
    for db in app.router._root:
        # Each item is indexed as a pair; the object sits at position 1.
        if IDatabase.providedBy(db[1]):
            await db[1]._db.finalize()
13 changes: 10 additions & 3 deletions guillotina/factory/content.py
Expand Up @@ -9,6 +9,7 @@
from guillotina.interfaces import IDatabase
from guillotina.utils import apply_coroutine
from guillotina.utils import import_class
from guillotina.utils import lazy_apply
from zope.interface import implementer

import asyncio
Expand All @@ -32,13 +33,18 @@ def add_async_utility(self, config, loop=None):
interface = import_class(config['provides'])
factory = import_class(config['factory'])
try:
utility_object = factory(config['settings'], loop=loop)
utility_object = lazy_apply(factory, config.get('settings', {}), loop=loop)
except Exception:
logger.error('Error initializing utility {}'.format(repr(factory)),
exc_info=True)
raise
provideUtility(utility_object, interface)
task = asyncio.ensure_future(utility_object.initialize(app=self.app), loop=loop)
if hasattr(utility_object, 'initialize'):
task = asyncio.ensure_future(
lazy_apply(utility_object.initialize, app=self.app), loop=loop)
else:
task = None
logger.warn(f'No initialize method found on {utility_object} object')
self.add_async_task(config['provides'], task, config)

def add_async_task(self, ident, task, config):
Expand All @@ -51,7 +57,8 @@ def add_async_task(self, ident, task, config):

def cancel_async_utility(self, ident):
    """Cancel the task registered under ``ident``, if it has one.

    :param ident: key of the utility in ``self._async_utilities``.
    :raises KeyError: when ``ident`` is not a registered utility.
    """
    if ident not in self._async_utilities:
        raise KeyError("Ident does not exist as utility")
    task = self._async_utilities[ident]['task']
    # The stored task may be None (a utility registered without an
    # ``initialize`` coroutine); there is nothing to cancel in that case.
    if task is not None:
        task.cancel()

Expand Down
5 changes: 5 additions & 0 deletions guillotina/tests/fixtures.py
Expand Up @@ -19,6 +19,11 @@
import sys


# Add a utility entry to the test settings that names
# guillotina.catalog.catalog.DefaultSearchUtility as the factory providing
# ICatalogUtility.
TESTING_SETTINGS["utilities"].append({
"provides": "guillotina.interfaces.ICatalogUtility",
"factory": "guillotina.catalog.catalog.DefaultSearchUtility"
})

IS_TRAVIS = 'TRAVIS' in os.environ
USE_COCKROACH = 'USE_COCKROACH' in os.environ

Expand Down
27 changes: 27 additions & 0 deletions guillotina/tests/mocks.py
Expand Up @@ -4,6 +4,7 @@
from guillotina.db.interfaces import ITransaction
from guillotina.db.interfaces import ITransactionStrategy
from zope.interface import implementer
from guillotina.db.interfaces import IWriter


class MockDBTransaction:
Expand Down Expand Up @@ -53,6 +54,8 @@ def __init__(self, transaction_strategy='resolve', cache_strategy='dummy'):
self._transaction_strategy = transaction_strategy
self._cache_strategy = cache_strategy
self._transaction = None
self._objects = {}
self._parent_objs = {}

async def get_annotation(self, trns, oid, id):
return None
Expand All @@ -70,6 +73,30 @@ async def abort(self, txn):
async def commit(self, txn):
pass

async def load(self, txn, oid):
    """Return the record kept in ``self._objects`` under ``oid``.

    ``txn`` is accepted but not used.  An unknown ``oid`` raises ``KeyError``.
    """
    record = self._objects[oid]
    return record

async def get_child(self, txn, container_p_oid, key):
    """Return the record of the child named ``key`` of a container.

    ``None`` is returned when the container oid is unknown, when it has no
    child under ``key``, or when the child's oid has no record in
    ``self._objects``.  ``txn`` is accepted but not used.
    """
    if container_p_oid not in self._objects:
        return None
    child_oids = self._objects[container_p_oid]['children']
    if key not in child_oids:
        return None
    # A dangling child oid yields None rather than an error.
    return self._objects.get(child_oids[key])

def store(self, ob):
    """Record ``ob`` in ``self._objects`` and link it to its parent.

    The record is built from the object's ``IWriter`` adapter.  Children
    already recorded under the same oid are carried over.  When the parent
    has a record, the object is registered there under ``ob.id``.
    """
    writer = IWriter(ob)
    record = {
        'state': writer.serialize(),
        'zoid': ob._p_oid,
        'tid': 1,
        'id': writer.id,
        # keep the children of a previously stored version of this object
        'children': self._objects.get(ob._p_oid, {}).get('children', {}),
    }
    self._objects[ob._p_oid] = record

    parent = ob.__parent__
    if parent and parent._p_oid in self._objects:
        self._objects[parent._p_oid]['children'][ob.id] = ob._p_oid


class MockTransactionManager:
_storage = None
Expand Down

0 comments on commit 314d0e0

Please sign in to comment.