From b0ce319e6dcc702c980b4afc7b72fce26a14936d Mon Sep 17 00:00:00 2001 From: Sergey Tikhonov Date: Thu, 22 Oct 2015 12:17:34 +0300 Subject: [PATCH] Cherry-pick ba75fa0e for #1847 and #2827 --- celery/worker/loops.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/celery/worker/loops.py b/celery/worker/loops.py index 08bba315608..8b006a8f2c6 100644 --- a/celery/worker/loops.py +++ b/celery/worker/loops.py @@ -47,6 +47,12 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos, if not obj.restart_count and not obj.pool.did_start_ok(): raise WorkerLostError('Could not start worker processes') + # consumer.consume() may have prefetched up to our + # limit - drain an event so we are in a clean state + # prior to starting our event loop. + if connection.transport.driver_type == 'amqp': + hub.call_soon(connection.drain_events) + # FIXME: Use loop.run_forever # Tried and works, but no time to test properly before release. hub.propagate_errors = errors