From 6a74fce37e3f5f9a941e48c8e76a3a2e9b8eff56 Mon Sep 17 00:00:00 2001 From: Justin Miller Date: Tue, 21 Apr 2026 09:59:07 +0800 Subject: [PATCH] perf(index): fast-path total rows and cache index_statistics output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Dataset::index_statistics` called `count_rows(None)` on every invocation purely to compute `num_unindexed_rows = total - indexed`. On large or legacy datasets `count_rows` fans out per-fragment and can open fragment data files, making it the dominant cost. In addition, repeat calls at the same manifest version recomputed the full JSON from scratch even though the answer is deterministic. P1: introduce `manifest_total_rows` which sums `Fragment::num_rows()` from the manifest. If every fragment has `physical_rows`, the total is resolved in memory. If any is missing, fall through to the existing `count_rows(None)` path — no correctness regression for legacy datasets. P2: wrap `index_statistics` with a `DSIndexCache` get/insert keyed on `(manifest_version, index_name)`. The cache already scopes entries by dataset URI; manifest version is monotonic and bumps on every operation that can change the result (append, delete, compact, index create/optimize/drop), so invalidation is automatic — no write path ever needs to touch the cache. Both changes are additive. Failed calls don't populate the cache. No public API or manifest format change. Benchmarks (10M rows, 10K fragments, local SSD, Apple Silicon): count_rows_baseline 1.82 ms legacy_cold 5.33 ms (pre-P1/P2) cold 3.56 ms (P1 only; -33% / -1.77 ms) cached 21.7 µs (P1 + P2; ~245x vs legacy_cold) P1 savings scale linearly with fragment count (~200 ns/fragment in-memory; orders of magnitude more on legacy formats or slow object stores where `count_rows` reads data files). P2 collapses repeat calls to a moka lookup plus a string clone. Tests added: - test_index_statistics_row_counts_match_count_rows - test_manifest_total_rows_missing_metadata_returns_none - test_index_statistics_cache_hit_avoids_io - test_index_statistics_cache_invalidates_on_manifest_bump - test_index_statistics_cache_distinguishes_index_names Benchmark added: - benches/index_stats.rs (legacy vs cold vs cached, parity check) --- rust/lance/Cargo.toml | 4 + rust/lance/benches/index_stats.rs | 224 ++++++++++++++ rust/lance/src/index.rs | 390 ++++++++++++++++++++++++- rust/lance/src/session/index_caches.rs | 26 ++ 4 files changed, 633 insertions(+), 11 deletions(-) create mode 100644 rust/lance/benches/index_stats.rs diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index afb7886b5cf..fa3e60fd158 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -200,5 +200,9 @@ harness = false name = "mem_wal_read" harness = false +[[bench]] +name = "index_stats" +harness = false + [lints] workspace = true diff --git a/rust/lance/benches/index_stats.rs b/rust/lance/benches/index_stats.rs new file mode 100644 index 00000000000..7008ccf57e7 --- /dev/null +++ b/rust/lance/benches/index_stats.rs @@ -0,0 +1,224 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark for `Dataset::index_statistics`, comparing the current +//! implementation against the pre-P1/P2 baseline. +//! +//! Two optimizations are measured: +//! +//! * **P1** — derive the total row count from the manifest instead of calling +//! `Dataset::count_rows(None)`, which fans out per-fragment and can read +//! fragment data files on datasets written by older Lance versions. +//! * **P2** — cache `index_statistics` JSON keyed by manifest version in the +//! dataset's `DSIndexCache`, so repeat calls at the same version are served +//! from memory. +//! +//! The benchmark runs four functions on the same fixture in the same process: +//! +//! * `index_stats/legacy_cold` — pre-P1/P2 behavior. Always calls +//! `count_rows(None)`, never hits the cache. Exercised via the +//! `#[doc(hidden)]` `bench_legacy_index_statistics` entry point. +//! * `index_stats/cold` — current behavior with a cold dataset cache +//! (session reopened every iteration). Isolates the P1 win against +//! `legacy_cold`. +//! * `index_stats/cached` — current behavior after the first call. Isolates +//! the P2 win against `cold`. +//! * `index_stats/count_rows_baseline` — wall time of the `count_rows(None)` +//! fan-out alone, for context on what P1 removes from the hot path. + +use std::sync::{Arc, OnceLock}; + +use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator}; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use lance::Dataset; +use lance::dataset::WriteParams; +use lance::dataset::builder::DatasetBuilder; +use lance::index::DatasetIndexExt; +use lance::index::bench_legacy_index_statistics; +use lance_core::utils::tempfile::TempStrDir; +use lance_index::IndexType; +use lance_index::scalar::ScalarIndexParams; +#[cfg(target_os = "linux")] +use pprof::criterion::{Output, PProfProfiler}; + +/// Fixture shape. 10 M rows split into 10 000 fragments of 1 000 rows each — +/// large enough that `count_rows`'s per-fragment fan-out dominates, at a +/// fragment count closer to what's seen in production on tables that haven't +/// been compacted recently. +const NUM_FRAGMENTS: usize = 10_000; +const ROWS_PER_FRAGMENT: usize = 1_000; +const TOTAL_ROWS: usize = NUM_FRAGMENTS * ROWS_PER_FRAGMENT; +const INDEX_NAME: &str = "id_idx"; + +struct Fixture { + // Kept alive for the lifetime of the benchmark so the on-disk data stays valid. + _tempdir: TempStrDir, + uri: String, +} + +async fn build_fixture() -> Fixture { + let tempdir = TempStrDir::default(); + let uri = tempdir.as_str().to_string(); + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + // One `Dataset::write` call produces multiple fragments when the input + // exceeds `max_rows_per_file` — vastly cheaper than committing per fragment. + // Feed the data in batches of `ROWS_PER_FRAGMENT` so memory stays bounded. + let batches: Vec = (0..NUM_FRAGMENTS) + .map(|f| { + let start = (f * ROWS_PER_FRAGMENT) as i32; + let values: Vec = (start..start + ROWS_PER_FRAGMENT as i32).collect(); + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap() + }) + .collect(); + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + let params = WriteParams { + max_rows_per_file: ROWS_PER_FRAGMENT, + ..Default::default() + }; + let mut dataset = Dataset::write(reader, &uri, Some(params)).await.unwrap(); + assert_eq!(dataset.fragments().len(), NUM_FRAGMENTS); + assert_eq!(dataset.count_rows(None).await.unwrap(), TOTAL_ROWS); + + dataset + .create_index( + &["id"], + IndexType::BTree, + Some(INDEX_NAME.into()), + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + Fixture { + _tempdir: tempdir, + uri, + } +} + +/// Reopen the dataset so we get a fresh session (and therefore a fresh +/// `DSIndexCache`). This lets each iteration measure the cold path. +async fn open_cold(uri: &str) -> Dataset { + DatasetBuilder::from_uri(uri).load().await.unwrap() +} + +/// Process-wide runtime + fixture. Built lazily on first access so we pay the +/// 10 M-row dataset write once, not four times. +struct BenchEnv { + rt: tokio::runtime::Runtime, + fixture: Fixture, +} + +fn env() -> &'static BenchEnv { + static ENV: OnceLock = OnceLock::new(); + ENV.get_or_init(|| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let fixture = rt.block_on(build_fixture()); + // Parity check against the fresh fixture: legacy and current paths must + // agree on every counter in the JSON payload before measurement begins. + rt.block_on(async { + let ds = open_cold(&fixture.uri).await; + let legacy: serde_json::Value = serde_json::from_str( + &bench_legacy_index_statistics(&ds, INDEX_NAME).await.unwrap(), + ) + .unwrap(); + let current: serde_json::Value = + serde_json::from_str(&ds.index_statistics(INDEX_NAME).await.unwrap()).unwrap(); + for key in [ + "num_indexed_rows", + "num_unindexed_rows", + "num_indexed_fragments", + "num_unindexed_fragments", + "num_indices", + ] { + assert_eq!( + legacy[key], current[key], + "legacy and current paths disagree on {key}", + ); + } + }); + BenchEnv { rt, fixture } + }) +} + +fn bench_count_rows(c: &mut Criterion) { + let env = env(); + let dataset = env.rt.block_on(open_cold(&env.fixture.uri)); + + c.bench_function("index_stats/count_rows_baseline", |b| { + b.iter(|| { + env.rt.block_on(async { + let _ = dataset.count_rows(None).await.unwrap(); + }) + }); + }); +} + +fn bench_legacy_cold(c: &mut Criterion) { + let env = env(); + + // Pre-P1/P2 behavior: no cache, `count_rows(None)` every call. + c.bench_function("index_stats/legacy_cold", |b| { + b.iter(|| { + env.rt.block_on(async { + let ds = open_cold(&env.fixture.uri).await; + let _ = bench_legacy_index_statistics(&ds, INDEX_NAME) + .await + .unwrap(); + }) + }); + }); +} + +fn bench_cold(c: &mut Criterion) { + let env = env(); + + // Current behavior, cold cache. Difference vs `legacy_cold` is the P1 win. + c.bench_function("index_stats/cold", |b| { + b.iter(|| { + env.rt.block_on(async { + let ds = open_cold(&env.fixture.uri).await; + let _ = ds.index_statistics(INDEX_NAME).await.unwrap(); + }) + }); + }); +} + +fn bench_cached(c: &mut Criterion) { + let env = env(); + let dataset = env.rt.block_on(open_cold(&env.fixture.uri)); + + // Prime the cache. + env.rt.block_on(async { + let _ = dataset.index_statistics(INDEX_NAME).await.unwrap(); + }); + + // Current behavior, warm cache. Difference vs `cold` is the P2 win. + c.bench_function("index_stats/cached", |b| { + b.iter(|| { + env.rt.block_on(async { + let _ = dataset.index_statistics(INDEX_NAME).await.unwrap(); + }) + }); + }); +} + +#[cfg(target_os = "linux")] +criterion_group!( + name = benches; + config = Criterion::default().significance_level(0.1).sample_size(30) + .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_count_rows, bench_legacy_cold, bench_cold, bench_cached +); + +#[cfg(not(target_os = "linux"))] +criterion_group!( + name = benches; + config = Criterion::default().significance_level(0.1).sample_size(30); + targets = bench_count_rows, bench_legacy_cold, bench_cold, bench_cached +); + +criterion_main!(benches); diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 40788265172..64aab64d3e4 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -87,7 +87,7 @@ use crate::index::mem_wal::open_mem_wal_index; pub use crate::index::prefilter::{FilterLoader, PreFilter}; use crate::index::scalar::{IndexDetails, fetch_index_details, load_training_data}; pub use crate::index::vector::{LogicalIvfView, LogicalVectorIndex}; -use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey}; +use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey, IndexStatisticsKey}; use crate::{Error, Result, dataset::Dataset}; pub use create::CreateIndexBuilder; pub use lance_index::IndexDescription; @@ -1131,17 +1131,32 @@ impl DatasetIndexExt for Dataset { return Err(Error::index_not_found(format!("name={}", index_name))); } - if index_name == FRAG_REUSE_INDEX_NAME { - return index_statistics_frag_reuse(self).boxed().await; + // The result is a pure function of (dataset URI, manifest version, index name): + // the cache is already prefixed by dataset URI, and any mutation that can change + // the answer (append, delete, compaction, index create/optimize/drop) bumps + // `manifest.version`, so we get automatic invalidation. + let cache_key = IndexStatisticsKey { + version: self.manifest.version, + index_name, + }; + if let Some(cached) = self.index_cache.get_with_key(&cache_key).await { + return Ok(cached.as_ref().clone()); } - if index_name == MEM_WAL_INDEX_NAME { - return index_statistics_mem_wal(self).boxed().await; - } + let stats = if index_name == FRAG_REUSE_INDEX_NAME { + index_statistics_frag_reuse(self).boxed().await? + } else if index_name == MEM_WAL_INDEX_NAME { + index_statistics_mem_wal(self).boxed().await? + } else { + index_statistics_scalar(self, index_name, metadatas) + .boxed() + .await? + }; - index_statistics_scalar(self, index_name, metadatas) - .boxed() - .await + self.index_cache + .insert_with_key(&cache_key, Arc::new(stats.clone())) + .await; + Ok(stats) } async fn read_index_partition( @@ -1183,6 +1198,82 @@ fn sum_indexed_rows_per_delta(indexed_fragments_per_delta: &[Vec]) -> Ok(rows_per_delta) } +/// Sum row counts across every fragment using only the manifest. +/// +/// Returns `Some(total)` when every fragment has `physical_rows` and any deletion +/// count resolved in-memory. Returns `None` if any fragment is missing this +/// metadata — callers should fall back to [`Dataset::count_rows`], which can +/// read fragment data files to recover the count for legacy datasets written +/// before these fields were always populated. +fn manifest_total_rows(ds: &Dataset) -> Option { + let mut total = 0usize; + for frag in ds.fragments().iter() { + total = total.checked_add(frag.num_rows()?)?; + } + Some(total) +} + +/// Benchmark-only entry point that reproduces the pre-P1/P2 behavior of +/// [`DatasetIndexExt::index_statistics`]: skips the `DSIndexCache` lookup and +/// computes the total row count via [`Dataset::count_rows`] rather than the +/// in-memory manifest sum. Exists solely so `benches/index_stats.rs` can +/// measure the wall-time delta against the current implementation within a +/// single process. Not part of the stable API. +#[doc(hidden)] +pub async fn bench_legacy_index_statistics(ds: &Dataset, index_name: &str) -> Result { + let metadatas = ds.load_indices_by_name(index_name).await?; + if metadatas.is_empty() { + return Err(Error::index_not_found(format!("name={}", index_name))); + } + + if index_name == FRAG_REUSE_INDEX_NAME { + return index_statistics_frag_reuse(ds).boxed().await; + } + if index_name == MEM_WAL_INDEX_NAME { + return index_statistics_mem_wal(ds).boxed().await; + } + + let field_id = metadatas[0].fields[0]; + let field_path = ds.schema().field_path(field_id)?; + + let (indices_stats, index_uri, num_indices, updated_at) = + collect_regular_indices_statistics(ds, metadatas, &field_path).await?; + + let index_type_hint = indices_stats + .first() + .and_then(|stats| stats.get("index_type")) + .and_then(|v| v.as_str()); + let index_type = legacy_type_name(&index_uri, index_type_hint); + + let indexed_fragments_per_delta = ds.indexed_fragments(index_name).await?; + let num_indexed_rows_per_delta = sum_indexed_rows_per_delta(&indexed_fragments_per_delta)?; + let num_indexed_fragments = unique_indexed_fragment_count(&indexed_fragments_per_delta) + .ok_or_else(|| Error::internal("overlap in indexed fragments".to_string()))?; + let num_unindexed_fragments = ds.fragments().len() - num_indexed_fragments; + let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum(); + drop(indexed_fragments_per_delta); + + // Pre-P1: unconditional count_rows fan-out across every fragment. + let total_rows = ds.count_rows(None).await?; + let num_unindexed_rows = total_rows - num_indexed_rows; + + let stats = json!({ + "index_type": index_type, + "name": index_name, + "num_indices": num_indices, + "num_segments": num_indices, + "indices": indices_stats.clone(), + "segments": indices_stats, + "num_indexed_fragments": num_indexed_fragments, + "num_indexed_rows": num_indexed_rows, + "num_unindexed_fragments": num_unindexed_fragments, + "num_unindexed_rows": num_unindexed_rows, + "num_indexed_rows_per_delta": num_indexed_rows_per_delta, + "updated_at_timestamp_ms": updated_at, + }); + serialize_index_statistics(&stats) +} + fn unique_indexed_fragment_count(indexed_fragments_per_delta: &[Vec]) -> Option { let mut fragment_ids = HashSet::new(); for frags in indexed_fragments_per_delta { @@ -1358,8 +1449,15 @@ async fn gather_fragment_statistics( let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum(); drop(indexed_fragments_per_delta); - let total_rows = ds.count_rows(None).await?; - let num_unindexed_rows = total_rows - num_indexed_rows; + // Prefer the in-memory sum of per-fragment row counts. `count_rows(None)` + // falls back to opening the first data file of each fragment when the + // manifest lacks `physical_rows` / `writer_version`, which dominates + // `index_stats` latency on tables with many fragments or slow storage. + let total_rows = match manifest_total_rows(ds) { + Some(n) => n, + None => ds.count_rows(None).await?, + }; + let num_unindexed_rows = total_rows.saturating_sub(num_indexed_rows); Ok(Some(( num_indexed_rows_per_delta, @@ -2939,6 +3037,276 @@ mod tests { assert_eq!(stats["num_indexed_rows"], 512); } + /// P1: `gather_fragment_statistics` should derive the total row count from + /// the manifest when every fragment carries `physical_rows`, matching the + /// answer that `Dataset::count_rows(None)` would produce. + #[tokio::test] + async fn test_index_statistics_row_counts_match_count_rows() { + let test_dir = TempStrDir::default(); + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + // Five separate write batches so the dataset has five fragments — the + // case where `count_rows` would fan out per fragment. + let rows_per_frag: i32 = 100; + let num_frags = 5; + let batches: Vec = (0..num_frags) + .map(|f| { + let start = f * rows_per_frag; + let values: Vec = (start..start + rows_per_frag).collect(); + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]) + .unwrap() + }) + .collect(); + + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + let mut dataset = Dataset::write( + reader, + test_dir.as_str(), + Some(WriteParams { + max_rows_per_file: rows_per_frag as usize, + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!(dataset.fragments().len(), num_frags as usize); + + dataset + .create_index( + &["id"], + IndexType::BTree, + Some("id_idx".into()), + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + // Straddle fragment boundaries with the delete to exercise the + // deletion-count path in `Fragment::num_rows()`. + dataset + .delete("id < 25 OR (id >= 150 AND id < 175)") + .await + .unwrap(); + + // Append a new (unindexed) fragment. + let extra: Vec = (500..600).collect(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(extra))]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + dataset.append(reader, None).await.unwrap(); + + // Ground truth via count_rows — must equal indexed + unindexed from stats. + let total_from_count_rows = dataset.count_rows(None).await.unwrap(); + let manifest_total = super::manifest_total_rows(&dataset).unwrap(); + assert_eq!(manifest_total, total_from_count_rows); + + let stats: serde_json::Value = + serde_json::from_str(&dataset.index_statistics("id_idx").await.unwrap()).unwrap(); + let indexed = stats["num_indexed_rows"].as_u64().unwrap() as usize; + let unindexed = stats["num_unindexed_rows"].as_u64().unwrap() as usize; + assert_eq!(indexed + unindexed, total_from_count_rows); + // 500 rows initially, 50 deleted (all from indexed fragments), 100 appended. + assert_eq!(indexed, 450); + assert_eq!(unindexed, 100); + } + + /// P1: `manifest_total_rows` returns `None` when any fragment lacks + /// `physical_rows`, so the caller falls back to `count_rows(None)`. + #[tokio::test] + async fn test_manifest_total_rows_missing_metadata_returns_none() { + use lance_table::format::Fragment as FragmentFmt; + + let test_dir = TempStrDir::default(); + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from((0..10).collect::>()))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + let mut dataset = Dataset::write(reader, test_dir.as_str(), None) + .await + .unwrap(); + + // Sanity: happy path returns a value. + assert!(super::manifest_total_rows(&dataset).is_some()); + + // Corrupt one fragment's metadata in-memory so the helper must bail. + let mut manifest = (*dataset.manifest).clone(); + let mut fragments: Vec = manifest.fragments.as_ref().clone(); + fragments[0].physical_rows = None; + manifest.fragments = Arc::new(fragments); + dataset.manifest = Arc::new(manifest); + + assert!(super::manifest_total_rows(&dataset).is_none()); + } + + /// P2: repeat `index_statistics` calls at the same manifest version are + /// served from the `DSIndexCache` without reopening the index file. + #[tokio::test] + async fn test_index_statistics_cache_hit_avoids_io() { + let test_dir = TempStrDir::default(); + let schema = Arc::new(Schema::new(vec![Field::new( + "status", + DataType::Int32, + false, + )])); + let values: Vec = (0..10_000).collect(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, test_dir.as_str(), None) + .await + .unwrap(); + + // BTree lacks `load_statistics` today, so the uncached path opens the + // index file — exactly the cost we want the cache to skip. + dataset + .create_index( + &["status"], + IndexType::BTree, + Some("status_idx".into()), + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + let io_tracker = dataset.object_store().io_tracker().clone(); + + // Prime the cache. + let first = dataset.index_statistics("status_idx").await.unwrap(); + + // Reset I/O stats and call again — the second call must be served + // entirely from the session index cache. + io_tracker.incremental_stats(); + let second = dataset.index_statistics("status_idx").await.unwrap(); + let stats = io_tracker.incremental_stats(); + + assert_eq!(first, second, "cache must return identical JSON"); + assert_io_eq!( + stats, + read_iops, + 0, + "cached index_statistics should perform no reads; got {} ops", + stats.read_iops + ); + assert_io_eq!( + stats, + read_bytes, + 0, + "cached index_statistics should read zero bytes; got {} bytes", + stats.read_bytes + ); + } + + /// P2: mutations (append, delete, optimize) bump the manifest version and + /// therefore implicitly invalidate the cache — stale numbers must never + /// leak through. + #[tokio::test] + async fn test_index_statistics_cache_invalidates_on_manifest_bump() { + let test_dir = TempStrDir::default(); + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from((0..200).collect::>()))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, test_dir.as_str(), None) + .await + .unwrap(); + dataset + .create_index( + &["id"], + IndexType::BTree, + Some("id_idx".into()), + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + let before: serde_json::Value = + serde_json::from_str(&dataset.index_statistics("id_idx").await.unwrap()).unwrap(); + assert_eq!(before["num_indexed_rows"], 200); + assert_eq!(before["num_unindexed_rows"], 0); + + // Append bumps manifest.version — the next call must see 50 unindexed rows. + let extra = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from((200..250).collect::>()))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(extra)], schema.clone()); + dataset.append(reader, None).await.unwrap(); + + let after_append: serde_json::Value = + serde_json::from_str(&dataset.index_statistics("id_idx").await.unwrap()).unwrap(); + assert_eq!(after_append["num_indexed_rows"], 200); + assert_eq!(after_append["num_unindexed_rows"], 50); + + // `optimize_indices(append)` rolls the new fragment into a delta; the + // old manifest version's entry must not shadow the new counts. + dataset + .optimize_indices(&OptimizeOptions::append()) + .await + .unwrap(); + let after_optimize: serde_json::Value = + serde_json::from_str(&dataset.index_statistics("id_idx").await.unwrap()).unwrap(); + assert_eq!(after_optimize["num_indexed_rows"], 250); + assert_eq!(after_optimize["num_unindexed_rows"], 0); + } + + /// P2: two indices on the same dataset at the same version must get + /// independent cache entries — the name-based suffix in the key prevents + /// collisions. + #[tokio::test] + async fn test_index_statistics_cache_distinguishes_index_names() { + let test_dir = TempStrDir::default(); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + let a_values: Vec = (0..100).collect(); + let b_values: Vec = (1000..1100).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(a_values)), + Arc::new(Int32Array::from(b_values)), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, test_dir.as_str(), None) + .await + .unwrap(); + for col in ["a", "b"] { + dataset + .create_index( + &[col], + IndexType::BTree, + Some(format!("{col}_idx")), + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + } + + let a1 = dataset.index_statistics("a_idx").await.unwrap(); + let b1 = dataset.index_statistics("b_idx").await.unwrap(); + assert_ne!(a1, b1, "different indices must yield different stats"); + + // Cached reads preserve the per-name distinction. + let a2 = dataset.index_statistics("a_idx").await.unwrap(); + let b2 = dataset.index_statistics("b_idx").await.unwrap(); + assert_eq!(a1, a2); + assert_eq!(b1, b2); + } + #[tokio::test] async fn test_optimize_delta_indices() { let dimensions = 16; diff --git a/rust/lance/src/session/index_caches.rs b/rust/lance/src/session/index_caches.rs index 3ae777880aa..694c4af81b2 100644 --- a/rust/lance/src/session/index_caches.rs +++ b/rust/lance/src/session/index_caches.rs @@ -145,3 +145,29 @@ impl CacheKey for ScalarIndexDetailsKey<'_> { "ScalarIndexDetails" } } + +/// Cache key for the serialized `index_statistics` JSON for a single named index +/// at a specific manifest version. +/// +/// `index_statistics` is a pure function of `(dataset URI, manifest version, index name)`: +/// the `DSIndexCache` already prefixes entries by dataset URI, and any mutation that can +/// change the answer (append, delete, compaction, index create/optimize/drop) bumps the +/// manifest version. Keying on the version therefore gives automatic invalidation — we +/// never need to touch this cache from a write path. +#[derive(Debug)] +pub struct IndexStatisticsKey<'a> { + pub version: u64, + pub index_name: &'a str, +} + +impl CacheKey for IndexStatisticsKey<'_> { + type ValueType = String; + + fn key(&self) -> Cow<'_, str> { + Cow::Owned(format!("stats/{}/{}", self.version, self.index_name)) + } + + fn type_name() -> &'static str { + "IndexStatisticsJson" + } +}