In [5]:
import os
import pathlib
import sys

# make coclico library importable by appending src from project root to path
cwd = pathlib.Path().resolve()
PROJ_ROOT = os.path.dirname(cwd)

sys.path.append(os.path.join(PROJ_ROOT))

In [20]:
import datetime
import getpass
import uuid
from io import BytesIO

import azure.storage.blob
import dask
import dask_geopandas
import geopandas as gpd
import pandas as pd
import planetary_computer
import pyproj
import pystac_client
import shapely
import pystac
from geopandas.array import GeometryDtype
from pyproj import Transformer
from shapely.ops import transform

from etl import p_drive, rel_root
from etl.keys import load_google_credentials
from stac.blueprint import IO, Layout, get_template_collection

In [7]:
DATA_DIR = pathlib.Path.home() / "data"
SRC_DIR = DATA_DIR / "src"
TMP_DIR = DATA_DIR / "tmp"
PRC_DIR = DATA_DIR / "prc"

In [16]:
ACCOUNT_URL = "https://coclico.blob.core.windows.net/"
CONTAINER = "coastlines-osm"

# doesn't work, the data contributor permission need to be added, so auth with sas token
# as workaround.
# default_credential = DefaultAzureCredential()

sas_token = getpass.getpass()  # prompts for the sas_token
container_client = azure.storage.blob.ContainerClient(
    ACCOUNT_URL,
    container_name=CONTAINER,
    credential=sas_token,
)

 ········


## load local data

In [9]:
s2_grid_fp = SRC_DIR / "grid" / "s2_tiles.gpkg"
s2_grid = gpd.read_file(s2_grid_fp)

s2_grid = s2_grid[["Name", "geometry"]]

In [10]:
def sentinel2_tile_to_epsg(name):
    """
    Each S-2 image is a “granule” which has a file name which includes the so-called MGRS tile code.
    MGRS tiles are in UTM projection, which means EPSG=326NN in the Northern Hemisphere, and 327NN in the Southern.
    The actual NN zone is given by the first 2 digits in the MGRS tile name. The first character will give you the
    hemisphere (C-M is South, N to X is North). For some unknown reason, some rocket scientist decided to put a ‘T’
    in front of the tile name.
    """

    zone = name[:3]
    code = zone[:-1]

    northern_hemisphere = zone[-1] >= "N"

    if northern_hemisphere:
        return int(f"326{code}")
    else:
        return int(f"327{code}")


s2_grid["EPSG"] = s2_grid.apply(lambda x: sentinel2_tile_to_epsg(x.Name), axis=1)

In [11]:
coastlines_fp = SRC_DIR / "coastlines_osm_20220901" / "lines.shp"
coastlines_ddf = dask_geopandas.read_file(coastlines_fp, npartitions=10)
coastlines_ddf = coastlines_ddf.to_crs(3857)
coastlines_ddf = coastlines_ddf.sample(frac=0.01)

In [12]:
s2_grid = s2_grid.to_crs(coastlines_ddf.crs)
delayed_s2_grid = dask.delayed(s2_grid)

In [13]:
meta = pd.DataFrame(
    {
        "FID": pd.Series(dtype="i8"),
        "tile": pd.Series(dtype=str),
        "epsg": pd.Series(dtype="i8"),
        "geometry": pd.Series(dtype=GeometryDtype),
        "length": pd.Series(dtype="f8"),
        "fid_length": pd.Series(dtype="f8"),
    }
)


def transform_geometry(geom, src_crs, dst_crs):
    tf = Transformer.from_crs(src_crs, dst_crs, always_xy=True)
    return transform(tf.transform, geom)


def partitions_to_utm_zone(df, grid):
    df = df.copy()

    if df.empty:
        return meta

    df = gpd.overlay(df, grid)
    df["geometry"] = df.apply(
        lambda x: transform_geometry(x.geometry, df.crs, x.EPSG), axis=1
    )
    df = pd.DataFrame(df).rename({"Name": "tile", "EPSG": "epsg"}, axis=1)
    df["length"] = df.geometry.map(lambda x: x.length)
    lengths = df[["length", "FID"]].groupby("FID")["length"].sum().rename("fid_length")
    df = pd.merge(df, lengths, left_on="FID", right_on="FID")
    return df


coastlines_ddf = coastlines_ddf.map_partitions(
    lambda x: partitions_to_utm_zone(x, s2_grid), meta=meta
)

In [14]:
def write_partition_to_blob(gr, df):
    if df.empty:
        return

    df = gpd.GeoDataFrame(df).set_crs(gr, allow_override=True)
    blob_client = container_client.get_blob_client(f"{gr}.parquet")
    parquet_file = BytesIO()
    df.to_parquet(parquet_file, engine="pyarrow")
    parquet_file.seek(
        0
    )  # change the stream position back to the beginning after writing

    blob_client.upload_blob(data=parquet_file)


coastlines_ddf.groupby("epsg").apply(
    lambda x: write_partition_to_blob(x.name, x), meta=pd.DataFrame()
).compute()

## Build STAC

In [26]:
# hard-coded input params which differ per dataset
COLLECTION_ID = "cl-osm"  # name of stac collection
COLLECTION_TITLE = "Coastlines Open Street Map"
STAC_DIR = "current"
TEMPLATE_COLLECTION = "template"  # stac template for dataset collection
DATASET_DESCRIPTION = (
    "Coastlines Open Street Map"
)

In [17]:
blob_list = list(container_client.list_blobs())

In [18]:
url = "https://coclico.blob.core.windows.net/coastlines-osm/32601.parquet"
data = dask_geopandas.read_parquet(url)

In [21]:
geom = data.geometry.unary_union.compute()
template = pystac.Item(
    "coastlines-osm",
    geometry=shapely.geometry.mapping(geom),
    bbox=geom.bounds,
    datetime=datetime.datetime(2022, 9, 1),
    properties={},
)

  meta = BaseGeometry()


In [43]:
"""
Generate STAC Collections for tabular datasets.
"""
__version__ = "1.0.0"
import copy
import enum
from typing import TypeVar

import dask
import dask_geopandas
import fsspec
import pandas as pd
import pyarrow
import pyarrow.parquet
import pystac
import shapely.geometry

T = TypeVar("T", pystac.Collection, pystac.Item)
SCHEMA_URI = "https://stac-extensions.github.io/table/v1.2.0/schema.json"
# https://issues.apache.org/jira/browse/PARQUET-1889: parquet doesn't officially
# have a type yet.
PARQUET_MEDIA_TYPE = "application/x-parquet"


class InferDatetimeOptions(str, enum.Enum):
    no = "no"
    midpoint = "midpoint"
    unique = "unique"
    range = "range"


def generate_item(
    uri: str,
    template,
    infer_bbox=None,
    infer_geometry=False,
    datetime_column=None,
    infer_datetime=InferDatetimeOptions.no,
    count_rows=True,
    asset_key="data",
    asset_extra_fields=None,
    proj=True,
    storage_options=None,
    validate=True,
) -> T:
    """
    Generate a STAC Item from a Parquet Dataset.
    Parameters
    ----------
    uri : str
        The fsspec-compatible URI pointing to the input table to generate a
        STAC item for.
    template : pystac.Item
        The template item. This will be cloned and new data will be filled in.
    infer_bbox : str, optional
        The column name to use setting the Item's bounding box.
        .. note::
           If the dataset doesn't provide spatial partitions, this will
           require computation.
    infer_geometry: bool, optional
        Whether to fill the item's `geometry` field with the union of the
        geometries in the `infer_bbox` column.
    datetime_column: str, optional
        The column name to use when setting the Item's `datetime` or
        `start_datetime` and `end_datetime` properties. The method used is
        determined by `infer_datetime`.
    infer_datetime: str, optional.
        The method used to find a datetime from the values in `datetime_column`.
        Use the options in the `InferDatetimeOptions` enum.
        - no : do not infer a datetime
        - midpoint : Set `datetime` to the midpoint of the highest and lowest values.
        - unique : Set `datetime` to the unique value. Raises if more than one
          unique value is found.
        - range : Set `start_datetime` and `end_datetime` to the minimum and
          maximum values.
    count_rows : bool, default True
        Whether to add the row count to `table:row_count`.
    asset_key : str, default "data"
        The asset key to use for the parquet dataset. The href will be the ``uri`` and
        the roles will be ``["data"]``.
    asset_extra_fields : dict, optional
        Additional fields to set in the asset's ``extra_fields``.
    proj : bool or dict, default True
        Whether to extract projection information from the dataset and store it
        using the `projection` extension.
        By default, just `proj:crs` is extracted. If `infer_bbox` or `infer_geometry`
        are specified, those will be set as well.
        Alternatively, provide a dict of values to include.
    storage_options: mapping, optional
        A dictionary of keywords to provide to :meth:`fsspec.get_fs_token_paths`
        when creating an fsspec filesystem with a str ``ds``.
    validate : bool, default True
        Whether to validate the returned pystac.Item.
    Returns
    -------
    pystac.Item
        The updated pystac.Item with the following fields set
        * stac_extensions : added `table` extension
        * table:columns
    Examples
    --------
    This example generates a STAC item based on the "naturalearth_lowres" datset
    from geopandas. There's a bit of setup.
    >>> import datetime, geopandas, pystac, stac_table
    >>> gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
    >>> gdf.to_parquet("data.parquet")
    Now we can create the item.
    >>> # Create the template Item
    >>> item = pystac.Item(
    ...     "naturalearth_lowres",
    ...     geometry=None,
    ...     bbox=None,
    ...     datetime=datetime.datetime(2021, 1, 1),
    ...     properties={},
    ... )
    >>> result = stac_table.generate("data.parquet", item)
    >>> result
    <Item id=naturalearth_lowres>
    """
    template = copy.deepcopy(template)

    data = None
    storage_options = storage_options or {}
    # data = dask_geopandas.read_parquet(
    #     ds, storage_options=storage_options
    # )
    ds = parquet_dataset_from_url(uri, storage_options)

    if (
        infer_bbox
        or infer_geometry
        or infer_datetime != InferDatetimeOptions.no
        or proj is True
    ):
        data = dask_geopandas.read_parquet(uri, storage_options=storage_options)

    #     # TODO: this doesn't actually work
    #     data = dask_geopandas.read_parquet(
    #         ds.files, storage_options={"filesystem": ds.filesystem}
    #     )

    name = pathlib.Path(url).stem
    template.id = name
    columns = get_columns(ds)
    template.properties["table:columns"] = columns

    if proj is True:
        proj = get_proj(data)
    proj = proj or {}

    # TODO: Add schema when published
    if SCHEMA_URI not in template.stac_extensions:
        template.stac_extensions.append(SCHEMA_URI)
    if proj and pystac.extensions.projection.SCHEMA_URI not in template.stac_extensions:
        template.stac_extensions.append(pystac.extensions.projection.SCHEMA_URI)

    extra_proj = {}
    if infer_bbox:
        # TODO: may need to convert to epsg:4326
        if not data.crs == pyproj.CRS.from_epsg(4326):
            data = data.to_crs(4326)

        if data.spatial_partitions:
            bbox = data.spatial_partitions.unary_union.bounds
        else:
            bbox = data.geometry.unary_union.compute().bounds

        extra_proj["proj:bbox"] = bbox
        template.bbox = bbox

    if infer_geometry:
        # TODO: may need to convert to epsg:4326
        if not data.crs == pyproj.CRS.from_epsg(4326):
            data = data.to_crs(4326)

        geometry = shapely.geometry.mapping(data.unary_union.compute())
        extra_proj["proj:geometry"] = geometry
        template.geometry = geometry

    if infer_bbox and template.geometry is None:
        # If bbox is set then geometry must be set as well.
        template.geometry = shapely.geometry.mapping(
            shapely.geometry.box(*template.bbox)
        )

    if infer_geometry and template.bbox is None:
        template.bbox = shapely.geometry.shape(template.geometry).bounds

    if proj or extra_proj:
        template.properties.update(**extra_proj, **proj)

    if infer_datetime != InferDatetimeOptions.no and datetime_column is None:
        raise ValueError("Must specify 'datetime_column' when 'infer_datetime != no'.")

    if infer_datetime == InferDatetimeOptions.midpoint:
        values = dask.compute(data[datetime_column].min(), data[datetime_column].max())
        template.properties["datetime"] = pd.Series(values).mean().to_pydatetime()

    if infer_datetime == InferDatetimeOptions.unique:
        values = data[datetime_column].unique().compute()
        n = len(values)
        if n > 1:
            raise ValueError(f"infer_datetime='unique', but {n} unique values found.")
        template.properties["datetime"] = values[0].to_pydatetime()

    if infer_datetime == InferDatetimeOptions.range:
        values = dask.compute(data[datetime_column].min(), data[datetime_column].max())
        values = list(pd.Series(values).dt.to_pydatetime())
        template.properties["start_datetime"] = values[0].isoformat() + "Z"
        template.properties["end_datetime"] = values[1].isoformat() + "Z"

    if count_rows:
        template.properties["table:row_count"] = sum(
            x.count_rows() for x in ds.fragments
        )

    if asset_key:
        asset = pystac.asset.Asset(
            uri,
            title="Dataset root",
            media_type=PARQUET_MEDIA_TYPE,
            roles=["data"],
            # extra_fields={"table:storage_options": asset_extra_fields},
            extra_fields=asset_extra_fields,
        )
        template.add_asset(asset_key, asset)

    if validate:
        template.validate()

    return template


def get_proj(ds):
    """
    Read projection information from the dataset.
    """
    # Use geopandas to get the proj info
    proj = {}
    maybe_crs = ds.geometry.crs
    if maybe_crs:
        maybe_epsg = ds.geometry.crs.to_epsg()
        if maybe_epsg:
            proj["proj:epsg"] = maybe_epsg
        else:
            proj["proj:wkt2"] = ds.geometry.crs.to_wkt()

    return proj


def get_columns(ds: pyarrow.parquet.ParquetDataset) -> list:
    columns = []
    fragment = ds.fragments[0]

    for field, col in zip(ds.schema, fragment.metadata.schema):
        if field.name == "__null_dask_index__":
            continue

        column = {"name": field.name, "type": col.physical_type.lower()}
        if field.metadata is not None:
            column["metadata"] = field.metadata
        columns.append(column)
    return columns


def parquet_dataset_from_url(url: str, storage_options):
    fs, _, _ = fsspec.get_fs_token_paths(url, storage_options=storage_options)
    pa_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(fs))
    url2 = url.split("://", 1)[-1]  # pyarrow doesn't auto-strip the prefix.
    ds = pyarrow.parquet.ParquetDataset(url, filesystem=pa_fs, use_legacy_dataset=False)
    return ds

In [44]:
items = []
for b in blob_list[:3]:
    url = ACCOUNT_URL + CONTAINER + "/" + b["name"]
    item = generate_item(
        uri=url,
        template=template,
        infer_bbox=False,
        infer_geometry=False,
        datetime_column=None,
        infer_datetime=InferDatetimeOptions.no,
        count_rows=True,
        asset_key="data",
        asset_extra_fields=None,
        proj=False,
        storage_options=None,
        validate=True,
    )
    items.append(item)

### Template collection

In [45]:
catalog = pystac.Catalog.from_file(os.path.join(rel_root, STAC_DIR, "catalog.json"))

layout = Layout()
template_fp = os.path.join(
    rel_root, STAC_DIR, TEMPLATE_COLLECTION, "collection.json"
)

collection = get_template_collection(template_fp, COLLECTION_ID, COLLECTION_TITLE, DATASET_DESCRIPTION)

## Build STAC collection

In [48]:
times = [i.datetime for i in items]
collection.extent = pystac.Extent(
        spatial=pystac.collection.SpatialExtent([i.bbox for i in items]),
        temporal=pystac.collection.TemporalExtent([times]),
    )

pystac.extensions.item_assets.ItemAssetsExtension.add_to(collection)
collection.stac_extensions.append(SCHEMA_URI)
collection.extra_fields["item_assets"] = {
    "data": {
        "type": PARQUET_MEDIA_TYPE,
        "title": "Dataset root",
        "roles": ["data"],
        # **asset_extra_fields,
    }
}

collection.keywords = [
    "Coastlines",
    "OpenStreetMap",
    "Coast",
]

collection.extra_fields["coclico:short_description"] = "OpenStreetMap coastlines"
collection.extra_fields["coclico:container"] = "coastlines-osm"
collection.extra_fields["coclico:storage_account"] = "coclico"
collection.providers = [
    pystac.Provider(
        "Open Street Map",
        roles=[
            pystac.provider.ProviderRole.PRODUCER,
            pystac.provider.ProviderRole.LICENSOR,
            pystac.provider.ProviderRole.PROCESSOR,
        ],
        url="https://wiki.openstreetmap.org/wiki/Coastline",
    ),
    pystac.Provider(
        "Deltares",
        roles=[pystac.provider.ProviderRole.PROCESSOR],
        url="https://deltares.nl",
    ),
    pystac.Provider(
        "Deltares",
        roles=[pystac.provider.ProviderRole.HOST],
        url="https://deltares.nl",
    ),
]
collection.links = [
    pystac.Link(
        pystac.RelType.LICENSE,
        target="https://www.census.gov/about/policies/open-gov/open-data.html",
        media_type="text/html",
        title="Terms of use",
    )
]

collection.summaries = pystac.Summaries({})

for item in items:
    collection.add_item(item)
    

# add collection to catalog
catalog.add_child(collection)

# normalize the paths
collection.normalize_hrefs(
    os.path.join(rel_root, STAC_DIR, COLLECTION_ID), strategy=layout
)

collection.validate()

# save updated catalog
catalog.save(
    catalog_type=pystac.CatalogType.SELF_CONTAINED,
    dest_href=os.path.join(rel_root, STAC_DIR),
    stac_io=IO(),
)

In [49]:
collection

0
ID: cl-osm
Title: Coastlines Open Street Map
Description: Coastlines Open Street Map
"Providers:  Open Street Map (producer, licensor, processor)  Deltares (processor)  Deltares (host)"
type: Collection
stac_extensions: ['https://raw.githubusercontent.com/openearth/coclicodata/feat/update-deltares-stac-properties/json-schema/schema.json']
title: template
keywords: ['CoCliCo']
"providers: [{'name': 'Deltares', 'description': 'Deltares is an independent institute for applied research in the field of water and subsurface.', 'roles': ['producer', 'processor'], 'url': 'https://www.deltares.nl'}]"
assets: {}

0
https://raw.githubusercontent.com/openearth/coclicodata/feat/update-deltares-stac-properties/json-schema/schema.json
https://stac-extensions.github.io/item-assets/v1.0.0/schema.json
https://stac-extensions.github.io/table/v1.2.0/schema.json

0
ID: 32601
"Bounding Box: (406091.58614345954, 3122008.9327252316, 694452.8184753559, 7943864.822675268)"
Datetime: 2022-09-01 00:00:00
"table:columns: [{'name': 'FID', 'type': 'int64'}, {'name': 'tile', 'type': 'byte_array'}, {'name': 'epsg', 'type': 'int64'}, {'name': 'geometry', 'type': 'byte_array'}, {'name': 'length', 'type': 'double'}, {'name': 'fid_length', 'type': 'double'}, {'name': '__index_level_0__', 'type': 'int64'}]"
table:row_count: 12
datetime: 2022-09-01T00:00:00Z

0
https://stac-extensions.github.io/table/v1.2.0/schema.json

0
href: https://coclico.blob.core.windows.net/coastlines-osm/32601.parquet
Title: Dataset root
Media type: application/x-parquet
Roles: ['data']
Owner:

0
Rel: collection
Target:
Media Type: application/json

0
Rel: root
Target:
Media Type: application/json

0
Rel: self
Target: /Users/calkoen/dev/coclicodata/current/cl-osm/32601/32601.json
Media Type: application/json

0
Rel: parent
Target:
Media Type: application/json

0
Rel: license
Target: https://www.census.gov/about/policies/open-gov/open-data.html
Media Type: text/html

0
Rel: item
Target:
Media Type: application/json

0
Rel: item
Target:
Media Type: application/json

0
Rel: item
Target:
Media Type: application/json

0
Rel: root
Target:
Media Type: application/json

0
Rel: parent
Target:
Media Type: application/json

0
Rel: self
Target: /Users/calkoen/dev/coclicodata/current/cl-osm/collection.json
Media Type: application/json
