Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Merge djmitche/build-relengapi:celery_logging (PR #205)
Browse files Browse the repository at this point in the history
  • Loading branch information
djmitche committed Apr 1, 2015
2 parents 283997c + cd418b1 commit 069ded0
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 17 deletions.
7 changes: 7 additions & 0 deletions docs/deployment/@relengapi/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ To avoid these warnings, use JSON instead:
CELERY_TASK_SERIALIZER='json'
CELERY_RESULT_SERIALIZER='json'
Finally, by default Celery limits logging to the WARNING level.
To see more output from RelengAPI, without the additional verbose output from Celery itself, set ``RELENGAPI_CELERY_LOG_LEVEL`` to the desired level:

.. code-block:: none

    RELENGAPI_CELERY_LOG_LEVEL = 'DEBUG'
Documentation Configuration
---------------------------

Expand Down
12 changes: 12 additions & 0 deletions relengapi/lib/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

from __future__ import absolute_import

import logging

from celery import Celery
from celery.signals import celeryd_after_setup
from flask import current_app
from werkzeug.local import LocalProxy

Expand Down Expand Up @@ -53,3 +56,12 @@ def wrap(fn):
'@db.task() takes exactly 1 argument ({0} given)'.format(
sum([len(args), len(kwargs)])))
return inner(**kwargs)


@celeryd_after_setup.connect
def setup_relengapi_logging(sender, instance, conf, **kwargs):
    """Celery signal handler: once the worker is set up, raise the
    'relengapi' logger to the level given by the optional
    RELENGAPI_CELERY_LOG_LEVEL config setting (e.g. 'DEBUG')."""
    level = conf.get("RELENGAPI_CELERY_LOG_LEVEL")
    if not level:
        # no override configured; leave Celery's default (WARNING) in place
        return
    relengapi_logger = logging.getLogger('relengapi')
    relengapi_logger.setLevel(level)
    relengapi_logger.debug("Setting relengapi logger to %s", level)
93 changes: 76 additions & 17 deletions relengapi/tests/test_lib_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import Queue
import contextlib
import logging
import multiprocessing
import os
import shutil
Expand All @@ -18,6 +20,7 @@
from relengapi.lib.testing.context import TestContext

test_temp_dir = os.path.join(os.path.dirname(__file__), 'test_temp')
log = logging.getLogger(__name__)


def setup_module():
Expand All @@ -31,12 +34,47 @@ def teardown_module():
shutil.rmtree(test_temp_dir)


def _run(app, event):
class FakeOutput(object):
    """Minimal file-like object that forwards every chunk written to it
    into a queue, so a parent process can inspect a subprocess's output.

    Only write() is implemented; that is all the capturing code needs.
    """

    def __init__(self, queue):
        # queue: any object with a put() method (e.g. multiprocessing.Queue)
        self.queue = queue

    def write(self, data):
        # renamed from `str`, which shadowed the builtin; chunks are
        # forwarded as-is with no buffering or newline handling
        self.queue.put(data)


class SubprocessLogging(object):
    """Collects log output captured from a worker subprocess (via a
    queue of FakeOutput chunks) and provides assertion helpers over it."""

    def __init__(self, queue):
        self.queue = queue
        # lazily-drained cache of the queue contents, split into lines
        self._log_lines = None

    @property
    def log_lines(self):
        """All captured output, drained from the queue once and cached,
        as a list of lines."""
        if self._log_lines is None:
            chunks = []
            try:
                while True:
                    chunks.append(self.queue.get_nowait())
            except Queue.Empty:
                pass
            self._log_lines = ''.join(chunks).split('\n')
        return self._log_lines

    def assertLogged(self, substr):
        """Fail (showing all output) unless some line contains substr."""
        matching = [line for line in self.log_lines if substr in line]
        assert matching, '\n'.join(self.log_lines)

    def assertNotLogged(self, substr):
        """Fail (showing all output) if any line contains substr."""
        matching = [line for line in self.log_lines if substr in line]
        assert not matching, '\n'.join(self.log_lines)


def _run(app, event, log_queue):
# since celery insists on printing its stuff to sys.__stdout__
# and sys.__stderr__..
import sys
sys.__stdout__ = sys.stdout
sys.__stderr__ = sys.stdout
sys.__stderr__ = sys.__stdout__ = FakeOutput(log_queue)

# make the kombu transport poll more frequently..
from kombu.transport.virtual import Transport
Expand All @@ -47,21 +85,22 @@ def _run(app, event):
def rdy(sender, **kwargs):
event.set()

# poll the db very frequently, to make sure the tests pass quickly
app.celery.Worker().start()


@contextlib.contextmanager
def running_worker(app):
ready_event = multiprocessing.Event()
proc = multiprocessing.Process(target=_run, args=(app, ready_event))
log_queue = multiprocessing.Queue()
proc = multiprocessing.Process(
target=_run, args=(app, ready_event, log_queue))
proc.start()
# wait until the worker is ready. This gives it a chance to set up all
# of the tables and data in the DB; otherwise, it would race with this
# process and fail periodically
ready_event.wait()
try:
yield
yield SubprocessLogging(log_queue)
finally:
# send SIGKILL since celery traps SIGTERM and turns it into an exception
# which SQLAlchemy catches and ignores
Expand All @@ -70,11 +109,16 @@ def running_worker(app):
proc.join()


test_context = TestContext(config={
config = {
'CELERY_BROKER_URL': 'sqla+sqlite:///%s/celery.db' % test_temp_dir,
'CELERY_RESULT_BACKEND': 'db+sqlite:///%s/celery.db' % test_temp_dir,
'CELERYD_POOL': 'solo',
})
}
test_context = TestContext(config=config)

config_with_logging = config.copy()
config_with_logging["RELENGAPI_CELERY_LOG_LEVEL"] = 'DEBUG'
logging_test_context = test_context.specialize(config=config_with_logging)


@celery.task
Expand All @@ -83,12 +127,13 @@ def test_task(a, b):


@celery.task(serializer='json')
def test_task_json(a, b):
def task_json(a, b):
return a + b


@celery.task(serializer='json')
def test_task_with_args(x, y):
def task_with_args(x, y):
log.info("GOT ARGS %s %s", x, y)
return x * y


Expand All @@ -101,23 +146,36 @@ def test_add(app):

@test_context
def test_mult(app):
with running_worker(app):
with running_worker(app) as logging:
with app.app_context():
eq_(task_with_args.delay(2, 3).get(interval=0.01), 6)
# by default, the info log from the task is hidden
logging.assertNotLogged('GOT ARGS 2 3')


@logging_test_context
def test_mult_with_logging(app):
"""With RELENGAPI_CELERY_LOG_LEVEL, relengapi logging appears in output"""
with running_worker(app) as logging:
with app.app_context():
eq_(test_task_with_args.delay(2, 3).get(interval=0.01), 6)
eq_(task_with_args.delay(2, 3).get(interval=0.01), 6)
logging.assertLogged('GOT ARGS 2 3')


@test_context
def test_chain(app):
with running_worker(app):
with app.app_context():
eq_(chain(test_task.s(1, 2), test_task.s(3)).delay().get(interval=0.01), 6)
eq_(chain(test_task.s(1, 2), test_task.s(3)).delay().get(
interval=0.01), 6)


@test_context
def test_chain_immutable(app):
with running_worker(app):
with app.app_context():
res = (test_task.si(2, 2) | test_task.si(4, 4) | test_task.si(8, 8)).delay()
res = (test_task.si(2, 2) | test_task.si(
4, 4) | test_task.si(8, 8)).delay()
eq_(res.get(interval=0.01), 16)
eq_(res.parent.get(interval=0.01), 8)
eq_(res.parent.parent.get(interval=0.01), 4)
Expand All @@ -141,8 +199,8 @@ def test_group_in_chain_json(app):
"""
with running_worker(app):
with app.app_context():
task_group = group(test_task_json.s(i) for i in xrange(10))
task_chain = chain(test_task_json.s(1, 2), test_task_json.s(4), task_group)
task_group = group(task_json.s(i) for i in xrange(10))
task_chain = chain(task_json.s(1, 2), task_json.s(4), task_group)
eq_(task_chain.delay().get(interval=0.01),
[7, 8, 9, 10, 11, 12, 13, 14, 15, 16])

Expand All @@ -156,7 +214,8 @@ def test_relengapi_celery_module():
as `celery -A relengapi` expects"""
import relengapi.celery
# this has to point somewhere, so point it at this directory's __init__.py
os.environ['RELENGAPI_SETTINGS'] = os.path.join(os.path.dirname(__file__), '__init__.py')
os.environ['RELENGAPI_SETTINGS'] = os.path.join(
os.path.dirname(__file__), '__init__.py')
try:
eq_(type(relengapi.celery.celery).__name__, 'Celery')
finally:
Expand Down

0 comments on commit 069ded0

Please sign in to comment.