improve logging in worker.py (#902)
* improve logging in worker

* tests for log_result_lifespan
samuelcolvin authored and selwin committed Nov 23, 2017
1 parent f500186 commit df571e1
Showing 2 changed files with 50 additions and 22 deletions.
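
As context for the diff below, here is a minimal usage sketch (not part of the commit) of the new log_result_lifespan flag; the QuietWorker name and the Redis/Queue setup are illustrative, only the class attribute itself comes from this change:

from redis import Redis
from rq import Queue, Worker

class QuietWorker(Worker):
    # Added in this commit: when False, the "Result is kept for N seconds" /
    # "Result discarded immediately" messages are skipped after each job.
    log_result_lifespan = False

redis_conn = Redis()
queue = Queue(connection=redis_conn)
worker = QuietWorker([queue], connection=redis_conn)
worker.work(burst=True)  # drain the queue once, then exit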
48 changes: 26 additions & 22 deletions rq/worker.py
@@ -94,6 +94,9 @@ class Worker(object):
death_penalty_class = UnixSignalDeathPenalty
queue_class = Queue
job_class = Job
# `log_result_lifespan` controls whether "Result is kept for XXX seconds"
# messages are logged after every job, by default they are.
log_result_lifespan = True

@classmethod
def all(cls, connection=None, job_class=None, queue_class=None):
@@ -132,7 +135,7 @@ def find_by_key(cls, worker_key, connection=None, job_class=None,
connection=connection,
job_class=job_class,
queue_class=queue_class)

worker.refresh()

return worker
@@ -253,7 +256,7 @@ def procline(self, message):

def register_birth(self):
"""Registers its own birth."""
self.log.debug('Registering birth of worker {0}'.format(self.name))
self.log.debug('Registering birth of worker %s', self.name)
if self.connection.exists(self.key) and \
not self.connection.hexists(self.key, 'death'):
msg = 'There exists an active worker named {0!r} already'
@@ -383,16 +386,15 @@ def request_force_stop(self, signum, frame):

# Take down the horse with the worker
if self.horse_pid:
msg = 'Taking down horse {0} with me'.format(self.horse_pid)
self.log.debug(msg)
self.log.debug('Taking down horse %s with me', self.horse_pid)
self.kill_horse()
raise SystemExit()

def request_stop(self, signum, frame):
"""Stops the current worker loop but waits for child processes to
end gracefully (warm shutdown).
"""
self.log.debug('Got signal {0}'.format(signal_name(signum)))
self.log.debug('Got signal %s', signal_name(signum))

signal.signal(signal.SIGINT, self.request_force_stop)
signal.signal(signal.SIGTERM, self.request_force_stop)
@@ -451,6 +453,8 @@ def work(self, burst=False, logging_level="INFO"):
self.register_birth()
self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
self.set_state(WorkerStatus.STARTED)
qnames = self.queue_names()
self.log.info('*** Listening on %s...', green(', '.join(qnames)))

try:
while True:
@@ -487,12 +491,11 @@ def work(self, burst=False, logging_level="INFO"):

def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = self.queue_names()
qnames = ','.join(self.queue_names())

self.set_state(WorkerStatus.IDLE)
self.procline('Listening on {0}'.format(','.join(qnames)))
self.log.info('')
self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))
self.procline('Listening on ' + qnames)
self.log.debug('*** Listening on %s...', green(qnames))

while True:
self.heartbeat()
@@ -529,7 +532,7 @@ def heartbeat(self, timeout=0, pipeline=None):
connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout))
'Next one should arrive within %s seconds.', timeout)

def refresh(self):
data = self.connection.hmget(
@@ -560,7 +563,7 @@ def refresh(self):
connection=self.connection,
job_class=self.job_class)
for queue in queues.split(',')]

def increment_failed_job_count(self, pipeline=None):
connection = pipeline if pipeline is not None else self.connection
connection.hincrby(self.key, 'failed_job_count', 1)
@@ -765,7 +768,7 @@ def perform_job(self, job, queue):
self.connection,
job_class=self.job_class)

try:
try:
job.started_at = utcnow()
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform()
@@ -793,15 +796,16 @@ def perform_job(self, job, queue):
self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
if rv is not None:
log_result = "{0!r}".format(as_text(text_type(rv)))
self.log.debug('Result: {0}'.format(yellow(log_result)))

result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl == 0:
self.log.info('Result discarded immediately')
elif result_ttl > 0:
self.log.info('Result is kept for {0} seconds'.format(result_ttl))
else:
self.log.warning('Result will never expire, clean up result key manually')
self.log.debug('Result: %s', yellow(log_result))

if self.log_result_lifespan:
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl == 0:
self.log.info('Result discarded immediately')
elif result_ttl > 0:
self.log.info('Result is kept for {0} seconds'.format(result_ttl))
else:
self.log.warning('Result will never expire, clean up result key manually')

return True

@@ -818,7 +822,7 @@ def handle_exception(self, job, *exc_info):
})

for handler in reversed(self._exc_handlers):
self.log.debug('Invoking exception handler {0}'.format(handler))
self.log.debug('Invoking exception handler %s', handler)
fallthrough = handler(job, *exc_info)

# Only handlers with explicit return values should disable further
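Aside from the new flag, the recurring change above is the move from eager str.format() calls to logging's lazy %-style arguments, so the message is only interpolated if a handler actually emits the record. A standalone sketch of the two styles, independent of rq:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('example')

name = 'worker-1234'

# Eager: the full string is built on every call, even though DEBUG is disabled.
logger.debug('Registering birth of worker {0}'.format(name))

# Lazy: the argument is stored on the LogRecord and only interpolated
# when the record is actually emitted by a handler.
logger.debug('Registering birth of worker %s', name)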
24 changes: 24 additions & 0 deletions tests/test_worker.py
@@ -736,6 +736,30 @@ def test_self_modification_persistence_with_error(self):
self.assertEqual(job_check.meta['baz'], 10)
self.assertEqual(job_check.meta['newinfo'], 'waka')

@mock.patch('rq.worker.logger.info')
def test_log_result_lifespan_true(self, mock_logger_info):
"""Check that log_result_lifespan True causes job lifespan to be logged."""
q = Queue()

w = Worker([q])
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w.perform_job(job, q)
mock_logger_info.assert_called_with('Result is kept for 10 seconds')
self.assertIn('Result is kept for 10 seconds', [c[0][0] for c in mock_logger_info.call_args_list])

@mock.patch('rq.worker.logger.info')
def test_log_result_lifespan_false(self, mock_logger_info):
"""Check that log_result_lifespan False causes job lifespan to not be logged."""
q = Queue()

class TestWorker(Worker):
log_result_lifespan = False

w = TestWorker([q])
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w.perform_job(job, q)
self.assertNotIn('Result is kept for 10 seconds', [c[0][0] for c in mock_logger_info.call_args_list])


def kill_worker(pid, double_kill):
# wait for the worker to be started over on the main process
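The new tests patch rq.worker.logger.info and then inspect the mock's call_args_list for the expected message. A standalone sketch of that inspection pattern (the fake_logger name is illustrative, not from the commit):

from unittest import mock

fake_logger = mock.Mock()
fake_logger.info('Result is kept for 10 seconds')
fake_logger.info('Job OK')

# Each entry in call_args_list is a call object; c[0] is the tuple of
# positional arguments, so c[0][0] is the logged message string.
messages = [c[0][0] for c in fake_logger.info.call_args_list]
assert 'Result is kept for 10 seconds' in messages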
