In [None]:
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
import json

# Locate the repository root. The notebook normally runs from repo/examples/,
# but search the CWD and its ancestors for src/openeo_geoagent so the cell also
# works when the kernel starts elsewhere in the repo (e.g. at the repo root).
# Fall back to the parent of the CWD (the repo/examples/ assumption).
_cwd = Path.cwd().resolve()
repo_root = next(
    (d for d in (_cwd, *_cwd.parents) if (d / "src" / "openeo_geoagent").is_dir()),
    _cwd.parent,
)

# Load .env from repo root; override=True lets values in .env replace
# variables already present in the process environment.
load_dotenv(repo_root / ".env", override=True)

# Make src importable with top priority. The guard keeps re-runs of this cell
# from stacking duplicate sys.path entries.
_src_dir = str(repo_root / "src")
if not sys.path or sys.path[0] != _src_dir:
    sys.path.insert(0, _src_dir)

# Optional but recommended: force an absolute path for the collections KG file too
os.environ["OPENEO_COLLECTIONS_KG_PATH"] = str((repo_root / "data" / "collections_kg.json").resolve())

from openeo_geoagent import openeo_llm


# Test AOI: a small rectangle given as [lon, lat] pairs (GeoJSON axis order),
# serialized to a GeoJSON Feature string. The ring is closed: its last vertex
# repeats the first, as GeoJSON polygons require.
# NOTE(review): this is a single Feature, not a FeatureCollection, although
# build_pg passes it as `aoi_feature_collection_json` — confirm the tool accepts it.
aoi_fc = json.dumps(
    dict(
        type="Feature",
        geometry=dict(
            type="Polygon",
            coordinates=[[
                [lon, lat]
                for lon, lat in (
                    (10.0, 45.0),
                    (10.2, 45.0),
                    (10.2, 45.2),
                    (10.0, 45.2),
                    (10.0, 45.0),
                )
            ]],
        ),
        properties={},
    )
)


def build_pg(
    instruction: str,
    default_collection: str = "SENTINEL2_L2A",
    show_dsl: bool = True,
) -> str:
    """
    Build an openEO process graph from a natural-language instruction.

    Convenience wrapper around
    ``openeo_llm.build_process_graph_from_instruction`` that always uses the
    notebook's test AOI (``aoi_fc``):

    - asks the tool to also return the intermediate DSL when ``show_dsl`` is
      True (``return_dsl=show_dsl``) and pretty-prints that DSL;
    - ALWAYS returns a JSON string (indent=2, non-ASCII kept) containing either
      ONLY the process graph, or an error payload (``{"error": ..., ...}``).

    Parameters
    ----------
    instruction : str
        Natural-language request forwarded to the tool.
    default_collection : str
        Collection id forwarded to the tool as ``default_collection``.
    show_dsl : bool
        Request the intermediate DSL from the tool and print it.

    Returns
    -------
    str
        The process graph as JSON, or a JSON error payload. Tool-reported
        errors are passed through; output that is not JSON at all is wrapped as
        ``{"error": "Non-JSON output ...", "details": <raw output>}``.
    """
    def _dump(obj) -> str:
        return json.dumps(obj, indent=2, ensure_ascii=False)

    raw = openeo_llm.build_process_graph_from_instruction.invoke({
        "instruction": instruction,
        "aoi_feature_collection_json": aoi_fc,
        "default_collection": default_collection,
        "return_dsl": show_dsl,
    })

    # The tool is expected to return a JSON string. Accept an already-decoded
    # object too, rather than misreporting it as non-JSON.
    if isinstance(raw, (dict, list)):
        obj = raw
    else:
        try:
            obj = json.loads(raw)
        except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
            # Something went wrong inside the tool
            print("⚠️ Non-JSON output from build_process_graph_from_instruction:")
            print(raw)
            # Wrap the raw output in an error payload (the same "error"/"details"
            # shape the tool uses) so the return value is still valid JSON.
            return _dump({
                "error": "Non-JSON output from build_process_graph_from_instruction",
                "details": str(raw),
            })

    # If the tool returned an error, print it and pass it through (re-serialized)
    if isinstance(obj, dict) and "error" in obj:
        print("⛔ Error from build_process_graph_from_instruction:")
        print(_dump(obj))
        return _dump(obj)

    # Case 1: return_dsl=True → payload {"dsl": ..., "process_graph": ...}
    if isinstance(obj, dict) and "dsl" in obj and "process_graph" in obj:
        if show_dsl:
            print("\n================= DSLRequest (intermediate) =================")
            print(_dump(obj["dsl"]))
            print("=============================================================\n")
        return _dump(obj["process_graph"])

    # Case 2: return_dsl=False → the response is directly the process graph
    return _dump(obj)


def run_job_and_download(
    pg_json: str,
    output_path: str,
    endpoint: str = "https://openeo.dataspace.copernicus.eu"
):
    """
    Run a process graph as an openEO batch job and download its results.

    Parameters
    ----------
    pg_json : str
        JSON string returned by build_pg(). It may also be an error payload
        (``{"error": ...}``); in that case nothing is submitted.
    output_path : str
        Target file path. If the job produces exactly one asset, that asset is
        saved to this path. Otherwise (or if that download fails), the path with
        its suffix stripped is used as a directory, created together with any
        missing parent directories.
    endpoint : str
        openEO backend endpoint URL.

    Returns
    -------
    None
        Invalid payloads and get_results()/download failures are reported with
        print() instead of being raised. Connection, authentication and
        job-submission errors propagate to the caller.
    """
    import pathlib

    # 1) Check whether the input is a valid process graph or an error payload
    try:
        pg = json.loads(pg_json)
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        print("pg_json is not valid JSON:")
        print(pg_json)
        return

    if isinstance(pg, dict) and "error" in pg:
        print("⛔ Error in process graph payload:")
        print(json.dumps(pg, indent=2, ensure_ascii=False))
        return

    if not isinstance(pg, dict):
        print("⛔ pg_json does not describe a process graph (expected a JSON object):")
        print(pg_json)
        return

    # 2) Connect and execute as a batch job. The notebook's setup cell never
    #    imports openeo, so it is imported here, only once the payload is valid.
    import openeo

    conn = openeo.connect(endpoint).authenticate_oidc()
    job = conn.create_job(pg)

    if hasattr(job, "start_and_wait"):
        job.start_and_wait()
    else:
        job.start()
        print("⚠️ This client has no start_and_wait(): job started, but this helper will not wait for completion.")

    # 3) Retrieve results using JobResults
    try:
        results = job.get_results()  # JobResults
    except Exception as e:
        print(f"⛔ get_results() failed: {e}")
        return

    out = pathlib.Path(output_path)

    # Case 1: single asset → download directly as a file
    if hasattr(results, "get_assets"):
        assets = results.get_assets()
        if len(assets) == 1 and hasattr(results, "download_file"):
            try:
                results.download_file(str(out))
                print(f"✅ Single result saved to: {out}")
                return
            except Exception as e:
                print(f"⚠️ download_file() failed: {e}")

    # Case 2: multiple assets, or download_file unavailable/failed → use a directory
    if out.suffix:
        out = out.with_suffix("")

    # parents=True: output_path may point into directories that do not exist yet.
    out.mkdir(parents=True, exist_ok=True)

    if hasattr(results, "download_files"):
        try:
            results.download_files(str(out))
            print(f"✅ Multiple results saved to: {out}")
            return
        except Exception as e:
            print(f"⚠️ download_files() failed: {e}")

    print("⚠️ Unrecognized results format; no files were downloaded.")


In [3]:
# Alternative example, kept commented out: eight single-band SENTINEL_5P_L2 loads
# merged one after another, dekad-mean aggregation, NetCDF output.
# instruction =("BUILD A PROCESS GRAPH that loads eight datacubes from collection id 'SENTINEL_5P_L2' over spatial_extent {'west':2.146728,'south':49.446978,'east':6.497314,'north':51.651837} and temporal_extent ['2020-01-01','2021-01-01'], with each load_collection using exactly one band in this order: 'AER_AI_340_380', then 'AER_AI_354_388', then 'CO', then 'HCHO', then 'NO2', then 'O3', then 'SO2', then 'CH4', then merge them sequentially with merge_cubes so the first two loads are merged, the result is merged with the third load, and so on until a single cube contains all eight bands, then apply aggregate_temporal_period to that merged cube with period set to 'dekad' and with a reducer process graph that computes mean over the reducer input parameter data (the mean node is the reducer result), and finally save_result the aggregated cube to format 'NetCDF' with empty options and mark save_result as the overall result.")

# Multi-collection example: SENTINEL2_L2A B02 temporal mean merged with the
# TERRASCOPE_S2_NDVI_V2 NDVI temporal mean, saved as NetCDF.
instruction = ('BUILD A PROCESS GRAPH that loads collection id "SENTINEL2_L2A" with bands ["B02"], spatial_extent null, temporal_extent null, and a properties filter on "eo:cloud_cover" defined as an lte process comparing the per-item value to 50, then applies filter_temporal with extent ["2024-08-01","2024-09-01"] and filter_bbox with extent {"west":5.07,"east":5.1,"south":51.21,"north":51.23}, then reduces dimension "t" using a mean reducer to get a temporal-mean cube; in parallel load collection id "TERRASCOPE_S2_NDVI_V2" with bands ["NDVI_10M"] and spatial_extent null and temporal_extent null, apply filter_temporal with the same extent and filter_bbox with the same bbox, then reduce dimension "t" with the same mean reducer; merge the two reduced cubes with merge_cubes (Sentinel-2 mean as cube1 and NDVI mean as cube2) and finally save_result the merged cube in format "NetCDF" with empty options, marking save_result as the only result node.')

# show_dsl=True also prints the intermediate DSL before the process graph.
# NOTE(review): default_collection is "SENTINEL1_GRD" although the instruction
# names its collections explicitly — confirm this fallback value is intentional.
pg_json = build_pg(instruction, default_collection="SENTINEL1_GRD", show_dsl=True)

print(
    "\n================= Process Graph (openEO) =================",
    pg_json,
    "=========================================================\n",
    sep="\n",
)





{
  "process_graph": {
    "load_s2": {
      "process_id": "load_collection",
      "arguments": {
        "id": "SENTINEL2_L2A",
        "bands": [
          "B02"
        ],
        "spatial_extent": null,
        "temporal_extent": null
      }
    },
    "filter_props_s2": {
      "process_id": "filter_properties",
      "arguments": {
        "data": {
          "from_node": "load_s2"
        },
        "expression": {
          "process_graph": {
            "get_properties": {
              "process_id": "array_element",
              "arguments": {
                "data": {
                  "from_parameter": "item"
                },
                "label": "properties"
              }
            },
            "get_cloud_cover": {
              "process_id": "array_element",
              "arguments": {
                "data": {
                  "from_node": "get_properties"
                },
                "label": "eo:cloud_cover"
              }
            },
     

In [None]:
# Italian-language instruction. Translation: on my AOI with SENTINEL2_L2A from
# 2021-01-01 to 2021-12-31, compute CUSTOM_INDEX = (NIR - SWIR1) / (NIR + SWIR1)
# as band math; first a monthly mean temporal mosaic, then a 3x3 spatial median
# filter and a temporal moving average over a window of 3; return monthly maps
# of the index's monthly mean.
# NOTE(review): the saved output of this cell shows the tool rejecting the
# request with "Missing 'temporal' in DSLRequest" despite the explicit dates.
instruction = (
    "Sulla mia AOI con SENTINEL2_L2A dal 2021-01-01 al 2021-12-31 "
    "calcola un indice personalizzato CUSTOM_INDEX = (NIR - SWIR1) / (NIR + SWIR1) "
    "come band-math. "
    "Prima applica un mosaico temporale mensile con media, poi un filtro spaziale "
    "mediano 3x3 e una media mobile temporale su finestra 3. "
    "Restituisci mappe mensili con la media mensile dell'indice. "
)

# show_dsl=True also prints the intermediate DSL (expected to contain
# indices + band_math + output_packing=multi_band).
pg_json = build_pg(instruction, default_collection="SENTINEL2_L2A", show_dsl=True)

print(
    "\n================= Process Graph (openEO) =================",
    pg_json,
    "=========================================================\n",
    sep="\n",
)



⛔ Error from build_process_graph_from_instruction:
{
  "error": "DSL normalization failed",
  "details": "Missing 'temporal' in DSLRequest. The time range must be specified explicitly in the LLM output."
}

{
  "error": "DSL normalization failed",
  "details": "Missing 'temporal' in DSLRequest. The time range must be specified explicitly in the LLM output."
}

