In [2]:
%reload_ext autoreload
%autoreload 2
%matplotlib widget

import importlib
import os

from matplotlib import pyplot as plt
from sqlalchemy import create_engine

from ppcollapse import setup_logger
from ppcollapse.utils.config import ConfigManager
from ppcollapse.utils.database import get_collapses_df, get_image
from setup_django_ppcx import setup_django

# Allow Django ORM to work in Jupyter's async environment
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"

# Setup Django
config = ConfigManager(config_path="config.yaml")
setup_django(db_config=config.get("database"))

logger = setup_logger(level="INFO", name="ppcx")
config = ConfigManager(config_path="config.yaml")
db_engine = create_engine(config.db_url)

# Import Django models
ppcx_app_models = importlib.import_module("ppcx_app.models")
Collapse = ppcx_app_models.Collapse
Image = ppcx_app_models.Image

Manual fix of some collapse dates


In [3]:
# 09/10/2017 - volume wrong (should be 3500 m3)
collapse_id = 1122
collapse = Collapse.objects.get(id=collapse_id)
collapse.volume = 3500
collapse.save()

In [61]:
import math
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterator, NamedTuple

import numpy as np
from django.contrib.gis.geos import GEOSGeometry
from django.contrib.gis.geos import MultiPolygon as GEOSMultiPolygon
from django.db import transaction
from shapely import wkt as shapely_wkt
from shapely.errors import TopologicalError
from shapely.geometry import Polygon as ShpPolygon


class CvatPoly(NamedTuple):
    image_name: str
    width: int
    height: int
    points: np.ndarray  # (N,2) float
    volume: float | None


def _parse_points(points_str: str) -> np.ndarray:
    pts = []
    for tok in points_str.split(";"):
        tok = tok.strip()
        if not tok:
            continue
        x_str, y_str = tok.split(",")
        pts.append((float(x_str), float(y_str)))
    return np.asarray(pts, dtype=float)


def _parse_dt_from_name(name: str):
    # Example: PPCX_2_2017_10_09_12_02_02_REG.jpg
    m = re.search(r"_(\d{4})_(\d{2})_(\d{2})_(\d{2})_(\d{2})_(\d{2})_", name)
    if not m:
        return None
    y, mo, d, h, mi, s = map(int, m.groups())

    return datetime(y, mo, d, h, mi, s)


def _find_image(
    name: str,
    search_mode: str = "auto",
    time_tolerance_seconds: int = 300,
) -> Image | None:
    """
    Find an Image in the database by filename or datetime.

    Args:
        name: Image filename (e.g., "PPCX_2_2017_10_09_12_02_02_REG.jpg")
        search_mode: How to search for the image. Options:
            - "auto": Try filename -> exact timestamp -> closest within tolerance
            - "filename": Only by filename
            - "exact": Only by exact timestamp parsed from filename
            - "closest": Find closest image by timestamp (within tolerance)
        time_tolerance_seconds: Maximum time difference (in seconds) when searching
                                for closest image. Default: 300 (5 minutes)

    Returns:
        Image object if found, None otherwise
    """

    basename = Path(name).name

    # Parse datetime from filename once
    dt = _parse_dt_from_name(basename)

    # MODE: filename only
    if search_mode == "auto" or search_mode == "filename":
        # Try to match by filename
        qs = Image.objects.filter(file_path__endswith=basename)
        if qs.exists():
            return qs.first()

        # Try conataining filename (in case of subdirs)
        qs = Image.objects.filter(file_path__icontains=basename)
        if qs.exists():
            return qs.first()

        return None

    # MODE: exact timestamp only
    if search_mode == "auto" or search_mode == "exact":
        # Try to match by exact timestamp
        if dt is None:
            return None
        qs = Image.objects.filter(
            acquisition_timestamp__year=dt.year,
            acquisition_timestamp__month=dt.month,
            acquisition_timestamp__day=dt.day,
            acquisition_timestamp__hour=dt.hour,
            acquisition_timestamp__minute=dt.minute,
            acquisition_timestamp__second=dt.second,
        )
        return qs.first() if qs.exists() else None

    # MODE: closest within tolerance
    if search_mode == "auto" or search_mode == "closest":
        if dt is None:
            return None

        # Search within tolerance window
        time_delta = timedelta(seconds=time_tolerance_seconds)
        qs = Image.objects.filter(
            acquisition_timestamp__date__gte=dt - time_delta,
            acquisition_timestamp__date__lte=dt + time_delta,
        )
        if not qs.exists():
            return None

        # Find closest
        nearest = min(
            qs, key=lambda im: abs((im.acquisition_timestamp - dt).total_seconds())
        )
        time_diff = abs((nearest.acquisition_timestamp - dt).total_seconds())
        if time_diff <= time_tolerance_seconds:
            return nearest
        return None

    # Invalid search mode
    raise ValueError(
        f"Invalid search_mode: {search_mode}. "
        f"Must be one of: 'auto', 'filename', 'exact', 'closest'"
    )


def _iter_cvat_polygons(
    xml_path: str | Path, label_name: str = "collapse"
) -> Iterator[CvatPoly]:
    tree = ET.parse(xml_path)
    root = tree.getroot()
    for img_el in root.findall("./image"):
        name = img_el.get("name")
        width = int(img_el.get("width"))
        height = int(img_el.get("height"))
        # Iterate polygons in this image
        for poly_el in img_el.findall("./polygon"):
            if poly_el.get("label") != label_name:
                continue
            pts = _parse_points(poly_el.get("points", ""))
            if pts.shape[0] < 3:
                continue
            vol = None
            for attr_el in poly_el.findall("./attribute"):
                if attr_el.get("name", "").lower() == "volume":
                    try:
                        v = float(str(attr_el.text).strip())
                        if math.isfinite(v):
                            vol = v
                    except Exception:
                        pass
            yield CvatPoly(name, width, height, pts, vol)


@transaction.atomic
def create_collapses_from_cvat(
    xml_path: str | Path,
    *,
    label_name: str = "collapse",
    image_search_mode: str = "auto",
    time_tolerance_seconds: int = 300,
    default_volume: float | None = None,
    remove_reg_suffix: bool = True,
    dry_run: bool = True,
) -> list[int]:
    created_ids: list[int] = []
    for item in _iter_cvat_polygons(xml_path, label_name=label_name):
        name = item.image_name
        if remove_reg_suffix:
            name = name.replace("_REG", "")
        img = _find_image(
            name,
            search_mode=image_search_mode,
            time_tolerance_seconds=time_tolerance_seconds,
        )
        if img is None:
            print(f"[WARN] Image not found for {item.image_name} – skipping.")
            continue

        # Build shapely polygon; fix if self-intersecting with buffer(0)
        try:
            shp = ShpPolygon(item.points)
            if not shp.is_valid:
                shp = shp.buffer(0)
        except TopologicalError:
            print(f"[WARN] Invalid polygon for {item.image_name} – skipping.")
            continue
        if shp.is_empty or shp.area <= 0:
            print(f"[WARN] Empty/zero-area polygon for {item.image_name} – skipping.")
            continue

        # Convert to Django geometry
        wkt_str = shapely_wkt.dumps(shp, rounding_precision=3)
        geos = GEOSGeometry(wkt_str, srid=0)

        if geos.geom_type == "Polygon":
            geos = GEOSMultiPolygon(geos, srid=0)

        # Compute area in pixels
        area_px2 = float(shp.area)

        # Volume: CVAT > default > 0.0
        volume = item.volume if item.volume is not None else default_volume

        collapse = Collapse(
            image=img,
            geom=geos,
            date=img.acquisition_timestamp.date(),
            area=area_px2,
            volume=volume,
        )

        if dry_run:
            print(
                f"[DRY RUN] Would create Collapse for image id={img.id} "
                f"with area={area_px2:.1f} px², volume={volume:.1f} m³. "
                f"To create, set dry_run=False."
            )
            continue

        collapse.save()
        created_ids.append(collapse.id)
        print(f"[OK] Created Collapse id={collapse.id} for image id={img.id}")
    return created_ids

In [60]:
# Create new collapse from cvat data labels
cvat_file = "data/job_23_annotations_2025_11_25_17_21_42_cvat for images 1.1.xml"

created_ids = create_collapses_from_cvat(
    cvat_file,
    label_name="collapse",
    image_search_mode="auto",
    time_tolerance_seconds=300,
    default_volume=0.0,
    remove_reg_suffix=True,
    dry_run=False,
)

[OK] Created Collapse id=1503 for image id=55459


In [None]:
# List of collapse ID to delete
collapse_ids_to_delete = [
    49,
    50,
    56,
    57,
    61,
    62,
    67,
    71,
    77,
    79,
    81,
    83,
    84,
    91,
    93,
    94,
    101,
    104,
    113,
    117,
    118,
    119,
    120,
    121,
    122,
    123,
    125,
    126,
    129,
    130,
    133,
    137,
    139,
    141,
    145,
    147,
    148,
    149,
    150,
    160,
    168,
    169,
    171,
    173,
    174,
    177,
    178,
    182,
    188,
]

In [None]:
engine = create_engine(config.db_url)
df = get_collapses_df(engine)
if df.empty:
    logger.warning("No collapses found in database.")
df

In [None]:
from shapely.ops import unary_union

# Example: merge multiple polygons from your collapse dataframe

collapse_1 = df.iloc[0]
collapse_2 = df.iloc[1]

geom1 = shapely_wkt.loads(collapse_1["geom_wkt"])
geom2 = shapely_wkt.loads(collapse_2["geom_wkt"])

# Merge into single geometry (will be MultiPolygon if not contiguous)
merged_geom = unary_union([geom1, geom2])

# If they ARE contiguous, this will return a Polygon
# If they're NOT contiguous, this will return a MultiPolygon
print(type(merged_geom))  # Polygon or MultiPolygon


In [None]:
image = get_image(image_id=int(collapse_1["image_id"]), config=config)
fig, ax = plt.subplots(figsize=(6, 6))
ax.imshow(image)
# Handle both Polygon and MultiPolygon
if merged_geom.geom_type == "Polygon":
    # Single polygon - plot directly
    xs, ys = merged_geom.exterior.xy
    ax.plot(xs, ys, color="red", linewidth=2)
    ax.fill(xs, ys, facecolor="red", edgecolor="none", alpha=0.3)
elif merged_geom.geom_type == "MultiPolygon":
    # Multiple polygons - iterate and plot each
    for poly in merged_geom.geoms:
        xs, ys = poly.exterior.xy
        ax.plot(xs, ys, color="red", linewidth=2)
        ax.fill(xs, ys, facecolor="red", edgecolor="none", alpha=0.3)
else:
    raise ValueError("Merged geometry is neither Polygon nor MultiPolygon.")
ax.set_axis_off()
ax.set_title("Collapse Geometry", fontsize=10, pad=5)

In [None]:
# Now import Django models
from ppcx_app.models import Collapse
from shapely import wkt as shapely_wkt


In [None]:
# Get the merged geometry
merged_geom = unary_union([geom1, geom2])

# Convert Shapely geometry to Django GEOSGeometry
wkt_str = shapely_wkt.dumps(merged_geom)
django_geom = GEOSGeometry(wkt_str, srid=0)

collapse_obj = Collapse.objects.get(id=collapse_1["id"])
collapse_obj.geom = django_geom
