# Application chaining

Use a notebook to demonstrate the application chaining:

- stage-in a pairs of EO products, pre- and post-event
- process three vegetation indexes: NDVI, NDWI and NBR
- process the burned area delineation products: NDVI/NDWI thresholds and RBR

## Worflow

### First step: stage-in Sentinel-2 Level-2A acquisitions

Stage-in the pre- and post event acquisitions with the `stage-in` docker that takes as input a catalog entry reference

And produces as output:

- a local STAC catalog with a single collection and items whose assets point to the local staged bands (e.g. 'red', 'nir', 'swir16', 'swir22')

### Second step: vegetation index

The second step produces the vegetation indexes NDVI, NDWI and NBR. It takes as input the local STAC catalog.

This step produces as output:

- a local STAC catalog with a single item whose assets point to the 'NDVI', 'NDWI' and 'NBR' assets

The Jupyter Notebook git repository is available at https://github.com/terradue-ogc-tb16/vegetation-index

Its docker git repository is available at https://github.com/terradue-ogc-tb16/docker-vegetation-index

### Third step - burned area delineation

This step takes as input a local STAC catalog with STAC catalog with a single item whose assets point to the 'NDVI', 'NDWI' and 'NBR' assets to produce a bitmask for the burned area based on the NDVI and NDWI thresholds and the Relativized Burn Ratio, an indicator of the burned area intensity .

The Jupyter Notebook git repository is available at https://github.com/terradue-ogc-tb16/burned-area-delineation

Its docker git repository is available at https://github.com/terradue-ogc-tb16/docker-burned-area-delineation

### Chaining strategy

The application chaining prepares the inputs and parameters for executing the three CWL scripts and parses the outputs for the next step in the chaining.

The Jupyter Notebook is thus used to orchestrate the `cwltool` invoking the steps.

Applications to be chained in this scenario are a combination of Jupyter notebook based applications and Application Packages expressed as CWL that read a STAC local catalog and produce a local STAC catalog.

The applications are executed in Docker containers. These docker containers were built a tool called `repo2cli` that takes a remote git repository, creates the conda environment and create a CLI utility to execute the notebook using the Jupyter Notebook APIs (nbconvert).

This tool is available at: https://github.com/terradue-ogc-tb16/repo2cli and its docker at https://github.com/terradue-ogc-tb16/docker-repo2cli

### Imports

In [1]:
import subprocess
import os
import yaml
import json
from pystac import *


In [2]:
cwltool = '/opt/anaconda/bin/cwltool'

### Stage-in

In [3]:
cwl_stagein = 'stage-in.cwl'

In [4]:
with open(cwl_stagein) as file:
    
    stagein_cwl = yaml.load(file, 
                            Loader=yaml.FullLoader)

In [5]:
stagein_cwl

{'$graph': [{'baseCommand': 'stage-in',
   'class': 'CommandLineTool',
   'hints': {'DockerRequirement': {'dockerPull': 'eoepca/stage-in:0.2'}},
   'id': 'stagein',
   'inputs': {'inp1': {'inputBinding': {'position': 1, 'prefix': '-t'},
     'type': 'string'},
    'inp2': {'inputBinding': {'position': 2}, 'type': 'string[]'}},
   'outputs': {'results': {'outputBinding': {'glob': '.'},
     'type': 'Directory'}},
   'requirements': {'EnvVarRequirement': {'envDef': {'PATH': '/opt/anaconda/envs/env_stagein/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'}},
    'ResourceRequirement': {}}},
  {'class': 'Workflow',
   'label': 'Instac stage-in',
   'doc': 'Stage-in using Instac',
   'id': 'main',
   'inputs': {'input_reference': {'doc': 'A reference to an opensearch catalog',
     'label': 'A reference to an opensearch catalog',
     'type': 'string[]'},
    'target_folder': {'label': 'Folder to stage-in',
     'doc': 'Folder to stage-in',
     'type': 'string'}},
   'outpu

Get the CWL `Workflow` class

In [6]:
def get_workflow_class(cwl):
    
    cwl_workflow = None
    
    for block in cwl['$graph']:

        if block['class'] == 'Workflow':

            cwl_workflow = block

            break
            
    return cwl_workflow

In [7]:
cwl_workflow = get_workflow_class(stagein_cwl)

cwl_workflow

{'class': 'Workflow',
 'label': 'Instac stage-in',
 'doc': 'Stage-in using Instac',
 'id': 'main',
 'inputs': {'input_reference': {'doc': 'A reference to an opensearch catalog',
   'label': 'A reference to an opensearch catalog',
   'type': 'string[]'},
  'target_folder': {'label': 'Folder to stage-in',
   'doc': 'Folder to stage-in',
   'type': 'string'}},
 'outputs': [{'id': 'wf_outputs',
   'outputSource': ['node_1/results'],
   'type': 'Directory'}],
 'steps': {'node_1': {'in': {'inp1': 'target_folder',
    'inp2': 'input_reference'},
   'out': ['results'],
   'run': '#stagein'}}}

Get the workflow `id`

In [8]:
cwl_stagein_workflow_id = cwl_workflow['id']

cwl_stagein_workflow_id

'main'

Get the workflow `inputs`:

In [9]:
cwl_workflow['inputs']

{'input_reference': {'doc': 'A reference to an opensearch catalog',
  'label': 'A reference to an opensearch catalog',
  'type': 'string[]'},
 'target_folder': {'label': 'Folder to stage-in',
  'doc': 'Folder to stage-in',
  'type': 'string'}}

Create the `catalog definition` files

In [10]:
sentinel2_references = ['https://catalog.terradue.com/sentinel2/search?uid=S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808',
                        'https://catalog.terradue.com/sentinel2/search?uid=S2B_MSIL2A_20200130T004659_N0213_R102_T53HPA_20200130T022348']

In [11]:
stagein_results = []

for index, reference in enumerate(sentinel2_references):

    # prepare the YAML parameter file
    params = {'target_folder': './', 
              'input_reference': [reference]}
    
    
    params_file = os.path.join(os.getcwd(), 'stage-in-{}.yml'.format('pre' if index == 0 else 'post'))        
    
    with open(params_file, 'w') as yaml_file:

        yaml.dump(params,
                  yaml_file, 
                  default_flow_style=False)
    
    # invoke cwltool
    cmd_args = [cwltool, 
                '--no-read-only',
                '--no-match-user',
                '{}#{}'.format(cwl_stagein, 
                               cwl_stagein_workflow_id), 
                params_file]
        
    print(' '.join(cmd_args))

    pipes = subprocess.Popen(cmd_args, 
                             stdout=subprocess.PIPE, 
                             stderr=subprocess.PIPE)
    
    std_out, std_err = pipes.communicate()
    
    # get the stdout for each execution
    stagein_results.append(std_out)

/opt/anaconda/bin/cwltool --no-read-only --no-match-user stage-in.cwl#main /workspace/ogc-tb16/application-chaining/notebook/stage-in-pre.yml
/opt/anaconda/bin/cwltool --no-read-only --no-match-user stage-in.cwl#main /workspace/ogc-tb16/application-chaining/notebook/stage-in-post.yml


`std_out` contains a JSON document with all results produced. Look for the `catalog.json` files amongst the results: 

In [12]:
stac_catalog = None

staged_stac_catalogs = []

for index, std_out in enumerate(stagein_results):
    
    results = json.loads(std_out.decode())

    for index, listing in enumerate(results['wf_outputs']['listing']):

        if listing['class'] == 'File':
            
            if (listing['basename']) == 'catalog.json':
                
                stac_catalog = listing['location']
            
            break
    
    staged_stac_catalogs.append(stac_catalog)

In [13]:
staged_stac_catalogs

['file:///workspace/ogc-tb16/application-chaining/notebook/kevk72uv/catalog.json',
 'file:///workspace/ogc-tb16/application-chaining/notebook/rdr1xi70/catalog.json']

Inspect one of the staged STAC catalogs:

In [14]:
cat = Catalog.from_file(staged_stac_catalogs[0].replace('file://', ''))

cat.describe()

* <Catalog id=catid>
  * <Item id=S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808>


In [15]:
item = next(cat.get_items())

Getting the assets based on the band common name is quite easy:  

In [16]:
def get_assets(item):
    
    asset_swir16, asset_swir22, asset_nir, asset_red = None, None, None, None
    
    eo_item = extensions.eo.EOItemExt(item)
    
    for index, band in enumerate(eo_item.bands):
   
        if band.common_name in ['swir16']:

            asset_swir16 = item.assets[band.name].get_absolute_href()

        if band.common_name in ['swir22']:

            asset_swir22 = item.assets[band.name].get_absolute_href()

        if band.common_name in ['nir']:

            asset_nir = item.assets[band.name].get_absolute_href()
            
        if band.common_name in ['red']:

            asset_red = item.assets[band.name].get_absolute_href()
        
    return asset_swir16, asset_swir22, asset_nir, asset_red



In [17]:
asset_swir16, asset_swir22, asset_nir, asset_red = get_assets(item)

asset_swir16, asset_swir22, asset_nir, asset_red

('/workspace/ogc-tb16/application-chaining/notebook/kevk72uv/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.SAFE/GRANULE/L2A_T53HPA_A023408_20191216T004701/IMG_DATA/R20m/T53HPA_20191216T004701_B11_20m.jp2',
 '/workspace/ogc-tb16/application-chaining/notebook/kevk72uv/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.SAFE/GRANULE/L2A_T53HPA_A023408_20191216T004701/IMG_DATA/R20m/T53HPA_20191216T004701_B12_20m.jp2',
 '/workspace/ogc-tb16/application-chaining/notebook/kevk72uv/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.SAFE/GRANULE/L2A_T53HPA_A023408_20191216T004701/IMG_DATA/R10m/T53HPA_2

### Vegetation index

Now that we have two EO data products staged as STAC local catalogs, let's produce the vegetation indexes.

The vegetation index processing step take as input a STAC catalog with a single item that contains amongst its assets the bands with the common names:

- red
- nir
- swir16
- swir22

As such, this processing step can process both Sentinel-2 and Landsat-8 data without any adaptation

In [18]:
cwl_vegetation_index = 'vegetation-index.cwl'

In [19]:
with open(cwl_vegetation_index) as file:
    
    vegetation_index_cwl = yaml.load(file, 
                                     Loader=yaml.FullLoader)

In [20]:
cwl_vegetation_workflow_id = get_workflow_class(vegetation_index_cwl)['id']

cwl_vegetation_workflow_id

'vegetation-index'

Create the parameters file for CWL and invoke the CWL runner tool

In [21]:
get_workflow_class(vegetation_index_cwl)['inputs']

{'aoi': {'doc': 'Area of interest in WKT',
  'label': 'Area of interest',
  'type': 'string'},
 'input_reference': {'doc': 'EO product for vegetation index',
  'label': 'EO product for vegetation index',
  'type': 'Directory[]'}}

In [22]:
vegetation_index_results = []

for index, staged_stac_catalog in enumerate(staged_stac_catalogs):

    params = {'input_reference': [{'class': 'Directory', 'path': os.path.dirname(staged_stac_catalog)}],
              'aoi': 'POLYGON((136.508 -36.108,136.508 -35.654,137.178 -35.654,137.178 -36.108,136.508 -36.108))'}
    
    
    params_file = os.path.join(os.getcwd(), 'vegetation-index-{}.yml'.format('pre' if index == 0 else 'post'))        
    
    with open(params_file, 'w') as yaml_file:

        yaml.dump(params,
                  yaml_file, 
                  default_flow_style=False)
    
    cmd_args = [cwltool, 
                '--no-read-only',
                '--no-match-user',
                '{}#{}'.format(cwl_vegetation_index, 
                               cwl_vegetation_workflow_id), 
                params_file]
        
    print(' '.join(cmd_args))

    pipes = subprocess.Popen(cmd_args, 
                             stdout=subprocess.PIPE, 
                             stderr=subprocess.PIPE)
    
    std_out, std_err = pipes.communicate()
    
    vegetation_index_results.append(std_out)
    

/opt/anaconda/bin/cwltool --no-read-only --no-match-user vegetation-index.cwl#vegetation-index /workspace/ogc-tb16/application-chaining/notebook/vegetation-index-pre.yml
/opt/anaconda/bin/cwltool --no-read-only --no-match-user vegetation-index.cwl#vegetation-index /workspace/ogc-tb16/application-chaining/notebook/vegetation-index-post.yml


In [23]:
vegetation_index_stac_catalogs = []

for index, std_out in enumerate(vegetation_index_results):
    
    results = json.loads(std_out.decode())

    for index, listing in enumerate(results['wf_outputs']):

        for sublisting in listing['listing']:

            if (sublisting['basename']) == 'catalog.json':
                stac_catalog = sublisting['location']

                break

    vegetation_index_stac_catalogs.append(stac_catalog)
    
vegetation_index_stac_catalogs

['file:///workspace/ogc-tb16/application-chaining/notebook/dkb877e9/catalog.json',
 'file:///workspace/ogc-tb16/application-chaining/notebook/2anetr3f/catalog.json']

Inspect one of the vegetation index catalogs:

In [24]:
cat = Catalog.from_file(vegetation_index_stac_catalogs[0].replace('file://', ''))

In [25]:
cat.describe()

* <Catalog id=catalog-S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808>
  * <Item id=INDEX_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808>


In [26]:
item = next(cat.get_items())

item.assets

{'nbr': <Asset href=./NBR_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.tif>,
 'ndvi': <Asset href=./NDVI_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.tif>,
 'ndwi': <Asset href=./NDWI_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.tif>}

In [27]:
print(item.assets['nbr'].get_absolute_href())
print(item.assets['ndvi'].get_absolute_href())
print(item.assets['ndwi'].get_absolute_href())

/workspace/ogc-tb16/application-chaining/notebook/dkb877e9/INDEX_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/NBR_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.tif
/workspace/ogc-tb16/application-chaining/notebook/dkb877e9/INDEX_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/NDVI_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.tif
/workspace/ogc-tb16/application-chaining/notebook/dkb877e9/INDEX_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808/NDWI_S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808.tif


### Burned area delineation 

In [28]:
cwl_burned_area = 'burned-area-delineation.cwl'

In [29]:
with open(cwl_burned_area) as file:
    
    burned_area_cwl = yaml.load(file,
                               Loader=yaml.FullLoader)

In [30]:
cwl_burned_area_workflow_id = get_workflow_class(burned_area_cwl)['id']

cwl_burned_area_workflow_id

'burned-area-delineation'

In [31]:
for key, value in get_workflow_class(burned_area_cwl)['inputs'].items():
    
    print('input: {} - {}'.format(key, value['label']))

input: ndvi_threshold - NDVI difference threshold
input: ndwi_threshold - NDWI difference threshold
input: post_event - Post-event product for burned area delineation
input: pre_event - Pre-event product for burned area delineation


Create the parameters file for CWL and invoke the CWL runner tool

In [32]:
params = {'aoi': 'POLYGON((136.508 -36.108,136.508 -35.654,137.178 -35.654,137.178 -36.108,136.508 -36.108))',
          'ndvi_threshold': '0.19',
          'ndwi_threshold': '0.18',
          'pre_event': {'class': 'Directory', 'path': os.path.dirname(vegetation_index_stac_catalogs[0])},
          'post_event': {'class': 'Directory', 'path': os.path.dirname(vegetation_index_stac_catalogs[1])}
         }

params_file = os.path.join(os.getcwd(), 'burned-area.yml')
    
    
with open(params_file, 'w') as yaml_file:

    yaml.dump(params,
              yaml_file, 
              default_flow_style=False)
 

In [33]:
   
cmd_args = [cwltool, 
            '--no-read-only',
            '--no-match-user',
            '{}#{}'.format(cwl_burned_area, 
                           cwl_burned_area_workflow_id), 
            params_file]
        
print(' '.join(cmd_args))

pipes = subprocess.Popen(cmd_args, 
                         stdout=subprocess.PIPE, 
                         stderr=subprocess.PIPE)
    
std_out, std_err = pipes.communicate()
    
    

/opt/anaconda/bin/cwltool --no-read-only --no-match-user burned-area-delineation.cwl#burned-area-delineation /workspace/ogc-tb16/application-chaining/notebook/burned-area.yml


In [34]:
stac_catalog = None

for index, listing in enumerate(json.loads(std_out.decode())['wf_outputs']['listing']):

    if (listing['basename']) == 'catalog.json':
        stac_catalog = listing['location']

        break

stac_catalog

'file:///workspace/ogc-tb16/application-chaining/notebook/ye8jbyh0/catalog.json'

In [35]:
cat = Catalog.from_file(stac_catalog.replace('file://', ''))

In [36]:
cat.describe()

* <Catalog id=catalog>
  * <Item id=BURNED_AREA_DELINEATION>


In [37]:
item = next(cat.get_items())

item.assets

{'tvi': <Asset href=./BURNED_NDVI_NDWI_THRESHOLD.tif>,
 'dnbr': <Asset href=./BURNED_RBR.tif>}

In [38]:
print(item.assets['tvi'].get_absolute_href())
print(item.assets['dnbr'].get_absolute_href())

/workspace/ogc-tb16/application-chaining/notebook/ye8jbyh0/BURNED_AREA_DELINEATION/BURNED_NDVI_NDWI_THRESHOLD.tif
/workspace/ogc-tb16/application-chaining/notebook/ye8jbyh0/BURNED_AREA_DELINEATION/BURNED_RBR.tif
