
How to organize files on GCP #19

Closed
rabernat opened this issue Oct 13, 2017 · 41 comments

@rabernat (Member)

Now that we have our GCP credits, we can start moving some bigger data volumes to cloud storage. GCP has two basic categories of storage: object storage (Google Cloud Storage buckets) and persistent disks attached to compute instances.

When the xarray zarr backend is working (see pydata/xarray#1528), it should be straightforward to plug it into the object storage.

For our file-based datasets, I think persistent disk is the way to go. I propose we create a unique persistent disk for each dataset in our collection. That will give us maximum flexibility in organizing and sharing the different datasets.

So, what are the datasets we want to load first? I would propose we start with

@jhamman (Member) commented Oct 17, 2017

I can move a portion of the met ensemble to a PD. I'll ping this issue when it's available.

@rabernat (Member Author) commented Oct 17, 2017 via email

@rabernat (Member Author) commented Nov 16, 2017

I want to move this discussion out of email and back to github (cc @jhamman, @mrocklin, @kmpaul, @mmccarty)

We were discussing the tradeoffs between google cloud storage vs persistent disk for storing our datasets.

I think we all agree that some hypothetical zarr backed chunk store in a cloud bucket would be an ideal storage layer for xarray. However, our goal is to build a tool to analyze existing datasets. We should have some way to support regular netCDF files for the time being. The options seem to be

  1. store the files on a persistent disk
  2. store the files themselves in cloud storage

I don't know how option 2 would work. Some questions that come to mind are:

  • Is there a way to make cloud storage look like a POSIX filesystem?
  • Can you seek on files that are stored as objects in cloud storage? This is crucial because often we just want to read small chunks of the netCDF files.

As for option 1, some questions are:

  • Are there performance penalties when multiple compute nodes try to read from a read-only persistent disk simultaneously?
  • How scalable is persistent disk?

We are making a lot of guesses about how these different approaches would perform. I would like to have some quantitative benchmarks instead.

@mrocklin (Member)

I don't have answers here because I don't know anyone who uses volumes for large amounts of data.

Can you seek on files that are stored as objects in cloud storage? This is crucial because often we just want to read small chunks of the netCDF files.

Yes, and the gcsfs library that Dask uses exposes a Python file-like interface with seek/tell functionality. Unfortunately this isn't enough for most HDF5/NetCDF libraries; they expect a C file pointer.
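
For reference, a minimal sketch of what that file-like interface looks like (the object path below is a placeholder, not a real file in our bucket):

import gcsfs

fs = gcsfs.GCSFileSystem(project='pangeo-181919')
with fs.open('pangeo-data/some-file.nc', 'rb') as f:  # placeholder object path
    f.seek(1024)          # jump past the first kilobyte
    header = f.read(256)  # read a small byte range without downloading the whole object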

I'm still inclined to push on easy conversion to a cloud-friendly format. What stops us from doing something like the following on one of our HPC systems?

ds = xarray.open_mfdataset(...)
ds.to_zarr('gcs://...')

@rabernat (Member Author) commented Nov 16, 2017

What stops us from doing something like the following on one of our HPC systems?

Me finishing pydata/xarray#1528 😬

(edit: I hope to do so over Thanksgiving)

@jhamman (Member) commented Dec 5, 2017

@rabernat and @mrocklin - following up on our conversation earlier today. I am uploading two copies of the Newmann met ensemble to GCS:

  1. gs://pangeo-data/newmann-met-ensemble-netcdf will include the same netcdf files that currently reside on Cheyenne (/glade/u/home/jhamman/workdir/GARD_inputs/newman_ensemble/)
  2. gs://pangeo-data/newmann-met-ensemble will include a zarr dataset with the same data.

A few things to note:

  1. @davidedelvento can correct me if I'm wrong, but Cheyenne compute nodes do not have outside network access, so I'm pushing the zarr dataset from geyser.
  2. The workflow to publish a zarr dataset from geyser to GCS is as follows:
In [1]: import xarray as xr

In [2]: from distributed import LocalCluster, Client
   ...: client = Client(LocalCluster(processes=True, n_workers=10, threads_per_worker=4))
   ...: client
   ...:
Out[2]: <Client: scheduler='tcp://127.0.0.1:51070' processes=10 cores=40>

In [3]: ds = xr.open_mfdataset('/glade/u/home/jhamman/workdir/GARD_inputs/newman_ensemble/conus_ens_[01]*',
   ...:                        engine='netcdf4', concat_dim='ensemble', chunks={'time': 180, 'lat': 50, 'lon': 50})
   ...:

In [4]: import gcsfs
   ...: fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='/glade/u/home/jhamman/.config/gcloud/application_default_credentials.json')
   ...:

In [5]: gcsmap = gcsfs.mapping.GCSMap('pangeo-data/newmann-met-ensemble', gcs=fs, check=True, create=False)

In [6]: ds.to_zarr(store=gcsmap)

@mrocklin (Member) commented Dec 5, 2017

Reading locally

In [1]: import gcsfs

In [2]: import xarray as xr

In [3]: fs = gcsfs.GCSFileSystem()

In [4]: gcsmap = gcsfs.mapping.GCSMap('pangeo-data/newmann-met-ensemble', gcs=fs, check=True, create=False)

In [5]: ds = xr.open_zarr(gcsmap)

In [6]: ds
Out[6]: 
<xarray.Dataset>
Dimensions:  (lat: 224, lon: 464, time: 12054)
Coordinates:
  * lat      (lat) float64 25.06 25.19 25.31 25.44 25.56 25.69 25.81 25.94 ...
  * lon      (lon) float64 -124.9 -124.8 -124.7 -124.6 -124.4 -124.3 -124.2 ...
  * time     (time) datetime64[ns] 1980-01-01 1980-01-02 1980-01-03 ...
Data variables:
    *empty*
Attributes:
    history:                   Version 1.0 of ensemble dataset, created Decem...
    institution:               National Center for Atmospheric Research (NCAR...
    nco_openmp_thread_number:  1
    references:                Newman et al. 2015: Gridded Ensemble Precipita...
    source:                    Generated using version 1.0 of CONUS ensemble ...
    title:                     CONUS daily 12-km gridded ensemble precipitati...

Is it odd that there are no data variables?

@jhamman (Member) commented Dec 5, 2017

@mrocklin - after I sent my last message, the to_zarr line crashed because not all the chunk sizes were identical (I think). I've updated the chunk sizes and just started publishing the dataset again. I'm guessing that is why you are not seeing anything there yet.
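
For illustration, a hedged sketch of what evening out the chunks might look like before writing; the chunk sizes here are hypothetical, chosen only because they divide the dimension lengths evenly:

# rechunk so every dask chunk has the same shape before calling to_zarr
# (224 = 4 x 56, 464 = 8 x 58, 12054 = 6 x 2009; sizes are illustrative only)
ds = ds.chunk({'time': 2009, 'lat': 56, 'lon': 58})
ds.to_zarr(store=gcsmap)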

@mrocklin (Member) commented Dec 5, 2017

I would like (and plan) to make pangeo-data publicly readable with a requester-pays model. If anyone objects to this decision then please let me know.

@jhamman (Member) commented Dec 5, 2017

@rabernat or @mrocklin - can someone else try running the code snippet I posted above? I'm not seeing the parallel read/write that we'd expect here.

@mrocklin (Member) commented Dec 5, 2017

I've been able to read the dataset, at least the metadata. I haven't tried anything serious with it yet. I'm intending to run it on the cluster soon. Will report back shortly.

@mrocklin (Member) commented Dec 5, 2017

Things work more or less on the cluster. I need to change around some things on my end, but I was surprised at how nicely things ran.

  1. Need to specify CPU and memory constraints in the helm chart to avoid over-subscription of workers
  2. Need to figure out how to give workers Google authentication. Currently the entire dataset is publicly readable and we're paying for it (I'll switch this off when I'm not using it)
  3. GCSFS had an issue that I think @martindurant has already fixed (Exists not owned, fsspec/gcsfs#45)
  4. @jhamman found that xarray expected chunk sizes to be exact
  5. Graph serialization time was a bit long. We might profile this at some point.

When doing a trivial standard deviation computation we found that I/O was about 10-20% of the total cost.

import gcsfs
import xarray as xr

# open the zarr dataset directly from GCS
fs = gcsfs.GCSFileSystem(token='cloud')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/newmann-met-ensemble', gcs=fs)
ds = xr.open_zarr(gcsmap)
ds

# connect to the distributed scheduler running on the cluster
from dask.distributed import Client, config
client = Client(config['scheduler-address'])
client

# lazily compute the standard deviation across the ensemble dimension
result = ds.pcp.std(dim='ensemble')
result.nbytes

from dask.distributed import progress

result = result.persist()  # TODO: profile this line
progress(result)

@mrocklin (Member) commented Dec 5, 2017

My current config file

worker:
    pipPackages: >-
        git+https://github.com/mrocklin/gcsfs.git@mapping-no-exists
        git+https://github.com/rabernat/xarray.git@zarr_backend
    condaPackages: -c conda-forge zarr blosc
    replicas: 8

jupyter:
    pipPackages: >-
        git+https://github.com/mrocklin/gcsfs.git@mapping-no-exists
        git+https://github.com/rabernat/xarray.git@zarr_backend
    condaPackages: -c conda-forge zarr blosc

@mrocklin (Member) commented Dec 5, 2017

I will endeavor to clean up and publish my helm chart and instructions tomorrow.

@mrocklin (Member) commented Dec 6, 2017

Looking at our billing it seems like we're losing way more money than we should be to storage costs.

https://console.cloud.google.com/billing/unbilledinvoice?project=pangeo-181919

According to this page, Regional Storage is $0.02 per gigabyte-month, so our 5 TB dataset should cost us around $100 a month. This seems reasonable to me. However, it appears that we're currently losing tens of dollars a day. Do we have an account contact we can reach out to who could help explain these costs?
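
A quick back-of-the-envelope check of that estimate, just as arithmetic:

dataset_gb = 5 * 1000                    # ~5 TB
price_per_gb_month = 0.02                # regional storage, USD
print(dataset_gb * price_per_gb_month)   # -> 100.0 USD/month, i.e. roughly $3/day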

@mrocklin (Member) commented Dec 6, 2017

These are the three buckets we currently have:

mrocklin@carbon:~$ gsutil du -hs gs://pangeo-data
1.36 TiB    gs://pangeo-data
mrocklin@carbon:~$ gsutil du -hs gs://pangeo
2.15 KiB    gs://pangeo
mrocklin@carbon:~$ gsutil du -hs gs://zarr_store_test
2.8 KiB     gs://zarr_store_test

@rabernat (Member Author) commented Dec 6, 2017 via email

@mrocklin (Member) commented Dec 6, 2017

Verifying: OK to delete? Nothing of value stored on those?

@mrocklin (Member) commented Dec 6, 2017

@jhamman I've fixed a few of the things on my end. I notice that the dataset is mostly (entirely?) NaN valued.

@rabernat (Member Author) commented Dec 6, 2017

The two persistent disks are GFDL CM2.6 and Newman Met Ensemble. I will delete them both.

@rabernat (Member Author) commented Dec 6, 2017

I am concerned about this NaNs issue.

@mrocklin (Member) commented Dec 6, 2017

Instructions on launching with Helm: https://github.com/pangeo-data/pangeo/wiki/Launch-development-cluster-on-Google-Cloud-Platform-with-Kubernetes-and-Helm

Also, I'm publishing a helm chart repository at https://dask.github.io/helm-chart

@mrocklin (Member) commented Dec 6, 2017

I am concerned about this NaNs issue.

I spoke with @jhamman . Apparently his transfer process timed out partway through.

@rabernat (Member Author) commented Dec 6, 2017

Thanks a lot for getting these docs up, Matt!

I don't want to slow you down... but I am trying to migrate us away from the wiki and towards the Sphinx site for our documentation. Is there any chance you could convert your new docs to .rst (pandoc makes this easy) and push them to the docs/setup_guides folder? Obviously this is not urgent.

@mrocklin (Member) commented Dec 6, 2017 via email

@davidedelvento

You are correct; the batch nodes cannot reach the outside network.

@jgerardsimcock

I am trying to move netCDF files from an FTP site onto GCS and want to save them as a single zarr datastore so that we can analyze the data as a single block and slice along the dims that we need. Is there a way to incrementally concatenate years and dims (rcps, gcms) to a zarr store on Google Cloud?

From the discussion above, it's clear I can save all the files into some local store, call open_mfdataset, and then use ds.to_zarr to write to my bucket. Is there a better way?

@rabernat (Member Author)

From the discussion above, it's clear I can save all the files into some local store, call open_mfdataset, and then use ds.to_zarr to write to my bucket. Is there a better way?

This is the recommended way. You need to use the gcsfs package to help zarr talk to gcs. This is described in the xarray docs here:
http://xarray.pydata.org/en/latest/io.html#cloud-storage-buckets

Make sure you are using the latest release of dask and the github master branch of gcsfs. We have been very actively debugging and optimizing this workflow over the past weeks (see dask/dask#3271 and fsspec/gcsfs#92).

I have been making notes on this transfer workflow in #150 and #166. Here are some tricks I have worked out. Make sure you are using a distributed dask cluster in order to monitor the progress of the transfer. It is good to split the upload into steps as follows:

ds = xr.open_mfdataset(...)
delayed_store = ds.to_zarr(store=gcsmap, encoding=encoding, compute=False)
# this will start the computation on your cluster
# retries are necessary because gcs api calls sometimes fail
persist_store = delayed_store.persist(retries=100)
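
A short follow-on sketch, assuming the snippet above and an active distributed Client, for watching the write from the client side:

from dask.distributed import progress, wait

progress(persist_store)  # live progress bar for the upload tasks
wait(persist_store)      # block until every chunk has been written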

@rabernat (Member Author) commented Mar 27, 2018

I should mention that we are in great need of collaborators who understand this stack to help continue to optimize and debug the upload workflow. @jgerardsimcock: I would be thrilled if you could share your experience and report any roadblocks you encounter.

@mrocklin (Member) commented Mar 27, 2018 via email

@rabernat (Member Author)

No, and this is a huge pain point for me! I am doing uploads of 10 TB datasets, which take a week. If they fail towards the end, there is no way to recover or resume; I can only delete everything and start over.

@rabernat (Member Author)

Clarification: this is an xarray limitation. Zarr itself supports appending with no problem.

@jgerardsimcock

@rabernat Thank you for the confirmation. The xarray/zarr append would be very useful. The idea of a multi-TB upload failure is terrible. Technically, what would a path towards implementing a fix for that look like?

@martindurant (Contributor)

The zarr structure that backs an xarray dataset is pretty simple, and any such dataset can be opened directly by zarr, which can then append to or update the data chunks. I don't know how hard that would be to implement via the xarray interface.
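
As a hedged sketch of what that might look like (not from this thread; the variable name, axis, and placeholder data are hypothetical, and coordinate arrays/attrs would still need separate handling):

import gcsfs
import numpy as np
import zarr

fs = gcsfs.GCSFileSystem(project='pangeo-181919')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/newmann-met-ensemble', gcs=fs)

group = zarr.open_group(gcsmap, mode='a')   # the dataset is a plain zarr group
arr = group['pcp']                          # pick a data variable
new_block = np.zeros((1,) + arr.shape[1:], dtype=arr.dtype)  # placeholder data
arr.append(new_block, axis=0)               # grow the array along the first axis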

@jhamman (Member) commented Mar 27, 2018

@jgerardsimcock - it may be worth opening an xarray issue for this specific topic. In my view, there are some design decisions to make but it is likely an achievable task.

@rabernat (Member Author)

I agree with @jhamman: appending support is a good goal for xarray. I encourage moving the discussion of this to an xarray issue.

The idea of a multi-TB upload failure is terrible.

@jgerardsimcock: unfortunately it is the norm right now. 😢 I have had to restart these uploads 3-4 times before finally finishing successfully. The errors are intermittent, hard to reproduce, and hard to blame on any specific layer of the software stack. As you will learn if you start doing this, there is lots of room for improvement.

@rabernat (Member Author)

Just to add a note of optimism...once the datasets are actually inside gcs in zarr format, they work great!

@stale bot commented Jun 15, 2018

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Jun 15, 2018

@jhamman (Member) commented Jun 21, 2018

To review: this issue started with us thinking we were going to use persistent disks. Now we're using zarr and GCS. We have moved quite a long way since October!

@jhamman jhamman closed this as completed Jun 21, 2018

@skeller88

Hi, I'm doing a satellite image deep learning project and am attempting to use Dask for image preprocessing and the model training workflow. I'm new to this thread and am interested in it because it gives me clues on current best practices around using Dask.

Was this discussion around where to store the data catalog? I'm guessing you went with cloud storage instead of disk because it's more scalable (multiple compute instances can only share the same disk if they're in the same zone)?

@martindurant (Contributor)

I think the main reason is simpler: the data are public, so the catalog referencing them should be open to all. You can load this stuff from your laptop too, if the byte size isn't too big. Also, cloud storage is of course much cheaper than disks and easier to connect to, because you don't need to set up any cluster infrastructure.

To be sure, dask is also used elsewhere with clustered storage like HDFS. NFS-like ("network-local") performance will not necessarily be any better than cloud, but you may want that for isolation reasons.
