## Incremental imports

```bash
__ dia_object_collection/
|__ dia_object_lc/
   |__ dataset/
       |__ Norder=1/
       |   |__ Dir=0/
       |       |__ Npix=0/
       |           |__ 2025-09-26.parquet
       |           |__ 2025-09-27.parquet
       |           |__ past.parquet (weekly_aggregated)
       |__ Norder=2/
       |   |__ Dir=0/
       |       |__ Npix=0/
       |           |__ 2025-09-26.parquet
       |           |__ 2025-09-27.parquet
       |           |__ past.parquet (weekly_aggregated)
       |__ .../
       |__ _common_metadata (constant)
   |__ partition_info.csv (constant)
   |__ hats.properties (daily_updated)
```

#### Daily

- Keep the existing partitioning schema.

- Add a new parquet file to each pixel.

  - Keep only the object rows with the latest validity start.
  
  - Find out where each new object lies.

  - Generate the margin caches for sources and forced sources.
  
  - Nest the sources and forced sources in their respective objects.

  - Copy the new daily parquet files into their respective pixel directories.

- Delete the outdated _metadata and skymaps, if they exist.

- Update hats.properties (at least `n_rows`).


#### [TBD] Every so often (e.g. weekly):

- Aggregate the daily parquets of each pixel into the `past.parquet` file.

  - Merge duplicate object rows.

  - Keep the latest value for each conflicting object column.

  - Merge the light curve nested columns.

- Repartition each pixel according to a pre-defined threshold argument.

- Regenerate _metadata and skymap files.

- Regenerate collection with margin cache and index catalog (from scratch).


Similar to DB transactions, these updates are disruptive actions. Users should be OK with it.

In [None]:
# This notebook requires calling `Catalog.join_nested` with "how='left'"
# %pip install git+https://github.com/astronomy-commons/lsdb.git@sandro/join-nested-left

In [None]:
import lsdb
import tempfile

from dask.distributed import Client
from hats_import.catalog.arguments import ImportArguments
from hats_import.catalog.resume_plan import ResumePlan
from incremental.pipeline import map_pixels, binning, split_pixels, reduce_pixels, finalize
from pathlib import Path

In [None]:
# Path to the PPDB data. `get_paths` searches this directory recursively
# for the parquet files of each dataset type.
PPDB_DIR = Path("/sdf/scratch/rubin/ppdb/data/lsstcam")

# Path to a pre-existing catalog. It is opened below as the existing nested
# catalog and is also the destination of the new daily parquet files.
dia_object_lc_path = Path("incremental/dia_object_lc")

In [None]:
# Initialize a Dask Client.
# The temporary directory serves as the workers' local directory and, further
# down, as the output location of the intermediate catalogs for this increment.
tmp_dir = tempfile.TemporaryDirectory()
client = Client(n_workers=16, threads_per_worker=1, local_directory=tmp_dir.name)
tmp_dir

Load existing nested catalog:

In [None]:
# The catalog has leaf HEALPix directories
dia_object_lc = lsdb.open_catalog(dia_object_lc_path)
# Pixels of the existing catalog, kept as a set. They are handed to the
# `binning` stage when the new data is imported.
existing_pixels = set(dia_object_lc.get_healpix_pixels())
existing_pixels

In [None]:
# We will use the highest order of the existing
# nested catalog to map the new data.
# The order is read from every pixel explicitly, so the result does not
# depend on how pixel objects compare to one another.
mapping_order = int(max(pixel.order for pixel in existing_pixels))
print(f"mapping_order = {mapping_order}")

Get the new PPDB file increments for objects, sources and forced sources:

In [None]:
def get_paths(dataset_type, start=10, end=20):
    """Return a slice of the sorted parquet files of a given dataset type.

    The snake_case `dataset_type` (e.g. "dia_forced_source") is converted to
    the CamelCase file name (e.g. "DiaForcedSource.parquet"). `PPDB_DIR` is
    searched recursively for files with that name, the matches are sorted,
    and the entries in positions `start` (inclusive) to `end` (exclusive)
    are returned.
    """
    camel_case_name = "".join(map(str.capitalize, dataset_type.split("_")))
    matches = PPDB_DIR.rglob(f"{camel_case_name}.parquet")
    return sorted(matches)[start:end]

# Select the increment files of each dataset, using the default
# `start`/`end` slice of `get_paths`.
new_object_files = get_paths("dia_object")
new_source_files = get_paths("dia_source")
new_forced_source_files = get_paths("dia_forced_source")

When importing new data, we want to keep the existing partitioning structure intact. We’ll only add new pixels if the alerts include data that lies outside the current pixel coverage. This is a change from how the `hats-import` map-reduce pipeline currently works.

In [None]:
def import_dataset(dataset_name, catalog_type, new_input_files):
    """Import a list of parquet files as a catalog in the temporary directory.

    The stages of the incremental pipeline run in sequence: map, binning,
    split, reduce and finalize. The binning stage receives `existing_pixels`,
    and the data is mapped at `mapping_order`.

    Args:
        dataset_name: name of the output catalog, created under `tmp_dir`.
        catalog_type: catalog type forwarded to `ImportArguments`.
        new_input_files: the parquet files to import.
    """
    import_settings = {
        "output_path": tmp_dir.name,
        "output_artifact_name": dataset_name,
        "input_file_list": new_input_files,
        "file_reader": "parquet",
        "ra_column": "ra",
        "dec_column": "dec",
        "catalog_type": catalog_type,
        "highest_healpix_order": mapping_order,
        "simple_progress_bar": True,
        "resume": False,
    }
    import_args = ImportArguments(**import_settings)
    plan = ResumePlan(import_args=import_args)

    # Map stage: histogram of the new rows, plus the pickled file reader.
    histogram, total_rows, reader_file = map_pixels(import_args, plan, client)
    # Binning stage: alignment of the new data, given the existing pixels.
    alignment = binning(import_args, plan, histogram, existing_pixels, total_rows)
    split_pixels(import_args, plan, alignment, reader_file, client)
    reduce_pixels(import_args, plan, client)
    finalize(import_args, plan, histogram, total_rows)

In [None]:
# Import the three increments. Both source datasets use the "source"
# catalog type.
import_dataset("dia_object", "object", new_object_files)
import_dataset("dia_source", "source", new_source_files)
import_dataset("dia_forced_source", "source", new_forced_source_files)

### Post-processing

Apply the same post-processing steps that were applied to the existing data.

In [None]:
import astropy.units as u
import hats
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

from dask.distributed import as_completed
from datetime import datetime, timezone
from hats.catalog import PartitionInfo
from hats.io import paths
from hats.io.parquet_metadata import write_parquet_metadata
from tqdm import tqdm

In [None]:
def select_by_latest_validity(table):
    """Keep, for each `diaObjectId`, the row with the latest `validityStart`.

    Rows are sorted by `validityStart` and only the last row of every
    `diaObjectId` is retained.
    """
    ordered = table.sort_values(by="validityStart")
    return ordered.drop_duplicates(subset="diaObjectId", keep="last")


def append_mag_and_magerr(table, flux_cols):
    """Append magnitude and magnitude-error columns derived from flux columns.

    For every flux column `<name>`, whose error is read from `<name>Err`,
    two columns are added. Their names are obtained by replacing "Flux" with
    "Mag" in `<name>` and in `<name>Err`. Fluxes are converted from nJy to AB
    magnitudes. The magnitude error is half the difference between the
    magnitudes of `flux - err` and `flux + err`.

    Returns a new table made of the original columns followed by the new
    ones, which are stored as Arrow float32.
    """

    def to_abmag(values):
        return u.nJy.to(u.ABmag, values)

    new_columns = {}
    for flux_name in flux_cols:
        mag_name = flux_name.replace("Flux", "Mag")

        flux = table[flux_name]
        new_columns[mag_name] = to_abmag(flux)

        flux_err = table[f"{flux_name}Err"]
        brighter = to_abmag(flux + flux_err)
        fainter = to_abmag(flux - flux_err)
        # A larger flux gives a smaller magnitude, hence the sign flip.
        new_columns[f"{mag_name}Err"] = -(brighter - fainter) / 2

    magnitudes = pd.DataFrame(
        new_columns, dtype=pd.ArrowDtype(pa.float32()), index=table.index
    )
    return pd.concat([table, magnitudes], axis=1)


def cast_columns_float32(table):
    """Cast Arrow float64 columns to float32, except positions and times.

    The positional and time columns listed below keep their precision.
    Every other column whose dtype is Arrow float64 is cast to Arrow float32.
    A new table is returned.
    """
    keep_precision = (
        "ra",
        "dec",
        "raErr",
        "decErr",
        "x",
        "y",
        "xErr",
        "yErr",
        "midpointMjdTai",
        "radecMjdTai",
    )
    double = pd.ArrowDtype(pa.float64())
    single = pd.ArrowDtype(pa.float32())
    casts = {}
    for name, dtype in table.dtypes.items():
        if name not in keep_precision and dtype == double:
            casts[name] = single
    return table.astype(casts)


def postprocess_catalog(catalog_name, flux_col_prefixes):
    """Post-process every partition of a temporary catalog on the Dask cluster.

    One `process_partition` task is submitted per HEALPix pixel of the
    catalog stored at `<tmp_dir>/<catalog_name>`. The exception of a failed
    task is re-raised. Once all tasks have completed, the catalog metadata
    is rewritten.

    Args:
        catalog_name: name of the catalog inside the temporary directory.
        flux_col_prefixes: flux columns for which magnitudes are appended.
    """
    catalog_dir = f"{tmp_dir.name}/{catalog_name}"
    catalog = hats.read_hats(catalog_dir)
    pending = [
        client.submit(
            process_partition,
            catalog_dir=catalog_dir,
            target_pixel=pixel,
            flux_col_prefixes=flux_col_prefixes,
        )
        for pixel in catalog.get_healpix_pixels()
    ]
    progress = tqdm(as_completed(pending), desc=catalog_name, total=len(pending))
    for finished in progress:
        if finished.status == "error":
            raise finished.exception()
    rewrite_catalog_metadata(catalog)


def process_partition(catalog_dir, target_pixel, flux_col_prefixes):
    """Post-process the parquet file of one pixel, overwriting it in place.

    Steps, in order:
      1. if the table has a `validityStart` column, keep only the latest
         row of each object;
      2. if any flux columns are given, append their magnitudes and errors;
      3. cast the non-positional, non-time double columns to float32.

    The result is written back to the same file without the pandas index
    and with the schema metadata removed.
    """
    pixel_file = hats.io.pixel_catalog_file(catalog_dir, target_pixel)
    frame = pd.read_parquet(pixel_file, dtype_backend="pyarrow")

    if "validityStart" in frame.columns:
        frame = select_by_latest_validity(frame)
    if len(flux_col_prefixes) > 0:
        frame = append_mag_and_magerr(frame, flux_col_prefixes)
    frame = cast_columns_float32(frame)

    arrow_table = pa.Table.from_pandas(frame, preserve_index=False)
    arrow_table = arrow_table.replace_schema_metadata()
    pq.write_table(arrow_table, pixel_file.path)


def rewrite_catalog_metadata(catalog):
    """Refresh the metadata files of a temporary catalog after post-processing.

    Rewrites the parquet metadata, regenerates `partition_info.csv` from it,
    and saves the catalog properties with the new total row count and a
    creation date set to the current UTC time.
    """
    catalog_path = f"{tmp_dir.name}/{catalog.catalog_name}"
    total_rows = write_parquet_metadata(catalog_path)

    # Read partition info from _metadata and write to partition_info.csv
    info = PartitionInfo.read_from_dir(catalog_path)
    info.write_to_file(paths.get_partition_info_pointer(catalog_path))

    creation_date = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M%Z")
    updated_properties = catalog.catalog_info.copy_and_update(
        total_rows=total_rows, hats_creation_date=creation_date
    )
    updated_properties.to_properties_file(catalog_path)

In [None]:
# Objects carry one mean science flux column per band (u, g, r, i, z, y);
# both source catalogs carry a single `scienceFlux` column.
flux_col_prefixes = [f"{band}_scienceFluxMean" for band in list("ugrizy")]
postprocess_catalog("dia_object", flux_col_prefixes=flux_col_prefixes)
postprocess_catalog("dia_source", flux_col_prefixes=["scienceFlux"])
postprocess_catalog("dia_forced_source", flux_col_prefixes=["scienceFlux"])

### Create nested increment

Nest sources in objects and sort them by MJD.

In [None]:
from hats_import import pipeline_with_client
from hats_import.margin_cache.margin_cache_arguments import MarginCacheArguments

# Margin threshold, in arcseconds, used for the margin caches of both the
# sources and the forced sources. It is also part of the cache names.
margin_radius_arcsec = 5

def sort_nested_sources(df, source_cols):
    """Re-nest each nested column with its rows sorted by MJD.

    Every column in `source_cols` is flattened, sorted by the flat index and
    then by `midpointMjdTai`, and nested back under the same name. The
    re-nested columns therefore end up after the remaining columns.
    """
    time_col = "midpointMjdTai"
    for nested_name in source_cols:
        flat = df[nested_name].nest.to_flat()
        ordered = flat.sort_values([flat.index.name, time_col])
        df = df.drop(columns=[nested_name]).add_nested(ordered, nested_name)
    return df

In [None]:
# Generate the margin cache for the new sources. It is written to the
# temporary directory, next to the `dia_source` catalog.
args = MarginCacheArguments(
    input_catalog_path=f"{tmp_dir.name}/dia_source",
    output_path=tmp_dir.name,
    margin_threshold=margin_radius_arcsec,
    output_artifact_name=f"dia_source_{margin_radius_arcsec}arcs",
    simple_progress_bar=True,
    resume=False,
)
pipeline_with_client(args, client)

In [None]:
# Generate the margin cache for the new forced sources, with the same
# threshold as for the sources.
args = MarginCacheArguments(
    input_catalog_path=f"{tmp_dir.name}/dia_forced_source",
    output_path=tmp_dir.name,
    margin_threshold=margin_radius_arcsec,
    output_artifact_name=f"dia_forced_source_{margin_radius_arcsec}arcs",
    simple_progress_bar=True,
    resume=False,
)
pipeline_with_client(args, client)

In [None]:
# Load the post-processed increment catalogs from the temporary directory.
dia_object = lsdb.read_hats(f"{tmp_dir.name}/dia_object")

# Both source catalogs are loaded together with their margin caches.
dia_source = lsdb.read_hats(
    f"{tmp_dir.name}/dia_source",
    margin_cache=f"{tmp_dir.name}/dia_source_{margin_radius_arcsec}arcs",
)

dia_forced_source = lsdb.read_hats(
    f"{tmp_dir.name}/dia_forced_source",
    margin_cache=f"{tmp_dir.name}/dia_forced_source_{margin_radius_arcsec}arcs",
)

In [None]:
# Nest the sources and the forced sources in their objects, matching on
# `diaObjectId`. Both joins are left joins.
dia_object_nested = (
    dia_object.join_nested(
        dia_source,
        left_on="diaObjectId",
        right_on="diaObjectId",
        nested_column_name="diaSource",
        how="left"
    )
    .join_nested(
        dia_forced_source,
        left_on="diaObjectId",
        right_on="diaObjectId",
        nested_column_name="diaForcedSource",
        how="left"
    )
    # Sort the rows of both nested columns by MJD, partition by partition.
    .map_partitions(
        lambda x: sort_nested_sources(x, source_cols=["diaSource", "diaForcedSource"])
    )
)
dia_object_nested

### Update existing catalog

Write the partitions with the new data to the existing catalog.

In [None]:
import dask
import numpy as np


def write_partitions(catalog, output_catalog_dir, **kwargs):
    """Save the non-empty catalog partitions to disk as dated parquet files.

    Each partition is written by `perform_write` with a file name suffix of
    the form `/<YYYY-MM-DD>.parquet`, built from the current local date.

    Args:
        catalog: the catalog whose partitions are written.
        output_catalog_dir: base directory of the destination catalog.
        **kwargs: extra keyword arguments forwarded to `perform_write`.

    Returns:
        A tuple `(pixels, counts)` of two lists: the pixels of the partitions
        that have at least one row, and their respective row counts.

    Raises:
        RuntimeError: if all the partitions are empty.
    """
    results, pixels = [], []
    partitions = catalog._ddf.to_delayed()

    # The new parquet files will be named after the current date.
    # The date is formatted outside of the f-string: reusing double quotes
    # inside a double-quoted f-string is a syntax error before Python 3.12.
    today = datetime.now().strftime("%Y-%m-%d")
    npix_suffix = f"/{today}.parquet"

    for pixel, partition_index in catalog._ddf_pixel_map.items():
        results.append(
            perform_write(
                partitions[partition_index],
                pixel,
                output_catalog_dir,
                npix_suffix,
                **kwargs,
            )
        )
        pixels.append(pixel)

    # Each delayed write returns the number of rows it wrote (0 if skipped).
    counts = dask.compute(*results)
    non_empty_indices = np.nonzero(counts)
    non_empty_pixels = np.array(pixels)[non_empty_indices]
    non_empty_counts = np.array(counts)[non_empty_indices]

    # Check that the catalog is not empty
    if len(non_empty_pixels) == 0:
        raise RuntimeError("The output catalog is empty")
    return list(non_empty_pixels), list(non_empty_counts)


@dask.delayed
def perform_write(df, hp_pixel, output_catalog_dir, npix_suffix, **kwargs):
    """Write one partition into its pixel directory and return its row count.

    Empty partitions are skipped and 0 is returned. Otherwise the
    `Npix=<pixel>` directory is created if needed, and the partition is
    saved as parquet at the pixel file path built with `npix_suffix`.
    Extra keyword arguments are forwarded to `to_parquet`.
    """
    n_rows = len(df)
    if n_rows == 0:
        return 0

    # The parquet leaf files live in a pixel directory. Create it if it does not exist.
    parent_dir = hats.io.pixel_directory(
        output_catalog_dir, hp_pixel.order, hp_pixel.pixel
    )
    hats.io.file_io.make_directory(
        parent_dir / f"Npix={hp_pixel.pixel}", exist_ok=True
    )

    # Write file to destination.
    target = paths.pixel_catalog_file(
        output_catalog_dir, hp_pixel, npix_suffix=npix_suffix
    )
    df.to_parquet(target.path, filesystem=target.fs, **kwargs)
    return n_rows

In [None]:
# Write the nested increment into the existing catalog directory.
# `pixels` and `counts` only cover the partitions that had rows to write.
pixels, counts = write_partitions(dia_object_nested, dia_object_lc_path)
pixels, counts

And update the catalog properties and metadata:

In [None]:
# Delete point maps and sky maps
paths.get_skymap_file_pointer(dia_object_lc_path).unlink(missing_ok=True)
paths.get_point_map_file_pointer(dia_object_lc_path).unlink(missing_ok=True)
# `skymap_alt_orders` may be unset: this notebook itself resets it to None
# when it rewrites the catalog properties, so the next incremental run would
# otherwise fail when iterating over None.
for order in dia_object_lc.hc_structure.catalog_info.skymap_alt_orders or []:
    paths.get_skymap_file_pointer(dia_object_lc_path, order=order).unlink(missing_ok=True)

In [None]:
# Delete _metadata and _data_thumbnail.parquet.
# `missing_ok=True` makes both deletions a no-op when the file is absent.
paths.get_parquet_metadata_pointer(dia_object_lc_path).unlink(missing_ok=True)
paths.get_data_thumbnail_pointer(dia_object_lc_path).unlink(missing_ok=True)

In [None]:
# Update partition_info.csv
# `pixels` only holds the pixels that received new rows. The catalog must keep
# listing the existing pixels that got no new data, so the partition info is
# built from the union of both, in a deterministic (order, pixel) ordering.
all_pixels = sorted(
    existing_pixels.union(pixels), key=lambda pixel: (pixel.order, pixel.pixel)
)
partition_info = PartitionInfo.from_healpix(all_pixels)
partition_info_file = paths.get_partition_info_pointer(dia_object_lc_path)
partition_info.write_to_file(partition_info_file)

In [None]:
# Update the catalog properties and write them to the properties file
from lsdb.catalog.dataset.dataset import Dataset

old_properties = dia_object_lc.hc_structure.catalog_info

new_props = dict(
    Dataset.new_provenance_properties(dia_object_lc_path),
    # Rows already in the catalog plus the rows written for this increment.
    total_rows=old_properties.total_rows + int(np.sum(counts)),
    hats_order=partition_info.get_highest_order(),
    hats_max_rows=None,
    # The sky maps were deleted above, so their properties are reset.
    skymap_order=None,
    skymap_alt_orders=None,
    moc_sky_fraction=f"{partition_info.calculate_fractional_coverage():0.5f}",
    # There is an issue with setting the default columns
    default_columns=old_properties.default_columns,
    # NOTE(review): "/" presumably marks each pixel as a directory holding
    # several parquet files — confirm against the HATS documentation.
    npix_suffix="/",
)
dia_object_lc.hc_structure.catalog_info.copy_and_update(**new_props).to_properties_file(dia_object_lc_path)

In [None]:
# Close the Dask client created at the top of the notebook.
# NOTE(review): `tmp_dir` is not cleaned up explicitly here — confirm whether
# the intermediate catalogs should be removed with `tmp_dir.cleanup()`.
client.close()