
Why is xarray.to_zarr slow sometimes? #150

Closed
rabernat opened this issue Mar 12, 2018 · 71 comments

@rabernat
Member

I'm not sure if this is an xarray issue or a dask issue or a gcsfs issue, so I am posting it here.

I have a big dataset (~10TB) stored on my Lamont data storage server that looks like this:

<xarray.Dataset>
Dimensions:         (nv: 2, st_edges_ocean: 51, st_ocean: 50, time: 730, xt_ocean: 3600, xu_ocean: 3600, yt_ocean: 2700, yu_ocean: 2700)
Coordinates:
  * xt_ocean        (xt_ocean) float64 -279.9 -279.8 -279.7 -279.6 -279.5 ...
  * yt_ocean        (yt_ocean) float64 -81.11 -81.07 -81.02 -80.98 -80.94 ...
  * st_ocean        (st_ocean) float64 5.034 15.1 25.22 35.36 45.58 55.85 ...
  * st_edges_ocean  (st_edges_ocean) float64 0.0 10.07 20.16 30.29 40.47 ...
  * nv              (nv) float64 1.0 2.0
  * time            (time) float64 6.94e+04 6.94e+04 6.941e+04 6.941e+04 ...
  * xu_ocean        (xu_ocean) float64 -279.9 -279.8 -279.7 -279.6 -279.5 ...
  * yu_ocean        (yu_ocean) float64 -81.09 -81.05 -81.0 -80.96 -80.92 ...
Data variables:
    salt            (time, st_ocean, yt_ocean, xt_ocean) float32 dask.array<shape=(730, 50, 2700, 3600), chunksize=(1, 1, 2700, 3600)>
    u               (time, st_ocean, yu_ocean, xu_ocean) float32 dask.array<shape=(730, 50, 2700, 3600), chunksize=(1, 1, 2700, 3600)>
    temp            (time, st_ocean, yt_ocean, xt_ocean) float32 dask.array<shape=(730, 50, 2700, 3600), chunksize=(1, 1, 2700, 3600)>
    v               (time, st_ocean, yu_ocean, xu_ocean) float32 dask.array<shape=(730, 50, 2700, 3600), chunksize=(1, 1, 2700, 3600)>

I am trying to push it up to GCS using zarr via gcsfs.

Here is how I do it:

import dask
import dask.threaded
import gcsfs
import zarr
from dask.diagnostics import ProgressBar

fs = gcsfs.GCSFileSystem(project='pangeo-181919')
compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=2)
encoding = {vname: {'compressor': compressor} for vname in ds.variables}
dsname = 'temp_salt_u_v-5day_avg'
experiment = 'control'
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/cm2.6/' + experiment + '/' + dsname,
                              gcs=fs, check=True, create=False)
with dask.set_options(get=dask.threaded.get):
    with ProgressBar():
        ds.to_zarr(store=gcsmap, encoding=encoding)

This works great if I first subset the data, e.g. ds = ds.isel(time=slice(0,5)). I have netdata installed on the server, which shows a fairly steady outbound data rate of around 145,000 kilobits/s.

But if I try it on the whole dataset, the computation starts, but no data get sent. The timer on the progress bar just sits there and doesn't move. I can't figure out what is happening during this time. This persists for ~10 minutes, at which point I get bored and restart the notebook.

cc @kaipak, since this is related to uploading data

@mrocklin
Member

I would try to diagnose this with the dashboard. Do progress bars arrive on the dashboard? If not, then my guess is that it's quite expensive to send the graph itself. If they do arrive but nothing completes, then I would look at the profile page to see what the workers are working on, or at the stack traces in the workers' info pages to get a sense of what they're crunching on.

@rabernat
Member Author

I don't use the dashboard because it is buggy with distributed. (This is a separate intermittent issue that I am choosing not to tackle yet; I just want to get this data up asap.)

But here is the profiler view of a short, successful upload.
[screenshot: bokeh profiler plot of the upload]
The green is concatenate-open_dataset-... (reads) and the yellow is store.

@jhamman
Member

jhamman commented Mar 12, 2018

When we were first doing this, we found it useful to separate the creation of the dataset from the writing of the data. That was the motivation for pydata/xarray#1811. It would be good to figure out if the creation of the zarr dataset is slow or writing data is slow. So you might consider trying out the feature in this PR. If no data is ever sent, the dask progress bar should not progress and that would make me think something is getting snagged during the creation of the dataset.

@rabernat
Member Author

rabernat commented Mar 12, 2018

FYI, if I try with distributed, I get an error.

edit: see fsspec/gcsfs#90

@rabernat
Member Author

If no data is ever sent, the dask progress bar should not progress and that would make me think something is getting snagged during the creation of the dataset.

I can look at the bucket directly via the GCS console and see that a bunch of objects have been created.

@rabernat
Member Author

There is really some kind of threshold behavior here...below a certain size, everything is fine. Above that size, nothing.

I'll continue to try to debug.

@rabernat
Member Author

So I was able to run this with the distributed scheduler by passing token='cache' to gcsfs.GCSFileSystem (see fsspec/gcsfs#90).

What I am finding is very weird. With a small subset of data (e.g. ds = ds.isel(time=slice(0,5))), everything works fine. With a larger subset (e.g. ds = ds.isel(time=slice(0,50))), the computation never shows up on the scheduler. It just hangs at the point of calling to_zarr. I waited over 10 minutes, but still no tasks hit the scheduler. When I interrupt, I get this stack trace:

Traceback (most recent call last):
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-32-af84f08c9909>", line 1, in <module>
    get_ipython().magic('time ds_subset.to_zarr(store=gcsmap, encoding=encoding)')
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2158, in magic
    return self.run_line_magic(magic_name, magic_arg_s)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2079, in run_line_magic
    result = fn(*args,**kwargs)
  File "<decorator-gen-59>", line 2, in time
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/magic.py", line 188, in <lambda>
    call = lambda f, *a, **k: f(*a, **k)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/magics/execution.py", line 1181, in time
    out = eval(code, glob, local_ns)
  File "<timed eval>", line 1, in <module>
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/xarray/core/dataset.py", line 1167, in to_zarr
    group=group, encoding=encoding)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/xarray/backends/api.py", line 754, in to_zarr
    dataset.dump_to_store(store, sync=True, encoding=encoding)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/xarray/core/dataset.py", line 1072, in dump_to_store
    store.sync()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/xarray/backends/common.py", line 242, in sync
    self.writer.sync()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/xarray/backends/common.py", line 190, in sync
    da.store(self.sources, self.targets, lock=self.lock)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/dask/array/core.py", line 953, in store
    result.compute()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/dask/base.py", line 143, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/dask/base.py", line 392, in compute
    results = get(dsk, keys, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py", line 2041, in get
    results = self.gather(packed, asynchronous=asynchronous)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py", line 1478, in gather
    asynchronous=asynchronous)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py", line 603, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py", line 251, in sync
    e.wait(10)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/threading.py", line 549, in wait
    signaled = self._cond.wait(timeout)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/threading.py", line 297, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 1821, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/ultratb.py", line 1132, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/ultratb.py", line 313, in wrapped
    return f(*args, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/ultratb.py", line 358, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/inspect.py", line 1459, in getinnerframes
    frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/inspect.py", line 1421, in getframeinfo
    lines, lnum = findsource(frame)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/IPython/core/ultratb.py", line 170, in findsource
    file = getsourcefile(object) or getfile(object)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/inspect.py", line 674, in getsourcefile
    if os.path.exists(filename):
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/genericpath.py", line 19, in exists
    os.stat(path)
KeyboardInterrupt

Not very informative obviously. Would love some more suggestions on how to debug. This is a showstopper in terms of moving forward.

@rabernat
Member Author

Strangely, after interrupting to_zarr, I finally saw a few concatenate tasks show up on the scheduler.

@rabernat
Member Author

Ok, so after waiting a long time (~20 minutes), the computation finally started. What could be responsible for this long wait? If it takes 20 minutes for 50 timesteps, I fear that it will take forever for the full 730 timesteps.

@jhamman
Member

jhamman commented Mar 13, 2018

@rabernat - have you tried using pydata/xarray#1811?

I'd like to see a time breakdown from each of these three steps

%time ds = xr.open_mfdataset('your files *')

%time delayed_obj = ds.to_zarr(store=store, ..., compute=False)

%time delayed_obj.compute()

@rabernat
Member Author

Great suggestion Joe. I will give it a try later today.

@rabernat
Member Author

I tried with pydata/xarray#1811

%time ds = xr.open_mfdataset('your files *')

The initialization of the dataset is a bit more complicated than this, since it involves several sets of files and merge steps. However, all of this takes no more than 10s.

%time delayed_obj = ds.to_zarr(store=store, ..., compute=False)

CPU times: user 4.84 s, sys: 633 ms, total: 5.48 s
Wall time: 14.9 s

%time delayed_obj.compute()

~5-10 minutes with no scheduler activity, then the computation starts. Potentially takes many hours / days, since there is a lot of data to push.

@rabernat
Member Author

So the fundamental question here is: what is happening during the time between calling delayed_obj.compute() and when the tasks actually start computing? Seems like a question for @mrocklin.

@guillaumeeb
Member

guillaumeeb commented Mar 14, 2018

I stumbled upon something similar a few weeks ago, although it was not related to zarr, only to dask distributed. I submitted 100 tasks on distributed, and computations would kick in only after a few minutes, with no information on the status page of the UI. Here is what fixed it: https://stackoverflow.com/questions/41471248/how-to-efficiently-submit-tasks-with-large-arguments-in-dask-distributed. It was a problem of efficient serialization/distribution of large arguments from client to workers.
I have no knowledge of zarr internals though, so it is probably a dead end.
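
For reference, the fix described in that Stack Overflow answer is to scatter the large argument to the cluster once and pass the resulting future into tasks, rather than embedding the object in every task. A minimal sketch, assuming a running distributed Client; the names and sizes here are made up for illustration:

import numpy as np
from dask.distributed import Client

client = Client()  # or connect to an existing scheduler

big_array = np.random.random((10000, 10000))  # a large argument shared by many tasks

# scatter once; each task then receives a lightweight future instead of the full array
[big_future] = client.scatter([big_array], broadcast=True)

def use_row(arr, i):
    # hypothetical per-task work on one row of the shared array
    return arr[i].sum()

futures = [client.submit(use_row, big_future, i) for i in range(100)]
results = client.gather(futures)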

@rabernat
Member Author

Update on this issue: I ran to_zarr on the full dataset (720 timesteps). After more than 18 hours, the tasks have still not shown up on the scheduler dashboard. (A possibly related observation is that the dashboard is poorly responsive, indicating that the scheduler might be doing something strenuous.)

My understanding based on @mrocklin's earlier comment above is that this indicates that it is "quite expensive to send the graph itself", as in >18 hours expensive!

How can I examine the graph to determine which parts are expensive to serialize?

This is starting to feel like deja vu with pydata/xarray#1770 and fsspec/gcsfs#49. I am using gcsfs 0.5.0, so presumably the issue of the dirs object becoming very large should not be affecting us here.

@rabernat
Member Author

Here is my feeble attempt to figure out how big the gcsmap object is

import cloudpickle
import sys
%time gcsmap_pickled = cloudpickle.dumps(gcsmap)
sys.getsizeof(gcsmap_pickled)

gives

CPU times: user 3.2 ms, sys: 136 µs, total: 3.33 ms
Wall time: 3.23 ms
2157

So this is not the culprit.

@rabernat
Member Author

Here is an attempt to peek into the enormous dask graph associated with the full to_zarr operation

delayed_store = ds.to_zarr(store=gcsmap, encoding=encoding, compute=False)
for k, v in delayed_store.dask.items():
    print(k, v)
    break

gives

('store-concatenate-dda7e2a9bdd00de38820d6a6afbb671a', 82, 33, 0, 0)
(<function store_chunk at 0x7fd17c3b2510>,
('concatenate-dda7e2a9bdd00de38820d6a6afbb671a', 82, 33, 0, 0),
 <zarr.core.Array '/v' (730, 50, 2700, 3600) float32>,
 (slice(82, 83, None), slice(33, 34, None), slice(0, 2700, None), slice(0, 3600, None)),
 False, False)

It looks like there is a zarr.core.Array object of shape (730, 50, 2700, 3600) inside the graph. Perhaps this is the thing that is expensive to serialize?
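
One rough way to test that hypothesis, assuming the delayed_store object from above, is to pickle each task in the graph and look at the largest ones (note that the same zarr.core.Array object is shared by many tasks, so the graph-level pickle may be cheaper than this per-task count suggests):

import cloudpickle

sizes = {}
for key, task in delayed_store.dask.items():
    # the store_chunk tasks embed the target zarr.core.Array in every task tuple
    sizes[key] = len(cloudpickle.dumps(task))

largest = sorted(sizes.items(), key=lambda kv: kv[1], reverse=True)[:5]
print(largest)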

@mrocklin
Member

You might consider calling .persist() rather than .compute() and profiling that bit. This separates the "sending data to scheduler" part from the "waiting for the computation to finish" part.

In a Jupyter notebook this might look like the following:

%%prun
x = x.persist()

Or replace prun with snakeviz after calling %load_ext snakeviz
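
For reference, the snakeviz variant would look something like this in a notebook:

%load_ext snakeviz

then, in a separate cell:

%%snakeviz
x = x.persist()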

@rabernat
Member Author

I ran this .persist profiling on a smaller subset (30 timesteps). It took about 160 seconds. I really can't make much of the snakeviz profile. It seems to be spending all its time sorting something.

[screenshot: snakeviz profile of the .persist() call]

I posted the raw profile data here if anyone wants to take a look.

@rabernat
Member Author

So this is the line of dask that is taking all the time:
https://github.com/dask/dask/blob/master/dask/order.py#L111

@mrocklin
Member

mrocklin commented Mar 14, 2018 via email

@mrocklin
Member

mrocklin commented Mar 14, 2018 via email

@rabernat
Member Author

Just glad we are getting to the bottom of it!

I am currently blocked on my use case until I can move this full dataset into GCS (which I cannot do now due to the potentially infinite wait time associated with this step).

@mrocklin
Member

I hear you and I agree that this is important to fix. I however am unlikely to fix it for at least a week and maybe two.

If you wanted to proceed without me, you could revert the order.py file to roughly what was in dask 0.16. It should drop in easily without affecting the rest of the system. You also only need to do this locally on the environment from which your client runs. The scheduler and workers won't need this change.

You could also try to engage someone like @eriknw who also has the expertise to deal with task ordering issues, but is commonly busy on other projects.

@mrocklin
Member

Or, better yet, investigate the order.py file and the associated tests (I think that the tests are decently informative) and try to find a solution! It's a pretty separable problem that is well suited to math or CS students if you have any around. The theoretical name of this problem is, I think, the "pebbling problem".

@rabernat
Member Author

I probably need to be working less on Pangeo stuff and more on other stuff. So I am quite glad to have a reason to just sit tight for a while!

@mrocklin
Member

mrocklin commented Mar 14, 2018 via email

@rabernat
Member Author

Is the profile in this comment still valid?

Now I am doing an even bigger dataset than that one (by about a factor of 2; 876081 tasks total). But I think the underlying performance issue is the same, so that profile should still be useful.

@mrocklin
Member

I don't suppose you have something representative that I can run easily locally for testing without having access to data or custom scripts, do you? (expectation is no, but thought I'd ask)

@rabernat
Member Author

I don't suppose you have something representative that I can run easily locally for testing without having access to data or custom scripts, do you? (expectation is no, but thought I'd ask)

The example [random] datasets described in pydata/xarray#1770 should be relevant here. Might have to increase the size / number of variables until you hit these bottlenecks.

@mrocklin
Member

Which one in particular? The dask array one or the xarray one? Are zarr and gcsfs required to observe the poor performance in order.py? I apologize for the questions, but you're probably the most qualified person to help identify what is necessary to reproduce the poor performance here.

@rabernat
Member Author

The store process I am using here is the following:

import zarr
import gcsfs
compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=2)
encoding = {vname: {'compressor': compressor} for vname in ds.variables}

token = 'cache'
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token=token)
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/<dataset name>',
                              gcs=fs, check=True, create=False)
delayed_store = ds.to_zarr(store=gcsmap, encoding=encoding, compute=False)
# retries was just added
persist_store = delayed_store.persist(retries=10)

@rabernat
Member Author

Which one in particular? The dask array one or the xarray one? Are zarr and gcsfs required to observe the poor performance in order?

I don't know the answer, because I have not done the experiment. I think it would be very useful to compare raw zarr store vs xarray store to understand where the bottlenecks lie. Xarray can generate a lot of chatter in the dask graph, and perhaps this is part of the issue.

Here I am, perhaps prematurely, trying to actually push a real dataset before we have optimized this workflow. I hoped I could just brute-force it through and move on, but these intermittent errors, coupled with the slow upload speed (see #166), mean that I am not really able to.
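
For the raw-zarr comparison mentioned above, here is a minimal sketch of storing dask arrays directly into zarr, bypassing xarray and writing to a local directory store rather than GCS. The shape and chunking mirror the real dataset; reduce them for a quick test:

import dask.array as dsa
import zarr

shape = (730, 50, 2700, 3600)
chunks = (1, 1, 2700, 3600)
source = dsa.random.random(shape, chunks=chunks).astype('f4')

# a local zarr array matching the dask chunking; swap in a GCSMap to bring gcsfs back in
target = zarr.open('raw_store_test.zarr', mode='w', shape=shape, chunks=chunks,
                   dtype='f4', compressor=zarr.Blosc(cname='zstd', clevel=3, shuffle=2))

delayed = source.store(target, lock=False, compute=False)
delayed.compute()  # or .persist() on a distributed client to isolate graph submission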

@mrocklin
Member

I think that we should expect speedbumps like this whenever we increase scale by an order of magnitude or try a new technology. This identification and solution of performance issues is, I suspect, one of the major products of this collaboration.

I'm quite happy to help guide people to diagnose issues generally and to directly fix them within Dask specifically. However I am probably not very capable when it comes to running full-stack experiments like this.

@rabernat
Member Author

The problem with debugging these big data problems is that the iteration is so slow. For my current dataset, it takes many hours just to get through .persist(). Making a smaller example kind of defeats the point, because the issue is with scalability itself.

I will gladly follow your advice on how to proceed to debug in a systematic way. My thinking would be a hierarchy of experiments.

  1. The full, real xarray dataset, stored into GCS via zarr/gcsfs
  2. An identical xarray dataset, but with data generated randomly by dask (see the sketch after this comment)
  3. The same raw arrays, but stored directly into zarr by dask (bypassing xarray)
  4. All of the above done without gcsfs, storing to disk instead.

(Please tell us what debugging info / logs / etc. should be stored to make the most out of this.)

The problem is that it's very hard for me to find time to do this. Maybe @kaipak can squeeze some of it in as part of his related work on storage benchmarking.
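
To make experiments 2 and 4 concrete, here is a rough sketch of a synthetic xarray dataset with the same shapes and chunking as the real one, written to a local zarr store instead of GCS. All names and sizes are stand-ins; shrink them for a quick test:

import dask.array as dsa
import xarray as xr
import zarr

nt, nz, ny, nx = 730, 50, 2700, 3600
chunks = (1, 1, ny, nx)

def fake_var():
    data = dsa.random.random((nt, nz, ny, nx), chunks=chunks).astype('f4')
    return xr.DataArray(data, dims=['time', 'st_ocean', 'yt_ocean', 'xt_ocean'])

ds_fake = xr.Dataset({vname: fake_var() for vname in ['temp', 'salt', 'u', 'v']})

compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=2)
encoding = {vname: {'compressor': compressor} for vname in ds_fake.data_vars}

# experiment 4: local disk; swap the path for a gcsfs.mapping.GCSMap to get experiment 2
delayed_store = ds_fake.to_zarr('fake_store_test.zarr', encoding=encoding, compute=False)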

@mrocklin
Member

My hope is that if you're running into problems with order.py then the problem doesn't engage gcsfs at all, and hopefully not to_zarr either (though I don't know how that works). Ideally we would find an example with xarray or dask.array with random data that was easy to reproduce, was similar to your problem, and spent an inordinate amount of time in order.py (say, greater than 1ms per task).

@mrocklin
Member

Hopefully, through looking at scaling relations (takes 1s at 10000 tasks, 10s at 100000 tasks, 100s at 1000000 tasks, so must be linear) we can extrapolate from smaller examples to larger cases.
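
A rough sketch of that kind of scaling measurement, timing just the task-ordering step on random-data graphs of increasing size (dask.order.order is an internal function, so the import and graph access may differ between dask versions):

from time import time

import dask.array as dsa
from dask.order import order

for nt in [8, 32, 128, 512]:
    x = dsa.random.random((nt, 50, 2700, 3600), chunks=(1, 1, 2700, 3600))
    dsk = dict(x.__dask_graph__())
    tic = time()
    order(dsk)
    elapsed = time() - tic
    # look for superlinear growth in time per task as the graph grows
    print(nt, len(dsk), elapsed, elapsed / len(dsk))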

@rabernat
Member Author

rabernat commented Mar 19, 2018

I have created a self-contained script which benchmarks this issue using just dask, zarr, and gcsfs:
https://gist.github.com/rabernat/8a720013b86eda92127f4d954e947b4c

Here is a plot of the execution time, which shows some nonlinearity in the scaling for large size:
[plot: execution time vs. dataset size]

(I have not yet tried to wrap this in xarray, but it should be straightforward.)

I have reached the end of my knowledge and time for working on this. (edit: for a few days at least)

@mrocklin
Member

I have reached the end of my knowledge and time for working on this. (edit: for a few days at least)

Cool. I appreciate you spending time on this. I'm curious, have you tried this without zarr or without gcsfs and does it make a difference? "No, I didn't try this" is a fine answer, I'm just wondering if they're essential to seeing this problem or not.

@rabernat
Member Author

No, did not try without zarr / gcsfs. The benchmark script would be easy to modify for that scenario, should someone else wish to undertake those experiments.

@mrocklin
Member

Here is my attempt to run this script:

mrocklin@carbon:~/workspace/play$ python dask_zarr_gcs_store_timing.py 
Traceback (most recent call last):
  File "dask_zarr_gcs_store_timing.py", line 60, in <module>
    timing[n, ns] = time_store_persist(nt, fs)
  File "dask_zarr_gcs_store_timing.py", line 38, in time_store_persist
    with temporary_gcs_path(fs) as gcsmap:
  File "/home/mrocklin/Software/anaconda/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "dask_zarr_gcs_store_timing.py", line 27, in temporary_gcs_path
    create=True)
  File "/home/mrocklin/workspace/gcsfs/gcsfs/mapping.py", line 38, in __init__
    self.gcs.mkdir(bucket)
  File "<decorator-gen-9>", line 2, in mkdir
  File "/home/mrocklin/workspace/gcsfs/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/mrocklin/workspace/gcsfs/gcsfs/core.py", line 633, in mkdir
    json={"name": bucket})
  File "<decorator-gen-2>", line 2, in _call
  File "/home/mrocklin/workspace/gcsfs/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/mrocklin/workspace/gcsfs/gcsfs/core.py", line 425, in _call
    meth = getattr(self.session, method)
AttributeError: 'NoneType' object has no attribute 'post'

@mrocklin
Member

@rabernat does this faithfully represent your problem?

from time import time
import numpy as np

from dask.distributed import Client
import dask.array as dsa


class Empty(object):
    def __init__(self, shape, dtype):
        self.shape = shape
        self.dtype = dtype


def time_store_persist(nt):
    """Time submitting the store graph for nt timesteps to the scheduler"""
    shape = (nt, 50, 1080, 2160)
    chunkshape = (1, 1, 1080, 2160)
    ar = dsa.random.random(shape, chunks=chunkshape)

    store = Empty(shape=ar.shape, dtype=ar.dtype)
    delayed_store = ar.store(store, lock=False, compute=False)
    with Client(processes=False) as client:
        tic = time()
        persist_store = delayed_store.persist()
        toc = time()
        del persist_store
    return toc - tic


# the sizes of datasets to try; adjust as you like
data_sizes = 2**(np.arange(0, 8))
# how many profiles to make
nsamples = 4
timing = np.zeros((len(data_sizes), nsamples))
for n, nt in enumerate(data_sizes):
    for ns in range(nsamples):
        timing[n, ns] = time_store_persist(nt)

@mrocklin
Member

For the record: I first tried using np.empty and then np.memmap, but that filled memory very quickly. I made an Empty storage class that mimics the dtype and shape attributes (I think this is all that store needs? not sure). Alternatively we could just reduce the size of the data while keeping the size of the graph the same, by reducing the shape and chunkshape accordingly.

@rabernat
Member Author

rabernat commented Mar 19, 2018

Here is my attempt to run this script:

See the inline comment about initializing the gcs filesystem:

# need to run once with `token='browser'`, then switch to `token='cache'`
# otherwise the scheduler can't get valid credentials
# (see https://github.com/dask/gcsfs/issues/90)
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='cache')

edit: I recommend pasting the script into a notebook, which is how I developed it in the first place. Some interactivity may be needed to get the gcsfs credentials set up.

@rabernat
Member Author

does this faithfully represent your problem?

That looks about right, yes. But will the Empty store actually work here? Doesn't it need __setitem__ defined?

@mrocklin
Member

Things will fail when they actually start computing, but my understanding is that we're only looking to profile the time to get things to the scheduler, not the time to actually compute them. Is this correct?
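
(For completeness: if one did want the store step to run to completion rather than fail, a no-op __setitem__ should be enough for dask's store machinery. A hypothetical variant of the Empty class:)

class DiscardStore(object):
    """Accepts writes like a zarr array but throws the data away."""
    def __init__(self, shape, dtype):
        self.shape = shape
        self.dtype = dtype

    def __setitem__(self, key, value):
        # dask's store does target[slices] = chunk; just discard the chunk
        pass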

@rabernat
Member Author

Additional, possibly relevant observation: when calling .persist() on the real dataset, my memory usage increases to about 350 GB (this is a big server). I have not yet checked whether this is the case for fake datasets.

@mrocklin
Member

When I ran snakeviz on that computation I noticed that it was spending a lot of time in dumps, which may be related.

mrocklin added a commit to dask/dask that referenced this issue Mar 19, 2018
When performing task ordering we sort tasks based on the
number of dependents/dependencies they have.  This is critical to
low-memory processing.

However, sometimes individual tasks have millions of dependencies,
for which an n*log(n) sort adds significant overhead.  In these cases
we give up on sorting, and just hope that the tasks are well ordered
naturally (such as is often the case in Python 3.6+ due to sorted
dicts and the natural ordering that exists when constructing common
graphs)

See pangeo-data/pangeo#150 (comment)
for a real-world case
@mrocklin
Member

The computation spends around half its time in dumps and about half its time in order. The order time is around 1ms per task, which seemed high. I think I've resolved this in dask/dask#3303

@rabernat
Member Author

I just tried my benchmark with dask/dask#3303 (now in dask master). Here are the old results

[plot: benchmark timings with old dask]

Here are the new results

[plot: benchmark timings with dask/dask#3303]

In terms of the real world example... persisting the store operation used to take ~4 hours. Now it takes 2 minutes. I would say that this has been nailed. Thanks @mrocklin so much for your help squashing this really important bottleneck.

@mrocklin
Member

mrocklin commented Mar 21, 2018 via email

@eriknw

eriknw commented Mar 21, 2018

woo, awesome job all. Nice plots @rabernat. I've been lurking (sorry for not having the time to help), and just wanted to chime in to give my +1.
