
BUG: mne.parallel crashes when too much data #3190

Closed
kingjr opened this issue Apr 28, 2016 · 30 comments
@kingjr (Member) commented Apr 28, 2016

I'm encountering a weird error when I try to decode too much data, e.g.

import numpy as np
from mne import create_info, EpochsArray
from mne.decoding import TimeDecoding

# simulate data
n_trial, n_chan, n_time = 800, 300, 1445
info = create_info(n_chan, 128., 'mag')
epochs = EpochsArray(np.random.rand(n_trial, n_chan, n_time),
                     info=info,
                     events=np.zeros((n_trial, 3), int))
y = np.random.randint(0, 2, n_trial)
epochs._data[y==1, :, :] += 1

# fit
td = TimeDecoding(n_jobs=-1, predict_mode='mean-prediction')
td.fit(epochs, y=y)

works fine but

td.predict(epochs)

returns

---------------------------------------------------------------------------
SystemError                               Traceback (most recent call last)
<ipython-input-22-274c297025ed> in <module>()
     13 td = TimeDecoding(n_jobs=-1)
     14 td.fit(epochs, y=y)
---> 15 y_pred = td.predict(epochs)

/home/ubuntu/mne-python/mne/decoding/time_gen.py in predict(self, epochs)
   1339         """  # noqa
   1340         self._prep_times()
-> 1341         super(TimeDecoding, self).predict(epochs)
   1342         self._clean_times()
   1343         return self.y_pred_

/home/ubuntu/mne-python/mne/decoding/time_gen.py in predict(self, epochs)
    291             n_orig_epochs=n_orig_epochs, test_epochs=test_epochs,
    292             **dict(zip(['X', 'train_times'], _chunk_data(X, chunk))))
--> 293             for chunk in chunks)
    294 
    295         # Concatenate chunks across test time dimension.

/home/ubuntu/anaconda/lib/python2.7/site-packages/joblib/parallel.pyc in __call__(self, iterable)
    808                 # consumption.
    809                 self._iterating = False
--> 810             self.retrieve()
    811             # Make sure that we get a last message telling us we are done
    812             elapsed_time = time.time() - self._start_time

/home/ubuntu/anaconda/lib/python2.7/site-packages/joblib/parallel.pyc in retrieve(self)
    725                 job = self._jobs.pop(0)
    726             try:
--> 727                 self._output.extend(job.get())
    728             except tuple(self.exceptions) as exception:
    729                 # Stop dispatching any new job in the async callback thread

/home/ubuntu/anaconda/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
    565             return self._value
    566         else:
--> 567             raise self._value
    568 
    569     def _set(self, i, obj):

SystemError: error return without exception set

or

IOError: bad message length

There is no error if I set n_jobs=1, if I reduce the dimensionality of the data (n_chan, n_time, or n_trial), or if I predict a subset, e.g.

td.predict(epochs[:596]) 

works fine, but from 597 on, it crashes:

td.predict(epochs[:597]) 

I am running on an AWS m4.4xlarge with Ubuntu, Anaconda, and MNE dev; joblib is 0.9.4.

I have no idea what to do...
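[Editor's note: a back-of-the-envelope size check helps explain the sharp 596/597 cutoff, assuming float64 epochs as in the snippet above and assuming the 2**31-byte multiprocessing pickling limit identified later in the thread is the culprit. The raw array alone is within a few percent of that limit at 597 trials, so per-message serialization overhead can plausibly tip it over.]

```python
# Rough payload size of the data sent to the workers (assuming float64
# epochs, 8 bytes per value).  2**31 - 1 bytes is the signed 32-bit
# limit of multiprocessing's message-length header.
n_chan, n_time = 300, 1445
limit = 2 ** 31 - 1
for n_trial in (596, 597):
    nbytes = n_trial * n_chan * n_time * 8
    print(n_trial, nbytes, '%.3f of limit' % (nbytes / limit))
```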

@kingjr (Member, Author) commented Apr 28, 2016

In debug mode, when I run the function called by parallel (_predict_slices instead of p_func), there is no error, so I don't think the issue comes from the parallelized function.

@kingjr (Member, Author) commented Apr 28, 2016

I narrowed the bug down to this:

from mne.parallel import parallel_func
import numpy as np

def func(X):
    return list()

X = np.random.rand(841, 306, 1445)
parallel, p_func, n_jobs = parallel_func(func, -1)
n_chunks = 1
chunks = [np.array_split(X, 1)]
chunks = map(list, zip(*chunks))

# To minimize memory during parallelization, we apply some chunking
out = parallel(p_func(chunk) for chunk in chunks)

So.. not a GAT issue.

@jona-sassenhagen (Contributor) commented:

Hm. I get a different error.

---------------------------------------------------------------------------
error                                     Traceback (most recent call last)
<ipython-input-2-275458652206> in <module>()
----> 1 td.predict(epochs)

/data/home1/jsassenh/tools/mne-python/mne/decoding/time_gen.py in predict(self, epochs)
   1339         """  # noqa
   1340         self._prep_times()
-> 1341         super(TimeDecoding, self).predict(epochs)
   1342         self._clean_times()
   1343         return self.y_pred_

/data/home1/jsassenh/tools/mne-python/mne/decoding/time_gen.py in predict(self, epochs)
    291             n_orig_epochs=n_orig_epochs, test_epochs=test_epochs,
    292             **dict(zip(['X', 'train_times'], _chunk_data(X, chunk))))
--> 293             for chunk in chunks)
    294
    295         # Concatenate chunks across test time dimension.

/data/home1/jsassenh/anaconda3/lib/python3.5/site-packages/sklearn/externals/joblib/parallel.py in __call__(self, iterable)
    808                 # consumption.
    809                 self._iterating = False
--> 810             self.retrieve()
    811             # Make sure that we get a last message telling us we are done
    812             elapsed_time = time.time() - self._start_time

/data/home1/jsassenh/anaconda3/lib/python3.5/site-packages/sklearn/externals/joblib/parallel.py in retrieve(self)
    725                 job = self._jobs.pop(0)
    726             try:
--> 727                 self._output.extend(job.get())
    728             except tuple(self.exceptions) as exception:
    729                 # Stop dispatching any new job in the async callback thread

/data/home1/jsassenh/anaconda3/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
    606             return self._value
    607         else:
--> 608             raise self._value
    609
    610     def _set(self, i, obj):

/data/home1/jsassenh/anaconda3/lib/python3.5/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    383                         break
    384                     try:
--> 385                         put(task)
    386                     except Exception as e:
    387                         job, ind = task[:2]

/data/home1/jsassenh/anaconda3/lib/python3.5/site-packages/sklearn/externals/joblib/pool.py in send(obj)
    367                 buffer = BytesIO()
    368                 CustomizablePickler(buffer, self._reducers).dump(obj)
--> 369                 self._writer.send_bytes(buffer.getvalue())
    370             self._send = send
    371         else:

/data/home1/jsassenh/anaconda3/lib/python3.5/multiprocessing/connection.py in send_bytes(self, buf, offset, size)
    198         elif offset + size > n:
    199             raise ValueError("buffer length < offset + size")
--> 200         self._send_bytes(m[offset:offset + size])
    201
    202     def send(self, obj):

/data/home1/jsassenh/anaconda3/lib/python3.5/multiprocessing/connection.py in _send_bytes(self, buf)
    391         n = len(buf)
    392         # For wire compatibility with 3.2 and lower
--> 393         header = struct.pack("!i", n)
    394         if n > 16384:
    395             # The payload is large so Nagle's algorithm won't be triggered

error: 'i' format requires -2147483648 <= number <= 2147483647

This is on a local Xeon with 270 GB of RAM.
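[Editor's note: the `error: 'i' format requires -2147483648 <= number <= 2147483647` in the traceback above is the 32-bit message-length header overflowing, and can be reproduced in isolation:]

```python
import struct

# multiprocessing's wire protocol packs the message length into a signed
# 32-bit header ("!i"); any payload of 2**31 bytes or more overflows it,
# which is exactly the error raised in connection.py above.
try:
    struct.pack("!i", 2 ** 31)
except struct.error as err:
    print(err)  # 'i' format requires -2147483648 <= number <= 2147483647
```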

@jona-sassenhagen (Contributor) commented:

(This is in IPython.)

@kingjr (Member, Author) commented Apr 28, 2016

@jona-sassenhagen aaaah what is this.

(This is in IPython.)

me too

@jona-sassenhagen (Contributor) commented:

Yep, same error.

from mne.parallel import parallel_func
import numpy as np

def func(X):
    return list()

X = np.random.rand(841, 306, 1445)
parallel, p_func, n_jobs = parallel_func(func, -1)
n_chunks = 1
chunks = [np.array_split(X, 1)]
chunks = map(list, zip(*chunks))

# To minimize memory during parallelization, we apply some chunking
out = parallel(p_func(chunk) for chunk in chunks)

/data/home1/jsassenh/anaconda3/lib/python3.5/multiprocessing/connection.py in _send_bytes(self, buf)
    391         n = len(buf)
    392         # For wire compatibility with 3.2 and lower
--> 393         header = struct.pack("!i", n)
    394         if n > 16384:
    395             # The payload is large so Nagle's algorithm won't be triggered

error: 'i' format requires -2147483648 <= number <= 2147483647

Did you check whether it's specific to MNE's parallel, or whether it can be reproduced with just joblib?

@kingjr (Member, Author) commented Apr 28, 2016

It's mne.parallel; joblib itself is fine.

From what I understand, mne.parallel is trying to be too smart and wants to dump some data to disk or something.

@kingjr changed the title from "BUG: GAT with joblib when too much data" to "BUG: mne.parallel crashes when too much data" on Apr 28, 2016
@dengemann (Member) commented:

In retrospect it would have been better to simply use joblib, I think: just one module to attack when debugging.


@kingjr (Member, Author) commented Apr 28, 2016

The error comes from the parameter max_nbytes=None:

from joblib import Parallel, delayed
import numpy as np

def func(X):
    pass

X = np.random.rand(841, 306, 1445)
chunks = map(list, zip(*[np.array_split(X, 1)]))

parallel = Parallel(-1, max_nbytes=None)
p_func = delayed(func)

out = parallel(p_func(chunk) for chunk in chunks)

doesn't work but parallel = Parallel(-1) does.

Any idea what I should do to fix this?

@kingjr (Member, Author) commented Apr 28, 2016

As a temporary fix, I do

from mne import set_config
set_config('MNE_MEMMAP_MIN_SIZE', '1M')
set_config('MNE_CACHE_DIR', '.tmp')

But I'll let whoever understands this caching approach write the fix.

@jona-sassenhagen can you check that this also fixes your error?

@jona-sassenhagen (Contributor) commented Apr 28, 2016

from mne import set_config
set_config('MNE_MEMMAP_MIN_SIZE', '1M')
set_config('MNE_CACHE_DIR', '.tmp')

...

out = parallel(p_func(chunk) for chunk in chunks)
[Parallel(n_jobs=32)]: Done   1 out of   1 | elapsed:   42.5s finished

Yup.

@larsoner (Member) commented Apr 28, 2016 via email

@kingjr (Member, Author) commented Apr 28, 2016

I suspect that the GAT code isn't optimal in this regard, because I first initialize n_jobs in Parallel, but then split the to-be-distributed data into min(data.shape[0], n_jobs) chunks, not necessarily n_jobs chunks.

This means that it's possible to get

parallel, p_func, n_jobs = parallel_func(func, n_jobs=16)  # initialize 16 workers
n_split = min(2, 16)
chunks = np.array_split(X, n_split)  # only 2 chunks
parallel(p_func(chunk) for chunk in chunks)  # only recruits 2 workers
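[Editor's note: the under-recruitment described above can be seen directly, since the number of dispatched tasks equals the number of chunks, which is capped by the split rather than by n_jobs:]

```python
import numpy as np

# With n_split = 2, only 2 tasks exist, so at most 2 of the 16
# requested workers would ever receive work.
X = np.random.rand(8, 4)
n_split = min(2, 16)
chunks = np.array_split(X, n_split)
print(len(chunks))  # 2
```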

@dengemann (Member) commented:

All this is pretty well in line with my observation that joblib with MNE does not play nicely with MKL Anaconda. For most tasks I actually don't use joblib when I have MKL, so I have to admit I might have neglected these issues. I'd be glad if someone who actually uses joblib a lot with MNE can fix this.


@kingjr (Member, Author) commented Apr 29, 2016

But MKL only works for matrix operations, right? joblib is a bit more generic. Or am I mistaken?

[On this topic, I went to a meetup in NYC where some people presented pyfora, which tries to compile functions to parallelize them, either locally or on a cluster. My fear is that they want to be a little too smart for it to be usable, but it might be worth checking out.]

@agramfort (Member) commented:

it is probably a joblib bug

cc @ogrisel

@larsoner (Member) commented:

But mkl only works for matrix operation right?

MKL works on anything that can use SIMD instructions, which very often means linear algebra, since they provide highly optimized BLAS and LAPACK; but it can also work for, e.g., for loops in C, because the code is compiled with icc instead of gcc, which is SIMD-aware. So things like np.sum (if the data are aligned properly) might also benefit, although come to think of it I haven't actually checked whether that's the case...

@dengemann (Member) commented:

On my machines, MKL with n_threads outperforms joblib with n_jobs most of the time, in almost any scenario; filtering is maybe an exception.

@jona-sassenhagen (Contributor) commented:

Isn't MKL also usually much better memory-wise? Or am I misinterpreting what top tells me?

@larsoner (Member) commented:

parallel will need to copy all data it sends to workers (unless using memmapping), so yes, MKL will in general be n_jobs times better.

@ogrisel commented Jul 12, 2016

it is probably a joblib bug

Not really; as explained by @lesteve in joblib/joblib#344, when using joblib with the memory mapping feature disabled (by explicitly setting max_nbytes=None, which is not joblib's default), you run the risk of hitting the following limitation of multiprocessing: the queue implementation in the Python standard library cannot hold objects larger than 2 GB: https://bugs.python.org/issue17560

My suggestion would be not to set the max_nbytes parameter in MNE, letting joblib use memmapping for large arrays.

If you face an issue with the memory mapping feature of joblib please feel free to report it.
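[Editor's note: a minimal sketch of that suggestion, with illustrative array sizes. joblib's default max_nbytes is '1M', so any argument larger than 1 MB is dumped to a temp file and passed to workers as a read-only np.memmap instead of being pickled, which sidesteps the 2 GB multiprocessing message limit entirely.]

```python
from joblib import Parallel, delayed
import numpy as np

def total(x):
    return float(x.sum())

# Each chunk is 2000 * 100 * 8 bytes = 1.6 MB of float64, above joblib's
# default 1 MB memmapping threshold, so with max_nbytes left at its
# default the chunks are memmapped rather than pickled to the workers.
X = np.ones((4000, 100))
out = Parallel(n_jobs=2)(delayed(total)(chunk)
                         for chunk in np.array_split(X, 2))
print(out)  # [200000.0, 200000.0]
```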

@kingjr (Member, Author) commented Jul 12, 2016

@agramfort OK to change the default max_nbytes to avoid the non-explicit error?

@larsoner (Member) commented:

No, we can't change max_nbytes to anything other than None as a default, because it requires writing something to disk somewhere (too invasive).

@kingjr (Member, Author) commented Jul 12, 2016

OK, so we need to add a function that checks that nbytes is not > 2 GB, and otherwise throws an explicit error.

@larsoner (Member) commented:

... although I just noticed that joblib defaults to using memmapping, so maybe we should have a deprecation cycle where we add it. Or maybe it's actually not so bad to use it by default, and we can just note it in the API section.

@kingjr (Member, Author) commented Jul 12, 2016

Writing to disk when you don't have an SSD can be pretty annoying, so I would keep the defaults and make the docs explicit.

@agramfort (Member) commented Jul 12, 2016 via email

@larsoner (Member) commented:

@kingjr I cannot reproduce with your snippet on Linux in Python or IPython. Can you try again and see if it works? Maybe it was some transient bug.

@larsoner (Member) commented:

... in any case we should decide if we want to use the max_nbytes='1M' that joblib also uses. I suspect it should lead to some savings in most cases.

@larsoner larsoner self-assigned this Mar 21, 2017
@larsoner (Member) commented:

I think we probably won't change our default behavior; since we usually use np.array_split, hopefully the pickling overhead is small.
