Missing rectangles in zarr dataset loaded from GCS #691

Closed

rafa-guedes opened this issue Aug 12, 2019 · 55 comments
@rafa-guedes commented Aug 12, 2019

I have had an issue with loading slices from a zarr dataset stored on GCS. I don't know where the problem could be coming from, so I am posting it here in case someone has experienced something similar.

The figure below shows what is going on. Upon slicing and loading a zarr dataset stored on GCS using xarray, I'm getting missing squares (often in the northern hemisphere). The squares change position each time I reload the same slice; sometimes they appear for a single variable, sometimes for more than one (at different locations), and sometimes they don't appear at all. The rectangle sizes match the chunking, which for this dataset corresponds to 25 degrees in lat/lon, so it looks like entire chunks are randomly failing to be transferred for some reason.

Has anyone seen something similar, or does anyone have an idea what might be causing this?

Thanks

Screenshot from 2019-08-12 14-12-41

@rabernat (Member)

Hi @rafa-guedes - could you share more information about your dataset? Like the xarray repr and the zarr .info property of one of the arrays?

@rafa-guedes (Author)

Hi @rabernat, please find some more info below.

In [1]: import xarray as xr

In [2]: import zarr

In [3]: from fsspec import get_mapper

In [4]: fsmap = get_mapper('gs://oceanum-era5/wind_10m.zarr')

In [5]: dset = xr.open_zarr(fsmap, consolidated=True)

In [6]: dset
Out[6]: 
<xarray.Dataset>
Dimensions:    (latitude: 721, longitude: 1440, time: 354264)
Coordinates:
  * latitude   (latitude) float32 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude  (longitude) float32 0.0 0.25 0.5 0.75 ... 359.25 359.5 359.75
  * time       (time) datetime64[ns] 1979-01-01 ... 2019-05-31T23:00:00
Data variables:
    u10        (time, latitude, longitude) float32 dask.array<shape=(354264, 721, 1440), chunksize=(744, 100, 100)>                                                                                         
    v10        (time, latitude, longitude) float32 dask.array<shape=(354264, 721, 1440), chunksize=(744, 100, 100)>                                                                                         
Attributes:
    Conventions:  CF-1.6
    history:      2019-08-01 10:44:37 GMT by grib_to_netcdf-2.10.0: /opt/ecmw...

In [7]: g = zarr.open_group(fsmap)

In [8]: g.info
Out[8]: 
Name        : /
Type        : zarr.hierarchy.Group
Read-only   : False
Store type  : fsspec.mapping.FSMap
No. members : 5
No. arrays  : 5
No. groups  : 0
Arrays      : latitude, longitude, time, u10, v10

In [9]: g.u10.info
Out[9]: 
Name               : /u10
Type               : zarr.core.Array
Data type          : int16
Shape              : (354264, 721, 1440)
Chunk shape        : (744, 100, 100)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='zstd', clevel=3, shuffle=BITSHUFFLE,
                   : blocksize=0)
Store type         : fsspec.mapping.FSMap
No. bytes          : 735622110720 (685.1G)
Chunks initialized : 57240/57240

In [10]: g.v10.info
Out[10]: 
Name               : /v10
Type               : zarr.core.Array
Data type          : int16
Shape              : (354264, 721, 1440)
Chunk shape        : (744, 100, 100)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='zstd', clevel=3, shuffle=BITSHUFFLE,
                   : blocksize=0)
Store type         : fsspec.mapping.FSMap
No. bytes          : 735622110720 (685.1G)
Chunks initialized : 57240/57240

In [11]: dset.u10.encoding
Out[11]: 
{'chunks': (744, 100, 100),
 'compressor': Blosc(cname='zstd', clevel=3, shuffle=BITSHUFFLE, blocksize=0),
 'filters': None,
 '_FillValue': -32767,
 'scale_factor': 0.01,
 'dtype': dtype('int16')}

@rabernat (Member)

Perhaps GCS is not returning the data reliably. @martindurant might have some suggestions on how to get some debugging output from gcsfs - I seem to recall a trick to turn on HTTP logging.

@martindurant (Contributor)

You can set the level with gcsfs.core.logger.set_level("DEBUG"), so long as you have some logging initialised (e.g. logging.basicConfig()).
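For reference, a minimal sketch of wiring that up with the standard library (this uses the plain logging API on the "gcsfs" logger rather than any gcsfs-specific helper):

import logging

# Give the log records somewhere to go.
logging.basicConfig()

# Turn up the gcsfs logger so that HTTP-level activity is printed.
logging.getLogger("gcsfs").setLevel(logging.DEBUG)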

@rafa-guedes (Author)

Sorry for taking so long to reply - I am only now getting back to this. Thanks for the suggestions @rabernat, @martindurant. There is nothing obvious in those logs though, and the problem still persists intermittently. I may try to recreate this zarr dataset and check again. I'd be interested to hear if someone has seen similar behaviour in the past; this looks like a fairly serious issue and I don't know where it could be coming from.

@willirath (Member)

I'm seeing similar intermittent errors with data in the gs://pangeo-parcels bucket.

Here's a gist showing that calculations fail with "group not found at path" ValueErrors and that, if the computation is done with retries=10, the statistics change intermittently:

https://nbviewer.jupyter.org/gist/willirath/b7af4bc9a93a79b81772910f8ee5c630

@willirath (Member)

Is there a way of forcing Zarr to retry if a chunk cannot be read, rather than setting the chunk to all NaNs?
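One rough sketch of doing this at the store level (not an official zarr or fsspec feature; the class name and retry parameters here are made up for illustration) is to wrap the store mapping so that a failed chunk read is retried a few times before the KeyError is allowed to propagate:

import time
from collections.abc import MutableMapping


class RetryingStore(MutableMapping):
    """Wrap a zarr store mapping and retry failed chunk reads."""

    def __init__(self, store, retries=5, delay=1.0):
        self.store = store
        self.retries = retries
        self.delay = delay

    def __getitem__(self, key):
        for attempt in range(self.retries):
            try:
                return self.store[key]
            except KeyError:
                # A KeyError may be a genuinely absent chunk or a transient
                # backend failure surfacing as "missing"; retry before giving up.
                if attempt == self.retries - 1:
                    raise
                time.sleep(self.delay)

    def __setitem__(self, key, value):
        self.store[key] = value

    def __delitem__(self, key):
        del self.store[key]

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)

Since zarr accepts any MutableMapping as a store, this could be passed in place of the raw mapper, e.g. xr.open_zarr(RetryingStore(fsmap), consolidated=True). The obvious downside is that reads of genuinely absent keys also get retried.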

@rabernat (Member) commented Oct 1, 2019 via email

@martindurant (Contributor)

Does it happen if you set the number of threads per worker to one?

@willirath (Member)

Does it happen if you set the number of threads per worker to one?

Yes, configuring KubeCluster to use --nthreads 2 does not seem to change anything.

@martindurant (Contributor)

and --nthreads 1 ?

@willirath (Member)

Err, yes, I meant --nthreads 1.

@martindurant (Contributor)

I honestly cannot think of why this is happening - help would be appreciated in debugging when/how it happens, the exact error, etc. If it happens with a single thread, it is apparently not a race condition or an effect of session sharing. If we are lucky, it is merely one particular exception from the remote backend that we don't currently consider retriable but should (e.g., an API rate limit).

@willirath (Member)

Is there an easy way of preventing GCSFS from retrying at all?

@martindurant (Contributor)

Not super-easy: the GCSFileSystem class has an attribute .retries, so this can be set to 1 in every worker with a client.run call:

def set_gcsfs_retry():
    import gcsfs
    gcsfs.GCSFileSystem.retries = 1

client.run(set_gcsfs_retry)

I would also run with GCSFS_DEBUG=true

gcsfs.utils.is_retriable is where errors are classified as retriable or not.
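As an illustration (assuming gcsfs.utils.is_retriable takes an exception instance and returns a bool, and using a hypothetical chunk key from the dataset above), one could check how a particular failure would be classified:

import gcsfs.utils

try:
    chunk = fsmap["u10/0.0.0"]  # hypothetical chunk key, read via the mapper
except Exception as err:
    # Would gcsfs have retried this error, or passed it straight through?
    print(type(err).__name__, "retriable:", gcsfs.utils.is_retriable(err))
    raise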

@rabernat (Member) commented Oct 1, 2019

One possible way to debug could be to access the data using fsspec's HTTPFileSystem, bypassing gcsfs completely. fsspec/filesystem_spec#144 shows how you might do this (but it also reveals a bug, which has been fixed in the latest fsspec master).
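For a public bucket, a sketch of that approach (assuming the objects are anonymously readable via the storage.googleapis.com endpoint; the bucket path below is just the one discussed above and is only illustrative):

import xarray as xr
from fsspec import get_mapper

# Public GCS objects are also reachable over plain HTTPS at
# storage.googleapis.com/<bucket>/<key>. With consolidated metadata no
# directory listings are needed, which plain HTTP could not provide anyway.
store = get_mapper("https://storage.googleapis.com/oceanum-era5/wind_10m.zarr")
dset = xr.open_zarr(store, consolidated=True)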

@martindurant (Contributor)

Can the data be public? I can spin up my own cluster (on pangeo!) and see if I can reproduce. Any other dataset on GCS known to show this?

@willirath (Member)

Can the data be public? I can spin up my own cluster (on pangeo!) and see if I can reproduce? Any other dataset on gcs known to show this?

It's public: https://nbviewer.jupyter.org/gist/willirath/b7af4bc9a93a79b81772910f8ee5c630

@willirath (Member)

dataset_version = "v2019.09.11.2"
bucket = f"pangeo-parcels/med_sea_connectivity_{dataset_version}/traj_data_without_stokes.zarr"

@rabernat (Member) commented Oct 1, 2019

Do we know if this is tied to dask / distributed, or can it be reproduced purely at the gcsfs / zarr level?

@martindurant (Contributor)

(I was trying 'gs://oceanum-era5/wind_10m.zarr', sorry)

@martindurant (Contributor)

Agreed, @rabernat - it would be worthwhile paging through all of the pieces of a zarr in a single thread, or with a local 4/8-thread dask scheduler.

Here is an interesting snippet on mitigating request-rate related errors:
https://cloud.google.com/storage/docs/request-rate#ramp-up
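A rough sketch of what paging through the pieces in a single thread might look like, reading chunk keys for one variable directly through the mapper (this assumes zarr's default "<array>/<i>.<j>.<k>" chunk-key layout, and the islice keeps the example to a small subset of the 57240 chunks):

import itertools

import zarr
from fsspec import get_mapper

store = get_mapper("gs://oceanum-era5/wind_10m.zarr")
u10 = zarr.open_group(store, mode="r")["u10"]

# Number of chunks along each dimension (ceiling division of shape by chunks).
grid = [range(-(-s // c)) for s, c in zip(u10.shape, u10.chunks)]

failures = []
for idx in itertools.islice(itertools.product(*grid), 100):  # subset only
    key = "u10/" + ".".join(map(str, idx))
    try:
        store[key]
    except KeyError:
        failures.append(key)

print(len(failures), "chunk reads failed")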

@rafa-guedes (Author)

Do we know if this is tied to dask / distributed, or can it be reproduced purely at the gcsfs / zarr level?

I started looking into this question and will post here once I have some insight.

(I was trying 'gs://oceanum-era5/wind_10m.zarr', sorry)

I could make this one public if needed for further tests; I only need a way to ensure it will be pulled from the us-central region to avoid egress costs.

@rafa-guedes (Author)

@martindurant @rabernat I have done a few tests - the problem only happens for me when I load data from GCS on a dask distributed cluster.

I loaded a spatial slice (always the same slice) from my dataset under 3 different conditions: [1] no dask distributed cluster, [2] a distributed cluster scaled up to 2 workers, and [3] up to 4 workers. The testing script is here (a rough sketch of it is shown below); the machine where I ran it is described further down. Each test was run 170 times.
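The gist of the test, as a hedged sketch (the actual linked script may differ in details such as the slice chosen and how the results are recorded):

import xarray as xr
from fsspec import get_mapper


def count_missing(n_repeats=170):
    """Repeatedly load the same spatial slice and count unexpected NaNs."""
    fsmap = get_mapper("gs://oceanum-era5/wind_10m.zarr")
    dset = xr.open_zarr(fsmap, consolidated=True)
    missing = []
    for _ in range(n_repeats):
        # One time step over the full lat/lon grid; the original data has
        # no NaNs here, so any non-zero count indicates a failed chunk read.
        u10 = dset.u10.isel(time=0).load()
        missing.append(int(u10.isnull().sum()))
    return missing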

[1] No dask distributed cluster

There were no missing cells when reading the zarr from GCS without a distributed cluster.

[2] Local dask cluster with 2 workers

from dask.distributed import Client
client = Client()
client.cluster.scale(2)

I did not observe entire chunks missing using 2 workers. However, in 3 out of 170 cases (1.8%) there were missing values within a chunk. These 3 cases are shown below; there were 2100, 2100 and 840 missing cells respectively (out of 1036800 cells).

missing_chunking_parts

[3] Local dask cluster with 4 workers

When using 4 workers:

client.scale(4)

I observed entire chunks missing in 41 out of 170 cases (24%), as shown below. It is interesting to note that the missing chunks are usually in the first latitude row (this dataset has reversed latitude coordinates, from 90 to -90). This is in contrast to the missing partial chunks in [2], which always occurred in the last row.

missing_chunks_4workers

VM used to run testing

$lscpu
Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
Address sizes:       46 bits physical, 48 bits virtual
CPU(s):              8
On-line CPU(s) list: 0-7
Thread(s) per core:  2
Core(s) per socket:  4
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               63
Model name:          Intel(R) Xeon(R) CPU @ 2.30GHz
Stepping:            0
CPU MHz:             2300.000
BogoMIPS:            4600.00
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:           32K
L1i cache:           32K
L2 cache:            256K
L3 cache:            46080K
NUMA node0 CPU(s):   0-7
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm invpcid_single pti ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt arat md_clear arch_capabilities

$free -h
              total        used        free      shared  buff/cache   available
Mem:           29Gi       521Mi       9.6Gi       0.0Ki        19Gi        28Gi
Swap:            0B          0B          0B

@martindurant (Contributor)

Missing pieces within a chunk is totally out-of-this-world strange. Can you tell whether the NaNs form a contiguous block in the original data?

What was your worker setup, please, e.g., threads/process?

@martindurant (Contributor)

Another quick question: are there any NaNs in the whole dataset, or is any NaN that appears necessarily a symptom of the problem?

@rafa-guedes (Author)

There are certainly no NaNs in the original data for the slice I am loading in these tests. There should be no NaNs in the entire dataset, which covers 40 years, but I need to check that.

@rafa-guedes (Author)

I can check the cluster setup a bit later on.

@rafa-guedes (Author)

What was your worker setup, please, e.g., threads/process?

The two setups looked like this:

In [1]: from dask.distributed import Client

In [2]: client = Client()

In [3]: client.cluster.scale(2)

In [4]: client
Out[4]: <Client: 'tcp://127.0.0.1:45395' processes=2 threads=4, memory=15.81 GB>

In [5]: client.cluster.scale(4)

In [6]: client
Out[6]: <Client: 'tcp://127.0.0.1:45395' processes=4 threads=8, memory=31.61 GB>

@martindurant (Contributor)

Hm, I tried it with a middling (10 workers, 20 cores) and a small (2 workers) cluster, and got no missing data, working on the dask arrays directly. Note that the chunking I chose matches the zarr internal chunking, which is what I think xarray does automatically :|
I am now running it on the smaller one 100 times.

Screen Shot 2019-10-07 at 10 28 27

@martindurant (Contributor)

(I get the same success with a local cluster too; all of this running in a google cluster)

@willirath (Member)

I only saw these errors with fairly large kubernetes clusters. This is a fully working example that returned different counts of NaNs (none of which should be there) on ocean.pangeo.io a few minutes ago:

https://nbviewer.jupyter.org/gist/willirath/b40b0b6f281cb1a46fcf848150ca0367

@rafa-guedes (Author)

In my case I was not using a kubernetes cluster but a single 8-core virtual machine with a small local dask cluster setup. Also, I was recently working with another dataset and did not notice this problem (though I did not test it extensively), so this may be related to some characteristic of the dataset - just guessing...

@rafa-guedes (Author)

@martindurant I interpreted case [2] (missing intra-chunk values) in those tests wrongly. My coordinates (longitude=1440, latitude=721) are not divisible by my chunk sizes (longitude=100, latitude=100), so the last chunk row and column have fewer cells. It turns out entire chunks are also missing in those 3 cases I mentioned - see the plot.

Screenshot from 2019-10-08 14-01-15

<xarray.Dataset>
Dimensions:    (latitude: 721, longitude: 1440, time: 354264)
Coordinates:
  * latitude   (latitude) float32 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude  (longitude) float32 0.0 0.25 0.5 0.75 ... 359.25 359.5 359.75
  * time       (time) datetime64[ns] 1979-01-01 ... 2019-05-31T23:00:00
Data variables:
    u10        (time, latitude, longitude) float64 dask.array<chunksize=(744, 100, 100), meta=np.ndarray>
    v10        (time, latitude, longitude) float64 dask.array<chunksize=(744, 100, 100), meta=np.ndarray>

@rafa-guedes (Author) commented Oct 8, 2019

Just a bit more info in case it is useful.

  • In the 3 out of 170 cases where I got missing chunks on a 2-worker cluster, all missing chunks occurred in the last (thinner) chunk row, as discussed above.

  • In the 41 out of 170 cases where I got missing chunks on a 4-worker cluster:

    • In 35 cases missing chunks occurred in the first latitude row.
    • In only 1 case did a missing chunk occur in the last row (the thinner chunk row).

Screenshot from 2019-10-08 20-02-46

@martindurant (Contributor)

Can you please try again with fsspec and gcsfs from master?

@rafa-guedes (Author)

It looks great @martindurant - no missing chunks on either a 2- or a 4-worker distributed cluster, repeating the tests 100 times. Thanks for looking into this!

@martindurant (Contributor)

hurray!

@rafa-guedes (Author)

@willirath could you please check those also fix it for you?

@willirath (Member)

I get KilledWorker exceptions after pip installing gcsfs and fsspec master on hub.pangeo.io.

I'll have a closer look tomorrow...

@martindurant (Contributor) commented Oct 11, 2019

I get KilledWorker exceptions

You will need the worker and client environments to agree, which would mean remaking the image in this case, a bit of a pain...

I am running the same thing in a single process and not finding any errors so far, but I have no idea how long this will take (did not think to make a progress bar...).

@martindurant (Contributor)

Finished without a NaN:

ds_combined.distance.data.sum().compute()
2445454000000.0

(I do this via the dask array so that I don't have to worry about interpretation of NaN - even one would make the result be NaN)

@rafa-guedes (Author)

Could we close this?

@martindurant (Contributor)

Waiting for the OK from @willirath, who may want to run with Client() (i.e., a LocalCluster of dask processes launched from the same environment) to test.

@willirath (Member)

I've just created a setup with KubeCluster on the pangeo binder. Stay tuned...

@willirath (Member)

I still get NaNs.

The gist (still running, so there will be an update) is here: https://gist.github.com/willirath/b40b0b6f281cb1a46fcf848150ca0367

My impression is that the errors occur a lot less frequently, but this might be due to adapted rate limits on the GCS side?

@willirath (Member)

from time import sleep  # ds and the client are set up as earlier in the gist

results = []
for n in range(20):

    vlist = [
        ds[vname].isnull().data.sum()
        for vname in ds.data_vars.keys()
    ]

    results.append(sum(vlist).compute(retries=10))

    print(n, results[-1])

    sleep(20)  # allow for scaling down

which printed:

0 96200000
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
10 0
11 0
12 0
13 181433200
14 0
15 0

@rafa-guedes (Author)

I have now repeated my tests 1000 times on a 4-worker local cluster - no NaNs at all for me.

@martindurant (Contributor)

OK, so closing this, and maybe @willirath is facing a different issue (another rate limit?). More logging may tell.

@willirath (Member)

I've raised zarr-developers/zarr-python#486 hoping to find out how to more easily trigger an Exception upon errors when loading chunks.

@martindurant (Contributor)

Zarr deciding that, where a chunk does not exist, the values should be NaN is the right behaviour. The question is what error is actually coming out of gcsfs (FileNotFound instead of some permission error in the file-system layer, and/or KeyError in the mapper) and what errors would be right for zarr to see. Is gcsfs retrying the underlying error? This info should be in the logs, or else the logging should be improved!
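To make that failure mode concrete, here is a much-simplified sketch of the mapper layer (not the real fsspec code): only a genuine FileNotFoundError should be translated into KeyError, because zarr interprets KeyError as "chunk absent" and fills the region with the fill value.

# Simplified sketch of an FSMap-like mapping, not the actual implementation.
class SketchMapper:
    def __init__(self, fs, root):
        self.fs = fs
        self.root = root

    def __getitem__(self, key):
        try:
            return self.fs.cat(f"{self.root}/{key}")
        except FileNotFoundError:
            # Correct translation: the chunk really is absent.
            raise KeyError(key)
        # Any other exception (timeout, rate limit, auth hiccup) must not be
        # turned into KeyError, otherwise zarr silently fills the chunk with
        # the fill value (NaN after decoding) instead of failing loudly.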

@alimanfoo

Just to say we're hitting something like this currently. Looking at the fsspec code, I think there may still be a possibility that transient errors from storage are propagated as KeyErrors to zarr, which then fills in the chunk with missing data. I have raised an issue here: fsspec/filesystem_spec#255.

@alimanfoo

We're currently using this workaround, which can be applied where you know all chunks should exist for an array, i.e., you never expect a KeyError to be raised from the store for any reason.
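The linked workaround isn't reproduced here; one possible shape of it, as a sketch, is a thin store wrapper that refuses to treat a missing data key as an empty chunk (metadata keys such as .zarray are still allowed to be probed and to miss):

from collections.abc import MutableMapping


class NoMissingChunksStore(MutableMapping):
    """Turn a missing chunk key into a hard error instead of a NaN block."""

    def __init__(self, store):
        self.store = store

    def __getitem__(self, key):
        try:
            return self.store[key]
        except KeyError as err:
            # Metadata keys (.zarray, .zattrs, ...) are legitimately probed
            # and may be absent; only data chunk reads are treated as fatal.
            if key.rsplit("/", 1)[-1].startswith("."):
                raise
            raise RuntimeError(f"failed to read chunk {key!r}") from err

    def __setitem__(self, key, value):
        self.store[key] = value

    def __delitem__(self, key):
        del self.store[key]

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)

With such a wrapper, a dropped chunk surfaces as an exception that dask can retry, rather than a silent block of NaNs.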

@alimanfoo

For information, changes merged upstream in fsspec that should protect against this happening: fsspec/filesystem_spec#259

@rabernat (Member)

Thanks for tracking this down @alimanfoo!

Over in pydata/xarray#3831, we have been discussing how to set up ecosystem-wide integration testing for all of the interconnected packages one needs to use xarray + zarr on the cloud. I'd love to get your thoughts on how best to do this.
