global: doc_type and signals
Signed-off-by: Jiri Kuncar <jiri.kuncar@cern.ch>
jirikuncar committed Jan 13, 2016
1 parent a04ef3f commit 6f2d7c2
Showing 13 changed files with 264 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
@@ -37,7 +37,7 @@ charset = utf-8
indent_size = 4
# isort plugin configuration
known_first_party = invenio_indexer
known_third_party = invenio_search,invenio_records
known_third_party = invenio_db,invenio_search,invenio_records
multi_line_output = 2
default_section = THIRDPARTY

1 change: 1 addition & 0 deletions .travis.yml
@@ -35,6 +35,7 @@ cache:
- $HOME/.cache/pip

services:
- elasticsearch
- redis
- rabbitmq

1 change: 1 addition & 0 deletions MANIFEST.in
@@ -38,4 +38,5 @@ recursive-include docs Makefile
recursive-include examples *.py
recursive-include invenio_indexer *.po *.pot *.mo
recursive-include invenio_indexer *.py
recursive-include tests *.json
recursive-include tests *.py
88 changes: 54 additions & 34 deletions invenio_indexer/api.py
@@ -26,27 +26,44 @@

from __future__ import absolute_import, print_function

from contextlib import contextmanager

from celery.messaging import establish_connection
from elasticsearch.helpers import bulk
from flask import current_app
from invenio_records.api import Record
from invenio_search import current_search_client
from invenio_search import current_search, current_search_client
from invenio_search.utils import schema_to_index
from kombu import Producer
from kombu import Producer as KombuProducer
from kombu.compat import Consumer
from sqlalchemy.orm.exc import NoResultFound

from .signals import before_record_index


def _record_to_index(record):
"""Get index/doctype given a record."""
index, doctype = schema_to_index(record.get('$schema', ''))
if index and doctype:
return index, doctype
"""Get index/doc_type given a record."""
index_names = current_search.mappings.keys()
schema = record.get('$schema', '')
if isinstance(schema, dict):
schema = schema.get('$ref', '')

index, doc_type = schema_to_index(schema, index_names=index_names)

if index and doc_type:
return index, doc_type
else:
return current_app.config['INDEXER_DEFAULT_INDEX'], \
current_app.config['INDEXER_DEFAULT_DOCTYPE']
return (current_app.config['INDEXER_DEFAULT_INDEX'],
current_app.config['INDEXER_DEFAULT_DOC_TYPE'])


class Producer(KombuProducer):
"""Producer validating published messages."""

def publish(self, data, **kwargs):
"""Validate operation type."""
assert data.get('op') in {'index', 'create', 'delete', 'update'}
return super(Producer, self).publish(data, **kwargs)


class RecordIndexer(object):
@@ -56,7 +73,7 @@ class RecordIndexer(object):
works by queuing requests for indexing records and processing these
requests in bulk.
Elasticsearch index and doctype for a record is determined from the
Elasticsearch index and doc_type for a record is determined from the
``$schema`` attribute.
:param search_client: Elasticsearch client. Defaults to
@@ -91,8 +108,8 @@ def mq_exchange(self):
@property
def mq_routing_key(self):
"""Message queue routing key."""
return self._routing_key or \
current_app.config['INDEXER_MQ_ROUTING_KEY']
return (self._routing_key or
current_app.config['INDEXER_MQ_ROUTING_KEY'])

#
# High-level API
@@ -108,14 +125,14 @@ def index(self, record):
:param record: Record instance.
"""
index, doctype = self._record_to_index(record)
index, doc_type = self._record_to_index(record)

return self.client.index(
id=str(record.id),
version=record.revision_id,
version_type=self._version_type,
index=index,
doctype=doctype,
doc_type=doc_type,
body=self._prepare_record(record),
)

@@ -131,12 +148,12 @@ def delete(self, record):
:param record: Record instance.
"""
index, doctype = self._record_to_index(record)
index, doc_type = self._record_to_index(record)

return self.client.delete(
id=str(record.id),
index=index,
doctype=doctype,
doc_type=doc_type,
)

def delete_by_id(self, record_uuid):
@@ -170,37 +187,41 @@ def process_bulk_queue(self):
count = bulk(
self.client,
self._actionsiter(consumer.iterqueue()),
stats_only=True)
stats_only=True,
)

consumer.close()

return count

@contextmanager
def create_producer(self):
"""Context manager that yields an instance of ``Producer``."""
with establish_connection() as conn:
yield Producer(
conn,
exchange=self.mq_exchange,
routing_key=self.mq_routing_key,
auto_declare=True,
)

#
# Low-level implementation
#
def _bulk_op(self, record_id_iterator, op_type, index=None, doctype=None):
def _bulk_op(self, record_id_iterator, op_type, index=None, doc_type=None):
"""Index record in Elasticsearch asynchronously.
:param record_id_iterator: Iterator that yields record UUIDs.
:param op_type: Indexing operation (one of ``index``, ``create``,
``delete`` or ``update``).
"""
assert op_type in ('index', 'create', 'delete', 'update')

with establish_connection() as conn:
producer = Producer(
conn,
exchange=self.mq_exchange,
routing_key=self.mq_routing_key,
auto_declare=True,
)
with self.create_producer() as producer:
for rec in record_id_iterator:
producer.publish(dict(
id=str(rec),
op=op_type,
index=index,
doctype=doctype
doc_type=doc_type
))

def _actionsiter(self, message_iterator):
@@ -225,16 +246,15 @@ def _delete_action(self, payload):
:param payload: Decoded message body.
:returns: Dictionary defining an Elasticsearch bulk 'delete' action.
"""
if payload['index'] and payload['doctype']:
index, doctype = payload['index'], payload['doctype']
else:
index, doc_type = payload.get('index'), payload.get('doc_type')
if not (index and doc_type):
record = Record.get_record(payload['id'])
index, doctype = self._record_to_index(record)
index, doc_type = self._record_to_index(record)

return {
'_op_type': 'delete',
'_index': index,
'_type': doctype,
'_type': doc_type,
'_id': payload['id'],
}

@@ -245,12 +265,12 @@ def _index_action(self, payload):
:returns: Dictionary defining an Elasticsearch bulk 'index' action.
"""
record = Record.get_record(payload['id'])
index, doctype = self._record_to_index(record)
index, doc_type = self._record_to_index(record)

return {
'_op_type': 'index',
'_index': index,
'_type': doctype,
'_type': doc_type,
'_id': str(record.id),
'_version': record.revision_id,
'_version_type': self._version_type,
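For context, a minimal usage sketch of the queued indexing flow that this refactoring touches. It assumes an application context with Invenio-DB, Invenio-Search and Invenio-Indexer initialized; the ``bulk_index`` call and the example ``$schema`` URL are assumptions for illustration and are not part of this diff.

from invenio_indexer.api import RecordIndexer
from invenio_records.api import Record

indexer = RecordIndexer()

# Synchronous, single-record indexing: the target index/doc_type are
# derived from the record's ``$schema`` by ``_record_to_index()``.
record = Record.create({
    '$schema': 'http://example.org/records/record-v1.0.0.json',  # hypothetical schema URL
    'title': 'Test',
})
indexer.index(record)

# Queued bulk indexing: record UUIDs are published to the indexer queue via
# the validating ``Producer`` and later consumed in a single Elasticsearch
# bulk request by ``process_bulk_queue()``.
indexer.bulk_index([str(record.id)])  # assumed public wrapper around _bulk_op(op_type='index')
indexer.process_bulk_queue()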
4 changes: 2 additions & 2 deletions invenio_indexer/config.py
@@ -31,8 +31,8 @@
INDEXER_DEFAULT_INDEX = "records-record-v1.0.0"
"""Default index to use if no schema is defined."""

INDEXER_DEFAULT_DOCTYPE = "record-v1.0.0"
"""Default doctype to use if no schema is defined."""
INDEXER_DEFAULT_DOC_TYPE = "record-v1.0.0"
"""Default doc_type to use if no schema is defined."""

INDEXER_MQ_EXCHANGE = Exchange('indexer', type='direct')
"""Default exchange for message queue."""
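Since ``INDEXER_DEFAULT_DOCTYPE`` is renamed to ``INDEXER_DEFAULT_DOC_TYPE``, applications that override the fallback have to follow suit. A minimal sketch with illustrative values:

from flask import Flask

app = Flask(__name__)
app.config.update(
    INDEXER_DEFAULT_INDEX='records-record-v1.0.0',
    INDEXER_DEFAULT_DOC_TYPE='record-v1.0.0',  # formerly INDEXER_DEFAULT_DOCTYPE
)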
53 changes: 53 additions & 0 deletions invenio_indexer/utils.py
@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Utility functions for data processing."""

from flask import current_app
from invenio_records.models import RecordMetadata

from .api import RecordIndexer


def process_models_committed_signal(sender, changes):
"""Handler for indexing record metadata."""
record_indexer = RecordIndexer()
op_map = {
'insert': 'index',
'update': 'index',
'delete': 'delete',
}
with record_indexer.create_producer() as producer:
for obj, change in changes:
if isinstance(obj, RecordMetadata):
if change in op_map:
index, doc_type = record_indexer._record_to_index(
obj.json or {}
)
producer.publish(dict(
op=op_map[change],
id=str(obj.id),
index=index,
doc_type=doc_type,
))
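The handler above has the ``(sender, changes)`` signature used by Flask-SQLAlchemy's ``models_committed`` signal. How this commit actually registers it is not shown in this excerpt, so the wiring below is an assumption about intended usage; note that the test configuration now enables ``SQLALCHEMY_TRACK_MODIFICATIONS``, which Flask-SQLAlchemy requires for this signal to fire.

from flask import Flask
from flask_sqlalchemy import models_committed

from invenio_indexer.utils import process_models_committed_signal

app = Flask(__name__)
# Queue (re)indexing of committed RecordMetadata rows after each commit.
models_committed.connect(process_models_committed_signal, sender=app)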
10 changes: 7 additions & 3 deletions tests/conftest.py
@@ -55,16 +55,20 @@ def app(request):
CELERY_CACHE_BACKEND="memory",
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CELERY_RESULT_BACKEND="cache",
INDEXER_DEFAULT_INDEX='records-default-v1.0.0',
INDEXER_DEFAULT_DOC_TYPE='default-v1.0.0',
SQLALCHEMY_DATABASE_URI=os.environ.get(
'SQLALCHEMY_DATABASE_URI', 'sqlite:///test.db'),
SQLALCHEMY_TRACK_MODIFICATIONS=False,
SQLALCHEMY_TRACK_MODIFICATIONS=True,
TESTING=True,
SEARCH_AUTOINDEX=[],
)
FlaskCLI(app)
FlaskCeleryExt(app)
InvenioDB(app)
InvenioRecords(app)
InvenioSearch(app)
search = InvenioSearch(app)
search.register_mappings('records', 'data')
InvenioIndexer(app)

with app.app_context():
@@ -89,7 +93,7 @@ def script_info(app):

@pytest.fixture()
def queue(app):
"""Get ScriptInfo object for testing CLI."""
"""Get queue object for testing bulk operations."""
queue = app.config['INDEXER_MQ_QUEUE']

with app.app_context():
Empty file added tests/data/__init__.py
Empty file.
1 change: 1 addition & 0 deletions tests/data/records/authorities/authority-v1.0.0.json
@@ -0,0 +1 @@
{"mappings": {}}
1 change: 1 addition & 0 deletions tests/data/records/bibliographic/bibliographic-v1.0.0.json
@@ -0,0 +1 @@
{"mappings": {}}
1 change: 1 addition & 0 deletions tests/data/records/default-v1.0.0.json
@@ -0,0 +1 @@
{"mappings": {}}
37 changes: 20 additions & 17 deletions tests/test_api.py
@@ -68,22 +68,25 @@ def test_indexer_bulk_index(app, queue):

def test_delete_action(app):
"""Test delete action."""
testid = str(uuid.uuid4())
action = RecordIndexer()._delete_action(
dict(id=testid, op='delete', index='idx', doctype='doc'))
assert action['_op_type'] == 'delete'
assert action['_index'] == 'idx'
assert action['_type'] == 'doc'
assert action['_id'] == testid

with patch('invenio_indexer.api.Record.get_record') as r:
r.return_value = {'$schema': '/authors/author-1.0.0.json'}
with app.app_context():
testid = str(uuid.uuid4())
action = RecordIndexer()._delete_action(
dict(id='myid', op='delete', index=None, doctype=None))
dict(id=testid, op='delete', index='idx', doc_type='doc'))
assert action['_op_type'] == 'delete'
assert action['_index'] == 'authors-author-1.0.0'
assert action['_type'] == 'author-1.0.0'
assert action['_id'] == 'myid'
assert action['_index'] == 'idx'
assert action['_type'] == 'doc'
assert action['_id'] == testid

with patch('invenio_indexer.api.Record.get_record') as r:
r.return_value = {'$schema': {
'$ref': '/records/authorities/authority-v1.0.0.json'
}}
action = RecordIndexer()._delete_action(
dict(id='myid', op='delete', index=None, doc_type=None))
assert action['_op_type'] == 'delete'
assert action['_index'] == 'records-authorities-authority-v1.0.0'
assert action['_type'] == 'authority-v1.0.0'
assert action['_id'] == 'myid'


def test_index_action(app):
@@ -102,7 +105,7 @@ def receiver(sender, json=None, record=None):
))
assert action['_op_type'] == 'index'
assert action['_index'] == app.config['INDEXER_DEFAULT_INDEX']
assert action['_type'] == app.config['INDEXER_DEFAULT_DOCTYPE']
assert action['_type'] == app.config['INDEXER_DEFAULT_DOC_TYPE']
assert action['_id'] == str(record.id)
assert action['_version'] == record.revision_id
assert action['_version_type'] == 'external_gte'
@@ -150,7 +153,7 @@ def test_index(app):
version=0,
version_type='force',
index=app.config['INDEXER_DEFAULT_INDEX'],
doctype=app.config['INDEXER_DEFAULT_DOCTYPE'],
doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'],
body={'title': 'Test'},
)

@@ -172,7 +175,7 @@ def test_delete(app):
client_mock.delete.assert_called_with(
id=str(recid),
index=app.config['INDEXER_DEFAULT_INDEX'],
doctype=app.config['INDEXER_DEFAULT_DOCTYPE'],
doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'],
)

with patch('invenio_indexer.api.RecordIndexer.delete') as fun:
