## LANDSLIDE MONITORING SCENARIO PIPELINE

In [None]:
import digitalhub as dh
# Name of the DigitalHub project that holds every entity created in this notebook.
PROJECT_NAME = "landslide-monitoring"
# Project handle reused by all the following cells (secrets, functions, artifacts, workflow).
proj = dh.get_or_create_project(PROJECT_NAME)

### Download data from Sentinel 1 (Orbit Direction Ascending/Descending)

Register with the Copernicus Data Space Ecosystem (if you have not already done so) and obtain your credentials.

https://identity.dataspace.copernicus.eu/auth/realms/CDSE/login-actions/registration?client_id=cdse-public&tab_id=FIiRPJeoiX4

Log the credentials as project secret keys as shown below

In [None]:
# This cell needs to be executed only once per project.
# Replace the placeholder values ("esa_username" / "esa_password") with your own
# Copernicus credentials before running it. The secret names must stay as they
# are: the download steps of the pipeline reference them by name.
secret0 = proj.new_secret(name="CDSETOOL_ESA_USER", secret_value="esa_username")
secret1 = proj.new_secret(name="CDSETOOL_ESA_PASSWORD", secret_value="esa_password")

### Download Sentinel 1

In [None]:
# Container function used by the pipeline's two Sentinel-1 download steps;
# the pipeline refers to it by the name "download_images_s1".
function_s1 = proj.new_function(
    "download_images_s1",
    kind="container",
    image="ghcr.io/tn-aixpa/sentinel-tools:0.14.6",
    command="python",
)

### Log artifact

Log the shapefile 'Shapes_TN', which is published on the [WebGIS Portal](https://webgis.provincia.tn.it/) and can be downloaded from https://siatservices.provincia.tn.it/idt/vector/p_TN_377793f1-1094-4e81-810e-403897418b23.zip. Unzip the files into a folder named 'Shapes_TN' and then log it.


In [None]:
# Log the local 'Shapes_TN' folder as a project artifact of the same name.
artifact_name = "Shapes_TN"
src_path = "Shapes_TN"
artifact_data = proj.log_artifact(
    name=artifact_name,
    kind="artifact",
    source=src_path,
)

Note that to invoke the operation on the platform, the data should be available as an artifact on the platform datalake.


In [None]:
# Read the 'Shapes_TN' artifact back from the project to confirm it was logged;
# the cell's last expression displays the artifact key.
artifact = proj.get_artifact("Shapes_TN")
artifact.key

The resulting dataset is registered as a project artifact in the datalake under the name 'Shapes_TN'.

Log the Map artifact, which consists of three files (trentino_slope_map.tiff, trentino_aspect_map.tiff, and legend.qml). The files can be downloaded from the <a href="https://huggingface.co/datasets/lbergamasco/trentino-slope-map/tree/main">Huggingface repository</a>. Copy the three files into a folder named 'Map' and log it as a project artifact.

In [None]:
# Log the local 'Map' folder as a project artifact of the same name.
artifact_name = "Map"
src_path = "Map"
artifact_data = proj.log_artifact(
    name=artifact_name,
    kind="artifact",
    source=src_path,
)

Check if the artifact is created successfully.

In [None]:
# Read the 'Map' artifact back from the project to confirm it was logged;
# the cell's last expression displays the artifact key.
artifact = proj.get_artifact("Map")
artifact.key

### Elaboration

In [None]:
# Container function used by the pipeline's final step; the pipeline refers
# to it by the name "elaborate". Its code source is the local launch.sh script.
function_rs = proj.new_function(
    "elaborate",
    kind="container",
    image="ghcr.io/tn-aixpa/rs-landslide-monitoring:0.14.6",
    command="/bin/bash",
    code_src="launch.sh",
)

### Pipeline

In [None]:
%%writefile "landslide_pipeline.py"

from hera.workflows import Workflow, DAG, Parameter
from digitalhub_runtime_hera.dsl import step

def pipeline():
    """Assemble the landslide-monitoring workflow.

    The workflow declares four arguments (``geometry``, ``outputName``,
    ``startDate``, ``endDate``) and chains three steps one after the other:
    the ascending Sentinel-1 download, the descending Sentinel-1 download and
    the elaboration step.

    Returns:
        The configured ``Workflow`` object.
    """

    def sentinel1_request(orbit_direction, relative_orbit, start_date,
                          end_date, geometry, artifact_name):
        """Return the JSON request string passed to the download function."""
        return "".join((
            '{"satelliteParams":{"satelliteType": "Sentinel1","processingLevel": "LEVEL1",',
            '"sensorMode": "IW","productType": "SLC",',
            '"orbitDirection": "', orbit_direction,
            '","relativeOrbitNumber": "', relative_orbit, '"},',
            '"startDate": "', start_date,
            '","endDate": "', end_date, '",',
            '"geometry": "', geometry, '",',
            '"area_sampling": "True","artifact_name": "', artifact_name, '"}',
        ))

    def download_job(request):
        """Return the job template shared by both Sentinel-1 download steps."""
        return {
            "action": "job",
            "args": ["main.py", request],
            "secrets": ["CDSETOOL_ESA_USER", "CDSETOOL_ESA_PASSWORD"],
            "fs_group": "8877",
            "resources": {"cpu": "6", "mem": "32Gi"},
            "envs": [{"name": "TMPDIR", "value": "/app/files"}],
            "volumes": [{
                "volume_type": "persistent_volume_claim",
                "name": "volume-land",
                "mount_path": "/app/files",
                "spec": {"size": "300Gi"},
            }],
        }

    workflow_arguments = [
        Parameter(name="geometry"),
        Parameter(name="outputName"),
        Parameter(name="startDate"),
        Parameter(name="endDate"),
    ]

    with Workflow(entrypoint="dag", arguments=workflow_arguments) as w:
        with DAG(name="dag"):
            # String form of each workflow parameter (presumably a placeholder
            # that the workflow engine resolves at run time -- confirm against
            # the Hera documentation).
            geometry = str(w.get_parameter("geometry"))
            output_name = str(w.get_parameter("outputName"))
            start_date = str(w.get_parameter("startDate"))
            end_date = str(w.get_parameter("endDate"))

            # Artifact names under which the two downloads are stored; the
            # elaboration step receives the same names as arguments.
            s1_ascending = "s1_ascending_" + output_name
            s1_descending = "s1_descending_" + output_name

            download_asc = step(
                template=download_job(
                    sentinel1_request("ASCENDING", "117", start_date,
                                      end_date, geometry, s1_ascending)
                ),
                function="download_images_s1",
                name="download-asc",
            )

            download_desc = step(
                template=download_job(
                    sentinel1_request("DESCENDING", "168", start_date,
                                      end_date, geometry, s1_descending)
                ),
                function="download_images_s1",
                name="download-desc",
            )

            elaborate = step(
                template={
                    "action": "job",
                    "args": [
                        '/shared/launch.sh',
                        s1_ascending,
                        s1_descending,
                        start_date,
                        end_date,
                        output_name,
                        'Shapes_TN',
                        'ammprv_v.shp',
                        'Map',
                        geometry,
                    ],
                    "fs_group": "8877",
                    "resources": {"cpu": "6", "mem": "64Gi"},
                    "envs": [{"name": "TMPDIR", "value": "/app/data"}],
                    "volumes": [{
                        "volume_type": "persistent_volume_claim",
                        "name": "volume-flood",
                        "mount_path": "/app/data",
                        "spec": {"size": "200Gi"},
                    }],
                },
                function="elaborate",
                name="elaborate",
            )

            # Strictly sequential: ascending download, descending download,
            # then elaboration.
            download_asc >> download_desc >> elaborate

    return w


Create workflow using project repo source file

In [None]:
# Register the Hera workflow; its handler is the `pipeline` function defined
# in landslide_pipeline.py (the file written by the %%writefile cell).
workflow = proj.new_workflow(
    name="pipeline_landslide",
    kind="hera",
    code_src="landslide_pipeline.py",
    handler="pipeline",
)

Build workflow

In [None]:
# Run the workflow's "build" action before launching the pipeline.
# NOTE(review): wait=True presumably blocks until the build run finishes -- confirm.
wfbuild = workflow.run(action="build", wait=True)

Run workflow

In [None]:
# Launch the pipeline; the keys below match the four workflow arguments
# declared in pipeline() (geometry, outputName, startDate, endDate).
workflow_run = workflow.run(action="pipeline", parameters={
    "startDate": "2020-10-01",
    "endDate": "2020-11-01",
    # Area of interest, given as a polygon string.
    "geometry": "POLYGON ((10.595369 45.923394, 10.644894 45.923394, 10.644894 45.945838, 10.595369 45.945838, 10.595369 45.923394))",
    # Suffix of the "s1_ascending_" / "s1_descending_" artifact names.
    # NOTE(review): the dates embedded in this name (2020-11-01_2020-11-10) do not
    # match startDate/endDate above -- confirm which range is intended.
    "outputName": "landslide_2020-11-01_2020-11-10"
    })