Skip to content

Commit

Permalink
Fix celery#6844 by allowing safe queries via app.inspect().active(). (c…
Browse files Browse the repository at this point in the history
…elery#6849)

* Fix celery#6844 by allowing safe (i.e. skip arg derserialization) queries via app.inspect().active().

* Fix default active arg test expectation.

* Fix test asserting broken behaviour (arg/kwarg deserialization occuring when safe=True).

Co-authored-by: Damir Jungic <djungic@cisco.com>
  • Loading branch information
2 people authored and jeyrce committed Aug 25, 2021
1 parent 93a9148 commit da7e730
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 11 deletions.
7 changes: 3 additions & 4 deletions celery/app/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,17 @@ def clock(self):

def active(self, safe=None):
"""Return list of tasks currently executed by workers.
Arguments:
safe (Boolean): Set to True to disable deserialization.
Returns:
Dict: Dictionary ``{HOSTNAME: [TASK_INFO,...]}``.
See Also:
For ``TASK_INFO`` details see :func:`query_task` return value.
Note:
``safe`` is ignored since 4.0 as no objects will need
serialization now that we have argsrepr/kwargsrepr.
"""
return self._request('active')
return self._request('active', safe=safe)

def scheduled(self, safe=None):
"""Return list of scheduled tasks with details.
Expand Down
4 changes: 2 additions & 2 deletions celery/worker/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,9 @@ def reserved(state, **kwargs):


@inspect_command(alias='dump_active')
def active(state, **kwargs):
def active(state, safe=False, **kwargs):
"""List of tasks currently being executed."""
return [request.info()
return [request.info(safe=safe)
for request in state.tset(worker_state.active_requests)]


Expand Down
4 changes: 2 additions & 2 deletions celery/worker/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,8 @@ def info(self, safe=False):
return {
'id': self.id,
'name': self.name,
'args': self._args,
'kwargs': self._kwargs,
'args': self._args if not safe else self._argsrepr,
'kwargs': self._kwargs if not safe else self._kwargsrepr,
'type': self._type,
'hostname': self._hostname,
'time_start': self.time_start,
Expand Down
6 changes: 5 additions & 1 deletion t/unit/app/test_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ def assert_broadcast_called(self, command,

def test_active(self):
self.inspect.active()
self.assert_broadcast_called('active')
self.assert_broadcast_called('active', safe=None)

def test_active_safe(self):
self.inspect.active(safe=True)
self.assert_broadcast_called('active', safe=True)

def test_clock(self):
self.inspect.clock()
Expand Down
14 changes: 14 additions & 0 deletions t/unit/worker/test_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,20 @@ def test_active(self):
finally:
worker_state.active_requests.discard(r)

def test_active_safe(self):
kwargsrepr = '<anything>'
r = Request(
self.TaskMessage(self.mytask.name, id='do re mi',
kwargsrepr=kwargsrepr),
app=self.app,
)
worker_state.active_requests.add(r)
try:
active_resp = self.panel.handle('dump_active', {'safe': True})
assert active_resp[0]['kwargs'] == kwargsrepr
finally:
worker_state.active_requests.discard(r)

def test_pool_grow(self):

class MockPool:
Expand Down
4 changes: 2 additions & 2 deletions t/unit/worker/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,15 @@ def test_info_function(self):
kwargs[str(i)] = ''.join(
random.choice(string.ascii_lowercase) for i in range(1000))
assert self.get_request(
self.add.s(**kwargs)).info(safe=True).get('kwargs') == kwargs
self.add.s(**kwargs)).info(safe=True).get('kwargs') == '' # mock message doesn't populate kwargsrepr
assert self.get_request(
self.add.s(**kwargs)).info(safe=False).get('kwargs') == kwargs
args = []
for i in range(0, 2):
args.append(''.join(
random.choice(string.ascii_lowercase) for i in range(1000)))
assert list(self.get_request(
self.add.s(*args)).info(safe=True).get('args')) == args
self.add.s(*args)).info(safe=True).get('args')) == [] # mock message doesn't populate argsrepr
assert list(self.get_request(
self.add.s(*args)).info(safe=False).get('args')) == args

Expand Down

0 comments on commit da7e730

Please sign in to comment.