# `eo-grow` Workshop

`eo-grow` is a framework for large-scale processing of EO data. In this workshop we'll learn:

- how to run an `eo-grow` pipeline,
- how to scale up a pipeline,
- how to write a new pipeline.

The framework can run:

- completely locally on a laptop,
- local processing with data storage on S3
  * use only for small data transfers!
- processing on EC2 instances with data storage on S3.

For this workshop we'll use the second and third options.


## 0. Prerequisites

The package requires Python version `>=3.8`. For now it also requires installing packages from development branches:

- install `eo-learn` from [`develop-v1.0` branch](https://github.com/sentinel-hub/eo-learn/tree/develop-v1.0):
   
  ```
  git clone -b develop-v1.0 --depth 1 https://github.com/sentinel-hub/eo-learn.git
  cd eo-learn
  python install_all.py -e
  ```

- install `eo-grow` from the current branch with:

  ```
  pip install -e .
  ```
  
This workshop also requires access to an AWS S3 bucket with data. Configure credentials for it under an AWS profile named `workshop`:

```
aws configure --profile workshop
```

Additionally, you have to set up `sentinelhub-py` OAuth credentials (a Sentinel Hub client ID and client secret).

  
## 1. How to use `eo-grow`?

The core `eo-grow` structure looks like this:

![](../eo-grow.png)

- A `Pipeline` obtains configuration parameters and uses managers as helpers.
- Configuration parameters can be read from JSON files or Python dictionaries. They are parsed with a special [config language](../config-language.md) and wrapped with a `Config` class.
- Storage structure and credentials are handled by a `StorageManager`.
- The AOI is buffered and split into a tiling grid by one of the `AreaManager` implementations.
- EOPatch naming conventions are defined in an `EOPatchManager`.
- Logging is controlled with a `LoggingManager`.

Pipeline and manager classes all inherit from a base `EOGrowObject` and share two traits:

- they all contain their own `Schema` class that defines which config parameters they use,
- they are all meant to be inherited and customized for any use case.
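
The nested `Schema` convention can be illustrated with a toy, dependency-free sketch. All class names below are hypothetical. They only mimic the idea that each subclass extends its parent's `Schema` with new parameters. This is not eo-grow's implementation, which performs richer validation.

```python
from dataclasses import dataclass, fields


class ToyEOGrowObject:
    """Hypothetical base object that owns a nested Schema."""

    @dataclass
    class Schema:
        pass

    def __init__(self, config: dict):
        # "Validate" the raw dict by constructing this object's own Schema
        self.config = self.Schema(**config)


class ToyPipeline(ToyEOGrowObject):
    @dataclass
    class Schema(ToyEOGrowObject.Schema):
        workers: int = 1


class ToyWaterPipeline(ToyPipeline):
    @dataclass
    class Schema(ToyPipeline.Schema):
        threshold: float = 0.1


pipeline = ToyWaterPipeline({"workers": 4, "threshold": 0.2})
print([field.name for field in fields(pipeline.config)])
```

The subclass schema inherits `workers` and adds `threshold`. Passing an unknown key, such as a misspelled `"treshold"`, raises a `TypeError` here. That is the toy analogue of a config failing validation.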


The most basic procedure of using `eo-grow` is:

1. set up a project folder for storage,
2. implement a new pipeline or use one of the basic pipelines in `eogrow.pipelines`,
3. prepare a config file,
4. run a pipeline.

### Exercise 1

- For storage we will use a project folder in the AWS S3 bucket: `s3://eogrow-workshop/project/`.

- We will run a basic download pipeline (`eogrow.pipelines.download.DownloadPipeline`) for the AOI defined in the file `s3://eogrow-workshop/project/input-data/bohinj_aoi.geojson`.

- We will buffer the AOI by `0.01` and split it into a UTM grid of `250x250`-pixel patches at `10m` resolution.
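
To make these parameters concrete: `250x250` pixels at `10m` resolution means each patch spans 2500 m per side. Configs can also be Python dictionaries, so the sketch below lays the Exercise 1 parameters out as one. The key names, and the assumption that the area manager expects patch sizes in meters, are guesses for illustration. This is not the workshop's actual `configs/download.json`, so take the authoritative names from `eogrow-template`.

```python
import json

# Exercise 1 parameters, taken from the text above
resolution_m = 10
patch_size_px = 250
patch_size_m = patch_size_px * resolution_m  # 250 px * 10 m/px = 2500 m

# HYPOTHETICAL key names: verify them against `eogrow-template` output before use
config = {
    "pipeline": "eogrow.pipelines.download.DownloadPipeline",
    "storage": {
        "manager": "eogrow.core.storage.StorageManager",
        "project_folder": "s3://eogrow-workshop/project/",
        "aws_profile": "workshop",
    },
    "area": {
        "manager": "eogrow.core.area.UtmZoneAreaManager",
        "area_filename": "bohinj_aoi.geojson",
        "area_buffer": 0.01,
        "patch_size_x": patch_size_m,  # assumed to be in meters
        "patch_size_y": patch_size_m,
    },
}

print(json.dumps(config, indent=2))
```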


For now we will only use CLI commands to run the pipeline. `eo-grow` offers the following commands:

- `eogrow` - run a pipeline
- `eogrow-template` - create a template config for a pipeline or manager
- `eogrow-validate` - validate a pipeline config
- `eogrow-test` - test managers on a dummy pipeline
- `eogrow-ray` - run a pipeline on a cluster

Note: names of these commands are defined in `setup.py`.

The `eogrow-template` command can help us write a config file. Let's check which templates we get for different objects:

In [None]:
!eogrow-template eogrow.pipelines.download.DownloadPipeline
# !eogrow-template eogrow.pipelines.download.DownloadPipeline download_template_openapi.json -f

# !eogrow-template eogrow.core.storage.StorageManager
# !eogrow-template eogrow.core.area.UtmZoneAreaManager
# !eogrow-template eogrow.core.eopatch.EOPatchManager
# !eogrow-template eogrow.core.logging.LoggingManager

We can use the config language to:

- split config parameters into multiple files,
- avoid duplicating parameters,
- reference:
  * relative file paths,
  * package import paths,
  * environment variables.
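
To show what resolving such references means, here is a toy resolver for two of them. It assumes the linked config language uses `${env:NAME}` and `${config_path}` syntax. It also assumes `${config_path}` stands for the folder containing the config file. This is an illustration, not eo-grow's parser, and the example keys and paths are hypothetical.

```python
import os
import re

# Matches ${env:NAME} (capturing NAME) or ${config_path}
REFERENCE = re.compile(r"\$\{(?:env:(?P<env>\w+)|config_path)\}")


def resolve_references(value, config_folder):
    """Recursively replaces ${env:NAME} and ${config_path} inside strings (toy version)."""
    if isinstance(value, dict):
        return {key: resolve_references(item, config_folder) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_references(item, config_folder) for item in value]
    if isinstance(value, str):

        def substitute(match):
            env_name = match.group("env")
            # A missing environment variable raises KeyError instead of passing silently
            return os.environ[env_name] if env_name else config_folder

        return REFERENCE.sub(substitute, value)
    return value


os.environ["WORKSHOP_BUCKET"] = "eogrow-workshop"  # hypothetical variable for the demo
raw_config = {
    "project_folder": "s3://${env:WORKSHOP_BUCKET}/project/",
    "extra_configs": ["${config_path}/global_config.json"],
}
print(resolve_references(raw_config, "/home/ray/configs"))
```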

To check whether a config file contains correct parameters without running the pipeline, we can use:

In [None]:
!eogrow-validate configs/download.json

Before we run the pipeline let's check if all managers are working correctly:

In [None]:
!eogrow-test configs/download.json

This ran a simple `TestPipeline` that only checks the managers. The pipeline produced:

- logs,
- the area manager's cached buffered AOI shape and grid.

Let's download the cached data:

In [None]:
!aws s3 sync s3://eogrow-workshop/project/cache/ ./cache --profile workshop

To check that the download pipeline produces correct results, we can first run it for a single patch of the grid:

In [None]:
!eogrow configs/download.json -t 0

Now we are ready to run it for the entire grid with:

```
eogrow configs/download.json
```

But before we do this, let's switch to a Ray cluster.

## 2. How to scale up?

In `eo-grow` parallelization can be achieved with:

- multiprocessing on a single machine (for simple use cases),
- Ray parallelization on:
  * a single machine
  * a **cluster of AWS EC2 instances**.

A Ray cluster can be fully configured with a single YAML file, as described in the [Ray documentation](https://docs.ray.io/en/latest/cluster/config.html).

Once the YAML file is prepared, we can spawn a Ray cluster:

```bash
ray up cluster.yaml -y
```

We can attach to it with:

```bash
ray attach cluster.yaml
```

We can upload local files to the cluster with:

```bash
ray rsync_up cluster.yaml '/local/path' '/full/absolute/path/on/cluster'
```

Note: Alternatively, we could commit local files and let the cluster pull them from a git repository.

On a cluster we can then simply run the pipeline with:
    
```bash
eogrow download.json
```

An even easier option is to run a pipeline on a cluster directly from a local config with:

```bash
eogrow-ray cluster.yaml configs/download.json
```

This command also has a few useful optional flags:

In [None]:
!eogrow-ray --help

Cluster CPU and memory usage can be monitored from the Ray dashboard. We can connect to it with:

```bash
ray dashboard cluster.yaml
```

The dashboard will become available at `localhost:8265`.

When we are done processing, let's make sure that we shut down the cluster:

```bash
ray down cluster.yaml -y
```

## 3. How to implement a new pipeline?

Let's start with a typical workflow, as it might be written during prototyping. The following workflow runs a simple water detection algorithm on the stack of data that we downloaded:

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np

from eolearn.core import (
    FeatureType,
    LoadTask,
    MapFeatureTask,
    OutputTask,
    EOWorkflow,
    linearly_connect_tasks,
    OverwritePermission,
    SaveTask,
)
from eolearn.core.utils.fs import get_aws_credentials
from eolearn.features import NormalizedDifferenceIndexTask


config = get_aws_credentials(aws_profile="workshop")

bands_feature = FeatureType.DATA, "BANDS"
ndwi_feature = FeatureType.DATA, "NDWI"
water_feature = FeatureType.MASK_TIMELESS, "WATER"

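load_task = LoadTask("s3://eogrow-workshop/project/data/2021-06/", config=config)

# NDI = (A - B) / (A + B) for the bands at indices 2 and 7 of BANDS
# (assuming Sentinel-2 band order, these are B03 green and B08 NIR, which gives NDWI)
ndwi_task = NormalizedDifferenceIndexTask(bands_feature, ndwi_feature, bands=[2, 7])


class ThresholdWater(MapFeatureTask):
    """Marks a pixel as water if its maximum NDWI over time exceeds the threshold."""

    def map_method(self, ndwi, threshold):
        max_ndwi = np.max(ndwi, axis=0)  # (time, height, width, 1) -> (height, width, 1)
        water = max_ndwi > threshold
        return water


threshold_task = ThresholdWater(ndwi_feature, water_feature, threshold=0.1)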

output_task = OutputTask(name="result_eop")

nodes = linearly_connect_tasks(load_task, ndwi_task, threshold_task, output_task)
workflow = EOWorkflow(nodes)

workflow_results = workflow.execute({nodes[0]: {"eopatch_folder": "eopatch-id-08-col-3-row-1"}})

eop = workflow_results.outputs["result_eop"]

eop

In [None]:
ndwi = eop[ndwi_feature]

fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(20, 15))
for index in range(12):
    ax = axes[index // 4][index % 4]
    ax.imshow(ndwi[index, ...], vmin=0.1, vmax=0.5)
    ax.set_xticks([])
    ax.set_yticks([])

fig.subplots_adjust(wspace=0, hspace=0);

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(10, 10))

water = eop[water_feature]

ax.imshow(water)
ax.set_xticks([])
ax.set_yticks([]);
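
The core of this algorithm is plain array arithmetic. The following NumPy-only sketch mirrors the workflow above on a synthetic array, so the logic can be checked without S3 access. It assumes `BANDS` has shape `(time, height, width, bands)` and, as in the workflow, that band indices 2 and 7 hold green and NIR. The function name and the synthetic values are illustrative.

```python
import numpy as np


def detect_water(bands, green_index=2, nir_index=7, threshold=0.1):
    """Toy NumPy version of the workflow: NDWI per frame, then max over time > threshold.

    bands: array of shape (time, height, width, n_bands)
    returns: boolean mask of shape (height, width, 1)
    """
    green = bands[..., [green_index]].astype(np.float32)
    nir = bands[..., [nir_index]].astype(np.float32)
    with np.errstate(divide="ignore", invalid="ignore"):
        ndwi = (green - nir) / (green + nir)  # 0/0 yields NaN, shape (time, height, width, 1)
    # Like ThresholdWater, use np.max: a NaN frame makes the pixel NaN, and NaN > threshold is False
    max_ndwi = np.max(ndwi, axis=0)
    return max_ndwi > threshold


# Synthetic 2-frame, 1x2-pixel stack with 8 bands
bands = np.zeros((2, 1, 2, 8), dtype=np.float32)
bands[0, 0, 0, [2, 7]] = [0.3, 0.1]  # left pixel looks like water in the first frame
bands[1, 0, 0, [2, 7]] = [0.1, 0.3]
bands[:, 0, 1, [2, 7]] = [0.1, 0.3]  # right pixel looks like vegetation in both frames

water = detect_water(bands)
print(water[..., 0])
```

Taking the maximum over time means a pixel counts as water if it looked like water in at least one frame.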

Now let's put this process into a pipeline. The minimum that we have to do is:

- Create a class that inherits from the `Pipeline` class.
- If you want custom config parameters, add a `Schema` subclass that inherits from `Pipeline.Schema`.
- Implement the `build_workflow` method.

In [None]:
from eogrow.core.pipeline import Pipeline


class WaterDetectionPipeline(Pipeline):
    class Schema(Pipeline.Schema):
        threshold: float

    def build_workflow(self):

        bands_feature = FeatureType.DATA, "BANDS"
        ndwi_feature = FeatureType.DATA, "NDWI"
        water_feature = FeatureType.MASK_TIMELESS, "WATER"

        load_task = LoadTask(self.storage.get_folder("data", full_path=True), config=self.sh_config)

        ndwi_task = NormalizedDifferenceIndexTask(bands_feature, ndwi_feature, bands=[2, 7])

        threshold_task = ThresholdWater(ndwi_feature, water_feature, threshold=self.config.threshold)

        save_task = SaveTask(
            self.storage.get_folder("results", full_path=True),
            features=[water_feature, FeatureType.BBOX],
            compress_level=1,
            overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
            config=self.sh_config,
        )

        nodes = linearly_connect_tasks(load_task, ndwi_task, threshold_task, save_task)
        return EOWorkflow(nodes)

This time we cannot run `WaterDetectionPipeline` from the CLI because the pipeline is implemented in a notebook, so there is no import path to reference. But we can run it from Python. Let's load a config for it:

In [None]:
from eogrow.core.config import Config


config = Config.from_path("./configs/water_detection.json")
config

Let's initialize the pipeline and check some of its basic functionality:

In [None]:
pipeline = WaterDetectionPipeline(config)

pipeline

# pipeline.config
# pipeline.sh_config

# pipeline.storage
# pipeline.storage.filesystem
# pipeline.storage.get_folder('data')

# pipeline.area_manager
# pipeline.area_manager.get_grid()[0]

# pipeline.eopatch_manager
# pipeline.eopatch_manager.get_eopatch_filenames()
# pipeline.patch_list

# pipeline.logging_manager
# pipeline.logging_manager.get_pipeline_logs_folder('pipeline-name')
# pipeline.get_pipeline_execution_name('2021-10-19')

During `Pipeline` initialization the config is only validated and parsed according to the schema, and the managers are initialized. No computation is done yet. Let's run the pipeline for a single `EOPatch`:

In [None]:
config = Config.from_path("./configs/water_detection.json")

config.patch_list = [8]  # References EOPatch 'eopatch-id-08-col-3-row-1'

pipeline = WaterDetectionPipeline(config)

pipeline.run()
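
The comment above ties patch index 8 to the name `eopatch-id-08-col-3-row-1`. The helpers below are a hypothetical sketch of formatting and parsing names in that pattern. The pattern, including the two-digit zero padding of the ID, is inferred from this single example rather than from `EOPatchManager`'s code. Use `pipeline.eopatch_manager.get_eopatch_filenames()` to get the real names.

```python
import re
from typing import Tuple

# Pattern inferred from the example name 'eopatch-id-08-col-3-row-1' (an assumption)
EOPATCH_NAME = re.compile(r"eopatch-id-(?P<id>\d+)-col-(?P<col>\d+)-row-(?P<row>\d+)")


def format_eopatch_name(patch_id: int, col: int, row: int, id_width: int = 2) -> str:
    """Builds a name such as 'eopatch-id-08-col-3-row-1' (hypothetical helper)."""
    return f"eopatch-id-{patch_id:0{id_width}d}-col-{col}-row-{row}"


def parse_eopatch_name(name: str) -> Tuple[int, int, int]:
    """Recovers (patch_id, col, row) from a name in the pattern above (hypothetical helper)."""
    match = EOPATCH_NAME.fullmatch(name)
    if match is None:
        raise ValueError(f"Unexpected EOPatch name: {name}")
    return int(match["id"]), int(match["col"]), int(match["row"])


print(format_eopatch_name(8, 3, 1))
print(parse_eopatch_name("eopatch-id-08-col-3-row-1"))
```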

Before we run the pipeline for all EOPatches, let's write another pipeline. This one will not be limited to executing an `EOWorkflow`. After all, a pipeline can implement any process!

In this example we will create a pipeline that vectorizes water masks, joins vectors from all EOPatches and saves them into a single file.

In [None]:
from eolearn.geometry import RasterToVectorTask

r2v_task = RasterToVectorTask(water_feature, values=[1], raster_dtype=np.uint8)

eop = r2v_task.execute(eop)

eop.vector_timeless["WATER"].plot();

This time we also have to implement the `run_procedure` method. This is the main method triggered by `Pipeline.run`, and its default implementation only runs the `EOWorkflow` built by `build_workflow`.

In [None]:
import logging

import fs

from eogrow.core.pipeline import Pipeline
from eogrow.utils.fs import LocalFile
from eogrow.utils.vector import concat_gdf

LOGGER = logging.getLogger(__name__)


class WaterExportPipeline(Pipeline):

    water_feature = FeatureType.MASK_TIMELESS, "WATER"
    vector_water_feature = FeatureType.VECTOR_TIMELESS, "WATER"

    def run_procedure(self):
        workflow = self.build_workflow()
        exec_args = self.get_execution_arguments(workflow)

        successful, failed, execution_results = self.run_execution(workflow, exec_args, return_results=True)

        gdf_list = []
        for result in execution_results:
            eopatch = result.outputs.get("water-vectors")
            if eopatch is None:
                continue

            gdf_list.append(eopatch[self.vector_water_feature])

        if not gdf_list:
            return successful, failed

        LOGGER.info("Preparing joined vector dataset")
        joined_gdf = concat_gdf(gdf_list)  # This assumes all dataframes are in the same CRS!

        path = fs.path.combine(self.storage.get_folder("vector_results"), "water-vectors.gpkg")
        with LocalFile(path, mode="w", filesystem=self.storage.filesystem) as local_file:
            joined_gdf.to_file(local_file.path, driver="GPKG", encoding="utf-8")
        LOGGER.info("Saved water vectors to %s", path)

        return successful, failed

    def build_workflow(self):

        load_task = LoadTask(
            self.storage.get_folder("results", full_path=True), lazy_loading=True, config=self.sh_config
        )

        r2v_task = RasterToVectorTask(self.water_feature, values=[1], raster_dtype=np.uint8)

        output_task = OutputTask(name="water-vectors", features=[self.vector_water_feature])

        nodes = linearly_connect_tasks(load_task, r2v_task, output_task)
        return EOWorkflow(nodes)
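
The `concat_gdf` call in `run_procedure` assumes that all dataframes share a CRS. A UTM-based grid can cover several UTM zones, in which case the vectors would come in different CRSs. One way to handle this is to reproject everything to a common CRS before joining. The sketch below does that with plain `geopandas` and `pandas` rather than eo-grow utilities. The helper name, the target CRS `EPSG:4326` and the demo polygons are assumptions for illustration.

```python
import geopandas as gpd
import pandas as pd
from shapely.geometry import box


def concat_in_common_crs(gdfs, crs="EPSG:4326"):
    """Reprojects each GeoDataFrame to `crs` and concatenates them (hypothetical helper)."""
    reprojected = [gdf.to_crs(crs) for gdf in gdfs]
    return gpd.GeoDataFrame(pd.concat(reprojected, ignore_index=True), crs=crs)


# Two toy water polygons in neighbouring UTM zones (33N and 34N)
zone_33 = gpd.GeoDataFrame(
    {"VALUE": [1]}, geometry=[box(500000, 5100000, 501000, 5101000)], crs="EPSG:32633"
)
zone_34 = gpd.GeoDataFrame(
    {"VALUE": [1]}, geometry=[box(300000, 5100000, 301000, 5101000)], crs="EPSG:32634"
)

joined = concat_in_common_crs([zone_33, zone_34])
print(joined.crs.to_epsg(), len(joined))
```

In the pipeline above, `concat_in_common_crs(gdf_list)` could stand in for `concat_gdf(gdf_list)`. Reprojecting to a geographic CRS is only one choice; a single projected CRS covering the whole AOI would work too.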

In [None]:
# In our implementation the pipeline doesn't need any additional parameters
config = Config.from_path("./configs/global_config.json")

config.patch_list = [8]  # References EOPatch 'eopatch-id-08-col-3-row-1'

pipeline = WaterExportPipeline(config)

pipeline.run()

Finally, let's run these new pipelines on a cluster. We can do this by uploading the notebook and configs to the Ray head node, starting Jupyter and running the notebook there. We also create a `configs` folder on the head node because it doesn't exist yet.

```bash
ray rsync_up cluster.yaml eogrow-workshop.ipynb /home/ray/eogrow-workshop.ipynb

ray exec cluster.yaml 'mkdir configs'
ray rsync_up cluster.yaml configs/global_config.json /home/ray/configs/global_config.json
ray rsync_up cluster.yaml configs/water_detection.json /home/ray/configs/water_detection.json
```

Jupyter can be started with the following command:

```bash
ray exec cluster.yaml --port-forward=8889 'docker exec -it gem_container /bin/bash -c "jupyter notebook --port=8889"'
```

Then go to `localhost:8889` and run the relevant cells in the notebook copy.