
Merge pull request #42 from brandicted/develop
release 0.2.3
chartpath committed Jun 4, 2015
2 parents fee403a + 55e7bb9 commit 86706cd
Showing 8 changed files with 389 additions and 148 deletions.
docs/source/changelog.rst (13 changes: 10 additions & 3 deletions)
@@ -1,10 +1,17 @@
 Changelog
 =========
 
+* :release:`0.2.3 <2015-06-03>`
+* :bug:`-` Fixed password minimum length support by adding before and after validation processors
+* :bug:`-` Fixed bug with Elasticsearch indexing of nested relationships
+* :bug:`-` Fixed race condition in Elasticsearch indexing
+
 * :release:`0.2.2 <2015-05-27>`
-* :bug:`-` fixes login issue
-* :bug:`-` fixes posting to singular resources e.g. /api/users/<username>/profile
-* :bug:`-` fixes multiple foreign keys to same model
+* :bug:`-` Fixed login issue
+* :bug:`-` Fixed posting to singular resources e.g. /api/users/<username>/profile
+* :bug:`-` Fixed multiple foreign keys to same model
+* :bug:`-` Fixed ES mapping error when values of a field were all null
+* :bug:`-` Fixed a bug whereby Relationship could not be created without a backref
 
 * :release:`0.2.1 <2015-05-20>`
 * :bug:`-` Fixed slow queries to backrefs
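
The 0.2.3 password fix above relies on the before/after validation processors introduced in documents.py below. A minimal sketch of how a model might wire them up, assuming fields accept before_validation/after_validation processor lists (the new apply_processors(before=..., after=...) calls below suggest they do); field and processor names are illustrative, not from this commit:

    import hashlib

    from nefertari_sqla import fields
    from nefertari_sqla.documents import BaseDocument

    def validate_password(**kwargs):
        # 'before' processor: sees the raw posted value, so minimum
        # length can be checked before encryption mangles it.
        value = kwargs['new_value']
        if value and len(value) < 8:
            raise ValueError('Password must be at least 8 characters')
        return value

    def encrypt_password(**kwargs):
        # 'after' processor: runs once DB validation has passed.
        # SHA-256 keeps the sketch self-contained; use a real password
        # hasher in practice.
        return hashlib.sha256(kwargs['new_value'].encode('utf-8')).hexdigest()

    class User(BaseDocument):
        __tablename__ = 'users'
        username = fields.StringField(primary_key=True)
        password = fields.StringField(
            before_validation=[validate_password],
            after_validation=[encrypt_password])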
174 changes: 128 additions & 46 deletions nefertari_sqla/documents.py
@@ -7,6 +7,7 @@
 from sqlalchemy.orm.collections import InstrumentedList
 from sqlalchemy.exc import InvalidRequestError, IntegrityError
 from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
+from sqlalchemy.orm.query import Query
 from sqlalchemy.orm.properties import RelationshipProperty
 from pyramid_sqlalchemy import Session, BaseObject
 
@@ -15,7 +16,7 @@
 from nefertari.utils import (
     process_fields, process_limit, _split, dictset,
     DataProxy)
-from .signals import ESMetaclass
+from .signals import ESMetaclass, on_bulk_delete
 from .fields import ListField, DictField, DateTimeField, IntegerField
 from . import types
 
@@ -67,23 +68,23 @@ def process_bools(_dict):
     types.LimitedText: {'type': 'string'},
     types.LimitedUnicode: {'type': 'string'},
     types.LimitedUnicodeText: {'type': 'string'},
-    types.ProcessableChoice: {'type': 'string'},
+    types.Choice: {'type': 'string'},
 
-    types.ProcessableBoolean: {'type': 'boolean'},
-    types.ProcessableLargeBinary: {'type': 'object'},
-    types.ProcessableDict: {'type': 'object'},
+    types.Boolean: {'type': 'boolean'},
+    types.LargeBinary: {'type': 'object'},
+    types.Dict: {'type': 'object'},
 
     types.LimitedNumeric: {'type': 'double'},
     types.LimitedFloat: {'type': 'double'},
 
     types.LimitedInteger: {'type': 'long'},
     types.LimitedBigInteger: {'type': 'long'},
     types.LimitedSmallInteger: {'type': 'long'},
-    types.ProcessableInterval: {'type': 'long'},
+    types.Interval: {'type': 'long'},
 
-    types.ProcessableDateTime: {'type': 'date', 'format': 'dateOptionalTime'},
-    types.ProcessableDate: {'type': 'date', 'format': 'dateOptionalTime'},
-    types.ProcessableTime: {'type': 'date', 'format': 'HH:mm:ss'},
+    types.DateTime: {'type': 'date', 'format': 'dateOptionalTime'},
+    types.Date: {'type': 'date', 'format': 'dateOptionalTime'},
+    types.Time: {'type': 'date', 'format': 'HH:mm:ss'},
 }


@@ -119,18 +120,27 @@ def get_es_mapping(cls):
         }
         mapper = class_mapper(cls)
         columns = {c.name: c for c in mapper.columns}
+        relationships = {r.key: r for r in mapper.relationships}
         # Replace field 'id' with primary key field
         columns['id'] = columns.get(cls.pk_field())
 
         for name, column in columns.items():
             column_type = column.type
-            if isinstance(column_type, types.ProcessableChoiceArray):
+            if isinstance(column_type, types.ChoiceArray):
                 column_type = column_type.impl.item_type
             column_type = type(column_type)
             if column_type not in TYPES_MAP:
                 continue
             properties[name] = TYPES_MAP[column_type]
 
+        for name, column in relationships.items():
+            if name in cls._nested_relationships:
+                column_type = {'type': 'object'}
+            else:
+                rel_pk_field = column.mapper.class_.pk_field_type()
+                column_type = TYPES_MAP[rel_pk_field]
+            properties[name] = column_type
+
         properties['_type'] = {'type': 'string'}
         return mapping
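
With the new relationships loop above, relationships now show up in the ES mapping: names listed in the model's `_nested_relationships` are indexed as full 'object' documents, while all other relationships are indexed by the primary-key field type of the related model. As a rough illustration (the Story/Tag models and the 'story' document name are hypothetical, not from this commit), get_es_mapping() would produce something like:

    {
        'story': {
            'properties': {
                'id': {'type': 'long'},
                'title': {'type': 'string'},
                'author': {'type': 'object'},  # listed in _nested_relationships
                'tags': {'type': 'long'},      # Tag's integer pk type via TYPES_MAP
                '_type': {'type': 'string'},
            }
        }
    }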

@@ -241,10 +251,9 @@ def filter_objects(cls, objects, first=False, **params):
         if first:
             params['_limit'] = 1
             params['__raise_on_empty'] = True
-            params['query_set'] = query_set.from_self()
-            query_set = cls.get_collection(**params)
+        params['query_set'] = query_set.from_self()
+        query_set = cls.get_collection(**params)
 
         if first:
             first_obj = query_set.first()
             if not first_obj:
                 msg = "'{}({}={})' resource not found".format(
@@ -439,8 +448,7 @@ def get_or_create(cls, **params):
         except NoResultFound:
             defaults.update(params)
             new_obj = cls(**defaults)
-            query_set.session.add(new_obj)
-            query_set.session.flush()
+            new_obj.save()
             return new_obj, True
         except MultipleResultsFound:
             raise JHTTPBadRequest('Bad or Insufficient Params')
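
Replacing the raw session.add()/session.flush() pair with new_obj.save() means objects created through get_or_create() now run through the same processor/validation pipeline and version bump as any other save. A hedged usage sketch (Story is a hypothetical model; the 'defaults' dict is assumed to be popped from params, as the defaults.update(params) call above suggests):

    story, created = Story.get_or_create(
        title='Hello', defaults={'body': 'First draft'})
    if created:
        # Persisted via save(), so field processors and
        # _bump_version() have already run.
        assert story._version == 1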
@@ -465,21 +473,58 @@ def _update(self, params, **kw):
         return self
 
     @classmethod
-    def _delete(cls, **params):
-        obj = cls.get(**params)
-        object_session(obj).delete(obj)
+    def _delete_many(cls, items, synchronize_session=False,
+                     refresh_index=None):
+        """ Delete :items: queryset or objects list.
 
-    @classmethod
-    def _delete_many(cls, items):
+        When a queryset is passed, Query.delete() is used to delete it.
+        Note that the queryset may not have limit(), offset(), order_by(),
+        group_by(), or distinct() called on it.
+        If any of the methods listed above were called, or :items: is not
+        a Query instance, items are deleted one by one.
+        `on_bulk_delete` is called to delete the objects from the index and
+        to reindex relationships. This is done explicitly because deleted
+        objects cannot be accessed in a signal handler for the
+        'after_bulk_delete' ORM event.
+        """
+        if isinstance(items, Query):
+            try:
+                delete_items = items.all()
+                items.delete(
+                    synchronize_session=synchronize_session)
+                on_bulk_delete(cls, delete_items, refresh_index=refresh_index)
+                return
+            except Exception as ex:
+                log.error(str(ex))
         session = Session()
         for item in items:
+            item._refresh_index = refresh_index
             session.delete(item)
         session.flush()
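
A hedged usage sketch (Story is a hypothetical model): a plain queryset takes the bulk Query.delete() path, with on_bulk_delete() cleaning the ES index afterwards; a queryset that had limit()/order_by() etc. called on it fails inside the try block and falls through to the one-by-one loop, as does a plain list of objects.

    query = Session().query(Story).filter_by(archived=True)
    Story._delete_many(query, refresh_index=False)    # bulk path

    objects = Session().query(Story).limit(10).all()
    Story._delete_many(objects, refresh_index=False)  # one-by-one path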

     @classmethod
-    def _update_many(cls, items, **params):
+    def _update_many(cls, items, synchronize_session='fetch',
+                     refresh_index=None, **params):
+        """ Update :items: queryset or objects list.
+
+        When a queryset is passed, Query.update() is used to update it.
+        Note that the queryset may not have limit(), offset(), order_by(),
+        group_by(), or distinct() called on it.
+        If any of the methods listed above were called, or :items: is not
+        a Query instance, items are updated one by one.
+        """
+        if isinstance(items, Query):
+            try:
+                items._refresh_index = refresh_index
+                return items.update(
+                    params, synchronize_session=synchronize_session)
+            except Exception as ex:
+                log.error(str(ex))
         for item in items:
-            item.update(params)
+            item.update(params, refresh_index=refresh_index)
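
The update path mirrors the delete path, sketched under the same assumptions: Query.update() handles the bulk case (with synchronize_session='fetch' by default), and anything else degrades to per-item update() calls that carry refresh_index through.

    Story._update_many(
        Session().query(Story).filter_by(draft=True),
        refresh_index=False, draft=False)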

     def __repr__(self):
         parts = []
@@ -532,7 +577,9 @@ def to_dict(self, **kwargs):
         return _dict
 
     def update_iterables(self, params, attr, unique=False,
-                         value_type=None, save=True):
+                         value_type=None, save=True,
+                         refresh_index=None):
+        self._refresh_index = refresh_index
         mapper = class_mapper(self.__class__)
         columns = {c.name: c for c in mapper.columns}
         is_dict = isinstance(columns.get(attr), DictField)
@@ -568,9 +615,7 @@ def update_dict(update_params):

             setattr(self, attr, final_value)
             if save:
-                session = object_session(self)
-                session.add(self)
-                session.flush()
+                self.save(refresh_index=refresh_index)
 
         def update_list(update_params):
             final_value = getattr(self, attr, []) or []
@@ -594,9 +639,7 @@ def update_list(update_params):

             setattr(self, attr, final_value)
             if save:
-                session = object_session(self)
-                session.add(self)
-                session.flush()
+                self.save(refresh_index=refresh_index)
 
         if is_dict:
             update_dict(params)
@@ -615,8 +658,11 @@ def get_reference_documents(self):
             # If 'Many' side should be indexed, its value is already a list.
             if value is None or isinstance(value, list):
                 continue
-            session = object_session(value)
-            session.refresh(value)
+            try:
+                session = object_session(value)
+                session.refresh(value)
+            except InvalidRequestError:
+                pass
             yield (value.__class__, [value.to_dict()])

def _is_modified(self):
Expand Down Expand Up @@ -651,15 +697,17 @@ def _bump_version(self):
self.updated_at = datetime.utcnow()
self._version = (self._version or 0) + 1

def save(self, *arg, **kw):
def save(self, refresh_index=None):
session = object_session(self)
self._bump_version()
self._refresh_index = refresh_index
session = session or Session()
try:
self.clean()
self.apply_before_validation()
session.add(self)
session.flush()
session.expire(self)
self.apply_after_validation()
return self
except (IntegrityError,) as e:
if 'duplicate' not in e.message:
@@ -670,14 +718,16 @@ def save(self, *arg, **kw):
                     self.__class__.__name__),
                 extra={'data': e})
 
-    def update(self, params):
+    def update(self, params, refresh_index=None):
+        self._refresh_index = refresh_index
         try:
             self._update(params)
             self._bump_version()
-            self.clean()
+            self.apply_before_validation()
             session = object_session(self)
             session.add(self)
             session.flush()
+            self.apply_after_validation()
             return self
         except (IntegrityError,) as e:
             if 'duplicate' not in e.message:
@@ -688,9 +738,36 @@ def update(self, params):
                     self.__class__.__name__),
                 extra={'data': e})
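
Taken together, save() and update() replace the old single clean() call with a two-phase pipeline around the flush. A rough sketch of the order of operations on save (field values are hypothetical):

    story = Story(title='  Hello  ')
    story.save()
    # 1. apply_before_validation()        'before' processors see the raw
    #                                     value, '  Hello  ' exactly as set
    # 2. session.add() / session.flush()  DB-level validation happens here
    # 3. session.expire(self)             attributes reload from the DB on
    #                                     next access
    # 4. apply_after_validation()         'after' processors run on the
    #                                     validated, persisted values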

-    def clean(self, force_all=False):
-        """ Apply field processors to all changed fields and perform custom
-        field values cleaning before running DB validation.
+    def delete(self, refresh_index=None):
+        self._refresh_index = refresh_index
+        object_session(self).delete(self)
 
+    def apply_processors(self, column_names=None, before=False, after=False):
+        """ Apply processors to columns with :column_names: names.
+
+        Arguments:
+          :column_names: List of string names of changed columns.
+          :before: Boolean indicating whether to apply before_validation
+            processors.
+          :after: Boolean indicating whether to apply after_validation
+            processors.
+        """
+        columns = {c.key: c for c in class_mapper(self.__class__).columns}
+        if column_names is None:
+            column_names = columns.keys()
+
+        for name in column_names:
+            column = columns.get(name)
+            if column is not None and hasattr(column, 'apply_processors'):
+                new_value = getattr(self, name)
+                processed_value = column.apply_processors(
+                    instance=self, new_value=new_value,
+                    before=before, after=after)
+                setattr(self, name, processed_value)
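
Because apply_processors() accepts an optional column list plus before/after flags, processors can also be re-run by hand outside of save()/update(). A small usage sketch, based on the signature above:

    # Re-run only the 'before' processors on selected columns:
    story.apply_processors(['title', 'body'], before=True)

    # With column_names omitted, every column that defines
    # processors is covered:
    story.apply_processors(after=True)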

+    def apply_before_validation(self):
+        """ Determine changed columns and run `self.apply_processors` to
+        apply the needed processors.
+
         Note that at this stage, field values are in the exact same state
         you posted/set them. E.g. if you set time_field='11/22/2000',
@@ -699,18 +776,23 @@ def clean(self, force_all=False):
         columns = {c.key: c for c in class_mapper(self.__class__).columns}
         state = attributes.instance_state(self)
 
-        if state.persistent and not force_all:
+        if state.persistent:
             changed_columns = state.committed_state.keys()
         else:  # New object
             changed_columns = columns.keys()
 
-        for name in changed_columns:
-            column = columns.get(name)
-            if column is not None and hasattr(column, 'apply_processors'):
-                new_value = getattr(self, name)
-                processed_value = column.apply_processors(
-                    instance=self, new_value=new_value)
-                setattr(self, name, processed_value)
+        self._columns_to_process = changed_columns
+        self.apply_processors(changed_columns, before=True)

+    def apply_after_validation(self):
+        """ Run `self.apply_processors` with the column names determined
+        by `self.apply_before_validation`.
+
+        Note that at this stage, 'before' processors have already run and
+        field values have passed DB validation, so they may no longer be
+        in the exact state they were posted/set in.
+        """
+        self.apply_processors(self._columns_to_process, after=True)


 class ESBaseDocument(BaseDocument):
