Lost job is retried after *WORST* job timeout, not *ITS* timeout #402

@ross-nordstrom

Context

I'm trying to add some fail-safes around a resource-intensive job with a lot of external dependencies. It can sometimes hang or OOM, and it usually works on the next retry.

Issue

I'd like to set a job-specific timeout and have a lost job (for example, one killed by an OOM) retried once that timeout has elapsed. The job does eventually get restarted, but only after the max_timeout of all jobs registered on the worker. I expected it to restart after its own job-specific timeout.

It seems the Redis TTLs always use max_timeout, but they should probably use the job's specific timeout when one is available:

https://github.com/samuelcolvin/arq/blob/9109c2e59d2b13fa59d246da03d19d7844a6fa19/arq/worker.py#L264-L265

https://github.com/samuelcolvin/arq/blob/9109c2e59d2b13fa59d246da03d19d7844a6fa19/arq/worker.py#L441-L443
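To make the difference concrete, here is a minimal standalone sketch of the two TTL policies. It does not call arq and is not arq's actual code: the function names, the `margin_s` parameter, and the `default_timeout_s` fallback are hypothetical and only illustrate the behaviour described above.

```python
from typing import Mapping


def worker_wide_ttl(timeouts: Mapping[str, float], margin_s: float = 0.0) -> float:
    """Observed behaviour: every in-progress marker expires after the
    largest timeout of any function registered on the worker."""
    return max(timeouts.values()) + margin_s


def per_job_ttl(
    name: str,
    timeouts: Mapping[str, float],
    default_timeout_s: float = 300.0,  # hypothetical fallback, not taken from arq
    margin_s: float = 0.0,
) -> float:
    """Expected behaviour: the marker expires after the job's own timeout,
    falling back to a default when the function has none configured."""
    return timeouts.get(name, default_timeout_s) + margin_s


# Timeouts from the reproduction script in this issue.
timeouts = {'flaky_job': 3.0, 'long_job': 30.0}

print(worker_wide_ttl(timeouts))           # 30.0
print(per_job_ttl('flaky_job', timeouts))  # 3.0
```

With the timeouts from the reproduction below, the worker-wide policy makes a lost flaky_job wait as long as long_job's timeout, while the per-job policy would let it be picked up after its own 3s.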

See related issue: #401

Reproduction

  1. Run the worker: python script.py worker
  2. Queue a job by running: python script.py client
  3. After the worker prints that it is starting the job, press Ctrl+C, then restart the worker.
  4. Watch the job get restarted not 3s later (the flaky_job timeout), but 🐞 30s later (the long_job timeout).
  5. Remove long_job_with_timeout from functions = [...] and repeat. 📝 Now the job restarts much sooner (as expected).
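script.py:

import asyncio
import random
import sys

import arq.worker
from arq import create_pool
from arq.connections import RedisSettings
from arq.typing import WorkerSettingsBase

# Assumption: the original imported redis_settings from a local settings module;
# default settings (Redis on localhost:6379) stand in for it here.
redis_settings = RedisSettings()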


async def __long_job(_ctx):
    print('Long job')
    await asyncio.sleep(20.0)
    print('Long job done!')


async def __flaky_job(_ctx):
    latency = random.uniform(1.0, 60.0)
    print(f"Starting flaky job that will take {latency:.2f}s to run (and is allowed to run up to 3s)...")
    await asyncio.sleep(latency)
    print("Flaky job done!")


flaky_job_with_timeout = arq.worker.func(__flaky_job, name='flaky_job', timeout=3)
long_job_with_timeout = arq.worker.func(__long_job, name='long_job', timeout=30)


class WorkerSettings(WorkerSettingsBase):
    redis_settings = redis_settings
    functions = [flaky_job_with_timeout, long_job_with_timeout]


def worker():
    print('Starting worker')
    arq.run_worker(WorkerSettings)


async def client():
    print('Running client')
    # Use the same Redis settings as the worker so both sides share one queue.
    redis = await create_pool(redis_settings)
    await redis.enqueue_job('flaky_job')
    print('Enqueued job')


if __name__ == '__main__':
    match sys.argv[1]:
        case 'client':
            asyncio.run(client())
        case 'worker':
            worker()
