I'm trying to parallelize my script, which is written in an object-oriented way. I've chosen pathos because I get a PicklingError with the standard multiprocessing library:
PicklingError: Can't pickle <function <lambda> at 0x1187952f0>: attribute lookup <lambda> on jupyter_client.session failed
As my script is already very complex, I'll just show the function that contains the parallelization. All functions are defined at top level.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import datetime as dt
import scipy.io
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import netCDF4
import scipy.stats as sp
#import multiprocessing as mpp
from pathos import multiprocessing as mpp
import time
import seaborn as sns
from tqdm import tqdm


class catch2grid(object):
    """See module docstring."""

    def __init__(self):
        """Init of catch2grid."""
        self.pbar = None
        ...

    def main_par(self, db_Qobs_meta_dir, ens_mean_dir, ens_sd_dir, db_Qobs_dir,
                 l):
        """Parallel computation of several flow percentiles for Qobs and Qsim,
        the standard deviation of the flow percentiles of Qsim and the
        KGE alpha.

        db_Qobs_meta_dir -- Path to file with meta information on the
                            catchments
        ens_mean_dir -- Path to file with runoff ensemble mean
        ens_sd_dir -- Path to file with runoff ensemble standard deviation
        db_Qobs_dir -- Path to folder with observed runoff database_Qobs_new
        """
        cpu_cores = mpp.cpu_count() - 1
        df_meta = self.import_meta(db_Qobs_meta_dir)
        df_meta = self.select_catchments(df_meta)
        # chunk the index range into one subset per worker
        subsets = []
        lin_dist = np.linspace(0, l, cpu_cores + 1)
        # list of tuples with input arguments for starmap
        for i in range(len(lin_dist) - 1):
            subsets.append((db_Qobs_meta_dir, ens_mean_dir, ens_sd_dir,
                            db_Qobs_dir, range(int(lin_dist[i]),
                                               int(lin_dist[i + 1]), 1)))
        p = mpp.Pool(cpu_cores)  # launch pool of workers
        res = p.starmap(self.main, subsets)
        p.close()
        p.join()
        # collect the per-worker dataframes and merge them
        res_obs = [r[0] for r in res]
        res_sim = [r[1] for r in res]
        res_simsd = [r[2] for r in res]
        res_kgealpha = [r[3] for r in res]
        df_Qobs_percs = pd.concat(res_obs, ignore_index=True)
        df_Qsim_percs = pd.concat(res_sim, ignore_index=True)
        df_sdQsim_percs = pd.concat(res_simsd, ignore_index=True)
        df_KGE_alpha = pd.concat(res_kgealpha, ignore_index=True)
        return df_Qobs_percs, df_Qsim_percs, df_sdQsim_percs, df_KGE_alpha


if __name__ == "__main__":
    start_time = time.time()  # measuring computation time
    c2g = catch2grid()
    c2g.init_pbar(l)
    df_Qobs_percs, df_Qsim_percs, df_sdQsim_percs, df_KGE_alpha = \
        c2g.main_par(db_Qobs_meta_dir, Ens_mean_dir, Ens_sd_dir, db_Qobs_dir, l)
    time_elapsed = time.time() - start_time
    print('Parallel run: Time elapsed (hh.mm) {}'.format(time_elapsed /
                                                         (60**2)))
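For reference, the chunking logic above (splitting range(l) into near-equal contiguous ranges with np.linspace) can be sketched in pure Python; chunk_ranges is a hypothetical helper, not part of the original script:

```python
def chunk_ranges(l, n_workers):
    """Split range(l) into n_workers near-equal contiguous ranges."""
    step, rem = divmod(l, n_workers)
    chunks, start = [], 0
    for i in range(n_workers):
        # spread the remainder over the first `rem` chunks
        end = start + step + (1 if i < rem else 0)
        chunks.append(range(start, end))
        start = end
    return chunks

print(chunk_ranges(10, 3))  # [range(0, 4), range(4, 7), range(7, 10)]
```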
After running it I receive the following error:
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/spyder/utils/site/sitecustomize.py", line 705, in runfile
execfile(filename, namespace)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/spyder/utils/site/sitecustomize.py", line 102, in execfile
exec(compile(f.read(), filename, 'exec'), namespace)
File "/Users/robinschwemmle/Desktop/MSc_Thesis/Python/catch2grid.py", line 1076, in <module>
c2g.main_par(db_Qobs_meta_dir, Ens_mean_dir, Ens_sd_dir, db_Qobs_dir, l)
File "/Users/robinschwemmle/Desktop/MSc_Thesis/Python/catch2grid.py", line 743, in main_par
res = p.starmap(self.main, subsets)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/multiprocess/pool.py", line 268, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/multiprocess/pool.py", line 608, in get
raise self._value
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/multiprocess/pool.py", line 385, in _handle_tasks
put(task)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/multiprocess/connection.py", line 209, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/multiprocess/reduction.py", line 53, in dumps
cls(buf, protocol).dump(obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 409, in dump
self.save(obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 751, in save_tuple
save(element)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 736, in save_tuple
save(element)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 736, in save_tuple
save(element)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 1047, in save_instancemethod0
pickler.save_reduce(MethodType, (obj.__func__, obj.__self__), obj=obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 610, in save_reduce
save(args)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 736, in save_tuple
save(element)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 852, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 852, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/Users/robinschwemmle/anaconda/envs/py36/lib/python3.6/pickle.py", line 496, in save
rv = reduce(self.proto)
File "stringsource", line 2, in zmq.backend.cython.socket.Socket.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
I double-checked my methods inside the class and all of them are bound. So I am wondering whether the error occurs due to a bug in the pathos library or whether it's my fault. I hope you can help me out.
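As a minimal sketch of what p.starmap(self.main, ...) has to do: pickling a bound method also pickles the instance behind it, so any unpicklable attribute on self breaks the transfer even though the method itself is bound. Worker and its stream attribute below are hypothetical stand-ins (a generator simulates an unpicklable object such as a progress bar holding a socket):

```python
import pickle

class Worker:
    def __init__(self):
        # stand-in for an unpicklable attribute, e.g. a progress bar
        # holding a stream/socket; generators cannot be pickled
        self.stream = (i for i in range(3))

    def run(self, x):
        return 2 * x

w = Worker()
try:
    # pickling the bound method pickles the instance `w` as well
    pickle.dumps(w.run)
except TypeError as e:
    print("pickling failed:", e)
```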
I've not seen this error before... it looks like it's coming from trying to pickle something with cython, and there not being a proper __reduce__ method -- it's definitely failing on a C extension. Can you try to simplify your test code above to the minimal toy example that produces the same error? If you do, I can probably recommend a way around it. The error isn't coming from pathos, but from something with cython extensions.
It was the progress bar in the __init__ which caused the error.
I believe the exception is raised when trying to pickle a zmq socket referenced by tqdm via sys.stderr. That tells me you were probably running the script in a Jupyter notebook: tqdm writes to sys.stderr, and sys.stderr is a zmq socket when running in a Jupyter notebook.
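One way around it, assuming the workers don't need the progress bar, is to exclude self.pbar from the pickled state via __getstate__. A minimal sketch (the generator is a hypothetical stand-in for a tqdm bar bound to a zmq stream):

```python
import pickle

class Catch2Grid:
    def __init__(self):
        # stand-in for an unpicklable progress bar (e.g. tqdm on a
        # zmq-backed sys.stderr); generators cannot be pickled
        self.pbar = (i for i in range(10))

    def __getstate__(self):
        # drop the unpicklable progress bar before pickling
        state = self.__dict__.copy()
        state["pbar"] = None
        return state

    def main(self, x):
        return x + 1

c = Catch2Grid()
blob = pickle.dumps(c.main)   # succeeds: pbar is excluded from the state
clone = pickle.loads(blob)
print(clone(41))              # prints 42
```

Alternatively, create the tqdm bar inside the worker function (or only in the parent after the pool returns) instead of storing it on the instance in __init__.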