
task_timeout wrong functionality #98

Closed
mayk0gan opened this issue Sep 30, 2023 · 7 comments · Fixed by #103

Labels
bug Something isn't working

Comments

@mayk0gan

mayk0gan commented Sep 30, 2023

Hi. Recently I've been encountering defunct processes while using Pebble (https://github.com/noxdafox/pebble), so I began investigating other libraries.

I used the following test code (it may look messy, but it effectively highlights the issue):

from mpire import WorkerPool
import time

CHECK_INTERVAL = 1


def mock_func(i):
    print(i)
    time.sleep(30)


futures = {}
with WorkerPool(start_method="spawn") as pool:
    for i, job in enumerate(range(10)):
        print(f"############## {i} ##########")
        futures[job] = pool.apply_async(mock_func, args=(i,), task_timeout=5)

    while len(futures) > 0:
        time.sleep(CHECK_INTERVAL)
        print("iteration", len(futures))
        for job, future in list(futures.items()):
            if future.ready():
                try:
                    result = future.get()
                    print(result)
                except TimeoutError as e:
                    print(e, job)
                # Drop the finished (or timed-out) task so the loop can terminate
                del futures[job]

The output that I get:

############## 0 ##########
############## 1 ##########
############## 2 ##########
############## 3 ##########
0
############## 4 ##########
############## 5 ##########
1
############## 6 ##########
2
############## 7 ##########
3
############## 8 ##########
4
############## 9 ##########
5
6
7
8
9
iteration 10
iteration 10
Worker-0 task timed out (timeout=1) 0
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
iteration 9
None
None
None
None
None
None
None
None
None

After the first timeout, one of the workers was terminated, but the remaining workers (whose tasks should also have timed out) were not interrupted, and they continued running until the end of the function (time.sleep(30)).

I would expect the following:

  • Once the worker (specifically, Worker-0 in this case) was terminated, mpire should have spawned a new worker in its place.
  • The remaining tasks should also have encountered a timeout (see the sketch below).

What am I missing here?
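
For reference, a minimal sketch of that expected behaviour with apply_async (the function name and durations below are purely illustrative, not taken from the report above):

# Expected behaviour sketch: every task exceeding task_timeout should raise
# TimeoutError from .get(), while the remaining tasks keep being served by
# (re-spawned) workers. Durations are illustrative.
from mpire import WorkerPool
import time

def work(secs):
    time.sleep(secs)
    return secs

if __name__ == "__main__":
    with WorkerPool(n_jobs=2, start_method="spawn") as pool:
        futures = [pool.apply_async(work, (s,), task_timeout=3) for s in (1, 10, 1, 10)]
        for fut in futures:
            try:
                print("done:", fut.get())
            except TimeoutError as e:
                print("timed out:", e)  # expected for each 10-second task, not just the first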

Another, simpler example that reproduces the issue:

from mpire import WorkerPool
from mpire.async_result import AsyncResult
import time

def long_task(i: int):
    print(f"[Starting task {i}]")
    time.sleep(i)
    return i

if __name__ == "__main__":
    results: list[AsyncResult] = []
    with WorkerPool(start_method="spawn", n_jobs=3) as pool:
        for i in range(10):
            results.append(pool.apply_async(long_task, (i, ), task_timeout=4))

        for idx, async_result in enumerate(results):
            print(f"Iteration {idx}")
            try:
                print(f"Result for job id {async_result.job_id} is {async_result.get()}")
            except Exception as e:
                print(f"Exception for job id {async_result.job_id}: {e}")
The output:

Iteration 0
[Starting task 1]
[Starting task 0]
[Starting task 3]
Result for job id 0 is 0
Iteration 1
[Starting task 2]
[Starting task 4]
Result for job id 1 is 1
Iteration 2
[Starting task 5]
Result for job id 2 is 2
Iteration 3
Result for job id 3 is 3
Iteration 4
[Starting task 6]
Result for job id 4 is 4
Iteration 5
[Starting task 7]
Exception for job id 5: Worker-2 task timed out (timeout=4)
Iteration 6
Result for job id 6 is 6
Iteration 7
Result for job id 7 is 7
Iteration 8
@sybrenjansen
Owner

Hi. You are right, this is a bug.

I did some digging and it turns out the timeout handler doesn't yet account for apply calls, only map calls. In the map case, all workers are supposed to be killed and the pool shuts down. In the apply case, only the task that times out should be interrupted, and the rest should continue as normal.
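
To illustrate the distinction, a minimal sketch of the map case (assuming map accepts the same task_timeout parameter and surfaces the timeout as a TimeoutError; durations are illustrative):

# Sketch of the map case for contrast: a single task timeout is meant to
# abort the whole map call and shut the pool down.
from mpire import WorkerPool
import time

def slow(secs):
    time.sleep(secs)
    return secs

if __name__ == "__main__":
    with WorkerPool(n_jobs=2) as pool:
        try:
            print(pool.map(slow, [1, 1, 10], task_timeout=3))
        except TimeoutError as e:
            print("map aborted:", e)  # no partial results in this case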

I will work on a solution shortly.

@sybrenjansen sybrenjansen added the bug Something isn't working label Oct 2, 2023
@sybrenjansen sybrenjansen self-assigned this Oct 2, 2023
@pelegn

pelegn commented Oct 16, 2023

Hi @sybrenjansen, is there any update on this issue?

@sybrenjansen
Owner

I've planned to work on this at the end of the week or early next week. I expect it will be done shortly after that.

@sybrenjansen
Owner

FYI, I'm going to work on it this Friday. Expecting a new release in the following week.

sybrenjansen pushed a commit that referenced this issue Oct 27, 2023
…imeout didn't interrupt all tasks when the timeout was reached. Fixes #98
sybrenjansen added a commit that referenced this issue Nov 8, 2023
* Fixed a bug where starting multiple `apply_async` tasks with a task timeout didn't interrupt all tasks when the timeout was reached. Fixes #98

---------

Co-authored-by: sybrenjansen <sybren.jansen@gmail.com>
@sybrenjansen
Owner

Released in v2.8.1
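
A quick way to confirm that the fixed version is installed (a minimal sketch using only the standard library):

# Check the installed mpire version; the apply_async timeout fix is in 2.8.1 and later.
from importlib.metadata import version

print(version("mpire"))  # should print 2.8.1 or later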

@mayk0gan
Author

@sybrenjansen It works great, thank you!

@sybrenjansen
Owner

You're welcome :)
