In [35]:
# import modules

import pandas as pd
import dask.dataframe as dd
import json
import glob
import os
import datetime

In [15]:
parquet_file = r"K:\CentraalDatamanagement\PDC\01_WIP\01_Algemeen\X_000002_DatakwaliteitBaseline\DAMO_H_parquet\DAMO_W.ws_Plattenormprofiel_H.parquet"
date_col = 'GDB_FROM_DATE'
id_col = 'OBJECTID'

In [3]:
df = dd.read_parquet(parquet_file)

In [37]:

import dask.dataframe as dd
import pandas as pd
import numpy as np
from typing import List, Optional, Sequence, Union

def last_attribute_change_dates(
    path: Union[str, Sequence[str]],
    object_id_col: str = "OBJECTID",
    from_col: str = "GDB_FROM_DATE",
    to_col: Optional[str] = "GDB_TO_DATE",
    attributes: Optional[List[str]] = None,   # expliciet opgeven welke attributen; anders auto
    exclude: Optional[List[str]] = None,       # kolommen om te negeren
    treat_empty_as_nan: bool = True,           # '' behandelen als NaN bij vergelijking
    npartitions: Optional[int] = None,         # optioneel herpartitioneren
    engine: str = "pyarrow",
):
    """
    Retourneert een Dask DataFrame met:
      - 1 rij per OBJECTID
      - Voor elke attribuutkolom: de laatste datum (GDB_FROM_DATE) waarop dit attribuut veranderde.
    """

    # 1) Lees alleen de metadata om kolomnamen te bepalen
    ddf_meta = dd.read_parquet(path, engine=engine)

    # 2) Bepaal attribuutkolommen
    drop = {object_id_col, from_col}
    if to_col:
        drop.add(to_col)
    if exclude:
        drop |= set(exclude)
    # verwijder gebruikelijke geometriekolommen indien aanwezig
    for gcol in ("geometry", "wkb_geometry", "geom", "GEOMETRY"):
        if gcol in ddf_meta.columns:
            drop.add(gcol)

    if attributes is None:
        attr_cols = [c for c in ddf_meta.columns if c not in drop]
    else:
        missing = set(attributes) - set(ddf_meta.columns)
        if missing:
            raise ValueError(f"Attribuutkolommen niet gevonden: {missing}")
        attr_cols = list(attributes)

    # 3) Lees alleen benodigde kolommen
    cols_needed = [object_id_col, from_col] + attr_cols
    ddf = dd.read_parquet(path, engine=engine, columns=cols_needed)

    # 4) Zorg dat from_col datetime is
    ddf[from_col] = dd.to_datetime(ddf[from_col], errors="coerce")

    if npartitions:
        ddf = ddf.repartition(npartitions=npartitions)

    # 5) Per OBJECTID-groep de laatste wijzigingsdatums bepalen
    def _per_object_last_change(pdf: pd.DataFrame,
                                from_col: str,
                                attr_cols: List[str],
                                treat_empty_as_nan: bool):
        # sorteer op tijd (stabiel)
        g = pdf.sort_values(from_col, kind="mergesort").reset_index(drop=True)
        out = {}

        # Voor elke attribuutkolom bepaal wanneer de waarde wijzigt t.o.v. vorige versie
        for col in attr_cols:
            s = g[col]

            # optioneel lege strings als NaN behandelen (handig voor string-attributen)
            if treat_empty_as_nan and s.dtype == "object":
                s = s.replace("", np.nan)

            prev = s.shift(1)
            # "zelfde waarde" = gelijk of beide NaN
            same = (s == prev) | (s.isna() & prev.isna())
            changed = ~same

            if len(changed) > 0:
                # De allereerste rij geldt als "initiële wijziging"
                changed.iloc[0] = True

            if changed.any():
                last_date = g.loc[changed, from_col].max()
            else:
                # Geen rijen? (zou niet voorkomen) -> NaT
                last_date = pd.NaT

            out[col] = last_date

        # 1 rij terug met alleen attribuutkolommen (OBJECTID komt straks uit de groupby-index)
        return pd.DataFrame([out])

    # 6) Meta voor Dask plannen: één lege DataFrame met attribuutkolommen (datetime64[ns])
    meta = pd.DataFrame({col: pd.Series(dtype="datetime64[ns]") for col in attr_cols})

    # 7) Apply per OBJECTID; Dask shufflet groepen zodat elke groep compleet is
    res = ddf.groupby(object_id_col).apply(
        _per_object_last_change,
        from_col=from_col,
        attr_cols=attr_cols,
        treat_empty_as_nan=treat_empty_as_nan,
        meta=meta,
        # split_out=...,  # optioneel (Dask >= 2023) om de output over meer partitions te spreiden
    )

    # groupby-index (OBJECTID) terug naar kolom
    res = res.reset_index()

    # => res: kolommen: OBJECTID + alle attribuutkolommen (als datums)
    return res


In [38]:

ddf_out = last_attribute_change_dates(
    parquet_file,
    object_id_col="OBJECTID",
    from_col="GDB_FROM_DATE",
    to_col="GDB_TO_DATE",
    # attributes=["NAAM", "TYPE", "STATUS"],  # optioneel: selectie; anders automatisch
    exclude=["GLOBALID", "CREATED_USER", "CREATED_DATE", "LAST_EDITED_USER", "LAST_EDITED_DATE"],  # optioneel
    npartitions=None,      # optioneel tunen
    treat_empty_as_nan=True
)


In [42]:
ddf_out.to_parquet(r"K:\CentraalDatamanagement\PDC\01_WIP\01_Algemeen\X_000002_DatakwaliteitBaseline\DAMO_H_parquet", engine="pyarrow", write_index=False)

KeyboardInterrupt: 