In [61]:
import pystac
import geopandas as gpd
from shapely.geometry import mapping
from datetime import datetime, timezone
from google.cloud import storage
import os
import shlex
import subprocess
import shutil
from pathlib import Path

In [45]:

#  Google Cloud Storage bucket and prefix (folder) where COGs are located.
GCS_BUCKET = "swhm_data"  # e.g., "my-imagery-bucket"
GCS_PREFIX = "public/layers"   # e.g., "sentinel-2/l2a/" or leave empty for root

# The public-facing URL for your GCS bucket.
# This is used to create accessible links in the STAC catalog.
# For GCS, it's typically "https://storage.googleapis.com/{BUCKET_NAME}/{FILE_PATH}"
# You could also use a custom domain.
ROOT_CATALOG_URL = f"https://storage.googleapis.com/{GCS_BUCKET}"
CATALOG_JSON_DEST = f"{ROOT_CATALOG_URL}/{GCS_PREFIX}"
# Where the script will save the generated STAC catalog on your local machine.
OUTPUT_DIR = "../../stac_catalog"

# Details for your STAC Catalog.
CATALOG_ID = "swhm-catalog"
CATALOG_TITLE = "Stormwater Heatmap Catalog"
CATALOG_DESCRIPTION = "A STAC catalog for COG imagery stored in GCS, created with rio-stac."

client = storage.Client(project="swhm-prod")
bucket = client.bucket(GCS_BUCKET)
# --- End of Configuration ---


In [46]:
def list_blobs_with_prefix(
    bucket_name: str,
    prefix: str,
    file_extension: str = '.geojson',
    delimiter: str = None
) -> list[storage.blob.Blob]:
    """
    Lists all the blobs in a GCS bucket with a given prefix and file extension,
    and returns the Blob objects.
    """
    
    
    blobs = client.list_blobs(
        bucket_name,
        prefix=prefix,
        delimiter=delimiter
    )

    print(f"Fetching blobs from bucket '{bucket_name}' with prefix '{prefix}'...")

    matching_blobs = []
    for blob in blobs:
        name_lower = blob.name.lower()
        if name_lower.endswith(file_extension) or name_lower.endswith('.geojson'):
            
            matching_blobs.append(blob)

    if delimiter:
        prefixes = getattr(blobs, 'prefixes', None)
        if prefixes:
            print("Sub-prefixes found:")
            for p in prefixes:
                print(f"  {p}")

    return matching_blobs

In [47]:
blobs = list_blobs_with_prefix(GCS_BUCKET, GCS_PREFIX)

Fetching blobs from bucket 'swhm_data' with prefix 'public/layers'...


## Collection

In [48]:
#base geom on the first blob 
first_blob = blobs[1]
first_blob


<Blob: swhm_data, public/layers/vector/cig_grid_wgs/cig_grid_wgs.geojson, 1751488021970745>

In [49]:
fc_url = first_blob.public_url
item_id = os.path.splitext(os.path.basename(fc_url))[0]
item_id

fc_url = first_blob.public_url
print(f"Reading data from {fc_url}...")
gdf = gpd.read_file(fc_url)
footprint_geom = mapping(gdf.unary_union.convex_hull)
bounds = gdf.total_bounds
bbox = list(bounds)
bbox

Reading data from https://storage.googleapis.com/swhm_data/public/layers/vector/cig_grid_wgs/cig_grid_wgs.geojson...


  footprint_geom = mapping(gdf.unary_union.convex_hull)


[np.float64(-124.75864669837131),
 np.float64(46.604450743574134),
 np.float64(-121.60590179263949),
 np.float64(49.05931514730483)]

In [50]:
#collection 

item_datetime = datetime.now(timezone.utc)
collection_id = "vector"
collection_description = "Vectors and things."
collection_license = "PDDL-1.0"  # Public Domain Dedication and License

spatial_extent = pystac.SpatialExtent(bboxes=[bbox])
temporal_extent = pystac.TemporalExtent(intervals=[[item_datetime, None]])
collection_extent = pystac.Extent(spatial=spatial_extent, temporal=temporal_extent)

collection = pystac.Collection(
    id=collection_id,
    description=collection_description,
    extent=collection_extent,
    license=collection_license,
    title="vectors"
)

In [62]:
# List to collect all items
stac_items = []
# Loop over all blobs (skip non-GeoJSON files)
for blob in blobs:
    fc_url = blob.public_url

    # Skip non-GeoJSON files
    if not fc_url.endswith(".geojson"):
        continue

    try:
        print(f"Reading data from {fc_url}...")
        gdf = gpd.read_file(fc_url)

        if gdf.empty:
            print(f"Warning: Skipping empty GeoDataFrame from {fc_url}")
            continue

        item_id = os.path.splitext(os.path.basename(fc_url))[0]
        footprint_geom = mapping(gdf.unary_union.convex_hull)
        bbox = list(gdf.total_bounds)

        item_datetime = datetime.utcnow()

        item = pystac.Item(
            id=item_id,
            geometry=footprint_geom,
            bbox=bbox,
            datetime=item_datetime,
            properties={
                "source": "GCS blob",
                "feature_count": len(gdf)
            }
        )

        # Optionally set custom metadata on the blob
        blob.metadata = {'processed_by': 'STAC batch script'}

        asset = pystac.Asset(
            href=fc_url,
            media_type=blob.content_type,
            title=item_id,
            roles=["data"]
        )

        item.add_asset("GeoJSON_data", asset)

        stac_items.append(item)

    except Exception as e:
        print(f"Error processing {fc_url}: {e}")

Reading data from https://storage.googleapis.com/swhm_data/public/layers/vector/PugetSoundWA/PugetSoundWA.geojson...
Reading data from https://storage.googleapis.com/swhm_data/public/layers/vector/cig_grid_wgs/cig_grid_wgs.geojson...


  footprint_geom = mapping(gdf.unary_union.convex_hull)
  item_datetime = datetime.utcnow()
  footprint_geom = mapping(gdf.unary_union.convex_hull)
  item_datetime = datetime.utcnow()


In [52]:
# #
# #make item from blob

# first_blob = blobs[1]
# first_blob
# #<Blob: swhm_data, public/layers/vector/cig_grid_wgs/cig_grid_wgs.geojson, 1751488021970745>
# fc_url = first_blob.public_url
# item_id = os.path.splitext(os.path.basename(fc_url))[0]
# item_id

# fc_url = first_blob.public_url
# print(f"Reading data from {fc_url}...")
# gdf = gpd.read_file(fc_url)
# footprint_geom = mapping(gdf.unary_union.convex_hull)
# bounds = gdf.total_bounds
# bbox = list(bounds)
# bbox

# ## Items 

# item_id = os.path.splitext(os.path.basename(fc_url))[0]
# item = pystac.Item(
#     id=item_id,
#     geometry=footprint_geom,
#     bbox=bbox,
#     datetime=item_datetime,
#     properties={}
# )
# ## Asset 
# metadata = {'color': 'Red', 'name': 'Test'}
# first_blob.metadata = metadata

# asset_href = first_blob.public_url
# asset_media_type = first_blob.content_type
# first_blob.metadata

# asset_title = "CIG GRID"
# asset = pystac.Asset(
#     href=asset_href,
#     media_type=asset_media_type,
#     title=asset_title,
#     roles=["data"]
# )

# item.add_asset("GeoJSON_data", asset)

In [53]:
## add item to collection 
for item in stac_items:
    collection.add_item(item)

collection.set_self_href("collection.json")

In [54]:
collection.normalize_hrefs(
    root_href="https://storage.googleapis.com/swhm_data/public/layers/vector"
)

## Upload

In [55]:
def upload_stac_assets(root_dir, bucket, prefix):
    """
    Finds and uploads STAC asset files to Google Cloud Storage.

    This function walks through the specified root directory and looks for
    JSON files that have the same name as their parent directory
    (e.g., 'asset_a/asset_a.json'). It then uploads them to a
    specified GCS bucket, maintaining the relative directory structure.

    Args:
        root_dir (str): The absolute path to the directory to search in.
        bucket (str): The name of the GCS bucket to upload to.
        prefix (str): The prefix (sub-folder) within the GCS bucket.
    """
    print(f"Starting scan in: {root_dir}")
    print(f"Uploading to: gs://{bucket}/{prefix}")
    print("-" * 30)

    # os.walk is perfect for recursively scanning a directory tree.
    # It yields the current directory path, a list of subdirectories, and a list of files.
    # We use '_' for dirnames as it's not used in this loop.
    for dirpath, _, filenames in os.walk(root_dir):
        # Get the name of the current directory being processed
        current_dir_name = os.path.basename(dirpath)
        # Construct the expected filename (e.g., directory 'asset_a' -> file 'asset_a.json')
        expected_filename = f"{current_dir_name}.json"

        # Check if a file with the expected name exists in the current directory
        if expected_filename in filenames:
            # Construct the full local path to the source file
            local_file_path = os.path.join(dirpath, expected_filename)

            # Determine the relative path from the root_dir.
            # This is used to replicate the directory structure in GCS.
            relative_path = os.path.relpath(local_file_path, root_dir)

            # Construct the destination path in GCS
            gcs_destination = f"gs://{bucket}/{prefix}{relative_path}"

            print(f"Found matching asset: {local_file_path}")
            print(f"  -> Uploading to: {gcs_destination}")

            try:
                # Build the command as a list for security and reliability
                gc_cmd = [
                    "gsutil",
                    "cp",
                    local_file_path,
                    gcs_destination
                ]

                # For logging, create a shell-safe string representation
                bash_command = ' '.join(shlex.quote(arg) for arg in gc_cmd)
                print(f"  -> Executing: {bash_command}")

                # Execute the command
                # check=True will raise an exception if gsutil returns an error
                subprocess.run(gc_cmd, capture_output=True, text=True, check=True)

                print("  -> Upload successful!")

            except FileNotFoundError:
                print("  -> ERROR: 'gsutil' command not found.")
                print("     Please ensure the Google Cloud SDK is installed and in your PATH.")
                # Stop the script if gsutil isn't available
                return
            except subprocess.CalledProcessError as e:
                # This block runs if gsutil returns a non-zero exit code (an error)
                print(f"  -> ERROR: Upload failed for {local_file_path}")
                print(f"  -> gsutil stderr: {e.stderr}")

            print("-" * 30)





In [65]:
def upload_stac_assets(root_dir, bucket, prefix, dry_run=False, return_summary=False):
    """
    Finds and uploads STAC asset JSON files to Google Cloud Storage.

    - Uploads all .json files in the root directory (regardless of name).
    - Uploads .json files in subdirectories that match the directory name (e.g. 'foo/foo.json').

    Args:
        root_dir (str): Absolute or relative path to the root directory.
        bucket (str): GCS bucket name.
        prefix (str): Path prefix within the GCS bucket.
        dry_run (bool): If True, simulate uploads without executing them.
        return_summary (bool): If True, return a summary dictionary of results.

    Returns:
        dict (optional): Summary of uploaded, skipped, and failed files.
    """
    root_path = Path(root_dir).resolve()

    if not root_path.is_dir():
        print(f"ERROR: {root_path} is not a valid directory.")
        return

    if not shutil.which("gsutil"):
        print("ERROR: 'gsutil' command not found in PATH.")
        return

    if not prefix.endswith("/"):
        prefix += "/"

    print(f"Scanning: {root_path}")
    print(f"Uploading to: gs://{bucket}/{prefix}")
    print("-" * 40)

    uploaded = []
    skipped = []
    failed = []

    # Helper: Upload a single file
    def upload_file(file_path):
        relative_path = file_path.relative_to(root_path)
        gcs_path = f"gs://{bucket}/{prefix}{relative_path.as_posix()}"

        print(f"Found: {file_path}")
        print(f"  -> GCS Path: {gcs_path}")

        if dry_run:
            print(f"  -> DRY RUN: Skipping actual upload.")
            skipped.append(str(file_path))
            return

        try:
            gc_cmd = ["gsutil", "cp", str(file_path), gcs_path]
            print(f"  -> Executing: {' '.join(shlex.quote(arg) for arg in gc_cmd)}")
            subprocess.run(gc_cmd, capture_output=True, text=True, check=True)
            print("  -> Upload successful!")
            uploaded.append(str(file_path))
        except subprocess.CalledProcessError as e:
            print(f"  -> ERROR: Upload failed for {file_path}")
            print(f"     stderr: {e.stderr}")
            failed.append((str(file_path), e.stderr))

        print("-" * 40)

    # Upload all .json files in root directory
    for json_file in root_path.glob("*.json"):
        upload_file(json_file)

    # Upload matching <dir>/<dir>.json files in subdirectories
    for dir_path in root_path.rglob("*"):
        if dir_path.is_dir():
            expected_json = dir_path / f"{dir_path.name}.json"
            if expected_json.exists():
                upload_file(expected_json)

    if return_summary:
        return {
            "uploaded": uploaded,
            "skipped": skipped,
            "failed": failed,
        }

In [66]:
upload_stac_assets(root_dir='../../stac_catalog/vector', bucket=GCS_BUCKET,prefix=f"{GCS_PREFIX}/vector/")


Scanning: /Users/christiannilsen/Documents/repos/swmh-stac-catalog/catalog/stac_catalog/vector
Uploading to: gs://swhm_data/public/layers/vector/
----------------------------------------
Found: /Users/christiannilsen/Documents/repos/swmh-stac-catalog/catalog/stac_catalog/vector/collection.json
  -> GCS Path: gs://swhm_data/public/layers/vector/collection.json
  -> Executing: gsutil cp /Users/christiannilsen/Documents/repos/swmh-stac-catalog/catalog/stac_catalog/vector/collection.json gs://swhm_data/public/layers/vector/collection.json
  -> Upload successful!
----------------------------------------
Found: /Users/christiannilsen/Documents/repos/swmh-stac-catalog/catalog/stac_catalog/vector/cig_grid_wgs/cig_grid_wgs.json
  -> GCS Path: gs://swhm_data/public/layers/vector/cig_grid_wgs/cig_grid_wgs.json
  -> Executing: gsutil cp /Users/christiannilsen/Documents/repos/swmh-stac-catalog/catalog/stac_catalog/vector/cig_grid_wgs/cig_grid_wgs.json gs://swhm_data/public/layers/vector/cig_grid_wg

In [67]:
#save collection 
output_path = f"{OUTPUT_DIR}/{collection_id}"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
#collection.normalize_hrefs(os.path.dirname(output_path))
collection.save(dest_href=output_path, catalog_type="ABSOLUTE PUBLISHED")