# GeoNorge CSW til STAC Catalog – Eksperiment

Dette prosjektet demonstrerer hvordan metadatakatalogen fra GeoNorge (CSW) kan konverteres til en STAC-katalog, både som JSON og GeoParquet. Målet er å gjøre norske geodata lettere tilgjengelige og mer brukervennlige for både mennesker og maskiner.

## Teknisk gjennomgang

Notebooken `iso2stac_experiment.ipynb` viser hele prosessen:
- Henter metadata fra GeoNorge CSW med OWSLib
- Mapper metadata til STAC-format med dynamisk utvidelse av egenskaper
- Oppretter STAC-katalog og collections basert på publisher
- Eksporterer katalogen til både JSON og GeoParquet
- Viser hvordan resultatene kan visualiseres og analyseres


In [36]:
%%capture
# Install required libraries
%pip install OWSLib pystac stac-geoparquet pyarrow

Set up the CSW URL and the output path.

Connect to the CSW endpoint using OWSLib.

In [None]:
from owslib.csw import CatalogueServiceWeb
from pystac import Catalog, Collection, Item, Asset, Extent, SpatialExtent, TemporalExtent, CatalogType
from datetime import datetime, timezone
import re
import os

# Configuration
CSW_URL = 'https://www.geonorge.no/geonetworktest/srv/eng/csw'

# Output directory for the STAC catalog.
# Built with os.path.join so the same notebook runs on Windows, Linux and
# macOS: a hard-coded '.\\output\\...' string is treated on POSIX systems as
# a single directory whose name contains backslashes.
STAC_DIR = os.path.join('.', 'output', 'stac_output_dynamic')

# Connect to CSW and print the identification fields reported by the service
# as a quick sanity check that the endpoint is reachable.
print(f"Connecting to {CSW_URL}...")
csw = CatalogueServiceWeb(CSW_URL)
print(f"Connected to: {csw.identification.title}")
print(f"Service Type: {csw.identification.type}")
print(f"Service Version: {csw.identification.version}")
print(f"Abstract: {csw.identification.abstract}")

Connecting to https://www.geonorge.no/geonetworktest/srv/eng/csw...
Connected to: Geonetwork - Test
Service Type: CSW
Service Version: 2.0.2
Abstract: None


### Helper functions
Defines helper functions for extracting the bounding box and for determining the record type. The type (derived from the publisher) decides which STAC collection each record is placed in.

In [38]:
# Helper Functions

def get_bbox(record):
    """Extract a STAC-ordered bounding box from a CSW record.

    Two sources are tried in order, each independently of the other:

    1. The raw record XML (``record.xml``): the first ``ows:BoundingBox``
       element is read so that the ``crs`` attribute can be used to decide
       the axis order.
    2. The bbox object already parsed by OWSLib (``record.bbox`` with
       ``minx``/``miny``/``maxx``/``maxy`` attributes).

    A failure in the first source (malformed XML, empty corner elements,
    non-numeric coordinates) no longer prevents the second one from being
    used.

    Args:
        record: A CSW record object. Only the attributes ``xml``, ``bbox``
            and ``identifier`` are read, and all of them are optional.

    Returns:
        A list ``[minx, miny, maxx, maxy]`` in lon/lat order. If neither
        source yields a usable box, the global extent
        ``[-180.0, -90.0, 180.0, 90.0]`` is returned.
    """
    import xml.etree.ElementTree as ET

    # Default global bbox
    default_bbox = [-180.0, -90.0, 180.0, 90.0]
    # Only used in warning messages; must never raise by itself.
    record_id = getattr(record, 'identifier', None)

    # 1. Try to parse raw XML for ows:BoundingBox to handle axis order
    #    (Lat/Lon vs Lon/Lat). Best-effort: any error is reported and the
    #    function moves on to the OWSLib-parsed bbox.
    try:
        raw_xml = getattr(record, 'xml', None)
        if raw_xml:
            root = ET.fromstring(raw_xml)
            namespaces = {'ows': 'http://www.opengis.net/ows'}

            bbox_elem = root.find('.//ows:BoundingBox', namespaces)
            if bbox_elem is not None:
                lower_corner = bbox_elem.find('ows:LowerCorner', namespaces)
                upper_corner = bbox_elem.find('ows:UpperCorner', namespaces)

                if lower_corner is not None and upper_corner is not None:
                    # .text is None for an empty element; treat it as "no coordinates".
                    lc_coords = [float(x) for x in (lower_corner.text or '').split()]
                    uc_coords = [float(x) for x in (upper_corner.text or '').split()]

                    if len(lc_coords) >= 2 and len(uc_coords) >= 2:
                        crs = bbox_elem.get('crs', '')

                        # Handle axis order.
                        # EPSG:4326 is typically Lat/Lon in OGC services.
                        # Heuristic for other/missing CRS values: the first
                        # coordinate is larger than the second and above 30,
                        # which for Norwegian data means it is a latitude.
                        if '4326' in crs or (lc_coords[0] > lc_coords[1] and lc_coords[0] > 30):
                            # Swap from Lat/Lon to Lon/Lat (STAC uses Lon/Lat)
                            miny, minx = lc_coords[0], lc_coords[1]
                            maxy, maxx = uc_coords[0], uc_coords[1]
                        else:
                            minx, miny = lc_coords[0], lc_coords[1]
                            maxx, maxy = uc_coords[0], uc_coords[1]

                        return [minx, miny, maxx, maxy]
    except Exception as e:
        print(f"Warning: Could not parse bbox for {record_id}: {e}")

    # 2. Fallback to OWSLib parsed bbox
    try:
        parsed = getattr(record, 'bbox', None)
        if parsed:
            # OWSLib usually returns (minx, miny, maxx, maxy),
            # but verify whether it looks swapped.
            b = [float(parsed.minx), float(parsed.miny), float(parsed.maxx), float(parsed.maxy)]
            # Apply the same heuristic as for the XML path
            if b[0] > b[1] and b[0] > 30:
                return [b[1], b[0], b[3], b[2]]
            return b
    except Exception as e:
        print(f"Warning: Could not parse bbox for {record_id}: {e}")

    return default_bbox

## THEMATIC COLLECTION DETERMINATION BASED ON PUBLISHER
## Could be extended to use keywords or other metadata
## this is essential for the structure of the STAC catalog
def determine_type(record):
    """Return the collection slug for a record, derived from its publisher.

    The publisher is read from ``record.publisher`` (the first entry is used
    when it is a list), lower-cased, and every character outside
    ``a-z``, ``A-Z``, ``0-9``, ``-`` and ``_`` is replaced by an underscore.
    Runs of underscores are collapsed and leading/trailing underscores are
    removed. When no publisher is available, or nothing is left after
    sanitising, ``'unknown-publisher'`` is returned.
    """
    fallback = 'unknown-publisher'

    # A missing attribute, None, an empty string and an empty list all count
    # as "no publisher".
    raw = getattr(record, 'publisher', None)
    if not raw:
        publisher = fallback
    elif isinstance(raw, list):
        publisher = raw[0]
    else:
        publisher = raw

    # Make the value safe for use inside a collection ID
    slug = re.sub(r'[^a-zA-Z0-9-_]', '_', str(publisher).lower())
    slug = re.sub(r'_+', '_', slug).strip('_')

    return slug or fallback

## The actual mapping function between CSW record and STAC Item
def csw_record_to_stac_item(record):
    """Convert a single CSW record to a pystac.Item.

    The item gets:

    * an ID derived from ``record.identifier`` (characters outside
      ``a-z``, ``A-Z``, ``0-9``, ``-`` and ``_`` are replaced by ``_``),
    * a rectangular polygon geometry built from ``get_bbox(record)``,
    * the current UTC time as ``datetime`` (placeholder, see below),
    * ``title``/``description`` plus every other public, non-callable,
      non-empty attribute of the record as a ``csw:<attr>`` property,
    * one asset per entry of ``record.uris`` (preferred) or
      ``record.references`` that has a URL.

    Args:
        record: CSW record object as returned by OWSLib.

    Returns:
        The populated ``pystac.Item``.

    Raises:
        TypeError: If ``record.identifier`` is not a string, because it is
            passed straight to ``re.sub``.
    """
    # Sanitize ID
    item_id = re.sub(r'[^a-zA-Z0-9-_]', '_', record.identifier)

    bbox = get_bbox(record)

    # Create GeoJSON Geometry from BBox
    # bbox is [minx, miny, maxx, maxy]
    geometry = {
        "type": "Polygon",
        "coordinates": [[
            [bbox[0], bbox[1]],
            [bbox[2], bbox[1]],
            [bbox[2], bbox[3]],
            [bbox[0], bbox[3]],
            [bbox[0], bbox[1]]
        ]]
    }

    # Time: Default to now if not found.
    # Real implementation should parse record.temporal_extent or modified date.
    dt = datetime.now(timezone.utc)

    # Base properties
    properties = {
        "title": record.title,
        "description": record.abstract,
    }

    # Dynamic metadata extraction
    # Extract all public attributes from the record object.
    # We exclude spatial properties (bbox), internal ones (xml), and those
    # already mapped (title, abstract, references).
    ignore_attributes = {'bbox', 'xml', 'references', 'uris', 'title', 'abstract'}

    for attr in dir(record):
        # Skip private attributes and ignored ones
        if attr.startswith('_') or attr in ignore_attributes:
            continue

        try:
            value = getattr(record, attr)
        except Exception:
            # Deliberate best-effort: an attribute that cannot be read is
            # simply left out of the item properties.
            continue

        # Skip callables (methods)
        if callable(value):
            continue

        # Skip None or empty lists
        if value is None or (isinstance(value, list) and len(value) == 0):
            continue

        # Add to properties with csw: prefix to avoid collision and indicate source
        properties[f"csw:{attr}"] = value

    item = Item(
        id=item_id,
        geometry=geometry,
        bbox=bbox,
        datetime=dt,
        properties=properties
    )

    # Add Assets (References)
    # In OWSLib, record.uris is a list of dicts:
    #   [{'protocol': '...', 'name': '...', 'description': '...', 'url': '...'}]
    # record.references is a list of dicts: [{'scheme': '...', 'url': '...'}]
    # We prefer URIs if available as they have more metadata (protocol/name).
    # Either attribute may be missing or None; treat that as "no assets".
    assets_source = getattr(record, 'uris', None) or getattr(record, 'references', None) or []

    for i, ref in enumerate(assets_source):
        href = ref.get('url')
        if not href:
            continue

        # Extract metadata.
        # The 'protocol'/'scheme' keys can be present with a None value, so
        # fall back with `or` instead of dict.get defaults before lower-casing.
        protocol = str(ref.get('protocol') or ref.get('scheme') or 'unknown').lower()
        name = ref.get('name', f'asset_{i}')
        description = ref.get('description', '')

        roles, media_type = _classify_asset(protocol, href)

        # Asset keys must be unique within an item; several references often
        # share the same name, which would otherwise silently replace the
        # asset added earlier under that key.
        base_key = name if name else f"asset_{i}"
        key = base_key
        suffix = 1
        while key in item.assets:
            suffix += 1
            key = f"{base_key}_{suffix}"

        item.add_asset(
            key=key,
            asset=Asset(
                href=href,
                title=name,
                description=description,
                media_type=media_type,
                roles=roles
            )
        )

    return item


def _classify_asset(protocol, href):
    """Return ``(roles, media_type)`` for a reference from protocol/URL hints."""
    roles = []
    media_type = None

    if 'image' in protocol or 'jpg' in href or 'png' in href:
        roles.append('thumbnail')
        if 'jpg' in href:
            media_type = 'image/jpeg'
        if 'png' in href:
            media_type = 'image/png'
    elif 'wms' in protocol:
        roles.append('visual')
        media_type = 'image/wms'
    elif 'wfs' in protocol:
        roles.append('data')
        media_type = 'application/wfs'
    elif 'download' in protocol:
        roles.append('data')
    else:
        roles.append('metadata')

    return roles, media_type

### Harvest and build the STAC catalog

In [None]:
import time

# Harvest and Build Catalog
# Pages through the CSW endpoint, converts every record to a STAC Item,
# groups the items into one Collection per publisher-derived type, and
# finally writes the whole catalog to STAC_DIR.

# Create Root Catalog
catalog = Catalog(
    id="geonorge-csw-catalog",
    description="STAC Catalog harvested from Geonorge CSW"
)

# Collections map to keep track of created collections
# (collection id -> Collection object)
collections = {}

# Paging Configuration
start_position = 1
max_records = 1000 # Records per request (adjust as needed)
sleep_seconds = 1 # Pause between requests to avoid throttling

print("Starting harvest...")

while True:
    print(f"Fetching records starting at {start_position}...")
    try:
        csw.getrecords2(startposition=start_position, maxrecords=max_records, esn='full')
    except Exception as e:
        # A failed request ends the harvest; everything collected so far is
        # still saved by the code after the loop.
        print(f"Error fetching records: {e}")
        break
    
    fetched_count = len(csw.records)
    print(f"  Fetched {fetched_count} records.")
    
    # An empty page means there is nothing left to harvest
    if fetched_count == 0:
        break
    
    # Process records
    for rec_id, rec in csw.records.items():
        # Each record is handled in its own try block so that one bad record
        # is reported and skipped without aborting the page.
        try:
            # Determine Collection (e.g., by publisher)
            ctype = determine_type(rec)
            coll_id = f"collection-{ctype}"
            
            if coll_id not in collections:
                # Create Collection if not exists.
                # The global/open-ended extent is only a placeholder; it is
                # recomputed from the items once the harvest has finished.
                coll = Collection(
                    id=coll_id,
                    description=f"Collection for {ctype} datasets",
                    extent=Extent(
                        spatial=SpatialExtent([[-180, -90, 180, 90]]),
                        temporal=TemporalExtent([[None, None]])
                    )
                )
                collections[coll_id] = coll
                catalog.add_child(coll)
                
            # Create Item
            item = csw_record_to_stac_item(rec)
            collections[coll_id].add_item(item)
            
        except Exception as e:
            print(f"Failed to process record {rec_id}: {e}")

    # Check for next page
    # nextrecord is 0 if no more records.
    # The `<= start_position` check also stops the loop if the reported
    # position does not advance, which would otherwise repeat forever.
    next_record = csw.results.get('nextrecord', 0)
    if next_record == 0 or next_record <= start_position:
        break
        
    start_position = next_record
    time.sleep(sleep_seconds)

print(f"Harvest complete. Total collections: {len(collections)}")

# Update Collection Extents based on items
print("Updating collection spatial and temporal extents...")
for child in catalog.get_children():
    if isinstance(child, Collection):
        child.update_extent_from_items()

# Save
# Written as a self-contained catalog under STAC_DIR
print(f"Saving catalog to {STAC_DIR}...")
os.makedirs(STAC_DIR, exist_ok=True)
catalog.normalize_and_save(STAC_DIR, catalog_type=CatalogType.SELF_CONTAINED)
print(f"Saved STAC catalog to {os.path.abspath(STAC_DIR)}")

# List structure
print("\nCatalog Structure:")
for child in catalog.get_children():
    print(f" - Collection: {child.id} ({len(list(child.get_items()))} items)")

### Convert STAC to STAC-GeoParquet

In [None]:
# Convert STAC catalog to Parquet files for easier querying and analysis

import stac_geoparquet
import os

# Output directory, built with os.path.join so the path is valid on
# Windows, Linux and macOS alike (a literal '.\\output\\' is not a
# directory path on POSIX systems).
output_dir = os.path.join('.', 'output')

# Output file
parquet_file = os.path.join(output_dir, "geonorge_stac.parquet")

# Collect all items from every collection in the catalog
print("Collecting items for GeoParquet conversion...")

items = list(catalog.get_all_items())
print(f"Found {len(items)} items.")

if items:
    # Create the target directory first so the write does not fail when it
    # is missing; exist_ok makes this a no-op if it is already there.
    os.makedirs(output_dir, exist_ok=True)

    print(f"Writing to {parquet_file}...")
    # Convert the items to Arrow, materialise them as one table, and write
    # that table as a single GeoParquet file.
    record_batch_reader = stac_geoparquet.arrow.parse_stac_items_to_arrow(items)
    table = record_batch_reader.read_all()
    stac_geoparquet.arrow.to_parquet(table, parquet_file)
    print(f"Conversion complete: {os.path.abspath(parquet_file)}")
else:
    print("No items found in catalog.")

Collecting items for GeoParquet conversion...
Found 9884 items.
Writing to .\geonorge_stac.parquet...
Conversion complete: c:\dev\skygeo\src\iso2stac\geonorge_stac.parquet
Conversion complete: c:\dev\skygeo\src\iso2stac\geonorge_stac.parquet
