
Pickling issue with multiprocessing #17

Closed
m-rph opened this issue Mar 1, 2021 · 3 comments
Assignees: SimonBlanke
Labels: bug (Something isn't working)

m-rph (Contributor) commented Mar 1, 2021

Describe the bug
Enabling more than 1 job crashes the code because the arguments cannot be pickled.

Code to reproduce the behavior

import numpy as np
from hyperactive import Hyperactive

space = dict(
    alpha=np.linspace(0, 1),
    beta=np.linspace(0, 1),
    start=[-0.95],
    end=[0.95])
points = np.random.rand(10)

def model(opt):
    # objective function that uses a lambda internally
    rv = lambda x: x
    return np.sum(rv(points))

hyper = Hyperactive()

hyper.add_search(model, space, n_iter=50, n_jobs=2)
hyper.run()

Error message from command line


TypeError                                 Traceback (most recent call last)
<ipython-input-6-53d227ee7377> in <module>
     12 
     13 hyper.add_search(model, space, n_iter=50, n_jobs=2, )
---> 14 hyper.run()

~/anaconda3/lib/python3.8/site-packages/hyperactive/hyperactive.py in run(self, max_time)
    199             self.process_infos[nth_process]["max_time"] = max_time
    200 
--> 201         self.results_list = run_search(self.process_infos, self.distribution)

~/anaconda3/lib/python3.8/site-packages/hyperactive/run_search.py in run_search(search_processes_infos, distribution)
     45             distribution
     46         )
---> 47         results_list = distribution(process_func, process_infos, **dist_paras)
     48 
     49     return results_list

~/anaconda3/lib/python3.8/site-packages/hyperactive/distribution.py in multiprocessing_wrapper(process_func, search_processes_paras, **kwargs)
     16     n_jobs = len(search_processes_paras)
     17 
---> 18     pool = Pool(n_jobs, **kwargs)
     19     results = pool.map(process_func, search_processes_paras)
     20 

~/anaconda3/lib/python3.8/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild)
    117         '''Returns a process pool object'''
    118         from .pool import Pool
--> 119         return Pool(processes, initializer, initargs, maxtasksperchild,
    120                     context=self.get_context())
    121 

~/anaconda3/lib/python3.8/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context)
    210         self._processes = processes
    211         try:
--> 212             self._repopulate_pool()
    213         except Exception:
    214             for p in self._pool:

~/anaconda3/lib/python3.8/multiprocessing/pool.py in _repopulate_pool(self)
    301 
    302     def _repopulate_pool(self):
--> 303         return self._repopulate_pool_static(self._ctx, self.Process,
    304                                             self._processes,
    305                                             self._pool, self._inqueue,

~/anaconda3/lib/python3.8/multiprocessing/pool.py in _repopulate_pool_static(ctx, Process, processes, pool, inqueue, outqueue, initializer, initargs, maxtasksperchild, wrap_exception)
    324             w.name = w.name.replace('Process', 'PoolWorker')
    325             w.daemon = True
--> 326             w.start()
    327             pool.append(w)
    328             util.debug('added worker')

~/anaconda3/lib/python3.8/multiprocessing/process.py in start(self)
    119                'daemonic processes are not allowed to have children'
    120         _cleanup()
--> 121         self._popen = self._Popen(self)
    122         self._sentinel = self._popen.sentinel
    123         # Avoid a refcycle if the target function holds an indirect

~/anaconda3/lib/python3.8/multiprocessing/context.py in _Popen(process_obj)
    282         def _Popen(process_obj):
    283             from .popen_spawn_posix import Popen
--> 284             return Popen(process_obj)
    285 
    286     class ForkServerProcess(process.BaseProcess):

~/anaconda3/lib/python3.8/multiprocessing/popen_spawn_posix.py in __init__(self, process_obj)
     30     def __init__(self, process_obj):
     31         self._fds = []
---> 32         super().__init__(process_obj)
     33 
     34     def duplicate_for_child(self, fd):

~/anaconda3/lib/python3.8/multiprocessing/popen_fork.py in __init__(self, process_obj)
     17         self.returncode = None
     18         self.finalizer = None
---> 19         self._launch(process_obj)
     20 
     21     def duplicate_for_child(self, fd):

~/anaconda3/lib/python3.8/multiprocessing/popen_spawn_posix.py in _launch(self, process_obj)
     45         try:
     46             reduction.dump(prep_data, fp)
---> 47             reduction.dump(process_obj, fp)
     48         finally:
     49             set_spawning_popen(None)

~/anaconda3/lib/python3.8/multiprocessing/reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

TypeError: cannot pickle '_thread.RLock' object

System information:

  • OS Platform and Distribution: macOS 10.14.6
  • Python version: 3.8.5 (sys.version_info(major=3, minor=8, micro=5, releaselevel='final', serial=0))
  • Hyperactive version: 3.0.5
m-rph added the bug (Something isn't working) label on Mar 1, 2021
SimonBlanke self-assigned this on Mar 1, 2021
SimonBlanke (Owner)
I was not able to reproduce this error on Ubuntu 20.04 with the same Python and Hyperactive versions. The fact that it works on my end surprises me, because Hyperactive uses multiprocessing by default, and I always thought multiprocessing cannot handle lambdas because it relies on pickle.
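
One detail I notice in your traceback: it goes through popen_spawn_posix, so your Python is using the "spawn" start method (the macOS default since Python 3.8), which pickles much more state than the "fork" default on Linux. That might be why only your machine is affected. As a standalone illustration of the lambda/pickle point (no Hyperactive involved), here is a minimal sketch. It assumes the cloudpickle package is installed, which is, as far as I know, what joblib's loky backend uses for serialization:

import pickle
import cloudpickle  # third-party package; assumption: this is what joblib/loky uses internally

rv = lambda x: x

try:
    pickle.dumps(rv)  # the standard pickle module refuses to serialize a lambda
except pickle.PicklingError as err:
    print("pickle failed:", err)

payload = cloudpickle.dumps(rv)  # cloudpickle serializes the lambda by value
restored = pickle.loads(payload)
print(restored(3))  # -> 3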

Maybe you could try using joblib as the distribution for Hyperactive. It is a shot in the dark, but maybe this works on your OS.

Here is a snippet to try out:

import numpy as np
from hyperactive import Hyperactive

space = dict(alpha=np.linspace(0, 1), beta=np.linspace(0, 1), start=[-0.95], end=[0.95])
points = np.random.rand(10)

def model(opt):
    rv = lambda x: x
    return np.sum(rv(points))

hyper = Hyperactive(distribution="joblib")
hyper.add_search(
    model,
    space,
    n_iter=50,
    n_jobs=2,
)
hyper.run()

Let me know if this solves your problem.

It is difficult for me to reproduce this error because I do not have access to macOS. I hope we find a solution with joblib.

m-rph (Contributor, Author) commented Mar 1, 2021

Joblib works, thanks!

One more thing: n_jobs is independent of the number of runs, correct? So if I wanted to run the experiment 50 times, I'd have to call run 50 times and collect the results each time? I am asking because running:

hyper.add_search(model, space, n_iter=50, n_jobs=N)

gives me min(N, 12) processes for N >= -1, and I assume the 12 is because my CPU has 12 cores.

SimonBlanke (Owner)
Very good! It works now because joblib uses an alternative to pickle.

I am not sure I understand your question.
n_jobs opens that many processes during the optimization run, and each of those processes runs n_iter iterations. If n_jobs == -1, then n_jobs is set to the number of available CPU cores.
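
To make the distinction concrete, here is a rough sketch (reusing model and space from the snippet above; I am writing the best_para call from memory, so please check it against the API documentation of your version):

from hyperactive import Hyperactive

# One optimization run: n_jobs=2 starts 2 parallel processes,
# and each of those processes performs n_iter=50 iterations.
hyper = Hyperactive(distribution="joblib")
hyper.add_search(model, space, n_iter=50, n_jobs=2)
hyper.run()

# Repeating the whole experiment 50 times is a separate loop around run():
best_params = []
for _ in range(50):
    hyper = Hyperactive(distribution="joblib")
    hyper.add_search(model, space, n_iter=50, n_jobs=2)
    hyper.run()
    best_params.append(hyper.best_para(model))  # best_para(...) written from memory, may differ per version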

I hope this answers your question. If you need more clarification or have another question feel free to open another issue.
