Skip to content

Commit

Permalink
Fix recursion while waiting for redis connection
Browse files Browse the repository at this point in the history
`Arq` has ability to retry connections to redis in case of problems.
That is a nice feature, but it crashes after hundreds of attempts
reaching maximal recursion depth. This change modifies re-try algorithm
from recursion to iteration avoiding the limit.

In practice it would be nice to have ability to wait forever as issue
with redis instance should not kill worker process, but that is a
separate change that can be built on top.

Fixes exception when retrying redis connection many times:
  RecursionError: maximum recursion depth exceeded while getting the str of an object
  • Loading branch information
nierob committed May 19, 2022
1 parent 4c70063 commit 50bbdbf
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 34 deletions.
60 changes: 26 additions & 34 deletions arq/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,42 +226,34 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
ssl=settings.ssl,
)

try:
pool = pool_factory(
db=settings.database, username=settings.username, password=settings.password, encoding='utf8'
)
pool.job_serializer = job_serializer
pool.job_deserializer = job_deserializer
pool.default_queue_name = default_queue_name
await pool.ping()

except (ConnectionError, OSError, RedisError, asyncio.TimeoutError) as e:
if retry < settings.conn_retries:
logger.warning(
'redis connection error %s:%s %s %s, %d retries remaining...',
settings.host,
settings.port,
e.__class__.__name__,
e,
settings.conn_retries - retry,
while True:
try:
pool = pool_factory(
db=settings.database, username=settings.username, password=settings.password, encoding='utf8'
)
await asyncio.sleep(settings.conn_retry_delay)
pool.job_serializer = job_serializer
pool.job_deserializer = job_deserializer
pool.default_queue_name = default_queue_name
await pool.ping()

except (ConnectionError, OSError, RedisError, asyncio.TimeoutError) as e:
if retry < settings.conn_retries:
logger.warning(
'redis connection error %s:%s %s %s, %d retries remaining...',
settings.host,
settings.port,
e.__class__.__name__,
e,
settings.conn_retries - retry,
)
await asyncio.sleep(settings.conn_retry_delay)
retry=retry + 1
else:
raise
else:
raise
else:
if retry > 0:
logger.info('redis connection successful')
return pool

# recursively attempt to create the pool outside the except block to avoid
# "During handling of the above exception..." madness
return await create_pool(
settings,
retry=retry + 1,
job_serializer=job_serializer,
job_deserializer=job_deserializer,
default_queue_name=default_queue_name,
)
if retry > 0:
logger.info('redis connection successful')
return pool


async def log_redis_info(redis: Redis, log_func: Callable[[str], Any]) -> None:
Expand Down
8 changes: 8 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ async def test_redis_timeout(mocker, create_pool):
assert arq.utils.asyncio.sleep.call_count == 5


async def test_redis_timeout_and_retry_many_times(mocker, create_pool):
mocker.spy(arq.utils.asyncio, 'sleep')
retry_count = 2000
with pytest.raises(ConnectionError):
await create_pool(RedisSettings(port=0, conn_retry_delay=0, conn_retries=retry_count))
assert arq.utils.asyncio.sleep.call_count == retry_count


@pytest.mark.skip(reason='this breaks many other tests as low level connections remain after failed connection')
async def test_redis_sentinel_failure(create_pool, cancel_remaining_task, mocker):
settings = RedisSettings()
Expand Down

0 comments on commit 50bbdbf

Please sign in to comment.