xesmf included in pangeo.pydata.org ? #309

Closed · naomi-henderson opened this issue Jun 12, 2018 · 62 comments

@naomi-henderson
Contributor

Hi all,
I have put together a use case for reproducing the canonical IPCC AR5 Figure 12.5 using the CMIP5 multi-model surface temperature data. I have converted the needed netCDF files to zarr and uploaded them to pangeo.pydata.org. I just noticed that the xesmf package has not been included, so I can't do the fast and easy spatial regridding provided by xesmf to combine all models onto the same grid. See:
use case on pangeo.pydata.org.
Any chance we can add xesmf to the standard packages?

Here is the figure this notebook can reproduce:

[Image: IPCC AR5 WGI Figure 12.5 (wgi_ar5_fig12-5)]
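For reference, the regridding step in question looks roughly like the sketch below (the path and the 1-degree target grid are illustrative, not taken from the notebook):

import numpy as np
import xarray as xr
import xesmf as xe

# one model's surface temperature on its native grid (illustrative path)
ds_in = xr.open_zarr('CMIP5-ts/CanESM2/rcp45')

# common 1-degree target grid shared by all models
ds_out = xr.Dataset({
    'lat': (['lat'], np.arange(-89.5, 90, 1.0)),
    'lon': (['lon'], np.arange(0.5, 360, 1.0)),
})

# build the weight file (pass reuse_weights=True on later runs to reuse it),
# then apply the regridder to the surface-temperature variable
regridder = xe.Regridder(ds_in, ds_out, 'bilinear')
ts_common = regridder(ds_in['ts'])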

@rabernat
Member

This is a great point @naomi-henderson. (FYI, we can't follow the link http://pangeo.pydata.org/user/naomi-henderson/lab/tree/CMIP5-MakeFigure12.5.ipynb. If you would like to share a notebook from pangeo, your best bet right now is to download it and upload it as a gist.)

We have discussed xesmf use cases in #197, but your example is much more concrete. It would be a great example to highlight the capabilities of pangeo to the climate community.

The main roadblock right now is pangeo-data/helm-chart#29, which is holding us up from adding new packages to pangeo.pydata.org. Hopefully that will be resolved very soon, and we can move ahead with this.

@naomi-henderson
Contributor Author

Ah, thanks @rabernat - I will wait for this to be sorted out. I will also need xESMF to finish the atmospheric moisture budget use case. The ability to do regridding is essential in dealing with the CMIP5 (and soon to come CMIP6) data.

@mrocklin
Member

xesmf is now on the stock software environment on Pangeo. I believe that this is now resolved. I'm curious to see it used.

cc @JiaweiZhuang

@naomi-henderson
Contributor Author

@mrocklin fantastic! So far so good, xesmf is happily calculating the weight files and reusing them when appropriate. Thanks much

@rabernat
Member

rabernat commented Jun 13, 2018 via email

@JiaweiZhuang
Member

Glad to see this! @naomi-henderson could you share your notebook (I still cannot access it)? Let me try to tweak it to support distributed.

@naomi-henderson
Contributor Author

That would be great, @JiaweiZhuang ! The notebook runs fine (under 3 minutes) on my linux box, but unfortunately gets bogged down at the beginning when reading the zarr data on pangeo.pydata.org. The xesmf regridding is so fast in comparison that I haven't had a chance to check how the workers are handling that part. (It is monthly data chunked (12, ny, nx) in (time, lat, lon) - is that the problem?) If you scroll down to the bottom of each gist notebook you will see that it takes 4x as long using 10 workers / 20 cores on pangeo.pydata.org as on my linux box (where the zarr-ed data is on an internal drive) with 2 threads.

If anyone has words of wisdom on the data reading/ startup delay issues, I am very open to suggestions and would very much like to get this working properly.

Here is my notebook (with output) configured/run on linux box

and here is the same notebook (with output) configured/run on pangeo.pydata.org

@mrocklin
Member

cc @martindurant , putting this on your backlog if I may. I suspect that you'll enjoy diving into performance issues here.

@martindurant
Contributor

I am just heading out for a weekend away, but if no one beats me to it, I can look early next week.
My immediate thought is that GCS file listing and metadata reads must still be done from the client process, and there may be many of these - that is the most likely slow-down in the initial stage.

# ~time to list all files
%time data_fs.du(base_path, True, True)
CPU times: user 2.33 s, sys: 171 ms, total: 2.51 s
Wall time: 25.5 s
12336380536

# size in GB
_ / 2**30
11.489149682223797

# number of files
len(data_fs.walk(base_path))
23832

Can you estimate the per-node data throughput, perhaps by doing a trivial operation such as sum(), versus your local disc? Network data rates >100MB/s are typical, but your disc may be orders of magnitude faster. There may be an opportunity to eat the cost just once if the data can be persisted.
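For example, something along these lines would give a rough aggregate number to compare against the same reduction on local disc (the dataset path is illustrative):

import time
import gcsfs
import xarray as xr

gcs = gcsfs.GCSFileSystem(token='anon')
mapper = gcsfs.GCSMap(gcs=gcs, root='pangeo-data/CMIP5-ts/CanESM2/rcp45')
ds = xr.open_zarr(mapper)

t0 = time.time()
ds['ts'].sum().compute()   # trivial reduction that has to pull every chunk
elapsed = time.time() - t0
print('approx read rate: %.1f MB/s' % (ds['ts'].nbytes / 1e6 / elapsed))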

@naomi-henderson
Contributor Author

Thanks for looking at this. I will be gone for the next week, but will do some simple benchmarking of GCS vs my local disc when I return. Thanks for showing how to get the time to list all files and get the number of files - very useful!

@mrocklin
Member

mrocklin commented Jun 22, 2018 via email

@martindurant
Contributor

Quite some time ago now, we realised that it simply took far too long to list all the files in a data-heavy bucket, so we moved to per-directory listing. This means, though, a new call for every directory, and many calls for a deeply nested set of directories, such as zarr CDF structures seem to have. The following is a set of exists calls seen when accessing one of the standard pangeo datasets. There are repeats in the list, and many calls here are free, since listings are cached; however, apparently non-existent directories are also tried, the full tree is interrogated up front, and every single .zarray and .zgroup is opened and read - all small files (often < 1kB), but each requiring a new HTTP call, and all of this happens in serial:

exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zattrs/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/elevation/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/elevation/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lat/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lat/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lon/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lon/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/mask/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/mask/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/pcp/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/pcp/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_max/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_max/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_mean/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_mean/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_min/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_min/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_range/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_range/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lat/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lon/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_ipython_canary_method_should_not_exist_/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_ipython_canary_method_should_not_exist_/.zgroup',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_repr_mimebundle_/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_repr_mimebundle_/.zgroup',), kwargs={})

(note that I have no idea what the _ipython_canary.. or _repr_mimebundle_ entries are; they do not appear in any file listing as far as I can tell)

All of this aside, the general rule is still to have data chunks large enough that the download time outweighs the overhead of the filesystem operations: for Newman there is 120GB in 1930 files (roughly 60MB per file); for CMIP5-ts, 11GB in 23800 files (roughly 0.5MB per file).

@martindurant
Contributor

@alimanfoo, the zarr storage of cdf-like nested group data allows access at any level in the hierarchy, which is very convenient, but I wonder whether there is any possibility of consolidating the group and attr files such that so many list and open operations would not be required at metadata inference time?

@rabernat
Member

... I wonder whether there is any possibility of consolidating the group and attr files such that so many list and open operations would not be required at metadata inference time?

👍 to this idea

for Newman there is 120GB in 1930 files, for CMIP5-ts 11GB in 23800 files

@naomi-henderson: how were the cmip5 files chunked? Would it make sense to use larger chunks? Can we see the repr of one of the datasets?

@martindurant
Contributor

cmip5 seems to contain many datasets in directories; here is one, pangeo-data/CMIP5-ts/CanESM2/rcp45, picked at random:

Dimensions:  (lat: 64, lon: 128, time: 3540)
Coordinates:
  * lat      (lat) float64 -87.86 -85.1 -82.31 -79.53 -76.74 -73.95 -71.16 ...
  * lon      (lon) float64 0.0 2.812 5.625 8.438 11.25 14.06 16.88 19.69 ...
  * time     (time) float64 552.5 553.5 554.5 555.5 556.5 557.5 558.5 559.5 ...
Data variables:
    ts       (time, lat, lon) float32 dask.array<shape=(3540, 64, 128), chunksize=(12, 64, 128)>
Attributes:
    start_date:  2006-01-16T12:00:00.000000000

and here is Newman:

Dimensions:    (ensemble: 9, lat: 224, lon: 464, time: 12054)
Coordinates:
  * lat        (lat) float64 25.06 25.19 25.31 25.44 25.56 25.69 25.81 25.94 ...
  * lon        (lon) float64 -124.9 -124.8 -124.7 -124.6 -124.4 -124.3 ...
  * time       (time) datetime64[ns] 1980-01-01 1980-01-02 1980-01-03 ...
Dimensions without coordinates: ensemble
Data variables:
    elevation  (ensemble, lat, lon) float64 dask.array<shape=(9, 224, 464), chunksize=(1, 224, 464)>
    mask       (ensemble, lat, lon) int32 dask.array<shape=(9, 224, 464), chunksize=(1, 224, 464)>
    pcp        (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
    t_max      (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
    t_mean     (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
    t_min      (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
    t_range    (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
Attributes:
    _ARRAY_DIMENSIONS:         {'ensemble': 9, 'lat': 224, 'lon': 464, 'time'...
    history:                   Version 1.0 of ensemble dataset, created Decem...
    institution:               National Center for Atmospheric Research (NCAR...
    nco_openmp_thread_number:  1
    references:                Newman et al. 2015: Gridded Ensemble Precipita...
    source:                    Generated using version 1.0 of CONUS ensemble ...
    title:                     CONUS daily 12-km gridded ensemble precipitati...

@naomi-henderson
Contributor Author

I chunked the cmip5 data by 12 months. I could certainly use larger chunks, say 20 years, but the run lengths of the models differ, so not all chunks would be equal. Note that this is a small subset of the available data even for this simple 2-dimensional (lon, lat) variable, and I have only used one ensemble member per model. Ensemble members are run for different lengths of time, so they cannot be combined as in the Newman dataset. Even if we increase the chunk size by a factor of 20 (or do no chunking at all - in which case we might as well stick with netCDF, no?), we still have many more files per GByte of data than the Newman dataset.

The 3-dimensional data (lon, lat and pressure level) will be better, but if we really want to be able to fly through heterogeneous CMIP-type data (which can't be combined with open_mfdataset), we need to be able to deal with these issues.

Would it be useful for me to rechunk the data, upload and try again?
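For concreteness, a minimal sketch of the rechunk-and-rewrite with xarray (paths illustrative; dropping the 'chunks' encoding works around to_zarr otherwise reusing the old chunk layout recorded by open_zarr):

import xarray as xr

ds = xr.open_zarr('zarr5/CMIP5-ts/CanESM2/rcp45')
ds = ds.chunk({'time': 120})

# drop the chunk encoding carried over from the source store so that
# to_zarr writes the new 120-month dask chunks
for name, var in ds.variables.items():
    var.encoding.pop('chunks', None)

ds.to_zarr('zarr120/CMIP5-ts/CanESM2/rcp45', mode='w')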

@alimanfoo

Re performance I can suggest a couple of possible avenues to explore, not mutually exclusive.

The first obvious suggestion is to investigate whether it is strictly necessary to locate and read all metadata files up front. If any of that could be delayed and then done within some parallel work then some wall time could be saved.

A second suggestion is to explore ways of making use of the fact that Zarr can use separate stores for data and metadata (that was @mrocklin's idea). All Zarr API calls support both a store and a chunk_store argument. If only store is provided then the same store is used for both metadata (.zarray, .zgroup, .zattrs) and data (chunks). However if chunk_store is provided then that is used for data and store is used for metadata.

There are a number of possible ways this could be leveraged. For example, data and metadata could both be stored on GCS as usual but in separate buckets, or separate directory paths within a bucket. This could reduce the cost of directory or bucket listing when locating metadata files. But you could do other things too. E.g., you could store the chunks on GCS as usual (one chunk per object), but invent some way of baking all metadata into a single GCS object, then write a custom Mapping to expose that as a store to Zarr, with the idea that it is read once via a single HTTP request and then cached by the client process. You could also use something other than GCS to store metadata, but leave chunks on GCS. Lots of possibilities, all you need to do is implement a Mapping interface.

Also, a final quick thought: this may not help at all, as it sounds like metadata is already being cached, but there is an LRUStoreCache in Zarr which can be used as a wrapper around either the metadata store or the data store (or both).
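For example, a cached read of the Newman store might look like this sketch (the max_size value is arbitrary):

import gcsfs
import zarr

gcs = gcsfs.GCSFileSystem(token='anon')
gcsmap = gcsfs.GCSMap(gcs=gcs, root='pangeo-data/newman-met-ensemble')

# keep recently read keys (including the tiny .zgroup/.zarray/.zattrs objects)
# in process memory, so repeat accesses skip the HTTP round trip
cached = zarr.LRUStoreCache(gcsmap, max_size=2**28)
root = zarr.open_group(store=cached, mode='r')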

Happy to discuss further.

@mrocklin
Member

@alimanfoo would it be possible to optionally collect all metadata from everywhere within a Zarr dataset and bring it into a single file at the top level? This would have to be invalidated on any write operation that changed metadata, but in the write-once-read-many case it might be helpful.

@alimanfoo

@mrocklin yes, you could do something like this now via the store/chunk_store API without any code or spec changes in Zarr. Assuming data is prepared on a local file system first then uploaded to GCS, steps could be something like...

(0) Build the Zarr files and upload to GCS as usual (already done).

(1) Decide on how to pack metadata from multiple files into a single file. Lots of ways you could do this, e.g., you could just build a big JSON document mapping the relative file paths to file contents, e.g.:

{
    ".zgroup": {
        ...
    },
    "elevation/.zarray": {
        ...
    },
    "elevation/.zattrs": {
        ...
    },
    ...
}

(2) Write a script to crawl the directory structure for metadata files and build the combined metadata file.

(3) Upload the combined metadata file to GCS, put it at the root of the hierarchy under a resource name like ".zmeta" (or put it wherever you like).

(4) Implement a Mapping interface (e.g., called something like CombinedMetaStore) that can read keys out of a combined metadata file. Importantly this would read the whole combined file once and cache it.
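CombinedMetaStore is not an existing Zarr class; a rough sketch of such a Mapping, assuming the combined document maps relative paths to the parsed metadata objects as in step (1), might be:

import json
from collections.abc import Mapping

class CombinedMetaStore(Mapping):
    """Serve zarr metadata keys out of one combined JSON object, fetched once."""

    def __init__(self, store, key='.zmeta'):
        # a single read of the combined document; afterwards everything is in memory
        self._meta = json.loads(store[key].decode())

    def __getitem__(self, item):
        return json.dumps(self._meta[item]).encode()

    def __iter__(self):
        return iter(self._meta)

    def __len__(self):
        return len(self._meta)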

Then you could open a Zarr hierarchy with something like:

gcsmap = gcsfs.GCSMap('pangeo-data/newman-met-ensemble')
meta_store = CombinedMetaStore(gcsmap, key='.zmeta')
root = zarr.Group(store=meta_store, chunk_store=gcsmap)

For access via xarray the xarray API may have to be changed to pass through both store and chunk_store args.

N.B., this assumes data are read-only once in GCS, so there is nothing like invalidation of the combined metadata file. But for this use case I don't think that would be necessary.

Happy to elaborate if this isn't making sense yet.

Btw also happy to discuss spec changes and/or features to support this kind of thing natively in Zarr, but trying to exhaust all possibilities for hacking this without spec or code changes first.

@martindurant
Contributor

From the pangeo perspective, reading such a combined metadata file would presumably require (subtle) changes in xarray code.

@alimanfoo

On the xarray side in theory all you'd have to do is allow a user to provide both store and chunk_store args and pass those through to zarr, although that's said without looking at the code.

One other thing I noticed: some of the exists calls look odd, e.g., I can't think why the following would be needed:

exists(args=('pangeo-data/newman-met-ensemble/.zattrs/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_repr_mimebundle_/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_repr_mimebundle_/.zgroup',), kwargs={})

...or why an exists would get called on both array and group at the same path, e.g.:

exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup',), kwargs={})

@martindurant
Contributor

Yeah, I can't say where those odd exists calls are coming from, but of course only files that are found are actually loaded, which can still be many small files. I suppose if .zarray doesn't exist, the code next tries .zgroup.

It should be pretty simple to make a function that collects all the group/array/attrs metadata into a single JSON blob and stores it in the top-level directory, and to have xarray look for it and use it as the metadata store (with GCS as the chunk_store) if available. The function would only be called rarely, by users who know the dataset is to be read-only. The function itself could live in xarray or in zarr itself, but the xarray code would need to change to, at the minimum, allow passing a chunk_store or (better) look for the aggregated JSON file.
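Roughly, such a function might look like this sketch (it stores the raw metadata strings keyed by relative path and assumes a writable mapping):

import json

def consolidate_metadata(mapping, out_key='.zmetadata'):
    # collect every .zarray/.zgroup/.zattrs key from the store into one JSON
    # blob and write it back under out_key, to be read with a single request
    def is_zarr_key(key):
        return key.endswith(('.zarray', '.zgroup', '.zattrs'))

    out = {key: mapping[key].decode() for key in mapping if is_zarr_key(key)}
    mapping[out_key] = json.dumps(out).encode()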

In the case at hand, where there are many small datasets rather than a single group structure, all this may not make a massive difference.

@martindurant
Contributor

@naomi-henderson , there are two concrete things here that can be done: 1) implement zarr metadata consolidation, 2) rechunk your data into larger pieces. I intend to work on 1), but 2) can help you more immediately, even though the total file sizes are not very big, as you pointed out.
Is there any other way in which I can help you to be more productive?

@alimanfoo

alimanfoo commented Jun 26, 2018 via email

@naomi-henderson
Contributor Author

@martindurant I have rechunked the data to 120 months instead of 12, with better performance; at least enough that the openzarr step now takes about the same time as the first reduction (computing 12-month averages).

The file system listing to find the models is negligible compared to the openzarr step.

Here are typical results, but note that the openzarr step for the smaller chunked case is very unpredictable, sometimes pausing for > 1 minute between reads of one model and the next (different models each time).

on pangeo.pydata.org with the default 20 workers:

### historical runs, chunk = 120 months

finding models:     
CPU times: user 194 ms, sys: 21 ms, total: 215 ms
Wall time: 1.72 s

openzarr:
CPU times: user 9.89 s, sys: 900 ms, total: 10.8 s
Wall time: 1min 29s

calculate annual means:
CPU times: user 39.4 s, sys: 6.04 s, total: 45.4 s
Wall time: 1min 26s

regridding:
CPU times: user 1.85 s, sys: 802 ms, total: 2.65 s
Wall time: 2.55 s

concatenating:
CPU times: user 254 ms, sys: 438 ms, total: 692 ms
Wall time: 654 ms

TOTAL: 
CPU times: user 51.6 s, sys: 8.21 s, total: 59.8 s
Wall time: 3min

### historical runs, chunk = 12 months

finding models:
CPU times: user 200 ms, sys: 18 ms, total: 218 ms
Wall time: 2.5 s

openzarr:
CPU times: user 10.8 s, sys: 866 ms, total: 11.7 s
Wall time: 1min 51s  (Varies from 1min 35s to 3min 7s, sometimes stopping for >1min)

annual means:
CPU times: user 1min 19s, sys: 10 s, total: 1min 29s
Wall time: 2min 41s

regridding:
CPU times: user 1.85 s, sys: 884 ms, total: 2.73 s
Wall time: 2.61 s

concatenating:
CPU times: user 258 ms, sys: 426 ms, total: 684 ms
Wall time: 641 ms

TOTAL:
CPU times: user 1min 32s, sys: 12.2 s, total: 1min 44s
Wall time: 4min 38s

For comparison (perhaps unfairly), on my linux machine (32 threads available):

### historical runs, chunk = 12 months

finding models:
CPU times: user 4.56 ms, sys: 3.95 ms, total: 8.51 ms
Wall time: 7.98 ms

openzarr:
CPU times: user 301 ms, sys: 34 ms, total: 335 ms
Wall time: 330 ms

annual means:
CPU times: user 5min 1s, sys: 11min 2s, total: 16min 4s
Wall time: 40.6 s

regridding:
CPU times: user 1.37 s, sys: 647 ms, total: 2.02 s
Wall time: 2 s

concatenating:
CPU times: user 439 ms, sys: 1.26 s, total: 1.7 s
Wall time: 170 ms

TOTAL:
CPU times: user 5min 3s, sys: 11min 4s, total: 16min 8s
Wall time: 43.1 s

@naomi-henderson
Contributor Author

btw, why do the permissions on the .zgroup and .zattrs files created by to_zarr need to be so restrictive? Would it cause trouble if I allowed group/other read access?

zarr5/CMIP5-ts/ACCESS1-0/rcp85:
drwxrwxr-x 6 naomi naomi 4096 Jun 26 20:55 .
drwxrwxr-x 5 naomi naomi 4096 Jun 26 20:55 ..
drwxrwxr-x 2 naomi naomi 4096 Jun 26 20:55 lat
drwxrwxr-x 2 naomi naomi 4096 Jun 26 20:55 lon
drwxrwxr-x 2 naomi naomi 4096 Jun 26 20:55 time
drwxrwxr-x 2 naomi naomi 4096 Jun 26 20:55 ts
-rw------- 1 naomi naomi   53 Jun 26 20:55 .zattrs
-rw------- 1 naomi naomi   24 Jun 26 20:55 .zgroup

@martindurant
Contributor

why do the permissions on the .zgroup and .zattrs files need to be so restrictive?

That looks pretty odd, and I'm sure making them broader is fine. Indeed, there is a general argument for making them all read-only for typical write-once datasets.

@martindurant
Contributor

^ and, of course, it is generally more worthwhile attempting higher compression ratios for lower-bandwidth storage - but that only affects the download time, not the connection overhead. I wonder if we should have a utility function that can estimate the on-disc size of an array for a few common compression configs and/or encodings.

@alimanfoo

FWIW for maximum speed I use numcodecs.Blosc(cname='lz4', clevel=1, shuffle=0) but that tends to be optimal only when you have very high I/O (e.g., to a local SSD). For slower I/O settings I tend to use numcodecs.Blosc(cname='zstd', clevel=1, shuffle=...) where I will try out all three shuffle options (0 = no shuffle, 1 = byte shuffle, 2 = bit shuffle) on the data to see if they make a difference to compression ratio. For floating point data, shuffle probably won't do much unless combined with some kind of filter to fix the precision of the data, e.g., numcodecs.FixedScaleOffset or numcodecs.Quantize or I believe xarray has a built-in scale-offset. In general I've tended to benchmark a number of options for the datasets I most often work with.
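Applied to this dataset, the compressor can be set per variable through the encoding argument of to_zarr, e.g. (file name, chunking and codec choice are illustrative):

import numcodecs
import xarray as xr

compressor = numcodecs.Blosc(cname='zstd', clevel=1, shuffle=2)  # 2 = bit shuffle

ds = xr.open_dataset('ts_Amon_CanESM2_rcp45.nc')  # illustrative source file
ds.chunk({'time': 120}).to_zarr(
    'zarr120/CMIP5-ts/CanESM2/rcp45',
    encoding={'ts': {'compressor': compressor}},
)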

If the size of the network pipe on google cloud is ~100 Mb/s then any compressor that decompresses faster than that is probably worth a try. The plot below was made on a particular type of genomic data and so may not translate for geo data, but may give a general sense of relative performance for different compressors (numbers next to each bar like "61.1X" are the compression ratios):

[Image: compression benchmark plot - decompression speed for various compressors, with compression ratios (e.g. "61.1X") next to each bar]

@martindurant
Contributor

@naomi-henderson , is it useful for me to replicate your data to another GCS location and make consolidated metadata files, and point you to a version of zarr that would make use of them? That might solve your immediate problem and provide good motivation for the consolidation PR over on zarr. I can do this too, but I don't know when I would get to it. Note that I do not expect metadata consolidation to have much of an effect when working with local data.

@naomi-henderson
Contributor Author

yes @martindurant, I think that would be very helpful - I am curious whether it is the repeated metadata reads or the latency that is the main issue here.

What do you need in order to consolidate the metadata, the original netcdf or the chunked zarr files?

By the way, I am currently using 120-month chunked data (in pangeo-data/CMIP5-ts/120chunk/), but since the resolution varies from model to model, this does not guarantee a fixed chunk size. Note that I could make the chunks a more uniform size (e.g., @rabernat's 20M) if I allow the number of months to vary from model to model based on the resolution of the data (a rough sizing helper is sketched after the listing below). Here are the chunk sizes if I use 120 months per chunk:

8.0M	ACCESS1-0/historical/ts/0.0.0
8.0M	ACCESS1-3/historical/ts/0.0.0
2.4M	bcc-csm1-1/historical/ts/0.0.0
15M	        bcc-csm1-1-m/historical/ts/0.0.0
2.5M	BNU-ESM/historical/ts/0.0.0
2.5M	CanCM4/historical/ts/0.0.0
2.5M	CanESM2/historical/ts/0.0.0
16M	        CCSM4/historical/ts/0.0.0
16M	        CESM1-BGC/historical/ts/0.0.0
4.1M	CESM1-CAM5-1-FV2/historical/ts/0.0.0
16M	        CESM1-CAM5/historical/ts/0.0.0
16M	        CESM1-FASTCHEM/historical/ts/0.0.0
4.1M	CESM1-WACCM/historical/ts/0.0.0
1.3M	CMCC-CESM/historical/ts/0.0.0
28M	     CMCC-CM/historical/ts/0.0.0
4.7M	CMCC-CMS/historical/ts/0.0.0
9.3M	CNRM-CM5-2/historical/ts/0.0.0
5.4M	CSIRO-Mk3-6-0/historical/ts/0.0.0
1.2M	CSIRO-Mk3L-1-2/historical/ts/0.0.0
2.3M	FGOALS-g2/historical/ts/0.0.0
4.1M	FGOALS-s2/historical/ts/0.0.0
2.4M	FIO-ESM/historical/ts/0.0.0
3.8M	GFDL-CM2p1/historical/ts/0.0.0
3.8M	GFDL-CM3/historical/ts/0.0.0
3.9M	GFDL-ESM2G/historical/ts/0.0.0
3.8M	GFDL-ESM2M/historical/ts/0.0.0
3.8M	GISS-E2-H-CC/historical/ts/0.0.0
3.8M	GISS-E2-H/historical/ts/0.0.0
3.8M	GISS-E2-R-CC/historical/ts/0.0.0
3.8M	GISS-E2-R/historical/ts/0.0.0
2.0M	HadCM3/historical/ts/0.0.0
8.0M	HadGEM2-AO/historical/ts/0.0.0
7.3M	HadGEM2-CC/historical/ts/0.0.0
7.3M	HadGEM2-ES/historical/ts/0.0.0
6.3M	inmcm4/historical/ts/0.0.0
2.8M	IPSL-CM5A-LR/historical/ts/0.0.0
6.0M	IPSL-CM5A-MR/historical/ts/0.0.0
2.8M	IPSL-CM5B-LR/historical/ts/0.0.0
53M	        MIROC4h/historical/ts/0.0.0
9.2M	MIROC5/historical/ts/0.0.0
2.4M	MIROC-ESM-CHEM/historical/ts/0.0.0
2.4M	MIROC-ESM/historical/ts/0.0.0
4.7M	MPI-ESM-LR/historical/ts/0.0.0
4.7M	MPI-ESM-MR/historical/ts/0.0.0
4.7M	MPI-ESM-P/historical/ts/0.0.0
15M	        MRI-CGCM3/historical/ts/0.0.0
15M	        MRI-ESM1/historical/ts/0.0.0
4.1M	NorESM1-ME/historical/ts/0.0.0
4.1M	NorESM1-M/historical/ts/0.0.0
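A rough sizing helper for the variable-length-chunk idea mentioned above (uncompressed sizes, float32 assumed; illustrative only):

def months_per_chunk(ny, nx, target_mb=20, dtype_bytes=4):
    # whole years of monthly data that fit in roughly target_mb uncompressed
    bytes_per_month = ny * nx * dtype_bytes
    months = target_mb * 2**20 // bytes_per_month
    return max(12, 12 * (months // 12))

# e.g. CanESM2 (64 x 128 grid): months_per_chunk(64, 128) -> 636 months,
# while a high-resolution model like MIROC4h would get far fewer months per chunk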

@martindurant
Contributor

@naomi-henderson , you could install my zarr branch
pip install git+https://github.com/martindurant/zarr.git@consolidate_metadata
(do this in a non-production environment!)

The function zarr.convenience.consolidate_metadata can be run on an existing mapping containing a zarr dataset, and the "hack" in Group.__init__ will look for these consolidated metadata keys and use them if they exist. That's how I did my timings, above, for the case of one dataset.

Of course, it would be best not to run the consolidation on any original data - only on stuff no one else will be using, or that can be easily re-made, should the process somehow break what is already there.

@naomi-henderson
Contributor Author

thanks @martindurant ! I think I need to down-rev my xarray? Your pip install worked, but then to_zarr throws the error:

NotImplementedError: Zarr version 2.2 or greater is required by xarray. See zarr installation http://zarr.readthedocs.io/en/stable/#installation

What version of xarray are you using? I am at '0.10.7'

@martindurant
Contributor

That zarr appears to have version '2.2.1.dev5+dirty', maybe xarray can't parse that... my xarray is 0.10.3.

@naomi-henderson
Contributor Author

ah, your zarr thinks it is 0.4.1.dev843 ?

@martindurant
Contributor

Huh, I have no idea where 0.4.1.dev843 comes from. You could instead grab the repo

> conda create -n testenv python zarr xarray gcsfs ipython -c conda-forge
> conda activate testenv
> git clone https://github.com/martindurant/zarr
> cd zarr
> git checkout consolidate_metadata
> python setup.py install
> ipython
In [ ]: import gcsfs
   ...: import xarray as xr

In [ ]: %%time
   ...: gcs = gcsfs.GCSFileSystem(token='anon')
   ...: mapper = gcsfs.GCSMap(gcs=gcs, root='mdtemp/CMIP5-ts/CanESM2/rcp45/')
   ...: d = xr.open_zarr(mapper)

@mrocklin
Member

mrocklin commented Jul 2, 2018 via email

@martindurant
Contributor

Thanks, @mrocklin , that's it

mdurant@blackleaf:~/code/zarr$ git push --tags fork
...
 * [new tag]         v2.2.0a1 -> v2.2.0a1
 * [new tag]         v2.2.0a2 -> v2.2.0a2
 * [new tag]         v2.2.0rc1 -> v2.2.0rc1
 * [new tag]         v2.2.0rc2 -> v2.2.0rc2
 * [new tag]         v2.2.0rc3 -> v2.2.0rc3

@mrocklin
Member

mrocklin commented Jul 2, 2018

@naomi-henderson if you pip install again all should be well

@naomi-henderson
Contributor Author

zarr-2.2.1.dev6 it is! thanks @martindurant and @mrocklin

@naomi-henderson
Contributor Author

@martindurant , perhaps I don't quite understand the process. If I already have the zarr data, then I should be able to run consolidate_metadata and it should add a file called .zmetadata
Here I can open the dataset, but then get an error - what am I missing?

group = 'zarr/CMIP5-ts/MRI-CGCM3/rcp85'
#xr.open_zarr(group)
zarr.convenience.consolidate_metadata(group)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-27-90161f14a048> in <module>()
      1 group = 'zarr/CMIP5-ts/MRI-CGCM3/rcp85'
      2 xr.open_zarr(group)
----> 3 zarr.convenience.consolidate_metadata(group)

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/zarr/convenience.py in consolidate_metadata(mapping, out_key)
   1090 
   1091     out = {key: mapping[key].decode() for key in mapping if is_zarr_key(key)}
-> 1092     mapping[out_key] = json.dumps(out).encode()

TypeError: 'str' object does not support item assignment

@martindurant
Contributor

I should have been clearer - the function works on a mapping, so for local paths you would do

import xarray as xr
from zarr.convenience import consolidate_metadata
from zarr.storage import DirectoryStore

d = DirectoryStore('zarr/CMIP5-ts/MRI-CGCM3/rcp85')
consolidate_metadata(d)

data = xr.open_zarr('zarr/CMIP5-ts/MRI-CGCM3/rcp85')  # or reuse d here

for working directly on GCS

import gcsfs

gcs = gcsfs.GCSFileSystem()
mapping = gcsfs.GCSMap(gcs=gcs, root='bucket/path.zarr')
consolidate_metadata(mapping)

data = xr.open_zarr(mapping)

@naomi-henderson
Contributor Author

thanks @martindurant , it is working like a charm.

With consolidated metadata:

%%time

gcs = gcsfs.GCSFileSystem(token='anon')
mapper = gcsfs.GCSMap(gcs=gcs, root='pangeo-data/CMIP5-ts/meta/CanESM2/rcp45/')
d = xr.open_zarr(mapper)
CPU times: user 31.2 ms, sys: 3.16 ms, total: 34.4 ms
Wall time: 739 ms

Original data

%%time

gcs = gcsfs.GCSFileSystem(token='anon')
mapper = gcsfs.GCSMap(gcs=gcs, root='pangeo-data/CMIP5-ts/20mb/CanESM2/rcp45/')
d = xr.open_zarr(mapper)
CPU times: user 85.7 ms, sys: 7.01 ms, total: 92.8 ms
Wall time: 2.4 s

@naomi-henderson
Contributor Author

Wow, @martindurant , I can't wait to use this on pangeo.pydata.org ! I can't start a server this afternoon for some reason, though.

Working directly on GCS from my Columbia linux server I see a factor of 4 improvement on opening the 49 historical model simulations: ~4 min (original zarr) and ~1 min (consolidated metadata). The first reduce step (annual means), which touches all of the data, takes about the same time for original and consolidated.

For now I can use your zarr to fix the data, thanks!

@JiaweiZhuang
Member

JiaweiZhuang commented Jul 10, 2018

Will it scatter the weight matrix to the workers?

No, currently it will cast distributed dask arrays to in-memory numpy arrays. In @naomi-henderson's example, the original data were reduced significantly (taking the time average) before being passed to xESMF. At that stage there's no need for distributed computing at all.

If the workflow is to reduce a large amount of data and finally make some 1D/2D plots, I'd argue that distributed regridding is rarely needed. It is much better to take the average before regridding. Distributed regridding would only be useful for a read-regrid-write workflow, to pre-process large volumes of data for later analysis. See more at #334.
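In other words, the cheap pattern is reduce-then-regrid, along the lines of this sketch (paths and target grid illustrative, echoing the earlier example):

import numpy as np
import xarray as xr
import xesmf as xe

ds = xr.open_zarr('zarr120/CMIP5-ts/CanESM2/historical')
ts_mean = ds['ts'].mean('time').load()   # reduce first: one small in-memory 2-D field

target = xr.Dataset({'lat': (['lat'], np.arange(-89.5, 90, 1.0)),
                     'lon': (['lon'], np.arange(0.5, 360, 1.0))})
regridder = xe.Regridder(ds, target, 'bilinear')
ts_mean_common = regridder(ts_mean)      # regridding the reduced field is cheap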

@stale

stale bot commented Sep 8, 2018

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Sep 8, 2018
@rabernat
Member

rabernat commented Sep 8, 2018

We should follow up on this discussion. Has there been any progress on either the xesmf side or the zarr side that brings us closer to making this use case work well?

@stale stale bot removed the stale label Sep 8, 2018
@stale

stale bot commented Nov 7, 2018

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Nov 7, 2018
@stale

stale bot commented Nov 14, 2018

This issue has been automatically closed because it had not seen recent activity. The issue can always be reopened at a later date.

@stale stale bot closed this as completed Nov 14, 2018