Skip to content

How to retry a timed out job? #401

@ross-nordstrom

Description

@ross-nordstrom

Context

I'm trying to add some fail-safes around a resource-intensive job with a lot of external dependencies, so it can sometimes hang or OOM. It usually works on the next retry.

Issue

I'd like to set a job-specific timeout and have it retry after a TimeoutError, but I can't figure out how to do that. The TimeoutError seems to be terminal and I can't get it to retry... any advice on how to make this work?

See related issue: #402

Reproduction

  1. run the worker python script.py worker
  2. queue jobs by running python script.py client
  3. Watch the job run, timeout, and NOT retry in the worker
import asyncio
import random

import arq.worker
from arq import create_pool
from arq.connections import RedisSettings
from arq.typing import WorkerSettingsBase
from future.moves import sys

from settings import redis_settings


async def __flaky_job(_ctx):
    latency = random.uniform(1.0, 60.0)
    print(f"Starting with that will take {latency:.2f}s to run (and is allowed to run up to 3s)...")
    await asyncio.sleep(latency)
    print("Done!")


flaky_job_with_timeout = arq.worker.func(__flaky_job, name='flaky_job', timeout=3)


class WorkerSettings(WorkerSettingsBase):
    redis_settings = redis_settings
    functions = [flaky_job_with_timeout]


def worker():
    print('Starting worker')
    arq.run_worker(WorkerSettings)


async def client():
    print('Running client')
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('flaky_job')
    print('Enqueued job')


if __name__ == '__main__':
    match sys.argv[1]:
        case 'client':
            asyncio.run(client())
        case 'worker':
            worker()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions