Merge pull request #50 from brandicted/develop
0.3.2
chartpath committed Jun 4, 2015
2 parents 3cbea71 + 27c0bfe commit d9a612c
Showing 8 changed files with 200 additions and 57 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.3.1
0.3.2
9 changes: 7 additions & 2 deletions docs/source/changelog.rst
@@ -1,9 +1,14 @@
Changelog
=========

* :release:`0.3.2 <2015-06-03>`
* :bug:`-` Fixed bug with Elasticsearch indexing of nested relationships
* :bug:`-` Fixed race condition in Elasticsearch indexing

* :release:`0.3.1 <2015-05-27>`
* :bug:`-` fixed PUT to replace all fields and PATCH to update some
* :bug:`-` fixed posting to singular resources e.d. /api/users/<username>/profile
* :bug:`-` Fixed PUT to replace all fields and PATCH to update some
* :bug:`-` Fixed posting to singular resources, e.g. /api/users/<username>/profile
* :bug:`-` Fixed ES mapping error when all values of a field were null

* :release:`0.3.0 <2015-05-18>`
* :support:`-` Step-by-step 'Getting started' guide
8 changes: 6 additions & 2 deletions docs/source/making_requests.rst
@@ -4,6 +4,8 @@ Making requests
Query syntax
------------

Query parameters can be used with GET, PATCH, PUT or DELETE requests; a few example requests combining them are sketched below.

=============================== ===========
url parameter description
=============================== ===========
@@ -23,13 +25,15 @@ Additional parameters are available when using an ElasticSearch-enabled collecti
=============================== ===========
url parameter description
=============================== ===========
``<field_name>=<keywords>`` to filter a collection using full-text search on <field_name>, ElasticSearch operators [#]_ can be used, e.g. ``?title=foo AND bar``
``<field_name>=<keywords>`` to filter a collection using full-text search on <field_name>; ES operators [#]_ can be used, e.g. ``?title=foo AND bar``
``q=<keywords>`` to filter a collection using full-text search on all fields
``_search_fields=<field_list>`` use with ``?q=<keywords>`` to restrict search to specific fields
``_refresh_index=true`` to refresh the ES index after performing the operation [#]_. Set ``elasticsearch.enable_refresh_query = true`` in your .ini file to enable this feature
=============================== ===========

.. [#] To update listfields and dictfields, you can use the following syntax: ``_m=PATCH&<listfield>=<comma_separated_list>&<dictfield>.<key>=<value>``
.. [#] The full syntax of ElasticSearch querying is beyond the scope of this documentation. You can read more on the ElasticSearch Query String Query `documentation <http://www.elastic.co/guide/en/elasticsearch/reference/1.x/query-dsl-query-string-query.html>`_ to do things like fuzzy search: ``?name=fuzzy~`` or date range search: ``?date=[2015-01-01 TO *]``
.. [#] The full syntax of ElasticSearch querying is beyond the scope of this documentation. You can read more in the `ElasticSearch Query String Query documentation <http://www.elastic.co/guide/en/elasticsearch/reference/1.x/query-dsl-query-string-query.html>`_ to do things like fuzzy search: ``?name=fuzzy~`` or date range search: ``?date=[2015-01-01 TO *]``
.. [#] This parameter only works with POST, PATCH, PUT and DELETE methods. Read more in the `ElasticSearch Bulk API documentation <https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-refresh>`_.
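For illustration, the parameters above map onto ordinary HTTP requests. A minimal sketch using the Python ``requests`` library, assuming a local deployment and a hypothetical ``stories`` resource (host, port, resource and field names are placeholders):

import requests

BASE = 'http://localhost:6543/api/stories'  # placeholder host and resource

# full-text search on one field using ES query-string operators
requests.get(BASE, params={'title': 'foo AND bar'})

# full-text search on all fields, restricted to specific fields
requests.get(BASE, params={'q': 'foo', '_search_fields': 'title,description'})

# refresh the ES index right after a modifying request
# (requires elasticsearch.enable_refresh_query = true in the .ini file)
requests.delete(BASE + '/1', params={'_refresh_index': 'true'})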
update_many()
-------------
12 changes: 8 additions & 4 deletions nefertari/authentication/models.py
@@ -167,13 +167,17 @@ class AuthUser(AuthModelDefaultMixin, engine.BaseDocument):
__tablename__ = 'nefertari_authuser'

id = engine.IdField(primary_key=True)

username = engine.StringField(
min_length=1, max_length=50, unique=True,
required=True, processors=[lower_strip])
min_length=1, max_length=50, unique=True, required=True,
before_validation=[lower_strip])
email = engine.StringField(
unique=True, required=True, processors=[lower_strip])
unique=True, required=True,
before_validation=[lower_strip])
password = engine.StringField(
min_length=3, required=True, processors=[encrypt_password])
min_length=3, required=True,
after_validation=[encrypt_password])

groups = engine.ListField(
item_type=engine.StringField,
choices=['admin', 'user'], default=['user'])
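The `lower_strip` and `encrypt_password` processors referenced above are defined elsewhere in this module and are not shown in this diff. As a rough sketch of the shape a `before_validation` processor could take, assuming a plain value-in/value-out signature (an assumption for illustration, not nefertari's actual API):

def lower_strip(value):
    # hypothetical processor: normalize a string field before validation
    return value.lower().strip()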
102 changes: 80 additions & 22 deletions nefertari/elasticsearch.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
import json
import logging
from functools import partial

import elasticsearch

@@ -28,15 +30,31 @@ class IndexNotFoundException(Exception):


class ESHttpConnection(elasticsearch.Urllib3HttpConnection):
def _catch_index_error(self, response):
""" Catch and raise index errors which are not critical and thus
not raised by elasticsearch-py.
"""
code, headers, raw_data = response
if not raw_data:
return
data = json.loads(raw_data)
if not data or not data.get('errors'):
return
try:
error_dict = data['items'][0]['index']
message = error_dict['error']
except (KeyError, IndexError):
return
raise exception_response(400, detail=message)

def perform_request(self, *args, **kw):
try:
if log.level == logging.DEBUG:
msg = str(args)
if len(msg) > 512:
msg = msg[:300] + '...TRUNCATED...' + msg[-212:]
log.debug(msg)

return super(ESHttpConnection, self).perform_request(*args, **kw)
resp = super(ESHttpConnection, self).perform_request(*args, **kw)
except Exception as e:
log.error(e.error)
status_code = e.status_code
@@ -48,6 +66,9 @@ def perform_request(self, *args, **kw):
status_code,
detail='elasticsearch error.',
extra=dict(data=e))
else:
self._catch_index_error(resp)
return resp
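For context, `_catch_index_error` inspects the raw JSON body of a bulk response and turns a non-critical per-item index error into an HTTP 400. A sketch of the kind of payload it reacts to, with purely illustrative field values:

import json

raw_data = json.dumps({
    'took': 3,
    'errors': True,
    'items': [{'index': {'_index': 'nefertari', '_type': 'story', '_id': '1',
                         'status': 400,
                         'error': 'MapperParsingException[failed to parse]'}}],
})
data = json.loads(raw_data)
assert data['errors'] and data['items'][0]['index']['error']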


def includeme(config):
@@ -56,8 +77,13 @@ includeme(config):
ES.create_index()


def _bulk_body(body):
return ES.api.bulk(body=body)
def _bulk_body(body, refresh_index=None):
kwargs = {'body': body}
refresh_provided = refresh_index is not None
refresh_enabled = ES.settings.asbool('enable_refresh_query')
if refresh_provided and refresh_enabled:
kwargs['refresh'] = refresh_index
return ES.api.bulk(**kwargs)


def process_fields_param(fields):
@@ -138,6 +164,7 @@ def src2type(cls, source):
@classmethod
def setup(cls, settings):
ES.settings = settings.mget('elasticsearch')
ES.settings.setdefault('chunk_size', 500)

try:
_hosts = ES.settings.hosts
@@ -162,6 +189,13 @@ def setup(cls, settings):
raise Exception(
'Bad or missing settings for elasticsearch. %s' % e)

def __init__(self, source='', index_name=None, chunk_size=None):
self.doc_type = self.src2type(source)
self.index_name = index_name or ES.settings.index_name
if chunk_size is None:
chunk_size = ES.settings.asint('chunk_size')
self.chunk_size = chunk_size

@classmethod
def create_index(cls, index_name=None):
index_name = index_name or ES.settings.index_name
@@ -171,7 +205,22 @@ def create_index(cls, index_name=None):
ES.api.indices.create(index_name)

@classmethod
def setup_mappings(cls):
def setup_mappings(cls, force=False):
""" Setup ES mappings for all existing models.
This method is meant to be run once at application launch.
The ES._mappings_setup flag is set so that mapping creation
calls are not repeated on subsequent runs.
Use `force=True` to make subsequent calls perform mapping
creation again.
"""
if getattr(ES, '_mappings_setup', False) and not force:
log.debug('ES mappings have already been set up for the currently '
'running application. Call `setup_mappings` with '
'`force=True` to set up mappings again.')
return
log.info('Setting up ES mappings for all existing models')
models = engine.get_document_classes()
try:
for model_name, model_cls in models.items():
@@ -180,11 +229,13 @@ def setup_mappings(cls):
es.put_mapping(body=model_cls.get_es_mapping())
except JHTTPBadRequest as ex:
raise Exception(ex.json['extra']['data'])
ES._mappings_setup = True
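In practice the flag makes repeated calls cheap; a minimal usage sketch, assuming `ES.setup(settings)` has already run:

ES.setup_mappings()            # first call: creates mappings for all models
ES.setup_mappings()            # skipped: only logs a debug message
ES.setup_mappings(force=True)  # re-runs mapping creation against ES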

def __init__(self, source='', index_name=None, chunk_size=100):
self.doc_type = self.src2type(source)
self.index_name = index_name or ES.settings.index_name
self.chunk_size = chunk_size
def delete_mapping(self):
ES.api.indices.delete_mapping(
index=self.index_name,
doc_type=self.doc_type,
)

def put_mapping(self, body, **kwargs):
ES.api.indices.put_mapping(
@@ -193,10 +244,12 @@ def put_mapping(self, body, **kwargs):
index=self.index_name,
**kwargs)

def process_chunks(self, documents, operation, chunk_size):
def process_chunks(self, documents, operation, chunk_size=None):
""" Apply `operation` to chunks of `documents` of size `chunk_size`.
"""
if chunk_size is None:
chunk_size = self.chunk_size
start = end = 0
count = len(documents)

@@ -206,7 +259,7 @@ def process_chunks(self, documents, operation, chunk_size):
end += chunk_size

bulk = documents[start:end]
operation(bulk)
operation(body=bulk)

start += chunk_size
count -= chunk_size
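The loop above walks `documents` in fixed-size windows; as a standalone illustration of the same slicing pattern (not the method itself):

def chunks(items, size):
    # yield consecutive slices of `items`, `size` elements at a time
    for start in range(0, len(items), size):
        yield items[start:start + size]

print(list(chunks(['d1', 'd2', 'd3', 'd4', 'd5'], 2)))
# [['d1', 'd2'], ['d3', 'd4'], ['d5']]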
@@ -240,7 +293,8 @@ def prep_bulk_documents(self, action, documents):

return _docs

def _bulk(self, action, documents, chunk_size=None):
def _bulk(self, action, documents, chunk_size=None,
refresh_index=None):
if chunk_size is None:
chunk_size = self.chunk_size

@@ -263,18 +317,21 @@ def _bulk(self, action, documents, chunk_size=None):
if body:
# Use chunk_size*2, because `body` is a sequence of
# meta, document, meta, ...
operation = partial(_bulk_body, refresh_index=refresh_index)
self.process_chunks(
documents=body,
operation=_bulk_body,
operation=operation,
chunk_size=chunk_size*2)
else:
log.warning('Empty body')
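The `chunk_size*2` doubling follows from the layout of an ES bulk body, where every document source is preceded by an action/metadata entry; roughly, with illustrative values:

body = [
    {'index': {'_index': 'nefertari', '_type': 'story', '_id': 1}},
    {'id': 1, 'title': 'foo'},
    {'index': {'_index': 'nefertari', '_type': 'story', '_id': 2}},
    {'id': 2, 'title': 'bar'},
]
# a chunk of chunk_size documents therefore spans chunk_size * 2 list entries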

def index(self, documents, chunk_size=None):
def index(self, documents, chunk_size=None,
refresh_index=None):
""" Reindex all `document`s. """
self._bulk('index', documents, chunk_size)
self._bulk('index', documents, chunk_size, refresh_index)

def index_missing_documents(self, documents, chunk_size=None):
def index_missing_documents(self, documents, chunk_size=None,
refresh_index=None):
""" Index documents that are missing from ES index.
Determines which documents are missing using ES `mget` call which
@@ -306,14 +363,14 @@ def index_missing_documents(self, documents, chunk_size=None):
'index `{}`'.format(self.doc_type, self.index_name))
return

self._bulk('index', documents, chunk_size)
self._bulk('index', documents, chunk_size, refresh_index)

def delete(self, ids):
def delete(self, ids, refresh_index=None):
if not isinstance(ids, list):
ids = [ids]

documents = [{'id': _id, '_type': self.doc_type} for _id in ids]
self._bulk('delete', documents)
self._bulk('delete', documents, refresh_index=refresh_index)
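Taken together, the `refresh_index` plumbing lets callers request an index refresh per bulk operation; a minimal usage sketch with an illustrative model name and document shapes:

es = ES(source='Story')  # 'Story' stands in for any indexed model
es.index([{'id': 1, 'title': 'foo'}], refresh_index=True)
es.delete([1, 2], refresh_index=True)
# the refresh flag only reaches ES when elasticsearch.enable_refresh_query is enabled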

def get_by_ids(self, ids, **params):
if not ids:
@@ -520,7 +577,8 @@ def get(self, **kw):
return self.get_resource(**kw)

@classmethod
def index_refs(cls, db_obj):
def index_refs(cls, db_obj, refresh_index=None):
for model_cls, documents in db_obj.get_reference_documents():
if getattr(model_cls, '_index_enabled', False):
cls(model_cls.__name__).index(documents)
if getattr(model_cls, '_index_enabled', False) and documents:
cls(model_cls.__name__).index(
documents, refresh_index=refresh_index)
43 changes: 32 additions & 11 deletions nefertari/scripts/es.py
@@ -5,9 +5,9 @@

from pyramid.paster import bootstrap
from pyramid.config import Configurator
from zope.dottedname.resolve import resolve

from nefertari.utils import dictset, split_strip, to_dicts
from nefertari.elasticsearch import ES
from nefertari import engine


@@ -40,26 +40,39 @@ def __init__(self, argv, log):
default=False)
parser.add_argument(
'--models',
help='Comma-separeted list of model names to index',
help=('Comma-separated list of model names to index '
'(required)'),
required=True)
parser.add_argument(
'--params', help='Url-encoded params for each model')
parser.add_argument('--index', help='Index name', default=None)
parser.add_argument('--chunk', help='Index chunk size', type=int)
parser.add_argument(
'--chunk',
help=('Index chunk size. If chunk size is not provided, the '
'`elasticsearch.chunk_size` setting is used'),
type=int)
parser.add_argument(
'--force',
help=('Force reindexing of all documents. By default, only '
'documents that are missing from index are indexed.'),
help=('Recreate ES mappings and reindex all documents of the provided '
'models. By default, only documents that are missing from '
'the index are indexed.'),
action='store_true',
default=False)

self.options = parser.parse_args()
if not self.options.config:
return parser.print_help()

env = self.bootstrap[0](self.options.config)
registry = env['registry']
# Prevent ES.setup_mappings from running on bootstrap;
# Restore ES._mappings_setup after bootstrap is over
mappings_setup = getattr(ES, '_mappings_setup', False)
try:
ES._mappings_setup = True
env = self.bootstrap[0](self.options.config)
finally:
ES._mappings_setup = mappings_setup

registry = env['registry']
# Include 'nefertari.engine' to setup specific engine
config = Configurator(settings=registry.settings)
config.include('nefertari.engine')
@@ -72,11 +85,11 @@ def __init__(self, argv, log):
self.settings = dictset(registry.settings)

def run(self):
from nefertari.elasticsearch import ES
ES.setup(self.settings)
model_names = split_strip(self.options.models)

for model_name in model_names:
self.log.info('Processing model `{}`'.format(model_name))
model = engine.get_document_cls(model_name)

params = self.options.params or ''
@@ -86,13 +99,21 @@ def run(self):
params.setdefault('_limit', params.get('_limit', 10000))
chunk_size = self.options.chunk or params['_limit']

es = ES(source=model_name, index_name=self.options.index)
es = ES(source=model_name, index_name=self.options.index,
chunk_size=chunk_size)
query_set = model.get_collection(**params)
documents = to_dicts(query_set)

if self.options.force:
es.index(documents, chunk_size=chunk_size)
self.log.info('Recreating `{}` ES mapping'.format(model_name))
es.delete_mapping()
es.put_mapping(body=model.get_es_mapping())
self.log.info('Indexing all `{}` documents'.format(
model_name))
es.index(documents)
else:
es.index_missing_documents(documents, chunk_size=chunk_size)
self.log.info('Indexing missing `{}` documents'.format(
model_name))
es.index_missing_documents(documents)

return 0
