In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Incrementally build a centroid table.

For every source file named  grid_<zone>.parquet  in <input_dir>
  • compute WGS-84 centroids,
  • append them as ONE row-group (one part file) to <output_dataset>,
  • remember progress in <output_dataset>.done  so the job can restart.

Output:
  <output_dataset>/part.<n>.parquet   ← one per processed zone
  <output_dataset>.done               ← JSON list of processed filenames
Readable by: fastparquet, pyarrow, pandas, dask.
Requires: fastparquet ≥ 0.7, geopandas, pyarrow.
"""

import json, os, re, gc, sys
from pathlib import Path

import geopandas as gpd
import pandas as pd
from fastparquet import write
from tqdm import tqdm


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def extract_zone_order(tile_id: pd.Series) -> pd.Series:
    """
    Produces a sortable key so final output is NP → UTM(1N…60S) → SP.
    """
    def _key(tid: str) -> int:
        m = re.search(r"G\d+m_(\w+)_", tid)
        if not m:
            return 9999
        zone = m.group(1)
        if zone == "NP":
            return -1
        if zone == "SP":
            return 9998
        m2 = re.match(r"(\d+)([NS])", zone)
        if not m2:
            return 9999
        znum, hemi = int(m2[1]), m2[2]
        return znum * 2 + (0 if hemi == "N" else 1)

    return tile_id.apply(_key)


def append_df(df: pd.DataFrame, dataset_path: Path) -> None:
    """
    Append `df` as ONE new row-group (one new part file) to <dataset_path>.
    """
    write(
        dataset_path,               # a directory will be created on 1st call
        df,
        compression="ZSTD",
        append=dataset_path.exists(),   # True after first zone
        file_scheme="simple",          # 1 part file per append, restart-safe
        object_encoding="utf8",        # geometry stored as WKT
    )


# ---------------------------------------------------------------------------
# main driver
# ---------------------------------------------------------------------------
def combine_grid_parquets_centroid_incremental(
    input_dir: str | Path,
    output_dataset: str | Path,
    checkpoint_file: str | Path | None = None,
) -> None:
    input_dir      = Path(input_dir)
    output_dataset = Path(output_dataset)
    checkpoint     = Path(checkpoint_file) if checkpoint_file else output_dataset.with_suffix(".done")

    # ----------------------------------------------------------------------
    # 1. Already processed zones
    done: set[str] = set()
    if checkpoint.exists():
        done.update(json.loads(checkpoint.read_text()))

    # ----------------------------------------------------------------------
    # 2. Iterate over sources
    sources = sorted(
        p for p in input_dir.iterdir()
        if p.name.startswith("grid_") and p.suffix == ".parquet"
    )

    for src in tqdm(sources, desc="UTM zones"):
        if src.name in done:
            continue          # skip if already finished earlier

        try:
            # -- read minimal columns
            gdf = gpd.read_parquet(src, columns=["tile_id", "geometry"])
            if gdf.crs is None:
                raise ValueError("missing CRS")

            # -- accurate centroid in equal-area, then back to WGS-84
            gdf_proj  = gdf.to_crs("EPSG:6933")
            centroids = (
                gdf_proj.geometry.centroid
                .to_crs("EPSG:4326")
                .apply(lambda geom: geom.wkt)
            )

            chunk = pd.DataFrame(
                {
                    "tile_id": gdf["tile_id"].values,
                    "geometry": centroids.values,
                    "zone_order": extract_zone_order(gdf["tile_id"]),
                }
            ).sort_values("zone_order").drop(columns="zone_order")

            append_df(chunk, output_dataset)

            # -- mark success
            done.add(src.name)
            checkpoint.write_text(json.dumps(sorted(done)))

            # -- memory hygiene
            del gdf, gdf_proj, centroids, chunk
            gc.collect()

        except Exception as exc:
            print(f"[WARN] {src.name} skipped: {exc}", file=sys.stderr)

    print(f"✅ Finished. Dataset at {output_dataset} now contains {len(done)} zones.")


# ---------------------------------------------------------------------------
# CLI wrapper (edit the three paths then run)


In [None]:
# Apply to current sandbox directory
input_directory = "D:/nesteo_hf/grids/grid_600m"
output_file = "D:/nesteo_hf/grids/grids_centers/grid_600m.parquet"
combine_grid_parquets_centroid_incremental(input_directory, output_file)

In [None]:
# ── prerequisites ─────────────────────────────────────────────
# pip install pyarrow fastparquet  (if not already in your env)

from glob import glob
import pyarrow as pa
import pyarrow.parquet as pq

def build_grid_index(grid_root: str, out_path: str = "grid_index.parquet"):
    """
    Scan every *.parquet produced by NestEOGrid, pull only the two
    columns we need, and write a single compact index.

    Parameters
    ----------
    grid_root : str
        Folder where your per-zone / per-level grid Parquet files live
        (or the folder you passed as `output_dir`).
    out_path : str
        Destination file for the two-column index.
    """
    parquet_files = glob(f"{grid_root}/**/*.parquet", recursive=True)
    if not parquet_files:
        raise FileNotFoundError(f"No Parquet grids under {grid_root}")

    tables = []
    for p in parquet_files:
        print(p)
        try:
            # read *only* the needed columns – Arrow streams them, so
            # very little RAM is used even for dozens of millions rows
            tables.append(pq.read_table(p, columns=["tile_id", "super_id"],
                                        memory_map=True))
        except Exception as e:
            print(f"[skip] {p}: {e}")

    pq.write_table(pa.concat_tables(tables, promote=True),
                   out_path,
                   compression="snappy")   # ~1 GB → ~200 MB on disk
    print(f"[OK] wrote {out_path} with {sum(t.num_rows for t in tables):,} rows.")

# ── usage example ─────────────────────────────────────────────
# 1. Generate or collect grid files first (one-off)
#    grid = NestEOGrid(...)                 # generate=False if already done
#    grid.collect_existing_tile_files()     # optional helper

# 2. Build the lightweight index
build_grid_index("D:/nesteo_hf/grids/grids_full")   # adjust to your output_dir


D:/nesteo_hf/grids/grids_full\grid_120000m\grid_10N_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_10S_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_11N_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_11S_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_12N_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_12S_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_13N_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_13S_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_14N_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_14S_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_15N_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_15S_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_16N_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_16S_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_120000m\grid_17N_120000.par

In [None]:
from glob import glob
import pyarrow as pa
import pyarrow.parquet as pq

# sorting by 1N → 2N → … → 60N → 1S → 2S → … → 60S → NP → SP
from glob import glob
import os, re
import pyarrow as pa
import pyarrow.parquet as pq


# ---------- helper: natural zone ordering ---------------------------------
_zone_re = re.compile(r"^(?P<num>\d{1,2})(?P<hem>[NS])$")   # 1–60 + N/S


def _zone_sort_key(parquet_path: str):
    """
    Return a tuple that sorts like:
    0  (N hemisphere)  : 1N 2N … 60N
    1  (S hemisphere)  : 1S 2S … 60S
    2  (polar)         : NP, SP
    Anything unrecognised comes last (3, …).
    """
    zone_token = os.path.basename(parquet_path).split("_")[1]  # e.g. "11N", "NP"
    if zone_token in ("NP", "SP"):          # force polar last
        return (2, 0 if zone_token == "NP" else 1, 0)

    m = _zone_re.match(zone_token)
    if m:
        num = int(m["num"])
        group = 0 if m["hem"] == "N" else 1
        return (group, num, 0)

    # fallback – unknown pattern
    return (3, 0, 0)

# ---------- helper: 3-way ordering  (zone ➊, sub-zone ➋, level ➌) ----
import os, re

#  grid_1N_1200.parquet  → zone=1N , level=1200
#  grid_SP_120000.parquet → zone=SP, level=120000
_name_re = re.compile(r"grid_(?P<zone>(\d{1,2}[NS])|NP|SP)_(?P<lvl>\d+)\.parquet$")


def _zone_level_sort_key(path: str):
    """
    Sort order:
      ➊ UTM-N   1N … 60N     (ascending numeric)
      ➋ UTM-S   1S … 60S
      ➌ NP, SP                  (NP first)
    Within each zone: *larger* grid levels first (120 000 → 150 m).
    Unknown / malformed names go to the end.
    """
    m = _name_re.match(os.path.basename(path))
    if not m:
        return (4, 0, 0, 0)          # push unknown patterns last

    zone = m["zone"]
    level = int(m["lvl"])

    # ---- primary zone group & sub-ordering ---------------------------
    if zone in ("NP", "SP"):         # polar zones
        g1 = 2                       # after all UTM zones
        g2 = 0 if zone == "NP" else 1
        g3 = 0                       # no numeric zone
    else:                            # UTM 1–60 + N/S
        num, hem = int(zone[:-1]), zone[-1]
        g1 = 0 if hem == "N" else 1  # north group first
        g2 = num                     # 1 … 60
        g3 = 0

    # ---- level: larger first (negate so 120 000 < 300 in tuple sort) --
    return (g1, g2, g3, -level)

# ---------- main function ---------------------------------------------------
def build_grid_index(grid_root: str,
                     out_path: str = "grid_index.parquet",
                     rows_per_group: int = 100_000,
                     compression: str = "zstd"):

    wanted_cols = ["tile_id", "super_id"]
    sources = sorted(
        glob(f"{grid_root}/**/*.parquet", recursive=True),
        key=_zone_level_sort_key
    )
    if not sources:
        raise FileNotFoundError(f"No Parquet grids under {grid_root}")

    fixed_schema = pa.schema([(c, pa.string()) for c in wanted_cols])

    writer, total = None, 0

    for src in sources:
        print(src)
        pf = pq.ParquetFile(src, memory_map=True)

        for batch in pf.iter_batches(columns=wanted_cols,
                                     batch_size=rows_per_group):

            arrays = []
            for col in wanted_cols:
                if col in batch.schema.names:
                    arr = batch.column(batch.schema.get_field_index(col))
                    if not pa.types.is_string(arr.type):
                        arr = arr.cast(pa.string())  # promotes Null/Binary
                else:                               # missing column
                    arr = pa.nulls(batch.num_rows, pa.string())
                arrays.append(arr)

            table = pa.Table.from_arrays(arrays, schema=fixed_schema)

            if writer is None:                      # first chunk
                writer = pq.ParquetWriter(out_path,
                                          fixed_schema,
                                          compression=compression)

            writer.write_table(table)               # one row-group
            total += table.num_rows

    if writer:
        writer.close()

    print(f"[OK] {out_path} written with {total:,} rows "
          f"({(total + rows_per_group - 1)//rows_per_group:,} "
          f"row-groups ≈ {rows_per_group} rows each)")

build_grid_index("D:/nesteo_hf/grids/grids_full", out_path="D:/nesteo_hf/grids/grids_full/grid_index.parquet")   # or your output_dir

D:/nesteo_hf/grids/grids_full\grid_120000m\grid_1N_120000.parquet
D:/nesteo_hf/grids/grids_full\grid_12000m\grid_1N_12000.parquet
D:/nesteo_hf/grids/grids_full\grid_2400m\grid_1N_2400.parquet


KeyboardInterrupt: 