# Creating a STAC of Multi-sensor Topographic Data

- Ingest metadata from data and user input
- Create a STAC Item
  - Adding extensions
  - Adding assets
- Create more Items
- Build the Collection
- Update the Collection
- Write the Collection locally

In [147]:
import json
from datetime import datetime
from pathlib import Path
import pandas as pd
from datetime import datetime, timezone, timedelta
from dateutil import parser
import numpy as np

import pystac
from topo4d_ext import DataType, Topo4DExtension, CRSType, ProductMeta, TrafoMeta
from builder import extract_metadata_from_las, make_item_asset

from tqdm import tqdm
import os

from typing import Dict, Any

## Define dirs and paths

In [148]:
data_dir = "../demo/Isar/data/Isar_pointclouds"
aux_dir = None

## Extract metadata from las/laz file

In [149]:
file_paths = [f for f in Path(data_dir).glob("*") if f.suffix.lower() in [".las", ".laz"]]

for file_path in file_paths:
    print(f"Processing {file_path}")
    meta_las = extract_metadata_from_las(file_path, if_save=True)

Processing ..\demo\Isar\data\Isar_pointclouds\Isar_20240812_UPH_10cm.copc.laz
Processing ..\demo\Isar\data\Isar_pointclouds\Isar_20241105_UPH_10cm.copc.laz
Processing ..\demo\Isar\data\Isar_pointclouds\Isar_20250325_ULS_10cm.copc.laz


In [150]:
file_paths = [f for f in Path(data_dir).glob("*") if f.suffix.lower() in [".las", ".laz"]]

meta_las = extract_metadata_from_las(file_paths[0], if_save=True)
file_id = Path(file_paths[0]).stem.split(".")[0]

## Ingest metadata

In [151]:
meta_userinput = pd.read_csv(os.path.join(data_dir, "auxilary.csv"), index_col="id").to_dict(orient="index")

# merge meta_las and meta_userinput
metadata = meta_las
metadata.update(meta_userinput[file_id])

print(json.dumps(metadata, indent=2))

{
  "id": "Isar_20240812_UPH_10cm",
  "header": {
    "DEFAULT_POINT_FORMAT": "<PointFormat(3, 0 bytes of extra dims)>",
    "DEFAULT_VERSION": [
      1,
      2
    ],
    "are_points_compressed": true,
    "creation_date": "2025-05-05",
    "evlrs": "[<laspy.copc.CopcHierarchyVlr object at 0x00000251B52AF040>]",
    "extra_header_bytes": null,
    "extra_vlr_bytes": null,
    "file_source_id": 0,
    "generating_software": "PDAL 2.7.0 (d5d146)",
    "global_encoding": "<laspy.header.GlobalEncoding object at 0x00000251B5342A40>",
    "major_version": 1,
    "maxs": [
      674345.436,
      5266840.8687,
      910.46
    ],
    "minor_version": 4,
    "mins": [
      673496.386,
      5266235.7505,
      880.4851
    ],
    "number_of_evlrs": 1,
    "number_of_points_by_return": [
      23314488,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0
    ],
    "offset_to_point_data": 1239,
    "offsets": [
      6

## Create a STAC Item from metadata

### Item `id`

In [152]:
def get_id(metadata: Dict[str, Any]) -> str:
    return metadata.get("id")

In [153]:
item_id = get_id(metadata)
item_id

'Isar_20240812_UPH_10cm'

### Item `datetime`

In [154]:
def get_datetime(metadata: Dict) -> datetime:
    dt_str = metadata.get("datetime")
    if not dt_str:
        raise ValueError("Missing 'datetime' in metadata")

    dt = parser.parse(dt_str)

    tz_str = metadata.get("timezone")
    if tz_str and tz_str.startswith("UTC"):
        sign = 1 if "+" in tz_str else -1
        offset_str = tz_str[3:].replace("+", "").replace("-", "")
        parts = offset_str.split(":")
        hours = int(parts[0]) if parts[0] else 0
        minutes = int(parts[1]) if len(parts) > 1 else 0
        offset = timedelta(hours=sign * hours, minutes=sign * minutes)
        tz = timezone(offset)
        dt = dt.replace(tzinfo=tz)
    elif tz_str and "/" in tz_str:
        tz = pytz.timezone(tz_str)
        dt = tz.localize(dt)
    elif dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)

    return dt

In [155]:
item_datetime = get_datetime(metadata)
item_datetime

datetime.datetime(2024, 8, 12, 0, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=3600)))

### Item `bbox`, `geometry` and `native_crs`

In [156]:
from shapely.geometry import box, mapping
from pyproj import CRS, Transformer

def get_geo(metadata: Dict[str, Any]) -> Dict[str, Any]:
    header = metadata.get("header")
    vlrs = metadata.get("vlrs", [])
    native_crs = metadata.get("native_crs", None)

    if not header or "mins" not in header or "maxs" not in header:
        raise ValueError("Invalid header format. 'mins' and 'maxs' are required.")

    xmin, ymin, zmin = header["mins"]
    xmax, ymax, zmax = header["maxs"]

    crs = None
    for vlr in vlrs:
        if vlr.get("user_id", "").lower() == "lasf_projection":
            if "wkt" in vlr and vlr["wkt"]:
                crs = CRS.from_wkt(vlr["wkt"])
                break
            elif "epsg_code" in vlr and vlr["epsg_code"]:
                crs = CRS.from_epsg(vlr["epsg_code"])
                break

    if not crs and native_crs:
        try:
            crs = CRS.from_user_input(native_crs)
        except Exception:
            crs = None

    bbox = [xmin, ymin, xmax, ymax]

    if crs and crs.to_epsg() != 4326:
        try:
            transformer = Transformer.from_crs(crs, CRS.from_epsg(4326), always_xy=True)

            min_lon, min_lat = transformer.transform(xmin, ymin)
            max_lon, max_lat = transformer.transform(xmax, ymax)

            # geom in WGS84
            bbox = [min_lon, min_lat, max_lon, max_lat]
            geom = mapping(box(min_lon, min_lat, max_lon, max_lat))
        except Exception:
            pass 
    else:
        # geom in native CRS
        geom = mapping(box(xmin, ymin, xmax, ymax))

    return {
        "bbox": bbox,
        "geometry": geom,
        "native_crs": crs.to_string() if crs else None
    }

In [157]:
item_geo = get_geo(metadata)
print(json.dumps(item_geo, indent=2))

{
  "bbox": [
    11.304866670991222,
    47.52637673872775,
    11.316376321635966,
    47.53158992234481
  ],
  "geometry": {
    "type": "Polygon",
    "coordinates": [
      [
        [
          11.316376321635966,
          47.52637673872775
        ],
        [
          11.316376321635966,
          47.53158992234481
        ],
        [
          11.304866670991222,
          47.53158992234481
        ],
        [
          11.304866670991222,
          47.52637673872775
        ],
        [
          11.316376321635966,
          47.52637673872775
        ]
      ]
    ]
  },
  "native_crs": "EPSG:25832"
}


### Create the `item`

In [158]:
item = pystac.Item(
    id=item_id,
    geometry=item_geo["geometry"],
    bbox=item_geo["bbox"],
    datetime=item_datetime.replace(tzinfo=None),
    properties={}
)

In [159]:
item.validate()

['https://schemas.stacspec.org/v1.1.0/item-spec/json-schema/item.json']

### Adding the Topo4D extension

In [160]:
topo4d_ext = Topo4DExtension.ext(item, add_if_missing=True)
print(f"Topo4D Extension added: {Topo4DExtension.has_extension(item)}")

Topo4D Extension added: True


In [161]:
topo4d_ext.tz = metadata.get("timezone", "UTC")
topo4d_ext.native_crs = item_geo.get("native_crs", None)

In [162]:
item.properties

{'datetime': '2024-08-12T00:00:00Z',
 'topo4d:tz': 'UTC+1',
 'topo4d:native_crs': 'EPSG:25832'}

### Adding assets

In [163]:
asset_name, item_asset = make_item_asset(asset_url=metadata.get("asset_url"), user_input=metadata)

item.add_asset(
    key=asset_name,
    asset=item_asset
)

item

## Create more items

In [164]:
def item_from_metadata(metadata: Dict[str, Any]) -> pystac.Item:
    item_id = get_id(metadata)
    item_datetime = get_datetime(metadata)
    item_geo = get_geo(metadata)

    item = pystac.Item(
        id=item_id,
        geometry=item_geo["geometry"],
        bbox=item_geo["bbox"],
        datetime=item_datetime.replace(tzinfo=None),
        properties={}
    )

    topo4d_ext = Topo4DExtension.ext(item, add_if_missing=True)
    topo4d_ext.tz = metadata.get("timezone", "UTC")
    topo4d_ext.native_crs = item_geo.get("native_crs", None)
    topo4d_ext.data_type = DataType(metadata.get("data_type", None))
    topo4d_ext.sensor = metadata.get("sensor", None)
    topo4d_ext.acquisition_date = metadata.get("acquisition_date", None)
    topo4d_ext.orientation = metadata.get("orientation", None)
    topo4d_ext.spatial_resolution = metadata.get("spatial_resolution", None)
    topo4d_ext.measurement_error = metadata.get("measurement_error", None)


    asset_name, item_asset = make_item_asset(asset_url=metadata.get("asset_url"), user_input=metadata)

    item.add_asset(
        key=asset_name,
        asset=item_asset
    )

    return item

In [165]:
meta_userinput = pd.read_csv(os.path.join(data_dir, "auxilary.csv"), index_col="id").to_dict(orient="index")

item_list = []
metadata_list = []

for file_path in tqdm(file_paths, desc="Processing files"):
    meta_las = extract_metadata_from_las(file_path, if_save=True)
    file_id = Path(file_path).stem.split(".")[0]

    if file_id not in meta_userinput:
        print(f"Skipping {file_id} as it is not in user input metadata")
        continue

    metadata = meta_las
    metadata.update(meta_userinput[file_id])

    metadata_list.append(metadata)

    item = item_from_metadata(metadata)
    # item.validate() # Uncomment when the Schema url is public

    item.set_self_href(f"{data_dir}/{item.id}.json")
    item_list.append(item)

Processing files: 100%|██████████| 3/3 [00:00<00:00, 269.98it/s]


In [166]:
item_list[0].get_self_href()

'c:/Users/jiapan/02_projects/2025_4DWORKS/4D-WORKS/demo/Isar/data/Isar_pointclouds/Isar_20240812_UPH_10cm.json'

### Adding TrafoMeta

In [167]:
trafo_paths = [f for f in Path(data_dir).glob("trafo*.txt")]
ref_id = "Isar_20241105_UPH_10cm"
# find the match id in item_list
if ref_id in [item.id for item in item_list]:
    reference_epoch = [item for item in item_list if item.id == ref_id][0]

    reference_epoch_link = pystac.Link(
        rel=reference_epoch.id,
        target=reference_epoch.get_self_href(),
        title="Reference Epoch",
        media_type=pystac.MediaType.JSON)

    for idx, trafo_path in enumerate(trafo_paths):
        with open(trafo_path, "r") as f:
            # read trafo meta from txt file as array
            trafo_mat = [np.array(line.strip().split(), dtype=float).tolist() for line in f if line.strip()]

        trafo_meta = TrafoMeta.create(
            reference_epoch=reference_epoch_link.to_dict(),
            transformation=trafo_mat,
            )

        item = item_list[idx]
        Topo4DExtension.ext(item, add_if_missing=True).trafometa = trafo_meta

### Adding ProductMeta

In [168]:
for idx, meta in enumerate(metadata_list):
    item = item_list[idx]
    if "product_name" in meta:
        product_meta = ProductMeta.create(
            product_name=meta["product_name"],
            lastupdate=datetime.strptime(meta['header'].get("creation_date", None), "%Y-%m-%d").isoformat(),
            param={"param": meta.get("param", None)},
            derived_from=meta.get("derived_from", None),
            product_level=meta.get("product_level", None),
        )
        Topo4DExtension.ext(item, add_if_missing=True).productmeta = product_meta

## Create Collection

### Collection `id`

In [169]:
collection_id = f"Isar_pointclouds_collection"
collection_id

'Isar_pointclouds_collection'

### Collection `title`

In [170]:
collection_title = "Isar Point Clouds Collection"
collection_title

'Isar Point Clouds Collection'

### Collection `description`

In [171]:
collection_desc = f'''### {collection_title}

A collection of point cloud from Isar riverbank, near Wallgau, Germany.
'''
print(collection_desc)

### Isar Point Clouds Collection

A collection of point cloud from Isar riverbank, near Wallgau, Germany.



### Collection `license`

In [172]:
collection_license = "CC-BY-4.0"
collection_license

'CC-BY-4.0'

### Collection `provider`

In [173]:
collection_providers = [
    pystac.Provider(
        name="TUM Remote Sensing Applications",
        roles=[
            pystac.ProviderRole.PROCESSOR,
            pystac.ProviderRole.PRODUCER,
            pystac.ProviderRole.LICENSOR,
            pystac.ProviderRole.HOST
            ],
        url="https://www.asg.ed.tum.de/rsa/startseite/",
    ),
]

### Collection `extend`

In [174]:
spatial_extent = pystac.SpatialExtent([[-180.0, -90.0, 180.0, 90.0]])
temporal_extent = pystac.TemporalExtent([[datetime(2013, 6, 1), None]])
collection_extent = pystac.Extent(spatial_extent, temporal_extent)

In [175]:
collection = pystac.Collection(
    id=collection_id,
    title=collection_title,
    description=collection_desc,
    extent=collection_extent,
    license=collection_license,
    providers=collection_providers,
)

In [176]:
collection.to_dict()

{'type': 'Collection',
 'id': 'Isar_pointclouds_collection',
 'stac_version': '1.1.0',
 'description': '### Isar Point Clouds Collection\n\nA collection of point cloud from Isar riverbank, near Wallgau, Germany.\n',
 'links': [],
 'title': 'Isar Point Clouds Collection',
 'extent': {'spatial': {'bbox': [[-180.0, -90.0, 180.0, 90.0]]},
  'temporal': {'interval': [['2013-06-01T00:00:00Z', None]]}},
 'license': 'CC-BY-4.0',
 'providers': [{'name': 'TUM Remote Sensing Applications',
   'roles': ['processor', 'producer', 'licensor', 'host'],
   'url': 'https://www.asg.ed.tum.de/rsa/startseite/'}]}

In [177]:
# add items to the collection
for item in item_list:
    collection.add_item(item)

### Collection `summaries`

In [178]:
# add collection summaries
collection.summaries.add("num_items", {"count": len(item_list)})
collection.summaries.add("timestamp_list", {"list": [item.datetime.isoformat() for item in item_list]})
collection.summaries.add("temporal_resolution", {"resolution": "half-annual"})

In [179]:
collection.update_extent_from_items()
collection.extent.to_dict()
collection.to_dict()

{'type': 'Collection',
 'id': 'Isar_pointclouds_collection',
 'stac_version': '1.1.0',
 'description': '### Isar Point Clouds Collection\n\nA collection of point cloud from Isar riverbank, near Wallgau, Germany.\n',
 'links': [{'rel': 'item',
   'href': 'c:/Users/jiapan/02_projects/2025_4DWORKS/4D-WORKS/demo/Isar/data/Isar_pointclouds/Isar_20240812_UPH_10cm.json',
   'type': 'application/geo+json'},
  {'rel': 'item',
   'href': 'c:/Users/jiapan/02_projects/2025_4DWORKS/4D-WORKS/demo/Isar/data/Isar_pointclouds/Isar_20241105_UPH_10cm.json',
   'type': 'application/geo+json'},
  {'rel': 'item',
   'href': 'c:/Users/jiapan/02_projects/2025_4DWORKS/4D-WORKS/demo/Isar/data/Isar_pointclouds/Isar_20250325_ULS_10cm.json',
   'type': 'application/geo+json'}],
 'title': 'Isar Point Clouds Collection',
 'extent': {'spatial': {'bbox': [[11.304168016412058,
     47.52600520593624,
     11.3172511501222,
     47.53281392856261]]},
  'temporal': {'interval': [['2024-08-12T00:00:00Z',
     '2025-03-25T

### Save STAC

In [180]:
from pathlib import Path

root_path = str(Path(f"{data_dir}"))
print(f"Root path for collection: {root_path}")

from pystac.layout import TemplateLayoutStrategy

# Set up a flatten layout strategy
strategy = TemplateLayoutStrategy(
    item_template="${id}.json"
)

collection.normalize_hrefs(root_path, strategy=strategy)

Root path for collection: ..\demo\Isar\data\Isar_pointclouds


In [181]:
# collection.validate_all() # Uncomment when the Schema url is public

In [182]:
collection.save(pystac.CatalogType.SELF_CONTAINED)

In [183]:
collection.describe()

* <Collection id=Isar_pointclouds_collection>
  * <Item id=Isar_20240812_UPH_10cm>
  * <Item id=Isar_20241105_UPH_10cm>
  * <Item id=Isar_20250325_ULS_10cm>
