In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from __future__ import annotations

import time
import shutil
import psutil
import logging
import numpy as np
import xarray as xr
from pathlib import Path
from dask.distributed import Client

from opera_tropo.log.loggin_setup import log_runtime 
from opera_tropo.core import calculate_ztd
from opera_tropo._pack import pack_ztd
from opera_tropo.checks import validate_input
from opera_tropo.utils import round_mantissa_xr, get_chunks_indices
from opera_tropo.product_info import TROPO_PRODUCTS

try:
    from RAiDER.models.model_levels import A_137_HRES, LEVELS_137_HEIGHTS
except ImportError as e:
    print(f"RAiDER is not properly installed or accessible. Error: {e}")

In [3]:
process = psutil.Process()
print(f'MEM: {process.memory_info().rss / 1e9:.2f} GB')

work_dir = Path('/u/aurora-r0/govorcin/01_OPERA/TROPO/interface/data')

ds = xr.open_dataset(work_dir / 'ECMWF_TROP_202402151200_202402151200_1.nc',  
                     chunks={'level':-1},
                     mask_and_scale=True)

MEM: 0.26 GB


In [4]:
process = psutil.Process()
print(f'MEM: {process.memory_info().rss / 1e9:.2f} GB')

client = Client(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='15GB',
    local_directory=work_dir,
)
client.dashboard_link

MEM: 0.29 GB


'http://127.0.0.1:8787/status'

In [5]:
chunks_ix = get_chunks_indices(ds)
all_ch = {'time' : slice(None, None, None),
          'level' : slice(None, None, None),
          'longitude': slice(None, None, None),
          'latitude': slice(None, None, None)}
len(chunks_ix)

25

In [6]:
chunks_ix[0]

{'time': slice(None, None, None),
 'level': slice(None, None, None),
 'latitude': slice(0, 512, None),
 'longitude': slice(0, 1024, None)}

In [7]:
ds_subset = ds.isel(all_ch)

In [8]:
ds_subset

Unnamed: 0,Array,Chunk
Bytes,6.69 GiB,274.00 MiB
Shape,"(1, 137, 2560, 5120)","(1, 137, 512, 1024)"
Dask graph,25 chunks in 2 graph layers,25 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.69 GiB 274.00 MiB Shape (1, 137, 2560, 5120) (1, 137, 512, 1024) Dask graph 25 chunks in 2 graph layers Data type float32 numpy.ndarray",1  1  5120  2560  137,

Unnamed: 0,Array,Chunk
Bytes,6.69 GiB,274.00 MiB
Shape,"(1, 137, 2560, 5120)","(1, 137, 512, 1024)"
Dask graph,25 chunks in 2 graph layers,25 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.69 GiB,274.00 MiB
Shape,"(1, 137, 2560, 5120)","(1, 137, 512, 1024)"
Dask graph,25 chunks in 2 graph layers,25 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.69 GiB 274.00 MiB Shape (1, 137, 2560, 5120) (1, 137, 512, 1024) Dask graph 25 chunks in 2 graph layers Data type float32 numpy.ndarray",1  1  5120  2560  137,

Unnamed: 0,Array,Chunk
Bytes,6.69 GiB,274.00 MiB
Shape,"(1, 137, 2560, 5120)","(1, 137, 512, 1024)"
Dask graph,25 chunks in 2 graph layers,25 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.69 GiB,274.00 MiB
Shape,"(1, 137, 2560, 5120)","(1, 137, 512, 1024)"
Dask graph,25 chunks in 2 graph layers,25 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.69 GiB 274.00 MiB Shape (1, 137, 2560, 5120) (1, 137, 512, 1024) Dask graph 25 chunks in 2 graph layers Data type float32 numpy.ndarray",1  1  5120  2560  137,

Unnamed: 0,Array,Chunk
Bytes,6.69 GiB,274.00 MiB
Shape,"(1, 137, 2560, 5120)","(1, 137, 512, 1024)"
Dask graph,25 chunks in 2 graph layers,25 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.69 GiB,274.00 MiB
Shape,"(1, 137, 2560, 5120)","(1, 137, 512, 1024)"
Dask graph,25 chunks in 2 graph layers,25 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.69 GiB 274.00 MiB Shape (1, 137, 2560, 5120) (1, 137, 512, 1024) Dask graph 25 chunks in 2 graph layers Data type float32 numpy.ndarray",1  1  5120  2560  137,

Unnamed: 0,Array,Chunk
Bytes,6.69 GiB,274.00 MiB
Shape,"(1, 137, 2560, 5120)","(1, 137, 512, 1024)"
Dask graph,25 chunks in 2 graph layers,25 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [9]:
chunk_size : [int, int] = [128, 128]
out_chunk_size : [int, int, int] = [8, 256, 258]

if chunk_size is None:
     # Keep native chunks but make sure that
     # Map block runs over all levels
     ds_subset = ds_subset.chunk({'level':-1})
else:
    chunks = {
        'longitude': chunk_size[1], 
        'latitude': chunk_size[0],
        'time': 1, 
        'level': len(A_137_HRES) - 1
    }
    ds_subset = ds_subset.chunk(chunks)

In [10]:
chunksizes = {key: value[0] for key, value in ds_subset.chunksizes.items()}
print(f'Chunk sizes: {chunksizes}')
out_heights = None
# Get output size
cols = ds_subset.sizes.get('latitude')
rows = ds_subset.sizes.get('longitude')

if out_heights is not None and len(out_heights) > 0:
    zlevels = np.array(out_heights)
else:
    zlevels = np.flipud(LEVELS_137_HEIGHTS)

Chunk sizes: {'time': 1, 'level': 137, 'latitude': 128, 'longitude': 128}


In [11]:
out_size = np.empty((cols, rows, len(zlevels)),
                        dtype=np.float32)

# To skip interpolation if out_heights are same as default
if np.array_equal(out_heights, np.flipud(LEVELS_137_HEIGHTS)):
    out_heights = None

In [50]:
out_size.height>80e3

AttributeError: 'numpy.ndarray' object has no attribute 'height'

In [13]:
# Get output template
template = pack_ztd(
    wet_ztd=out_size, 
    hydrostatic_ztd=out_size,
    lons=ds_subset.longitude.values, 
    lats=ds_subset.latitude.values,
    zs=zlevels, 
    model_time=ds_subset.time.values,
    chunk_size={"longitude": chunksizes['longitude'],
                "latitude": chunksizes['latitude'],
                "height": -1, "time": 1},
    keep_bits=False)

In [14]:
%%time
process = psutil.Process()
print(f'MEM: {process.memory_info().rss / 1e9:.2f} GB')
template

MEM: 15.53 GB
CPU times: user 494 μs, sys: 141 μs, total: 635 μs
Wall time: 488 μs


Unnamed: 0,Array,Chunk
Bytes,7.08 GiB,9.06 MiB
Shape,"(1, 145, 2560, 5120)","(1, 145, 128, 128)"
Dask graph,800 chunks in 1 graph layer,800 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 7.08 GiB 9.06 MiB Shape (1, 145, 2560, 5120) (1, 145, 128, 128) Dask graph 800 chunks in 1 graph layer Data type float32 numpy.ndarray",1  1  5120  2560  145,

Unnamed: 0,Array,Chunk
Bytes,7.08 GiB,9.06 MiB
Shape,"(1, 145, 2560, 5120)","(1, 145, 128, 128)"
Dask graph,800 chunks in 1 graph layer,800 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.08 GiB,9.06 MiB
Shape,"(1, 145, 2560, 5120)","(1, 145, 128, 128)"
Dask graph,800 chunks in 1 graph layer,800 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 7.08 GiB 9.06 MiB Shape (1, 145, 2560, 5120) (1, 145, 128, 128) Dask graph 800 chunks in 1 graph layer Data type float32 numpy.ndarray",1  1  5120  2560  145,

Unnamed: 0,Array,Chunk
Bytes,7.08 GiB,9.06 MiB
Shape,"(1, 145, 2560, 5120)","(1, 145, 128, 128)"
Dask graph,800 chunks in 1 graph layer,800 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [15]:
%%time
process = psutil.Process()
print(f'MEM: {process.memory_info().rss / 1e9:.2f} GB')
out_ds = ds_subset.map_blocks(calculate_ztd,
                kwargs={'out_heights': out_heights}, 
                        template=template).compute()

MEM: 15.53 GB
CPU times: user 1min 44s, sys: 1min 25s, total: 3min 10s
Wall time: 14min 43s


In [16]:
%%time
process = psutil.Process()
print(f'MEM: {process.memory_info().rss / 1e9:.2f} GB')
out_ds

MEM: 41.70 GB
CPU times: user 260 μs, sys: 167 μs, total: 427 μs
Wall time: 389 μs


In [16]:
%%time
calculate_ztd(ds_subset)

CPU times: user 1min 35s, sys: 1min 42s, total: 3min 18s
Wall time: 3min 10s


In [34]:
# NOTE dropping heights above 30km lower storage from 2.1GB to 1.7GB, above 20km = 1.5GB
compression_options = {
        "zlib": True,
        "complevel": 4,
        "shuffle": True,
        "grid_mapping": 'spatial_ref',
        "chunksizes" : [1, 8, 512, 512]
    }

In [35]:
encoding = {var: compression_options for var in out_ds.data_vars}

In [43]:
np.count_nonzero(out_ds.height > 25e3)

38

In [57]:
out_ds.height.sel(height=slice(None, 81e3)).height.min()

In [48]:
%%time
out_ds.sel(height=slice(-600, 20e3)).to_netcdf(work_dir / 'test3.nc', encoding=encoding, mode='w')

CPU times: user 2min 16s, sys: 7.3 s, total: 2min 23s
Wall time: 2min 14s


In [44]:
ds1 = xr.open_dataset(work_dir / 'test3.nc', chunks={})
ds1

Unnamed: 0,Array,Chunk
Bytes,5.62 GiB,8.00 MiB
Shape,"(1, 115, 2560, 5120)","(1, 8, 512, 512)"
Dask graph,750 chunks in 2 graph layers,750 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.62 GiB 8.00 MiB Shape (1, 115, 2560, 5120) (1, 8, 512, 512) Dask graph 750 chunks in 2 graph layers Data type float32 numpy.ndarray",1  1  5120  2560  115,

Unnamed: 0,Array,Chunk
Bytes,5.62 GiB,8.00 MiB
Shape,"(1, 115, 2560, 5120)","(1, 8, 512, 512)"
Dask graph,750 chunks in 2 graph layers,750 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.62 GiB,8.00 MiB
Shape,"(1, 115, 2560, 5120)","(1, 8, 512, 512)"
Dask graph,750 chunks in 2 graph layers,750 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.62 GiB 8.00 MiB Shape (1, 115, 2560, 5120) (1, 8, 512, 512) Dask graph 750 chunks in 2 graph layers Data type float32 numpy.ndarray",1  1  5120  2560  115,

Unnamed: 0,Array,Chunk
Bytes,5.62 GiB,8.00 MiB
Shape,"(1, 115, 2560, 5120)","(1, 8, 512, 512)"
Dask graph,750 chunks in 2 graph layers,750 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [46]:
ds1.close()

In [64]:
desired_order = ("time", "height", "longitude", "latitude")
ds1 = ds1.assign_coords({dim: ds1[dim] for dim in desired_order if dim in ds1})

In [31]:
del ds1

In [25]:
process = psutil.Process()
print(f'MEM: {process.memory_info().rss / 1e9:.2f} GB')

MEM: 2.22 GB


In [27]:
xr.decode_cf(out_ds)