# EO Data Processing with the openEO MultiBackendJobManager

In this notebook, we demonstrate how to use the [MultiBackendJobManager](https://open-eo.github.io/openeo-python-client/cookbook/job_manager.html) to set up and execute multiple openEO jobs at once. The example focuses on obtaining Sentinel-1 (S1) and Sentinel-2 (S2) data, as obtained for the LCFM project, starting from input polygons.

**Note**: The S1 mosaics are currently only available for the period 2023-2024.
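
Because the S1 mosaics only cover a limited period, it can be useful to check the temporal extent of S1 jobs before submitting them. The function below is a hypothetical sketch, not part of the notebook's helpers. It assumes "2023-2024" means 1 January 2023 up to, but not including, 1 January 2025, and it treats the end date as exclusive, as openEO does for temporal extents.

```python
from datetime import date

# Assumed S1 mosaic coverage: 2023-01-01 up to (excluding) 2025-01-01
S1_MOSAIC_START = date(2023, 1, 1)
S1_MOSAIC_END = date(2025, 1, 1)


def s1_extent_is_covered(temporal_extent: list) -> bool:
    """Check that a [start, end) temporal extent lies within the assumed S1 mosaic period."""
    start, end = (date.fromisoformat(d) for d in temporal_extent)
    # start inside the window, non-empty interval, end not past the window
    return S1_MOSAIC_START <= start < end <= S1_MOSAIC_END


print(s1_extent_is_covered(["2023-01-01", "2023-04-01"]))
print(s1_extent_is_covered(["2022-10-01", "2023-01-01"]))
```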


## Import Libraries and Define Constants

Here we define the input parameters for our extraction. In this notebook, the input polygons and temporal extents required for the data extractions are generated automatically. These input parameters are then added to a jobs database, which the MultiBackendJobManager later uses to orchestrate the openEO jobs.
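
The helpers `create_spatial_extent` and `create_temporal_extent` live in a local `utils` module that is not shown here. As a hedged sketch, the hypothetical functions below reproduce the extents printed later in this notebook: spatial windows are laid out on a regular grid whose step is the window size plus the gap, and the temporal extent spans a number of calendar months from the start date. The real helpers may be implemented differently.

```python
from datetime import date

# Values copied from the parameter cell of this notebook
BASE_SPATIAL = {"west": 664000.0, "south": 5611120.0, "crs": "EPSG:32631", "srs": "EPSG:32631"}
SPATIAL_WINDOW_SIZE = 10000  # metres
SPATIAL_WINDOW_GAP = 10000   # metres


def create_spatial_extent_sketch(offset_x: int, offset_y: int, base: dict,
                                 window_size: float, window_gap: float) -> dict:
    """Hypothetical stand-in for utils.create_spatial_extent."""
    step = window_size + window_gap  # distance between the west (or south) edges of neighbouring windows
    west = base["west"] + offset_x * step
    south = base["south"] + offset_y * step
    extent = {"west": west, "south": south, "east": west + window_size, "north": south + window_size}
    # Copy the CRS keys ("crs", "srs") from the base extent unchanged
    extent.update({k: v for k, v in base.items() if k not in ("west", "south")})
    return extent


def create_temporal_extent_sketch(base_date: str, nb_months: int) -> list:
    """Hypothetical stand-in for utils.create_temporal_extent.

    Assumes base_date is the first day of a month; the end date is exclusive,
    as in openEO temporal extents.
    """
    start = date.fromisoformat(base_date)
    month_index = start.month - 1 + nb_months
    end = start.replace(year=start.year + month_index // 12, month=month_index % 12 + 1)
    return [start.isoformat(), end.isoformat()]


print(create_spatial_extent_sketch(1, 1, BASE_SPATIAL, SPATIAL_WINDOW_SIZE, SPATIAL_WINDOW_GAP))
print(create_temporal_extent_sketch("2023-01-01", 3))
```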

In [1]:
import openeo
from openeo.extra.job_management import MultiBackendJobManager

# Parameters to automatically generate polygons
BASE_SPATIAL = {"west": 664000.0, "south": 5611120.0, "crs": "EPSG:32631", "srs": "EPSG:32631"}
SPATIAL_WINDOW_SIZE = 10000  # 10 km in meters
SPATIAL_WINDOW_GAP = 10000   # 10 km gap

# Parameters to automatically generate temporal extents on a monthly basis (the monthly basis is required for the S1/S2 extractions)
BASE_TEMPORAL = "2023-01-01" 
NB_MONTHS = 3

MAX_CLOUD_COVER = 85

## Creating the Job Tracker
As mentioned above, the `MultiBackendJobManager` operates on a job tracker in the form of a pandas DataFrame, with one row per job. Each row must contain all input parameters that vary between jobs.

To automate the creation of the job tracker, we define a few helper functions. Note, however, that the job tracker does not have to be generated in code; it can also be created manually, e.g. as a CSV file that is then read into a pandas DataFrame.
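
As a minimal sketch of the manual route, the snippet below reads a small, hand-written tracker with `pandas.read_csv`. The two rows are illustrative only (an S1 and an S2 job for the first grid cell), and an in-memory string stands in for a CSV file on disk. Note that the extents come back as plain strings, which is why `start_job` further down converts them back into Python objects.

```python
import io

import pandas as pd

# Illustrative hand-written tracker; normally this would be a .csv file on disk
CSV_TEXT = """spatial_extent,temporal_extent,max_cloud_cover,sentinel
"{'west': 664000.0, 'south': 5611120.0, 'east': 674000.0, 'north': 5621120.0, 'crs': 'EPSG:32631'}","['2023-01-01', '2023-04-01']",85,1
"{'west': 664000.0, 'south': 5611120.0, 'east': 674000.0, 'north': 5621120.0, 'crs': 'EPSG:32631'}","['2023-01-01', '2023-04-01']",85,2
"""

manual_jobs_df = pd.read_csv(io.StringIO(CSV_TEXT))
print(manual_jobs_df.dtypes)
# The extents are read as strings, not as dict/list objects
print(type(manual_jobs_df.loc[0, "temporal_extent"]))
```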

We also create input polygons to mimic the WAC use case.
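
The `create_polygon` helper from `utils` is not shown. A minimal, hypothetical version could turn a west/south/east/north extent into a GeoJSON-style `Polygon` dictionary, as below. Keep in mind that GeoJSON (RFC 7946) expects WGS84 longitude/latitude, while these extents are in EPSG:32631; the sketch leaves the coordinates untouched, so a reprojection step would be needed before using the result as GeoJSON input for openEO.

```python
def extent_to_polygon_sketch(extent: dict) -> dict:
    """Hypothetical stand-in for utils.create_polygon: bounding box -> GeoJSON-style Polygon."""
    w, s, e, n = extent["west"], extent["south"], extent["east"], extent["north"]
    # Exterior ring in counter-clockwise order, closed by repeating the first vertex
    ring = [[w, s], [e, s], [e, n], [w, n], [w, s]]
    return {"type": "Polygon", "coordinates": [ring]}


extent = {"west": 684000.0, "south": 5631120.0, "east": 694000.0, "north": 5641120.0}
print(extent_to_polygon_sketch(extent))
```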

In [13]:

import pandas as pd
from utils import create_polygon, create_spatial_extent, create_temporal_extent


def prepare_jobs_df(max_offset: int) -> pd.DataFrame:
    """Prepare a DataFrame with one job configuration per grid cell and sensor."""
    jobs = []

    # Create all combinations of the spatial grid offsets
    for offset_x in range(max_offset):
        for offset_y in range(max_offset):

            spatial_extent = create_spatial_extent(offset_x, offset_y, BASE_SPATIAL, SPATIAL_WINDOW_SIZE, SPATIAL_WINDOW_GAP)
            temporal_extent = create_temporal_extent(BASE_TEMPORAL, NB_MONTHS)

            # Note: this step is not required, since openEO can work directly with the extent.
            # Here it can be used to mimic polygons drawn around fields.
            # spatial_polygon = create_polygon(spatial_extent)

            # Add one row per sensor (1 = Sentinel-1, 2 = Sentinel-2) so both extractions share one job tracker
            for sentinel in [1, 2]:

                jobs.append({
                    "spatial_extent": spatial_extent,
                    "temporal_extent": temporal_extent,
                    "max_cloud_cover": MAX_CLOUD_COVER,
                    "sentinel": sentinel,
                })

    return pd.DataFrame(jobs)

jobs_df = prepare_jobs_df(max_offset=2)
print(jobs_df)

                                      spatial_extent  \
0  {'west': 664000.0, 'south': 5611120.0, 'east':...   
1  {'west': 664000.0, 'south': 5611120.0, 'east':...   
2  {'west': 664000.0, 'south': 5631120.0, 'east':...   
3  {'west': 664000.0, 'south': 5631120.0, 'east':...   
4  {'west': 684000.0, 'south': 5611120.0, 'east':...   
5  {'west': 684000.0, 'south': 5611120.0, 'east':...   
6  {'west': 684000.0, 'south': 5631120.0, 'east':...   
7  {'west': 684000.0, 'south': 5631120.0, 'east':...   

            temporal_extent  max_cloud_cover  sentinel  
0  [2023-01-01, 2023-04-01]               85         1  
1  [2023-01-01, 2023-04-01]               85         2  
2  [2023-01-01, 2023-04-01]               85         1  
3  [2023-01-01, 2023-04-01]               85         2  
4  [2023-01-01, 2023-04-01]               85         1  
5  [2023-01-01, 2023-04-01]               85         2  
6  [2023-01-01, 2023-04-01]               85         1  
7  [2023-01-01, 2023-04-01]            
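
The eight rows follow directly from the nested loops: `max_offset ** 2` grid cells times two sensors. As a small check, `itertools.product` yields the same `(offset_x, offset_y, sentinel)` combinations in the same order as the loops in `prepare_jobs_df`, so it could replace them if preferred. This is an optional restructuring, not how the notebook builds the table.

```python
from itertools import product

max_offset = 2
# Same nesting order as prepare_jobs_df: offset_x, then offset_y, then the sensor
combinations = list(product(range(max_offset), range(max_offset), [1, 2]))

print(len(combinations))
for offset_x, offset_y, sentinel in combinations[:4]:
    print(offset_x, offset_y, sentinel)
```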

## Creating the Jobs

Define the `start_job` function, which creates the actual openEO batch job for each row in the jobs DataFrame.
Note that the `MultiBackendJobManager` expects a `start_job` callable with the signature `start_job(row: pd.Series, connection: openeo.Connection, **kwargs)` that returns an `openeo.BatchJob`.

In this example, we combine both extractions in one DataFrame. For large-scale processing, however, it can be easier to run them separately.
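
If you prefer separate runs per sensor, one option is to split the job configurations before building the trackers. The hypothetical helper below groups plain job dictionaries (like the ones collected in `prepare_jobs_df`) by their `sentinel` value; the rows and the tracker file names in the usage lines are made up for illustration. With a DataFrame, iterating over `jobs_df.groupby("sentinel")` gives the same split.

```python
from collections import defaultdict


def split_jobs_by_sensor(jobs: list) -> dict:
    """Group job dictionaries by their "sentinel" value (1 = S1, 2 = S2)."""
    groups = defaultdict(list)
    for job in jobs:
        groups[job["sentinel"]].append(job)
    return dict(groups)


# Illustrative job rows (other columns omitted)
jobs = [
    {"temporal_extent": ["2023-01-01", "2023-04-01"], "sentinel": 1},
    {"temporal_extent": ["2023-01-01", "2023-04-01"], "sentinel": 2},
    {"temporal_extent": ["2023-04-01", "2023-07-01"], "sentinel": 1},
]
for sensor, sensor_jobs in split_jobs_by_sensor(jobs).items():
    # Hypothetical naming scheme: one tracker file per sensor
    print(f"WAC_S{sensor}_job_tracker.csv", len(sensor_jobs))
```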

In [10]:
from eo_fetcher import process_sentinel1_mosaic, process_sentinel2_data
import ast


def start_job(row: pd.Series, connection: openeo.Connection, **kwargs) -> openeo.BatchJob:
    """Start a new job using the specified row and connection."""

    # Values read back from the CSV job tracker arrive as strings: convert them back to Python objects
    spatial_extent = row["spatial_extent"]
    if isinstance(spatial_extent, str):
        spatial_extent = ast.literal_eval(spatial_extent)
    temporal_extent = row["temporal_extent"]
    if isinstance(temporal_extent, str):
        temporal_extent = ast.literal_eval(temporal_extent)
    max_cloud_cover = float(row["max_cloud_cover"])
    sentinel = int(row["sentinel"])

    if sentinel == 1:
        WAC_cube = process_sentinel1_mosaic(connection,
                                            spatial_extent,
                                            temporal_extent)
    elif sentinel == 2:
        WAC_cube = process_sentinel2_data(connection,
                                          spatial_extent,
                                          temporal_extent,
                                          max_cloud_cover)
    else:
        # Fail explicitly instead of hitting an undefined WAC_cube below
        raise ValueError(f"Unsupported sentinel value: {sentinel}")

    return WAC_cube.create_job(
        title='WAC extraction'
    )


## Running Jobs Using MultiBackendJobManager

With the jobs DataFrame and the `start_job` function in place, we can now run the jobs using the `MultiBackendJobManager`. This involves defining an output file in which the job tracker stores the job statuses and metadata.

### Steps to Run the Jobs:

1. Define the output file:
   The output file stores the job tracker data, including job statuses and metadata.
   Use a unique file name for each new run: if the file already exists, the job manager resumes from it instead of starting from the new jobs DataFrame.

2. Initialize the MultiBackendJobManager:
   Create an instance of the `MultiBackendJobManager` and add a backend of your choice. This backend executes the jobs.

3. Run the jobs:
   Use `manager.run_jobs` to create the jobs and submit them to the backend.
   The output file defined in step 1 acts as the live job tracker: it contains all information from the original jobs DataFrame, plus updates on the actual job statuses.
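
During or after a run, the tracker file can be inspected to see how far the jobs have progressed. The stdlib sketch below counts the tracker rows per status, assuming the tracker has a `status` column as written by the job manager. The tracker excerpt is made up and a real tracker has more columns; with pandas, `pd.read_csv(job_tracker)["status"].value_counts()` gives a similar overview.

```python
import csv
import io
from collections import Counter


def summarize_statuses(tracker_file) -> Counter:
    """Count the tracker rows per value of the "status" column."""
    return Counter(row["status"] for row in csv.DictReader(tracker_file))


# Made-up tracker excerpt for illustration only
example_tracker = io.StringIO(
    "id,backend_name,status,sentinel\n"
    "j-example-1,cdse,finished,1\n"
    "j-example-2,cdse,running,2\n"
    ",,not_started,1\n"
)
print(summarize_statuses(example_tracker))

# On a real tracker file:
# with open(job_tracker, newline="") as f:
#     print(summarize_statuses(f))
```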

In [11]:
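from datetime import datetime

# Generate a unique name for the tracker, so a new run does not resume an old tracker file
job_tracker = f"WAC_job_tracker_{datetime.now():%Y%m%d_%H%M%S}.csv"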

# Initialize the MultiBackendJobManager and connect to the Copernicus Data Space Ecosystem (CDSE)
manager = MultiBackendJobManager()
connection = openeo.connect(url="openeo.dataspace.copernicus.eu").authenticate_oidc()

# Standard users can run at most 2 jobs in parallel
manager.add_backend("cdse", connection=connection, parallel_jobs=2)

# Run the jobs
manager.run_jobs(df=jobs_df, start_job=start_job, job_db=job_tracker)

Authenticated using refresh token.


KeyError: 'status'

In [29]:
spatial_extent = create_spatial_extent(1, 1, BASE_SPATIAL, SPATIAL_WINDOW_SIZE, SPATIAL_WINDOW_GAP)
spatial_extent

{'west': 684000.0,
 'south': 5631120.0,
 'east': 694000.0,
 'north': 5641120.0,
 'crs': 'EPSG:32631',
 'srs': 'EPSG:32631'}

**Note**

No effort has been made to optimise the jobs for lower cost or to prepare them for upscaling towards large-scale processing.



In [30]:
import openeo
connection = openeo.connect(url="openeo.dataspace.copernicus.eu").authenticate_oidc()

temporal_extent = ['2023-01-01', '2023-06-01']

spatial_extent = {'west': 684000.0,
                  'south': 5631120.0,
                  'east': 694000.0,
                  'north': 5641120.0,
                  'crs': 'EPSG:32631'}

WAC_cube = process_sentinel2_data(connection,
                                spatial_extent,
                                temporal_extent,
                                max_cloud_cover=90)

WAC_cube.execute_batch('test.nc')


Authenticated using refresh token.
0:00:00 Job 'j-2409136bba0141b0ac9397160145b2a2': send 'start'
0:00:17 Job 'j-2409136bba0141b0ac9397160145b2a2': created (progress 0%)
0:00:22 Job 'j-2409136bba0141b0ac9397160145b2a2': created (progress 0%)
0:00:29 Job 'j-2409136bba0141b0ac9397160145b2a2': created (progress 0%)
0:00:37 Job 'j-2409136bba0141b0ac9397160145b2a2': created (progress 0%)
0:00:46 Job 'j-2409136bba0141b0ac9397160145b2a2': running (progress N/A)
0:00:59 Job 'j-2409136bba0141b0ac9397160145b2a2': running (progress N/A)
0:01:14 Job 'j-2409136bba0141b0ac9397160145b2a2': running (progress N/A)
0:01:34 Job 'j-2409136bba0141b0ac9397160145b2a2': running (progress N/A)
0:01:58 Job 'j-2409136bba0141b0ac9397160145b2a2': running (progress N/A)
0:02:28 Job 'j-2409136bba0141b0ac9397160145b2a2': running (progress N/A)
0:03:05 Job 'j-2409136bba0141b0ac9397160145b2a2': running (progress N/A)
0:03:52 Job 'j-2409136bba0141b0ac9397160145b2a2': running (progress N/A)
0:05:01 Job 'j-2409136bba0141b

In [31]:
import xarray as xr

test = xr.open_dataset('test.nc')
