
Loky backend doesn't cleanup worker processes #945

Open
rbedi opened this issue Oct 5, 2019 · 5 comments

@rbedi

rbedi commented Oct 5, 2019

Python 3.7.2, macOS 10.13.3 and Ubuntu 18.04

I notice that when using the Loky backend, joblib doesn't clean up after itself, even when explicitly calling _terminate_backend(). Here's a minimal example:

from joblib import Parallel, delayed
from multiprocessing import active_children

def f(x): return x**2
par_loky = Parallel(n_jobs=32, backend="loky")
par_loky(delayed(f)(i) for i in range(10))

print(len(active_children())) # Prints 32
par_loky._terminate_backend()
print(len(active_children()))  # Prints 32

Same effect if I use the context manager to construct the pool:

from joblib import Parallel, delayed
from multiprocessing import active_children

def f(x): return x**2

with Parallel(n_jobs=32, backend="loky") as par_loky:
    par_loky([delayed(f)(i) for i in range(10)])

print(len(active_children())) # Prints 32

However, with the multiprocessing backend, it works as expected:

from joblib import Parallel, delayed
from multiprocessing import active_children

def f(x): return x**2
par_mp = Parallel(n_jobs=32, backend="multiprocessing")
par_mp(delayed(f)(i) for i in range(10))
print(len(active_children())) # prints 0, as expected
@tomMoral
Contributor

tomMoral commented Oct 9, 2019

Hi,

thanks for reporting. This is actually the expected behavior for loky. The loky backend relies on spawn to start the new processes for the pool of workers. As this method may take up to a few tenths of a second, starting a new pool each time you call Parallel can be quite costly. To that end, loky manages a reusable pool of workers that is shared across multiple calls to Parallel, and is therefore not terminated even once the Parallel object is cleaned up with _terminate_backend.

If they are not reused, the worker processes are cleaned up once they time out (the default is 300s, and apparently there is no way to modify that yet). If you need to force the clean-up of these processes, you can call:

from joblib.externals.loky import get_reusable_executor
get_reusable_executor().shutdown(wait=True)

Let me know if this solves your problem. As for the API, would it be better if you could control the timeout delay for the workers, or would you need to clean up the processes directly with an imperative instruction?
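
For reference, a minimal sketch combining the repro above with the explicit shutdown (assuming no other child processes are running, so the worker counts in the comments are exact):

from joblib import Parallel, delayed
from joblib.externals.loky import get_reusable_executor
from multiprocessing import active_children

def f(x): return x**2

# Run a loky-backed Parallel call; the reusable executor keeps its workers alive.
Parallel(n_jobs=4, backend="loky")(delayed(f)(i) for i in range(10))
print(len(active_children()))  # 4: workers are kept around for reuse

# Explicitly shut down the shared executor and wait for the workers to exit.
get_reusable_executor().shutdown(wait=True)
print(len(active_children()))  # 0: workers have been terminated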

@ogrisel
Contributor

ogrisel commented Oct 17, 2019

Maybe we should improve the joblib documentation.

@dishkakrauch

(Quoting @tomMoral's reply above.)

Thank you for this note; it helped me solve an issue with using joblib as part of an Apache Airflow Python task, where I was left with many daemonic processes after DAG execution.

@secsilm

secsilm commented Nov 4, 2022

(Quoting @tomMoral's reply above.)

What if I use multiprocessing as the backend? Can I still use from joblib.externals.loky import get_reusable_executor?

@durgeksh

durgeksh commented Nov 9, 2023

I want to read all the sheets in an xlsx file using joblib in parallel mode. Polars is the library used for processing the xlsx file.
When I use loky as the backend, it needs to recreate the same object, otherwise it raises a pickling error. If I use threading, it works, but there is no parallel execution at all. Below is my code.

import os
from io import StringIO

import polars as pl
import xlsx2csv
from joblib import Parallel, cpu_count, delayed, parallel_config
from joblib.externals.loky import get_reusable_executor


def read_custom_csv(sheet_list, xlsx2csv_options, source="test.xlsx"):
    # Force a fresh worker pool before starting.
    get_reusable_executor().shutdown(wait=True)
    parser = xlsx2csv.Xlsx2csv(source, **xlsx2csv_options)
    read_csv_options = {"infer_schema_length": 0, "truncate_ragged_lines": True}
    read_csv_options_wo_header = {
        "infer_schema_length": 0,
        "truncate_ragged_lines": True,
        "skip_rows": 1,
        "has_header": False,
    }
    excluded_sheets = ["A", "B", "C", "D"]

    core_count = cpu_count()
    # Environment variables are strings, so cast to int.
    n_jobs = int(os.environ.get("THREAD_COUNT", core_count // 2))
    print(f"Using {n_jobs} processes for loading sheets in parallel.")
    args = []
    for sheet in sheet_list:
        args.append(
            {
                "parser": parser,
                "sheet_name": sheet,
                "read_csv_options": read_csv_options_wo_header if sheet in excluded_sheets else read_csv_options,
            }
        )

    with parallel_config(backend="loky", n_jobs=n_jobs):
        results = Parallel(return_as="generator")(delayed(_read_excel_sheet)(**a) for a in args)
        # Consume the generator while the backend configuration is active.
        return {sheet[0]: sheet[1] for sheet in results}


def _read_excel_sheet(parser, sheet_name, read_csv_options):
    # Returns (sheet_name, DataFrame), or None if the sheet is empty.
    csv_buffer = StringIO()
    parser.convert(outfile=csv_buffer, sheetname=sheet_name)

    if csv_buffer.tell() != 0:
        csv_buffer.seek(0)
        return sheet_name, pl.read_csv(csv_buffer, **read_csv_options)
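
One possible workaround sketch, assuming the pickling error comes from sending the Xlsx2csv parser object to the loky workers: pass the file path and options instead, and rebuild the parser inside the worker so nothing unpicklable crosses the process boundary (_read_excel_sheet_picklable below is a hypothetical variant of _read_excel_sheet above):

import xlsx2csv
import polars as pl
from io import StringIO

def _read_excel_sheet_picklable(source, xlsx2csv_options, sheet_name, read_csv_options):
    # Recreate the parser inside the worker process instead of pickling it.
    parser = xlsx2csv.Xlsx2csv(source, **xlsx2csv_options)
    csv_buffer = StringIO()
    parser.convert(outfile=csv_buffer, sheetname=sheet_name)
    if csv_buffer.tell() != 0:
        csv_buffer.seek(0)
        return sheet_name, pl.read_csv(csv_buffer, **read_csv_options)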
