open_mfdataset usage and limitations. #2501

Closed
Thomas-Z opened this issue Oct 23, 2018 · 22 comments

@Thomas-Z
Contributor

I'm trying to understand and use the open_mfdataset function to open a huge number of files.
I thought this function would be quite similar to dask.dataframe.from_delayed and would let me "load" and work on an amount of data limited only by the number of Dask workers (or "unlimited", given that it can be lazily loaded).

But my tests showed something quite different.
It seems xarray requires the index to be copied back to the Dask client in order to "auto_combine" the data.

Doing some tests on a small portion of my data, I get something like this.

Each file has these dimensions: time: ~2871, xx_ind: 40, yy_ind: 128.
The files are concatenated along the time dimension, and my understanding is that only the time coordinate is loaded and brought back to the client (the other dimensions are constant).

The parallel tests are run with 200 Dask workers.

=================== Loading 1002 files ===================

xr.open_mfdataset('*1002*.nc')

peak memory: 1660.59 MiB, increment: 1536.25 MiB
Wall time: 1min 29s


xr.open_mfdataset('*1002*.nc', parallel=True)

peak memory: 1745.14 MiB, increment: 1602.43 MiB
Wall time: 53 s

=================== Loading 5010 files ===================

xr.open_mfdataset('*5010*.nc')

peak memory: 7419.99 MiB, increment: 7315.36 MiB
Wall time: 8min 33s


xr.open_mfdataset('*5010*.nc', parallel=True)

peak memory: 8249.75 MiB, increment: 8112.07 MiB
Wall time: 4min 48s

As you can see, the amount of memory used for this operation is significant, and I won't be able to do this on many more files.
When using the parallel option, the loading of the files takes a few seconds (judging from what the Dask dashboard is showing), and I'm guessing the rest of the time is spent in the "auto_combine".

So I'm wondering whether I'm doing something wrong, whether there is another way to load the data, or whether I simply cannot use xarray for this quantity of data and have to use Dask directly.

Thanks in advance.

INSTALLED VERSIONS

commit: None
python: 3.5.2.final.0
python-bits: 64
OS: Linux
OS-release: 4.15.0-34-generic
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: fr_FR.UTF-8
LOCALE: fr_FR.UTF-8
xarray: 0.10.9+32.g9f4474d.dirty
pandas: 0.23.4
numpy: 1.15.2
scipy: 1.1.0
netCDF4: 1.4.1
h5netcdf: 0.6.2
h5py: 2.8.0
Nio: None
zarr: 2.2.0
cftime: 1.0.1
PseudonetCDF: None
rasterio: None
iris: None
bottleneck: None
cyordereddict: None
dask: 0.19.4
distributed: 1.23.3
matplotlib: 3.0.0
cartopy: None
seaborn: None
setuptools: 40.4.3
pip: 18.1
conda: None
pytest: 3.9.1
IPython: 7.0.1
sphinx: None

@rabernat
Contributor

In open_mfdataset, all of the dimensions and coordinates of the individual files have to be checked and verified to be compatible. That is often the source of slow performance with open_mfdataset.

To help us help you debug, please provide more information about the files you are opening. Specifically, call open_dataset() directly on the first two files and copy and paste the output here, with something like this:

from glob import glob
import xarray as xr
all_files = glob('*1002*.nc')
display(xr.open_dataset(all_files[0]))
display(xr.open_dataset(all_files[1]))

@rabernat
Contributor

^ I'm assuming you're in a notebook. If not, call print instead of display.

@Thomas-Z
Contributor Author

Thank you for looking into this.

I just want to point out that I'm not so much concerned with the "slow performance" as with the memory consumption and the limitation it implies.

from glob import glob
import xarray as xr

all_files = glob('...*TP110*.nc')
display(xr.open_dataset(all_files[0]))
display(xr.open_dataset(all_files[1]))
<xarray.Dataset>
Dimensions:                            (meas_ind: 40, time: 2871, wvf_ind: 128)
Coordinates:
  * time                               (time) datetime64[ns] 2017-06-19T14:24:20.792036992 ... 2017-06-19T15:14:38.491743104
  * meas_ind                           (meas_ind) int8 0 1 2 3 4 ... 36 37 38 39
  * wvf_ind                            (wvf_ind) int8 0 1 2 3 ... 125 126 127
    lat                                (time) float64 ...
    lon                                (time) float64 ...
    lon_40hz                           (time, meas_ind) float64 ...
    lat_40hz                           (time, meas_ind) float64 ...
Data variables:
    time_40hz                          (time, meas_ind) datetime64[ns] ...
    surface_type                       (time) float32 ...
    rad_surf_type                      (time) float32 ...
    qual_alt_1hz_range                 (time) float32 ...
    qual_alt_1hz_swh                   (time) float32 ...
    qual_alt_1hz_sig0                  (time) float32 ...
    qual_alt_1hz_off_nadir_angle_wf    (time) float32 ...
    qual_inst_corr_1hz_range           (time) float32 ...
    qual_inst_corr_1hz_swh             (time) float32 ...
    qual_inst_corr_1hz_sig0            (time) float32 ...
    qual_rad_1hz_tb_k                  (time) float32 ...
    qual_rad_1hz_tb_ka                 (time) float32 ...
    alt_state_flag_acq_mode_40hz       (time, meas_ind) float32 ...
    alt_state_flag_tracking_mode_40hz  (time, meas_ind) float32 ...
    orb_state_flag_diode               (time) float32 ...
    orb_state_flag_rest                (time) float32 ...
    ecmwf_meteo_map_avail              (time) float32 ...
    trailing_edge_variation_flag       (time) float32 ...
    trailing_edge_variation_flag_40hz  (time, meas_ind) float32 ...
    ice_flag                           (time) float32 ...
    interp_flag_mean_sea_surface       (time) float32 ...
    interp_flag_mdt                    (time) float32 ...
    interp_flag_ocean_tide_sol1        (time) float32 ...
    interp_flag_ocean_tide_sol2        (time) float32 ...
    interp_flag_meteo                  (time) float32 ...
    alt                                (time) float64 ...
    alt_40hz                           (time, meas_ind) float64 ...
    orb_alt_rate                       (time) float32 ...
    range                              (time) float64 ...
    range_40hz                         (time, meas_ind) float64 ...
    range_used_40hz                    (time, meas_ind) float32 ...
    range_rms                          (time) float32 ...
    range_numval                       (time) float32 ...
    number_of_iterations               (time, meas_ind) float32 ...
    net_instr_corr_range               (time) float64 ...
    model_dry_tropo_corr               (time) float32 ...
    model_wet_tropo_corr               (time) float32 ...
    rad_wet_tropo_corr                 (time) float32 ...
    iono_corr_gim                      (time) float32 ...
    sea_state_bias                     (time) float32 ...
    swh                                (time) float32 ...
    swh_40hz                           (time, meas_ind) float32 ...
    swh_used_40hz                      (time, meas_ind) float32 ...
    swh_rms                            (time) float32 ...
    swh_numval                         (time) float32 ...
    net_instr_corr_swh                 (time) float32 ...
    sig0                               (time) float32 ...
    sig0_40hz                          (time, meas_ind) float32 ...
    sig0_used_40hz                     (time, meas_ind) float32 ...
    sig0_rms                           (time) float32 ...
    sig0_numval                        (time) float32 ...
    agc                                (time) float32 ...
    agc_rms                            (time) float32 ...
    agc_numval                         (time) float32 ...
    net_instr_corr_sig0                (time) float32 ...
    atmos_corr_sig0                    (time) float32 ...
    off_nadir_angle_wf                 (time) float32 ...
    off_nadir_angle_wf_40hz            (time, meas_ind) float32 ...
    tb_k                               (time) float32 ...
    tb_ka                              (time) float32 ...
    mean_sea_surface                   (time) float64 ...
    mean_topography                    (time) float64 ...
    geoid                              (time) float64 ...
    bathymetry                         (time) float64 ...
    inv_bar_corr                       (time) float32 ...
    hf_fluctuations_corr               (time) float32 ...
    ocean_tide_sol1                    (time) float64 ...
    ocean_tide_sol2                    (time) float64 ...
    ocean_tide_equil                   (time) float32 ...
    ocean_tide_non_equil               (time) float32 ...
    load_tide_sol1                     (time) float32 ...
    load_tide_sol2                     (time) float32 ...
    solid_earth_tide                   (time) float32 ...
    pole_tide                          (time) float32 ...
    wind_speed_model_u                 (time) float32 ...
    wind_speed_model_v                 (time) float32 ...
    wind_speed_alt                     (time) float32 ...
    rad_water_vapor                    (time) float32 ...
    rad_liquid_water                   (time) float32 ...
    ice1_range_40hz                    (time, meas_ind) float64 ...
    ice1_sig0_40hz                     (time, meas_ind) float32 ...
    ice1_qual_flag_40hz                (time, meas_ind) float32 ...
    seaice_range_40hz                  (time, meas_ind) float64 ...
    seaice_sig0_40hz                   (time, meas_ind) float32 ...
    seaice_qual_flag_40hz              (time, meas_ind) float32 ...
    ice2_range_40hz                    (time, meas_ind) float64 ...
    ice2_le_sig0_40hz                  (time, meas_ind) float32 ...
    ice2_sig0_40hz                     (time, meas_ind) float32 ...
    ice2_sigmal_40hz                   (time, meas_ind) float32 ...
    ice2_slope1_40hz                   (time, meas_ind) float64 ...
    ice2_slope2_40hz                   (time, meas_ind) float64 ...
    ice2_mqe_40hz                      (time, meas_ind) float32 ...
    ice2_qual_flag_40hz                (time, meas_ind) float32 ...
    mqe_40hz                           (time, meas_ind) float32 ...
    peakiness_40hz                     (time, meas_ind) float32 ...
    ssha                               (time) float32 ...
    tracker_40hz                       (time, meas_ind) float64 ...
    tracker_used_40hz                  (time, meas_ind) float32 ...
    tracker_diode_40hz                 (time, meas_ind) float64 ...
    pri_counter_40hz                   (time, meas_ind) float64 ...
    qual_alt_1hz_off_nadir_angle_pf    (time) float32 ...
    off_nadir_angle_pf                 (time) float32 ...
    off_nadir_angle_rain_40hz          (time, meas_ind) float32 ...
    uso_corr                           (time) float64 ...
    internal_path_delay_corr           (time) float64 ...
    modeled_instr_corr_range           (time) float32 ...
    doppler_corr                       (time) float32 ...
    cog_corr                           (time) float32 ...
    modeled_instr_corr_swh             (time) float32 ...
    internal_corr_sig0                 (time) float32 ...
    modeled_instr_corr_sig0            (time) float32 ...
    agc_40hz                           (time, meas_ind) float32 ...
    agc_corr_40hz                      (time, meas_ind) float32 ...
    scaling_factor_40hz                (time, meas_ind) float64 ...
    epoch_40hz                         (time, meas_ind) float64 ...
    width_leading_edge_40hz            (time, meas_ind) float64 ...
    amplitude_40hz                     (time, meas_ind) float64 ...
    thermal_noise_40hz                 (time, meas_ind) float64 ...
    seaice_epoch_40hz                  (time, meas_ind) float64 ...
    seaice_amplitude_40hz              (time, meas_ind) float64 ...
    ice2_epoch_40hz                    (time, meas_ind) float64 ...
    ice2_amplitude_40hz                (time, meas_ind) float64 ...
    ice2_mean_amplitude_40hz           (time, meas_ind) float64 ...
    ice2_thermal_noise_40hz            (time, meas_ind) float64 ...
    ice2_slope_40hz                    (time, meas_ind) float64 ...
    signal_to_noise_ratio              (time) float32 ...
    waveforms_40hz                     (time, meas_ind, wvf_ind) float32 ...
Attributes:
    Conventions:                       CF-1.1
    title:                             GDR - Expertise dataset
    institution:                       CNES
    source:                            radar altimeter
    history:                           2017-07-21 08:25:07 : Creation
    contact:                           CNES aviso@oceanobs.com, EUMETSAT ops@...
    references:                        L1 library=V4.5p1, L2 library=V5.5p2, ...
    processing_center:                 SALP
    reference_document:                SARAL/ALTIKA Products Handbook, SALP-M...
    mission_name:                      SARAL
    altimeter_sensor_name:             ALTIKA
    radiometer_sensor_name:            ALTIKA_RAD
    doris_sensor_name:                 DGXX
    cycle_number:                      110
    absolute_rev_number:               22545
    pass_number:                       1
    absolute_pass_number:              109219
    equator_time:                      2017-06-19 14:49:32.128000
    equator_longitude:                 227.77
    first_meas_time:                   2017-06-19 14:24:20.792037
    last_meas_time:                    2017-06-19 15:14:38.491743
    xref_altimeter_level1:             ALK_ALT_1PaS20170619_154722_20170619_1...
    xref_radiometer_level1:            ALK_RAD_1PaS20170619_154643_20170619_1...
    xref_altimeter_characterisation:   ALK_CHA_AXVCNE20131115_120000_20100101...
    xref_radiometer_characterisation:  ALK_CHR_AXVCNE20110207_180000_20110101...
    xref_altimeter_ltm:                ALK_CAL_AXXCNE20170720_110014_20130102...
    xref_doris_uso:                    SRL_OS1_AXXCNE20170720_083800_20130226...
    xref_orbit_data:                   SRL_VOR_AXVCNE20170720_111700_20170618...
    xref_pf_data:                      SRL_VPF_AXVCNE20170720_111800_20170618...
    xref_pole_location:                SMM_POL_AXXCNE20170721_071500_19870101...
    xref_gim_data:                     SRL_ION_AXPCNE20170620_074756_20170619...
    xref_mog2d_data:                   SMM_MOG_AXVCNE20170709_191501_20170619...
    xref_orf_data:                     SRL_ORF_AXXCNE20170720_083800_20160704...
    xref_meteorological_files:         SMM_APA_AXVCNE20170619_170611_20170619...
    ellipsoid_axis:                    6378136.3
    ellipsoid_flattening:              0.0033528131778969
<xarray.Dataset>
Dimensions:                            (meas_ind: 40, time: 2779, wvf_ind: 128)
Coordinates:
  * time                               (time) datetime64[ns] 2017-06-19T15:14:39.356848 ... 2017-06-19T16:04:56.808873920
  * meas_ind                           (meas_ind) int8 0 1 2 3 4 ... 36 37 38 39
  * wvf_ind                            (wvf_ind) int8 0 1 2 3 ... 125 126 127
    lat                                (time) float64 ...
    lon                                (time) float64 ...
    lon_40hz                           (time, meas_ind) float64 ...
    lat_40hz                           (time, meas_ind) float64 ...
Data variables:
    time_40hz                          (time, meas_ind) datetime64[ns] ...
    surface_type                       (time) float32 ...
    rad_surf_type                      (time) float32 ...
    qual_alt_1hz_range                 (time) float32 ...
    qual_alt_1hz_swh                   (time) float32 ...
    qual_alt_1hz_sig0                  (time) float32 ...
    qual_alt_1hz_off_nadir_angle_wf    (time) float32 ...
    qual_inst_corr_1hz_range           (time) float32 ...
    qual_inst_corr_1hz_swh             (time) float32 ...
    qual_inst_corr_1hz_sig0            (time) float32 ...
    qual_rad_1hz_tb_k                  (time) float32 ...
    qual_rad_1hz_tb_ka                 (time) float32 ...
    alt_state_flag_acq_mode_40hz       (time, meas_ind) float32 ...
    alt_state_flag_tracking_mode_40hz  (time, meas_ind) float32 ...
    orb_state_flag_diode               (time) float32 ...
    orb_state_flag_rest                (time) float32 ...
    ecmwf_meteo_map_avail              (time) float32 ...
    trailing_edge_variation_flag       (time) float32 ...
    trailing_edge_variation_flag_40hz  (time, meas_ind) float32 ...
    ice_flag                           (time) float32 ...
    interp_flag_mean_sea_surface       (time) float32 ...
    interp_flag_mdt                    (time) float32 ...
    interp_flag_ocean_tide_sol1        (time) float32 ...
    interp_flag_ocean_tide_sol2        (time) float32 ...
    interp_flag_meteo                  (time) float32 ...
    alt                                (time) float64 ...
    alt_40hz                           (time, meas_ind) float64 ...
    orb_alt_rate                       (time) float32 ...
    range                              (time) float64 ...
    range_40hz                         (time, meas_ind) float64 ...
    range_used_40hz                    (time, meas_ind) float32 ...
    range_rms                          (time) float32 ...
    range_numval                       (time) float32 ...
    number_of_iterations               (time, meas_ind) float32 ...
    net_instr_corr_range               (time) float64 ...
    model_dry_tropo_corr               (time) float32 ...
    model_wet_tropo_corr               (time) float32 ...
    rad_wet_tropo_corr                 (time) float32 ...
    iono_corr_gim                      (time) float32 ...
    sea_state_bias                     (time) float32 ...
    swh                                (time) float32 ...
    swh_40hz                           (time, meas_ind) float32 ...
    swh_used_40hz                      (time, meas_ind) float32 ...
    swh_rms                            (time) float32 ...
    swh_numval                         (time) float32 ...
    net_instr_corr_swh                 (time) float32 ...
    sig0                               (time) float32 ...
    sig0_40hz                          (time, meas_ind) float32 ...
    sig0_used_40hz                     (time, meas_ind) float32 ...
    sig0_rms                           (time) float32 ...
    sig0_numval                        (time) float32 ...
    agc                                (time) float32 ...
    agc_rms                            (time) float32 ...
    agc_numval                         (time) float32 ...
    net_instr_corr_sig0                (time) float32 ...
    atmos_corr_sig0                    (time) float32 ...
    off_nadir_angle_wf                 (time) float32 ...
    off_nadir_angle_wf_40hz            (time, meas_ind) float32 ...
    tb_k                               (time) float32 ...
    tb_ka                              (time) float32 ...
    mean_sea_surface                   (time) float64 ...
    mean_topography                    (time) float64 ...
    geoid                              (time) float64 ...
    bathymetry                         (time) float64 ...
    inv_bar_corr                       (time) float32 ...
    hf_fluctuations_corr               (time) float32 ...
    ocean_tide_sol1                    (time) float64 ...
    ocean_tide_sol2                    (time) float64 ...
    ocean_tide_equil                   (time) float32 ...
    ocean_tide_non_equil               (time) float32 ...
    load_tide_sol1                     (time) float32 ...
    load_tide_sol2                     (time) float32 ...
    solid_earth_tide                   (time) float32 ...
    pole_tide                          (time) float32 ...
    wind_speed_model_u                 (time) float32 ...
    wind_speed_model_v                 (time) float32 ...
    wind_speed_alt                     (time) float32 ...
    rad_water_vapor                    (time) float32 ...
    rad_liquid_water                   (time) float32 ...
    ice1_range_40hz                    (time, meas_ind) float64 ...
    ice1_sig0_40hz                     (time, meas_ind) float32 ...
    ice1_qual_flag_40hz                (time, meas_ind) float32 ...
    seaice_range_40hz                  (time, meas_ind) float64 ...
    seaice_sig0_40hz                   (time, meas_ind) float32 ...
    seaice_qual_flag_40hz              (time, meas_ind) float32 ...
    ice2_range_40hz                    (time, meas_ind) float64 ...
    ice2_le_sig0_40hz                  (time, meas_ind) float32 ...
    ice2_sig0_40hz                     (time, meas_ind) float32 ...
    ice2_sigmal_40hz                   (time, meas_ind) float32 ...
    ice2_slope1_40hz                   (time, meas_ind) float64 ...
    ice2_slope2_40hz                   (time, meas_ind) float64 ...
    ice2_mqe_40hz                      (time, meas_ind) float32 ...
    ice2_qual_flag_40hz                (time, meas_ind) float32 ...
    mqe_40hz                           (time, meas_ind) float32 ...
    peakiness_40hz                     (time, meas_ind) float32 ...
    ssha                               (time) float32 ...
    tracker_40hz                       (time, meas_ind) float64 ...
    tracker_used_40hz                  (time, meas_ind) float32 ...
    tracker_diode_40hz                 (time, meas_ind) float64 ...
    pri_counter_40hz                   (time, meas_ind) float64 ...
    qual_alt_1hz_off_nadir_angle_pf    (time) float32 ...
    off_nadir_angle_pf                 (time) float32 ...
    off_nadir_angle_rain_40hz          (time, meas_ind) float32 ...
    uso_corr                           (time) float64 ...
    internal_path_delay_corr           (time) float64 ...
    modeled_instr_corr_range           (time) float32 ...
    doppler_corr                       (time) float32 ...
    cog_corr                           (time) float32 ...
    modeled_instr_corr_swh             (time) float32 ...
    internal_corr_sig0                 (time) float32 ...
    modeled_instr_corr_sig0            (time) float32 ...
    agc_40hz                           (time, meas_ind) float32 ...
    agc_corr_40hz                      (time, meas_ind) float32 ...
    scaling_factor_40hz                (time, meas_ind) float64 ...
    epoch_40hz                         (time, meas_ind) float64 ...
    width_leading_edge_40hz            (time, meas_ind) float64 ...
    amplitude_40hz                     (time, meas_ind) float64 ...
    thermal_noise_40hz                 (time, meas_ind) float64 ...
    seaice_epoch_40hz                  (time, meas_ind) float64 ...
    seaice_amplitude_40hz              (time, meas_ind) float64 ...
    ice2_epoch_40hz                    (time, meas_ind) float64 ...
    ice2_amplitude_40hz                (time, meas_ind) float64 ...
    ice2_mean_amplitude_40hz           (time, meas_ind) float64 ...
    ice2_thermal_noise_40hz            (time, meas_ind) float64 ...
    ice2_slope_40hz                    (time, meas_ind) float64 ...
    signal_to_noise_ratio              (time) float32 ...
    waveforms_40hz                     (time, meas_ind, wvf_ind) float32 ...
Attributes:
    Conventions:                       CF-1.1
    title:                             GDR - Expertise dataset
    institution:                       CNES
    source:                            radar altimeter
    history:                           2017-07-21 08:25:19 : Creation
    contact:                           CNES aviso@oceanobs.com, EUMETSAT ops@...
    references:                        L1 library=V4.5p1, L2 library=V5.5p2, ...
    processing_center:                 SALP
    reference_document:                SARAL/ALTIKA Products Handbook, SALP-M...
    mission_name:                      SARAL
    altimeter_sensor_name:             ALTIKA
    radiometer_sensor_name:            ALTIKA_RAD
    doris_sensor_name:                 DGXX
    cycle_number:                      110
    absolute_rev_number:               22546
    pass_number:                       2
    absolute_pass_number:              109220
    equator_time:                      2017-06-19 15:39:46.492000
    equator_longitude:                 35.21
    first_meas_time:                   2017-06-19 15:14:39.356848
    last_meas_time:                    2017-06-19 16:04:56.808874
    xref_altimeter_level1:             ALK_ALT_1PaS20170619_154722_20170619_1...
    xref_radiometer_level1:            ALK_RAD_1PaS20170619_154643_20170619_1...
    xref_altimeter_characterisation:   ALK_CHA_AXVCNE20131115_120000_20100101...
    xref_radiometer_characterisation:  ALK_CHR_AXVCNE20110207_180000_20110101...
    xref_altimeter_ltm:                ALK_CAL_AXXCNE20170720_110014_20130102...
    xref_doris_uso:                    SRL_OS1_AXXCNE20170720_083800_20130226...
    xref_orbit_data:                   SRL_VOR_AXVCNE20170720_111700_20170618...
    xref_pf_data:                      SRL_VPF_AXVCNE20170720_111800_20170618...
    xref_pole_location:                SMM_POL_AXXCNE20170721_071500_19870101...
    xref_gim_data:                     SRL_ION_AXPCNE20170620_074756_20170619...
    xref_mog2d_data:                   SMM_MOG_AXVCNE20170709_191501_20170619...
    xref_orf_data:                     SRL_ORF_AXXCNE20170720_083800_20160704...
    xref_meteorological_files:         SMM_APA_AXVCNE20170619_170611_20170619...
    ellipsoid_axis:                    6378136.3
    ellipsoid_flattening:              0.0033528131778969

@rsignell-usgs

rsignell-usgs commented May 30, 2019

I'm also hitting memory issues when using open_mfdataset with a cluster.

Specifically, I'm trying to open 8760 NetCDF files with an 8-node, 40-CPU LocalCluster.

When I issue:

ds = xr.open_mfdataset(files, parallel=True)

all looks good on the Dask dashboard (screenshots omitted), and the tasks complete with no errors in about 4 minutes.

Then 4 more minutes go by before I get a bunch of errors like:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 26054 was killed by unknown signal
distributed.nanny - WARNING - Restarting worker

and my cell doesn't complete.

Any suggestions?

@rabernat
Contributor

Try writing a preprocessor function that drops all coordinates

def drop_coords(ds):
    return ds.reset_coords(drop=True)
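
A hedged usage sketch (assuming a files list built with glob, as in the earlier comment):

ds = xr.open_mfdataset(files, preprocess=drop_coords, parallel=True)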

@rsignell-usgs

rsignell-usgs commented Jun 27, 2019

I tried this, and either I didn't apply it right, or it didn't work. The memory use kept growing until the process died. My code to process the 8760 NetCDF files with open_mfdataset looks like this:

import xarray as xr
from dask.distributed import Client, progress, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

import pandas as pd

dates = pd.date_range(start='2009-01-01 00:00',end='2009-12-31 23:00', freq='1h')
files = ['./nc/{}/{}.CHRTOUT_DOMAIN1.comp'.format(date.strftime('%Y'),date.strftime('%Y%m%d%H%M')) for date in dates]

def drop_coords(ds):
    return ds.reset_coords(drop=True)

ds = xr.open_mfdataset(files, preprocess=drop_coords, autoclose=True, parallel=True)
ds1 = ds.chunk(chunks={'time':168, 'feature_id':209929})

import numcodecs
numcodecs.blosc.use_threads = False
ds1.to_zarr('zarr/2009', mode='w', consolidated=True)

I transferred the NetCDF files from AWS S3 to my local disk to run this, using this command:

rclone sync --include '*.CHRTOUT_DOMAIN1.comp' aws-east:nwm-archive/2009 . --checksum --fast-list --transfers 16

@TomAugspurger, if you could take a look, that would be great, and if you have any ideas of how to make this example simpler/more easily reproducible, please let me know.

@TomAugspurger
Contributor

Thanks, I will take a look this afternoon. Are there any datasets on https://pangeo-data.github.io/pangeo-datastore/ that would exhibit this poor behavior? I may not have access to the bucket (or I'm misusing rclone):

2019/06/27 14:23:50 NOTICE: Config file "/Users/taugspurger/.config/rclone/rclone.conf" not found - using defaults
2019/06/27 14:23:50 Failed to create file system for "aws-east:nwm-archive/2009": didn't find section in config file

@rabernat
Contributor

Are there any datasets on https://pangeo-data.github.io/pangeo-datastore/ that would exhibit this poor behavior?

The datasets in our cloud datastore are designed explicitly to avoid this problem!

@rabernat
Contributor

@rsignell-usgs

Can you post the xarray repr of two sample files after the pre-processing function has been applied?

@TomAugspurger
Contributor

The datasets in our cloud datastore are designed explicitly to avoid this problem!

Good to know!

FYI, the rclone failure in #2501 (comment) was user error (I can access the bucket, but I need to specify the us-east-1 region). Taking a look now.

@rsignell-usgs

@TomAugspurger, I'm back from vacation now and ready to attack this again. Any updates on your end?

@TomAugspurger
Contributor

I'm looking into it today. Can you clarify

The memory use kept growing until the process died.

by "process" do you mean a dask worker process, or just the main python process executing the ds = xr.open_mfdataset(...) code?

@rsignell-usgs

@TomAugspurger, okay, I just ran the above code again and here's what happens:

The open_mfdataset call proceeds nicely on my 8 workers with 40 cores, eventually completing the 8760 open_dataset tasks in about 10 minutes. One interesting thing is that the number of tasks keeps dropping as time goes on; I'm not sure why that would be (dashboard screenshots omitted).
The memory usage on the workers seems okay during this process (screenshot omitted).

Then, despite the dashboard showing the tasks as completed, the open_mfdataset command does not complete. Nothing has died, and I'm not sure what's happening. I check top (screenshot omitted),

then after about 10 more minutes I get a series of warnings (screenshot omitted),

and then the errors:

distributed.client - WARNING - Couldn't gather 17520 keys, rescheduling {'getattr-fd038834-befa-4a9b-b78f-51f9aa2b28e5': ('tcp://127.0.0.1:45640',), 'drop_coords-39be9e52-59de-4e1f-b6d8-27e7d931b5af': ('tcp://127.0.0.1:55881',), 'drop_coords-8bd07037-9ca4-4f97-83fb-8b02d7ad0333': ('tcp://127.0.0.1:56164',), 'drop_coords-ca3dd72b-e5af-4099-b593-89dc97717718': ('tcp://127.0.0.1:59961',), 'getattr-c0af8992-e928-4d42-9e64-340303143454': ('tcp://127.0.0.1:42989',), 'drop_coords-8cdfe5fb-7a29-4606-8692-efa747be5bc1': ('tcp://127.0.0.1:35445',), 'getattr-03669206-0d26-46a1-988d-690fe830e52f': 
...

Full error listing here:
https://gist.github.com/rsignell-usgs/3b7101966b8c6d05f48a0e01695f35d6

Does this help? I'd be happy to screenshare if that would be useful.

@rsignell-usgs

@rabernat, to answer your question, if I open just two files:

ds = xr.open_mfdataset(files[:2], preprocess=drop_coords, autoclose=True, parallel=True)

the resulting dataset is:

<xarray.Dataset>
Dimensions:         (feature_id: 2729077, reference_time: 1, time: 2)
Coordinates:
  * reference_time  (reference_time) datetime64[ns] 2009-01-01
  * feature_id      (feature_id) int32 101 179 181 ... 1180001803 1180001804
  * time            (time) datetime64[ns] 2009-01-01 2009-01-01T01:00:00
Data variables:
    streamflow      (time, feature_id) float64 dask.array<shape=(2, 2729077), chunksize=(1, 2729077)>
    q_lateral       (time, feature_id) float64 dask.array<shape=(2, 2729077), chunksize=(1, 2729077)>
    velocity        (time, feature_id) float64 dask.array<shape=(2, 2729077), chunksize=(1, 2729077)>
    qSfcLatRunoff   (time, feature_id) float64 dask.array<shape=(2, 2729077), chunksize=(1, 2729077)>
    qBucket         (time, feature_id) float64 dask.array<shape=(2, 2729077), chunksize=(1, 2729077)>
    qBtmVertRunoff  (time, feature_id) float64 dask.array<shape=(2, 2729077), chunksize=(1, 2729077)>
Attributes:
    featureType:                timeSeries
    proj4:                      +proj=longlat +datum=NAD83 +no_defs
    model_initialization_time:  2009-01-01_00:00:00
    station_dimension:          feature_id
    model_output_valid_time:    2009-01-01_00:00:00
    stream_order_output:        1
    cdm_datatype:               Station
    esri_pe_string:             GEOGCS[GCS_North_American_1983,DATUM[D_North_...
    Conventions:                CF-1.6
    model_version:              NWM 1.2
    dev_OVRTSWCRT:              1
    dev_NOAH_TIMESTEP:          3600
    dev_channel_only:           0
    dev_channelBucket_only:     0
    dev:                        dev_ prefix indicates development/internal me...

@TomAugspurger
Contributor

@rsignell-usgs very helpful, thanks. I'd noticed that there was a pause after the open_dataset tasks finish, indicating that either the scheduler or (more likely) the client was doing work rather than the cluster. Most likely @rabernat's guess

In open_mfdataset, all of the dimensions and coordinates of the individual files have to be checked and verified to be compatible. That is often the source of slow performance with open_mfdataset.

is correct. I'm verifying all of that now, and looking into whether and how that can be done on the workers.

@rsignell-usgs

rsignell-usgs commented Jul 8, 2019

@TomAugspurger, I thought @rabernat's suggestion of implementing

def drop_coords(ds):
    return ds.reset_coords(drop=True)

would avoid this checking. Did I understand or implement this incorrectly?

@rsignell-usgs

rsignell-usgs commented Jul 10, 2019

@TomAugspurger, I sat down at SciPy with @rabernat, and he instantly realized that we needed to drop the feature_id coordinate to prevent open_mfdataset from trying to harmonize that coordinate across all the chunks.

So if I use this code, the open_mfdataset command finishes:

def drop_coords(ds):
    ds = ds.drop(['reference_time','feature_id'])
    return ds.reset_coords(drop=True)

and I can then add back in the dropped coordinate values at the end:

dsets = [xr.open_dataset(f) for f in files[:3]]
ds.coords['feature_id'] = dsets[0].coords['feature_id']
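
Putting the two snippets together, a minimal sketch of the whole pattern (untested, assuming the same files list and coordinate names as above; ds.drop is spelled ds.drop_vars in newer xarray):

import xarray as xr

def drop_coords(ds):
    # drop the index coordinates so open_mfdataset does not try to compare/align them per file
    ds = ds.drop(['reference_time', 'feature_id'])
    return ds.reset_coords(drop=True)

ds = xr.open_mfdataset(files, preprocess=drop_coords, parallel=True)

# re-attach the dropped feature_id values from one representative file
ds.coords['feature_id'] = xr.open_dataset(files[0])['feature_id']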

I'm now running into memory issues when I write the zarr data -- but I should raise that as a new issue, right?

@TomAugspurger
Contributor

TomAugspurger commented Jul 10, 2019 via email

@rabernat
Contributor

I believe that the memory issue is basically the same as dask/distributed#2602.

The graphs look like: read --> rechunk --> write.

Reading and rechunking increase memory consumption. Writing relieves it. In Rich's case, the workers just load too much data before they write it. Eventually they run out of memory.
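
A rough workaround sketch (not from this thread; it assumes an xarray recent enough to support to_zarr(append_dim=...) and the ds1 dataset from the snippet above): bound the data in flight by writing the store in batches along time, so each batch is read, rechunked and written before the next one is loaded.

batch = 168  # one week of hourly steps, matching the time chunk size above
n_time = ds1.dims['time']

for i, start in enumerate(range(0, n_time, batch)):
    subset = ds1.isel(time=slice(start, start + batch))
    if i == 0:
        # the first batch creates the store
        subset.to_zarr('zarr/2009', mode='w')
    else:
        # later batches append along the time dimension
        subset.to_zarr('zarr/2009', append_dim='time')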

@TomAugspurger
Contributor

TomAugspurger commented Jul 10, 2019 via email

@rafa-guedes
Contributor

rafa-guedes commented Jul 18, 2019

Hi guys, I'm having an issue that looks similar to @rsignell-usgs's. I'm trying to open 413 NetCDF files using open_mfdataset with parallel=True. The dataset (successfully opened with parallel=False) is ~300 GB on disk and looks like this:

In [1]: import xarray as xr

In [2]: dset = xr.open_mfdataset("./bom-ww3/bom-ww3_*.nc", chunks={'time': 744, 'latitude': 100, 'longitude': 100}, parallel=False)

In [3]: dset
Out[3]:
<xarray.Dataset>
Dimensions:    (latitude: 190, longitude: 289, time: 302092)
Coordinates:
  * longitude  (longitude) float32 70.0 70.4 70.8 71.2 ... 184.4 184.8 185.2
  * latitude   (latitude) float32 -55.6 -55.2 -54.8 -54.4 ... 19.2 19.6 20.0
  * time       (time) datetime64[ns] 1979-01-01 ... 2013-05-31T23:00:00.000013440
Data variables:
    hs         (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    fp         (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    dp         (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    wl         (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    U10        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    V10        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    hs1        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    hs2        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    tp1        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    tp2        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    lp0        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    lp1        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    lp2        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    th0        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    th1        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    th2        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    hs0        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>
    tp0        (time, latitude, longitude) float32 dask.array<shape=(302092, 190, 289), chunksize=(745, 100, 100)>

Trying to read it in a standard Python session gives me a core dump:

In [1]: import xarray as xr

In [2]: dset = xr.open_mfdataset("./bom-ww3/bom-ww3_*.nc", chunks={'time': 744, 'latitude': 100, 'longitude': 100}, parallel=True)
Bus error (core dumped)

Trying to read it on a Dask cluster, I get:

In [1]: from dask.distributed import Client

In [2]: import xarray as xr

In [3]: client = Client()

In [4]: dset = xr.open_mfdataset("./bom-ww3/bom-ww3_*.nc", chunks={'time': 744, 'latitude': 100, 'longitude': 100}, parallel=True)
free(): double free detected in tcache 2free(): double free detected in tcache 2

free(): double free detected in tcache 2
distributed.nanny - WARNING - Worker process 18744 was killed by signal 11
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process 18740 was killed by signal 6
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process 18742 was killed by signal 7
distributed.nanny - WARNING - Worker process 18738 was killed by signal 6
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
free(): double free detected in tcache 2munmap_chunk(): invalid pointer

free(): double free detected in tcache 2
free(): double free detected in tcache 2
distributed.nanny - WARNING - Worker process 19082 was killed by signal 6
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process 19073 was killed by signal 6
distributed.nanny - WARNING - Restarting worker
---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-4-740561b80fec> in <module>()
----> 1 dset = xr.open_mfdataset("./bom-ww3/bom-ww3_*.nc", chunks={'time': 744, 'latitude': 100, 'longitude': 100}, parallel=True)

/usr/local/lib/python3.7/dist-packages/xarray/backends/api.py in open_mfdataset(paths, chunks, concat_dim, compat, preprocess, engine, lock, data_vars, coords, combine, autoclose, parallel, **kwargs)
    772         # calling compute here will return the datasets/file_objs lists,
    773         # the underlying datasets will still be stored as dask arrays
--> 774         datasets, file_objs = dask.compute(datasets, file_objs)
    775 
    776     # Combine all datasets, closing them in case of a ValueError

/usr/local/lib/python3.7/dist-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

/home/oceanum/.local/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2525                     should_rejoin = False
   2526             try:
-> 2527                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2528             finally:
   2529                 for f in futures.values():

/home/oceanum/.local/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1821                 direct=direct,
   1822                 local_worker=local_worker,
-> 1823                 asynchronous=asynchronous,
   1824             )
   1825 

/home/oceanum/.local/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    761         else:
    762             return sync(
--> 763                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    764             )
    765 

/home/oceanum/.local/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330             e.wait(10)
    331     if error[0]:
--> 332         six.reraise(*error[0])
    333     else:
    334         return result[0]

/usr/lib/python3/dist-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/home/oceanum/.local/lib/python3.7/site-packages/distributed/utils.py in f()
    315             if callback_timeout is not None:
    316                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317             result[0] = yield future
    318         except Exception as exc:
    319             error[0] = sys.exc_info()

/home/oceanum/.local/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/home/oceanum/.local/lib/python3.7/site-packages/tornado/gen.py in run(self)
    740                     if exc_info is not None:
    741                         try:
--> 742                             yielded = self.gen.throw(*exc_info)  # type: ignore
    743                         finally:
    744                             # Break up a reference to itself

/home/oceanum/.local/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1678                             exc = CancelledError(key)
   1679                         else:
-> 1680                             six.reraise(type(exception), exception, traceback)
   1681                         raise exc
   1682                     if errors == "skip":

/usr/lib/python3/dist-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

KilledWorker: ('open_dataset-e7916acb-6d9f-4532-ab76-5b9c1b1a39c2', <Worker 'tcp://10.240.0.5:36019', memory: 0, processing: 63>)

Is there anything obviously wrong with what I'm trying here?

@dcherian mentioned this issue Aug 1, 2019
@dcherian
Contributor

I think this is stale now. See https://xarray.pydata.org/en/stable/io.html#reading-multi-file-datasets for the latest guidance on reading such datasets. Please open a new issue if you are still having trouble with open_mfdataset.
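
The pattern those docs describe looks roughly like this (a sketch to adapt to the actual files; the keyword names are current xarray options):

import xarray as xr

ds = xr.open_mfdataset(
    'path/to/files/*.nc',
    combine='by_coords',   # or combine='nested' with concat_dim='time'
    data_vars='minimal',   # only concatenate variables that actually have the concat dimension
    coords='minimal',
    compat='override',     # skip per-file equality checks on the remaining variables
    parallel=True,
)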
