Skip to content

Commit

Permalink
global: connection pool usage
Browse files Browse the repository at this point in the history
* Use the connection pool for establishing connections with the backend.
  (closes #65)

Signed-off-by: Nikos Filippakis <nikolaos.filippakis@cern.ch>
  • Loading branch information
nikofil authored and lnielsen committed May 19, 2017
1 parent e044428 commit 4abecbd
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
6 changes: 3 additions & 3 deletions invenio_indexer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from contextlib import contextmanager

import pytz
from celery.messaging import establish_connection
from celery import current_app as current_celery_app
from elasticsearch.helpers import bulk
from flask import current_app
from invenio_records.api import Record
Expand Down Expand Up @@ -181,7 +181,7 @@ def bulk_delete(self, record_id_iterator):

def process_bulk_queue(self):
"""Process bulk indexing queue."""
with establish_connection() as conn:
with current_celery_app.pool.acquire(block=True) as conn:
consumer = Consumer(
connection=conn,
queue=self.mq_queue.name,
Expand All @@ -205,7 +205,7 @@ def process_bulk_queue(self):
@contextmanager
def create_producer(self):
"""Context manager that yields an instance of ``Producer``."""
with establish_connection() as conn:
with current_celery_app.pool.acquire(block=True) as conn:
yield Producer(
conn,
exchange=self.mq_exchange,
Expand Down
4 changes: 2 additions & 2 deletions invenio_indexer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@


def process_models_committed_signal(sender, changes):
"""Handler for indexing record metadata.
"""Handle the indexing of record metadata.
:param sender: The signal sender.
:param changes: The changes sent: a list of tuple (record, action).
Expand All @@ -60,7 +60,7 @@ def process_models_committed_signal(sender, changes):


def default_record_to_index(record):
"""Default function to get index/doc_type given a record.
"""Get index/doc_type given a record.
It tries to extract from `record['$schema']` the index and doc_type.
If it fails, return the default values.
Expand Down
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@
]

extras_require = {
'docs:python_version=="2.7"': [
# FIXME remove when celery#3993 will be resolved!
'celery>=3.1.16,<4.0.0',
],
'docs': [
'Sphinx>=1.5.1'
'Sphinx>=1.5.1,<1.6',
],
'tests': tests_require,
}
Expand Down

0 comments on commit 4abecbd

Please sign in to comment.