# EOCanvas Processing

## Setup

In [1]:
import os
import numpy as np
import pandas as pd

In [2]:
import matplotlib.pyplot as plt

In [3]:
import geopandas as gpd
import contextily as cx

In [4]:
import json

In [5]:
from eocanvas import API, Credentials
from eocanvas.api import Input, Config, ConfigOption
from eocanvas.processes import SnapProcess
from eocanvas.snap.graph import Graph

In [6]:
from hda import Client

In [7]:
import threading
import queue
import time

In [8]:
DATA_DIR = '../data/'
FIGURES_DIR = '../saved_figures/'
GRAPHS_DIR = '../graphs/'

In [9]:
c = Credentials.load()
c = Client()

## Subset rectangles

In [10]:
with open(os.path.join(DATA_DIR, 'tile_buffer_wkt.json'), 'r') as f:
    tiles_wkt_dict = json.load(f)

In [15]:
tiles_wkt_dict['30UUA']

['POLYGON ((310000 5540000, 310000 5550000, 300000 5550000, 300000 5540000, 310000 5540000))',
 'POLYGON ((310000 5550000, 310000 5560000, 300000 5560000, 300000 5550000, 310000 5550000))',
 'POLYGON ((310000 5560000, 310000 5570000, 300000 5570000, 300000 5560000, 310000 5560000))',
 'POLYGON ((320000 5540000, 320000 5550000, 310000 5550000, 310000 5540000, 320000 5540000))',
 'POLYGON ((320000 5550000, 320000 5560000, 310000 5560000, 310000 5550000, 320000 5550000))',
 'POLYGON ((320000 5560000, 320000 5570000, 310000 5570000, 310000 5560000, 320000 5560000))',
 'POLYGON ((330000 5540000, 330000 5550000, 320000 5550000, 320000 5540000, 330000 5540000))',
 'POLYGON ((330000 5550000, 330000 5560000, 320000 5560000, 320000 5550000, 330000 5550000))',
 'POLYGON ((330000 5560000, 330000 5570000, 320000 5570000, 320000 5560000, 330000 5560000))',
 'POLYGON ((330000 5570000, 330000 5580000, 320000 5580000, 320000 5570000, 330000 5570000))',
 'POLYGON ((340000 5530000, 340000 5540000, 330000

## EOCanvas SNAP

Aim here is to send parallel jobs to WEkEO servers via parallel threading. For each UTM tile, parallel threads will be run, each sending a job to process a grid sub-tile. Each sub-tile will be processed for 4 months of the year. The results will be stored in `DATA_DIR+/c2rcc_results/{tileId}/{year}/{month}/{subtileIndex}`.

In [11]:
years = range(2016, 2024)
seasons = {
    "spring": ("03-01", "05-31"),
    "summer": ("06-01", "08-31"),
    "autumn": ("09-01", "11-30"),
    "winter": ("12-01", "02-28")
}

In [12]:
cloud_cover_max = 20
images_per_season = 5

In [None]:
# Query template
# TODO: filter by cloud cover? Tile footprint?
q = {
    "dataset_id": "EO:ESA:DAT:SENTINEL-2",
    "startdate": "{startdate}",
    "enddate": "{enddate}",
    "processingLevel": "S2MSI1C",
    "tileId": "{tileId}"
}

In [21]:
c.metadata("EO:ESA:DAT:SENTINEL-2")

{'type': 'object',
 'title': 'Queryable',
 'properties': {'dataset_id': {'title': 'dataset_id',
   'type': 'string',
   'oneOf': [{'const': 'EO:ESA:DAT:SENTINEL-2',
     'title': 'EO:ESA:DAT:SENTINEL-2',
     'group': None}]},
  'bbox': {'title': 'Bbox',
   'type': 'array',
   'minItems': 4,
   'maxItems': 4,
   'items': [{'type': 'number', 'maximum': 180, 'minimum': -180},
    {'type': 'number', 'maximum': 90, 'minimum': -90},
    {'type': 'number', 'maximum': 180, 'minimum': -180},
    {'type': 'number', 'maximum': 90, 'minimum': -90}]},
  'productIdentifier': {'title': 'Product Identifier',
   'type': 'string',
   'pattern': '^[a-zA-Z0-9]+$'},
  'productType': {'title': 'Product Type',
   'type': 'string',
   'oneOf': [{'const': 'S2MSI1C', 'title': 'S2MSI1C', 'group': None},
    {'const': 'S2MSI2A', 'title': 'S2MSI2A', 'group': None},
    {'const': 'AUX_GNSSRD', 'title': 'AUX_GNSSRD', 'group': None},
    {'const': 'AUX_PROQUA', 'title': 'AUX_PROQUA', 'group': None},
    {'const': 'A

In [16]:
def query_sentinel2_median_stack(year, season, tile_id):
    start_suffix, end_suffix = seasons[season]

    # Handle year wrap for winter
    if season == "winter":
        start_date = f"{year}-{start_suffix}"
        end_date = f"{year+1}-{end_suffix}"
    else:
        start_date = f"{year}-{start_suffix}"
        end_date = f"{year}-{end_suffix}"

    # Create the HDA query
    q = {
        "dataset_id": "EO:ESA:DAT:SENTINEL-2",
        "startdate": "{startdate}",
        "enddate": "{enddate}",
        "processingLevel": "S2MSI1C",
        "tileId": "{tileId}"
    }

    
    query = c.search(
        dataset_id="EO:ESA:DAT:SENTINEL-2",
        bbox=None,  # Add if you're targeting a specific area
        start_datetime=start_date,
        end_datetime=end_date,
        max_records=20,
        query={
            "platform": "Sentinel-2",
            "productType": "S2MSI1C",
            "cloudCover": {
                "lte": cloud_cover_max
            },
            "tileId": tile_id
        },
        sort_by="startDatetime",
        sort_order="ascending"
    )

    return query[:images_per_season]


In [17]:
query_sentinel2_median_stack(2020, 'summer', '30UUA')

TypeError: Client.search() got an unexpected keyword argument 'dataset_id'

In [12]:
graph_fp = os.path.join(GRAPHS_DIR, 's2_c2rcc.xml')

In [13]:
with open(graph_fp, 'r') as f:
    graph_txt = f.read()

In [14]:
config = Config(key="img1", options=ConfigOption(uncompress=False, sub_path=""))

In [None]:
def process_job(hda_query, polygon, download_dir):
    r = c.search(hda_query)
    # TODO: r may be empty
    url = r.get_download_urls()[0]
    inputs = Input(key="img1", url=url)
    graph = Graph.from_text(graph_txt.replace('$polygon', polygon))
    process = SnapProcess(snap_graph=graph, eo_config=config, eo_input=inputs)
    _ = process.prepare_inputs()
    process.run(download_dir=download_dir)
    # TODO: return something, handle errors etc

In [None]:
for tileId, subtiles in tiles_wkt_dict.items():
    for subtileIndex in range(len(subtiles)):
        for month in MONTHS:
            download_path = os.path.join(DATA_DIR, f'c2rcc_results/{tileId}/{subtileIndex}/{month}/')
            poly = subtiles[subtileIndex]
            startdate = # TODO: e.g. use a 2 month window
            enddate = 
            query = q
            job = process_job(

In [None]:
q = {
    "dataset_id": "EO:ESA:DAT:SENTINEL-2",
    "startdate": "{startdate}",
    "enddate": "{enddate}",
    "processingLevel": "S2MSI1C",
    "tileId": "{tileId}"
}

In [77]:
r = c.search(q)

In [78]:
url = r.get_download_urls()[0]
inputs = Input(key="img1", url=url)

In [81]:
graph_txt = graph_txt.replace('$polygon', str(poly))

In [82]:
graph = Graph.from_text(graph_txt)

In [83]:
config = Config(key="img1", options=ConfigOption(uncompress=False, sub_path=""))
process = SnapProcess(snap_graph=graph, eo_config=config, eo_input=inputs)
_ = process.prepare_inputs()

In [84]:
process.run(download_dir="result")

Job: 68e44e66-3e10-5d46-a6d3-30bb9ac2b5ff - Status: accepted at 2025-04-03T15:18:41.853257
Job: 68e44e66-3e10-5d46-a6d3-30bb9ac2b5ff - Status: running at 2025-04-03T15:18:52.111895
Job: 68e44e66-3e10-5d46-a6d3-30bb9ac2b5ff - Status: running at 2025-04-03T15:19:03.377061


KeyboardInterrupt: 

In [91]:
api = API()

In [92]:
jobs = api.get_jobs()

In [93]:
jobs

[Job(api=<eocanvas.api.API object at 0x7ce5a4937c70>, job_id='a70fe49a-9a87-5682-976a-6760bca81093', status='failed', started='2025-04-01 16:53:45', created='2025-04-01 16:53:45', updated='2025-04-01 17:00:04', finished='2025-04-01 17:00:04'),
 Job(api=<eocanvas.api.API object at 0x7ce5a4937c70>, job_id='b7e4e026-badc-5634-8a16-f0b958ba6638', status='successful', started='2025-04-01 16:49:36', created='2025-04-01 16:49:36', updated='2025-04-01 16:54:57', finished='2025-04-01 16:54:56'),
 Job(api=<eocanvas.api.API object at 0x7ce5a4937c70>, job_id='a0b798dc-8a56-5512-975a-09d7bf7b88f6', status='failed', started='2025-04-01 16:30:26', created='2025-04-01 16:30:26', updated='2025-04-01 16:35:16', finished='2025-04-01 16:35:15'),
 Job(api=<eocanvas.api.API object at 0x7ce5a4937c70>, job_id='f750bebb-b4b8-541f-ab96-44da5c509548', status='failed', started='2025-04-01 16:22:25', created='2025-04-01 16:22:25', updated='2025-04-01 16:24:02', finished='2025-04-01 16:24:01'),
 Job(api=<eocanvas.a

In [100]:
job = 'b7e4e026-badc-5634-8a16-f0b958ba6638'

In [101]:
api.get_job_logs(job=job)

[LogEntry(timestamp=datetime.datetime(2025, 4, 1, 16, 49, 53, 798265, tzinfo=datetime.timezone.utc), message='time="2025-04-01T16:49:53.796Z" level=info msg="Starting Workflow Executor" version=v3.5.7'),
 LogEntry(timestamp=datetime.datetime(2025, 4, 1, 16, 49, 53, 801704, tzinfo=datetime.timezone.utc), message='time="2025-04-01T16:49:53.801Z" level=info msg="Using executor retry strategy" Duration=1s Factor=1.6 Jitter=0.5 Steps=5'),
 LogEntry(timestamp=datetime.datetime(2025, 4, 1, 16, 49, 53, 801750, tzinfo=datetime.timezone.utc), message='time="2025-04-01T16:49:53.801Z" level=info msg="Executor initialized" deadline="0001-01-01 00:00:00 +0000 UTC" includeScriptOutput=false namespace=ws-serverless podName=workflow-xvt8n-stage-in-3105547291 templateName=stage-in version="&Version{Version:v3.5.7,BuildDate:2024-05-27T06:18:59Z,GitCommit:503eef1357ebc9facc3f463708031441072ef7c2,GitTag:v3.5.7,GitTreeState:clean,GoVersion:go1.21.10,Compiler:gc,Platform:linux/amd64,}"'),
 LogEntry(timestamp