# DuckDB + Parquet Data Exploration Template

In [1]:
import logging
import warnings
from pathlib import Path

import duckdb
import geopandas as gpd

from src.plotting.util import load_grids, plot_gdf_column

warnings.filterwarnings("ignore")  # hide every warning

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)


BASE = Path("/Users/kyledorman/data/planet_coverage/points_30km/")  # <-- update this
FIG_DIR = BASE.parent / "figs" / BASE.name / "skysat_dove"
FIG_DIR.mkdir(exist_ok=True, parents=True)

# Example path patterns
f_pattern = "skysat_dove/*/*/*/*/data.parquet"
all_files_pattern = str(BASE / f_pattern)

# Combined list used later when we search individual files
all_parquets = list(BASE.glob(f_pattern))

if not all_parquets:
    logger.error("No parquet files found matching pattern %s", all_files_pattern)
    raise FileNotFoundError("No parquet files found")
logger.info("Found %d parquet files", len(all_parquets))


query_df, grids_df, hex_grid = load_grids(BASE)
MIN_DIST = 5.0
valid = ~grids_df.is_land & ~grids_df.dist_km.isna() & (grids_df.dist_km < MIN_DIST)
grids_df = grids_df[valid].copy()

# --- Connect to DuckDB ---
con = duckdb.connect()

# Register a view for all files
con.execute(
    f"""
    CREATE OR REPLACE VIEW samples_all AS
    SELECT * FROM read_parquet('{all_files_pattern}');
"""
)
logger.info("Registered DuckDB view 'samples_all'")


query = """
SELECT
    grid_id,
    COUNT(DISTINCT (dove_id, skysat_id)) AS sample_count
FROM
    samples_all
/* optional row-level filters go here, e.g.:
   WHERE item_type = 'PSScene'
     AND coverage_pct > 0.5
     AND publishing_stage = 'finalized'
     AND quality_category = 'standard'
     AND has_sr_asset
     AND ground_control
*/
GROUP BY
    grid_id
"""

df = con.execute(query).fetchdf()
df.grid_id = df.grid_id.map(int)
df = df.set_index("grid_id")
joined_grids_df = grids_df[["hex_id", "geometry"]].join(df, how="left").fillna({"sample_count": 0})

gdf = gpd.GeoDataFrame(joined_grids_df, geometry="geometry", crs=grids_df.crs)


# hex_df["sample_count"] += 1

# logger.info("Query finished")

# logger.info("Plotting Counts")
# agg = (
#     hex_df.dropna(subset=["sample_count"])
#     .groupby("hex_id")
#     .agg(
#         median_sample_count=("sample_count", "median"),
#         max_sample_count=("sample_count", "max"),
#     )
# )
# agg = agg[agg.index >= 0].join(hex_grid[["geometry"]])
# gdf = gpd.GeoDataFrame(agg, geometry="geometry", crs=hex_grid.crs)

# plot_gdf_column(
#     gdf,
#     "median_sample_count",
#     title="SkySat/Dove Intersection Counts (Agg: Median)",
#     show_land_ocean=True,
#     save_path=FIG_DIR / "median_sample_count.png",
#     scale="log",
# )

# plot_gdf_column(
#     gdf,
#     "max_sample_count",
#     title="SkySat/Dove Intersection Counts (Agg: Max)",
#     show_land_ocean=True,
#     save_path=FIG_DIR / "max_sample_count.png",
#     scale="log",
# )

# # ------------------ Filtered -----------------------

# query = """
# SELECT
#     grid_id,
#     COUNT(DISTINCT (dove_id, skysat_id)) AS sample_count
# FROM
#     samples_all
# WHERE
#     tide_height_delta < 0.5
# GROUP BY
#     grid_id
# """

# df = con.execute(query).fetchdf()
# df.grid_id = df.grid_id.map(int)
# df = df.set_index("grid_id")
# hex_df = grids_df[["hex_id"]].join(df, how="left").fillna({"sample_count": 0})

# hex_df["sample_count"] += 1

# logger.info("Query finished")

# logger.info("Plotting Counts")
# agg = (
#     hex_df.dropna(subset=["sample_count"])
#     .groupby("hex_id")
#     .agg(
#         median_sample_count=("sample_count", "median"),
#         max_sample_count=("sample_count", "max"),
#     )
# )
# agg = agg[agg.index >= 0].join(hex_grid[["geometry"]])
# gdf = gpd.GeoDataFrame(agg, geometry="geometry", crs=hex_grid.crs)

# plot_gdf_column(
#     gdf,
#     "median_sample_count",
#     title="SkySat/Dove Intersection Counts (Agg: Median, Filtered)",
#     show_land_ocean=True,
#     save_path=FIG_DIR / "median_sample_count_filtered.png",
#     scale="log",
# )

# plot_gdf_column(
#     gdf,
#     "max_sample_count",
#     title="SkySat/Dove Intersection Counts (Agg: Max, Filtered)",
#     show_land_ocean=True,
#     save_path=FIG_DIR / "max_sample_count_filtered.png",
#     scale="log",
# )

10:19:54 - INFO - Found 16578 parquet files
10:19:56 - INFO - Loaded 31823 open‑ocean cells, 1495623 coastal grid cells, and heuristic table with 7566 rows
10:19:56 - INFO - Generating global hex grid: 20790 hexagon centres
10:19:57 - INFO - Generated 18085 equal‑area hexagons
10:20:28 - INFO - Finished spatial ID assignments (hex_id ↔ grid_id ↔ cell_id)
10:20:28 - INFO - Merged tidal heuristics into grids and query dataframes
10:20:30 - INFO - Registered DuckDB view 'samples_all'


In [14]:
aa.head(5)

Unnamed: 0_level_0,hex_id,geometry,sample_count
grid_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
10622174,11200,"POLYGON ((54.0352 16.95949, 54.0496 17.00978, ...",8250.0
11846093,12489,"POLYGON ((50.71467 26.15852, 50.73642 26.20876...",7353.0
13508506,14132,"POLYGON ((128.16209 40.0097, 128.25595 40.0598...",7092.0
12495729,13201,"POLYGON ((27.25305 31.33118, 27.26753 31.38138...",6697.0
11846092,12489,"POLYGON ((50.659 26.15852, 50.68073 26.20876, ...",6337.0


In [16]:
aa = gdf[gdf.sample_count > 1000].copy().to_crs("EPSG:4326")

aa = aa.sort_values(by="sample_count", ascending=False)
cent = aa.centroid.iloc[0]
m = folium.Map(
    location=[cent.y, cent.x],
    zoom_start=4, 
    tiles="CartoDB positron",
    width=1000,
    height=800
)

color_scale = linear.viridis.scale(0.0, aa.sample_count.max())  # type: ignore

for grid_id, row in aa.iterrows():
    geom = row.geometry
    value = row.sample_count
    folium.GeoJson(
        data=geom,
        style_function=lambda f, col=color_scale(value): {
            "fillColor": col,
            "color":     col,      # outline same as fill
            "weight":    1,
            "fillOpacity": 0.7,
        },
        tooltip=f"{grid_id}: {value:0f}",
    ).add_to(m)

m

In [2]:
import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
import folium
from pathlib import Path
from tqdm import tqdm
from branca.colormap import linear

# --- Configuration ---

In [3]:
pths = list(Path("/Users/kyledorman/data/planet_coverage/points_30km/skysat/results").glob("*/*/*/*/data.parquet"))
len(pths)

32638

In [None]:
from shapely import wkb
import tqdm

for pth in tqdm.tqdm(pths):
    # load the parquet into pandas to rebuild geometries
    df_pd: pd.DataFrame = pd.read_parquet(pth)
    df_pd["geometry"] = df_pd["geometry_wkb"].apply(wkb.loads)  # type: ignore
    df_pd = df_pd.drop(columns=["geometry_wkb"])
    satellite_gdf = gpd.GeoDataFrame(df_pd, geometry="geometry", crs="EPSG:4326")
    if not satellite_gdf.geometry.is_valid.all():
        print("orig", pth)
    proj_gdf = satellite_gdf.to_crs(orig_crs)
    if not proj_gdf.geometry.is_valid.all():
        print("proj", pth)

In [None]:
pth = '/Users/kyledorman/data/planet_coverage/points_30km/skysat/results/2019/00/40/10/data.parquet'
df_pd: pd.DataFrame = pd.read_parquet(pth)
df_pd["geometry"] = df_pd["geometry_wkb"].apply(wkb.loads)  # type: ignore
satellite_gdf = gpd.GeoDataFrame(df_pd, geometry="geometry", crs="EPSG:4326")

satellite_gdf.geometry = satellite_gdf.geometry.make_valid()
proj_gdf = satellite_gdf.to_crs(orig_crs)
valid = proj_gdf.geometry.is_valid

proj_gdf.geometry = proj_gdf.geometry.make_valid()

m = folium.Map(
    location=[satellite_gdf.geometry.centroid.y.mean(), satellite_gdf.geometry.centroid.x.mean()], 
    zoom_start=4, 
    tiles="CartoDB positron",
    width=1000,
    height=600
)

for _, row in proj_gdf.to_crs(satellite_gdf.crs)[~valid].iterrows():
    folium.GeoJson(
        row.geometry,
    ).add_to(m)

m
    

In [None]:
BASE = Path("/Users/kyledorman/data/planet_coverage/ca_only/")  # <-- update this

In [None]:
ca_ocean = gpd.read_file(BASE / "ca_ocean.geojson")
orig_crs = gpd.read_file(BASE / "ocean_grids.gpkg").crs
query_df = gpd.read_file(BASE / "ocean_grids.gpkg").to_crs(ca_ocean.crs)
grids_df = gpd.read_file(BASE / "coastal_grids.gpkg").to_crs(ca_ocean.crs)

query_ca = query_df[query_df.geometry.intersects(ca_ocean.union_all())]

grids_ca = grids_df[grids_df.geometry.intersects(query_ca.union_all())]

inter_df = gpd.read_file(BASE / "coastal_skysat_dove_intersections.gpkg").to_crs(ca_ocean.crs)
inter_df['acquired_delta_minutes'] = (inter_df.acquired_delta_sec / 60).abs()
inter_df['acquired_delta_hours'] = (inter_df.acquired_delta_sec / 60 / 60).abs()
inter_df['dove_tide_height_abs'] = inter_df.dove_tide_height.abs()

inter_df_10min = inter_df[inter_df.acquired_delta_minutes < 10]

inter_df = inter_df.sort_values(by=["dove_id", "skysat_id", "cell_id"]).drop_duplicates(subset=["dove_id", "skysat_id"])
inter_df_10min = inter_df_10min.sort_values(by=["dove_id", "skysat_id", "cell_id"]).drop_duplicates(subset=["dove_id", "skysat_id"])

inter_df["pair_key"] = list(zip(inter_df["dove_id"], inter_df["skysat_id"]))
inter_df = inter_df.set_index("pair_key", drop=True)
assert not inter_df.index.duplicated().any(), "Composite key isn’t unique!"

inter_df_10min["pair_key"] = list(zip(inter_df_10min["dove_id"], inter_df_10min["skysat_id"]))
inter_df_10min = inter_df_10min.set_index("pair_key", drop=True)
assert not inter_df_10min.index.duplicated().any(), "Composite key isn’t unique!"

inter_df.head(5)

In [None]:
def plot_df(df, column_name, title, zoom=7, show_grids: bool = True):
    # --- Folium map for % ---
    if df[column_name].max() == df[column_name].min():
        scale_min = 0
    else:
        scale_min = df[column_name].min()
    color_scale = linear.viridis.scale(scale_min, df[column_name].max())
    
    m = folium.Map(
        location=[df.geometry.centroid.y.mean(), df.geometry.centroid.x.mean()], 
        zoom_start=zoom, 
        tiles="CartoDB positron",
        width=1000,
        height=600
    )

    if show_grids:
        for _, row in grids_ca.iterrows():
            folium.GeoJson(
                row.geometry,
                tooltip=str(row["cell_id"]),
                style_function=lambda feature: {
                    "color": "blue",
                    "weight": 1,
                }
            ).add_to(m)

    for grid_id, row in df.iterrows():
        value = row[column_name]
        geom = row.geometry
        folium.GeoJson(
            data=geom,
            style_function=lambda f, col=color_scale(value): {
                "fillColor": col,
                "color":     col,      # outline same as fill
                "weight":    1,
                "fillOpacity": 0.1,
            },
            tooltip=f"{grid_id}<br>{column_name}: {value:0.1f}",
        ).add_to(m)
    
    color_scale.caption = title
    color_scale.add_to(m)
    
    return m

In [None]:
limit_df = inter_df.sort_values(by=["skysat_id", "overlap_area"], ascending=False).drop_duplicates(subset=["skysat_id"])

limit_df_10min = inter_df_10min.sort_values(by=["skysat_id", "overlap_area"], ascending=False).drop_duplicates(subset=["skysat_id"])

print(len(limit_df), len(inter_df))
print(len(limit_df_10min), len(inter_df_10min))

In [None]:
limit_df.head(5)

In [None]:
plot_df(limit_df, 'acquired_delta_hours', 'acquired_delta_hours', show_grids=False)

In [None]:
plot_df(limit_df_10min, 'acquired_delta_hours', 'acquired_delta_hours', show_grids=False)

In [None]:
coverage = limit_df.to_crs(orig_crs).union_all().simplify(1000, preserve_topology=True)
coverage_df = gpd.GeoDataFrame(geometry=[coverage], crs=orig_crs).to_crs(ca_ocean.crs)

coverage_df

In [None]:
m = folium.Map(
    location=[coverage_df.geometry.iloc[0].centroid.y, coverage_df.geometry.iloc[0].centroid.x], 
    zoom_start=5, 
    tiles="CartoDB positron",
    width=1000,
    height=600
)
folium.GeoJson(
    data=coverage_df.geometry.iloc[0],
).add_to(m)
    
m

In [None]:
count_df = grids_ca.rename(columns={"cell_id": "grid_id"})[["grid_id", "geometry"]].merge(
    limit_df.groupby('grid_id').acquired_delta_sec.count(), on=["grid_id"], how="inner"
).rename(columns={'acquired_delta_sec': 'counter'})

# count_df.counter = count_df.counter.clip(0, 5)

count_df

In [None]:
plot_df(count_df, 'counter', 'per_grid_counts', show_grids=False)

In [None]:
limit_df.dove_tide_height.hist()