
workflow for moving data to cloud #48

Closed

rabernat opened this issue Dec 11, 2017 · 52 comments

@rabernat
Member

I am currently transferring a pretty large dataset (~11 TB) from a local server to gcs.
Here is an abridged version of the basic workflow:

# open dataset (about 80 x 133 GB netCDF files)
ds = xr.open_mfdataset('*.nc', chunks={'time': 1, 'depth': 1})

# configure gcsfs
import gcsfs
config_json = '~/.config/gcloud/legacy_credentials/ryan.abernathey@gmail.com/adc.json'
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token=config_json)
bucket = 'pangeo-data-private/path/to/data'
gcsmap = gcsfs.mapping.GCSMap(bucket, gcs=fs, check=True, create=True)

# set recommended compression
import zarr
compressor = zarr.Blosc(cname='zstd', clevel=5, shuffle=zarr.Blosc.AUTOSHUFFLE)
encoding = {v: {'compressor': compressor} for v in ds.data_vars}

# store
ds.to_zarr(store=gcsmap, mode='w', encoding=encoding)

Each chunk in the dataset has 2700 x 3600 elements (about 75 MB), and there are 292000 total chunks in the dataset.

I am doing this through dask.distributed using a single, multi-threaded worker (24 threads). I am watching the progress through the dashboard.

Once I call to_zarr, it takes a long time before anything happens (about 1 hour). I can't figure out what dask is doing during this time. At some point the client errors with the following exception: tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe371f58a58> exception was never retrieved. Nevertheless, the computation eventually hits the scheduler, and I can watch its progress.

[screenshot of the dask dashboard task stream]

I can see that there are over 1 million tasks. Most of the time is being spent in tasks called open_dataset-concatenate and store-concatenate. There are 315360 of each task, and each takes about ~20 s. Doing the math (315360 tasks × ~20 s ÷ 24 threads ≈ 3 days), it will take a couple of days to upload the data at this rate, which is slower than scp by a factor of 2-5.

I'm not sure if it's possible to do better. Just raising this issue to start a discussion.

A command line utility to import netcdf directly to gcs/zarr would be a very useful tool to have.

@rabernat
Member Author

Here is an alternative, more "cloudy" way this might work:

  • We develop a CLI that allows us to upload netCDF files, e.g. pangeo-upload-dataset --bucket path/to/bucket --chunks time:1,depth:1 *.nc
  • Each netcdf file is stored in a temporary bucket
  • Some sort of load-balanced pool of workers waits for each individual upload to complete and then kicks off a job that
    • transcodes the netcdf file to zarr
    • deletes the temporary file

In this case, we would end up with many zarr stores, just like we have many netcdf files. We would need an open_multizarr function in xarray to simplify automatic concatenation of such stores.
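
There is no open_multizarr in xarray today, so purely as a rough sketch of what the manual equivalent might look like (the bucket layout and the choice of 'time' as the concat dimension are assumptions, not something decided in this thread):

import gcsfs
import xarray as xr

# Assume one zarr store per original netCDF file, all under a common prefix
# (hypothetical layout).
fs = gcsfs.GCSFileSystem(project='pangeo-181919')
store_paths = fs.ls('pangeo-data-private/path/to/data')

# Open each store lazily, then concatenate along the record dimension.
datasets = [xr.open_zarr(gcsfs.mapping.GCSMap(path, gcs=fs)) for path in store_paths]
ds = xr.concat(datasets, dim='time')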

@mrocklin
Member

Can you get a sense for what the bottleneck is? I/O? Compression?

@rabernat
Member Author

My first response would be "neither," since neither the system CPU (5%) nor outbound IP traffic (~20,000 kilobit/s) is anywhere close to saturated.

Reading the data from disk could also be a bottleneck, especially if each of these 24 threads is accessing a different, random chunk of the data. From the timing of the tasks above, reading and writing seem to be similar. But maybe I am not measuring correctly.

@mrocklin
Member

If you're using the dask.distributed scheduler (which given the images above, you probably are) I recommend looking at the "Profile" tab.

@mrocklin
Member

You could also consider changing the threads/processes mixture using the n_workers= and threads_per_worker= keywords to Client or LocalCluster
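
For concreteness, a minimal sketch of what that might look like; the worker and thread counts here are arbitrary placeholders, not a recommendation:

from dask.distributed import Client, LocalCluster

# More processes with fewer threads each can help when per-thread work is
# serialized (e.g. by the GIL or by per-request HTTP/SSL overhead).
cluster = LocalCluster(n_workers=6, threads_per_worker=4)
client = Client(cluster)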

@rabernat
Member Author

I don't have a "Profile" tab. I guess my distributed version is out of date. I do have a "System" tab.

Thanks for the suggestions about profiling. I will try to do some more systematic profiling. For now I just wanted to get the transfer started, and I am reluctant to interrupt it.

@mrocklin
Member

Yeah, you might consider upgrading at some point. Doc page on the profiler: http://distributed.readthedocs.io/en/latest/diagnosing-performance.html#statistical-profiling

@rabernat
Member Author

Yes, that definitely would have been good to have!

@rabernat
Member Author

Looks like I am on track to do about 100 GB in one day. At this rate, it will take 100 days to upload the dataset.

@jhamman - did you have similar performance with the Newmann Met ensemble?

@mrocklin
Member

Heh, I recommend profiling and looking at using more processes.

@rabernat
Member Author

rabernat commented Dec 12, 2017 via email

@mrocklin
Member

I meant that you might consider using more processes and fewer threads per process

from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=4)

@jhamman
Member

jhamman commented Dec 12, 2017

@jhamman - did you have similar performance with the Newmann Met ensemble?

Yes. I'm not sure I ever got past the setup / serialization step.

@jhamman
Member

jhamman commented Dec 14, 2017

@rabernat and @mrocklin - I'm using fsspec/gcsfs#49 and the xarray/master branch to move data to GCP now. Initialization was much faster. I'll report back if/when this completes.

@rabernat
Member Author

I'm glad that the initialization bottleneck seems to be solved!

I still think we have a lot of work ahead figuring out how to tune chunks / compression / n_procs / n_threads to efficiently move data into the cloud using this method.

@rabernat
Member Author

FYI, I have been playing around with to_zarr using the default DirectoryStore. I am still getting long initialization wait times. So that suggests gcsfs is not necessarily the culprit.

@mrocklin
Member

@rabernat in your situation I might try dumping a tiny xarray dataset to zarr and profiling the operation, seeing which parts of the process take up the most time. I generally use the %snakeviz magic for this:

pip install snakeviz
%load_ext snakeviz
%snakeviz ds.to_zarr(...)

@mrocklin
Member

@jhamman checking in, were you able to upload anything to GCS or is Geyser still down?

@jhamman
Member

jhamman commented Dec 20, 2017

Reporting back after doing a bit of profiling on a ~21 MB dataset. For the tests I'm reporting now, I persisted the dataset into memory prior to writing to the zarr store. I also compared writing to a local store on an SSD. I have attached the results from running the to_zarr method with %prun. These files can be read using snakeviz via the following syntax:

snakeviz filename.prof

Profiles.zip
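
For anyone reproducing this, a minimal sketch of how such a snakeviz-readable profile might be dumped from IPython; the output file name is a placeholder, and ds, gcsmap, and encoding refer to the workflow at the top of this issue:

# In IPython/Jupyter: run the write under the profiler and dump the stats
# to a file that `snakeviz <file>.prof` can open.
%prun -D to_zarr_gcs.prof ds.to_zarr(store=gcsmap, mode='w', encoding=encoding)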

As a teaser, here is a snapshot from snakeviz.

[snakeviz profile snapshot]

The outermost gray ring is method 'acquire' of '_thread.lock' objects.

Finally, here is the notebook I used to generate these tests.

@mrocklin
Member

_thread.lock.acquire is a sign that this is using the multithreaded scheduler, which is difficult to profile. Can you try a second time with dask.set_options(get=dask.get)?
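
For reference, a minimal sketch of forcing the single-threaded scheduler for profiling, using the dask API of that era; newer dask versions spell this dask.config.set(scheduler='synchronous'):

import dask

# Run the whole graph in the calling thread so the profile shows real work
# instead of thread-lock waits.
with dask.set_options(get=dask.get):
    ds.to_zarr(store=gcsmap, mode='w', encoding=encoding)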

@mrocklin
Member

Ah, I see that you're using a client. We might want to avoid this when profiling. Although the dask.set_options call should override that regardless

@jhamman
Member

jhamman commented Dec 20, 2017

@mrocklin - See attached profiles using dask.set_options(get=dask.get) instead of client.

Profiles.zip

@mrocklin
Member

@martindurant may want to see this. There is a lot of time spent in operations like _list_bucket.

Also interesting is that most of the time is spent in SSL_read rather than writing.

@jhamman I might suggest rechunking your data to have fewer, larger chunks, and then seeing how that affects bandwidth. I suspect that we are mostly bound here by administrative check-ins with GCS rather than by compressing or sending bytes.
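
A minimal sketch of what "fewer, larger chunks" could look like before writing; the chunk sizes are arbitrary placeholders, not a recommendation:

# Consolidate many small chunks into fewer, larger ones so each GCS request
# carries more data relative to its fixed per-request overhead.
ds_rechunked = ds.chunk({'time': 10, 'depth': 5})
ds_rechunked.to_zarr(store=gcsmap, mode='w', encoding=encoding)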

@jhamman
Member

jhamman commented Dec 20, 2017

I tried another chunk configuration (5 chunks per DataArray) and the total write time went from 380 s to 405 s (i.e. slower). I'm also curious about the SSL_read calls. Is it possible we're doing something silly where we initialize the arrays on GCS and then read them back, before writing to them again?

@mrocklin
Member

Yeah you're right, it looks like we load in all of the data to the local process.

AbstractWritableDataStore.set_variables calls

         if vn not in self.variables:

Where self.variables is a property that calls load

@property
def variables(self):
    # Because encoding/decoding might happen which may require both the
    # attributes and the variables, and because a store may be updated
    # we need to load both the attributes and variables
    # anytime either one is requested.
    variables, _ = self.load()
    return variables

This appears to be about 120s of your 400s

@mrocklin
Member

Another issue seems to be excessive metadata/attrs collection. For example we seem to be creating around 100 Zarr arrays. In each case we seem to spend around 60ms getting metadata, resulting in around 60s of lost time.

Two questions:

  1. @jhamman should we be making 100 zarr arrays here? Does that sound right given the process you're doing?
  2. @alimanfoo given that we know the metadata locally perhaps we can hand this to the Array constructor to avoid touching the store mapping unnecessarily?

@mrocklin
Member

We seem to spend a long time dealing with attrs. Each of these can take some time.

More broadly, every time we touch the GCS mapping from far away there is a non-trivial cost. It appears that XArray-on-Zarr touches the mapping frequently. I wonder if there is some way to fill out all of our metadata locally and then push that up to GCS in a single go.
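
As a sketch of the idea only (not the eventual fix), the GCS store is a MutableMapping, so metadata documents could be staged in a local dict and pushed with a single bulk update instead of one round trip per key; the attribute values here are placeholders:

import json

# Stage zarr metadata documents locally (placeholder attributes) ...
pending = {
    '.zattrs': json.dumps({'title': 'example dataset'}).encode(),
}

# ... then touch the remote mapping once instead of once per key.
gcsmap.update(pending)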

@mrocklin
Member

@martindurant do you have any thoughts on long-running connections and gcsfs? Is this feasible to avoid the thousand small SSL handshakes we're doing here?

@jhamman
Member

jhamman commented Dec 21, 2017

Ouch. The variables property issue seems like a bug and one that I'm pretty sure I introduced. I'll look into it more tomorrow and see if we can come up with a fix on the xarray side for that point.

The change on the xarray side was here: https://github.com/pydata/xarray/pull/1609/files#diff-e7faa2e88465688f603e8f1f6d4db821R226

@rabernat
Member Author

rabernat commented Dec 21, 2017

This is pretty fascinating. Some of the backend optimizations required to improve this could potentially be combined with pydata/xarray#1087. Fetching attributes lazily sounds like low-hanging fruit.

@jhamman
Member

jhamman commented Dec 21, 2017

I have implemented a somewhat unsatisfactory fix for the xarray issue. I'm testing it now and will try to get it completed today.

@jhamman
Member

jhamman commented Dec 21, 2017

@jhamman should we be making 100 zarr arrays here? Does that sound right given the process you're doing?

I would expect the number to be closer to 38 arrays (7 variables x 5 chunks + 3 coordinates).

Below are some updated profiles using the changes in pydata/xarray#1799.

Profiles_after_xr_fix.zip

@mrocklin
Member

Now it looks like the biggest issue is in attribute handling and preparing the variables prior to sending data?

@jhamman
Member

jhamman commented Dec 21, 2017

I'm still confused/concerned about the amount of time we spend with the SSL handshakes.

@mrocklin
Member

SSL handshakes are expensive, especially if you are far away from the destination; the full handshake requires several network round trips. Currently gcsfs does this handshake every time we touch any piece of data. I hope that we can reduce this with long-running connections or Sessions. @martindurant would know more, though; I suspect he has thought about this before.
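
As a generic illustration of the point about Sessions (this is not gcsfs internals), reusing one HTTP session keeps the underlying TLS connection alive so repeated requests to the same host skip the handshake; the endpoint below is just a placeholder:

import requests

# One Session reuses the underlying connection across requests,
# so only the first call pays for the SSL handshake.
session = requests.Session()
for _ in range(3):
    session.get('https://storage.googleapis.com/')  # placeholder endpoint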

@mrocklin
Member

@jhamman you should also try merging in fsspec/gcsfs#22 . I suspect that your times will go down significantly

@alimanfoo

alimanfoo commented Dec 22, 2017 via email

@jhamman
Member

jhamman commented Dec 24, 2017

@mrocklin and @alimanfoo - I just tried my little test case again with fsspec/gcsfs#22, fsspec/gcsfs#49, and https://github.com/alimanfoo/zarr/pull/220. I've had to move to a new machine, so the tests are not a perfect match, but it seems we've cut the number of SSL_read calls from 345228 to 6910. This seems to be yielding about a 5x speedup. That said, it still takes about 60 seconds to push 20 MB, so there is probably still room for improvement.

Snakeviz ready profile is attached.

program_to_zarr_gcsfs.prof.zip

@alimanfoo

alimanfoo commented Dec 24, 2017 via email

@alimanfoo

alimanfoo commented Dec 24, 2017 via email

@jhamman
Member

jhamman commented Dec 26, 2017

Thanks @alimanfoo. I just opened an xarray PR (pydata/xarray#1800) that uses update rather than set_item. This has reduced the write time from 60 seconds to ~35 seconds. An updated profile is attached for those who are interested.

program_to_zarr_gcsfs.prof 2.zip

@alimanfoo

alimanfoo commented Dec 26, 2017 via email

@martindurant
Contributor

Does fsspec/gcsfs#55 impact the performance? Previously there was no Session invoked; I think some connections should now be reusable.

@jhamman
Member

jhamman commented Jan 3, 2018

@martindurant - yes, that brought the time from ~35 seconds down to ~25 seconds, thanks to a sharp decrease in the number of SSL read calls (1885 vs 6910):

         640185 function calls (639148 primitive calls) in 24.996 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1885   22.844    0.012   22.844    0.012 {built-in method _openssl.SSL_read}
      770    0.349    0.000    0.349    0.000 {built-in method _openssl.SSL_write}

This is using zarr-master, pydata/xarray#1800, and the gcsfs combination of fsspec/gcsfs#22, fsspec/gcsfs#49, and fsspec/gcsfs#55.

An updated profile is attached: program_to_zarr_gcsfs.prof.zip

@jhamman
Member

jhamman commented Jan 24, 2018

@mrocklin
Member

This could probably still use some documentation to help other groups (like @rabernat's) push relevant data to the cloud.

@rabernat
Member Author

I am eager to try this out. I guess I just have to update my xarray, gcsfs, and zarr to latest master? Or are there other steps that need to be documented?

@martindurant
Contributor

gcsfs was just released, so a normal conda update should do for that one.

@mrocklin
Member

Thank you for keeping up with work on gcsfs @martindurant . It was very useful to have your time on this.

@alimanfoo

alimanfoo commented Jan 24, 2018 via email

@stale

stale bot commented Jun 15, 2018

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Jun 15, 2018
@jhamman
Member

jhamman commented Jun 21, 2018

closing.

@rabernat wrote: http://pangeo-data.org/data.html#guide-to-preparing-cloud-optimized-data

This basically summarizes the best known workflow for moving zarr-like datasets to the cloud.

@stale stale bot removed the stale label Jun 21, 2018
@jhamman jhamman closed this as completed Jun 21, 2018