<div style="display: flex; align-items: flex-start; gap: 40px;">
  <div style="display: flex; align-items: center; gap: 10px;">
    <img src="https://www.linea.org.br/brand/linea-logo-color.svg" width="100" style="display: block;">
    <img src="https://cdn2.webdamdb.com/1280_c3PXjCZbPM23.png" width="180" style="display: block;">
  </div>
  
  <div style="margin-left: 20px;">
    <h2 style="margin: 0 0 10px 0; padding: 0;">Data Preparation on Object Tables<br> for Training Set Maker pipeline</h2>
    Notebook for LIneA Open OnDemand Platform<br>
    Data Release: <a href="https://dp0-2.lsst.io/">Data Preview 0.2</a> <br>
    Authors: <a href="mailto:luigi.lcsilva@gmail.com">Luigi Lucas de Carvalho Silva</a>, <a href="mailto:julia@linea.org.br">Julia Gschwend</a> <br>
    Last verified to run: 2025-08-16 <br>
    Repository: <a href="https://github.com/linea-it/pz-lsst-inkind">linea-it/pz-lsst-inkind</a> <br>
  </div>
</div>

**tl;dr:**

This notebook prepares the photometric input catalog for the Training Set Maker (TSM) pipeline using LSST Object Catalog data. The process is performed in two steps: 

1- Create skynny tables from the Object catalog: 
- Convert fluxes to magnitudes
- Apply dereddening corrections
- Round numerical values to eliminate excessive precision
- Mask invalid entries
- Apply quality cuts
- Save balanced parquet files with ~N rows (N chosen by user)  

2- Adapt to LSDB standards  
- Convert skinny tables into HATS format as collections including margin cache   


## Imports

In [1]:
################################### GENERAL ##########################################
import os
import gc
import glob
import time
import math
import yaml
import shutil
import logging
import tables_io
import numpy as np
import pandas as pd
import getpass
from datetime import datetime

##################################### DASK ###########################################
from dask import config as dask_config_class
from dask import dataframe as dd
from dask import delayed, compute
from dask.distributed import Client, performance_report, wait
from dask_jobqueue import SLURMCluster

##################################### ASTROPY ###########################################
from astropy.coordinates import SkyCoord
import astropy.units as u
from astropy.io import fits

##################################### DUSTMAPS ###########################################
from dustmaps.sfd import SFDQuery


    /scratch/users/julia/.dustmapsrc

To create a new configuration file in the default location, run the following python code:

    from dustmaps.config import config
    config.reset()

Note that this will delete your configuration! For example, if you have specified a data directory, then dustmaps will forget about its location.
  warn(('Configuration file not found:\n\n'


# Setups

Getting the variables from the .yaml.

In [2]:
with open("config.yaml", "r") as f:
    cfg = yaml.safe_load(f)

globals().update(cfg)

MAG_CONV = np.log(10) * 0.4

if DP_col_value_to_replace is None:
    DP_col_value_to_replace = np.nan
if DP_err_value_to_replace is None:
    DP_err_value_to_replace = np.nan

# Define sufixo baseado nos flags
suffix = ""
if input_col_type == "flux":
    if DP_compute_magnitude:
        suffix = "_mag"
    else:
        suffix = "_flux"

current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')

# Define nome base da pasta de execu√ß√£o
if "input_col_model" in cfg and cfg["input_col_model"]:
    run_folder = f'DP_run_{DP_which_release}_{cfg["input_col_model"]}{suffix}_{current_date}'
else:
    run_folder = f'DP_run_{DP_which_release}{suffix}_{current_date}'

# Caminhos de sa√≠da
os.makedirs(user_base_path, exist_ok=True)

run_path = os.path.join(user_base_path, run_folder)
os.makedirs(run_path, exist_ok=True)

data_dir = os.path.join(run_path, 'data')
os.makedirs(data_dir, exist_ok=True)

logs_dir = os.path.join(run_path, 'logs')
os.makedirs(logs_dir, exist_ok=True)

dask_logs_dir = os.path.join(run_path, 'dask_logs')
os.makedirs(dask_logs_dir, exist_ok=True)

Making a copy of the .yaml in the logs_dir folder.

In [3]:
shutil.copy("config.yaml", os.path.join(logs_dir, "config.yaml"))

'/scratch/users/julia/tsm_lsst_dp0_2_input_data_preparation/DP_run_LSST_DP02_cModel_flux_2025-08-21_12-28/logs/config.yaml'

# Initializing the Cluster

In [4]:
if CLUSTER_extra_dask_configs:
    dask_config_class.set(CLUSTER_dask_config)
else:
    print("Running DASK with the standard memory configuration.")

Running DASK with the standard memory configuration.


In [5]:
current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')

if CLUSTER_save_the_dask_jobs_info:
    CLUSTER_job_extra_directives=[
        '--propagate',
        f'--account={CLUSTER_account}',
        f'--output={dask_logs_dir}/dask_job_%j_{current_date}.out',  
        f'--error={dask_logs_dir}/dask_job_%j_{current_date}.err',
    ]
else:
    CLUSTER_job_extra_directives=[
        '--propagate',
        f'--account={CLUSTER_account}',
        f'--output=/dev/null',  
        f'--error=/dev/null'
    ]

In [6]:
# Configuring the SLURMCluster.
cluster = SLURMCluster(
    interface=CLUSTER_interface,         # Lustre interface
    queue=CLUSTER_queue,                 # Name of the queue
    cores=CLUSTER_cores,                 # Number of logical cores per node
    processes=CLUSTER_processes,         # Number of dask processes per node
    memory=CLUSTER_memory,               # Memory per node
    walltime=CLUSTER_walltime,           # Maximum execution time              
    job_extra_directives=CLUSTER_job_extra_directives,
)

# Scaling the cluster to use X nodes
cluster.scale(jobs=CLUSTER_dask_scale_number)

# Defining the dask client
client = Client(cluster)

# Wait for 90% of the workers to initialize
cluster.wait_for_workers(n_workers=(math.ceil(CLUSTER_dask_scale_number * CLUSTER_processes * 0.9)))
client.run(lambda: gc.collect())

{'tcp://10.148.0.32:43757': 31,
 'tcp://10.148.0.33:44439': 31,
 'tcp://10.148.0.34:37427': 31,
 'tcp://10.148.0.35:36467': 31,
 'tcp://10.148.0.36:32917': 31,
 'tcp://10.148.0.37:32971': 31,
 'tcp://10.148.0.38:33773': 31,
 'tcp://10.148.0.39:42941': 31,
 'tcp://10.148.0.40:43385': 31,
 'tcp://10.148.0.41:46573': 31,
 'tcp://10.148.0.42:43195': 31,
 'tcp://10.148.0.43:37375': 31,
 'tcp://10.148.0.44:34551': 31,
 'tcp://10.148.0.45:44623': 31,
 'tcp://10.148.0.46:39159': 31}

In [7]:
cluster_info = client.cluster
cluster_info

0,1
Dashboard: http://10.148.0.53:8787/status,Workers: 15
Total threads: 750,Total memory: 1.57 TiB

0,1
Comm: tcp://10.148.0.53:45897,Workers: 15
Dashboard: http://10.148.0.53:8787/status,Total threads: 750
Started: Just now,Total memory: 1.57 TiB

0,1
Comm: tcp://10.148.0.42:43195,Total threads: 50
Dashboard: http://10.148.0.42:41019/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.42:43067,
Local directory: /tmp/dask-scratch-space/worker-6ofzksom,Local directory: /tmp/dask-scratch-space/worker-6ofzksom
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 57.31 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.36:32917,Total threads: 50
Dashboard: http://10.148.0.36:38039/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.36:41053,
Local directory: /tmp/dask-scratch-space-10117/worker-qwhch89s,Local directory: /tmp/dask-scratch-space-10117/worker-qwhch89s
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.88 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.32:43757,Total threads: 50
Dashboard: http://10.148.0.32:34347/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.32:37997,
Local directory: /tmp/dask-scratch-space-10117/worker-dm9r3w8b,Local directory: /tmp/dask-scratch-space-10117/worker-dm9r3w8b
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 57.31 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.33:44439,Total threads: 50
Dashboard: http://10.148.0.33:33601/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.33:44387,
Local directory: /tmp/dask-scratch-space-10117/worker-6064ze_y,Local directory: /tmp/dask-scratch-space-10117/worker-6064ze_y
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.88 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.39:42941,Total threads: 50
Dashboard: http://10.148.0.39:45495/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.39:34675,
Local directory: /tmp/dask-scratch-space/worker-zj_jeqh4,Local directory: /tmp/dask-scratch-space/worker-zj_jeqh4
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.88 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.40:43385,Total threads: 50
Dashboard: http://10.148.0.40:42313/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.40:42685,
Local directory: /tmp/dask-scratch-space/worker-t8z40sdi,Local directory: /tmp/dask-scratch-space/worker-t8z40sdi
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 57.31 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.46:39159,Total threads: 50
Dashboard: http://10.148.0.46:36211/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.46:46101,
Local directory: /tmp/dask-scratch-space-10117/worker-7_k_wgph,Local directory: /tmp/dask-scratch-space-10117/worker-7_k_wgph
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 61.2%,Last seen: Just now
Memory usage: 133.06 MiB,Spilled bytes: 0 B
Read bytes: 3.75 kiB,Write bytes: 4.08 kiB

0,1
Comm: tcp://10.148.0.34:37427,Total threads: 50
Dashboard: http://10.148.0.34:35229/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.34:35733,
Local directory: /tmp/dask-scratch-space-10117/worker-hvoltst3,Local directory: /tmp/dask-scratch-space-10117/worker-hvoltst3
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 57.31 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.41:46573,Total threads: 50
Dashboard: http://10.148.0.41:45487/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.41:43515,
Local directory: /tmp/dask-scratch-space/worker-v2y2w1us,Local directory: /tmp/dask-scratch-space/worker-v2y2w1us
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.88 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.35:36467,Total threads: 50
Dashboard: http://10.148.0.35:41073/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.35:44155,
Local directory: /tmp/dask-scratch-space-10117/worker-bx_7xwrx,Local directory: /tmp/dask-scratch-space-10117/worker-bx_7xwrx
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.88 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.45:44623,Total threads: 50
Dashboard: http://10.148.0.45:39045/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.45:34825,
Local directory: /tmp/dask-scratch-space-10117/worker-telyuwzp,Local directory: /tmp/dask-scratch-space-10117/worker-telyuwzp
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.88 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.37:32971,Total threads: 50
Dashboard: http://10.148.0.37:42649/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.37:40139,
Local directory: /tmp/dask-scratch-space/worker-tnljjene,Local directory: /tmp/dask-scratch-space/worker-tnljjene
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 61.2%,Last seen: Just now
Memory usage: 134.88 MiB,Spilled bytes: 0 B
Read bytes: 7.84 kiB,Write bytes: 7.94 kiB

0,1
Comm: tcp://10.148.0.38:33773,Total threads: 50
Dashboard: http://10.148.0.38:36371/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.38:42687,
Local directory: /tmp/dask-scratch-space/worker-595db34l,Local directory: /tmp/dask-scratch-space/worker-595db34l
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.88 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.148.0.43:37375,Total threads: 50
Dashboard: http://10.148.0.43:35859/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.43:41547,
Local directory: /tmp/dask-scratch-space/worker-pv7ceolx,Local directory: /tmp/dask-scratch-space/worker-pv7ceolx
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 60.3%,Last seen: Just now
Memory usage: 134.89 MiB,Spilled bytes: 0 B
Read bytes: 8.11 kiB,Write bytes: 7.40 kiB

0,1
Comm: tcp://10.148.0.44:34551,Total threads: 50
Dashboard: http://10.148.0.44:42931/status,Memory: 107.10 GiB
Nanny: tcp://10.148.0.44:46429,
Local directory: /tmp/dask-scratch-space-10117/worker-psunulnu,Local directory: /tmp/dask-scratch-space-10117/worker-psunulnu
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.88 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B


# Processing the Data

In [8]:
input_files = [f for f in glob.glob(os.path.join(input_catalog_folder, input_catalog_pattern))]

In [9]:
report_path = os.path.join(logs_dir, "dask-performance-report.html")

In [10]:
sfd = SFDQuery()

In [11]:
def read_fits_to_df_no_fix(filename, columns=None):
    with fits.open(filename, memmap=True) as hdul:
        data = hdul[1].data
        df = pd.DataFrame(data)
        if input_user_selected_cols is not None:
            df = df[input_user_selected_cols]
    return df

In [12]:
@delayed
def process_file(path):
    log_msgs = [f"üîÑ Starting file: {path}"]

    try:
        if DP_which_release == 'LSST_DP02':
            df = pd.read_parquet(path, columns=input_user_selected_cols)
        elif DP_which_release == 'DES_DR2':
            df = read_fits_to_df_no_fix(path, columns=input_user_selected_cols)

        if DP_is_id_in_the_index:
            df = df.reset_index()

        if DP_filter_by_boolean_column:
            if DP_which_boolean_column not in df.columns:
                raise ValueError(f"Boolean column '{DP_which_boolean_column}' not found in DataFrame.")
            df = df[df[DP_which_boolean_column] == DP_which_value_to_keep]

        if DP_compute_magnitude and input_col_type != "flux":
            raise ValueError("If DP_compute_magnitude=True, input_col_type must be 'flux'.")

        if DP_replace_invalid_values and DP_cross_invalidate:
            if DP_how_to_replace_col_values != "all" or DP_how_to_replace_err_values != "all":
                raise ValueError("If DP_cross_invalidate=True, both DP_how_to_replace_* must be 'all'.")

        if DP_compute_magnitude and DP_col_final_name_pattern == input_col_pattern:
            raise ValueError("Output column pattern must differ from input_col_pattern when computing magnitude.")

        coords = SkyCoord(ra=df[ra_col].values * u.deg, dec=df[dec_col].values * u.deg)
        df["E_BV"] = sfd(coords)

        def get_invalid_masks(values, errors):
            invalid_val = np.zeros(len(values), dtype=bool)
            invalid_err = np.zeros(len(errors), dtype=bool)

            if DP_set_some_limit_as_invalid_for_col:
                invalid_val |= (np.abs(values) >= DP_invalid_limit_value_for_col)
            if DP_set_some_limit_as_invalid_for_err:
                invalid_err |= (np.abs(errors) >= DP_invalid_limit_value_for_err)
            if DP_is_nan_and_inf_invalid_for_col:
                invalid_val |= ~np.isfinite(values)
            if DP_is_nan_and_inf_invalid_for_err:
                invalid_err |= ~np.isfinite(errors)
            if DP_cross_invalidate and DP_how_to_replace_col_values == "all" and DP_how_to_replace_err_values == "all":
                invalid_err |= invalid_val
                invalid_val |= invalid_err

            return invalid_val, invalid_err

        def apply_replacement_locally(arr, mask, replacement_value):
            return np.where(mask, replacement_value, arr)

        flux_cols_to_drop = []

        for band in selected_bands:
            col_in = input_col_pattern.replace("BAND", band)
            err_in = input_err_pattern.replace("BAND", band)

            band_fmt = {
                "lower_case": band.lower(),
                "upper_case": band.upper()
            }.get(DP_pesonalized_which_band_case, band)

            final_col = DP_col_final_name_pattern.replace("BAND", band_fmt)
            final_err_col = DP_err_final_name_pattern.replace("BAND", band_fmt)

            if col_in not in df.columns or err_in not in df.columns:
                raise ValueError(f"Missing column(s) {[col_in, err_in]} in file {path}")

            values = df[col_in]
            errors = df[err_in]

            # CASE 1: compute magnitude and possibly deredden
            if DP_compute_magnitude:
                values = -2.5 * np.log10(values) + MAG_OFFSET
                errors = errors / (df[col_in] * MAG_CONV)
                if DP_compute_dereddening:
                    A_lambda = df["E_BV"] * A_EBV[band]
                    values -= A_lambda

            # CASE 2: deredden directly
            elif DP_compute_dereddening:
                A_lambda = df["E_BV"] * A_EBV[band]
                if input_col_type == "flux":
                    factor = 10 ** (0.4 * A_lambda)
                    values *= factor
                    errors *= factor
                elif input_col_type == "mag":
                    values -= A_lambda
                else:
                    raise ValueError(f"Invalid input_col_type: {input_col_type}")

            if (
                (DP_compute_magnitude or (DP_compute_dereddening and input_col_type == "flux"))
                and not DP_keep_flux_columns_when_computing_mag_or_dered
            ):
                flux_cols_to_drop.extend([col_in, err_in])

            if DP_replace_invalid_values:
                invalid_val, invalid_err = get_invalid_masks(values, errors)

                if DP_how_to_replace_col_values == "all":
                    values = apply_replacement_locally(values, invalid_val, DP_col_value_to_replace)
                elif DP_how_to_replace_col_values == "only_with_invalid_err":
                    values = apply_replacement_locally(values, invalid_err, DP_col_value_to_replace)

                if DP_how_to_replace_err_values == "all":
                    errors = apply_replacement_locally(errors, invalid_err, DP_err_value_to_replace)
                elif DP_how_to_replace_err_values == "only_with_invalid_col":
                    errors = apply_replacement_locally(errors, invalid_val, DP_err_value_to_replace)

            if DP_round_col:
                values = values.round(DP_round_col_decimal_cases)
            if DP_round_err:
                errors = errors.round(DP_round_err_decimal_cases)

            df[final_col] = values
            df[final_err_col] = errors

        df.drop(columns=["E_BV"], inplace=True)

        if not DP_keep_flux_columns_when_computing_mag_or_dered:
            if DP_compute_magnitude or (DP_compute_dereddening and input_col_type == "flux"):
                df.drop(columns=[c for c in flux_cols_to_drop if c in df.columns], inplace=True)

        if DP_filter_by_boolean_column and DP_drop_column_after_filter:
            df.drop(columns=[DP_which_boolean_column], inplace=True)

        if DP_save_the_data:
            base_name = os.path.splitext(os.path.basename(path))[0]
            ext = {"parquet": "parquet", "csv": "csv", "hdf5": "h5"}[save_output_as]
            n_rows = len(df)

            if DP_repartition_files:
                n_parts = max(1, round(n_rows / DP_target_rows_per_part))
                adjusted_rows_per_part = math.ceil(n_rows / n_parts)

                for i in range(n_parts):
                    start = i * adjusted_rows_per_part
                    end = min((i + 1) * adjusted_rows_per_part, n_rows)
                    df_part = df.iloc[start:end]
                    output_path = os.path.join(data_dir, f"{base_name}-part{i}.{ext}")
                    if ext == "parquet":
                        df_part.to_parquet(output_path, index=False)
                    elif ext == "csv":
                        df_part.to_csv(output_path, index=False)
                    elif ext == "hdf5":
                        tables_io.write(df_part, output_path)
                log_msgs.append(f"{base_name} ‚úÖ {n_parts} parts saved (~{adjusted_rows_per_part} rows each)")
                return "\n".join(log_msgs)
            else:
                output_path = os.path.join(data_dir, f"{base_name}.{ext}")
                if ext == "parquet":
                    df.to_parquet(output_path, index=False)
                elif ext == "csv":
                    df.to_csv(output_path, index=False)
                elif ext == "hdf5":
                    tables_io.write(df, output_path)
                log_msgs.append(f"{base_name} ‚úÖ file saved without repartitioning")
                return "\n".join(log_msgs)
        else:
            log_msgs.append(f"{path} ‚úÖ processed but not saved")
            return "\n".join(log_msgs)

    except Exception as e:
        log_msgs.append(f"‚ùå Error processing file {path}: {repr(e)}")
        return "\n".join(log_msgs)

In [13]:
%%time

with performance_report(filename=report_path):
    tasks = [process_file(p) for p in input_files]
    result_logs = compute(*tasks)

# Save logs to file
log_output_path = os.path.join(logs_dir, "execution_logs.txt")
with open(log_output_path, "a") as f:
    for log in result_logs:
        f.write(log + "\n")

summary = "‚úÖ All files processed!"
print(summary)

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


‚úÖ All files processed!
CPU times: user 1.76 s, sys: 2.59 s, total: 4.35 s
Wall time: 9.34 s


# Validation

## Reading the catalogs

Reading the input catalog.

In [14]:
if DP_do_validation:
    if DP_which_release == 'LSST_DP02':
        ddf_input = dd.read_parquet(input_files, columns=input_user_selected_cols)

        if DP_filter_by_boolean_column:
            if DP_which_boolean_column not in ddf_input.columns:
                raise ValueError(f"Column '{DP_which_boolean_column}' not found in ddf_input.")
            ddf_input = ddf_input[ddf_input[DP_which_boolean_column] == DP_which_value_to_keep]

        if DP_is_id_in_the_index:
            ddf_input = ddf_input.reset_index()

    elif DP_which_release == 'DES_DR2':
        delayed_dfs = [delayed(read_fits_to_df_no_fix)(file, columns=input_user_selected_cols) for file in input_files]
        ddf_input = dd.from_delayed(delayed_dfs)

Reading the output catalog.

In [15]:
if DP_do_validation:
    output_folder = data_dir

    if save_output_as == "parquet":
        output_paths = os.path.join(output_folder, "*.parquet")
        ddf_output = dd.read_parquet(output_paths)

    elif save_output_as == "csv":
        output_paths = os.path.join(output_folder, "*.csv")
        ddf_output = dd.read_csv(output_paths, assume_missing=True)

    elif save_output_as == "hdf5":
        output_files = glob.glob(os.path.join(output_folder, "*.h5"))
        delayed_dfs = [
            delayed(tables_io.read)(file) for file in output_files
        ]
        ddf_output = dd.from_delayed(delayed_dfs)

    else:
        raise ValueError(f"Ouput format not supported for reading: {save_output_as}")

Reading the template catalog.

In [16]:
if DP_do_validation:
    if DP_compare_with_template:
        template_files = glob.glob(os.path.join(template_path, template_pattern))

        template_target_cols = [template_target_col.replace("BAND", band) for band in template_bands_for_comparisson]
        template_columns_to_read = [template_id_col] + template_target_cols

        if DP_template_type == "parquet":
            ddf_template = dd.read_parquet(template_files, columns=template_columns_to_read)

        elif DP_template_type == "fits":
            template_delayed_dfs = [
                delayed(read_fits_to_df_no_fix)(file, columns=template_columns_to_read)
                for file in template_files
            ]
            ddf_template = dd.from_delayed(template_delayed_dfs)

## Getting some basic informations

Printing the columns of the output dataframe and saving.

In [17]:
if DP_do_validation:
    input_selected_columns = list(ddf_input.columns)
    print("üìå Selected columns in ddf_input:")
    print(input_selected_columns)
    
    output_columns = list(ddf_output.columns)
    print("üìå Columns in ddf_output:")
    print(output_columns)
    
    if DP_save_the_data:
        os.makedirs(logs_dir, exist_ok=True)
        columns_path = os.path.join(logs_dir, "input_output_columns.txt")
        with open(columns_path, "w") as f:
            f.write("üìå Selected columns in ddf_input:\n")
            for col in input_selected_columns:
                f.write(col + "\n")
            f.write("\nüìå Columns in ddf_output:\n")
            for col in output_columns:
                f.write(col + "\n")

üìå Selected columns in ddf_input:
['objectId', 'coord_ra', 'coord_dec', 'u_cModelFlux', 'g_cModelFlux', 'r_cModelFlux', 'i_cModelFlux', 'z_cModelFlux', 'y_cModelFlux', 'u_cModelFluxErr', 'g_cModelFluxErr', 'r_cModelFluxErr', 'i_cModelFluxErr', 'z_cModelFluxErr', 'y_cModelFluxErr', 'detect_isPrimary', 'refExtendedness', 'tract', 'patch']
üìå Columns in ddf_output:
[]


In [18]:
ddf_input.head()

column,objectId,coord_ra,coord_dec,u_cModelFlux,g_cModelFlux,r_cModelFlux,i_cModelFlux,z_cModelFlux,y_cModelFlux,u_cModelFluxErr,g_cModelFluxErr,r_cModelFluxErr,i_cModelFluxErr,z_cModelFluxErr,y_cModelFluxErr,detect_isPrimary,refExtendedness,tract,patch
0,1249001228688425206,51.723257,-43.325332,197.654083,239.68619,376.806031,771.889409,1337.511938,1780.087531,42.361105,16.911218,19.237895,35.554235,105.635827,185.297817,True,1.0,2897,42
1,1249001228688425208,51.650768,-43.32586,175.93498,159.580197,177.1925,236.653719,321.286327,423.079444,31.774124,10.976567,13.000149,25.597269,68.734763,125.887242,True,0.0,2897,42
2,1249001228688425217,51.64462,-43.325773,201.113541,234.861942,243.383174,179.508753,116.776572,300.784598,35.132595,12.21872,14.489231,29.320463,76.459099,132.243044,True,1.0,2897,42
3,1249001228688425220,51.739908,-43.325044,63.533804,41.042207,61.670427,184.31976,140.973687,267.533862,31.160796,11.431742,13.12184,24.41328,68.639936,141.905375,True,0.0,2897,42
4,1249001228688425225,51.750784,-43.324635,390.433856,643.844477,1164.632511,1523.420382,1766.516881,1578.183241,53.913831,17.799144,20.633074,39.094253,110.796916,189.869942,True,1.0,2897,42


Validating the sizes of input and output dataframes.

In [19]:
if DP_do_validation:
    input_len = ddf_input.shape[0].compute()
    print(f"üì• Number of rows in input (filtered): {input_len}")

    # === Read output DataFrame ===
    output_len = ddf_output.shape[0].compute()
    print(f"üì§ Number of rows in final output: {output_len}")

    # === Simple validation ===
    if input_len == output_len:
        print("‚úÖ Row count matches!")
    else:
        print("‚ö†Ô∏è Row count mismatch!")
    
    match = input_len == output_len
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # Validation message
    validation_text = f"""# ==== SIMPLE VALIDATION ====
    Validation timestamp: {timestamp}

    üì• Number of rows in input (filtered): {input_len}
    üì§ Number of rows in final output: {output_len}

    Result: {"‚úÖ Row count matches!" if match else "‚ö†Ô∏è Row count mismatch!"}
    """
    
    if DP_save_the_data:
        validation_path = os.path.join(logs_dir, "validation_info.txt")
        with open(validation_path, "w") as f:
            f.write(validation_text)

        print(f"‚úÖ Validation result saved to: {validation_path}")

üì• Number of rows in input (filtered): 161025835
üì§ Number of rows in final output: 0
‚ö†Ô∏è Row count mismatch!
‚úÖ Validation result saved to: /scratch/users/julia/tsm_lsst_dp0_2_input_data_preparation/DP_run_LSST_DP02_cModel_flux_2025-08-21_12-28/logs/validation_info.txt


Getting some IDs in both dataframes, printing and saving.

In [20]:
if DP_do_validation:
    selected_ids = ddf_input[id_col].head(10).tolist()

    def filter_by_ids(df, col, values):
        return df[df[col].isin(values)]

    filtered_input = ddf_input.map_partitions(filter_by_ids, id_col, selected_ids, meta=ddf_input._meta)
    filtered_output = ddf_output.map_partitions(filter_by_ids, id_col, selected_ids, meta=ddf_output._meta)

    input_sample = filtered_input.compute()
    output_sample = filtered_output.compute()

    print("üì• Subset of ddf_input with selected IDs:")
    display(input_sample)

    print("üì§ Subset of ddf_output with the same IDs:")
    display(output_sample)

    if DP_save_the_data:
        os.makedirs(logs_dir, exist_ok=True)

        input_path = os.path.join(logs_dir, "ddf_input_sample.csv")
        output_path = os.path.join(logs_dir, "ddf_output_sample.csv")

        input_sample.to_csv(input_path, index=False)
        output_sample.to_csv(output_path, index=False)

        print(f"‚úÖ CSV files saved to:\n- {input_path}\n- {output_path}")

KeyError: 'objectId'

Getting some IDs in both dataframes containing invalid values, printing and saving.

In [None]:
if DP_do_validation and DP_replace_invalid_values:
    print("\nüîç Searching for 10 objects with invalid values in the output...")

    # Get the actual output column names (col/err)
    col_pairs = []
    for band in selected_bands:
        if DP_pesonalized_which_band_case == 'lower_case':
            band_formatted = band.lower()
        elif DP_pesonalized_which_band_case == 'upper_case':
            band_formatted = band.upper()
        else:
            band_formatted = band

        col_out = DP_col_final_name_pattern.replace("BAND", band_formatted)
        err_out = DP_err_final_name_pattern.replace("BAND", band_formatted)
        col_pairs.append((col_out, err_out))

    # Identify configured invalid replacement values
    col_invalid_value = DP_col_value_to_replace
    err_invalid_value = DP_err_value_to_replace

    # Function to detect invalid rows in each partition
    def find_invalid_rows(df):
        mask = pd.Series(False, index=df.index)
        for col, err in col_pairs:
            if col in df.columns and err in df.columns:
                if pd.isna(col_invalid_value):
                    mask |= df[col].isna()
                else:
                    mask |= (df[col] == col_invalid_value)

                if pd.isna(err_invalid_value):
                    mask |= df[err].isna()
                else:
                    mask |= (df[err] == err_invalid_value)
        return df[mask].head(10)

    # Apply with Dask and collect results
    invalid_rows_dd = ddf_output.map_partitions(find_invalid_rows, meta=ddf_output._meta)
    invalid_rows = invalid_rows_dd.compute().drop_duplicates(subset=[id_col]).head(10)

    if len(invalid_rows) == 0:
        print("‚úÖ No objects with invalid values found in the output.")
    else:
        print(f"‚ö†Ô∏è Found {len(invalid_rows)} objects with invalid values in the output:")
        display(invalid_rows)

        # Fetch corresponding input objects
        ids_with_invalid = invalid_rows[id_col].tolist()
        filtered_input_invalid = ddf_input.map_partitions(filter_by_ids, id_col, ids_with_invalid, meta=ddf_input._meta)
        input_invalid_sample = filtered_input_invalid.compute()

        print("üì• Matching entries in the input:")
        display(input_invalid_sample)

        if DP_save_the_data:
            os.makedirs(logs_dir, exist_ok=True)

            invalid_input_path = os.path.join(logs_dir, "invalid_input_sample.csv")
            invalid_output_path = os.path.join(logs_dir, "invalid_output_sample.csv")

            input_invalid_sample.to_csv(invalid_input_path, index=False)
            invalid_rows.to_csv(output_path, index=False)

            print(f"‚úÖ Invalid value samples saved to:\n- {invalid_input_path}\n- {invalid_output_path}")

## Comparing with template catalog

In [None]:
if DP_do_validation and DP_compare_with_template:
    if len(selected_bands_for_comparisson) != len(template_bands_for_comparisson):
        raise ValueError("selected_bands_for_comparisson and template_bands_for_comparisson must have the same length.")

    invalid_value = DP_col_value_to_replace if DP_replace_invalid_values else None

    ddf_merged = ddf_output.merge(
        ddf_template,
        left_on=id_col,
        right_on=template_id_col,
        suffixes=("_output", "_template")
    )

    comparison_lines = []
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    comparison_lines.append("# ==== GLOBAL COLUMN COMPARISON ====")
    comparison_lines.append(f"Analysis timestamp: {timestamp}")
    comparison_lines.append(f"Total band pairs compared: {len(selected_bands_for_comparisson)}\n")
    comparison_lines.append("\U0001F50D Comparing columns (using isclose with Dask):\n")

    for band_out, band_template in zip(selected_bands_for_comparisson, template_bands_for_comparisson):
        # Coluna no template
        col_template = template_target_col.replace("BAND", band_template)

        # Coluna no output
        if DP_pesonalized_which_band_case == 'lower_case':
            band_fmt = band_out.lower()
        elif DP_pesonalized_which_band_case == 'upper_case':
            band_fmt = band_out.upper()
        else:
            band_fmt = band_out
        col_out = DP_col_final_name_pattern.replace("BAND", band_fmt)

        if col_out == col_template:
            col_out = f"{col_out}_output"
            col_template = f"{col_template}_template"

        if col_out in ddf_merged.columns and col_template in ddf_merged.columns:
            if invalid_value is not None:
                valid_mask = (
                    (ddf_merged[col_out] != invalid_value) &
                    (ddf_merged[col_template] != invalid_value) &
                    ddf_merged[col_out].map_partitions(np.isfinite) &
                    ddf_merged[col_template].map_partitions(np.isfinite)
                )
            else:
                valid_mask = (
                    ddf_merged[col_out].map_partitions(np.isfinite) &
                    ddf_merged[col_template].map_partitions(np.isfinite)
                )

            ddf_valid = ddf_merged[valid_mask][[id_col, col_out, col_template]]
            total = ddf_valid[id_col].count().compute()

            is_diff = ddf_valid.map_partitions(
                lambda df: pd.Series(
                    ~np.isclose(df[col_out], df[col_template], atol=comparisson_precision, rtol=0),
                    index=df.index
                ),
                meta=pd.Series(dtype=bool)
            )
            diff_count = is_diff.sum().compute()
            percent_diff = (diff_count / total) * 100 if total > 0 else 0

            msg = f"\U0001F4CF Band {band_out.upper()} vs {band_template.upper()}: {total} valid | {diff_count} different ({percent_diff:.5f}%)"
            print(msg)
            comparison_lines.append(msg)

            if diff_count > 0:
                df_sample = ddf_valid.sample(frac=0.01, random_state=42).compute()
                diff_mask = ~np.isclose(df_sample[col_out], df_sample[col_template], atol=comparisson_precision, rtol=0)
                df_diffs = df_sample[diff_mask]

                if not df_diffs.empty:
                    preview_msg = "\n\U0001F50E Sample differences:"
                    print(preview_msg)
                    print(df_diffs.head())

                    comparison_lines.append(preview_msg)
                    comparison_lines.append(df_diffs.head().to_string(index=False))

                    if DP_save_the_data:
                        diff_path = os.path.join(logs_dir, f"diff_sample_{band_out}_vs_{band_template}.csv")
                        df_diffs.to_csv(diff_path, index=False)
        else:
            msg = f"\u26A0\uFE0F Band {band_out.upper()} vs {band_template.upper()}: missing columns -> {col_out} or {col_template}"
            print(msg)
            comparison_lines.append(msg)

    if DP_save_the_data:
        os.makedirs(logs_dir, exist_ok=True)
        comparison_path = os.path.join(logs_dir, "global_comparison.txt")
        with open(comparison_path, "w") as f:
            f.write("\n".join(comparison_lines))

        print(f"\u2705 Global column comparison saved to: {comparison_path}")

# Closing the cluster

In [None]:
cluster.close()
client.close()