Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃敤 Adding a job counter to address Semaphore issues #408

Merged
merged 2 commits into from
Oct 30, 2023

Conversation

rm-21
Copy link
Contributor

@rm-21 rm-21 commented Jun 10, 2023

Issue #405
Having a semaphore in the same event loop, while having the max number of jobs running, blocks heartbeat & cancellation tasks.

Possible solution

  • Keep the semaphore value to max_jobs + 1
  • Have a job_counter that increments when the semaphore is acquired and decrements when the semaphore is released
  • If the job_counter == max_jobs, release the semaphore immediately and return from the start_jobs function, basically foregoing starting new jobs.

The solution seems to work. The counter is threadsafe because we always increment or decrement it before releasing the semaphore. Quick test app below:

  • Define the worker definition in arq_app.py
  • Start 4 workers in different processes from start_workers.py
  • Schedule more jobs than max for each worker from arq_app.py
  • Check worker.log for the first heartbeat happening
  • Cancel a task from cancellations.py
  • Check worker.log for the latest heartbeat + cancelleation + new job
# arq_app.py
import asyncio
import datetime as dt
from typing import Any

from arq.connections import RedisSettings
from arq import create_pool

REDIS = RedisSettings(host="127.0.0.1", port=6379, database=1)

async def long_task(ctx: dict[Any, Any], num:int, sleeper: float):
    print(f"[{dt.datetime.now()}] {num}: Task entered & started...")

    # Mock call
    await asyncio.sleep(1)
    
    # Fail randomly
    # import random
    # if random.random() > 0.5:
    #     raise RuntimeError(f"[{dt.datetime.now()}] {num}: Task randomly failed...")
    
    # Balance task
    await asyncio.sleep(sleeper)

    print(f"[{dt.datetime.now()}] {num}: Task finished...")



def worker_dispatcher(worker_queue_name: str):
    class ArqWorker:
        # Functions to schedule
        functions = [long_task]

        # ARQ settings
        redis_settings = REDIS
        max_jobs = 5
        job_timeout = 9 * 60 * 60 # 9hrs
        keep_result = 9 * 60 * 60 # 9hrs
        health_check_interval = 5
        retry_jobs = False
        allow_abort_jobs = True

        queue_name = worker_queue_name

    return ArqWorker


async def main():
    redis = await create_pool(settings_=REDIS)

    for idx in range(32):
        await redis.enqueue_job("long_task", idx, 6 * 60 * 60, _queue_name=f"queue{idx % 4}")


if __name__ == "__main__":
    asyncio.run(main())
# start_workers.py
import multiprocessing as mp
import logging

from arq import run_worker
from .arq_app import worker_dispatcher

def start_worker(queue_name: str):
    logging.basicConfig(filename='worker.log', level=logging.INFO,
                        format='%(asctime)s [%(process)d] %(levelname)s: %(message)s')
    logging.info(f'Starting worker for {queue_name}')
    run_worker(worker_dispatcher(queue_name))

if __name__ == "__main__":
    for i in range(4):
        mp.Process(target=start_worker, args=(f"queue{i}",)).start()
# cancellation.py
import asyncio

from arq import create_pool
from arq.jobs import Job
from arq.connections import RedisSettings

REDIS = RedisSettings(host="127.0.0.1", port=6379, database=1)

async def job_status(task_id: str, stop: bool = False) -> None:
    job: Job = Job(
        job_id=task_id,
        redis=await create_pool(settings_=REDIS),
    )

    print(f"ID {task_id}: {await job.status()}")

    if stop:
        result: bool = await job.abort()
        print(f"ID {task_id}: Aborted: {result}")


async def main() -> None:
    await job_status(task_id="258a1b3ec3a84913a42d992b970841da", stop=True)


if __name__ == "__main__":
    asyncio.run(main())

@rm-21 rm-21 marked this pull request as draft June 10, 2023 14:03
@codecov
Copy link

codecov bot commented Jun 10, 2023

Codecov Report

Merging #408 (787bb47) into main (9109c2e) will decrease coverage by 0.27%.
The diff coverage is 80.00%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #408      +/-   ##
==========================================
- Coverage   98.66%   98.40%   -0.27%     
==========================================
  Files          11       11              
  Lines        1052     1063      +11     
  Branches      199      200       +1     
==========================================
+ Hits         1038     1046       +8     
- Misses          6        8       +2     
- Partials        8        9       +1     
Impacted Files Coverage 螖
arq/worker.py 98.57% <80.00%> (-0.60%) 猬囷笍

Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
螖 = absolute <relative> (impact), 酶 = not affected, ? = missing data
Powered by Codecov. Last update 9109c2e...787bb47. Read the comment docs.

@rm-21 rm-21 marked this pull request as ready for review June 10, 2023 14:05
@rm-21 rm-21 changed the title 馃敤 Adding a job counter to address Semaphore issues 馃敤 Issue [#405](https://github.com/samuelcolvin/arq/issues/405): Adding a job counter to address Semaphore issues Jun 10, 2023
@rm-21 rm-21 changed the title 馃敤 Issue [#405](https://github.com/samuelcolvin/arq/issues/405): Adding a job counter to address Semaphore issues 馃敤 Adding a job counter to address Semaphore issues Jun 10, 2023
@JonasKs
Copy link
Sponsor Collaborator

JonasKs commented Jun 10, 2023

Thank you! 馃槉

Would be great if you could add a test that ensure we don't regress in the future. 馃槉

@rm-21
Copy link
Contributor Author

rm-21 commented Jun 11, 2023

Could you suggest a test? I couldn't come up with an appropriate one.

@JonasKs
Copy link
Sponsor Collaborator

JonasKs commented Jun 11, 2023

Set max jobs to 1, queue a task and ensure health check is still logged? This should fail without this implementation, right?

I'm not at home, but I'm pretty sure there's a health check test already implemented.

@rm-21
Copy link
Contributor Author

rm-21 commented Jun 11, 2023

I have setup a test that aims to cancel the job when max_jobs queue is full. The test is failing for Python 3.7. The logs show that the job was enqueued and cancelled. There is a stack trace for ConnectionError from Redis in between the 2 log lines. Can you help me debug whenever you can make some time?

I do think just re-running the CI job for 3.7 should resolve this.

Copy link
Sponsor Collaborator

@JonasKs JonasKs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you 馃槉
This looks good to me.

Samuel is busy with v2 of Pydantic, so I wouldn't expect this to be merged until that happens, but you can always use your own fork in the mean time 馃槉


if self.job_counter >= self.max_jobs:
self.sem.release()
return None
Copy link
Sponsor Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just return here.

Copy link
Owner

@samuelcolvin samuelcolvin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry the slow reply.

From a brief review, LGTM.

@JonasKs merge if you're happy.

@rm-21
Copy link
Contributor Author

rm-21 commented Oct 30, 2023

Hey @JonasKs! Any update on when can this be merged?

@JonasKs JonasKs merged commit ab2dda2 into samuelcolvin:main Oct 30, 2023
10 checks passed
@JonasKs
Copy link
Sponsor Collaborator

JonasKs commented Oct 30, 2023

Sorry this took some time, been a bit busy lately.

As for the release, I'm gonna have to put that back into @samuelcolvin's hands - not sure what he has planned. 馃槉

@samuelcolvin
Copy link
Owner

v0.26.0b1 is released, please try it, I'll release v0.26 at the end of the week, see #441.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants