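This notebook demonstrates the xcube **SMOS L2C data store `smos`**: it opens SMOS Level-2C slices for a given time range, optionally averages them per interval, and appends the results to a Zarr dataset using `zappend`.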

Inputs:

In [1]:
# Data identifier that is passed to `store.open_data()` further below.
product_type = "SMOS-L2C-SM"
# product_type = "SMOS-L2C-OS"
# Overall time range, given as "<start date>/<stop date>".
time_range = "2022-01-01/2022-01-03"
# Aggregation interval, parsed with `pd.Timedelta` when splitting the time range.
interval = "1d"  # or None

Output (path of the target Zarr dataset, derived from the inputs):

In [2]:
# Derive the target name from the compacted time range and, if given, the
# aggregation interval. `interval` may be None, in which case the interval
# suffix is omitted instead of failing on `None.lower()`.
time_range_part = time_range.replace("-", "").replace("/", "-")
interval_part = f"-{interval.lower()}" if interval is not None else ""
target_path = f"smos-{time_range_part}{interval_part}.zarr"
target_path

'smos-20220101-20220103-1d.zarr'

In [14]:
import json
import logging
import os
import shutil

from IPython.display import JSON
import numpy as np
import pandas as pd
import xarray as xr

from xcube.core.store import find_data_store_extensions
from xcube.core.store import get_data_store_params_schema
from xcube.core.store import new_data_store
from zappend.api import zappend

In [4]:
def get_time_ranges(time_range: str, interval: str | None):
    one_sec = pd.Timedelta("1s")
    one_day = pd.Timedelta("1d")

    start_date, stop_date = time_range.split("/", maxsplit=1)
    interval_td = pd.Timedelta(interval) if interval else one_day
    dates = pd.date_range(start_date, stop_date, freq=interval_td)

    def to_date_str(date):
        return date.strftime("%Y-%m-%d")

    return [(to_date_str(dates[i]), to_date_str(dates[i+1] - one_sec)) 
            for i in range(len(dates) - 1)]

In [5]:
# Split the overall time range into one (start, stop) pair per interval.
time_ranges = get_time_ranges(time_range, interval)
time_ranges

[('2022-01-01', '2022-01-01'), ('2022-01-02', '2022-01-02')]

In [6]:
# Read the storage credentials from a local JSON file. Its entries are
# passed on as keyword arguments to the source storage options when the
# data store is created, so keep this file out of version control.
with open("creodias-credentials.json") as f:
    credentials = json.load(f)

In [7]:
# Create the `smos` data store instance.
store = new_data_store(
    'smos', 
    # Location of the source data and the options used to access it.
    source_path="s3://EODATA", 
    source_storage_options=dict(
        endpoint_url="https://s3.cloudferro.com", 
        anon=False, 
        # Entries loaded from the credentials JSON file.
        **credentials
    ),
    # NOTE(review): presumably a local directory used to cache
    # downloaded NetCDF files - confirm against the store parameters.
    cache_path="nc_cache",
    # Keyword arguments handed to xarray; selects the NetCDF engine.
    xarray_kwargs=dict(
        engine="h5netcdf"
        #engine="netcdf4"
    )
)

In [8]:
# Show the data identifiers offered by the store as a JSON widget.
JSON(store.list_data_ids())

<IPython.core.display.JSON object>

In [9]:
def generate_datasets(store, product_type, time_ranges, interval):
    """Generate the slices that are appended to the target dataset.

    For every time range the slices are opened from ``store``.
    If ``interval`` is ``None``, the slice datasets are yielded as-is.
    Otherwise the slices of a time range are dumped into temporary
    NetCDF files, averaged along the ``time`` dimension, and the mean
    is written to a Zarr dataset whose path is yielded instead.

    Args:
        store: Data store whose ``open_data()`` method is called
            with the ``"dsiter:zarr:smos"`` opener.
        product_type: Data identifier passed to ``store.open_data()``.
        time_ranges: Iterable of ``(start, stop)`` pairs of date
            strings. Each pair is passed on as ``time_range``.
        interval: Aggregation interval or ``None``. It is only tested
            for being ``None`` here; the averaging period itself is
            given by the individual time ranges.

    Yields:
        The slice datasets themselves if ``interval`` is ``None``,
        otherwise the path of the Zarr dataset that contains the mean
        of one time range. Time ranges without any slices are skipped.
    """
    logger = logging.getLogger("notebook")

    for time_range in time_ranges:
        ds_iterator = store.open_data(
            product_type,
            opener_id="dsiter:zarr:smos",
            time_range=time_range
        )

        if interval is None:
            # If we have no interval, we deliver the slices as-is and
            # must not fall through into the averaging code below.
            yield from ds_iterator
            continue

        num_datasets = len(ds_iterator)
        if num_datasets == 0:
            # xr.open_mfdataset() raises if there are no files to open,
            # so there is nothing to average for this time range.
            logger.warning("No slices found for time range %s, skipping",
                           time_range)
            continue

        temp_path = f"./temp-{'-'.join(time_range)}"
        os.makedirs(temp_path, exist_ok=True)
        slice_path = temp_path + ".zarr"

        try:
            # Dump the individual slices so they can be opened lazily
            # as a single multi-file dataset afterwards.
            nc_paths = []
            for index, ds in enumerate(ds_iterator):
                nc_path = f"{temp_path}/slice-{index}.nc"
                logger.info("Writing slice %d of %d to %s",
                            index + 1, num_datasets, nc_path)
                ds.to_netcdf(nc_path, mode="w")
                nc_paths.append(nc_path)

            # The context manager closes the multi-file dataset even if
            # averaging or writing fails.
            with xr.open_mfdataset(nc_paths,
                                   combine="nested",
                                   concat_dim="time") as ds:
                ds_mean = ds.mean("time")

                # ds_mean has no time dimension, so we re-introduce it
                ds_mean = ds_mean.expand_dims("time", axis=0)
                # NOTE(review): start and stop are parsed from date-only
                # strings; if both are the same day the bounds have zero
                # width and the time label is midnight - confirm intent.
                start, stop = pd.to_datetime(time_range)
                ds_mean.coords["time"] = xr.DataArray(
                    np.array([start + (stop - start) / 2]),
                    dims="time",
                )
                ds_mean.coords["time_bnds"] = xr.DataArray(
                    np.array([[start, stop]]),
                    dims=("time", "bnds"),
                )

                # Align encoding and attributes with the source variables
                for var_name, var in ds.variables.items():
                    mean_var = ds_mean.get(var_name)
                    if mean_var is not None:
                        mean_var.encoding.update(var.encoding)
                        mean_var.attrs.update(var.attrs)

                logger.info("Writing mean slice to %s", slice_path)
                ds_mean.to_zarr(slice_path,
                                mode="w",
                                write_empty_chunks=False)
                ds_mean.close()
        finally:
            # Always remove the temporary NetCDF files, also on failure.
            logger.info("Removing temporary %s", temp_path)
            shutil.rmtree(temp_path, ignore_errors=True)

        # TODO: yield a slice source here, so that we can delete
        #   the temporary slice_path after the slice has been
        #   processed. See https://github.com/bcdev/zappend/issues/13
        yield slice_path


In [10]:
# Test generate_datasets
#generator = generate_datasets(store, product_type, time_ranges, interval)
#ds_path = next(generator)
#ds_path

In [11]:
#with xr.open_dataset(ds_path) as ds:
#    display(ds)
#    display(ds.Soil_Moisture.plot.imshow())

In [12]:
# Configuration passed to `zappend()` in the cell that runs the processing.
zappend_config = {
    # The target dataset is written next to the notebook.
    "target_dir": "./" + target_path, 
    
    # Sizes of the spatial dimensions.
    "fixed_dims": {
        "lon": 8192,
        "lat": 4032
    },
    
    # Slices are appended along the time dimension.
    "append_dim": "time",
    
    "persist_mem_slices": False,
    
    # Chunk encoding per variable; "*" is a wildcard entry next to the
    # explicitly named coordinate variables.
    "variables": {
        "*": {
            "encoding": {
                # One time step per chunk, a quarter of the lat and
                # lon sizes per chunk.
                "chunks": [1, 4032 // 4, 8192 // 4]
            }
        },
        "time": {
            "encoding": {
                "chunks": [100]
            }
        },
        "time_bnds": {
            "encoding": {
                "chunks": [100, 2]
            }
        },
        # The spatial coordinates are each stored as a single chunk.
        "lat": {
            "encoding": {
                "chunks": [4032]
            }
        },
        "lon": {
            "encoding": {
                "chunks": [8192]
            }
        },
    },
    
    # Log to the console.
    # Note you could also configure the log output for dask here.
    "logging": {
        "version": 1,
        "formatters": {
            "normal": {
                "format": "%(asctime)s %(levelname)s %(message)s",
                "style": "%"
            }
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "formatter": "normal"
            }
        },
        "loggers": {
            "zappend": {
                "level": "INFO",
                "handlers": ["console"]
            },
            # Logger used by `generate_datasets()` in this notebook.
            "notebook": {
                "level": "INFO",
                "handlers": ["console"]
            },
            #"xcube-smos": {
            #    "level": "DEBUG",
            #    "handlers": ["console"]
            #}
        }
    }
}

In [13]:
# Create the slice generator and let zappend consume it, appending
# every generated slice to the target dataset configured above.
generator = generate_datasets(store, product_type, time_ranges, interval)
zappend(generator, config=zappend_config)

2024-01-12 16:26:48,091 INFO Writing slice 1 of 29 to ./temp-2022-01-01-2022-01-01/slice-0.nc
2024-01-12 16:26:49,942 INFO Writing slice 2 of 29 to ./temp-2022-01-01-2022-01-01/slice-1.nc
2024-01-12 16:26:51,791 INFO Writing slice 3 of 29 to ./temp-2022-01-01-2022-01-01/slice-2.nc
2024-01-12 16:26:53,618 INFO Writing slice 4 of 29 to ./temp-2022-01-01-2022-01-01/slice-3.nc
2024-01-12 16:26:55,449 INFO Writing slice 5 of 29 to ./temp-2022-01-01-2022-01-01/slice-4.nc
2024-01-12 16:26:57,268 INFO Writing slice 6 of 29 to ./temp-2022-01-01-2022-01-01/slice-5.nc
2024-01-12 16:26:59,080 INFO Writing slice 7 of 29 to ./temp-2022-01-01-2022-01-01/slice-6.nc
2024-01-12 16:27:00,888 INFO Writing slice 8 of 29 to ./temp-2022-01-01-2022-01-01/slice-7.nc
2024-01-12 16:27:02,702 INFO Writing slice 9 of 29 to ./temp-2022-01-01-2022-01-01/slice-8.nc
2024-01-12 16:27:04,547 INFO Writing slice 10 of 29 to ./temp-2022-01-01-2022-01-01/slice-9.nc
2024-01-12 16:27:06,404 INFO Writing slice 11 of 29 to ./te