getting the pid of the process #100

Closed
dubovikmaster opened this issue Jul 20, 2022 · 5 comments

@dubovikmaster

Greetings!
Is it possible to get the PID of the worker process that is terminated with a TimeoutError in the ProcessPool class?

noxdafox added a commit that referenced this issue Jul 21, 2022
Signed-off-by: Matteo Cafasso <noxdafox@gmail.com>
@dubovikmaster
Author

Thank you for responding.
In fact, I would like to broaden the request. I found this in process.py:

    def update_tasks(self):
        """Handles timing out Tasks."""
        for task in self.task_manager.timeout_tasks():
            self.task_manager.task_done(
                task.id, TimeoutError("Task timeout", task.timeout))
            self.worker_manager.stop_worker(task.worker_id)

        for task in self.task_manager.cancelled_tasks():
            self.task_manager.task_done(
                task.id, CancelledError())
            self.worker_manager.stop_worker(task.worker_id)

You store task.timeout in the exception, but that does not seem very useful to me: if the exception occurs and I am the one who ran the code, I already know which timeout I specified. Why not do it like this?
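For reference, a value stored this way can be read back from the exception's `args` tuple. The snippet below is only a sketch: it builds the exception by hand with the standard library's `concurrent.futures.TimeoutError` to mirror `TimeoutError("Task timeout", task.timeout)`, and the timeout of 5 seconds is an assumed sample value.

```python
from concurrent.futures import TimeoutError

# Built by hand to mirror TimeoutError("Task timeout", task.timeout);
# the value 5 is an illustrative timeout in seconds, not a library default.
try:
    raise TimeoutError("Task timeout", 5)
except TimeoutError as error:
    # The positional arguments are kept in order on error.args.
    message, timeout = error.args
    print(f"{message}: limit was {timeout}s")
```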

    def update_tasks(self):
        """Handles timing out Tasks."""
        for task in self.task_manager.timeout_tasks():
            self.task_manager.task_done(
                task.id,
                TimeoutError(
                    "Task timeout", task.timeout, task.worker_id, task.id))
            self.worker_manager.stop_worker(task.worker_id)

        for task in self.task_manager.cancelled_tasks():
            self.task_manager.task_done(
                task.id, CancelledError())
            self.worker_manager.stop_worker(task.worker_id)

This way, when catching the exception, it would be easy to find out the PID of the process and the ID of the task for which it occurred.

I propose doing the same when a task raises an exception.

Thanks!

@noxdafox
Owner

Hello,

it is not possible to add the PID to all exceptions. CancelledError is handled by the future objects themselves, and exceptions raised within the user-provided function are left untouched to avoid overwriting the information they carry.

The only error we can add the PID to is TimeoutError.
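To illustrate the point about CancelledError, here is a minimal sketch using a bare `concurrent.futures.Future` from the standard library as a stand-in for a pool-created future. It assumes nothing about Pebble's internals; it only shows that the exception is produced by the future object when its result is requested.

```python
from concurrent.futures import CancelledError, Future

# A bare Future that no pool has started running yet.
future = Future()
cancelled = future.cancel()  # succeeds because the future is still pending

try:
    future.result()
except CancelledError as error:
    # The exception is created inside Future.result(), not by a worker,
    # so there is no worker-side hook where a PID could be attached.
    raised = error
    print(type(error).__name__, error.args)
```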

What is the use case you have in mind? Why do you need the worker PID? This is internal information and should not be of concern to the user.

@dubovikmaster
Author

I agree regarding worker_id; that information is probably superfluous. But task_id is useful, for example, to understand which tasks did not complete. I think this is a frequent request.

@noxdafox
Owner

The task_id is internal information the pool uses to track the workload. It has no value for the user, as it is just an incremental counter.

If you want to understand which submission worked and which didn't, you can use the ID of the Future object itself as it's the same object throughout the whole lifecycle.

In [4]: future = p.schedule(example)

In [5]: id(future)
Out[5]: 140154882138608

The future is always the same object even when passed to callbacks.

In [6]: def done_callback(future):
   ...:     print(id(future))

In [7]: future.add_done_callback(done_callback)
140154882138608
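The same check can be written as a standalone script. This sketch uses the standard library's `ThreadPoolExecutor` as a stand-in for the pool (an assumption made so the snippet runs without extra dependencies); `seen_ids` is a hypothetical helper list.

```python
from concurrent.futures import ThreadPoolExecutor

seen_ids = []  # hypothetical helper: identities observed inside the callback


def done_callback(future):
    # Record the identity of the object handed to the callback.
    seen_ids.append(id(future))


def example():
    return 1


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(example)
    future.add_done_callback(done_callback)

# Leaving the with block waits for the task, so the callback has run.
same_object = seen_ids == [id(future)]
print(same_object)
```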

Any attribute you add to the Future object will persist, allowing you to understand which submission failed. For example, you can attach to the Future object the function and the arguments that were passed.

In [1]: import pebble

In [2]: p = pebble.ProcessPool()

In [3]: def done_callback(future):
   ...:     print(future.function)
   ...:     print(future.args)

In [4]: def my_function(arg1, arg2):
   ...:     return 1

In [5]: future = p.schedule(my_function, args=[1, 2])

In [7]: future.function = my_function

In [8]: future.args = [1, 2]

In [9]: future.add_done_callback(done_callback)
<function my_function at 0x7fd7f3bf2820>
[1, 2]
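Building on the session above, a done callback could collect the submissions that failed. This is a sketch under assumptions: the standard library's `ThreadPoolExecutor` stands in for the pool so the snippet is self-contained, `failed` is a hypothetical list, and `function` and `args` are attributes attached by hand, not ones the library provides.

```python
from concurrent.futures import ThreadPoolExecutor

failed = []  # hypothetical record of submissions that raised


def my_function(arg1, arg2):
    return arg1 / arg2


def done_callback(future):
    # exception() returns None when the call succeeded.
    error = future.exception()
    if error is not None:
        failed.append(
            (future.function.__name__, future.args, type(error).__name__))


with ThreadPoolExecutor(max_workers=2) as pool:
    for args in ([1, 2], [1, 0]):
        future = pool.submit(my_function, *args)
        # Attach the submission details before registering the callback,
        # so they are always present when the callback runs.
        future.function = my_function
        future.args = args
        future.add_done_callback(done_callback)

print(failed)
```

With Pebble, `p.schedule(my_function, args=args)` as used in the session would take the place of `pool.submit(...)`.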

@noxdafox
Owner

noxdafox commented Sep 4, 2022

Closing this issue due to lack of response. Please re-open if you require further clarifications.
