# Benchmarking geospatial tabular data retrieval

This experiment benchmarks the efficiency of geospatial data retrieval using eight different data dissemination strategies by retrieving coastal transects for the Basque Country, Spain, from the Global Coastal Transect System. We evaluate the efficiency gains of cloud-optimized data, geospatial sorting, and metadata filtering with Spatio-Temporal Asset Catalogs across different data models (GeoPandas, Dask GeoPandas, and DuckDB) and retrieval methods (spatial join and predicate pushdown). Geospatial sorting, which involves sorting data based on quadkey (a geohash facilitating efficient spatial indexing is examined. Metadata filtering, performed on the attributes provided in the STAC collection, allows for selective retrieval of relevant data partitions. We also compare retrieval methods, including spatial join operations, which merge datasets based on their spatial relationship, and predicate pushdown, a query optimization technique that applies filters early in the data retrieval process to enhance performance by reducing the amount of data transferred and processed. For more details see Calkoen et al, Enabling Coastal Analytics at Planetary Scale. 

In [None]:
import sys

from coastpy.utils.config import configure_instance

instance_type = configure_instance()

import logging
import os
import pathlib

import dask

dask.config.set({"dataframe.query-planning": False})

import tempfile

import dask_geopandas
import duckdb
import fsspec
import geopandas as gpd
import hvplot.pandas
import pandas as pd
import pystac_client
import shapely
from dotenv import load_dotenv

from coastpy.io.engine import STACQueryEngine
from coastpy.stac.utils import read_snapshot

load_dotenv(override=True)

sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN")
account_name = "coclico"
storage_options = {"account_name": account_name, "sas_token": sas_token}

# Create a dataset with the transects unsorted to do some experiments
COMPUTE_UNSORTED_GCTS = False
RUN_QUERY_EXPERIMENT = False

GCTS_CONTAINER = "az://transects/gcts"
GCTS_UNSORTED_CONTAINER = "az://transects/gcts-2000m-unsorted.parquet"

QUERY_METRICS_FP = "az://experiments/enabling-coastal-analytics/query-metrics.parquet"
QUERY_STATISTICS_FP = QUERY_METRICS_FP.replace("metrics.parquet", "statistics.csv")
QUERY_STATISTICS_FORMATTED_FP = QUERY_STATISTICS_FP.replace(".csv", "-formatted.csv")

## Connect to the STAC catalog

In [None]:
coclico_catalog = pystac_client.Client.open(
    "https://coclico.blob.core.windows.net/stac/v1/catalog.json"
)
gcts_collection = coclico_catalog.get_child("gcts")
gcts_extents = read_snapshot(gcts_collection, columns=["geometry", "assets"])
gcts_extents.head()

## Compute unsorted transct dataset

To benchmark the ordering data access create unsorted transect dataset. 

In [None]:
if COMPUTE_UNSORTED_GCTS:
    # Load all transects into memory
    gcts_hrefs = gcts_extents.href.to_list()
    transects_ddf = dask_geopandas.read_parquet(gcts_hrefs)
    transects = transects_ddf.compute()

    # Do a random sortation of all data
    transects = transects.sample(frac=1)
    transects_ddf = dask_geopandas.from_geopandas(
        transects, npartitions=transects_ddf.npartitions
    )

    # Write the unsorted data in an equal amount of partitions to cloud storage
    for i, ddf in enumerate(transects_ddf.partitions):
        filename = f"part.{i}.parquet"
        outpath = f"{GCTS_UNSORTED_CONTAINER}/{filename}"
        df = ddf.compute()
        with fsspec.open(outpath, mode="wb", **storage_options) as f:
            df.to_parquet(f)

## Select a region of interest to do the query experiments

In [None]:
from ipyleaflet import Map, basemaps

m = Map(basemap=basemaps.Esri.WorldImagery, scroll_wheel_zoom=True)
m.center = 43.406241, -2.976665
m.zoom = 9
m.layout.height = "800px"
m

In [None]:
roi = shapely.geometry.box(m.west, m.south, m.east, m.north)
roi = gpd.GeoDataFrame(geometry=[roi], crs=4326)

## Retrieve data for the region of interest from the unsorted data container

## Function to benchmark different query methods

In [None]:
import time
import uuid


def benchmark_experiment(func, *args, n=3, **kwargs):
    """
    Benchmarks the given function, logging execution time and other metrics, automatically using the function's name as the experiment key.

    Args:
        func (callable): The function to benchmark.
        *args: Positional arguments to pass to the function.
        n (int): Number of times to run the experiment.
        **kwargs: Keyword arguments to pass to the function.

    Returns:
        A list of dictionaries containing the logged experiment results.
    """
    experiment_key = func.__name__  # Automatically get the function name
    results = []

    for i in range(n):
        start_time = time.time()
        df = func(*args, **kwargs)
        nrows, ncols = df.shape if df is not None else (None, None)
        end_time = time.time()

        wall_clock_time = end_time - start_time

        experiment_id = f"{experiment_key}_{uuid.uuid4().hex[:8]}"

        results.append(
            {
                "experiment": experiment_key,
                "experiment_id": experiment_id,
                "wall_clock_time": wall_clock_time,
                "nrows": nrows,
                "ncols": ncols,
            }
        )

    return results

## Different methods for accessing GCTS

In [None]:
def gpd_sjoin_from_gcts_as_gpkg():
    # Define the remote path
    remote_path = "az://experiments/gcts-2000m.gpkg"

    # Initialize fsspec filesystem
    fs = fsspec.filesystem("az", **storage_options)

    with tempfile.NamedTemporaryFile(suffix=".gpkg") as tmp:
        fs.get(remote_path, tmp.name)

        transects = gpd.read_file(tmp.name)
        transects = transects.sjoin(roi).drop(columns=["index_right"])

    return transects


def dask_sjoin_from_unsorted_gcts():
    fs = fsspec.filesystem("az", **storage_options)
    files = fs.glob(GCTS_UNSORTED_CONTAINER + "/*.parquet")
    transects_ddf = dask_geopandas.read_parquet(files, filesystem=fs)
    transects = transects_ddf.sjoin(roi).compute()

    return transects


def duckdb_sjoin_from_unsorted_gcts():
    con = duckdb.connect(database=":memory", read_only=False)
    con.execute("INSTALL azure;")
    con.execute("LOAD azure;")
    con.execute("INSTALL spatial;")
    con.execute("LOAD spatial;")

    try:
        con.execute(
            f"""
            CREATE SECRET secret1 (
            TYPE AZURE,
            CONNECTION_STRING '{os.getenv('CLIENT_AZURE_STORAGE_CONNECTION_STRING')}');
        """
        )
    except Exception as e:
        print(
            f"Likely the secret is already available. See exception message below:\n\n{e}"
        )

    roi_wkt = roi.geometry.to_wkt().iloc[0]

    con.execute(
        """CREATE OR REPLACE TABLE transects AS SELECT * FROM read_parquet('az://transects/gcts-2000m-unsorted.parquet/*.parquet');"""
    )
    df = con.execute(
        f"""SELECT * FROM transects WHERE ST_Intersects(ST_GeomFromWKB(transects.geometry), ST_GeomFromText('{roi_wkt}'))"""
    ).fetchdf()
    con.close()
    return df


def dask_sjoin_from_sorted_gcts():
    fs = fsspec.filesystem("az", **storage_options)
    files = fs.glob(GCTS_CONTAINER + "/minx*.parquet")
    transects_ddf = dask_geopandas.read_parquet(files, filesystem=fs)
    transects = transects_ddf.sjoin(roi).compute()
    return transects


def duckdb_sjoin_from_sorted_gcts():
    con = duckdb.connect(database=":memory", read_only=False)
    con.execute("INSTALL azure;")
    con.execute("INSTALL spatial;")
    con.execute("LOAD spatial;")
    con.execute("LOAD azure;")

    try:
        con.execute(
            f"""
            CREATE SECRET secret1 (
            TYPE AZURE,
            CONNECTION_STRING '{os.getenv('CLIENT_AZURE_STORAGE_CONNECTION_STRING')}');
        """
        )
    except Exception as e:
        print(
            f"Likely the secret is already available. See exception message below:\n\n{e}"
        )

    roi_wkt = roi.geometry.to_wkt().iloc[0]

    con.execute(
        """CREATE OR REPLACE TABLE transects AS SELECT * FROM read_parquet('az://transects/gcts-2000m.parquet/minx*.parquet');"""
    )
    df = con.execute(
        f"""SELECT * FROM transects WHERE ST_Intersects(ST_GeomFromWKB(transects.geometry), ST_GeomFromText('{roi_wkt}'))"""
    ).fetchdf()
    con.close()
    return df


def dask_sjoin_with_stac_filter():
    gcts_hrefs_roi = gpd.sjoin(gcts_extents, roi).href.to_list()
    transects_ddf = dask_geopandas.read_parquet(
        gcts_hrefs_roi, storage_options=storage_options
    )
    transects = transects_ddf.sjoin(roi).compute()
    return transects


def duckdb_predicate_pushdown():
    con = duckdb.connect(database=":memory", read_only=False)
    con.execute("INSTALL azure;")
    con.execute("LOAD azure;")

    try:
        con.execute(
            f"""
            CREATE SECRET secret1 (
            TYPE AZURE,
            CONNECTION_STRING '{os.getenv('CLIENT_AZURE_STORAGE_CONNECTION_STRING')}');
        """
        )
    except Exception as e:
        print(
            f"Likely the secret is already available. See exception message below:\n\n{e}"
        )

    query = f"""
            SELECT *
            FROM read_parquet('az://transects/gcts-2000m.parquet/minx*.parquet')
            WHERE
                bbox.minx <= {m.east} AND
                bbox.miny <= {m.north} AND
                bbox.maxx >= {m.west} AND
                bbox.maxy >= {m.south};
    """
    df = con.execute(query).fetchdf()
    con.close()
    return df


def duckdb_predicate_pushdown_with_stac_filter():
    duckdb_engine = STACQueryEngine(
        "https://coclico.blob.core.windows.net/stac/v1/catalog.json",
        "gcts-2000m",
        "azure",
    )
    transects = duckdb_engine.get_data_within_bbox(m.west, m.south, m.east, m.north)
    return transects

## Define the benchmark attributs

In [None]:
from dataclasses import dataclass
from typing import Optional


@dataclass
class MethodAttributes:
    method: str
    data_model: str
    data_format: str
    cloud_optimized: bool
    geospatially_sorted: bool
    stac_filter: bool


def make_method_attrs_df():
    COLUMN_ORDER = [
        "experiment",
        "data_format",
        "data_model",
        "cloud_optimized",
        "method",
        "geospatially_sorted",
        "stac_filter",
    ]

    attrs = {
        "gpd_sjoin_from_gcts_as_gpkg": MethodAttributes(
            method="Spatial join",
            data_model="GeoPandas",
            data_format="gpkg",
            cloud_optimized=False,
            geospatially_sorted=False,
            stac_filter=False,
        ),
        "dask_sjoin_from_unsorted_gcts": MethodAttributes(
            method="Spatial join",
            data_model="Dask GeoPandas",
            data_format="GeoParquet",
            cloud_optimized=True,
            geospatially_sorted=False,
            stac_filter=False,
        ),
        "duckdb_sjoin_from_unsorted_gcts": MethodAttributes(
            method="Spatial join",
            data_model="DuckDB",
            data_format="GeoParquet",
            cloud_optimized=True,
            geospatially_sorted=False,
            stac_filter=False,
        ),
        "dask_sjoin_from_sorted_gcts": MethodAttributes(
            method="Spatial join",
            data_model="Dask GeoPandas",
            data_format="GeoParquet",
            cloud_optimized=True,
            geospatially_sorted=True,
            stac_filter=False,
        ),
        "duckdb_sjoin_from_sorted_gcts": MethodAttributes(
            method="Spatial join",
            data_model="DuckDB",
            data_format="GeoParquet",
            cloud_optimized=True,
            geospatially_sorted=True,
            stac_filter=False,
        ),
        "dask_sjoin_with_stac_filter": MethodAttributes(
            method="Spatial join",
            data_model="Dask GeoPandas",
            data_format="GeoParquet",
            cloud_optimized=True,
            geospatially_sorted=True,
            stac_filter=True,
        ),
        "duckdb_predicate_pushdown": MethodAttributes(
            method="Predicate pushdown",
            data_model="DuckDB",
            data_format="GeoParquet",
            cloud_optimized=True,
            geospatially_sorted=True,
            stac_filter=False,
        ),
        "duckdb_predicate_pushdown_with_stac_filter": MethodAttributes(
            method="Predicate pushdown",
            data_model="DuckDB",
            data_format="GeoParquet",
            cloud_optimized=True,
            geospatially_sorted=True,
            stac_filter=True,
        ),
    }
    df = pd.DataFrame(
        [{"experiment": key, **vars(value)} for key, value in attrs.items()]
    )

    return df[COLUMN_ORDER]

## Run the benchmark

In [None]:
if RUN_QUERY_EXPERIMENT:
    N = 20
    results = []
    # NOTE, this time's out on MSPC, so test on local machine with more memory.
    # results.append(benchmark_experiment(gpd_sjoin_from_gcts_as_gpkg, n=2))
    results.append(benchmark_experiment(dask_sjoin_from_unsorted_gcts, n=N))
    results.append(benchmark_experiment(duckdb_sjoin_from_unsorted_gcts, n=N))
    results.append(benchmark_experiment(dask_sjoin_from_sorted_gcts, n=N))
    results.append(benchmark_experiment(duckdb_sjoin_from_sorted_gcts, n=N))
    results.append(benchmark_experiment(dask_sjoin_with_stac_filter, n=N))
    results.append(benchmark_experiment(duckdb_predicate_pushdown, n=N))
    results.append(
        benchmark_experiment(duckdb_predicate_pushdown_with_stac_filter, n=N)
    )

    # Save metrics and stats
    metrics = []
    for experiment in results:
        experiment = pd.DataFrame(experiment)
        metrics.append(experiment)
    metrics = pd.concat(metrics).reset_index(drop=True)

    with fsspec.open(QUERY_METRICS_FP, mode="wb", **storage_options) as f:
        metrics.to_parquet(f)

    stats = (
        metrics[["experiment", "wall_clock_time"]]
        .groupby("experiment")
        .describe()
        .droplevel(0, axis=1)
        .reset_index()
        .sort_values(by="mean", ascending=False)
        .reset_index(drop=True)
    )

    method_attrs_df = make_method_attrs_df()

    missing_descriptions = [
        e
        for e in stats.experiment.to_list()
        if e not in method_attrs_df.experiment.to_list()
    ]
    if any(missing_descriptions):
        raise ValueError(print(f"Missing descriptions in the method_attrs_df for  {e}"))

    stats = pd.merge(method_attrs_df, stats, on="experiment", how="left")

    with fsspec.open(QUERY_STATISTICS_FP, mode="w", **storage_options) as f:
        stats.to_csv(f, index=False)

## Load metrics

In [None]:
with fsspec.open(QUERY_METRICS_FP, mode="rb", **storage_options) as f:
    metrics = pd.read_parquet(f)

with fsspec.open(QUERY_STATISTICS_FP, mode="r", **storage_options) as f:
    stats = pd.read_csv(f)

## Make table with query times

In [None]:
def format_stats_df(df):
    df = (
        df.sort_values("mean", ascending=False)
        .assign(mean=lambda df: df["mean"].round(1))
        .assign(std=lambda df: df["std"].round(1))
        .drop(
            columns=[
                "experiment",
                "data_format",
                "count",
                "min",
                "max",
                "25%",
                "50%",
                "75%",
                "max",
            ]
        )
        .rename(
            columns={
                "data_model": "Data model",
                "method": "Query method",
                "cloud_optimized": "Cloud-optimized",
                "geospatially_sorted": "Spatial sort",
                "stac_filter": "STAC filter",
                "mean": "mean (s)",
                "std": "std (s)",
            }
        )
        .replace({True: "X", False: ""})
        .reset_index(drop=True)
    )
    return df[
        [
            "Data model",
            "Query method",
            "Cloud-optimized",
            "Spatial sort",
            "STAC filter",
            "mean (s)",
            "std (s)",
        ]
    ]


formatted_stats = format_stats_df(stats)

# with fsspec.open(QUERY_STATISTICS_FORMATTED_FP, mode="w", **storage_options) as f:
#     formatted_stats.to_csv(f, index=False)

formatted_stats

## Make boxplot

In [None]:
metrics

In [None]:
import holoviews as hv
import hvplot.pandas
from bokeh.io import export_svg

hv.extension("bokeh")


with fsspec.open(QUERY_METRICS_FP, mode="rb", **storage_options) as f:
    metrics = pd.read_parquet(f)

rename_experiment_mapper = {
    "gpd_sjoin_from_gcts_as_gpkg": "Expt. 1",
    "dask_sjoin_from_unsorted_gcts": "Expt. 3",
    "duckdb_sjoin_from_unsorted_gcts": "Expt. 5",
    "dask_sjoin_from_sorted_gcts": "Expt. 2",
    "duckdb_sjoin_from_sorted_gcts": "Expt. 4",
    "dask_sjoin_with_stac_filter": "Expt. 6",
    "duckdb_predicate_pushdown": "Expt. 7",
    "duckdb_predicate_pushdown_with_stac_filter": "Expt. 8",
}

metrics = metrics.assign(
    experiment=metrics.experiment.replace(rename_experiment_mapper)
).sort_values("experiment", ascending=False)

boxplot = metrics.hvplot(
    by="experiment",
    y="wall_clock_time",
    legend=False,
    kind="box",
    xlabel="",
    ylabel="Execution time (s)",
    loglog=True,
    invert=True,
    rot=0,
    grid=True,
    bgcolor="",
    # color=""
)


QUERY_METRICS_BOXPLOT_FP = (
    "az://figures/enabling-coastal-analytics/query-metrics-boxplot.svg"
)

renderer = hv.renderer("bokeh")
bokeh_figure = renderer.get_plot(boxplot).state

bokeh_figure.output_backend = "svg"
bokeh_figure.background_fill_color = None
bokeh_figure.border_fill_color = None

# outpath = (pathlib.Path("~") / QUERY_METRICS_BOXPLOT_FP.strip("az://")).expanduser()
# outpath.parent.mkdir(exist_ok=True, parents=True)

# with outpath.open(mode="w") as fp:
#     export_svg(bokeh_figure, filename=fp.name)
#     with fsspec.open(
#         QUERY_METRICS_BOXPLOT_FP, mode="wb", **storage_options
#     ) as cloud_file:
#         with open(fp.name, mode="rb") as fp_read:
#             cloud_file.write(fp_read.read())

In [None]:
boxplot