# **ATL06 to ATL11**

Convert the ICESat-2 ATL06 (Land Ice Height) product to ATL11 (Land Ice Height Changes),
then convert the resulting ATL11 files from HDF5 to the [Zarr](https://zarr.readthedocs.io/) format.

In [1]:
import os
import glob
import sys
import subprocess

import dask
import dask.distributed
import h5py
import intake
import itertools
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyproj
import tqdm
import xarray as xr
import zarr

# Set the HDF5_USE_FILE_LOCKING environment variable to "FALSE" for this process.
# NOTE(review): presumably so that the many dask workers used below can open the
# same HDF5 files concurrently without file-lock errors — confirm.
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"

In [2]:
# Start a local dask cluster: 64 single-threaded worker processes
cluster_options = dict(n_workers=64, threads_per_worker=1)
client = dask.distributed.Client(**cluster_options)
client  # display the cluster summary in the notebook

0,1
Client  Scheduler: tcp://127.0.0.1:33217  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 64  Cores: 64  Memory: 201.22 GB


In [3]:
def first_last_cycle_numbers(referencegroundtrack: int, orbitalsegment: int):
    """
    Obtain the first and last cycle numbers for an ATL06 track, given the
    reference ground track and orbital segment number as input.

    Parameters
    ----------
    referencegroundtrack : int
        Reference ground track number, zero-padded to 4 digits in the filename.
    orbitalsegment : int
        Orbital segment number, zero-padded to 2 digits in the filename.

    Returns
    -------
    tuple of (str, str)
        The smallest and largest two-digit cycle strings found among the
        matching files, e.g. ('02', '07').

    Raises
    ------
    ValueError
        If no ATL06 file matches the given track and segment.
    """
    files = glob.glob(
        f"ATL06.003/**/ATL06*_*_{referencegroundtrack:04d}??{orbitalsegment:02d}_*.h5"
    )
    if not files:
        raise ValueError(
            "No ATL06 files found for reference ground track "
            f"{referencegroundtrack:04d}, orbital segment {orbitalsegment:02d}"
        )

    # The slice assumes every filename ends in 'CCSS_vvv_rr.h5', so that the
    # two-digit cycle number (CC) sits 14 to 12 characters from the end.
    cycles = [filepath[-14:-12] for filepath in files]

    # Compare the cycle strings themselves rather than the full paths, so the
    # result does not depend on how the parent directories happen to sort.
    # Zero-padded two-digit strings order the same way as their integer values.
    first_cycle = min(cycles)  # e.g. '02'
    last_cycle = max(cycles)  # e.g. '07'

    return first_cycle, last_cycle

In [4]:
# Generate the ATL06_to_ATL11 bash processing script, unless it exists already
if not os.path.exists("ATL06_to_ATL11_Antarctica.sh"):
    # Submit one lookup task per (reference ground track, Antarctic orbital
    # segment) pair; the task key records which pair each future belongs to
    futures = [
        client.submit(
            first_last_cycle_numbers,
            referencegroundtrack,
            orbitalsegment,
            key=f"{referencegroundtrack:04d}-{orbitalsegment}",
        )
        for referencegroundtrack, orbitalsegment in itertools.product(
            range(1387, 0, -1), [10, 11, 12]
        )
    ]

    # Turn every finished task into one command line of the bash script
    writelines = []
    finished = dask.distributed.as_completed(futures=futures)
    for f in tqdm.tqdm(iterable=finished, total=len(futures)):
        # Recover the track and segment (as strings) from the task key
        referencegroundtrack, orbitalsegment = f.key.split("-")
        first_cycle, last_cycle = f.result()
        command_pieces = [
            f"python3 ATL11/ATL06_to_ATL11.py",
            f" {referencegroundtrack} {orbitalsegment}",
            f" --cycles {first_cycle} {last_cycle}",
            f" --Release 3",
            f" --directory 'ATL06.003/**/'",
            f" --out_dir ATL11.001\n",
        ]
        writelines.append("".join(command_pieces))
    # Tasks complete in arbitrary order, so put the commands in sorted order
    writelines = sorted(writelines)

    # Write all the commands out as the bash script
    with open(file="ATL06_to_ATL11_Antarctica.sh", mode="w") as script:
        script.writelines(writelines)

100%|██████████| 4161/4161 [02:29<00:00, 27.75it/s]


Now use [GNU parallel](https://www.gnu.org/software/parallel/parallel_tutorial.html) to run the commands in the script in parallel.
This will take about 1 week to run on 64 cores.

Reference:

- O. Tange (2018): GNU Parallel 2018, Mar 2018, ISBN 9781387509881, DOI https://doi.org/10.5281/zenodo.1146014

In [5]:
# !PYTHONPATH=`pwd` PYTHONWARNINGS="ignore" parallel -a ATL06_to_ATL11_Antarctica.sh --bar --resume-failed --results logdir --joblog log --jobs 64 > /dev/null

[7m100% 4161:0=0s python3 ATL11/ATL06_to_ATL11.py 1387 12 --cycles 01 06 --Release [0m[0m


In [6]:
# df_log = pd.read_csv(filepath_or_buffer="log", sep="\t")
# df_log.query(expr="Exitval > 0")

## Convert from HDF5 to Zarr format

For faster data access speeds!
We'll collect the data for each Reference Ground Track,
and save it in the Zarr format.

Grouping hierarchy:
  - Reference Ground Track (1-1387)
    - Orbital Segments (10, 11, 12)
      - Laser Pairs (pt1, pt2, pt3)
        - Attributes (longitude, latitude, h_corr, delta_time, etc)

In [7]:
# for atl11file in tqdm.tqdm(iterable=sorted(glob.glob("ATL11.001/*.h5"))):
#     name = os.path.basename(p=os.path.splitext(p=atl11file)[0])

def atl11_last_cycle(atl11file: str) -> int:
    """
    Return the last cycle number encoded in an ATL11 filename.

    The third underscore-separated field of the basename holds the cycles
    (e.g. '0206' in 'ATL11_014512_0206_03_v001.h5'); its final two digits
    are parsed as the last cycle, so cycle numbers of 10 and above are read
    in full rather than as their last digit only.
    """
    cycles_field = os.path.basename(atl11file).split("_")[2]  # e.g. '0206'
    return int(cycles_field[-2:])


# Highest last-cycle number over all ATL11 files; reports 0 when the
# directory holds no ATL11 files instead of failing on an empty sequence
max_cycles: int = max(
    (atl11_last_cycle(f) for f in glob.glob("ATL11.001/*.h5")), default=0
)
print(f"{max_cycles} ICESat-2 cycles available")

7 ICESat-2 cycles available


In [8]:
@dask.delayed
def open_ATL11(atl11file: str, group: str) -> xr.Dataset:
    """
    Open one group of an ATL11 file with xarray and lightly pre-process it.

    The group is given as '<pair>/<subgroup>' and the file is read with the
    h5netcdf engine and mask_and_scale=True. Pre-processing steps:
    - Convert each variable's DIMENSION_LABELS attribute to str
    - Rename 'quality_summary' to f'quality_summary_{subgroup}'
    - Cast variables to their intended datatype, leaving delta_time alone

    Decorated with dask.delayed, so calling it only builds a lazy task.
    """
    pair, subgroup = group.split("/")
    ds = xr.open_dataset(
        filename_or_obj=atl11file,
        group=f"{pair}/{subgroup}",
        engine="h5netcdf",
        mask_and_scale=True,
    )

    # Cast the DIMENSION_LABELS attribute of every variable to str, which
    # avoids this error when saving to Zarr format later on:
    # TypeError: Object of type bytes is not JSON serializable
    for name in ds.variables:
        assert isinstance(ds[name].DIMENSION_LABELS, np.ndarray)
        labels_as_str = ds[name].attrs["DIMENSION_LABELS"].astype(str)
        ds[name].attrs["DIMENSION_LABELS"] = labels_as_str

    # Give quality_summary a subgroup-specific name to avoid a name clash
    # when the subgroups are merged
    renames = {"quality_summary": f"quality_summary_{subgroup}"}
    ds = ds.rename(renames)

    # Cast each variable to its intended datatype, except for delta_time
    for name in list(ds.variables):
        current_dtype = ds[name].dtype
        # Heights go from float64 to float32; everything else follows the
        # variable's own 'datatype' attribute
        desired_dtype = (
            "float32" if "h_corr" in name else ds[name].datatype.lower()
        )
        needs_cast = current_dtype != desired_dtype and name != "delta_time"
        if not needs_cast:
            continue

        try:
            ds[name].data = ds[name].data.astype(dtype=desired_dtype)
        except ValueError:  # for coordinate variables (e.g. ref_pt)
            # Replace the whole variable instead, then restore its attributes
            saved_attrs = ds[name].attrs
            ds[name] = ds[name].astype(desired_dtype)
            ds[name].attrs = saved_attrs

    return ds

In [9]:
# Map each output Zarr store path to the ATL11 HDF5 files that go into it:
# Antarctic orbital segments 10, 11, 12 are consolidated into one store,
# and so are all three laser pairs pt1, pt2, pt3
atl11_dict = {}
for rgt in tqdm.trange(1387):
    atl11files: list = glob.glob(f"ATL11.001/ATL11_{rgt+1:04d}1?_????_??_v00?.h5")

    # There should be 3 files, one for each of Orbital Segments 10, 11, 12.
    # The manually handled exceptions are tracks 208 and 1036 with 2 files.
    # Note ["ATL11.001/ATL11_014512_0206_03_v001.h5"] is missing pt2 and pt3 groups
    num_files = len(atl11files)
    assert num_files == 3 or (num_files == 2 and rgt + 1 in [208, 1036])

    if not atl11files:
        continue

    # Parse the filename fields from the second file in sorted order,
    # i.e. the '11' one, not '10' or '12'
    middle_file = sorted(atl11files)[1]
    pattern: dict = intake.source.utils.reverse_format(
        format_string="ATL11.001/ATL11_{referencegroundtrack:4}{orbitalsegment:2}_{cycles:4}_{revision:2}_v{version:3}.h5",
        resolved_string=middle_file,
    )
    zarr_template = "ATL11.001z123/ATL11_{referencegroundtrack}1x_{cycles}_{revision}_v{version}.zarr"
    zarrfilepath: str = zarr_template.format(**pattern)
    atl11_dict[zarrfilepath] = atl11files

100%|██████████| 1387/1387 [00:09<00:00, 142.77it/s]


In [10]:
# Build one lazy (dask.delayed) HDF5 to Zarr conversion task per Zarr store
# (file, laser pair) combinations to leave out of the output
skipped_groups = {
    ("ATL11.001/ATL11_014512_0206_03_v001.h5", "pt2"),
    ("ATL11.001/ATL11_014512_0206_03_v001.h5", "pt3"),
}

stores = []
for zarrfilepath, atl11files in tqdm.tqdm(iterable=atl11_dict.items()):
    zarr.open(store=zarrfilepath, mode="w")  # Make a new file/overwrite existing
    datasets = []
    # Orbital Segments: 10, 11, 12 (one file each) x Laser pairs: pt1, pt2, pt3
    for atl11file, pair in itertools.product(atl11files, ("pt1", "pt2", "pt3")):
        # Attributes: longitude, latitude, h_corr, delta_time, etc
        corrected_height_ds = open_ATL11(
            atl11file=atl11file, group=f"{pair}/corrected_h"
        )
        reference_surface_ds = open_ATL11(
            atl11file=atl11file, group=f"{pair}/ref_surf"
        )
        subgroup_datasets = [corrected_height_ds, reference_surface_ds]
        ds = dask.delayed(obj=xr.combine_by_coords)(datasets=subgroup_datasets)

        # Special exceptions are left out; for these, opening the pair group
        # with xr.open_dataset(atl11file, engine="h5netcdf", group=pair)
        # will fail as it is missing
        if (atl11file, pair) not in skipped_groups:
            datasets.append(ds)

    # Join every segment and laser pair along ref_pt, then queue the write
    dataset = dask.delayed(obj=xr.concat)(objs=datasets, dim="ref_pt")
    store_task = dataset.to_zarr(store=zarrfilepath, mode="w", consolidated=True)
    stores.append(store_task)

100%|██████████| 1387/1387 [00:33<00:00, 41.51it/s]


In [11]:
# Run every queued HDF5 to Zarr conversion task! Should take an hour or so
# Progress bar approach from https://stackoverflow.com/a/37901797/6611055
futures = list(map(client.compute, stores))
completed = dask.distributed.as_completed(futures=futures)
for f in tqdm.tqdm(iterable=completed, total=len(stores)):
    pass  # nothing to do per task; iterating just advances the progress bar

100%|██████████| 1387/1387 [55:53<00:00,  2.42s/it]  


In [12]:
# Re-open the Zarr store that `zarrfilepath` currently points to and show
# the shape of its h_corr array
ds = xr.open_dataset(
    filename_or_obj=zarrfilepath,
    engine="zarr",
    backend_kwargs=dict(consolidated=True),
)
h_corr = ds["h_corr"]
h_corr.__array__().shape

(185215, 6)