### **Writing INCA data to a Zarr store with Argo Workflows**

This notebook shows how to write INCA data to an existing Zarr store. 

**First some imports and global settings**

In [None]:
from dotenv import load_dotenv
import os
from hera.workflows import models, CronWorkflow, script, Artifact, Parameter, DAG, Steps, Step, NoneArchiveStrategy, Workflow
from hera.shared import global_config

global_config.host = "https://dev.services.eodc.eu/workflows/"

# Read credentials from the environment (optionally populated from a local .env file)
# instead of pasting the token into the notebook. The placeholders are used only when
# the variables are not set.
load_dotenv()
global_config.namespace = os.getenv("ARGO_NAMESPACE", "<YOUR NAMESPACE>")
global_config.token = os.getenv("ARGO_TOKEN", "<YOUR TOKEN>")

# Container image used by every @script step unless a step overrides it.
global_config.image = "ghcr.io/eodcgmbh/cluster_image:2025.2.0"

**Setting up Volume**

We need to set up the connection to the volume we want to write to. 

In [None]:
# Pod volume backed by the existing PersistentVolumeClaim "eodc-nfs-claim". The
# @script steps below mount it at /eodc through a VolumeMount that refers to it
# by its name, "eodc-mount".
nfs_volume = models.Volume(
    name="eodc-mount",
    persistent_volume_claim={"claimName": "eodc-nfs-claim"},
    )

# Pod security context passed to the CronWorkflow below. Replace the placeholders
# with your numeric user and group IDs (presumably so that files written to the NFS
# share belong to your account). NOTE: this cell is not valid Python until the
# placeholders are filled in.
security_context = {"runAsUser": <YOUR UID>,
                    "runAsGroup": <YOUR GID>}

**Writing scripts**

For this example we will write INCA data downloaded from the Geosphere Datahub to an existing Zarr store on the EODC NFS.

First, we need to extend our Zarr store so that we can write new data to it. INCA data is available hourly, so we extend the time dimension up to the time at which the workflow runs.

In [None]:
@script(volume_mounts=[models.VolumeMount(name="eodc-mount", mount_path="/eodc")])
def extend_time_dimension(store_path: str = "/eodc/products/eodc/geosphere_inca/INCA.zarr"):
    """Grow the store's time axis, and every data array with it, up to the current hour.

    The ``time`` coordinate of the store holds integer hour offsets from the origin
    2011-03-15T00:00. This step resizes ``time`` and every non-coordinate array
    along their leading axis, so that later steps can write new hourly data. It then
    rewrites ``time`` as ``0 .. n-1``.

    Args:
        store_path: Path of the Zarr store on the mounted NFS volume.
    """
    # Imports live inside the function because hera ships only the function body.
    import numpy as np
    import zarr

    # Number of whole hours between the origin and "now". np.datetime64("now") is
    # always UTC, independent of the container's local time zone.
    now_h = np.datetime64("now", "h")
    origin = np.datetime64("2011-03-15T00", "h")
    new_shape = int((now_h - origin) // np.timedelta64(1, "h"))

    # Connect to the zarr store.
    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)

    # Never shrink the store: resizing to a shorter length would discard data that
    # has already been written (e.g. if the container clock is wrong).
    new_shape = max(new_shape, group["time"].shape[0])

    # Besides the time coordinate, every data variable must grow along time too,
    # so separate the data arrays from the coordinate arrays.
    coords = {"time", "x", "y"}
    data_arrays = set(group.array_keys()) - coords

    group["time"].resize(new_shape)
    for name in data_arrays:
        current_shape = group[name].shape
        # Only the leading (time) axis grows; all other axes keep their size.
        group[name].resize((new_shape, *current_shape[1:]))

    # Consolidate the metadata so readers see the new shapes.
    zarr.consolidate_metadata(store)

    # Reconnect to the store before writing the updated time coordinate values.
    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)

    group["time"][:] = np.arange(new_shape)


Next, we download data from the Datahub and save it as an Artifact so that it can be passed to the next step. There is a separate file for every month, but the data is updated hourly; the Year/Month is part of the filename. As we will run the workflow every day, we choose the file for the Year/Month of the previous day. This way, on the first day of a month we still get the previous month's file and fill that month with data completely. There are also eight INCA parameters, each of which is part of the filename as well.

In [None]:
@script(outputs=Artifact(name="inca-file", path="/tmp/INCA.nc", archive=NoneArchiveStrategy()))
def inca_download(param: str):
    """Download the monthly INCA netCDF file for ``param`` to /tmp/INCA.nc.

    The month is taken from yesterday's date. A run on the first day of a month
    therefore still fetches the previous month's file and completes it.

    Args:
        param: INCA parameter name (e.g. "T2M"); it appears in the URL path and in
            the filename.
    """
    # Imports live inside the function because hera ships only the function body.
    import datetime
    import socket
    from urllib.request import urlretrieve

    # urlretrieve() has no timeout argument. Set a socket default so that a stalled
    # connection fails the step instead of hanging it indefinitely.
    socket.setdefaulttimeout(300)

    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    ym = yesterday.strftime("%Y%m")

    url = (
        "https://public.hub.geosphere.at/datahub/resources/inca-v1-1h-1km/filelisting/"
        f"{param}/INCAL_HOURLY_{param}_{ym}.nc"
    )
    urlretrieve(url, "/tmp/INCA.nc")

Finally, we want to write the data from the netCDF file to the Zarr store. Because we update daily but only have files with monthly data, we overwrite the data already in the store for the period the file covers. This is only feasible because the files are relatively small and processing is fast.

In [None]:
@script(inputs=Artifact(name="inca-file", path="/tmp/INCA.nc"),
        volume_mounts=[models.VolumeMount(name="eodc-mount", mount_path="/eodc")])
def inca_write(param: str, store_path: str="/eodc/private/tempearth/INCA.zarr"):
    """Write one INCA variable from the downloaded netCDF file into the Zarr store.

    The hourly span covered by the file is mapped to integer hour offsets from the
    first time step of the store. Hours missing inside that span are padded with the
    array's fill value. The resulting block is then written to
    ``group[param][time, y, x]``, overwriting what was there.

    Args:
        param: INCA variable name, used both as the netCDF variable and as the Zarr
            array name.
        store_path: Path of the Zarr store on the mounted NFS volume.
    """
    # Imports live inside the function because hera ships only the function body.
    import numpy as np
    import pandas as pd
    import xarray as xr
    import zarr

    artifact_path = "/tmp/INCA.nc"
    one_hour = np.timedelta64(1, "h")

    def get_idx(array1, array2):
        """Return [start, stop) bounds in array1 spanning array2's first..last value."""
        min_idx = np.where(array1 == array2[0])[0][0]
        max_idx = np.where(array1 == array2[-1])[0][0] + 1
        return min_idx, max_idx

    # Connect to the store and read the target array's metadata.
    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)
    target = group[param]

    # Prefer the CF attribute; fall back to the array's own Zarr fill value.
    fill_value = target.attrs.get("_FillValue", target.fill_value)
    # Without a stored frequency, pd.date_range would silently default to daily steps.
    freq = group.attrs.get("freq") or "h"

    x_extent = group["x"][:]
    y_extent = group["y"][:]
    origin = xr.open_zarr(store_path).time[0].values

    # Load the Geosphere data as stored (raw values, no CF masking or scaling).
    data = xr.open_dataset(artifact_path, mask_and_scale=False).load()
    values = data[param]

    # Spatial indices of the file's grid inside the store's grid.
    x_min, x_max = get_idx(x_extent, data["x"].values)
    y_min, y_max = get_idx(y_extent, data["y"].values)

    # Hour-aligned [start, stop) time span covered by the file.
    time_min = data.time.values[0].astype("datetime64[h]")
    time_max = data.time.values[-1].astype("datetime64[h]") + one_hour
    # Floor-divide by one hour. `origin` is typically decoded as datetime64[ns], so a
    # plain astype("int64") of the difference would count nanoseconds, not hours.
    time_delta_min = int((time_min - origin) // one_hour)
    time_delta_max = int((time_max - origin) // one_hour)

    # Every expected step in [time_min, time_max). date_range includes its end by
    # default, and time_max is one step past the file's last value, so exclude it.
    full_range = pd.date_range(
        time_min, time_max, freq=freq, inclusive="left"
    ).values.astype("datetime64[ns]")

    # The file may be missing time steps. Pad them with the fill value so that the
    # written block has exactly time_delta_max - time_delta_min steps.
    if values.sizes["time"] != full_range.size:
        values = values.reindex(time=full_range, fill_value=fill_value)

    # Finally, write to the store; its axis order is (time, y, x).
    group[param][time_delta_min:time_delta_max, y_min:y_max, x_min:x_max] = (
        values.transpose("time", "y", "x").values
    )

**Creating the Workflow**

As already mentioned, we have eight parameters, and we want each of them to be written to the store in parallel. We also want the workflow to run daily.

Steps work like DAGs, except that they run in the order in which they are written. The DAG specified here acts as a template, which is then used in the Steps.

In [None]:
inca_parameters = ["RR", "T2M", "TD2M", "P0", "UU", "VV", "RH2M", "GL"]

# The store_path defaults of extend_time_dimension and inca_write point at different
# stores. Pass one path explicitly to every step so that the store whose time axis is
# extended is the same store that gets written. Adjust it to your target store.
inca_store_path = "/eodc/products/eodc/geosphere_inca/INCA.zarr"

# First, a CronWorkflow is created with the shared volume, the security context, the
# daily schedule (06:00) and the entrypoint template.
with CronWorkflow(
    generate_name="inca-zarr-",
    schedule="0 6 * * *",
    volumes=[nfs_volume],
    security_context=security_context,
    entrypoint="workflow",
) as w:

    # A DAG template that runs once per INCA parameter. Its "args" input is supplied by
    # the "parallel-pipelines" step below, so the DAG is like a function defined here
    # and called there.
    with DAG(name="pipeline", inputs=[Parameter(name="args")]) as pipeline:

        # Script arguments are passed as a dictionary; 'param' comes from the DAG's input.
        download = inca_download(arguments={"param": "{{inputs.parameters.args}}"})

        # The write step also needs the artifact produced by the download step.
        process = inca_write(
            arguments=[
                {"param": "{{inputs.parameters.args}}", "store_path": inca_store_path},
                download.get_artifact("inca-file").with_name("inca-file"),
            ]
        )

        # The write step runs only after the download step has finished.
        download >> process

    # "workflow" is the entrypoint, so this template runs first. Unlike a DAG, its steps
    # run in the order in which they are written.
    with Steps(name="workflow"):
        # First, extend the time dimension of the Zarr store.
        extend_time_dimension(arguments={"store_path": inca_store_path})

        # Then run the DAG template once per parameter. with_param fans out over
        # inca_parameters, and "{{item}}" feeds each value into the DAG's "args" input.
        Step(
            name="parallel-pipelines",
            template=pipeline,
            with_param=inca_parameters,
            arguments={"args": "{{item}}"},
        )

**Submitting the Workflow**

In [None]:
# Submit the CronWorkflow defined above to the Argo server configured in global_config
# (host / namespace / token); afterwards it runs according to its cron schedule.
w.create()