Skip to content
Browse files

Move to async fulltext, add celery job as backup

  • Loading branch information...
1 parent 8be3ddc commit a46f9223f5d7e60e4190cf41948d8c8120a7108e @mitechie committed Aug 22, 2012
Showing with 102 additions and 36 deletions.
  1. +1 −0 Makefile
  2. +52 −16 bookie/bcelery/tasks.py
  3. +46 −17 bookie/models/__init__.py
  4. +3 −2 bookie/models/fulltext.py
  5. +0 −1 bookie/tests/test_utils/test_fulltext.py
View
1 Makefile
@@ -133,6 +133,7 @@ clean_testdb:
.PHONY: builder_test
builder_test: clean_testdb test_bookie.db
+ # $(NOSE) -vx --with-id 61 bookie/tests
$(NOSE) --with-coverage --cover-package=bookie --cover-erase --with-xunit bookie/tests
.PHONY: mysql_test
View
68 bookie/bcelery/tasks.py
@@ -12,6 +12,7 @@
from bookie.lib.rrdstats import ImportQueueDepth
from bookie.models import initialize_sql
+from bookie.models import Bmark
from bookie.models import BmarkMgr
from bookie.models import TagMgr
from bookie.models.stats import StatBookmarkMgr
@@ -20,6 +21,9 @@
from bookie.bcelery import celery as mycelery
from bookie.bcelery import ini
+from whoosh.store import LockError
+from whoosh.writing import IndexingError
+
HERE = dirname(dirname(dirname(__file__)))
@@ -51,22 +55,22 @@
# CELERY_ANNOTATIONS = {"tasks.add": {"rate_limit": "10/s"}}
CELERYBEAT_SCHEDULE={
- "tasks.hourly_stats": {
- "task": "tasks.hourly_stats",
- "schedule": timedelta(seconds=60 * 60),
- },
- "tasks.stats_rrd": {
- "task": "bookie.bcelery.tasks.generate_count_rrd",
- "schedule": timedelta(seconds=60 * 60 * 12),
- },
- "tasks.importer_depth": {
- "task": "bookie.bcelery.tasks.importer_depth",
- "schedule": timedelta(seconds=60 * 5),
- },
- "tasks.generate_importer_depth_rrd": {
- "task": "bookie.bcelery.tasks.generate_importer_depth_rrd",
- "schedule": timedelta(seconds=60 * 5),
- },
+ # "tasks.hourly_stats": {
+ # "task": "bookie.bcelery.tasks.hourly_stats",
+ # "schedule": timedelta(seconds=60 * 60),
+ # },
+ # "tasks.stats_rrd": {
+ # "task": "bookie.bcelery.tasks.generate_count_rrd",
+ # "schedule": timedelta(seconds=60 * 60 * 12),
+ # },
+ # "tasks.importer_depth": {
+ # "task": "bookie.bcelery.tasks.importer_depth",
+ # "schedule": timedelta(seconds=60 * 5),
+ # },
+ # "tasks.generate_importer_depth_rrd": {
+ # "task": "bookie.bcelery.tasks.generate_importer_depth_rrd",
+ # "schedule": timedelta(seconds=60 * 5),
+ # },
"tasks.importer": {
"task": "bookie.bcelery.tasks.importer_process",
"schedule": timedelta(seconds=60 * 3),
@@ -241,3 +245,35 @@ def email_signup_user(email, msg, settings, message_data):
SignupLog(SignupLog.ERROR,
'Could not send smtp email to signup: ' + email)
trans.commit()
+
+
+@mycelery.task(ignore_result=True)
+def fulltext_index_bookmark(bid, content):
+    """Write one bookmark into the fulltext index, retrying on failure.
+
+    Loads the bookmark by id and updates its document in the fulltext
+    index. When the index write fails with an IndexingError or LockError
+    the pending write is cancelled and this task is retried 60 seconds
+    later.
+
+    :param bid: id of the bookmark to index
+    :param content: readable text to index for the bookmark; an empty or
+        missing value is indexed as u""
+
+    """
+    logger = celery.utils.log.get_logger('fulltext_index_bookmark')
+
+    transaction.begin()
+    initialize_sql(ini)
+    b = Bmark.get(bid)
+
+    if not b:
+        logger.error('Could not load bookmark to fulltext index: ' + str(bid))
+    else:
+        # Imported here rather than at module level, as in the original.
+        from bookie.models.fulltext import get_writer
+        writer = get_writer()
+        try:
+            writer.update_document(
+                bid=unicode(b.bid),
+                description=b.description if b.description else u"",
+                extended=b.extended if b.extended else u"",
+                tags=b.tag_str if b.tag_str else u"",
+                # Guard this field the same way as the ones above so a
+                # caller handing over "" or None still indexes u"".
+                readable=content if content else u"",
+            )
+            writer.commit()
+        except (IndexingError, LockError), exc:
+            # There was an issue saving into the index.
+            logger.error(exc)
+            logger.warning('sending back to the queue')
+            # Drop the half-written update, then ask celery to run this
+            # same task again in 60 seconds.
+            writer.cancel()
+            fulltext_index_bookmark.retry(exc=exc, countdown=60)
View
63 bookie/models/__init__.py
@@ -34,6 +34,9 @@
from sqlalchemy.sql import func
from sqlalchemy.sql import and_
+from whoosh.store import LockError
+from whoosh.writing import IndexingError
+
from zope.sqlalchemy import ZopeTransactionExtension
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
@@ -292,14 +295,28 @@ def _clean_content(content):
b = target.bmark
writer = get_writer()
- writer.update_document(
- bid=unicode(b.bid),
- description=b.description if b.description else u"",
- extended=b.extended if b.extended else u"",
- tags=b.tag_str if b.tag_str else u"",
- readable=target.clean_content,
- )
- writer.commit()
+ try:
+ writer.update_document(
+ bid=unicode(b.bid),
+ description=b.description if b.description else u"",
+ extended=b.extended if b.extended else u"",
+ tags=b.tag_str if b.tag_str else u"",
+ readable=target.clean_content,
+ )
+ writer.commit()
+
+ except (IndexingError, LockError), exc:
+ # There was an issue saving into the index.
+ import bookie.bcelery.tasks.fulltext_index_bookmark
+ LOG.warning('Could not fulltext index bid: ' + str(b.bid))
+ LOG.warning(exc)
+ writer.cancel()
+
+ # This should send the work over to a celery task that will try again
+ # in that space.
+ bookie.bcelery.tasks.fulltext_index_bookmark(b.bid,
+ target.clean_content)
+
event.listen(Readable, 'after_insert', sync_readable_content)
event.listen(Readable, 'after_update', sync_readable_content)
@@ -667,16 +684,28 @@ def bmark_fulltext_insert_update(mapper, connection, target):
from fulltext import get_writer
b = target
-
writer = get_writer()
- writer.update_document(
- bid=unicode(b.bid),
- description=b.description if b.description else u"",
- extended=b.extended if b.extended else u"",
- tags=b.tag_str if b.tag_str else u"",
- readable=u"",
- )
- writer.commit()
+ try:
+ writer.update_document(
+ bid=unicode(b.bid),
+ description=b.description if b.description else u"",
+ extended=b.extended if b.extended else u"",
+ tags=b.tag_str if b.tag_str else u"",
+ readable=u"",
+ )
+ writer.commit()
+
+ except (IndexingError, LockError), exc:
+ # There was an issue saving into the index.
+ import bookie.bcelery.tasks.fulltext_index_bookmark
+
+ LOG.warning('Could not fulltext index bid: ' + str(b.bid))
+ LOG.warning(exc)
+
+ writer.cancel()
+ # This should send the work over to a celery task that will try again
+ # in that space.
+ bookie.bcelery.tasks.fulltext_index_bookmark(b.bid, "")
event.listen(Bmark, 'after_insert', bmark_fulltext_insert_update)
event.listen(Bmark, 'after_update', bmark_fulltext_insert_update)
View
5 bookie/models/fulltext.py
@@ -14,11 +14,12 @@
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
+from whoosh import qparser
from whoosh.fields import SchemaClass, TEXT, KEYWORD, ID
from whoosh.analysis import StemmingAnalyzer
from whoosh.index import create_in
from whoosh.index import open_dir
-from whoosh import qparser
+from whoosh.writing import AsyncWriter
from bookie.models import Bmark
@@ -69,7 +70,7 @@ def get_fulltext_handler(engine):
def get_writer():
global WIX
- writer = WIX.writer()
+ writer = AsyncWriter(WIX)
return writer
View
1 bookie/tests/test_utils/test_fulltext.py
@@ -116,7 +116,6 @@ def test_ajax_search(self):
"""Verify that we can get a json MorJSON response when ajax search"""
# first let's add a bookmark we can search on
self._get_good_request()
-
search_res = self.testapp.get(
'/admin/results/google',
headers={

0 comments on commit a46f922

Please sign in to comment.
Something went wrong with that request. Please try again.