<img align='left' src = '../../images/linea.png' width=150 style='padding: 20px'> 

# LSDB Tutorial - TopN-Max and TailN-Min queries

How to perform global Top-N maximum and Tail-N minimum queries with LSDB. <br>

Contact: Luigi Silva ([luigi.silva@linea.org.br](mailto:luigi.silva@linea.org.br)) <br>
Last check: Dec. 01, 2025

## Introduction

### Context

In the standard [LSDB](https://docs.lsdb.io/en/latest/) access model (e.g., through [data.lsdb.io](https://data.lsdb.io)), each [HATS](https://hats.readthedocs.io/en/latest/) pixel corresponds to a standalone Parquet file, and LSDB relies on PyArrow to read these files. PyArrow currently requires downloading and parsing the entire Parquet file before applying column or row-level filters. As a consequence, even when the user requests only a subset of columns or a simple row filter, the full Parquet payload is transferred to the client and processed locally. In some cases, due to internal Parquet metadata handling, the same file may even be read multiple times.

When a catalog is accessed through an LSDB-server instance (for example, via the `:43210` endpoint), a different model applies. The LSDB-server acts as a proxy layer over the underlying Parquet files, and in this configuration the following operations are executed on the server side, before any data is transmitted to the client:

* Column projection (`columns=[...]`)

* Row filtering via PyArrow-compatible predicates (`filters=[(...)]`)

The server loads the Parquet file locally, applies the filters and column selection using PyArrow, and serializes only the resulting reduced subset, which is then sent over HTTP to the client. This significantly reduces both network transfer volume and client-side memory usage, especially for large pixels or for queries that discard most of the rows.

The LSDB-server approach is still experimental, as it requires ensuring that different archive providers support the same HTTP semantics and PyArrow filter behaviors. However, for workloads involving selective access to a few columns or strongly filtering predicates, server-side processing offers substantial performance improvements compared to the raw-file mode of data.lsdb.io.

### Main parameters in the code below

The code cells below perform global Top-N (maximum values) or Tail-N (minimum values) searches on a selected ranking column. The parameters described here define how many partitions are inspected and how strict the server-side filtering is.

**K**  
Number of HATS partitions to inspect, after sorting all partitions by the relevant statistic (`max_value` for Top-N, `min_value` for Tail-N).  
If **K is too small**, the algorithm may fail to assemble the requested N results.  
If **K is too large**, too many partitions will be loaded, increasing memory usage and slowing down the run.

**topN_max_count / tailN_min_count**  
Number of rows expected in the final global Top-N or Tail-N result.  
The code validates that the final output contains exactly N objects.

**ranking_column**  
Column used to extract global maxima or minima. It must be included in the projected columns; the code appends it to `columns_to_select` if it is missing.

**ranking_threshold / ranking_threshold_min**  
Server-side row filter.  
Top-N keeps rows with `ranking_column >= ranking_threshold`.  
Tail-N keeps rows with `ranking_column <= ranking_threshold_min`.  
If **too restrictive**, required candidates may be discarded.  
If **too permissive**, the filtered partitions may become very large, increasing the risk of excessive memory usage or local compute overload.
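
The two threshold modes map directly onto the PyArrow-style predicate list passed as `filters=` when the catalog is re-opened. A minimal sketch, using a hypothetical helper named `ranking_filters` (not part of LSDB), that builds that list for either mode:

```python
def ranking_filters(mode, column, threshold):
    """Hypothetical helper: build a PyArrow-style filter list for a ranking search.

    mode "top" keeps column >= threshold (Top-N);
    mode "tail" keeps column <= threshold (Tail-N).
    """
    if mode == "top":
        op = ">="
    elif mode == "tail":
        op = "<="
    else:
        raise ValueError(f"mode must be 'top' or 'tail', got {mode!r}")
    # float() mirrors the cast applied to the thresholds in the notebook cells
    return [(column, op, float(threshold))]


print(ranking_filters("top", "parallax", 155))   # [('parallax', '>=', 155.0)]
print(ranking_filters("tail", "parallax", -45))  # [('parallax', '<=', -45.0)]
```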

**General guidance**  
Choose **K** reasonably close to, but below, **N**: the N result rows can occupy at most N partitions, so K never needs to exceed N, and the code rejects K > N.  
Select a **threshold** that is loose enough to retain the needed candidates, but tight enough to keep data volumes manageable.  
Both parameters affect memory footprint and correctness; the final validation steps raise an error if the selection is, or may be, incomplete.
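
The interplay between K, N, and the threshold can be checked offline on synthetic data. The pandas sketch below (all data and names are hypothetical; no LSDB calls are made) mimics the notebook's pipeline: rank partitions by a per-partition maximum, keep the first K, apply the row threshold, take `nlargest`, run both validation checks, and compare the result with a brute-force Top-N over every row.

```python
import numpy as np
import pandas as pd

# Hypothetical catalog: 200 partitions x 500 rows of one ranking column
rng = np.random.default_rng(42)
partitions = {
    p: pd.DataFrame({"value": rng.normal(0.0, 1.0, 500)}) for p in range(200)
}

# Per-partition maxima, standing in for per_pixel_statistics(), sorted descending
stats_sorted = pd.Series(
    {p: df["value"].max() for p, df in partitions.items()}
).sort_values(ascending=False)


def pruned_top_n(N, K, threshold):
    """Top-N over the K highest-max partitions, after a row threshold."""
    selected = stats_sorted.index[:K]
    candidates = pd.concat(
        [partitions[p][partitions[p]["value"] >= threshold] for p in selected],
        ignore_index=True,
    )
    top_n = candidates.nlargest(N, "value")
    # Validation 1: exactly N rows survived both filters
    size_ok = len(top_n) == N
    # Validation 2: the (K+1)-th partition cannot exceed the smallest Top-N value
    next_max = stats_sorted.iloc[K] if K < len(stats_sorted) else -np.inf
    range_ok = size_ok and bool(top_n["value"].min() >= next_max)
    return top_n, size_ok, range_ok


# Brute-force reference over every row (for comparison only)
reference = pd.concat(list(partitions.values()), ignore_index=True).nlargest(50, "value")
ref_values = np.sort(reference["value"].to_numpy())

for K in (10, 30, 50):
    top_n, size_ok, range_ok = pruned_top_n(N=50, K=K, threshold=2.0)
    matches = size_ok and np.array_equal(np.sort(top_n["value"].to_numpy()), ref_values)
    print(f"K={K}: size_ok={size_ok}, range_ok={range_ok}, matches_reference={matches}")
```

When both checks pass, the pruned result agrees with the brute-force answer: every omitted partition has a maximum no larger than the (K+1)-th one, and every row removed by the threshold lies below it, so neither can hold a value above the smallest Top-N value. A small K can still pass the size check while failing the pixel-range check.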

## Imports and Cluster Setup

In [None]:
import time
import lsdb
import pandas as pd
from dask.distributed import Client, LocalCluster

# -----------------------------------------------------------
# Start local Dask cluster
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Starting local Dask cluster...")
cluster = LocalCluster(
    n_workers=3,
    threads_per_worker=1,
    memory_limit="4GB",
)
client = Client(cluster)
print(">> Cluster started.")
print("   Dashboard:", client.dashboard_link)
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")
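
The cells below repeat the same `time.perf_counter()` bookkeeping around every step. If preferred, that pattern can be folded into a small context manager. The `timed` helper below is a hypothetical convenience (not part of LSDB or Dask) that prints in the same format as the notebook.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Print a label, run the block, then print the elapsed wall-clock time."""
    print(f">> {label}")
    result = {}
    t0 = time.perf_counter()
    try:
        yield result
    finally:
        elapsed = time.perf_counter() - t0
        result["elapsed"] = elapsed
        minutes, seconds = int(elapsed // 60), elapsed % 60
        print(f"   [Time: {elapsed:.2f} s | {minutes} min {seconds:.2f} s]")
        print("-----------------------------------------------------------\n")


# Example usage with a stand-in workload
with timed("Sleeping briefly...") as timing:
    time.sleep(0.1)
print(f"Recorded: {timing['elapsed']:.2f} s")
```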

## TopN-Max query

In [None]:
# GLOBAL TIMER
t_global_start = time.perf_counter()

# -----------------------------------------------------------
# 0) Configurable parameters
# -----------------------------------------------------------
t0 = time.perf_counter()

# Number of HATS partitions to inspect when searching for the
# global highest values of the ranking column
K = 90

# Number of highest-valued rows to return (global top-N maximum search)
topN_max_count = 100

# Column whose global maximum values will be extracted
ranking_column = "parallax"

# Server-side cutoff; rows below this value are discarded before transfer
ranking_threshold = 155

# Columns to request from LSDB (server-side projection)
columns_to_select = ["ra", "dec", "phot_g_mean_mag", "parallax"]

print(f">> K = {K}")
print(f">> topN_max_count = {topN_max_count}")
print(f">> ranking_column = '{ranking_column}'")
print(f">> ranking_threshold = {ranking_threshold}")
print(f">> columns_to_select = {columns_to_select}")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# Ensure ranking_column is included
if ranking_column not in columns_to_select:
    print(f">> Adding ranking_column '{ranking_column}' to columns_to_select.")
    columns_to_select.append(ranking_column)

# Fail fast: the top-N rows can occupy at most N partitions, so inspecting
# more than topN_max_count partitions is never needed
if K > topN_max_count:
    raise ValueError(
        f"\nERROR: Invalid parameter choice: K ({K}) > topN_max_count ({topN_max_count}).\n"
        f"K defines how many highest-max-value partitions will be inspected; since the\n"
        f"top-N rows can occupy at most N partitions, K should not exceed N.\n"
        f"Please reduce K or increase topN_max_count."
    )

# -----------------------------------------------------------
# 1) Open catalog with server-side projection
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Opening Gaia DR3 catalog (server-side projection enabled)...")
cat = lsdb.open_catalog(
    "http://epyc.astro.washington.edu:43210/hats/gaia_dr3",
    columns=columns_to_select,
)
print(">> Catalog opened.")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 2) Per-pixel statistics (metadata only)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Reading per-pixel statistics...")
per_pixel = cat.per_pixel_statistics(
    include_columns=[ranking_column],
    include_stats=["max_value"],
)
print(">> Per-pixel statistics loaded.")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")

# Sort partitions by descending max(ranking_column)
per_pixel_sorted = per_pixel.sort_values(f"{ranking_column}: max_value", ascending=False)

# Select top-K partitions that contain the highest maximum values of the column
topK_pixels = per_pixel_sorted.head(K)
pixel_list = topK_pixels.index.tolist()

print(">> Top K pixels (first 10):")
print(topK_pixels.head(10))
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 3) Server-side row filtering (first stage of global max search)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(f">> Re-opening catalog with server-side filter ({ranking_column} >= {ranking_threshold})...")
cat_filtered = lsdb.open_catalog(
    "http://epyc.astro.washington.edu:43210/hats/gaia_dr3",
    columns=columns_to_select,
    filters=[(ranking_column, ">=", float(ranking_threshold))],
)
print(">> Filtered catalog ready.")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 4) Restrict to the selected K partitions (second stage of max-value pruning)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(f">> Applying pixel_search to the selected {K} partitions...")
cat_filtered_limited = cat_filtered.pixel_search(pixels=pixel_list)
print(">> Pixel subset applied.")
print("   Pixels:", pixel_list[:10], "...")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 5) Materialize candidate rows (after both filters)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Computing candidate dataset...")
df_candidates = cat_filtered_limited.compute()
print(">> Compute complete.")
print("   Rows returned:", len(df_candidates))
t1 = time.perf_counter()

elapsed = t1 - t0
minutes = int(elapsed // 60)
seconds = elapsed % 60

print(f"   [Time: {elapsed:.2f} s | {minutes} min {seconds:.2f} s]")
print("-----------------------------------------------------------\n")

# -----------------------------------------------------------
# 6) Compute the global Top-N maximum values
# -----------------------------------------------------------
t0 = time.perf_counter()
print(f">> Selecting global top-{topN_max_count} highest values of '{ranking_column}'...")
topN_global_max = df_candidates.nlargest(topN_max_count, ranking_column)
print(topN_global_max.head(10))

pN_min = topN_global_max[ranking_column].min()
print(f"\n>> Minimum value within global top-{topN_max_count} = {pN_min}")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 7) Validation checks (completeness of the top-N maxima)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Validating top-N maximum extraction...")

# Check final size
if len(topN_global_max) != topN_max_count:
    raise ValueError(
        f"\nERROR: Expected {topN_max_count} objects but obtained {len(topN_global_max)}. "
        f"Threshold or K may be too restrictive."
    )
else:
    print(f"   ✔ Size check passed ({topN_max_count} objects).")

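# Ensure no omitted partition could hold a value above the smallest Top-N value.
# per_pixel_sorted is in descending max order, so checking the (K+1)-th partition
# (iloc[K]) bounds every omitted partition.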
if K < len(per_pixel_sorted):
    next_partition_max = per_pixel_sorted[f"{ranking_column}: max_value"].iloc[K]
else:
    next_partition_max = -float("inf")

print(f"   max_value of pixel K+1 = {next_partition_max}")

if pN_min < next_partition_max:
    raise ValueError(
        f"\nERROR: Potential missing objects: min(top-{topN_max_count}) = {pN_min} < "
        f"max_value of pixel K+1 = {next_partition_max}. Increase K or adjust threshold."
    )
else:
    print("   ✔ Pixel-range validation passed.")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------")


# -----------------------------------------------------------
# GLOBAL EXECUTION TIME
# -----------------------------------------------------------
t_global_end = time.perf_counter()
elapsed_global = t_global_end - t_global_start
minutes_g = int(elapsed_global // 60)
seconds_g = elapsed_global % 60

print(f"\n>> TOTAL EXECUTION TIME: {elapsed_global:.2f} s | {minutes_g} min {seconds_g:.2f} s")
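
The pixel-range check only needs the first omitted partition: because `per_pixel_sorted` is in descending order of `max_value`, every later partition has a maximum no larger than the (K+1)-th one. The small pandas sketch below, with made-up maxima and a hypothetical helper `pixel_range_ok`, reproduces that comparison outside LSDB.

```python
import pandas as pd


def pixel_range_ok(top_values, partition_max_sorted, K):
    """Hypothetical helper mirroring validation 2 of the Top-N cell.

    partition_max_sorted must be sorted in descending order; the check passes
    when no omitted partition can contain a value above min(top_values).
    """
    if K < len(partition_max_sorted):
        next_partition_max = partition_max_sorted.iloc[K]
    else:
        next_partition_max = float("-inf")
    return min(top_values) >= next_partition_max


# Made-up per-partition maxima, already sorted in descending order
partition_max_sorted = pd.Series([400.0, 350.0, 300.0, 180.0, 120.0])

print(pixel_range_ok([390.0, 340.0, 290.0], partition_max_sorted, K=3))  # True (290 >= 180)
print(pixel_range_ok([390.0, 340.0, 150.0], partition_max_sorted, K=3))  # False (150 < 180)
print(pixel_range_ok([390.0, 340.0, 150.0], partition_max_sorted, K=5))  # True (nothing omitted)
```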

## TailN-Min query

In [None]:
# GLOBAL TIMER
t_global_start = time.perf_counter()

# -----------------------------------------------------------
# 0) Configurable parameters
# -----------------------------------------------------------
t0 = time.perf_counter()

# Number of HATS partitions to inspect when searching for the
# global lowest values of the ranking column
K = 90

# Number of lowest-valued rows to return (global tail-N minimum search)
tailN_min_count = 100

# Column whose global minimum values will be extracted
ranking_column = "parallax"

# Server-side cutoff; rows ABOVE this value are discarded before transfer
# (i.e., we keep ranking_column <= ranking_threshold_min)
ranking_threshold_min = -45

# Columns to request from LSDB (server-side projection)
columns_to_select = ["ra", "dec", "phot_g_mean_mag", "parallax"]

print(f">> K = {K}")
print(f">> tailN_min_count = {tailN_min_count}")
print(f">> ranking_column = '{ranking_column}'")
print(f">> ranking_threshold_min = {ranking_threshold_min}")
print(f">> columns_to_select = {columns_to_select}")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")

# Ensure ranking_column is included
if ranking_column not in columns_to_select:
    print(f">> Adding ranking_column '{ranking_column}' to columns_to_select.")
    columns_to_select.append(ranking_column)

# Fail fast: the tail-N rows can occupy at most N partitions, so inspecting
# more than tailN_min_count partitions is never needed
if K > tailN_min_count:
    raise ValueError(
        f"\nERROR: Invalid parameter choice: K ({K}) > tailN_min_count ({tailN_min_count}).\n"
        f"K defines how many lowest-min-value partitions will be inspected; since the\n"
        f"tail-N rows can occupy at most N partitions, K should not exceed N.\n"
        f"Please reduce K or increase tailN_min_count."
    )

# -----------------------------------------------------------
# 1) Open catalog with server-side projection
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Opening Gaia DR3 catalog (server-side projection enabled)...")
cat = lsdb.open_catalog(
    "http://epyc.astro.washington.edu:43210/hats/gaia_dr3",
    columns=columns_to_select,
)
print(">> Catalog opened.")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 2) Per-pixel statistics (metadata only)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Reading per-pixel statistics...")
per_pixel = cat.per_pixel_statistics(
    include_columns=[ranking_column],
    include_stats=["min_value"],
)
print(">> Per-pixel statistics loaded.")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")

# Sort partitions by ascending min(ranking_column)
per_pixel_sorted = per_pixel.sort_values(f"{ranking_column}: min_value", ascending=True)

# Select the first K partitions, i.e., those with the lowest minimum values of the column
topK_pixels = per_pixel_sorted.head(K)
pixel_list = topK_pixels.index.tolist()

print(">> Selected K pixels with the lowest min_value (first 10):")
print(topK_pixels.head(10))
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 3) Server-side row filtering (first stage of global min search)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(f">> Re-opening catalog with server-side filter ({ranking_column} <= {ranking_threshold_min})...")
cat_filtered = lsdb.open_catalog(
    "http://epyc.astro.washington.edu:43210/hats/gaia_dr3",
    columns=columns_to_select,
    filters=[(ranking_column, "<=", float(ranking_threshold_min))],
)
print(">> Filtered catalog ready.")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 4) Restrict to the selected K partitions (second stage of min-value pruning)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(f">> Applying pixel_search to the selected {K} partitions...")
cat_filtered_limited = cat_filtered.pixel_search(pixels=pixel_list)
print(">> Pixel subset applied.")
print("   Pixels:", pixel_list[:10], "...")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 5) Materialize candidate rows (after both filters)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Computing candidate dataset...")
df_candidates = cat_filtered_limited.compute()
print(">> Compute complete.")
print("   Rows returned:", len(df_candidates))
t1 = time.perf_counter()

elapsed = t1 - t0
minutes = int(elapsed // 60)
seconds = elapsed % 60

print(f"   [Time: {elapsed:.2f} s | {minutes} min {seconds:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 6) Compute the global Tail-N minimum values
# -----------------------------------------------------------
t0 = time.perf_counter()
print(f">> Selecting global tail-{tailN_min_count} lowest values of '{ranking_column}'...")
tailN_global_min = df_candidates.nsmallest(tailN_min_count, ranking_column)
print(tailN_global_min.head(10))

pN_max_in_tail = tailN_global_min[ranking_column].max()
print(f"\n>> Maximum value within global tail-{tailN_min_count} = {pN_max_in_tail}")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------\n")


# -----------------------------------------------------------
# 7) Validation checks (completeness of the tail-N minima)
# -----------------------------------------------------------
t0 = time.perf_counter()
print(">> Validating tail-N minimum extraction...")

# Validation 1: Ensure we returned the requested count
if len(tailN_global_min) != tailN_min_count:
    raise ValueError(
        f"\nERROR: Expected {tailN_min_count} objects but obtained {len(tailN_global_min)}. "
        f"Threshold or K may be too restrictive."
    )
else:
    print(f"   ✔ Size check passed ({tailN_min_count} objects).")

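# Validation 2: Ensure no smaller values exist in omitted partitions. per_pixel_sorted
# is in ascending min order, so checking the (K+1)-th partition (iloc[K]) bounds all of them.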
if K < len(per_pixel_sorted):
    next_partition_min = per_pixel_sorted[f"{ranking_column}: min_value"].iloc[K]
else:
    next_partition_min = float("inf")

print(f"   min_value of pixel K+1 = {next_partition_min}")

if pN_max_in_tail > next_partition_min:
    raise ValueError(
        f"\nERROR: Potential missing objects: max(tail-{tailN_min_count}) = {pN_max_in_tail} > "
        f"min_value of pixel K+1 = {next_partition_min}. Increase K or adjust threshold."
    )
else:
    print("   ✔ Pixel-range validation passed.")
t1 = time.perf_counter()
print(f"   [Time: {t1 - t0:.2f} s]")
print("-----------------------------------------------------------")


# -----------------------------------------------------------
# GLOBAL EXECUTION TIME
# -----------------------------------------------------------
t_global_end = time.perf_counter()

elapsed_global = t_global_end - t_global_start
minutes_g = int(elapsed_global // 60)
seconds_g = elapsed_global % 60

print(f"\n>> TOTAL EXECUTION TIME: {elapsed_global:.2f} s | {minutes_g} min {seconds_g:.2f} s")
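
The Tail-N cell mirrors the Top-N cell step by step. One way to see the symmetry: a Tail-N search on a column `x` is a Top-N search on `-x`, the per-partition `min_value` of `x` equals minus the `max_value` of `-x`, and a Tail-N threshold `t` (keep `x <= t`) is a Top-N threshold `-t` on `-x`. The pandas sketch below, on made-up values, checks these equivalences; it is an illustration, not how the notebook implements Tail-N.

```python
import pandas as pd

# Made-up candidate rows
df = pd.DataFrame({"parallax": [-80.0, 12.0, -45.5, -300.0, 3.0, -46.0]})
N = 3

# Direct Tail-N selection, as in step 6 of the Tail-N cell
tail = df.nsmallest(N, "parallax")

# Same selection expressed as a Top-N search on the negated column
flipped = df.assign(neg_parallax=-df["parallax"]).nlargest(N, "neg_parallax")

print(tail["parallax"].tolist())     # [-300.0, -80.0, -46.0]
print(flipped["parallax"].tolist())  # [-300.0, -80.0, -46.0]

# A Tail-N threshold t (keep x <= t) equals a Top-N threshold -t on -x
t = -45.0
print((df["parallax"] <= t).equals(-df["parallax"] >= -t))  # True
```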