Permalink
Browse files

Handle unpickleable exceptions in indexing tasks

  • Loading branch information...
1 parent 9a2c323 commit d8a8a9ac08285c615fdb6351040329fd182201fe @willkg willkg committed Mar 28, 2013
Showing with 38 additions and 4 deletions.
  1. +38 −4 apps/search/tasks.py
View
@@ -1,6 +1,7 @@
import datetime
import logging
import sys
+import traceback
from celery.task import task
from multidb.pinning import pin_this_thread, unpin_this_thread
@@ -20,6 +21,31 @@
log = logging.getLogger('k.task')
+class IndexingTaskError(Exception):
+ """Exception that captures current exception information
+
+ Some exceptions aren't pickleable. This uses traceback module to
+ format the exception that's currently being thrown and tosses it
+ in the message of IndexingTaskError at the time the
+ IndexingTaskError is created.
+
+ So you can do this::
+
+ try:
+ # some code that throws an error
+ except Exception as exc:
+ raise IndexingTaskError()
+
+ The message will have the message and traceback from the original
+ exception thrown.
+
+ Yes, this is goofy.
+
+ """
+ def __init__(self):
+ super(IndexingTaskError, self).__init__(traceback.format_exc())
+
+
@task
def index_chunk_task(write_index, batch_id, chunk):
"""Index a chunk of things.
@@ -33,7 +59,8 @@ def index_chunk_task(write_index, batch_id, chunk):
cls, id_list = chunk
- task_name = '%s %d -> %d' % (cls.get_model_name(), id_list[0], id_list[-1])
+ task_name = '{0} {1} -> {2}'.format(
+ cls.get_model_name(), id_list[0], id_list[-1])
rec = Record(
starttime=datetime.datetime.now(),
@@ -51,7 +78,10 @@ def index_chunk_task(write_index, batch_id, chunk):
except Exception:
rec.text = (u'%s: Errored out %s %s' % (
rec.text, sys.exc_type, sys.exc_value))
- raise
+ # Some exceptions aren't pickleable and we need this to throw
+ # things that are pickleable.
+ raise IndexingTaskError()
+
finally:
unpin_this_thread()
rec.endtime = datetime.datetime.now()
@@ -99,7 +129,9 @@ def index_task(cls, id_list, **kw):
except Exception as exc:
retries = index_task.request.retries
if retries >= MAX_RETRIES:
- raise
+ # Some exceptions aren't pickleable and we need this to
+ # throw things that are pickleable.
+ raise IndexingTaskError()
statsd.incr('search.tasks.index_task.retry', 1)
statsd.incr('search.tasks.index_task.retry%d' % RETRY_TIMES[retries],
@@ -124,7 +156,9 @@ def unindex_task(cls, id_list, **kw):
except Exception as exc:
retries = unindex_task.request.retries
if retries >= MAX_RETRIES:
- raise
+ # Some exceptions aren't pickleable and we need this to
+ # throw things that are pickleable.
+ raise IndexingTaskError()
statsd.incr('search.tasks.unindex_task.retry', 1)
statsd.incr('search.tasks.unindex_task.retry%d' % RETRY_TIMES[retries],

0 comments on commit d8a8a9a

Please sign in to comment.