
AttributeError: Can't pickle local object 'prepare_worker.<locals>.closure.<locals>.wrapper' #72

Open
ChristianRue opened this issue Jan 21, 2020 · 8 comments



ChristianRue commented Jan 21, 2020

I would like to run pandarallel in a Jupyter notebook on AWS SageMaker. However, even with the most basic examples I get the following error message:

~/anaconda3/envs/python3/lib/python3.6/site-packages/pandarallel/pandarallel.py in closure(data, func, *args, **kwargs)
    434         try:
    435             pool = Pool(
--> 436                 nb_workers, worker_init, (prepare_worker(use_memory_fs)(worker),)
    437             )
    438 

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild)
    117         from .pool import Pool
    118         return Pool(processes, initializer, initargs, maxtasksperchild,
--> 119                     context=self.get_context())
    120 
    121     def RawValue(self, typecode_or_type, *args):

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context)
    172         self._processes = processes
    173         self._pool = []
--> 174         self._repopulate_pool()
    175 
    176         self._worker_handler = threading.Thread(

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/pool.py in _repopulate_pool(self)
    237             w.name = w.name.replace('Process', 'PoolWorker')
    238             w.daemon = True
--> 239             w.start()
    240             util.debug('added worker')
    241 

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         # Avoid a refcycle if the target function holds an indirect

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/context.py in _Popen(process_obj)
    289         def _Popen(process_obj):
    290             from .popen_forkserver import Popen
--> 291             return Popen(process_obj)
    292 
    293     class ForkContext(BaseContext):

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/popen_forkserver.py in __init__(self, process_obj)
     33     def __init__(self, process_obj):
     34         self._fds = []
---> 35         super().__init__(process_obj)
     36 
     37     def duplicate_for_child(self, fd):

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/popen_fork.py in __init__(self, process_obj)
     17         util._flush_std_streams()
     18         self.returncode = None
---> 19         self._launch(process_obj)
     20 
     21     def duplicate_for_child(self, fd):

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/popen_forkserver.py in _launch(self, process_obj)
     45         try:
     46             reduction.dump(prep_data, buf)
---> 47             reduction.dump(process_obj, buf)
     48         finally:
     49             set_spawning_popen(None)

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

AttributeError: Can't pickle local object 'prepare_worker.<locals>.closure.<locals>.wrapper'

This was thrown when running:


df = pandas.DataFrame(numpy.random.rand(240).reshape(80,3),columns=list('abc'))
df['id'] = numpy.arange(80) % 10
df.groupby('id')[['a']].parallel_apply(lambda x:pandas.DataFrame(numpy.array([x.values.flatten()]*2),columns=list('abcdefgh')))
@jmaralcZ

Hi guys,
I have the same issue running all the examples you provide here: https://github.com/nalepae/pandarallel/blob/master/docs/examples.ipynb

Running them with Python 3.8 and this Pipfile:
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
flake8 = "*"

[packages]
fastapi = ""
uvicorn = "
"
pyyaml = ""
pandas = "
"
psycopg2 = ">=2.8.4"
colorlog = ""
shapely = "
"
tqdm = ""
googlemaps = "
"
timezonefinder = ""
python-levenshtein = "
"
boto3 = ""
polyline = "
"
geopandas = ""
scipy = "
"
sklearn = ""
colour = "
"
folium = ""
matplotlib = "
"
seaborn = ""
googleads = "
"
holoviews = ""
console = "
"
nox = ""
pytest = "
"
flake8 = ""
coverage = "
"
pytest-cov = ""
celery = "
"
redis = ""
python-multipart = "
"
xlrd = ""
jupyterlab = "
"
nbconvert = ""
ipywidgets = "
"
rtree = ""
pandarallel = "
"

[requires]
python_version = "3.8"

@mvcduarte

I have the same problem here! Using pandarallel on Windows 10 and Jupyter notebook.


C1ARKGABLE commented Mar 7, 2020

Getting the same issue on macOS with Python 3.8.

Package Version
black 19.10b0
numpy 1.17.4
pandarallel 1.4.6
pandas 0.25.3
pip 20.0.2

Edit: Tried the same code in Python 3.7.3, which works with no problem.


achu44 commented Mar 7, 2020

Getting the same issue: Windows 10 and Spyder


Ram1004 commented Mar 9, 2020

Having the same issue on Windows 10 with Jupyter Notebook

@jqqqqqqqqqq

I managed to fix this, inspired by this.

Take a look at this issue

But some things behave very oddly:

  • The global scope is not visible anymore. For example, I have to import pandas again in order to use it in the function passed into apply; otherwise it throws NameError: name 'pd' is not defined

  • The other issue is unrelated to this one: I have to put my entire code inside if __name__ == '__main__':, or I'll run into issue 76 (see the sketch after this list). I'm not sure of the proper way to handle it
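A minimal sketch of those two workarounds together; the function name, the toy DataFrame, and the column names are only illustrative, and this assumes the patched pandarallel running with the "spawn" start method:

from pandarallel import pandarallel

def double_row(row):
    # Re-import inside the function: "spawn" workers start a fresh
    # interpreter and do not see the notebook's global imports.
    import pandas as pd
    return pd.Series(row.values * 2, index=row.index)

if __name__ == "__main__":
    # Guarding the entry point keeps the freshly spawned workers from
    # re-running the pool-creating code (the problem described in issue 76).
    import pandas as pd

    pandarallel.initialize()
    df = pd.DataFrame({"a": range(10), "b": range(10)})
    print(df.parallel_apply(double_row, axis=1))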

Since I didn't solve it completely and don't have the time or energy to keep working on it, I won't submit a pull request for now.

Here is my patch:

diff --git a/pandarallel/pandarallel.py b/pandarallel/pandarallel.py
index b7783ea..c3e918d 100644
--- a/pandarallel/pandarallel.py
+++ b/pandarallel/pandarallel.py
@@ -64,96 +64,112 @@ def is_memory_fs_available():
     return os.path.exists(MEMORY_FS_ROOT)
 
 
-def prepare_worker(use_memory_fs):
-    def closure(function):
-        def wrapper(worker_args):
-            """This function runs on WORKERS.
-
-            If Memory File System is used:
-            1. Load all pickled files (previously dumped by the MASTER) in the
-               Memory File System
-            2. Undill the function to apply (for lambda functions)
-            3. Tell to the MASTER the input file has been read (so the MASTER can remove it
-               from the memory
-            4. Apply the function
-            5. Pickle the result in the Memory File System (so the Master can read it)
-            6. Tell the master task is finished
-
-            If Memory File System is not used, steps are the same except 1. and 5. which are
-            skipped.
-            """
-            if use_memory_fs:
-                (
-                    input_file_path,
-                    output_file_path,
-                    index,
-                    meta_args,
-                    queue,
-                    progress_bar,
-                    dilled_func,
-                    args,
-                    kwargs,
-                ) = worker_args
-
-                try:
-                    with open(input_file_path, "rb") as file:
-                        data = pickle.load(file)
-                        queue.put((INPUT_FILE_READ, index))
-
-                    result = function(
-                        data,
-                        index,
-                        meta_args,
-                        queue,
-                        progress_bar,
-                        dill.loads(dilled_func),
-                        *args,
-                        **kwargs
-                    )
+class prepare_worker_with_memory_fs:
+    def __init__(self, func):
+        self.func = func
+
+    def __call__(self, worker_args):
+        """This function runs on WORKERS.
+
+        If Memory File System is used:
+        1. Load all pickled files (previously dumped by the MASTER) in the
+           Memory File System
+        2. Undill the function to apply (for lambda functions)
+        3. Tell to the MASTER the input file has been read (so the MASTER can remove it
+           from the memory
+        4. Apply the function
+        5. Pickle the result in the Memory File System (so the Master can read it)
+        6. Tell the master task is finished
+
+        If Memory File System is not used, steps are the same except 1. and 5. which are
+        skipped.
+        """
+        (
+            input_file_path,
+            output_file_path,
+            index,
+            meta_args,
+            queue,
+            progress_bar,
+            dilled_func,
+            args,
+            kwargs,
+        ) = worker_args
 
-                    with open(output_file_path, "wb") as file:
-                        pickle.dump(result, file)
+        try:
+            with open(input_file_path, "rb") as file:
+                data = pickle.load(file)
+                queue.put((INPUT_FILE_READ, index))
 
-                    queue.put((VALUE, index))
+            result = self.func(
+                data,
+                index,
+                meta_args,
+                queue,
+                progress_bar,
+                dill.loads(dilled_func),
+                *args,
+                **kwargs
+            )
 
-                except Exception:
-                    queue.put((ERROR, index))
-                    raise
-            else:
-                (
-                    data,
-                    index,
-                    meta_args,
-                    queue,
-                    progress_bar,
-                    dilled_func,
-                    args,
-                    kwargs,
-                ) = worker_args
-
-                try:
-                    result = function(
-                        data,
-                        index,
-                        meta_args,
-                        queue,
-                        progress_bar,
-                        dill.loads(dilled_func),
-                        *args,
-                        **kwargs
-                    )
-                    queue.put((VALUE, index))
+            with open(output_file_path, "wb") as file:
+                pickle.dump(result, file)
 
-                    return result
+            queue.put((VALUE, index))
 
-                except Exception:
-                    queue.put((ERROR, index))
-                    raise
+        except Exception:
+            queue.put((ERROR, index))
+            raise
 
-        return wrapper
+class prepare_worker_without_memory_fs:
+    def __init__(self, func):
+        self.func = func
 
-    return closure
+    def __call__(self, worker_args):
+        """This function runs on WORKERS.
+
+        If Memory File System is used:
+        1. Load all pickled files (previously dumped by the MASTER) in the
+           Memory File System
+        2. Undill the function to apply (for lambda functions)
+        3. Tell to the MASTER the input file has been read (so the MASTER can remove it
+           from the memory
+        4. Apply the function
+        5. Pickle the result in the Memory File System (so the Master can read it)
+        6. Tell the master task is finished
+
+        If Memory File System is not used, steps are the same except 1. and 5. which are
+        skipped.
+        """
+        (
+            data,
+            index,
+            meta_args,
+            queue,
+            progress_bar,
+            dilled_func,
+            args,
+            kwargs,
+        ) = worker_args
+
+        try:
+            result = self.func(
+                data,
+                index,
+                meta_args,
+                queue,
+                progress_bar,
+                dill.loads(dilled_func),
+                *args,
+                **kwargs
+            )
+            queue.put((VALUE, index))
+
+            return result
 
+        except Exception:
+            queue.put((ERROR, index))
+            raise
 
 def create_temp_files(nb_files):
     """Create temporary files in Memory File System."""
@@ -438,9 +454,14 @@ def parallelize(
         nb_workers = len(chunk_lengths)
 
         try:
-            pool = Pool(
-                nb_workers, worker_init, (prepare_worker(use_memory_fs)(worker),),
-            )
+            if use_memory_fs:
+                pool = Pool(
+                    nb_workers, worker_init, (prepare_worker_with_memory_fs(worker),),
+                )
+            else:
+                pool = Pool(
+                    nb_workers, worker_init, (prepare_worker_without_memory_fs(worker),),
+                )
 
             map_result = pool.map_async(global_worker, workers_args)
             pool.close()
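For anyone wondering why this patch helps: with the "spawn" (and "forkserver") start methods the Pool initargs have to be pickled, and pickle can only serialize objects that are importable by name from a module's top level. A tiny standalone sketch (independent of pandarallel) showing the difference:

import pickle

def make_wrapper(func):
    def wrapper(args):  # nested function: not importable by name
        return func(*args)
    return wrapper

class Wrapper:  # module-level callable class: importable by name
    def __init__(self, func):
        self.func = func

    def __call__(self, args):
        return self.func(*args)

def add(a, b):
    return a + b

try:
    pickle.dumps(make_wrapper(add))
except AttributeError as error:
    # AttributeError: Can't pickle local object 'make_wrapper.<locals>.wrapper'
    print(error)

pickle.dumps(Wrapper(add))  # works, which is what the patch relies on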


mikelehen commented Oct 24, 2020

I'm running into this as well. In case it's of any help, I think the issue may only occur when you are using the "spawn" start method of multiprocessing (i.e., multiprocessing.set_start_method("spawn")), which is the default under Windows and macOS (at least in Python 3.8 and newer, since "fork" is unreliable on macOS; see https://bugs.python.org/issue33725).

Would be great to see a fix for this implemented.
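For reference, a quick way to see which start method is active; this is plain multiprocessing, nothing pandarallel-specific, and the "fork" override below is a diagnostic experiment rather than a fix:

import multiprocessing

# "spawn" is the default on Windows, and on macOS since Python 3.8;
# Linux still defaults to "fork".
print(multiprocessing.get_start_method())

# Forcing "fork" before any pool is created can make the pickling error
# disappear on Unix, but it reintroduces the unreliable-fork behaviour
# that https://bugs.python.org/issue33725 describes for macOS.
multiprocessing.set_start_method("fork", force=True)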

mikelehen added a commit to act-now-coalition/covid-data-model that referenced this issue Oct 24, 2020
Created a parallel_utils helper module that has parallel_map() and
pandas_parallel_apply() functions which use multiprocessing / pandarallel on
non-macOS but plain non-parallel versions on macOS.

This works around bugs in Python and pandarallel:
https://bugs.python.org/issue33725#msg343838
nalepae/pandarallel#72
mikelehen pushed a commit to act-now-coalition/covid-data-model that referenced this issue Oct 27, 2020
Created a parallel_utils helper module that has parallel_map() and
pandas_parallel_apply() functions which use multiprocessing / pandarallel on
non-macOS but plain non-parallel versions on macOS.

This works around bugs in Python and pandarallel:
https://bugs.python.org/issue33725#msg343838
nalepae/pandarallel#72

shermansiu commented Apr 27, 2024

I can reproduce this on Linux:

Python: 3.10.13
Pandarallel: 1.6.5
Pandas: 2.2.0
Numpy: 1.26.4

import numpy as np
import pandas as pd
import pandarallel

pandarallel.core.CONTEXT = pandarallel.core.multiprocessing.get_context('spawn')
pandarallel.pandarallel.initialize()

df = pd.DataFrame(np.random.rand(240).reshape(80,3),columns=list('abc'))
df['id'] = np.arange(80) % 10
df.groupby('id')[['a']].parallel_apply(lambda x: pd.DataFrame(np.array([x.values.flatten()]*2),columns=list('abcdefgh')))

This bug only appears for me when using the "spawn" start method.

Traceback
RemoteTraceback:
"""
Traceback (most recent call last):
  File "~/miniconda3/envs/fn_env/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "~/miniconda3/envs/fn_env/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/core.py", line 95, in __call__
    result = self.work_function(
  File "~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/data_types/dataframe_groupby.py", line 40, in work
    return [compute_result(key, df) for key, df in data]
  File "~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/data_types/dataframe_groupby.py", line 40, in <listcomp>
    return [compute_result(key, df) for key, df in data]
  File "~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/data_types/dataframe_groupby.py", line 34, in compute_result
    result = user_defined_function(
  File "/tmp/ipykernel_3714664/809581393.py", line 11, in meow
NameError: name 'pd' is not defined
"""

The above exception was the direct cause of the following exception:

NameError                                 Traceback (most recent call last)
Cell In[8], line 12
     10 def meow(x):
     11     return pd.DataFrame(np.array([x.values.flatten()]*2),columns=list('abcdefgh'))
---> 12 df.groupby('id')[['a']].parallel_apply(meow)

File ~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/core.py:333, in parallelize_with_memory_file_system.<locals>.closure(data, user_defined_function, *user_defined_function_args, **user_defined_function_kwargs)
    325     return wrapped_reduce_function(
    326         (Path(output_file.name) for output_file in output_files),
    327         reduce_extra,
    328     )
    329 except EOFError:
    330     # Loading the files failed, this most likely means that there
    331     # was some error during processing and the files were never
    332     # saved at all.
--> 333     results_promise.get()
    335     # If the above statement does not raise an exception, that
    336     # means the multiprocessing went well and we want to re-raise
    337     # the original EOFError.
    338     raise

File ~/miniconda3/envs/fn_env/lib/python3.10/multiprocessing/pool.py:774, in ApplyResult.get(self, timeout)
    772     return self._value
    773 else:
--> 774     raise self._value

NameError: name 'pd' is not defined
"""

The above exception was the direct cause of the following exception:

NameError                                 Traceback (most recent call last)
Cell In[8], line 12
     10 def meow(x):
     11     return pd.DataFrame(np.array([x.values.flatten()]*2),columns=list('abcdefgh'))
---> 12 df.groupby('id')[['a']].parallel_apply(meow)

File [~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/core.py:333](http://dlu:8000/user/sherman/lab/tree/mnt/ubuntu_data_hdd/open_source/code/board_game_nn_huggingface/data/~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/core.py#line=332), in parallelize_with_memory_file_system.<locals>.closure(data, user_defined_function, *user_defined_function_args, **user_defined_function_kwargs)
    325     return wrapped_reduce_function(
    326         (Path(output_file.name) for output_file in output_files),
    327         reduce_extra,
    328     )
    329 except EOFError:
    330     # Loading the files failed, this most likely means that there
    331     # was some error during processing and the files were never
    332     # saved at all.
--> 333     results_promise.get()
    335     # If the above statement does not raise an exception, that
    336     # means the multiprocessing went well and we want to re-raise
    337     # the original EOFError.
    338     raise

File [~/miniconda3/envs/fn_env/lib/python3.10/multiprocessing/pool.py:774](http://dlu:8000/user/sherman/lab/tree/mnt/ubuntu_data_hdd/open_source/code/board_game_nn_huggingface/data/~/miniconda3/envs/fn_env/lib/python3.10/multiprocessing/pool.py#line=773), in ApplyResult.get(self, timeout)
    772     return self._value
    773 else:
--> 774     raise self._value

NameError: name 'pd' is not defined
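A possible workaround sketch for this reproduction, assuming the NameError comes only from the spawned workers not inheriting the notebook's globals: make the applied function self-contained by importing inside it.

import numpy as np
import pandas as pd
import pandarallel

pandarallel.core.CONTEXT = pandarallel.core.multiprocessing.get_context("spawn")
pandarallel.pandarallel.initialize()

df = pd.DataFrame(np.random.rand(240).reshape(80, 3), columns=list("abc"))
df["id"] = np.arange(80) % 10

def meow(x):
    # Spawned workers start with a fresh interpreter, so re-import here
    # instead of relying on the notebook's module-level imports.
    import numpy as np
    import pandas as pd
    return pd.DataFrame(np.array([x.values.flatten()] * 2), columns=list("abcdefgh"))

df.groupby("id")[["a"]].parallel_apply(meow)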
