In [1]:
!uv pip install psycopg psycopg[binary] fsspec datetime pathlib
!uv pip install -e ../..
!uv pip install memory_profiler
%load_ext memory_profiler

[2mUsing Python 3.10.17 environment at: /home/bitner/data/stac-geoparquet/.venv[0m
[2mAudited [1m5 packages[0m [2min 3ms[0m[0m
[2mUsing Python 3.10.17 environment at: /home/bitner/data/stac-geoparquet/.venv[0m
[2K[2mResolved [1m19 packages[0m [2min 360ms[0m[0m                                        [0m
[2K[2mPrepared [1m1 package[0m [2min 381ms[0m[0m                                              
[2mUninstalled [1m1 package[0m [2min 0.67ms[0m[0m
[2K[2mInstalled [1m1 package[0m [2min 0.60ms[0m[0m.dev5+g889c95c.d20250626 (from [0m
 [33m~[39m [1mstac-geoparquet[0m[2m==0.7.1.dev5+g889c95c.d20250626 (from file:///home/bitner/data/stac-geoparquet)[0m
[2mUsing Python 3.10.17 environment at: /home/bitner/data/stac-geoparquet/.venv[0m
[2mAudited [1m1 package[0m [2min 2ms[0m[0m


In [2]:
import logging
from typing import Any

# pgstac_to_parquet and sync_pgstac_to_parquet are called by the %memit cells
# further down; without importing them here those cells raise NameError on a
# fresh kernel. They are only referenced from line magics, which linters may not
# count as usages, hence the noqa markers.
# NOTE(review): both names are assumed to be exported by
# stac_geoparquet.pgstac_reader like the other helpers -- confirm against the
# installed version.
from stac_geoparquet.pgstac_reader import (
    pgstac_dsn,
    pgstac_to_arrow,
    pgstac_to_iter,
    pgstac_to_parquet,  # noqa: F401
    sync_pgstac_to_parquet,  # noqa: F401
)

# Set the root logger to INFO so the library's progress messages are shown.
logger = logging.getLogger()
logger.setLevel(logging.INFO)


# pgstac test items derived from naip
# The URL below holds placeholder credentials for a local test database; the
# resulting DSN (echoed by the bare `db` expression) carries the
# statement_timeout as a connection option.
db = pgstac_dsn(
    "postgres://username:password@localhost:5439/postgis", statement_timeout=300000
)
db

"user=username password=password dbname=postgis host=localhost port=5439 options=' -c search_path=pgstac,public -c statement_timeout=300000'"

Create Functions to Modify Each Item

In [3]:
def inject_links(item: dict[str, Any]) -> dict[str, Any]:
    """Overwrite ``item["links"]`` with Planetary Computer links and return the item.

    The item is modified in place. Five links are written, in this order:
    ``collection``, ``parent``, ``root``, ``self`` and ``preview``. Their hrefs
    are built from ``item["collection"]`` and ``item["id"]``.

    Args:
        item: Item dictionary; must contain the keys ``"collection"`` and ``"id"``.

    Returns:
        The same ``item`` object, with its ``"links"`` entry replaced.

    Raises:
        KeyError: If ``"collection"`` or ``"id"`` is missing from ``item``.
    """
    stac_root = "https://planetarycomputer.microsoft.com/api/stac/v1/"
    collection_href = f"{stac_root}collections/{item['collection']}"
    self_href = f"{collection_href}/items/{item['id']}"
    map_href = (
        "https://planetarycomputer.microsoft.com/api/data/v1/item/map"
        f"?collection={item['collection']}&item={item['id']}"
    )
    json_type = "application/json"

    # The collection and parent links intentionally share the same href.
    item["links"] = [
        {"rel": "collection", "type": json_type, "href": collection_href},
        {"rel": "parent", "type": json_type, "href": collection_href},
        {"rel": "root", "type": json_type, "href": stac_root},
        {"rel": "self", "type": "application/geo+json", "href": self_href},
        {
            "rel": "preview",
            "href": map_href,
            "title": "Map of item",
            "type": "text/html",
        },
    ]
    return item


def inject_assets(item: dict[str, Any], render_config) -> dict[str, Any]:
    """Add ``tilejson`` and ``rendered_preview`` assets to the item and return it.

    The item is modified in place: both entries are written into the existing
    ``item["assets"]`` mapping, replacing any entries with the same keys.

    Args:
        item: Item dictionary; must contain ``"collection"``, ``"id"`` and
            ``"assets"``.
        render_config: Value appended verbatim as the last query-string
            component of both asset hrefs.

    Returns:
        The same ``item`` object with the two assets added.

    Raises:
        KeyError: If ``"collection"``, ``"id"`` or ``"assets"`` is missing.
    """
    endpoint_base = "https://planetarycomputer.microsoft.com/api/data/v1/item/"
    # Both hrefs share the same query string; build it once.
    query = f"collection={item['collection']}&item={item['id']}&{render_config}"

    assets = item["assets"]
    assets["tilejson"] = {
        "href": f"{endpoint_base}tilejson.json?{query}",
        "roles": ["tiles"],
        "title": "TileJSON with default rendering",
        "type": "application/json",
    }
    assets["rendered_preview"] = {
        "href": f"{endpoint_base}preview.png?{query}",
        "rel": "preview",
        "roles": ["overview"],
        "title": "Rendered preview",
        "type": "image/png",
    }
    return item


def naip_year_to_int(item: dict[str, Any]) -> dict[str, Any]:
    """Cast the ``naip:year`` property to ``int`` when it is stored as a string.

    The item is modified in place. Items without a ``naip:year`` property, or
    whose value is not a ``str``, are returned untouched.

    Raises:
        KeyError: If ``item`` has no ``"properties"`` entry.
        ValueError: If the string value cannot be parsed by ``int``.
    """
    properties = item["properties"]
    year = properties.get("naip:year")
    if isinstance(year, str):
        properties["naip:year"] = int(year)
    return item


# Query-string fragment that clean_item hands to inject_assets.
render_config = "render=myrenderconfig"


def clean_item(item: dict[str, Any]) -> dict[str, Any]:
    """Inject assets, then links, then make sure ``naip:year`` is an int.

    Reads the module-level ``render_config`` at call time and returns the
    item produced by the last step.
    """
    with_assets = inject_assets(item, render_config)
    with_links = inject_links(with_assets)
    return naip_year_to_int(with_links)

Demonstrate Injecting Additional Links and Assets.

In [4]:
# Build an item iterator over PgSTAC; clean_item is supplied as the per-row hook.
items = pgstac_to_iter(db, row_func=clean_item)

# Pull only the first item and show its id as a quick smoke test.
print(next(items)["id"])

INFO:stac_geoparquet.pgstac_reader:Fetching Data from PGStac Into an Iterator of Items
INFO:stac_geoparquet.pgstac_reader:With no filter, fetching all items
INFO:stac_geoparquet.pgstac_reader:Getting Base Item for pgstac-test-collection
pgstac-test-item-0089


In [None]:
# Recreate the item iterator. render_config is deliberately not reassigned here:
# it is already defined next to clean_item, which reads it at call time, and a
# second identical assignment only invites the two copies drifting apart.
items = pgstac_to_iter(db, row_func=clean_item)
# TODO: scratch experiment kept for reference. parse_stac_items_to_arrow is not
# imported in this notebook, so import it before re-enabling the lines below.
# batches = parse_stac_items_to_arrow(items=items, chunk_size=100000, schema="ChunksToDisk", tmpdir='/tmp/pqtest')
# batches

Inject additional links and assets and convert the items to a RecordBatchReader, which can then be dumped to Parquet. (The data has to be refetched because the iterator created above has already been consumed.)

In [6]:
# Fetch from PgSTAC again and hand the cleaned items to the Arrow conversion.
arrow = pgstac_to_arrow(db, row_func=clean_item)

INFO:stac_geoparquet.arrow._api:parse_stac_items_to_arrow start | CPU%: 0.0 | CPU_USER_TIME: 2.830 | RSS(MB):234.64 | USS(MB):149.66
INFO:stac_geoparquet.pgstac_reader:Fetching Data from PGStac Into an Iterator of Items
INFO:stac_geoparquet.pgstac_reader:With no filter, fetching all items
INFO:stac_geoparquet.pgstac_reader:Getting Base Item for pgstac-test-collection
INFO:stac_geoparquet.arrow._batch:Items Length: 65536


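Dump PgSTAC to Parquet. The `pgstac_to_parquet` cells below export all items to a single file and compare the memory use of the different `schema` options; the `sync_pgstac_to_parquet` cells further down dump any partition in PgSTAC that has been updated after a given time, so that only new records are incrementally written to Parquet.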

In [13]:
%memit pgstac_to_parquet(db,output_path="/tmp/pgstactoparquet4.parquet",row_func=clean_item,chunk_size=100000,schema="FullFile")

INFO:stac_geoparquet.arrow._api:Saving STAC Items to Parquet
INFO:stac_geoparquet.arrow._api:Exporting PgSTAC to <pyarrow._fs.LocalFileSystem object at 0x73d6584fb530> /tmp/pgstactoparquet4.parquet
INFO:stac_geoparquet.arrow._api:parse_stac_items_to_arrow start | CPU%: 77.4 | CPU_USER_TIME: 415.880 | RSS(MB):3739.94 | USS(MB):17.41
INFO:stac_geoparquet.pgstac_reader:Fetching Data from PGStac Into an Iterator of Items
INFO:stac_geoparquet.pgstac_reader:With no filter, fetching all items
INFO:stac_geoparquet.pgstac_reader:Getting Base Item for pgstac-test-collection
INFO:stac_geoparquet.arrow._batch:Items Length: 500101
INFO:stac_geoparquet.arrow._api:Parsed to arrow | CPU%: 94.3 | CPU_USER_TIME: 514.360 | RSS(MB):4861.70 | USS(MB):1911.30
INFO:stac_geoparquet.arrow._api:Written to parquet | CPU%: 99.4 | CPU_USER_TIME: 515.470 | RSS(MB):4177.33 | USS(MB):1227.85
peak memory: 9443.96 MiB, increment: 5703.12 MiB


In [14]:
%memit pgstac_to_parquet(db,output_path="/tmp/pgstactoparquet4.parquet",row_func=clean_item,chunk_size=100000,schema="FirstBatch")

INFO:stac_geoparquet.arrow._api:Saving STAC Items to Parquet
INFO:stac_geoparquet.arrow._api:Exporting PgSTAC to <pyarrow._fs.LocalFileSystem object at 0x73d683122df0> /tmp/pgstactoparquet4.parquet
INFO:stac_geoparquet.arrow._api:parse_stac_items_to_arrow start | CPU%: 75.5 | CPU_USER_TIME: 515.950 | RSS(MB):4160.68 | USS(MB):17.38
INFO:stac_geoparquet.pgstac_reader:Fetching Data from PGStac Into an Iterator of Items
INFO:stac_geoparquet.pgstac_reader:With no filter, fetching all items
INFO:stac_geoparquet.pgstac_reader:Getting Base Item for pgstac-test-collection
INFO:stac_geoparquet.arrow._batch:Items Length: 100000
INFO:stac_geoparquet.arrow._api:Parsed to arrow | CPU%: 97.8 | CPU_USER_TIME: 534.780 | RSS(MB):4893.75 | USS(MB):1538.61
INFO:stac_geoparquet.arrow._api:Batch 0 | CPU%: 101.0 | CPU_USER_TIME: 534.780 | RSS(MB):4893.75 | USS(MB):1538.72
INFO:stac_geoparquet.arrow._batch:Items Length: 100000
INFO:stac_geoparquet.arrow._api:Batch 1 | CPU%: 86.2 | CPU_USER_TIME: 554.770 | RS

In [15]:
%memit pgstac_to_parquet(db,output_path="/tmp/pgstactoparquet4.parquet",row_func=clean_item,chunk_size=100000,schema="ChunksToDisk")

INFO:stac_geoparquet.arrow._api:Saving STAC Items to Parquet
INFO:stac_geoparquet.arrow._api:Exporting PgSTAC to <pyarrow._fs.LocalFileSystem object at 0x73d65934f270> /tmp/pgstactoparquet4.parquet
INFO:stac_geoparquet.arrow._api:parse_stac_items_to_arrow start | CPU%: 71.6 | CPU_USER_TIME: 613.140 | RSS(MB):4517.27 | USS(MB):17.35
INFO:stac_geoparquet.pgstac_reader:Fetching Data from PGStac Into an Iterator of Items
INFO:stac_geoparquet.pgstac_reader:With no filter, fetching all items
INFO:stac_geoparquet.pgstac_reader:Getting Base Item for pgstac-test-collection
INFO:stac_geoparquet.arrow._batch:Items Length: 100000
INFO:stac_geoparquet.arrow._api:Batch 0 | CPU%: 97.7 | CPU_USER_TIME: 632.030 | RSS(MB):5198.04 | USS(MB):1543.83
INFO:stac_geoparquet.arrow._batch:Items Length: 100000
INFO:stac_geoparquet.arrow._api:Batch 1 | CPU%: 88.7 | CPU_USER_TIME: 652.160 | RSS(MB):5467.35 | USS(MB):2081.73
INFO:stac_geoparquet.arrow._batch:Items Length: 100000
INFO:stac_geoparquet.arrow._api:Batc

In [9]:
%memit sync_pgstac_to_parquet(db,output_path="/tmp/pgstactoparquet4",row_func=clean_item,schema="FirstBatch",chunk_size=100000,)

INFO:stac_geoparquet.pgstac_reader:Syncing PgSTAC partitions that have been updated since None to /tmp/pgstactoparquet4 on filesystem <pyarrow._fs.LocalFileSystem object at 0x73d683122af0>.
INFO:stac_geoparquet.arrow._api:Saving STAC Items to Parquet
INFO:stac_geoparquet.arrow._api:Exporting PgSTAC to <pyarrow._fs.LocalFileSystem object at 0x73d683122af0> /tmp/pgstactoparquet4/pgstac-test-collection/items_20110701_20110801.parquet
INFO:stac_geoparquet.arrow._api:parse_stac_items_to_arrow start | CPU%: 97.4 | CPU_USER_TIME: 15.270 | RSS(MB):1090.88 | USS(MB):14.47
INFO:stac_geoparquet.pgstac_reader:Fetching Data from PGStac Into an Iterator of Items
INFO:stac_geoparquet.pgstac_reader:Using Collection pgstac-test-collection, Start 2011-07-31 00:00:00+00:00, End 2011-07-31 00:00:00.000001+00:00
INFO:stac_geoparquet.pgstac_reader:Getting Base Item for pgstac-test-collection
INFO:stac_geoparquet.arrow._batch:Items Length: 12
INFO:stac_geoparquet.arrow._api:Parsed to arrow | CPU%: 50.8 | CPU

In [10]:
%memit sync_pgstac_to_parquet(db,output_path="/tmp/pgstactoparquet4",row_func=clean_item,schema="FirstBatch",chunk_size=10000,)

INFO:stac_geoparquet.pgstac_reader:Syncing PgSTAC partitions that have been updated since None to /tmp/pgstactoparquet4 on filesystem <pyarrow._fs.LocalFileSystem object at 0x73d6f0a2b970>.
INFO:stac_geoparquet.arrow._api:Saving STAC Items to Parquet
INFO:stac_geoparquet.arrow._api:Exporting PgSTAC to <pyarrow._fs.LocalFileSystem object at 0x73d6f0a2b970> /tmp/pgstactoparquet4/pgstac-test-collection/items_20110701_20110801.parquet
INFO:stac_geoparquet.arrow._api:parse_stac_items_to_arrow start | CPU%: 1.6 | CPU_USER_TIME: 111.130 | RSS(MB):2367.04 | USS(MB):17.21
INFO:stac_geoparquet.pgstac_reader:Fetching Data from PGStac Into an Iterator of Items
INFO:stac_geoparquet.pgstac_reader:Using Collection pgstac-test-collection, Start 2011-07-31 00:00:00+00:00, End 2011-07-31 00:00:00.000001+00:00
INFO:stac_geoparquet.pgstac_reader:Getting Base Item for pgstac-test-collection
INFO:stac_geoparquet.arrow._batch:Items Length: 12
INFO:stac_geoparquet.arrow._api:Parsed to arrow | CPU%: 70.2 | CPU

In [12]:
%memit sync_pgstac_to_parquet(db,output_path="/tmp/pgstactoparquet4",row_func=clean_item,schema="ChunksToDisk",chunk_size=100000,)

INFO:stac_geoparquet.pgstac_reader:Syncing PgSTAC partitions that have been updated since None to /tmp/pgstactoparquet4 on filesystem <pyarrow._fs.LocalFileSystem object at 0x73d6f0a2b0b0>.
INFO:stac_geoparquet.arrow._api:Saving STAC Items to Parquet
INFO:stac_geoparquet.arrow._api:Exporting PgSTAC to <pyarrow._fs.LocalFileSystem object at 0x73d6f0a2b0b0> /tmp/pgstactoparquet4/pgstac-test-collection/items_20110701_20110801.parquet
INFO:stac_geoparquet.arrow._api:parse_stac_items_to_arrow start | CPU%: 81.7 | CPU_USER_TIME: 315.610 | RSS(MB):2365.64 | USS(MB):17.47
INFO:stac_geoparquet.pgstac_reader:Fetching Data from PGStac Into an Iterator of Items
INFO:stac_geoparquet.pgstac_reader:Using Collection pgstac-test-collection, Start 2011-07-31 00:00:00+00:00, End 2011-07-31 00:00:00.000001+00:00
INFO:stac_geoparquet.pgstac_reader:Getting Base Item for pgstac-test-collection
INFO:stac_geoparquet.arrow._batch:Items Length: 12
INFO:stac_geoparquet.arrow._api:Batch 0 | CPU%: 70.2 | CPU_USER_T