Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make NetCDF file cache handling compatible with dask distributed #2822

Open
wants to merge 25 commits into
base: main
Choose a base branch
from

Conversation

gerritholl
Copy link
Collaborator

@gerritholl gerritholl commented Jun 14, 2024

This PR makes file cache handling in the NetCDF4FileHandler compatible with dask distributed. It adds a utility function in satpy.readers.utils called get_distributed_friendly_dask_array, which can be used to produce a dask.array from a netCDF4 variable that can be used in an xarray, but dask graphs remain picklable and thus computable when including this one. This utility function is now used in NetCDF4FileHandler, which replaces homegrown file handle caching by caching using xarray.backends.CachingFileManager, which is needed to implement the aforementioned utility function.

Start work on a utility function to get a dask array from a dataset
variable in a way that is friendly to dask.distributed.
For the distributed-friendly dask array helper, parameterise the test
to cover more cases.  Simplify the implementation.
We need to force the shape and the dtype when getting the
dask-distributed-friendly xarray-dataarray.  Seems to have a first
working prototype now.
@gerritholl gerritholl marked this pull request as ready for review June 14, 2024 12:46
@gerritholl gerritholl marked this pull request as draft June 14, 2024 12:50
Add group support for getting a dask distributed friendly dask array.
Speed up the related tests by sharing the dask distributed client setup
and breakdown.
Add partial backward compatibility for accessing the file handle
attribute when using caching with a NetCDF4FileHandler base class.
Backward incompatibility is not 100%.  Deleting the FileHandler closes
the manager and therefore the ``file_handle`` property, however, when
accessing the ``file_handle`` property after deleting the
``FileHandler``, it is reopened.  Therefore, calling `__del__()``
manually and then accessing ``fh.file_handle`` will now return an open file
(was a closed file).  This should not happen in any sane use scenario.
With the new dask-distributed-friendly caching, make sure we are
respecting auto_maskandscale and are not applying scale factors twice.
Copy link

codecov bot commented Jun 20, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 96.06%. Comparing base (5e27be4) to head (7c173e7).
Report is 172 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2822   +/-   ##
=======================================
  Coverage   96.05%   96.06%           
=======================================
  Files         370      370           
  Lines       54320    54382   +62     
=======================================
+ Hits        52177    52240   +63     
+ Misses       2143     2142    -1     
Flag Coverage Δ
behaviourtests 3.99% <0.00%> (-0.01%) ⬇️
unittests 96.15% <100.00%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Remove a dead code except block that should never be reached.
@gerritholl gerritholl marked this pull request as ready for review June 20, 2024 10:25
Migrate TestNetCDF4FileHandler from unittest.TestCase to a regular
class.  Use a pytest fixture for the temporary NetCDF file.
Broaden the string that is matched against in
TestNetCDF4FileHandler.test_filenotfound.  On Linux and MacOS the
expected failure gives "No such file or directory".  On Windows it gives
"Invalid file format".
@coveralls
Copy link

coveralls commented Jun 20, 2024

Pull Request Test Coverage Report for Build 9597790771

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 92 of 92 (100.0%) changed or added relevant lines in 4 files are covered.
  • 21 unchanged lines in 5 files lost coverage.
  • Overall coverage increased (+0.006%) to 96.046%

Files with Coverage Reduction New Missed Lines %
satpy/readers/utils.py 1 93.19%
satpy/readers/generic_image.py 1 97.7%
satpy/readers/init.py 2 98.68%
satpy/readers/hdf5_utils.py 5 92.77%
satpy/readers/sar_c_safe.py 12 97.28%
Totals Coverage Status
Change from base Build 9497248893: 0.006%
Covered Lines: 51643
Relevant Lines: 53769

💛 - Coveralls

@djhoese djhoese added enhancement code enhancements, features, improvements component:readers labels Jun 26, 2024
Copy link
Member

@djhoese djhoese left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome job deciphering how to use the CachingFileManager and wrapping things in a map_blocks task. I think this is really close to being done, but I had some concerns about the helper function.

satpy/readers/netcdf_utils.py Show resolved Hide resolved
satpy/readers/utils.py Outdated Show resolved Hide resolved
Comment on lines 500 to 502
manager (xarray.backends.CachingFileManager):
Instance of xarray.backends.CachingFileManager encapsulating the
dataset to be read.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check how the docs render this. If the argument type isn't "clickable" to go directly to the xarray docs for the CFM then we could wrap the mention of it in the description with:

:class:`xarray.backends.CachingFileManager`

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument type was already clickable, but in the description it was not. I have now made it clickable in both cases (screenshot from local doc production):

Bildschirmfoto_2024-07-25_11-10-06

Comment on lines 480 to 481
def get_distributed_friendly_dask_array(manager, varname, chunks, dtype,
group="/", auto_maskandscale=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how I feel about this function name. Obviously it makes sense in this PR because it solves this specific problem, but it feels like there is a (shorter) more generic name that gets the point across. Another thing is that distributed_friendly is mentioned here, but that friendliness is a side effect of the "serializable" nature of the way you're accessing the data here, right? get_serializable_dask_array?

I don't feel super strongly about this, but the name was distracting to me so I thought I'd say something.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed get_serializable_dask_array.

satpy/readers/utils.py Outdated Show resolved Hide resolved
method set_auto_maskandscale, such as is the case for
NetCDF4.Dataset.
"""
def get_chunk():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The chunks is never used here. The current calling from the file handler is accessing the full shape of the variable so this is fine, but only for now. I mean that map_blocks will only ever call this function once. However, if you added a block_info kwarg to the function signature or whatever the map_blocks special keyword argument is, then you could change [:] to access a specific sub-set of the NetCDF file variable and only do a partial load. This should improve performance a lot (I think 🤞) if it was actually used in the file handler.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The chunks is never used here.

Hm? I'm passing chunks=chunks when I call da.map_blocks. What do you mean, it is never used? Do you mean I could be using chunk-location and num-chunks from a block_info dictionary passed to get_chunk?

The current calling from the file handler is accessing the full shape of the variable so this is fine, but only for now. I mean that map_blocks will only ever call this function once. However, if you added a block_info kwarg to the function signature or whatever the map_blocks special keyword argument is, then you could change [:] to access a specific sub-set of the NetCDF file variable and only do a partial load. This should improve performance a lot (I think 🤞) if it was actually used in the file handler.

I will try to wrap may head around this ☺

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think that's what I'm saying. I think the result of get_chunk() right now is broken for any chunk size other than the full shape of the array because you never do any slicing of the NetCDF variable inside get_chunk(). So, if you had a full array of 100x100 and a chunk size of 50x50, then map_blocks would call this function 4 times ((0-50, 0-50), (0-50, 50-100), (50-100, 0-50), (50-100, 50-100)). BUT each call would return the full variable 100x100. So I think this would be a case where the dask array would say "yeah, I have shape 100x100", but then once you computed it you'd get a 200x200 array back.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it now, I think.

satpy/readers/utils.py Outdated Show resolved Hide resolved
satpy/readers/netcdf_utils.py Show resolved Hide resolved
gerritholl and others added 8 commits July 24, 2024 11:37
Fix the spelling in the docstring example using netCDF4.

Co-authored-by: David Hoese <david.hoese@ssec.wisc.edu>
Add a workaround to prevent an unexpected type promotion in the unit
test for dask distributed friendly dask arrays.
When getting a dask-distributed friendly dask array from a NetCDF file
using the CachingFileManager, use the information provided in bloc_info
on the array location in case we are reading not the entire variable.
Rename get_distributed_friendly_dask_array to
get_serialisable_dask_array and remove the group argument, moving the
responsibility for handlings groups to the caller.
Pytroll uses US spelling.  Rename serializable to serialisable.

Remove removed keyword argument from call.
Ensure that the meta we pass to map_blocks also has the right dtype.
Not sure if this is necessary when map_blocks already has the right
dtype, but it can't hurt.
Fixing three merge conflicts.
@coveralls
Copy link

coveralls commented Jul 25, 2024

Pull Request Test Coverage Report for Build 10528447135

Details

  • 105 of 105 (100.0%) changed or added relevant lines in 4 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.004%) to 96.155%

Totals Coverage Status
Change from base Build 10528275069: 0.004%
Covered Lines: 52470
Relevant Lines: 54568

💛 - Coveralls

Copy link
Member

@mraspaud mraspaud left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just one comment inline. How is this affecting the performance of the reader? Do you consider this ready to be merged?

satpy/readers/utils.py Outdated Show resolved Hide resolved
@gerritholl
Copy link
Collaborator Author

How is this affecting the performance of the reader?

I had not tested that yet. I have now. Sadly, it gets worse :(

A simple test script that reads FCI, loads some channels, resamples, and writes them again, not specifying any dask scheduler.

With Satpy main:

time 0:45.93, RAM 9.27 GB

With this PR:

time 0:55.0, RAM 10.0 GB

Additionally, upon exiting, there is the repeated error message:

 Original exception was:
Error in sys.excepthook:

Original exception was:
Error in sys.excepthook:

Original exception was:
Error in sys.excepthook:

Do you consider this ready to be merged?

Sadly no, considering the problems described above. I will dig into this.

@gerritholl
Copy link
Collaborator Author

With Satpy main, three runs, times in seconds:

Scene creation: 13.9, 11.9, 11.5

Loading: 1.5, 1.3, 1.0

Computing: 6.5, 6.9, 6.5

With this PR:

Scene creation: 13.2, 12.0, 12.0

Loading: 3.5, 3.5, 4.4

Computing: 5.5, 5.8, 5.6

So it's in particular the loading that gets slower.

@gerritholl
Copy link
Collaborator Author

Profiling reveals that there are 160 calls to acquire_context, but only 40 to _NCDatasetWrapper.__init__, so the caching appears to be doing its job. But the load call is not reusing the opening that happened upon Scene creation, which is why performance is impacted. I will see what I can improve.

@gerritholl gerritholl marked this pull request as draft July 26, 2024 09:55
When caching, make sure we use the CachingFileManager already upon scene
creation and not only by the time we are loading.
@gerritholl
Copy link
Collaborator Author

With c2b1533 loading is much faster, although with more variability than for satpy main. Scene creation is a little slower.

Scene creation: 13.4, 14.5, 12.8, 12.6, 12.8, 13.0

Loading: 1.9, 1.0, 1.6, 1.4, 1.3, 1.0

@gerritholl
Copy link
Collaborator Author

I can't reliably reproduce the performance differences. Running it again with the main branch gives:

Scene creation, main branch: 14.5, 14.3, 13.2

Scene creation, this PR: 13.6, 12.7, 13.8

And with cProfile, it's always faster with my PR.

Considering those uncertainties, I will declare performance is the same within the measurement uncertainty.

@gerritholl gerritholl marked this pull request as ready for review July 26, 2024 15:34
Don't subclass netCDF4.Dataset, rather just return an instance from a
helper function.  Seems good enough and gets rid of the weird error
messages upon exit.
@gerritholl
Copy link
Collaborator Author

Fixed the problem with the strange exception/error messages upon exit in 9fce5a7.

Some readers read entire groups; this needs xarray kwargs to be set even
if caching is used.
@mraspaud
Copy link
Member

I'm happy with this. @djhoese can you just confirm that you are good with this being merged? (and feel free to merge it if that's the case)

if self.manager is None:
return None
return self.manager.acquire()

@staticmethod
def _set_file_handle_auto_maskandscale(file_handle, auto_maskandscale):
if hasattr(file_handle, "set_auto_maskandscale"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that this has to be handled in your PR, but if I remember correctly this Dataset-level set_auto_maskandscale was added to netcdf4-python quite a while ago. It seems error prone and confusing to silently call the method only if it exists and to not log/inform the user that it wasn't used when it was expected. Maybe we should remove this method on the file handler class and always call file_handle.set_auto_maskandscale no matter what. Your wrapper does it already.

Comment on lines +525 to +531
@pytest.fixture(scope="class")
def dask_dist_client(self):
"""Set up and close a dask distributed client."""
from dask.distributed import Client
cl = Client()
yield cl
cl.close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like dask developer's recommend using their test utilities for writing distributed-based tests:

https://distributed.dask.org/en/latest/develop.html#writing-tests

Would it be possible to use their tools instead of this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component:readers enhancement code enhancements, features, improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

LI and FCI readers do not work with dask distributed scheduler
4 participants