diff --git a/Cargo.lock b/Cargo.lock index 22f3fa45b38..977fafa9bc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4876,9 +4876,13 @@ dependencies = [ "base64 0.22.1", "bytes", "chrono", + "datafusion-common", + "datafusion-physical-plan", + "env_logger", "futures", "hmac", "lance", + "lance-arrow", "lance-core", "lance-index", "lance-io", @@ -4891,6 +4895,7 @@ dependencies = [ "rand 0.9.4", "reqwest 0.12.28", "ring", + "roaring", "rstest", "rustls-pki-types", "serde", @@ -4901,6 +4906,7 @@ dependencies = [ "tower", "tower-http 0.5.2", "url", + "uuid", "wiremock", ] diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index fd11b54f6ee..3c6cd359abe 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -4056,8 +4056,11 @@ dependencies = [ "async-trait", "axum", "bytes", + "datafusion-common", + "datafusion-physical-plan", "futures", "lance", + "lance-arrow", "lance-core", "lance-index", "lance-io", @@ -4068,12 +4071,14 @@ dependencies = [ "object_store", "rand 0.9.4", "reqwest 0.12.28", + "roaring", "serde", "serde_json", "tokio", "tower", "tower-http 0.5.2", "url", + "uuid", ] [[package]] diff --git a/rust/lance-namespace-impls/BENCHMARK.md b/rust/lance-namespace-impls/BENCHMARK.md new file mode 100644 index 00000000000..791d1fde85f --- /dev/null +++ b/rust/lance-namespace-impls/BENCHMARK.md @@ -0,0 +1,113 @@ +# Copy-on-Write Directory Manifest Benchmark Results + +## Test Environment + +- **Instance:** c7i.12xlarge / c7i.48xlarge EC2 (us-east-1) +- **Storage:** S3 Standard, S3 Express One Zone (same AZ) +- **Concurrency:** Multi-process (each worker is a separate OS process) +- **Operations per level:** 100-200 + +## Changes Benchmarked + +1. **v1 → v2:** CommitBuilder replaced with direct `Manifest::new_from_previous` + `write_manifest_file` +2. **v2 → v3:** Removed redundant dataset reload before each write (use cached dataset, detect conflicts at commit time) +3. 
**Index vs no-index:** With/without BTree + Bitmap + FTS index building during CoW rewrite + +--- + +## Write Operation Comparison (v3, S3 Express, c=1) + +| Scale | Index | create-ns | declare-table | create-table | +|---|---|---:|---:|---:| +| 1K | idx | 174ms (5.7/s) | 193ms (5.1/s) | 385ms (2.6/s) | +| 1K | noidx | 146ms (6.8/s) | 159ms (6.2/s) | 352ms (2.8/s) | +| 100K | idx | 257ms (3.9/s) | 291ms (3.4/s) | 512ms (1.9/s) | +| 100K | noidx | 158ms (6.3/s) | 196ms (5.0/s) | 417ms (2.4/s) | + +### Latency Breakdown (S3 Express, c=1, no-index, 1K entries) + +``` +create-namespace: 146ms (manifest CoW rewrite only) +declare-table: 159ms (manifest CoW rewrite + .lance-reserved PUT) +create-table: 352ms (manifest CoW rewrite + .lance-reserved PUT + table data write) + +.lance-reserved overhead: ~13ms +table data write overhead: ~193ms +``` + +--- + +## Write Throughput Improvement (v2 → v3, reload fix) + +### S3 Express + +| Scale | Index | Operation | v2 p50 (tput) | v3 p50 (tput) | p50 delta | tput delta | +|---|---|---|---:|---:|---:|---:| +| 1K | idx | create-ns | 273ms (3.7/s) | 164ms (6.0/s) | **-40%** | **+62%** | +| 1K | idx | create-table | 485ms (2.1/s) | 345ms (2.9/s) | **-29%** | **+38%** | +| 100K | idx | create-ns | 280ms (3.5/s) | 265ms (3.8/s) | -5% | +9% | +| 100K | idx | create-table | 588ms (1.7/s) | 460ms (2.2/s) | **-22%** | **+29%** | + +### S3 Standard + +| Scale | Index | Operation | v2 p50 (tput) | v3 p50 (tput) | p50 delta | tput delta | +|---|---|---|---:|---:|---:|---:| +| 1K | idx | create-ns | 410ms (2.4/s) | 327ms (2.9/s) | **-20%** | **+21%** | +| 1K | idx | create-table | 663ms (1.5/s) | 584ms (1.6/s) | -12% | +7% | +| 100K | idx | create-ns | 693ms (1.4/s) | 692ms (1.4/s) | 0% | 0% | +| 100K | idx | create-table | 981ms (1.0/s) | 958ms (1.0/s) | -2% | 0% | + +--- + +## Scale Benchmark (v2, 100K–1M entries, c7i.48xlarge) + +### CoW Rewrite + Index Build Time (initial seed) + +| Scale | S3 Standard | S3 Express | +|---:|---:|---:| +| 
100K | 0.7s | 0.4s | +| 500K | 1.6s | 1.1s | +| 1M | 3.2s | 2.0s | + +### Write Throughput at Scale (c=1, with index) + +| Scale | S3 create-ns p50 (tput) | S3X create-ns p50 (tput) | +|---:|---:|---:| +| 100K | 693ms (1.4/s) | 280ms (3.5/s) | +| 300K | 985ms (1.0/s) | 582ms (1.7/s) | +| 500K | 1559ms (0.6/s) | 933ms (1.1/s) | +| 700K | 2174ms (0.5/s) | 1304ms (0.8/s) | +| 1M | 2873ms (0.3/s) | 1791ms (0.6/s) | + +### Indexed Point Lookup — describe-table (warm, c=1) + +| Scale | S3 p50 | S3X p50 | +|---:|---:|---:| +| 100K | 45ms | 9ms | +| 500K | 47ms | 9ms | +| 1M | 54ms | 9ms | + +**Flat from 100K to 1M** — BTree index makes point lookups O(log n). + +### Bitmap Scan — list-namespaces (warm, c=1) + +| Scale | S3 p50 | S3X p50 | +|---:|---:|---:| +| 100K | 51ms | 14ms | +| 500K | 67ms | 34ms | +| 1M | 101ms | 65ms | + +Linear with result count. Still under 100ms on S3X at 1M. + +--- + +## Summary + +| Metric | S3 Standard | S3 Express | +|---|---:|---:| +| Pure manifest write (1K, no-idx, c=1) | 224ms (4.3/s) | **146ms (6.8/s)** | +| Declare table (1K, no-idx, c=1) | 251ms (3.8/s) | **159ms (6.2/s)** | +| Declare table (1K, idx, c=1) | 355ms (2.7/s) | **193ms (5.1/s)** | +| Indexed point lookup (1M, warm) | 54ms | **9ms** | +| List namespaces (1M, warm) | 101ms | **65ms** | +| CoW full rewrite + 3 indices (1M) | 3.2s | **2.0s** | diff --git a/rust/lance-namespace-impls/Cargo.toml b/rust/lance-namespace-impls/Cargo.toml index 963edf5e8ca..296b75cc4ea 100644 --- a/rust/lance-namespace-impls/Cargo.toml +++ b/rust/lance-namespace-impls/Cargo.toml @@ -42,6 +42,7 @@ reqwest = { version = "0.12", optional = true, default-features = false, feature # Directory implementation dependencies (always enabled) url = { workspace = true } lance = { workspace = true } +lance-arrow = { workspace = true } lance-index = { workspace = true } lance-linalg = { workspace = true } lance-io = { workspace = true } @@ -50,6 +51,8 @@ object_store = { workspace = true } arrow = { workspace = 
true } arrow-ipc = { workspace = true } arrow-schema = { workspace = true } +datafusion-common = { workspace = true } +datafusion-physical-plan = { workspace = true } # REST adapter implementation dependencies (optional, enabled by "rest-adapter" feature) axum = { workspace = true, optional = true } @@ -65,6 +68,8 @@ serde_json = { workspace = true } futures.workspace = true log.workspace = true rand.workspace = true +roaring.workspace = true +uuid.workspace = true # Shared credential vending dependencies sha2 = { version = "0.10", optional = true } @@ -91,6 +96,8 @@ arrow = { workspace = true } arrow-ipc = { workspace = true } rstest.workspace = true lance-table.workspace = true +serde = { workspace = true } +env_logger = "0.11" [lints] workspace = true diff --git a/rust/lance-namespace-impls/examples/manifest_bench.rs b/rust/lance-namespace-impls/examples/manifest_bench.rs new file mode 100644 index 00000000000..8df657dfcd8 --- /dev/null +++ b/rust/lance-namespace-impls/examples/manifest_bench.rs @@ -0,0 +1,753 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Multi-process manifest benchmark with S3 support. +//! +//! Modes: +//! seed — populate a manifest with N entries via namespace API +//! seed-large — write a __manifest Lance table directly with N rows +//! run — benchmark read/write operations with multi-process concurrency +//! worker — (internal) single-process worker spawned by `run` +//! +//! Examples: +//! # Seed 1000 entries via namespace API +//! manifest_bench seed --root s3://bucket/bench/test1 --count 1000 +//! +//! # Seed 500K rows directly into __manifest table +//! manifest_bench seed-large --root s3://bucket/bench/scale \ +//! --count 500000 --inline-optimization true +//! +//! # Run scale benchmark at 500K initial entries +//! manifest_bench run --root s3://bucket/bench/scale \ +//! 
--concurrency 1,10,100 --operations 200 --initial-entries 500000
+
+use std::collections::HashMap;
+use std::io::{BufRead, BufReader};
+use std::process::{Command, Stdio};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use arrow::array::{RecordBatch, RecordBatchIterator, StringArray};
+use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+use bytes::Bytes;
+use lance::dataset::{InsertBuilder, WriteMode, WriteParams};
+use lance_arrow::json::JsonArray;
+use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
+use lance_namespace::LanceNamespace;
+use lance_namespace::models::{
+    CreateNamespaceRequest, CreateTableRequest, DeclareTableRequest, DescribeTableRequest,
+    ListNamespacesRequest, ListTablesRequest,
+};
+use lance_namespace_impls::DirectoryNamespaceBuilder;
+use serde::{Deserialize, Serialize};
+
+/// One timed operation, printed by a worker process as a single JSON line on stdout.
+#[derive(Serialize, Deserialize, Clone)]
+struct LatencyRecord {
+    operation: String,
+    latency_ms: f64,
+    /// `true` when the operation returned an error. The coordinator counts such
+    /// records as errors and leaves them out of the latency statistics.
+    error: bool,
+}
+
+/// Aggregated statistics for one operation at one concurrency level, printed as a
+/// JSON line on stdout by `run` mode.
+#[derive(Serialize)]
+struct BenchResult {
+    variant: String,
+    operation: String,
+    concurrency: usize,
+    initial_entries: usize,
+    /// Number of latency samples, i.e. operations that completed without error.
+    total_operations: usize,
+    total_duration_ms: f64,
+    throughput_ops_per_sec: f64,
+    avg_latency_ms: f64,
+    p50_latency_ms: f64,
+    p90_latency_ms: f64,
+    p99_latency_ms: f64,
+    min_latency_ms: f64,
+    max_latency_ms: f64,
+    errors: usize,
+}
+
+/// Returns the `p`-quantile (`p` in `[0, 1]`) of an ascending-sorted slice, picking
+/// the sample at the rounded rank `(len - 1) * p`. Returns `0.0` for an empty slice.
+fn percentile(sorted: &[f64], p: f64) -> f64 {
+    if sorted.is_empty() {
+        return 0.0;
+    }
+    let idx = ((sorted.len() as f64 - 1.0) * p).round() as usize;
+    // Clamp so a `p` slightly above 1.0 cannot index past the end.
+    sorted[idx.min(sorted.len() - 1)]
+}
+
+/// Builds a [`BenchResult`] from the collected latencies (milliseconds, any order)
+/// and the wall-clock duration of the whole concurrency level.
+///
+/// Throughput is the number of latency samples divided by wall-clock seconds.
+/// Every statistic is `0.0` when `latencies` is empty.
+fn compute_result(
+    variant: &str,
+    operation: &str,
+    concurrency: usize,
+    initial_entries: usize,
+    wall_duration: Duration,
+    mut latencies: Vec<f64>,
+    errors: usize,
+) -> BenchResult {
+    // `total_cmp` is a total order over f64, so the sort cannot panic on NaN.
+    latencies.sort_by(f64::total_cmp);
+    let total = latencies.len();
+    let total_ms = wall_duration.as_secs_f64() * 1000.0;
+    let throughput = if total_ms > 0.0 {
+        total as f64 / (total_ms / 1000.0)
+    } else {
+        0.0
+    };
+    BenchResult {
+        variant: variant.to_string(),
+        operation: operation.to_string(),
+        concurrency,
+        initial_entries,
+        total_operations: total,
+        total_duration_ms: total_ms,
+        throughput_ops_per_sec: throughput,
+        avg_latency_ms: if total > 0 {
+            latencies.iter().sum::<f64>() / total as f64
+        } else {
+            0.0
+        },
+        p50_latency_ms: percentile(&latencies, 0.50),
+        p90_latency_ms: percentile(&latencies, 0.90),
+        p99_latency_ms: percentile(&latencies, 0.99),
+        min_latency_ms: latencies.first().copied().unwrap_or(0.0),
+        max_latency_ms: latencies.last().copied().unwrap_or(0.0),
+        errors,
+    }
+}
+
+/// Encodes a fixed three-row `(id: Int32, name: Utf8)` batch as an Arrow IPC stream,
+/// used as the payload for every `create_table` call.
+fn create_test_ipc_data() -> Vec<u8> {
+    use arrow::array::Int32Array;
+    use arrow::ipc::writer::StreamWriter;
+
+    let schema = Arc::new(ArrowSchema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, false),
+    ]));
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["a", "b", "c"])),
+        ],
+    )
+    .unwrap();
+    let mut buffer = Vec::new();
+    {
+        // Scope the writer so its mutable borrow of `buffer` ends before we return it.
+        let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
+        writer.write(&batch).unwrap();
+        writer.finish().unwrap();
+    }
+    buffer
+}
+
+/// Arrow schema written by `seed-large`: `object_id` (tagged as primary-key position 0),
+/// `object_type`, nullable `location`, and a nullable JSON `metadata` column.
+fn manifest_schema() -> Arc<ArrowSchema> {
+    Arc::new(ArrowSchema::new(vec![
+        Field::new("object_id", DataType::Utf8, false).with_metadata(
+            [(
+                LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
+                "0".to_string(),
+            )]
+            .into_iter()
+            .collect(),
+        ),
+        Field::new("object_type", DataType::Utf8, false),
+        Field::new("location", DataType::Utf8, true),
+        lance_arrow::json::json_field("metadata", true),
+    ]))
+}
+
+/// Builds a directory namespace rooted at `root` with directory listing disabled.
+///
+/// Each entry of `storage_options` is forwarded as a `storage.<key>` property.
+/// Panics if the builder rejects the properties or the namespace fails to build.
+async fn build_namespace(
+    root: &str,
+    inline_optimization: bool,
+    storage_options: &HashMap<String, String>,
+) -> Box<dyn LanceNamespace> {
+    let mut properties = HashMap::new();
+    properties.insert("root".to_string(), root.to_string());
+    properties.insert("dir_listing_enabled".to_string(), "false".to_string());
+    properties.insert(
+        "inline_optimization_enabled".to_string(),
+        inline_optimization.to_string(),
+    );
+    for (k, v) in storage_options {
+        properties.insert(format!("storage.{}", k), v.clone());
+    }
+    let builder = DirectoryNamespaceBuilder::from_properties(properties, None)
+        .expect("Failed to create namespace builder from properties");
+    Box::new(builder.build().await.expect("Failed to build namespace"))
+}
+
+// ──────────────────── seed mode ────────────────────
+
+/// Populates the namespace through the public API: `count / 3` namespaces named
+/// `ns_<i>` and the remaining entries as tables named `table_<i>`.
+///
+/// Individual failures are logged to stderr and seeding continues.
+async fn seed(
+    root: &str,
+    count: usize,
+    inline_optimization: bool,
+    storage_options: &HashMap<String, String>,
+) {
+    eprintln!("Seeding {} entries at {}", count, root);
+    let ns = build_namespace(root, inline_optimization, storage_options).await;
+    let ipc_data = Bytes::from(create_test_ipc_data());
+
+    let ns_count = count / 3;
+    let table_count = count - ns_count;
+
+    for i in 0..ns_count {
+        let mut req = CreateNamespaceRequest::new();
+        req.id = Some(vec![format!("ns_{}", i)]);
+        if let Err(e) = ns.create_namespace(req).await {
+            eprintln!("seed ns_{}: {}", i, e);
+        }
+        if (i + 1) % 100 == 0 {
+            eprintln!(" seeded {}/{} namespaces", i + 1, ns_count);
+        }
+    }
+    for i in 0..table_count {
+        let mut req = CreateTableRequest::new();
+        req.id = Some(vec![format!("table_{}", i)]);
+        if let Err(e) = ns.create_table(req, ipc_data.clone()).await {
+            eprintln!("seed table_{}: {}", i, e);
+        }
+        if (i + 1) % 100 == 0 {
+            eprintln!(" seeded {}/{} tables", i + 1, table_count);
+        }
+    }
+    eprintln!(
+        "Seed complete: {} namespaces, {} tables",
+        ns_count, table_count
+    );
+}
+
+// ──────────────────── seed-large mode ────────────────────
+// Writes a __manifest Lance table directly with N rows, bypassing the namespace API.
+
+const SEED_LARGE_BATCH_SIZE: usize = 10_000;
+
+/// Generates manifest rows `start_idx .. start_idx + batch_size`, clamped to `total_count`.
+///
+/// Rows below `total_count / 3` are namespaces (`ns_<i>`, no location, no metadata);
+/// the rest are tables (`table_<j>`) numbered from zero. A `start_idx` at or beyond
+/// `total_count` yields an empty batch.
+fn generate_manifest_batch(
+    schema: &Arc<ArrowSchema>,
+    start_idx: usize,
+    batch_size: usize,
+    total_count: usize,
+) -> RecordBatch {
+    let ns_count = total_count / 3;
+    // Saturate so an out-of-range start produces an empty batch instead of underflowing.
+    let actual_size = batch_size.min(total_count.saturating_sub(start_idx));
+
+    let mut object_ids = Vec::with_capacity(actual_size);
+    let mut object_types = Vec::with_capacity(actual_size);
+    let mut locations: Vec<Option<String>> = Vec::with_capacity(actual_size);
+    let mut metadatas: Vec<Option<&str>> = Vec::with_capacity(actual_size);
+
+    for i in start_idx..start_idx + actual_size {
+        if i < ns_count {
+            object_ids.push(format!("ns_{}", i));
+            object_types.push("namespace".to_string());
+            locations.push(None);
+            metadatas.push(None);
+        } else {
+            let table_idx = i - ns_count;
+            object_ids.push(format!("table_{}", table_idx));
+            object_types.push("table".to_string());
+            locations.push(Some(format!("table_{}", table_idx)));
+            metadatas.push(Some(r#"{"bench":"true"}"#));
+        }
+    }
+
+    let metadata_array = Arc::new(
+        JsonArray::try_from_iter(metadatas.into_iter())
+            .expect("Failed to encode metadata as JSON")
+            .into_inner(),
+    );
+
+    RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(StringArray::from(object_ids)),
+            Arc::new(StringArray::from(object_types)),
+            Arc::new(StringArray::from(locations)),
+            metadata_array,
+        ],
+    )
+    .expect("Failed to create manifest batch")
+}
+
+/// Writes `count` rows straight into `<root>/__manifest` as a new Lance dataset and,
+/// when `inline_optimization` is set, issues one `create_namespace` call through the
+/// namespace API and reports how long it took.
+///
+/// Panics if the dataset write or the trigger call fails.
+async fn seed_large(
+    root: &str,
+    count: usize,
+    inline_optimization: bool,
+    storage_options: &HashMap<String, String>,
+) {
+    // Drop trailing slashes so a root like `s3://bucket/dir/` does not yield `dir//__manifest`.
+    let manifest_uri = format!("{}/{}", root.trim_end_matches('/'), "__manifest");
+    eprintln!(
+        "Seed-large: writing {} rows directly to {}",
+        count, manifest_uri
+    );
+
+    let schema = manifest_schema();
+
+    // Generate batches; `generate_manifest_batch` clamps the final partial batch itself.
+    let batches: Vec<RecordBatch> = (0..count)
+        .step_by(SEED_LARGE_BATCH_SIZE)
+        .map(|offset| generate_manifest_batch(&schema, offset, SEED_LARGE_BATCH_SIZE, count))
+        .collect();
+    eprintln!(" generated {} batches", batches.len());
+
+    let mut write_params = WriteParams {
+        mode: WriteMode::Create,
+        ..WriteParams::default()
+    };
+    if !storage_options.is_empty() {
+        let accessor = Arc::new(
+            lance_io::object_store::StorageOptionsAccessor::with_static_options(
+                storage_options.clone(),
+            ),
+        );
+        write_params.store_params = Some(lance_io::object_store::ObjectStoreParams {
+            storage_options_accessor: Some(accessor),
+            ..Default::default()
+        });
+    }
+
+    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
+    InsertBuilder::new(manifest_uri.as_str())
+        .with_params(&write_params)
+        .execute_stream(reader)
+        .await
+        .expect("Failed to write manifest dataset");
+
+    eprintln!(" wrote Lance dataset");
+
+    // Now open via namespace API to trigger the first CoW rewrite with indices
+    if inline_optimization {
+        eprintln!(" triggering initial CoW rewrite to build indices...");
+        let start = Instant::now();
+        let ns = build_namespace(root, true, storage_options).await;
+        let mut req = CreateNamespaceRequest::new();
+        req.id = Some(vec!["__seed_trigger__".to_string()]);
+        ns.create_namespace(req)
+            .await
+            .expect("Failed to trigger CoW rewrite");
+        eprintln!(
+            " CoW rewrite with index build took {:.1}s",
+            start.elapsed().as_secs_f64()
+        );
+    }
+
+    let ns_count = count / 3;
+    let table_count = count - ns_count;
+    eprintln!(
+        "Seed-large complete: {} total rows ({} namespaces, {} tables)",
+        count, ns_count, table_count
+    );
+}
+
+// ──────────────────── worker mode ────────────────────
+
+/// Prints one [`LatencyRecord`] JSON line for an operation that began at `started`.
+fn emit_latency(operation: &str, started: Instant, failed: bool) {
+    let record = LatencyRecord {
+        operation: operation.to_string(),
+        latency_ms: started.elapsed().as_secs_f64() * 1000.0,
+        error: failed,
+    };
+    println!(
+        "{}",
+        serde_json::to_string(&record).expect("LatencyRecord serializes to JSON")
+    );
+}
+
+/// Runs `operations` timed calls of `operation` against a single long-lived namespace,
+/// printing one JSON latency record per call.
+///
+/// For `warm-read*` operations, `warmup` untimed calls are issued first.
+async fn worker(
+    root: &str,
+    operation: &str,
+    operations: usize,
+    warmup: usize,
+    worker_id: usize,
+    table_count: usize,
+    inline_optimization: bool,
+    storage_options: &HashMap<String, String>,
+) {
+    let ns = build_namespace(root, inline_optimization, storage_options).await;
+    let ipc_data = Bytes::from(create_test_ipc_data());
+
+    // Warmup (only for warm-read operations); results are deliberately discarded.
+    if operation.starts_with("warm-read") {
+        for _ in 0..warmup {
+            let _ =
+                run_operation(ns.as_ref(), operation, worker_id, 0, table_count, &ipc_data).await;
+        }
+    }
+
+    for i in 0..operations {
+        let start = Instant::now();
+        let err = run_operation(ns.as_ref(), operation, worker_id, i, table_count, &ipc_data)
+            .await
+            .is_err();
+        emit_latency(operation, start, err);
+    }
+}
+
+/// Executes one benchmark operation against `ns`.
+///
+/// Write operations derive a unique object id from `worker_id` and `op_idx`;
+/// `describe-table` picks `table_<(worker_id * 1000 + op_idx) % table_count>`.
+///
+/// # Errors
+/// Returns the namespace error, or an error for an unrecognised `operation` name.
+async fn run_operation(
+    ns: &dyn LanceNamespace,
+    operation: &str,
+    worker_id: usize,
+    op_idx: usize,
+    table_count: usize,
+    ipc_data: &Bytes,
+) -> Result<(), Box<dyn std::error::Error>> {
+    match operation {
+        "cold-read-list-namespaces" | "warm-read-list-namespaces" => {
+            let mut req = ListNamespacesRequest::new();
+            req.id = Some(vec![]);
+            ns.list_namespaces(req).await?;
+        }
+        "cold-read-list-tables" | "warm-read-list-tables" => {
+            let mut req = ListTablesRequest::new();
+            req.id = Some(vec![]);
+            ns.list_tables(req).await?;
+        }
+        "cold-read-describe-table" | "warm-read-describe-table" => {
+            // `max(1)` guards the modulo against a zero table count.
+            let table_idx = (worker_id * 1000 + op_idx) % table_count.max(1);
+            let req = DescribeTableRequest {
+                id: Some(vec![format!("table_{}", table_idx)]),
+                ..Default::default()
+            };
+            ns.describe_table(req).await?;
+        }
+        "write-create-namespace" => {
+            let mut req = CreateNamespaceRequest::new();
+            req.id = Some(vec![format!("bench_w{}_{}", worker_id, op_idx)]);
+            ns.create_namespace(req).await?;
+        }
+        "write-create-table" => {
+            let mut req = CreateTableRequest::new();
+            req.id = Some(vec![format!("bench_t{}_{}", worker_id, op_idx)]);
+            ns.create_table(req, ipc_data.clone()).await?;
+        }
+        "write-declare-table" => {
+            let req = DeclareTableRequest {
+                id: Some(vec![format!("bench_d{}_{}", worker_id, op_idx)]),
+                ..Default::default()
+            };
+            ns.declare_table(req).await?;
+        }
+        _ => {
+            return Err(format!("unknown operation: {}", operation).into());
+        }
+    }
+    Ok(())
+}
+
+// ──────────────────── cold-read worker ────────────────────
+// For cold reads, each operation opens a FRESH namespace to avoid caching.
+
+/// Like [`worker`], but builds a new namespace for every operation; the recorded
+/// latency includes the time spent building it.
+async fn cold_read_worker(
+    root: &str,
+    operation: &str,
+    operations: usize,
+    worker_id: usize,
+    table_count: usize,
+    inline_optimization: bool,
+    storage_options: &HashMap<String, String>,
+) {
+    let ipc_data = Bytes::from(create_test_ipc_data());
+
+    for i in 0..operations {
+        // Fresh namespace for each operation — simulates cold start
+        let start = Instant::now();
+        let ns = build_namespace(root, inline_optimization, storage_options).await;
+        let err = run_operation(ns.as_ref(), operation, worker_id, i, table_count, &ipc_data)
+            .await
+            .is_err();
+        emit_latency(operation, start, err);
+    }
+}
+
+// ──────────────────── run mode (coordinator) ────────────────────
+
+/// Spawns `concurrency` copies of this executable in `worker` mode, splits
+/// `operations` evenly between them, and aggregates their JSON latency records.
+///
+/// Returns an all-zero result without spawning anything when each worker would get
+/// zero operations. Worker stdout is drained one child at a time in spawn order;
+/// worker stderr is inherited. Panics if a worker cannot be spawned or waited on.
+fn run_workers(
+    self_exe: &str,
+    root: &str,
+    operation: &str,
+    concurrency: usize,
+    operations: usize,
+    warmup: usize,
+    table_count: usize,
+    initial_entries: usize,
+    inline_optimization: bool,
+    variant: &str,
+    storage_options: &HashMap<String, String>,
+) -> BenchResult {
+    let ops_per_worker = operations / concurrency.max(1);
+    if ops_per_worker == 0 {
+        return compute_result(
+            variant,
+            operation,
+            concurrency,
+            initial_entries,
+            Duration::ZERO,
+            vec![],
+            0,
+        );
+    }
+
+    let wall_start = Instant::now();
+
+    let children: Vec<_> = (0..concurrency)
+        .map(|worker_id| {
+            let mut cmd = Command::new(self_exe);
+            cmd.arg("worker")
+                .arg("--root")
+                .arg(root)
+                .arg("--operation")
+                .arg(operation)
+                .arg("--operations")
+                .arg(ops_per_worker.to_string())
+                .arg("--warmup")
+                .arg(warmup.to_string())
+                .arg("--worker-id")
+                .arg(worker_id.to_string())
+                .arg("--table-count")
+                .arg(table_count.to_string())
+                .arg("--inline-optimization")
+                .arg(inline_optimization.to_string());
+            for (k, v) in storage_options {
+                cmd.arg("--storage-option").arg(format!("{}={}", k, v));
+            }
+            cmd.stdout(Stdio::piped())
+                .stderr(Stdio::inherit())
+                .spawn()
+                .expect("Failed to spawn worker")
+        })
+        .collect();
+
+    let mut all_latencies = Vec::new();
+    let mut total_errors = 0;
+
+    for mut child in children {
+        let stdout = child.stdout.take().expect("worker stdout was not piped");
+        let reader = BufReader::new(stdout);
+        for line in reader.lines() {
+            let line = line.expect("failed to read worker output");
+            match serde_json::from_str::<LatencyRecord>(&line) {
+                Ok(record) if record.error => total_errors += 1,
+                Ok(record) => all_latencies.push(record.latency_ms),
+                // Workers only print JSON records on stdout; report anything else
+                // rather than silently shrinking the sample.
+                Err(e) => {
+                    if !line.trim().is_empty() {
+                        eprintln!("Ignoring malformed worker output ({}): {}", e, line);
+                    }
+                }
+            }
+        }
+        let status = child.wait().expect("failed to wait for worker");
+        if !status.success() {
+            eprintln!("Worker exited with status: {}", status);
+        }
+    }
+
+    let wall_duration = wall_start.elapsed();
+    compute_result(
+        variant,
+        operation,
+        concurrency,
+        initial_entries,
+        wall_duration,
+        all_latencies,
+        total_errors,
+    )
+}
+
+/// Parses a comma-separated list such as `"1,10,100"`, dropping entries that are
+/// not positive integers.
+fn parse_concurrency_list(s: &str) -> Vec<usize> {
+    s.split(',')
+        .filter_map(|v| v.trim().parse::<usize>().ok())
+        .filter(|v| *v > 0)
+        .collect()
+}
+
+/// Returns the value following the flag at `args[i]`, or exits with status 1 and a
+/// message when the flag is the last argument.
+fn flag_value(args: &[String], i: usize) -> &str {
+    match args.get(i + 1) {
+        Some(value) => value.as_str(),
+        None => {
+            eprintln!("Missing value for argument: {}", args[i]);
+            std::process::exit(1)
+        }
+    }
+}
+
+/// Parses the value following the flag at `args[i]`, or exits with status 1 and a
+/// message naming the flag when the value is missing or malformed.
+fn parse_flag<T: std::str::FromStr>(args: &[String], i: usize) -> T {
+    let raw = flag_value(args, i);
+    raw.parse().unwrap_or_else(|_| {
+        eprintln!("Invalid value for {}: {}", args[i], raw);
+        std::process::exit(1)
+    })
+}
+
+#[tokio::main]
+async fn main() {
+    env_logger::init();
+
+    let args: Vec<String> = std::env::args().collect();
+    if args.len() < 2 {
+        eprintln!("Usage: manifest_bench <seed|seed-large|run|worker> [options]");
+        std::process::exit(1);
+    }
+
+    let mode = args[1].as_str();
+    let mut root = String::new();
+    let mut operation = String::new();
+    let mut operations: usize = 100;
+    let mut warmup: usize = 10;
+    let mut concurrency_list = vec![1, 2, 5, 10, 20, 50, 100];
+    let mut count: usize = 1000;
+    let mut worker_id: usize = 0;
+    let mut table_count: usize = 667; // default for 1000 seed: 1000 - 1000/3
+    let mut initial_entries: usize = 0;
+    let mut inline_optimization = true;
+    let mut variant = String::new();
+    let mut storage_options: HashMap<String, String> = HashMap::new();
+
+    let mut i = 2;
+    while i < args.len() {
+        match args[i].as_str() {
+            "--root" => root = flag_value(&args, i).to_string(),
+            "--operation" => operation = flag_value(&args, i).to_string(),
+            "--operations" => operations = parse_flag(&args, i),
+            "--warmup" => warmup = parse_flag(&args, i),
+            "--concurrency" => concurrency_list = parse_concurrency_list(flag_value(&args, i)),
+            "--count" => count = parse_flag(&args, i),
+            "--worker-id" => worker_id = parse_flag(&args, i),
+            "--table-count" => table_count = parse_flag(&args, i),
+            "--initial-entries" => initial_entries = parse_flag(&args, i),
+            "--inline-optimization" => inline_optimization = parse_flag(&args, i),
+            "--variant" => variant = flag_value(&args, i).to_string(),
+            "--storage-option" => {
+                let kv = flag_value(&args, i);
+                match kv.split_once('=') {
+                    Some((k, v)) => {
+                        storage_options.insert(k.to_string(), v.to_string());
+                    }
+                    None => {
+                        eprintln!("Invalid --storage-option (expected key=value): {}", kv);
+                        std::process::exit(1);
+                    }
+                }
+            }
+            _ => {
+                eprintln!("Unknown argument: {}", args[i]);
+                std::process::exit(1);
+            }
+        }
+        // Every recognised flag consumes exactly one value.
+        i += 2;
+    }
+
+    if variant.is_empty() {
+        variant = if inline_optimization {
+            "default".to_string()
+        } else {
+            "no_inline_opt".to_string()
+        };
+    }
+
+    // Every known mode addresses storage under `root`; an empty root would make
+    // `seed-large` target `/__manifest`, so refuse it up front.
+    if matches!(mode, "seed" | "seed-large" | "run" | "worker") && root.is_empty() {
+        eprintln!("Missing required argument: --root");
+        std::process::exit(1);
+    }
+
+    match mode {
+        "seed" => {
+            seed(&root, count, inline_optimization, &storage_options).await;
+        }
+        "seed-large" => {
+            seed_large(&root, count, inline_optimization, &storage_options).await;
+        }
+        "worker" => {
+            if operation.starts_with("cold-read") {
+                cold_read_worker(
+                    &root,
+                    &operation,
+                    operations,
+                    worker_id,
+                    table_count,
+                    inline_optimization,
+                    &storage_options,
+                )
+                .await;
+            } else {
+                worker(
+                    &root,
+                    &operation,
+                    operations,
+                    warmup,
+                    worker_id,
+                    table_count,
+                    inline_optimization,
+                    &storage_options,
+                )
+                .await;
+            }
+        }
+        "run" => {
+            // An unparsable --concurrency value would otherwise benchmark nothing
+            // and still report completion.
+            if concurrency_list.is_empty() {
+                eprintln!("--concurrency must list at least one positive integer");
+                std::process::exit(1);
+            }
+
+            let self_exe = std::env::current_exe()
+                .expect("failed to get self exe path")
+                .to_string_lossy()
+                .to_string();
+
+            let operations_list = [
+                "cold-read-list-namespaces",
+                "cold-read-list-tables",
+                "cold-read-describe-table",
+                "warm-read-list-namespaces",
+                "warm-read-list-tables",
+                "warm-read-describe-table",
+                "write-create-namespace",
+                "write-declare-table",
+                "write-create-table",
+            ];
+
+            // If --operation is set, only run that one
+            let ops: Vec<&str> = if operation.is_empty() {
+                operations_list.to_vec()
+            } else {
+                vec![operation.as_str()]
+            };
+
+            eprintln!("=== Manifest Benchmark ===");
+            eprintln!("variant: {}", variant);
+            eprintln!("root: {}", root);
+            eprintln!("inline_optimization: {}", inline_optimization);
+            eprintln!("initial_entries: {}", initial_entries);
+            eprintln!("concurrency: {:?}", concurrency_list);
+            eprintln!("operations per level: {}", operations);
+            eprintln!("warmup: {}", warmup);
+            eprintln!("table_count: {}", table_count);
+
+            for op in &ops {
+                for &concurrency in &concurrency_list {
+                    // Round down to a multiple of `concurrency` so every worker gets
+                    // the same number of operations.
+                    let actual_ops = (operations / concurrency) * concurrency;
+                    eprintln!(" {} concurrency={} ops={}", op, concurrency, actual_ops);
+                    let result = run_workers(
+                        &self_exe,
+                        &root,
+                        op,
+                        concurrency,
+                        actual_ops,
+                        warmup,
+                        table_count,
+                        initial_entries,
+                        inline_optimization,
+                        &variant,
+                        &storage_options,
+                    );
+                    println!("{}", serde_json::to_string(&result).unwrap());
+                }
+            }
+            eprintln!("=== Benchmark complete ===");
+        }
+        _ => {
+            eprintln!(
+                "Unknown mode: {}. Use seed, seed-large, run, or worker.",
+                mode
+            );
+            std::process::exit(1);
+        }
+    }
+}
diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs
index 425993e3956..09145b6a4b6 100644
--- a/rust/lance-namespace-impls/src/dir.rs
+++ b/rust/lance-namespace-impls/src/dir.rs
@@ -307,11 +307,10 @@ impl DirectoryNamespaceBuilder {
         self
     }
 
-    /// Enable or disable inline optimization of the __manifest table.
+    /// Deprecated compatibility option for inline optimization of the __manifest table.
     ///
-    /// When enabled (default), performs compaction and indexing on the __manifest table
-    /// after every write operation to maintain optimal performance.
-    /// When disabled, manual optimization must be performed separately.
+ /// Copy-on-write manifest rewrites always replace data files and maintain + /// indexes inline. This setting is retained for callers that still pass it. pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self { self.inline_optimization_enabled = enabled; self @@ -349,7 +348,7 @@ impl DirectoryNamespaceBuilder { /// - `root`: The root directory path (required) /// - `manifest_enabled`: Enable manifest-based table tracking (optional, default: true) /// - `dir_listing_enabled`: Enable directory listing for table discovery (optional, default: true) - /// - `inline_optimization_enabled`: Enable inline optimization of __manifest table (optional, default: true) + /// - `inline_optimization_enabled`: Deprecated compatibility option; ignored by copy-on-write manifest rewrites /// - `storage.*`: Storage options (optional, prefix will be stripped) /// /// Credential vendor properties (prefixed with `credential_vendor.`, prefix is stripped): @@ -447,7 +446,7 @@ impl DirectoryNamespaceBuilder { .and_then(|v| v.parse::().ok()) .unwrap_or(true); - // Extract inline_optimization_enabled (default: true) + // Extract deprecated inline_optimization_enabled (default: true) let inline_optimization_enabled = properties .get("inline_optimization_enabled") .and_then(|v| v.parse::().ok()) @@ -2935,15 +2934,12 @@ impl LanceNamespace for DirectoryNamespace { .to_string(); if let Err(e) = manifest_ns - .insert_into_manifest_with_metadata( - vec![manifest::ManifestEntry { - object_id, - object_type: manifest::ObjectType::TableVersion, - location: None, - metadata: Some(metadata_json), - }], - None, - ) + .insert_into_manifest_with_metadata(vec![manifest::ManifestEntry { + object_id, + object_type: manifest::ObjectType::TableVersion, + location: None, + metadata: Some(metadata_json), + }]) .await { log::warn!( @@ -3039,38 +3035,27 @@ impl LanceNamespace for DirectoryNamespace { ranges, }]; - let mut total_deleted_count = 0i64; - if self.table_version_storage_enabled && let 
Some(ref manifest_ns) = self.manifest_ns { - // Phase 1 (atomic commit point): Delete version records from __manifest - // for ALL tables in a single atomic operation. This is the authoritative + // Phase 1 (atomic commit point): Delete version records from __manifest. + // This is the authoritative // source of truth — once __manifest entries are removed, the versions - // are logically deleted across all tables atomically. - - // Collect all (table_id_str, ranges) for batch deletion - let mut all_object_ids: Vec = Vec::new(); - for te in &table_entries { - let table_id_str = manifest::ManifestNamespace::str_object_id( - &te.table_id.clone().unwrap_or_default(), - ); - for (start, end) in &te.ranges { - for version in *start..=*end { - let object_id = manifest::ManifestNamespace::build_version_object_id( - &table_id_str, - version, - ); - all_object_ids.push(object_id); - } - } - } - - if !all_object_ids.is_empty() { - total_deleted_count = manifest_ns - .batch_delete_table_versions_by_object_ids(&all_object_ids) - .await?; - } + // are logically deleted. + let table_ranges = table_entries + .iter() + .map(|te| { + ( + manifest::ManifestNamespace::str_object_id( + &te.table_id.clone().unwrap_or_default(), + ), + te.ranges.clone(), + ) + }) + .collect::>(); + let total_deleted_count = manifest_ns + .batch_delete_table_versions_by_ranges(&table_ranges) + .await?; // Phase 2: Delete physical manifest files (best-effort). 
// Even if some file deletions fail, the versions are already removed from @@ -3087,7 +3072,7 @@ impl LanceNamespace for DirectoryNamespace { } // Fallback when table_version_storage is not enabled: delete physical files directly (no __manifest) - total_deleted_count = self + let total_deleted_count = self .delete_physical_version_files(&table_entries, false) .await?; diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 0e22f1e8b69..47b6b128863 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -6,27 +6,40 @@ //! This module provides a namespace implementation that uses a manifest table //! to track tables and nested namespaces. -use arrow::array::builder::{ListBuilder, StringBuilder}; -use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray}; -use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; +use arrow::array::{ + Array, LargeBinaryArray, LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, + UInt64Array, +}; +use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef}; use arrow_ipc::reader::StreamReader; use async_trait::async_trait; use bytes::Bytes; -use futures::{FutureExt, TryStreamExt, stream::StreamExt}; -use lance::dataset::optimize::{CompactionOptions, compact_files}; +use datafusion_common::DataFusionError; +use datafusion_physical_plan::{ + SendableRecordBatchStream, + stream::RecordBatchStreamAdapter as DatafusionRecordBatchStreamAdapter, +}; +use futures::{ + FutureExt, TryStreamExt, + stream::{self, StreamExt}, +}; +use lance::dataset::index::LanceIndexStoreExt; +use lance::dataset::transaction::Operation; use lance::dataset::{ - DeleteBuilder, MergeInsertBuilder, ReadParams, WhenMatched, WhenNotMatched, WriteMode, - WriteParams, builder::DatasetBuilder, + InsertBuilder, ManifestWriteConfig, ReadParams, WhenMatched, WriteMode, WriteParams, + builder::DatasetBuilder, 
write_manifest_file, }; -use lance::index::DatasetIndexExt; use lance::session::Session; use lance::{Dataset, dataset::scanner::Scanner}; -use lance_core::Error as LanceError; +use lance_arrow::json::{JsonArray, decode_json, json_field}; use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION; +use lance_core::{Error as LanceError, ROW_ID}; use lance_core::{Error, Result}; -use lance_index::IndexType; -use lance_index::optimize::OptimizeOptions; -use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; +use lance_index::progress::noop_progress; +use lance_index::registry::IndexPluginRegistry; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_index::scalar::registry::VALUE_COLUMN_NAME; +use lance_index::scalar::{BuiltinIndexType, CreatedIndex, InvertedIndexParams, ScalarIndexParams}; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; use lance_namespace::LanceNamespace; use lance_namespace::error::NamespaceError; @@ -41,15 +54,19 @@ use lance_namespace::models::{ TableVersion, }; use lance_namespace::schema::arrow_schema_to_json; +use lance_table::format::{IndexMetadata, Manifest}; +use lance_table::io::commit::CommitError; use object_store::{Error as ObjectStoreError, path::Path}; +use roaring::RoaringBitmap; use std::io::Cursor; use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap, HashSet}, hash::{DefaultHasher, Hash, Hasher}, ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, Mutex as StdMutex, MutexGuard as StdMutexGuard}, }; use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use uuid::Uuid; const MANIFEST_TABLE_NAME: &str = "__manifest"; const DELIMITER: &str = "$"; @@ -62,11 +79,12 @@ pub(crate) const DECLARED_FILTER_CONCURRENCY: usize = 16; const OBJECT_ID_INDEX_NAME: &str = "object_id_btree"; /// Bitmap index on the object_type column for filtering by type const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap"; -/// LabelList index on the base_objects column for view 
dependencies -const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list"; -/// Inline maintenance on the manifest table is expensive relative to a single-row mutation. -/// Wait until enough fragments accumulate before compacting files or merging indices. -const MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD: usize = 8; +/// JSON FTS index on the metadata column for metadata text search +const METADATA_INDEX_NAME: &str = "metadata_fts"; +// Each retry reloads and rewrites the full manifest. Match the regular Lance +// commit retry budget so multi-process namespace writes can make progress. +const DEFAULT_MANIFEST_REWRITE_COMMIT_RETRIES: u32 = 20; +const MANIFEST_INDEX_BATCH_SIZE: usize = 8192; /// Object types that can be stored in the manifest #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -77,7 +95,7 @@ pub enum ObjectType { } impl ObjectType { - pub fn as_str(&self) -> &str { + pub fn as_str(&self) -> &'static str { match self { Self::Namespace => "namespace", Self::Table => "table", @@ -160,6 +178,433 @@ pub struct ManifestEntry { pub metadata: Option, } +struct CopyOnWriteMutation { + result: T, + has_changes: bool, +} + +impl CopyOnWriteMutation { + fn updated(result: T) -> Self { + Self { + result, + has_changes: true, + } + } + + fn unchanged(result: T) -> Self { + Self { + result, + has_changes: false, + } + } +} + +struct ManifestIndexBuildInput { + index_name: &'static str, + column_name: &'static str, + params: ScalarIndexParams, + field: Field, + stream: SendableRecordBatchStream, +} + +struct ManifestTrainedIndex { + index_name: &'static str, + column_name: &'static str, + uuid: Uuid, + created_index: CreatedIndex, +} + +struct ManifestRowValue { + object_id: String, + object_type: ObjectType, + location: Option, + metadata: Option, +} + +struct ManifestOutputRow<'a> { + object_id: &'a str, + object_type: ObjectType, + location: Option<&'a str>, + metadata: Option<&'a str>, +} + +#[derive(Default)] +struct ManifestIndexAccumulator { + 
object_ids: BTreeMap, u64>, + object_types: BTreeMap<&'static str, RoaringBitmap>, + metadata_values: Vec>, + metadata_row_ids: Vec, + row_count: u64, +} + +impl ManifestIndexAccumulator { + fn next_row_id(&self) -> Result { + if self.row_count > u64::from(u32::MAX) { + return Err(NamespaceError::Internal { + message: format!( + "Manifest rewrite exceeded maximum single-fragment row count: {}", + self.row_count + ), + } + .into()); + } + Ok(self.row_count) + } + + fn push(&mut self, row: &ManifestOutputRow<'_>) -> Result { + let row_id = self.next_row_id()?; + if self + .object_ids + .insert(Arc::::from(row.object_id), row_id) + .is_some() + { + return Err(NamespaceError::Internal { + message: format!("Manifest contains duplicate object_id '{}'", row.object_id), + } + .into()); + } + self.object_types + .entry(row.object_type.as_str()) + .or_default() + .insert(row_id as u32); + self.metadata_values + .push(row.metadata.map(ToString::to_string)); + self.metadata_row_ids.push(row_id); + self.row_count += 1; + Ok(row_id) + } +} + +struct ManifestBatchBuilder { + object_ids: Vec, + object_types: Vec<&'static str>, + locations: Vec>, + metadatas: Vec>, + row_ids: Vec, +} + +impl ManifestBatchBuilder { + fn new() -> Self { + Self { + object_ids: Vec::new(), + object_types: Vec::new(), + locations: Vec::new(), + metadatas: Vec::new(), + row_ids: Vec::new(), + } + } + + fn is_empty(&self) -> bool { + self.object_ids.is_empty() + } + + fn append( + &mut self, + index_data: &mut ManifestIndexAccumulator, + row: ManifestOutputRow<'_>, + ) -> Result<()> { + let row_id = index_data.push(&row)?; + self.object_ids.push(row.object_id.to_string()); + self.object_types.push(row.object_type.as_str()); + self.locations.push(row.location.map(ToString::to_string)); + self.metadatas.push(row.metadata.map(ToString::to_string)); + self.row_ids.push(row_id); + Ok(()) + } + + fn finish(self) -> Result { + let metadata_array = Arc::new( + 
JsonArray::try_from_iter(self.metadatas.iter().map(|v| v.as_deref())) + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to encode manifest metadata as JSON: {}", e), + }) + })? + .into_inner(), + ); + RecordBatch::try_new( + ManifestNamespace::manifest_schema(), + vec![ + Arc::new(StringArray::from(self.object_ids)), + Arc::new(StringArray::from(self.object_types)), + Arc::new(StringArray::from(self.locations)), + metadata_array, + ], + ) + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to create manifest snapshot batch: {:?}", e), + }) + }) + } +} + +trait ManifestStreamMutation: Send { + type Output: Clone + Send + 'static; + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()>; + + fn append_rows( + &mut self, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()>; + + fn finish(&self) -> CopyOnWriteMutation; +} + +struct ManifestRewriteShared { + mutation: M, + index_data: Option, + result: Option>, + error: Option, +} + +impl ManifestRewriteShared { + fn new(mutation: M) -> Self { + Self { + mutation, + index_data: Some(ManifestIndexAccumulator::default()), + result: None, + error: None, + } + } +} + +struct UpsertManifestMutation { + entries: Vec, + entry_positions: HashMap, + matched: Vec, + when_matched: WhenMatched, +} + +impl UpsertManifestMutation { + fn new(entries: Vec, when_matched: WhenMatched) -> Self { + let entry_positions = entries + .iter() + .enumerate() + .map(|(index, entry)| (entry.object_id.clone(), index)) + .collect(); + let matched = vec![false; entries.len()]; + Self { + entries, + entry_positions, + matched, + when_matched, + } + } + + fn entry_row(&self, index: usize) -> ManifestOutputRow<'_> { + let entry = &self.entries[index]; + ManifestOutputRow { + object_id: &entry.object_id, + 
object_type: entry.object_type, + location: entry.location.as_deref(), + metadata: entry.metadata.as_deref(), + } + } +} + +impl ManifestStreamMutation for UpsertManifestMutation { + type Output = (); + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + if let Some(index) = self.entry_positions.get(&row.object_id).copied() { + match self.when_matched { + WhenMatched::Fail => { + return Err(NamespaceError::ConcurrentModification { + message: format!( + "Object '{}' was concurrently created by another operation", + row.object_id + ), + } + .into()); + } + WhenMatched::UpdateAll => { + self.matched[index] = true; + output.append(index_data, self.entry_row(index))?; + return Ok(()); + } + _ => { + return Err(NamespaceError::Internal { + message: format!( + "Unsupported manifest rewrite matched action: {:?}", + self.when_matched + ), + } + .into()); + } + } + } + + output.append( + index_data, + ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: row.metadata.as_deref(), + }, + ) + } + + fn append_rows( + &mut self, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + for index in 0..self.entries.len() { + if !self.matched[index] { + output.append(index_data, self.entry_row(index))?; + } + } + Ok(()) + } + + fn finish(&self) -> CopyOnWriteMutation { + CopyOnWriteMutation::updated(()) + } +} + +struct DeleteObjectMutation { + object_id: String, + deleted: bool, +} + +impl ManifestStreamMutation for DeleteObjectMutation { + type Output = (); + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + if row.object_id == self.object_id { + self.deleted = true; + return Ok(()); + } + + output.append( + index_data, + 
ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: row.metadata.as_deref(), + }, + ) + } + + fn append_rows( + &mut self, + _output: &mut ManifestBatchBuilder, + _index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + Ok(()) + } + + fn finish(&self) -> CopyOnWriteMutation { + if self.deleted { + CopyOnWriteMutation::updated(()) + } else { + CopyOnWriteMutation::unchanged(()) + } + } +} + +enum DeleteTableVersionsTarget { + ObjectIds(HashSet), + Ranges(Vec), +} + +#[derive(Clone)] +struct DeleteTableVersionRangeTarget { + object_id_prefix: String, + ranges: Vec<(i64, i64)>, +} + +impl DeleteTableVersionRangeTarget { + fn matches(&self, object_id: &str) -> bool { + let Some(version) = object_id + .strip_prefix(&self.object_id_prefix) + .and_then(|suffix| suffix.parse::().ok()) + else { + return false; + }; + + self.ranges + .iter() + .any(|(start, end)| *start <= version && version <= *end) + } +} + +impl DeleteTableVersionsTarget { + fn matches(&self, object_id: &str) -> bool { + match self { + Self::ObjectIds(object_ids) => object_ids.contains(object_id), + Self::Ranges(targets) => targets.iter().any(|target| target.matches(object_id)), + } + } +} + +struct DeleteTableVersionsMutation { + target: DeleteTableVersionsTarget, + deleted_count: i64, +} + +impl ManifestStreamMutation for DeleteTableVersionsMutation { + type Output = i64; + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + if row.object_type == ObjectType::TableVersion && self.target.matches(&row.object_id) { + self.deleted_count += 1; + return Ok(()); + } + + output.append( + index_data, + ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: row.metadata.as_deref(), + }, + ) + } + + fn append_rows( + &mut self, + _output: 
&mut ManifestBatchBuilder, + _index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + Ok(()) + } + + fn finish(&self) -> CopyOnWriteMutation { + if self.deleted_count > 0 { + CopyOnWriteMutation::updated(self.deleted_count) + } else { + CopyOnWriteMutation::unchanged(0) + } + } +} + /// Information about a namespace stored in the manifest #[derive(Debug, Clone)] pub struct NamespaceInfo { @@ -190,6 +635,25 @@ impl DatasetConsistencyWrapper { }) } + /// Get the cached dataset without reloading. + /// Use when the caller knows the dataset is already at an acceptable version + /// (e.g., under a mutation lock where we just committed or are about to detect + /// conflicts at commit time). + pub async fn get_cached(&self) -> DatasetReadGuard<'_> { + DatasetReadGuard { + guard: self.0.read().await, + } + } + + /// Reload the dataset and return a reference. + /// Use on retry paths where we know the version is stale. + pub async fn get_refreshed(&self) -> Result> { + self.reload().await?; + Ok(DatasetReadGuard { + guard: self.0.read().await, + }) + } + /// Get a mutable reference to the dataset. /// Always reloads to ensure strong consistency. pub async fn get_mut(&self) -> Result> { @@ -306,8 +770,9 @@ pub struct ManifestNamespace { /// If true, root namespace tables use {table_name}.lance naming /// If false, they use namespace-prefixed names dir_listing_enabled: bool, - /// Whether to perform inline optimization (compaction and indexing) on the __manifest table - /// after every write. Defaults to true. + /// When true, copy-on-write manifest rewrites build replacement indices + /// (BTree, Bitmap, FTS) inline during the overwrite. When false, only the + /// data file is rewritten and indices are expected to be created offline. inline_optimization_enabled: bool, /// Number of retries for commit operations on the manifest table. /// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20). 
@@ -341,11 +806,13 @@ impl std::fmt::Debug for ManifestNamespace { /// - Other errors: IO error with the operation description fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error { match e { - // CommitConflict: version collision retries exhausted -> Throttling (safe to retry) - LanceError::CommitConflict { .. } => NamespaceError::Throttling { - message: format!("Too many concurrent writes, please retry later: {:?}", e), + // Safe-to-retry commit conflicts exhausted retries -> Throttling. + LanceError::CommitConflict { .. } | LanceError::RetryableCommitConflict { .. } => { + NamespaceError::Throttling { + message: format!("Too many concurrent writes, please retry later: {:?}", e), + } + .into() } - .into(), // TooMuchWriteContention: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => { @@ -493,6 +960,31 @@ impl ManifestNamespace { ) } + fn build_version_object_id_prefix(table_object_id: &str) -> String { + format!("{}{}", table_object_id, DELIMITER) + } + + fn normalize_table_version_ranges(ranges: &[(i64, i64)]) -> Vec<(i64, i64)> { + let mut ranges = ranges + .iter() + .copied() + .filter(|(start, end)| start <= end) + .collect::>(); + ranges.sort_unstable(); + + let mut merged: Vec<(i64, i64)> = Vec::with_capacity(ranges.len()); + for (start, end) in ranges { + if let Some((_, last_end)) = merged.last_mut() + && start <= last_end.saturating_add(1) + { + *last_end = (*last_end).max(end); + continue; + } + merged.push((start, end)); + } + merged + } + /// Parse a version number from the version suffix of a table version object_id. /// /// The object_id is formatted as `{table_id}${zero_padded_version}`. 
@@ -556,231 +1048,791 @@ impl ManifestNamespace { Ok(full_url.to_string()) } - /// Perform inline optimization on the __manifest table. - /// - /// This method: - /// 1. Creates three indexes on the manifest table: - /// - BTREE index on object_id for fast lookups - /// - Bitmap index on object_type for filtering by type - /// - LabelList index on base_objects for view dependencies - /// 2. Runs file compaction to merge small files - /// 3. Optimizes existing indices - /// - /// This is called automatically after writes when inline_optimization_enabled is true. - async fn run_inline_optimization(&self) -> Result<()> { - if !self.inline_optimization_enabled { - return Ok(()); - } - - // Get a mutable reference to the dataset to perform optimization - let mut dataset_guard = self.manifest_dataset.get_mut().await?; - let dataset: &mut Dataset = &mut dataset_guard; + /// Get the manifest schema + fn manifest_schema() -> Arc { + Arc::new(ArrowSchema::new(vec![ + // Set unenforced primary key on object_id for bloom filter conflict detection + Field::new("object_id", DataType::Utf8, false).with_metadata( + [( + LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(), + "0".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("object_type", DataType::Utf8, false), + Field::new("location", DataType::Utf8, true), + json_field("metadata", true), + ])) + } - // Step 1: Create indexes if they don't already exist - let indices = dataset.load_indices().await?; + /// Get a scanner for the manifest dataset + async fn manifest_scanner(&self) -> Result { + let dataset_guard = self.manifest_dataset.get().await?; + Ok(dataset_guard.scan()) + } - // Check which indexes already exist - let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME); - let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME); - let has_base_objects_index = indices - .iter() - .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME); + /// Helper to execute 
a scanner and collect results into a Vec + async fn execute_scanner(scanner: Scanner) -> Result> { + let mut stream = scanner.try_into_stream().await.map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to create stream: {:?}", e), + }) + })?; - // Create BTREE index on object_id - if !has_object_id_index { - log::debug!( - "Creating BTREE index '{}' on object_id for __manifest table", - OBJECT_ID_INDEX_NAME - ); - let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree); - if let Err(e) = dataset - .create_index( - &["object_id"], - IndexType::BTree, - Some(OBJECT_ID_INDEX_NAME.to_string()), - ¶ms, - true, - ) - .await - { - log::warn!( - "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", - e - ); - } else { - log::info!( - "Created BTREE index '{}' on object_id for __manifest table", - OBJECT_ID_INDEX_NAME - ); - } + let mut batches = Vec::new(); + while let Some(batch) = stream.next().await { + batches.push(batch.map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to read batch: {:?}", e), + }) + })?); } - // Create Bitmap index on object_type - if !has_object_type_index { - log::debug!( - "Creating Bitmap index '{}' on object_type for __manifest table", - OBJECT_TYPE_INDEX_NAME - ); - let params = ScalarIndexParams::default(); - if let Err(e) = dataset - .create_index( - &["object_type"], - IndexType::Bitmap, - Some(OBJECT_TYPE_INDEX_NAME.to_string()), - ¶ms, - true, - ) - .await - { - log::warn!( - "Failed to create Bitmap index on object_type for __manifest table: {:?}. 
Query performance may be impacted.", - e - ); - } else { - log::info!( - "Created Bitmap index '{}' on object_type for __manifest table", - OBJECT_TYPE_INDEX_NAME - ); - } - } + Ok(batches) + } - // Create LabelList index on base_objects - if !has_base_objects_index { - log::debug!( - "Creating LabelList index '{}' on base_objects for __manifest table", - BASE_OBJECTS_INDEX_NAME - ); - let params = ScalarIndexParams::default(); - if let Err(e) = dataset - .create_index( - &["base_objects"], - IndexType::LabelList, - Some(BASE_OBJECTS_INDEX_NAME.to_string()), - ¶ms, - true, - ) - .await - { - log::warn!( - "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", - e - ); - } else { - log::info!( - "Created LabelList index '{}' on base_objects for __manifest table", - BASE_OBJECTS_INDEX_NAME - ); + /// Helper to get a string column from a record batch + fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> { + let column = batch.column_by_name(column_name).ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Column '{}' not found", column_name), + }) + })?; + column + .as_any() + .downcast_ref::() + .ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Column '{}' is not a string array", column_name), + }) + }) + } + + fn required_string_value<'a>( + array: &'a StringArray, + row: usize, + column_name: &str, + ) -> Result<&'a str> { + if array.is_null(row) { + return Err(NamespaceError::Internal { + message: format!("Manifest column '{}' has null at row {}", column_name, row), } + .into()); } + Ok(array.value(row)) + } - let should_compact_and_optimize = - dataset.count_fragments() >= MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD; + fn optional_string_value(array: &StringArray, row: usize) -> Option { + (!array.is_null(row)).then(|| array.value(row).to_string()) + } - if 
!should_compact_and_optimize { - return Ok(()); + fn metadata_column_values( + batch: &RecordBatch, + column_name: &str, + ) -> Result>> { + let column = batch.column_by_name(column_name).ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Column '{}' not found", column_name), + }) + })?; + + if let Some(array) = column.as_any().downcast_ref::() { + return Ok((0..array.len()) + .map(|row| Self::optional_string_value(array, row)) + .collect()); } - // Step 2: Run file compaction - log::debug!("Running file compaction on __manifest table"); - match compact_files(dataset, CompactionOptions::default(), None).await { - Ok(compaction_metrics) => { - if compaction_metrics.fragments_removed > 0 { - log::info!( - "Compacted __manifest table: removed {} fragments, added {} fragments", - compaction_metrics.fragments_removed, - compaction_metrics.fragments_added - ); - } - } - Err(e) => { - log::warn!( - "Failed to compact files for __manifest table: {:?}. Continuing with optimization.", - e - ); - } + if let Some(array) = column.as_any().downcast_ref::() { + return Ok((0..array.len()) + .map(|row| (!array.is_null(row)).then(|| array.value(row).to_string())) + .collect()); } - // Step 3: Optimize indices - log::debug!("Optimizing indices on __manifest table"); - match dataset.optimize_indices(&OptimizeOptions::default()).await { - Ok(_) => { - log::info!("Successfully optimized indices on __manifest table"); - } - Err(e) => { - log::warn!( - "Failed to optimize indices on __manifest table: {:?}. 
Continuing anyway.", - e - ); - } + if let Some(array) = column.as_any().downcast_ref::() { + return Ok((0..array.len()) + .map(|row| (!array.is_null(row)).then(|| decode_json(array.value(row)))) + .collect()); } - Ok(()) + Err(NamespaceError::Internal { + message: format!( + "Column '{}' is not a supported metadata array: {:?}", + column_name, + column.data_type() + ), + } + .into()) } - /// Get the manifest schema - fn manifest_schema() -> Arc { + fn projected_schema(dataset: &Dataset) -> Result { + let projected_columns = ["object_id", "object_type", "location", "metadata"]; + let lance_schema = dataset.schema(); + let fields: Vec<_> = projected_columns + .iter() + .filter_map(|name| { + let f = lance_schema.field(name)?; + Some(Field::new(*name, f.data_type(), f.nullable)) + }) + .collect(); + Ok(Arc::new(ArrowSchema::new(fields))) + } + + async fn manifest_projected_stream(dataset: &Dataset) -> Result { + // Use the dataset's own schema so that old datasets with Utf8 metadata + // (instead of JSON/LargeBinary) stream correctly. The downstream + // rewrite path (metadata_column_values) handles both column types. 
+ let schema = Self::projected_schema(dataset)?; + let mut scanner = dataset.scan(); + scanner + .project(&["object_id", "object_type", "location", "metadata"]) + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to project manifest columns: {:?}", e), + }) + })?; + let stream = scanner.try_into_stream().await.map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to create manifest stream: {:?}", e), + }) + })?; + let stream = stream.map_err(|err| DataFusionError::External(Box::new(err))); + Ok(Box::pin(DatafusionRecordBatchStreamAdapter::new( + schema, + stream.fuse(), + ))) + } + + fn manifest_rewrite_commit_retries(&self) -> u32 { + self.commit_retries + .unwrap_or(DEFAULT_MANIFEST_REWRITE_COMMIT_RETRIES) + } + + fn value_row_id_schema(value_field: Field) -> SchemaRef { Arc::new(ArrowSchema::new(vec![ - // Set unenforced primary key on object_id for bloom filter conflict detection - Field::new("object_id", DataType::Utf8, false).with_metadata( - [( - LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(), - "0".to_string(), - )] - .into_iter() - .collect(), - ), - Field::new("object_type", DataType::Utf8, false), - Field::new("location", DataType::Utf8, true), - Field::new("metadata", DataType::Utf8, true), - Field::new( - "base_objects", - DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))), - true, - ), + value_field, + Field::new(ROW_ID, DataType::UInt64, false), ])) } - /// Get a scanner for the manifest dataset - async fn manifest_scanner(&self) -> Result { - let dataset_guard = self.manifest_dataset.get().await?; - Ok(dataset_guard.scan()) + fn metadata_index_schema() -> SchemaRef { + Self::value_row_id_schema(json_field(VALUE_COLUMN_NAME, true)) } - /// Helper to execute a scanner and collect results into a Vec - async fn execute_scanner(scanner: Scanner) -> Result> { - let mut stream = scanner.try_into_stream().await.map_err(|e| { + fn 
metadata_index_stream( + metadata_values: Vec>, + metadata_row_ids: Vec, + ) -> SendableRecordBatchStream { + let schema = Self::metadata_index_schema(); + let stream_schema = schema.clone(); + let stream = stream::unfold( + ( + metadata_values.into_iter().zip(metadata_row_ids).peekable(), + false, + schema, + ), + |state| async move { + let (mut iter, emitted, schema) = state; + let mut values = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + let mut row_ids = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + for _ in 0..MANIFEST_INDEX_BATCH_SIZE { + let Some((value, row_id)) = iter.next() else { + break; + }; + values.push(value); + row_ids.push(row_id); + } + if values.is_empty() { + if emitted { + None + } else { + let batch = Self::json_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (iter, true, schema))) + } + } else { + let batch = Self::json_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (iter, true, schema))) + } + }, + ); + Box::pin(DatafusionRecordBatchStreamAdapter::new( + stream_schema, + stream.fuse(), + )) + } + + fn metadata_index_params() -> ScalarIndexParams { + ScalarIndexParams::for_builtin(BuiltinIndexType::Inverted) + .with_params(&InvertedIndexParams::default().lance_tokenizer("json".to_string())) + } + + fn string_row_id_batch( + schema: SchemaRef, + values: Vec, + row_ids: Vec, + ) -> Result { + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(values)), + Arc::new(UInt64Array::from(row_ids)), + ], + ) + .map_err(Into::into) + } + + fn json_row_id_batch( + schema: SchemaRef, + values: Vec>, + row_ids: Vec, + ) -> Result { + let json_array = Arc::new( + JsonArray::try_from_iter(values.iter().map(|v| v.as_deref())) + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to encode metadata index JSON: {}", e), + }) + })? 
+ .into_inner(), + ); + RecordBatch::try_new( + schema, + vec![json_array, Arc::new(UInt64Array::from(row_ids))], + ) + .map_err(Into::into) + } + + fn object_id_index_stream(object_ids: BTreeMap, u64>) -> SendableRecordBatchStream { + let schema = + Self::value_row_id_schema(Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false)); + let stream_schema = schema.clone(); + let stream = stream::unfold( + (object_ids.into_iter(), false, schema), + |state| async move { + let (mut iter, emitted, schema) = state; + let mut values = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + let mut row_ids = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + for _ in 0..MANIFEST_INDEX_BATCH_SIZE { + let Some((value, row_id)) = iter.next() else { + break; + }; + values.push(value.to_string()); + row_ids.push(row_id); + } + if values.is_empty() { + if emitted { + None + } else { + let batch = Self::string_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (iter, true, schema))) + } + } else { + let batch = Self::string_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (iter, true, schema))) + } + }, + ); + Box::pin(DatafusionRecordBatchStreamAdapter::new( + stream_schema, + stream.fuse(), + )) + } + + fn object_type_index_stream( + object_types: BTreeMap<&'static str, RoaringBitmap>, + ) -> SendableRecordBatchStream { + let schema = + Self::value_row_id_schema(Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false)); + let stream_schema = schema.clone(); + let entries = object_types + .into_iter() + .map(|(value, bitmap)| { + ( + value, + Box::new(bitmap.into_iter()) as Box + Send>, + ) + }) + .collect::>() + .into_iter(); + let stream = stream::unfold( + (entries, None, false, schema), + |(mut entries, mut current, emitted, schema)| async move { + let mut values = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + let mut row_ids = 
Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + while values.len() < MANIFEST_INDEX_BATCH_SIZE { + if current.is_none() { + current = entries.next(); + } + let Some((value, iter)) = current.as_mut() else { + break; + }; + if let Some(row_id) = iter.next() { + values.push((*value).to_string()); + row_ids.push(u64::from(row_id)); + } else { + current = None; + } + } + + if values.is_empty() { + if emitted { + None + } else { + let batch = Self::string_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (entries, current, true, schema))) + } + } else { + let batch = Self::string_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (entries, current, true, schema))) + } + }, + ); + Box::pin(DatafusionRecordBatchStreamAdapter::new( + stream_schema, + stream.fuse(), + )) + } + + async fn train_manifest_index( + dataset: &Dataset, + input: ManifestIndexBuildInput, + index_uuid: Uuid, + ) -> Result { + let index_store = LanceIndexStore::from_dataset_for_new(dataset, &index_uuid.to_string())?; + let registry = IndexPluginRegistry::with_default_plugins(); + let plugin = registry.get_plugin_by_name(&input.params.index_type)?; + let training_request = plugin + .new_training_request(input.params.params.as_deref().unwrap_or("{}"), &input.field)?; + let created_index = plugin + .train_index( + input.stream, + &index_store, + training_request, + None, + noop_progress(), + ) + .await?; + + Ok(ManifestTrainedIndex { + index_name: input.index_name, + column_name: input.column_name, + uuid: index_uuid, + created_index, + }) + } + + fn manifest_index_metadata( + lance_schema: &lance_core::datatypes::Schema, + fragment_bitmap: &RoaringBitmap, + dataset_version: u64, + trained_index: ManifestTrainedIndex, + ) -> Result { + Ok(IndexMetadata { + uuid: trained_index.uuid, + fields: vec![lance_schema.field_id(trained_index.column_name)?], + name: 
trained_index.index_name.to_string(), + dataset_version, + fragment_bitmap: Some(fragment_bitmap.clone()), + index_details: Some(Arc::new(trained_index.created_index.index_details)), + index_version: trained_index.created_index.index_version as i32, + created_at: None, + base_id: None, + files: trained_index.created_index.files, + }) + } + + async fn build_manifest_index( + dataset: &Dataset, + lance_schema: &lance_core::datatypes::Schema, + input: ManifestIndexBuildInput, + fragment_bitmap: &RoaringBitmap, + dataset_version: u64, + index_uuid: Uuid, + ) -> Result { + let trained_index = Self::train_manifest_index(dataset, input, index_uuid).await?; + Self::manifest_index_metadata( + lance_schema, + fragment_bitmap, + dataset_version, + trained_index, + ) + } + + async fn build_manifest_indices( + dataset: &Dataset, + manifest: &Manifest, + index_data: ManifestIndexAccumulator, + index_uuids: &mut Vec, + ) -> Result> { + let num_fragments = manifest.fragments.len(); + let fragment_bitmap = RoaringBitmap::from_iter(0..num_fragments as u32); + let schema = &manifest.schema; + let ManifestIndexAccumulator { + object_ids, + object_types, + metadata_values, + metadata_row_ids, + .. 
+ } = index_data; + let object_id_uuid = Uuid::new_v4(); + let object_type_uuid = Uuid::new_v4(); + let metadata_uuid = Uuid::new_v4(); + index_uuids.extend([object_id_uuid, object_type_uuid, metadata_uuid]); + + let dataset_version = manifest.version; + let object_id_index_fut = Self::build_manifest_index( + dataset, + schema, + ManifestIndexBuildInput { + index_name: OBJECT_ID_INDEX_NAME, + column_name: "object_id", + params: ScalarIndexParams::for_builtin(BuiltinIndexType::BTree), + field: Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false), + stream: Self::object_id_index_stream(object_ids), + }, + &fragment_bitmap, + dataset_version, + object_id_uuid, + ); + let object_type_index_fut = Self::build_manifest_index( + dataset, + schema, + ManifestIndexBuildInput { + index_name: OBJECT_TYPE_INDEX_NAME, + column_name: "object_type", + params: ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap), + field: Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false), + stream: Self::object_type_index_stream(object_types), + }, + &fragment_bitmap, + dataset_version, + object_type_uuid, + ); + let metadata_index_fut = Self::build_manifest_index( + dataset, + schema, + ManifestIndexBuildInput { + index_name: METADATA_INDEX_NAME, + column_name: "metadata", + params: Self::metadata_index_params(), + field: json_field(VALUE_COLUMN_NAME, true), + stream: Self::metadata_index_stream(metadata_values, metadata_row_ids), + }, + &fragment_bitmap, + dataset_version, + metadata_uuid, + ); + + let (object_id_index, object_type_index, metadata_index) = futures::join!( + object_id_index_fut, + object_type_index_fut, + metadata_index_fut + ); + + Ok(vec![object_id_index?, object_type_index?, metadata_index?]) + } + + fn lock_manifest_rewrite_shared( + shared: &Arc>>, + ) -> Result>> { + shared.lock().map_err(|_| { lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to create stream: {:?}", e), + message: "Manifest rewrite state mutex was poisoned".to_string(), + 
}) + }) + } + + fn set_manifest_rewrite_error( + shared: &Arc>>, + err: LanceError, + ) { + match shared.lock() { + Ok(mut guard) => { + guard.error = Some(err); + } + Err(poisoned) => { + let mut guard = poisoned.into_inner(); + guard.error = Some(err); + } + } + } + + fn take_manifest_rewrite_error( + shared: &Arc>>, + ) -> Result> { + let mut guard = Self::lock_manifest_rewrite_shared(shared)?; + Ok(guard.error.take()) + } + + fn process_manifest_rewrite_batch( + batch: RecordBatch, + shared: &Arc>>, + ) -> Result> { + let object_ids = Self::get_string_column(&batch, "object_id")?; + let object_types = Self::get_string_column(&batch, "object_type")?; + let locations = Self::get_string_column(&batch, "location")?; + let metadatas = Self::metadata_column_values(&batch, "metadata")?; + let mut output = ManifestBatchBuilder::new(); + let mut guard = Self::lock_manifest_rewrite_shared(shared)?; + let mut index_data = guard.index_data.take().ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: "Manifest rewrite index state is unavailable".to_string(), }) })?; + for (row, metadata) in metadatas.into_iter().enumerate().take(batch.num_rows()) { + let row_value = ManifestRowValue { + object_id: Self::required_string_value(object_ids, row, "object_id")?.to_string(), + object_type: ObjectType::parse(Self::required_string_value( + object_types, + row, + "object_type", + )?)?, + location: Self::optional_string_value(locations, row), + metadata, + }; + guard + .mutation + .process_existing_row(row_value, &mut output, &mut index_data)?; + } + guard.index_data = Some(index_data); + if output.is_empty() { + return Ok(None); + } + Ok(Some(output.finish()?)) + } - let mut batches = Vec::new(); - while let Some(batch) = stream.next().await { - batches.push(batch.map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to read batch: {:?}", e), - }) - })?); + fn finish_manifest_rewrite_stream( + shared: &Arc>>, + ) -> 
Result> { + let mut output = ManifestBatchBuilder::new(); + let mut guard = Self::lock_manifest_rewrite_shared(shared)?; + let mut index_data = guard.index_data.take().ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: "Manifest rewrite index state is unavailable".to_string(), + }) + })?; + guard.mutation.append_rows(&mut output, &mut index_data)?; + let result = guard.mutation.finish(); + let force_empty_batch = index_data.row_count == 0; + guard.result = Some(result); + guard.index_data = Some(index_data); + if output.is_empty() && !force_empty_batch { + Ok(None) + } else { + Ok(Some(output.finish()?)) } + } - Ok(batches) + fn manifest_rewrite_output_stream( + source: SendableRecordBatchStream, + shared: Arc>>, + ) -> SendableRecordBatchStream { + enum Phase { + Source, + Finish, + Done, + } + + let schema = Self::manifest_schema(); + let stream = stream::unfold( + (source, shared, Phase::Source), + |(mut source, shared, mut phase)| async move { + loop { + match phase { + Phase::Source => match source.next().await { + Some(Ok(batch)) => { + match Self::process_manifest_rewrite_batch(batch, &shared) { + Ok(Some(batch)) => { + return Some((Ok(batch), (source, shared, phase))); + } + Ok(None) => continue, + Err(err) => { + let message = err.to_string(); + Self::set_manifest_rewrite_error(&shared, err); + return Some(( + Err(DataFusionError::External(Box::new( + std::io::Error::other(message), + ))), + (source, shared, Phase::Done), + )); + } + } + } + Some(Err(err)) => { + return Some((Err(err), (source, shared, Phase::Done))); + } + None => phase = Phase::Finish, + }, + Phase::Finish => { + phase = Phase::Done; + match Self::finish_manifest_rewrite_stream(&shared) { + Ok(Some(batch)) => { + return Some((Ok(batch), (source, shared, phase))); + } + Ok(None) => continue, + Err(err) => { + let message = err.to_string(); + Self::set_manifest_rewrite_error(&shared, err); + return Some(( + Err(DataFusionError::External(Box::new( + 
std::io::Error::other(message), + ))), + (source, shared, Phase::Done), + )); + } + } + } + Phase::Done => return None, + } + } + }, + ); + Box::pin(DatafusionRecordBatchStreamAdapter::new( + schema, + stream.fuse(), + )) } - /// Helper to get a string column from a record batch - fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> { - let column = batch.column_by_name(column_name).ok_or_else(|| { + fn take_manifest_rewrite_result( + shared: &Arc>>, + ) -> Result<(CopyOnWriteMutation, ManifestIndexAccumulator)> { + let mut guard = Self::lock_manifest_rewrite_shared(shared)?; + let result = guard.result.take().ok_or_else(|| { lance_core::Error::from(NamespaceError::Internal { - message: format!("Column '{}' not found", column_name), + message: "Manifest rewrite stream did not finish".to_string(), }) })?; - column - .as_any() - .downcast_ref::() - .ok_or_else(|| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Column '{}' is not a string array", column_name), - }) + let index_data = guard.index_data.take().ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: "Manifest rewrite index state is unavailable".to_string(), }) + })?; + Ok((result, index_data)) + } + + async fn rewrite_manifest( + &self, + operation: &str, + mut make_mutation: F, + ) -> Result + where + M: ManifestStreamMutation + 'static, + F: FnMut() -> M, + { + let _mutation_guard = self.manifest_mutation_lock.lock().await; + let max_retries = self.manifest_rewrite_commit_retries(); + let mut retries = 0; + + let build_indices = self.inline_optimization_enabled; + + loop { + // First attempt uses cached dataset (no I/O). Retries reload to pick up + // the version that won the conflict. + let dataset_guard = if retries == 0 { + self.manifest_dataset.get_cached().await + } else { + self.manifest_dataset.get_refreshed().await? 
+ }; + let dataset = Arc::new(dataset_guard.clone()); + drop(dataset_guard); + + let source = Self::manifest_projected_stream(&dataset).await?; + let shared = Arc::new(StdMutex::new(ManifestRewriteShared::new(make_mutation()))); + let output_stream = Self::manifest_rewrite_output_stream(source, shared.clone()); + let write_params = WriteParams { + mode: WriteMode::Overwrite, + session: self.session.clone(), + max_rows_per_file: u32::MAX as usize, + skip_auto_cleanup: true, + ..WriteParams::default() + }; + + let transaction = match InsertBuilder::new(dataset.clone()) + .with_params(&write_params) + .execute_uncommitted_stream(output_stream) + .await + { + Ok(transaction) => transaction, + Err(err) => { + if let Some(stream_err) = Self::take_manifest_rewrite_error(&shared)? { + return Err(stream_err); + } + return Err(convert_lance_commit_error(&err, operation, None)); + } + }; + + let (mutation, index_data) = Self::take_manifest_rewrite_result(&shared)?; + if !mutation.has_changes { + return Ok(mutation.result); + } + + // Extract fragments and schema from the overwrite transaction + let Operation::Overwrite { + fragments, schema, .. 
+ } = transaction.operation + else { + return Err(NamespaceError::Internal { + message: "Manifest rewrite transaction is not an overwrite".to_string(), + } + .into()); + }; + let mut manifest = + Manifest::new_from_previous(dataset.manifest(), schema, Arc::new(fragments)); + + let indices = if build_indices { + let mut index_uuids = vec![]; + Some( + Self::build_manifest_indices(&dataset, &manifest, index_data, &mut index_uuids) + .await?, + ) + } else { + None + }; + + let object_store = dataset.object_store(None).await?; + let base_path = dataset.branch_location().path; + let naming_scheme = dataset.manifest_location().naming_scheme; + let config = ManifestWriteConfig::default(); + + let result = write_manifest_file( + &object_store, + dataset.commit_handler(), + &base_path, + &mut manifest, + indices, + &config, + naming_scheme, + None, + ) + .await; + + match result { + Ok(_) => { + // Promote cache to the committed version. checkout_version reads + // the manifest we just wrote, which is typically served from the + // object-store HTTP cache and avoids a full reload cycle. 
+ if let Ok(new_dataset) = dataset.checkout_version(manifest.version).await { + self.manifest_dataset.set_latest(new_dataset).await; + } + return Ok(mutation.result); + } + Err(CommitError::CommitConflict) if retries < max_retries => { + retries += 1; + tokio::time::sleep(std::time::Duration::from_millis(10 * u64::from(retries))) + .await; + } + Err(err) => { + let lance_err: LanceError = err.into(); + return Err(convert_lance_commit_error(&lance_err, operation, None)); + } + } + } } /// Check if the manifest contains an object with the given ID @@ -855,10 +1907,9 @@ impl ManifestNamespace { let object_id_array = Self::get_string_column(&batch, "object_id")?; let location_array = Self::get_string_column(&batch, "location")?; - let metadata_array = Self::get_string_column(&batch, "metadata")?; + let metadata_values = Self::metadata_column_values(&batch, "metadata")?; let location = location_array.value(0).to_string(); - let metadata = if !metadata_array.is_null(0) { - let metadata_str = metadata_array.value(0); + let metadata = if let Some(metadata_str) = &metadata_values[0] { match serde_json::from_str::>(metadata_str) { Ok(map) => Some(map), Err(e) => { @@ -984,203 +2035,55 @@ impl ManifestNamespace { object_type: ObjectType, location: Option, ) -> Result<()> { - self.insert_into_manifest_with_metadata( - vec![ManifestEntry { - object_id, - object_type, - location, - metadata: None, - }], - None, - ) + self.insert_into_manifest_with_metadata(vec![ManifestEntry { + object_id, + object_type, + location, + metadata: None, + }]) .await } - /// Insert one or more entries into the manifest table with metadata and base_objects. + /// Insert one or more entries into the manifest table with metadata. /// /// This is the unified entry point for both single and batch inserts. - /// Uses a single MergeInsert operation to insert all entries at once. /// If any entry already exists (matching object_id), the entire batch fails. 
pub async fn insert_into_manifest_with_metadata( &self, entries: Vec, - base_objects: Option>, ) -> Result<()> { - self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::Fail) + self.merge_into_manifest_with_metadata(entries, WhenMatched::Fail) .await } - async fn upsert_into_manifest_with_metadata( - &self, - entries: Vec, - base_objects: Option>, - ) -> Result<()> { - self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::UpdateAll) + async fn upsert_into_manifest_with_metadata(&self, entries: Vec) -> Result<()> { + self.merge_into_manifest_with_metadata(entries, WhenMatched::UpdateAll) .await } async fn merge_into_manifest_with_metadata( &self, entries: Vec, - base_objects: Option>, when_matched: WhenMatched, ) -> Result<()> { if entries.is_empty() { return Ok(()); } - let schema = Self::manifest_schema(); - - let mut object_ids = Vec::with_capacity(entries.len()); - let mut object_types = Vec::with_capacity(entries.len()); - let mut locations: Vec> = Vec::with_capacity(entries.len()); - let mut metadatas: Vec> = Vec::with_capacity(entries.len()); - - let string_builder = StringBuilder::new(); - let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new( - "object_id", - DataType::Utf8, - true, - ))); - - for (i, entry) in entries.iter().enumerate() { - object_ids.push(entry.object_id.as_str()); - object_types.push(entry.object_type.as_str()); - locations.push(entry.location.clone()); - metadatas.push(entry.metadata.clone()); - - // Only the first entry gets the base_objects (for single-entry inserts - // with base_objects like view creation); batch entries use null. 
- if i == 0 { - match &base_objects { - Some(objects) => { - for obj in objects { - list_builder.values().append_value(obj); - } - list_builder.append(true); - } - None => { - list_builder.append_null(); - } - } - } else { - list_builder.append_null(); - } - } - - let base_objects_array = list_builder.finish(); - - let location_array: Arc = Arc::new(StringArray::from( - locations.iter().map(|l| l.as_deref()).collect::>(), - )); - - let metadata_array: Arc = Arc::new(StringArray::from( - metadatas.iter().map(|m| m.as_deref()).collect::>(), - )); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(StringArray::from(object_ids)), - Arc::new(StringArray::from(object_types.to_vec())), - location_array, - metadata_array, - Arc::new(base_objects_array), - ], - ) - .map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to create manifest entries: {:?}", e), - }) - })?; - - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - - // Use MergeInsert so callers can choose fail-on-existing inserts or metadata upserts. - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - let dataset_arc = Arc::new(dataset_guard.clone()); - drop(dataset_guard); // Drop read guard before merge insert - - let mut merge_builder = - MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()]).map_err( - |e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to create merge builder: {:?}", e), - }) - }, - )?; - merge_builder.when_matched(when_matched); - merge_builder.when_not_matched(WhenNotMatched::InsertAll); - // Use conflict_retries to handle cross-process races on manifest mutations. - merge_builder.conflict_retries(5); - // TODO: after BTREE index creation on object_id, has_scalar_index=true causes - // MergeInsert to use V1 path which lacks bloom filters for conflict detection. 
This - // results in (Some, None) filter mismatch when rebasing against V2 operations. - // Setting use_index=false ensures all operations consistently use V2 path. - merge_builder.use_index(false); - if let Some(retries) = self.commit_retries { - merge_builder.commit_retries(retries); - } - - let (new_dataset_arc, _merge_stats) = merge_builder - .try_build() - .map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to build merge: {:?}", e), - }) - })? - .execute_reader(Box::new(reader)) - .await - .map_err(|e| { - convert_lance_commit_error(&e, "Failed to execute merge insert into manifest", None) - })?; - - let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone()); - self.manifest_dataset.set_latest(new_dataset).await; - - // Run inline optimization after write - if let Err(e) = self.run_inline_optimization().await { - log::warn!( - "Unexpected failure when running inline optimization: {:?}", - e - ); - } - - Ok(()) + self.rewrite_manifest("Failed to overwrite manifest", || { + UpsertManifestMutation::new(entries.clone(), when_matched.clone()) + }) + .await } /// Delete an entry from the manifest table pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> { - let predicate = format!("object_id = '{}'", object_id); - - // Get dataset and use DeleteBuilder with configured retries - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - let dataset = Arc::new(dataset_guard.clone()); - drop(dataset_guard); // Drop read guard before delete - - let new_dataset = DeleteBuilder::new(dataset, &predicate) - .execute() - .await - .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?; - - // Update the wrapper with the new dataset - self.manifest_dataset - .set_latest( - Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()), - ) - .await; - - // Run inline optimization after 
delete - if let Err(e) = self.run_inline_optimization().await { - log::warn!( - "Unexpected failure when running inline optimization: {:?}", - e - ); - } - - Ok(()) + let object_id = object_id.to_string(); + self.rewrite_manifest("Failed to delete from manifest", || DeleteObjectMutation { + object_id: object_id.clone(), + deleted: false, + }) + .await } /// Query the manifest for all versions of a table, sorted by version. @@ -1223,13 +2126,12 @@ impl ManifestNamespace { continue; } let object_id_array = Self::get_string_column(&batch, "object_id")?; - let metadata_array = Self::get_string_column(&batch, "metadata")?; - for i in 0..batch.num_rows() { + let metadata_values = Self::metadata_column_values(&batch, "metadata")?; + for (i, metadata) in metadata_values.iter().enumerate().take(batch.num_rows()) { let oid = object_id_array.value(i); // Parse version from object_id if let Some(version) = Self::parse_version_from_object_id(oid) { - let metadata_str = metadata_array.value(i).to_string(); - versions.push((version, metadata_str)); + versions.push((version, metadata.clone().unwrap_or_default())); } } } @@ -1289,103 +2191,79 @@ impl ManifestNamespace { if batch.num_rows() == 0 { continue; } - let metadata_array = Self::get_string_column(&batch, "metadata")?; - return Ok(Some(metadata_array.value(0).to_string())); + let metadata_values = Self::metadata_column_values(&batch, "metadata")?; + return Ok(metadata_values[0].clone()); } Ok(None) } + async fn delete_table_version_rows_by_object_ids(&self, object_ids: &[String]) -> Result { + let object_ids = object_ids.iter().cloned().collect::>(); + self.rewrite_manifest("Failed to delete table versions from manifest", || { + DeleteTableVersionsMutation { + target: DeleteTableVersionsTarget::ObjectIds(object_ids.clone()), + deleted_count: 0, + } + }) + .await + } + /// Delete table version entries from the manifest for a given table and version ranges. /// /// Each range is (start_version, end_version) inclusive. 
Deletes all matching /// `object_type = 'table_version'` entries whose object_id matches /// `{object_id}${zero_padded_version}`. /// - /// Builds a single filter expression covering all version ranges and executes - /// one bulk delete operation instead of deleting versions one at a time. + /// Applies the ranges while streaming the manifest rewrite, without expanding + /// sparse ranges into every possible version object id. pub async fn delete_table_versions( &self, object_id: &str, ranges: &[(i64, i64)], ) -> Result { - if ranges.is_empty() { - return Ok(0); - } - - // Collect all object_ids to delete (both new zero-padded and legacy formats) - let mut object_id_conditions: Vec = Vec::new(); - for (start, end) in ranges { - for version in *start..=*end { - let oid = Self::build_version_object_id(object_id, version); - let escaped = oid.replace('\'', "''"); - object_id_conditions.push(format!("'{}'", escaped)); - } - } - - if object_id_conditions.is_empty() { - return Ok(0); - } - - // First, count how many entries exist so we can report the deleted count - let in_list = object_id_conditions.join(", "); - let filter = format!( - "object_type = 'table_version' AND object_id IN ({})", - in_list - ); - - let mut scanner = self.manifest_scanner().await?; - scanner.filter(&filter).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to filter: {:?}", e), - }) - })?; - scanner.project(&["object_id", "location"]).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to project: {:?}", e), - }) - })?; - let batches = Self::execute_scanner(scanner).await?; - let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum(); - - if deleted_count == 0 { - return Ok(0); - } - - // Execute a single bulk delete with the combined filter - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - let dataset = 
Arc::new(dataset_guard.clone()); - drop(dataset_guard); - - let new_dataset = DeleteBuilder::new(dataset, &filter) - .execute() + self.batch_delete_table_versions_by_ranges(&[(object_id.to_string(), ranges.to_vec())]) .await - .map_err(|e| { - convert_lance_commit_error(&e, "Failed to batch delete table versions", None) - })?; - - self.manifest_dataset - .set_latest( - Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()), - ) - .await; + } - if let Err(e) = self.run_inline_optimization().await { - log::warn!( - "Unexpected failure when running inline optimization: {:?}", - e - ); + /// Atomically delete table version entries from the manifest for multiple + /// tables and version ranges. + pub async fn batch_delete_table_versions_by_ranges( + &self, + table_ranges: &[(String, Vec<(i64, i64)>)], + ) -> Result { + let targets = table_ranges + .iter() + .filter_map(|(object_id, ranges)| { + let ranges = Self::normalize_table_version_ranges(ranges); + if ranges.is_empty() { + None + } else { + Some(DeleteTableVersionRangeTarget { + object_id_prefix: Self::build_version_object_id_prefix(object_id), + ranges, + }) + } + }) + .collect::>(); + if targets.is_empty() { + return Ok(0); } - Ok(deleted_count) + self.rewrite_manifest("Failed to delete table versions from manifest", || { + DeleteTableVersionsMutation { + target: DeleteTableVersionsTarget::Ranges(targets.clone()), + deleted_count: 0, + } + }) + .await } /// Atomically delete table version entries from the manifest by their object_ids. /// /// This method supports multi-table transactional deletion: all specified /// object_ids (which may span multiple tables) are deleted in a single atomic - /// `DeleteBuilder` operation. Either all entries are removed or none are. + /// copy-on-write manifest rewrite. Either all entries are removed or none are. /// /// Object IDs are formatted as `{table_id}${version}`. 
pub async fn batch_delete_table_versions_by_object_ids( @@ -1396,70 +2274,8 @@ impl ManifestNamespace { return Ok(0); } - let in_list: String = object_ids - .iter() - .map(|oid| { - let escaped = oid.replace('\'', "''"); - format!("'{}'", escaped) - }) - .collect::>() - .join(", "); - - let filter = format!( - "object_type = 'table_version' AND object_id IN ({})", - in_list - ); - - // Count how many entries exist so we can report the deleted count - let mut scanner = self.manifest_scanner().await?; - scanner.filter(&filter).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to filter: {:?}", e), - }) - })?; - scanner.project(&["object_id", "location"]).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to project: {:?}", e), - }) - })?; - let batches = Self::execute_scanner(scanner).await?; - let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum(); - - if deleted_count == 0 { - return Ok(0); - } - - // Execute a single atomic bulk delete covering all tables - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - let dataset = Arc::new(dataset_guard.clone()); - drop(dataset_guard); - - let new_dataset = DeleteBuilder::new(dataset, &filter) - .execute() + self.delete_table_version_rows_by_object_ids(object_ids) .await - .map_err(|e| { - convert_lance_commit_error( - &e, - "Failed to batch delete table versions across multiple tables", - None, - ) - })?; - - self.manifest_dataset - .set_latest( - Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()), - ) - .await; - - if let Err(e) = self.run_inline_optimization().await { - log::warn!( - "Unexpected failure when running inline optimization: {:?}", - e - ); - } - - Ok(deleted_count) } /// Set a property flag in the __manifest table's metadata key-value map. 
@@ -1652,11 +2468,10 @@ impl ManifestNamespace { } let object_id_array = Self::get_string_column(&batch, "object_id")?; - let metadata_array = Self::get_string_column(&batch, "metadata")?; + let metadata_values = Self::metadata_column_values(&batch, "metadata")?; let object_id_str = object_id_array.value(0); - let metadata = if !metadata_array.is_null(0) { - let metadata_str = metadata_array.value(0); + let metadata = if let Some(metadata_str) = &metadata_values[0] { match serde_json::from_str::>(metadata_str) { Ok(map) => Some(map), Err(e) => { @@ -2293,15 +3108,12 @@ impl LanceNamespace for ManifestNamespace { if overwriting_existing_table { let metadata = Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?; - self.upsert_into_manifest_with_metadata( - vec![ManifestEntry { - object_id, - object_type: ObjectType::Table, - location: Some(dir_name), - metadata, - }], - None, - ) + self.upsert_into_manifest_with_metadata(vec![ManifestEntry { + object_id, + object_type: ObjectType::Table, + location: Some(dir_name), + metadata, + }]) .await?; Ok(CreateTableResponse { @@ -2324,15 +3136,12 @@ impl LanceNamespace for ManifestNamespace { let metadata = Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?; // Register in manifest (store dir_name, not full URI) - self.insert_into_manifest_with_metadata( - vec![ManifestEntry { - object_id, - object_type: ObjectType::Table, - location: Some(dir_name.clone()), - metadata, - }], - None, - ) + self.insert_into_manifest_with_metadata(vec![ManifestEntry { + object_id, + object_type: ObjectType::Table, + location: Some(dir_name.clone()), + metadata, + }]) .await?; Ok(CreateTableResponse { @@ -2531,15 +3340,12 @@ impl LanceNamespace for ManifestNamespace { let metadata = Self::serialize_metadata(request.properties.as_ref(), "namespace", &object_id)?; - self.insert_into_manifest_with_metadata( - vec![ManifestEntry { - object_id, - object_type: ObjectType::Namespace, - location: None, - 
metadata, - }], - None, - ) + self.insert_into_manifest_with_metadata(vec![ManifestEntry { + object_id, + object_type: ObjectType::Namespace, + location: None, + metadata, + }]) .await?; Ok(CreateNamespaceResponse { @@ -2711,15 +3517,12 @@ impl LanceNamespace for ManifestNamespace { let metadata = Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?; // Add entry to manifest marking this as a declared table (store dir_name, not full path) - self.insert_into_manifest_with_metadata( - vec![ManifestEntry { - object_id, - object_type: ObjectType::Table, - location: Some(dir_name), - metadata, - }], - None, - ) + self.insert_into_manifest_with_metadata(vec![ManifestEntry { + object_id, + object_type: ObjectType::Table, + location: Some(dir_name), + metadata, + }]) .await?; log::info!( @@ -2856,93 +3659,439 @@ impl LanceNamespace for ManifestNamespace { } }; - Ok(DeregisterTableResponse { - id: request.id.clone(), - location: Some(table_uri), - ..Default::default() - }) + Ok(DeregisterTableResponse { + id: request.id.clone(), + location: Some(table_uri), + ..Default::default() + }) + } +} + +#[cfg(test)] +mod tests { + use super::{ + MANIFEST_TABLE_NAME, METADATA_INDEX_NAME, OBJECT_ID_INDEX_NAME, OBJECT_TYPE_INDEX_NAME, + ObjectType, convert_lance_commit_error, + }; + use crate::{DirectoryNamespaceBuilder, ManifestNamespace}; + use arrow::datatypes::DataType; + use bytes::Bytes; + use lance::dataset::builder::DatasetBuilder; + use lance::index::DatasetIndexExt; + use lance_core::utils::tempfile::TempStdDir; + use lance_namespace::LanceNamespace; + use lance_namespace::models::{ + CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest, + ListNamespacesRequest, ListTablesRequest, TableExistsRequest, + }; + use rstest::rstest; + use std::collections::HashMap; + + fn create_test_ipc_data() -> Vec { + use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + use 
arrow::ipc::writer::StreamWriter; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + + let mut buffer = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + buffer + } + + fn namespace_request(name: &str) -> CreateNamespaceRequest { + let mut request = CreateNamespaceRequest::new(); + request.id = Some(vec![name.to_string()]); + request + } + + async fn load_manifest_dataset(root: &str) -> lance::Dataset { + DatasetBuilder::from_uri(format!("{}/{}", root, MANIFEST_TABLE_NAME)) + .load() + .await + .unwrap() + } + + async fn manifest_object_ids(root: &str) -> Vec { + let dataset = load_manifest_dataset(root).await; + let mut scanner = dataset.scan(); + scanner.project(&["object_id"]).unwrap(); + let batches = ManifestNamespace::execute_scanner(scanner).await.unwrap(); + let mut ids = Vec::new(); + for batch in batches { + let object_ids = ManifestNamespace::get_string_column(&batch, "object_id").unwrap(); + for row in 0..batch.num_rows() { + ids.push(object_ids.value(row).to_string()); + } + } + ids + } + + #[test] + fn test_retryable_commit_conflict_maps_to_throttling() { + let conflict = lance_core::Error::retryable_commit_conflict_source( + 1, + Box::new(std::io::Error::other("conflict")), + ); + let converted = convert_lance_commit_error(&conflict, "test operation", None); + + let lance_core::Error::Namespace { source, .. } = converted else { + panic!("expected namespace error"); + }; + let namespace_error = source.downcast_ref::(); + assert!(matches!( + namespace_error, + Some(lance_namespace::NamespaceError::Throttling { .. 
}) + )); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + // Create a DirectoryNamespace with manifest enabled (default) + let dir_namespace = DirectoryNamespaceBuilder::new(temp_path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + + // Verify we can list tables (should be empty) + let mut request = ListTablesRequest::new(); + request.id = Some(vec![]); + let response = dir_namespace.list_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 0); + + // Create a test table + let buffer = create_test_ipc_data(); + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["test_table".to_string()]); + + let _response = dir_namespace + .create_table(create_request, Bytes::from(buffer)) + .await + .unwrap(); + + // List tables again - should see our new table + let mut request = ListTablesRequest::new(); + request.id = Some(vec![]); + let response = dir_namespace.list_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 1); + assert_eq!(response.tables[0], "test_table"); + } + + #[tokio::test] + async fn test_manifest_rewrite_from_properties() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let mut properties = HashMap::new(); + properties.insert("root".to_string(), temp_path.to_string()); + properties.insert( + "inline_optimization_enabled".to_string(), + "true".to_string(), + ); + + let namespace = DirectoryNamespaceBuilder::from_properties(properties, None) + .unwrap() + .build() + .await + .unwrap(); + + namespace + .create_namespace(namespace_request("workspace")) + .await + .unwrap(); + + let mut request = ListNamespacesRequest::new(); + request.id = Some(vec![]); + let response = 
namespace.list_namespaces(request).await.unwrap(); + assert_eq!(response.namespaces, vec!["workspace".to_string()]); + + let dataset = load_manifest_dataset(temp_path).await; + let metadata_field = dataset.schema().field("metadata").unwrap(); + assert_eq!(metadata_field.logical_type.to_string(), "json"); + assert!( + dataset.schema().field("base_objects").is_none(), + "manifest rewrites should drop the old base_objects column" + ); + + let indices = dataset.load_indices().await.unwrap(); + assert!( + indices + .iter() + .any(|index| index.name == OBJECT_ID_INDEX_NAME), + "manifest rewrites should create the object_id btree index synchronously" + ); + assert!( + indices + .iter() + .any(|index| index.name == OBJECT_TYPE_INDEX_NAME), + "manifest rewrites should create the object_type bitmap index synchronously" + ); + assert!( + indices + .iter() + .any(|index| index.name == METADATA_INDEX_NAME), + "manifest rewrites should create the metadata JSON FTS index synchronously" + ); + assert!( + indices.iter().all(|index| index.fields.len() == 1), + "manifest rewrite indexes should be single-column indexes" + ); + } + + #[tokio::test] + async fn test_manifest_rewrite_overwrites_compact_ordered_snapshot() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .inline_optimization_enabled(true) + .build() + .await + .unwrap(); + + for name in ["zeta", "alpha", "middle"] { + namespace + .create_namespace(namespace_request(name)) + .await + .unwrap(); + } + + let dataset = load_manifest_dataset(temp_path).await; + assert_eq!( + dataset.manifest().fragments.len(), + 1, + "copy-on-write manifest should keep the current snapshot compact" + ); + + let object_ids = manifest_object_ids(temp_path).await; + assert_eq!( + object_ids, + vec![ + "zeta".to_string(), + "alpha".to_string(), + "middle".to_string() + ] + ); + + let indices = dataset.load_indices().await.unwrap(); + assert!( + 
indices + .iter() + .any(|index| index.name == OBJECT_ID_INDEX_NAME), + "manifest rewrites should maintain the object_id btree index" + ); + assert!( + indices + .iter() + .any(|index| index.name == OBJECT_TYPE_INDEX_NAME), + "manifest rewrites should maintain the object_type bitmap index" + ); + assert!( + indices + .iter() + .any(|index| index.name == METADATA_INDEX_NAME), + "manifest rewrites should maintain the metadata JSON FTS index" + ); + } + + #[tokio::test] + async fn test_manifest_rewrite_table_lifecycle() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .unwrap(); + + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["test_table".to_string()]); + namespace + .create_table(create_request, Bytes::from(create_test_ipc_data())) + .await + .unwrap(); + + let mut list_request = ListTablesRequest::new(); + list_request.id = Some(vec![]); + let response = namespace.list_tables(list_request).await.unwrap(); + assert_eq!(response.tables, vec!["test_table".to_string()]); + + let mut drop_request = DropTableRequest::new(); + drop_request.id = Some(vec!["test_table".to_string()]); + namespace.drop_table(drop_request).await.unwrap(); + + let mut list_request = ListTablesRequest::new(); + list_request.id = Some(vec![]); + let response = namespace.list_tables(list_request).await.unwrap(); + assert!(response.tables.is_empty()); + + let object_ids = manifest_object_ids(temp_path).await; + assert!(object_ids.is_empty()); + } + + #[tokio::test] + async fn test_manifest_rewrite_table_version_query_preserves_null_metadata_row() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .unwrap(); + let table_object_id = "table"; + let manifest_ns = namespace.manifest_ns.as_ref().unwrap(); + manifest_ns + 
.insert_into_manifest_with_metadata(vec![super::ManifestEntry { + object_id: ManifestNamespace::build_version_object_id(table_object_id, 1), + object_type: super::ObjectType::TableVersion, + location: None, + metadata: None, + }]) + .await + .unwrap(); + + let versions = manifest_ns + .query_table_versions(table_object_id, false, None) + .await + .unwrap(); + assert_eq!(versions, vec![(1, String::new())]); } -} -#[cfg(test)] -mod tests { - use crate::{DirectoryNamespaceBuilder, ManifestNamespace}; - use bytes::Bytes; - use lance_core::utils::tempfile::TempStdDir; - use lance_namespace::LanceNamespace; - use lance_namespace::models::{ - CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest, - ListTablesRequest, TableExistsRequest, - }; - use rstest::rstest; + #[tokio::test] + async fn test_manifest_delete_table_versions_streams_large_ranges() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); - fn create_test_ipc_data() -> Vec { - use arrow::array::{Int32Array, StringArray}; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::ipc::writer::StreamWriter; - use arrow::record_batch::RecordBatch; - use std::sync::Arc; + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .unwrap(); + let manifest_ns = namespace.manifest_ns.as_ref().unwrap(); + manifest_ns + .insert_into_manifest_with_metadata(vec![ + super::ManifestEntry { + object_id: ManifestNamespace::build_version_object_id("table", 1), + object_type: super::ObjectType::TableVersion, + location: None, + metadata: Some("{}".to_string()), + }, + super::ManifestEntry { + object_id: ManifestNamespace::build_version_object_id("table", 10), + object_type: super::ObjectType::TableVersion, + location: None, + metadata: Some("{}".to_string()), + }, + super::ManifestEntry { + object_id: ManifestNamespace::build_version_object_id("other", 2), + object_type: super::ObjectType::TableVersion, + location: None, + metadata: 
Some("{}".to_string()), + }, + ]) + .await + .unwrap(); - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, false), - ])); + let deleted = manifest_ns + .delete_table_versions("table", &[(1, 1_000_000_000)]) + .await + .unwrap(); + assert_eq!(deleted, 2); - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(StringArray::from(vec!["a", "b", "c"])), - ], - ) - .unwrap(); + let object_ids = manifest_object_ids(temp_path).await; + assert_eq!( + object_ids, + vec![ManifestNamespace::build_version_object_id("other", 2)] + ); - let mut buffer = Vec::new(); - { - let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap(); - writer.write(&batch).unwrap(); - writer.finish().unwrap(); - } - buffer + let deleted = manifest_ns + .batch_delete_table_versions_by_ranges(&[("other".to_string(), vec![(2, 2)])]) + .await + .unwrap(); + assert_eq!(deleted, 1); + assert!(manifest_object_ids(temp_path).await.is_empty()); } - #[rstest] - #[case::with_optimization(true)] - #[case::without_optimization(false)] #[tokio::test] - async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) { + async fn test_manifest_rewrite_concurrent_compatible_creates_succeed() { let temp_dir = TempStdDir::default(); let temp_path = temp_dir.to_str().unwrap(); - // Create a DirectoryNamespace with manifest enabled (default) - let dir_namespace = DirectoryNamespaceBuilder::new(temp_path) - .inline_optimization_enabled(inline_optimization) + let first = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .unwrap(); + let second = DirectoryNamespaceBuilder::new(temp_path) .build() .await .unwrap(); - // Verify we can list tables (should be empty) - let mut request = ListTablesRequest::new(); + let (alpha, beta) = tokio::join!( + first.create_namespace(namespace_request("alpha")), + 
second.create_namespace(namespace_request("beta")) + ); + + alpha.unwrap(); + beta.unwrap(); + + let mut request = ListNamespacesRequest::new(); request.id = Some(vec![]); - let response = dir_namespace.list_tables(request).await.unwrap(); - assert_eq!(response.tables.len(), 0); + let mut response = first.list_namespaces(request).await.unwrap(); + response.namespaces.sort(); + assert_eq!( + response.namespaces, + vec!["alpha".to_string(), "beta".to_string()] + ); + } - // Create a test table - let buffer = create_test_ipc_data(); - let mut create_request = CreateTableRequest::new(); - create_request.id = Some(vec!["test_table".to_string()]); + #[tokio::test] + async fn test_manifest_rewrite_concurrent_duplicate_create_fails() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); - let _response = dir_namespace - .create_table(create_request, Bytes::from(buffer)) + let first = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .unwrap(); + let second = DirectoryNamespaceBuilder::new(temp_path) + .build() .await .unwrap(); - // List tables again - should see our new table - let mut request = ListTablesRequest::new(); - request.id = Some(vec![]); - let response = dir_namespace.list_tables(request).await.unwrap(); - assert_eq!(response.tables.len(), 1); - assert_eq!(response.tables[0], "test_table"); + let (first_result, second_result) = tokio::join!( + first.create_namespace(namespace_request("duplicate")), + second.create_namespace(namespace_request("duplicate")) + ); + + let results = [first_result, second_result]; + let ok_count = results.iter().filter(|result| result.is_ok()).count(); + let err_count = results.iter().filter(|result| result.is_err()).count(); + assert_eq!(ok_count, 1); + assert_eq!(err_count, 1); } #[rstest] @@ -4069,4 +5218,366 @@ mod tests { assert_eq!(n, names(&["c", "d"])); assert_eq!(next, Some("d".to_string())); } + + /// Seed a __manifest dataset using the legacy schema (Utf8 metadata + + /// 
base_objects List column) so that backward-compat tests start from a + /// real pre-migration state. + async fn seed_old_schema_manifest( + root: &str, + rows: &[(&str, &str, Option<&str>, Option<&str>)], + ) { + use arrow::array::builder::{ListBuilder, StringBuilder}; + use arrow::array::{RecordBatchIterator, StringArray}; + use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use lance::dataset::WriteParams; + use std::sync::Arc; + + let old_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("object_id", DataType::Utf8, false), + Field::new("object_type", DataType::Utf8, false), + Field::new("location", DataType::Utf8, true), + Field::new("metadata", DataType::Utf8, true), + Field::new( + "base_objects", + DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))), + true, + ), + ])); + + let mut object_ids = Vec::new(); + let mut object_types = Vec::new(); + let mut locations: Vec> = Vec::new(); + let mut metadatas: Vec> = Vec::new(); + let mut base_objects_builder = ListBuilder::new(StringBuilder::new()) + .with_field(Field::new("object_id", DataType::Utf8, true)); + + for &(oid, otype, loc, meta) in rows { + object_ids.push(oid.to_string()); + object_types.push(otype.to_string()); + locations.push(loc.map(ToString::to_string)); + metadatas.push(meta.map(ToString::to_string)); + base_objects_builder.append(true); + } + + let batch = arrow::array::RecordBatch::try_new( + old_schema.clone(), + vec![ + Arc::new(StringArray::from(object_ids)), + Arc::new(StringArray::from(object_types)), + Arc::new(StringArray::from(locations)), + Arc::new(StringArray::from(metadatas)), + Arc::new(base_objects_builder.finish()), + ], + ) + .unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], old_schema); + let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME); + let write_params = WriteParams::default(); + lance::Dataset::write(Box::new(reader), &manifest_path, Some(write_params)) + .await + .unwrap(); + } + + #[tokio::test] + 
async fn test_backward_compat_read_old_schema_manifest() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let meta_json = r#"{"key":"value"}"#; + seed_old_schema_manifest( + temp_path, + &[ + ("ns1", ObjectType::Namespace.as_str(), None, None), + ( + "my_table", + ObjectType::Table.as_str(), + Some("some/location"), + Some(meta_json), + ), + ], + ) + .await; + + // Verify old dataset has Utf8 metadata and base_objects columns + let old_dataset = load_manifest_dataset(temp_path).await; + assert!(old_dataset.schema().field("base_objects").is_some()); + assert_eq!( + old_dataset.schema().field("metadata").unwrap().data_type(), + DataType::Utf8, + ); + + // Open via the namespace builder — read path should work. + // Disable dir listing so list_tables goes through the manifest path. + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .dir_listing_enabled(false) + .build() + .await + .unwrap(); + + let mut list_ns_req = ListNamespacesRequest::new(); + list_ns_req.id = Some(vec![]); + let ns_response = namespace.list_namespaces(list_ns_req).await.unwrap(); + assert_eq!(ns_response.namespaces, vec!["ns1".to_string()]); + + let mut list_tables_req = ListTablesRequest::new(); + list_tables_req.id = Some(vec![]); + let table_response = namespace.list_tables(list_tables_req).await.unwrap(); + assert_eq!(table_response.tables, vec!["my_table".to_string()]); + + let describe_req = DescribeTableRequest { + id: Some(vec!["my_table".to_string()]), + ..Default::default() + }; + let describe_resp = namespace.describe_table(describe_req).await.unwrap(); + let properties = describe_resp.properties.unwrap(); + assert_eq!(properties.get("key").unwrap(), "value"); + } + + #[tokio::test] + async fn test_backward_compat_write_migrates_old_schema() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + seed_old_schema_manifest( + temp_path, + &[("existing_ns", ObjectType::Namespace.as_str(), None, 
None)], + ) + .await; + + // Open namespace and perform a write (which triggers copy-on-write rewrite) + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .unwrap(); + + namespace + .create_namespace(namespace_request("new_ns")) + .await + .unwrap(); + + // After the write, the manifest should have been rewritten with new schema + let dataset = load_manifest_dataset(temp_path).await; + assert!( + dataset.schema().field("base_objects").is_none(), + "copy-on-write rewrite should drop the old base_objects column" + ); + let metadata_field = dataset.schema().field("metadata").unwrap(); + assert_eq!( + metadata_field.logical_type.to_string(), + "json", + "copy-on-write rewrite should migrate metadata to JSON" + ); + + // Verify both old and new rows are present + let mut list_ns_req = ListNamespacesRequest::new(); + list_ns_req.id = Some(vec![]); + let response = namespace.list_namespaces(list_ns_req).await.unwrap(); + assert!(response.namespaces.contains(&"existing_ns".to_string())); + assert!(response.namespaces.contains(&"new_ns".to_string())); + + // Verify indices are built + let indices = dataset.load_indices().await.unwrap(); + assert!(indices.iter().any(|i| i.name == OBJECT_ID_INDEX_NAME)); + assert!(indices.iter().any(|i| i.name == OBJECT_TYPE_INDEX_NAME)); + assert!(indices.iter().any(|i| i.name == METADATA_INDEX_NAME)); + } + + #[tokio::test] + async fn test_manifest_indices_are_complete_and_versioned() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .inline_optimization_enabled(true) + .build() + .await + .unwrap(); + + // Create enough objects to exercise all index types + namespace + .create_namespace(namespace_request("ns1")) + .await + .unwrap(); + namespace + .create_namespace(namespace_request("ns2")) + .await + .unwrap(); + let buffer = create_test_ipc_data(); + let mut props1 = HashMap::new(); + 
props1.insert("env".to_string(), "prod".to_string()); + props1.insert("owner".to_string(), "alice".to_string()); + let mut req = CreateTableRequest::new(); + req.id = Some(vec!["tbl1".to_string()]); + req.properties = Some(props1); + namespace + .create_table(req, Bytes::from(buffer.clone())) + .await + .unwrap(); + let mut props2 = HashMap::new(); + props2.insert("env".to_string(), "staging".to_string()); + props2.insert("owner".to_string(), "bob".to_string()); + let mut req = CreateTableRequest::new(); + req.id = Some(vec!["tbl2".to_string()]); + req.properties = Some(props2); + namespace + .create_table(req, Bytes::from(buffer)) + .await + .unwrap(); + + let dataset = load_manifest_dataset(temp_path).await; + let indices = dataset.load_indices().await.unwrap(); + + // All 3 indices must exist + assert_eq!( + indices.len(), + 3, + "Expected exactly 3 indices, got: {:?}", + indices.iter().map(|i| &i.name).collect::>() + ); + + let btree = indices + .iter() + .find(|i| i.name == OBJECT_ID_INDEX_NAME) + .expect("object_id btree index missing"); + let bitmap = indices + .iter() + .find(|i| i.name == OBJECT_TYPE_INDEX_NAME) + .expect("object_type bitmap index missing"); + let fts = indices + .iter() + .find(|i| i.name == METADATA_INDEX_NAME) + .expect("metadata FTS index missing"); + + // Each index covers exactly one column + assert_eq!(btree.fields.len(), 1); + assert_eq!(bitmap.fields.len(), 1); + assert_eq!(fts.fields.len(), 1); + + // All indices are stamped with the current dataset version + let version = dataset.manifest().version; + assert_eq!( + btree.dataset_version, version, + "btree index dataset_version mismatch" + ); + assert_eq!( + bitmap.dataset_version, version, + "bitmap index dataset_version mismatch" + ); + assert_eq!( + fts.dataset_version, version, + "fts index dataset_version mismatch" + ); + + // All indices must have fragment bitmaps covering the single manifest fragment + for idx in [btree, bitmap, fts] { + let bitmap = idx + .fragment_bitmap 
+ .as_ref() + .unwrap_or_else(|| panic!("{} should have a fragment bitmap", idx.name)); + assert_eq!( + bitmap.len(), + dataset.manifest().fragments.len() as u64, + "{} fragment bitmap length mismatch", + idx.name + ); + } + + // All indices reference distinct UUIDs + let mut uuids: Vec<_> = indices.iter().map(|i| i.uuid).collect(); + uuids.sort(); + uuids.dedup(); + assert_eq!(uuids.len(), 3, "index UUIDs should be unique"); + } + + #[tokio::test] + async fn test_manifest_reads_use_indexed_scans() { + use lance_index::scalar::FullTextSearchQuery; + + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .inline_optimization_enabled(true) + .build() + .await + .unwrap(); + + // Populate manifest with a namespace and tables with metadata + namespace + .create_namespace(namespace_request("ns")) + .await + .unwrap(); + let buffer = create_test_ipc_data(); + let mut props = HashMap::new(); + props.insert( + "description".to_string(), + "important production data".to_string(), + ); + let mut req = CreateTableRequest::new(); + req.id = Some(vec!["my_table".to_string()]); + req.properties = Some(props); + namespace + .create_table(req, Bytes::from(buffer)) + .await + .unwrap(); + + let dataset = load_manifest_dataset(temp_path).await; + + // 1. object_id equality filter should use BTree (ScalarIndexQuery) + { + let mut scanner = dataset.scan(); + scanner.filter("object_id = 'ns'").unwrap(); + scanner.prefilter(true); + let plan = scanner.explain_plan(true).await.unwrap(); + assert!( + plan.contains("ScalarIndexQuery"), + "object_id filter should use ScalarIndexQuery (btree), plan:\n{}", + plan + ); + } + + // 2. 
object_type equality filter should use Bitmap (ScalarIndexQuery) + { + let mut scanner = dataset.scan(); + scanner.filter("object_type = 'table'").unwrap(); + scanner.prefilter(true); + let plan = scanner.explain_plan(true).await.unwrap(); + assert!( + plan.contains("ScalarIndexQuery"), + "object_type filter should use ScalarIndexQuery (bitmap), plan:\n{}", + plan + ); + } + + // 3. Combined object_id + object_type filter should use ScalarIndexQuery + { + let mut scanner = dataset.scan(); + scanner + .filter("object_id = 'my_table' AND object_type = 'table'") + .unwrap(); + scanner.prefilter(true); + let plan = scanner.explain_plan(true).await.unwrap(); + assert!( + plan.contains("ScalarIndexQuery"), + "combined filter should use ScalarIndexQuery, plan:\n{}", + plan + ); + } + + // 4. FTS query on metadata should use the inverted index (MatchQuery) + { + let mut scanner = dataset.scan(); + let query = FullTextSearchQuery::new("production".to_owned()) + .with_column("metadata".to_string()) + .unwrap(); + scanner.full_text_search(query).unwrap(); + let plan = scanner.explain_plan(true).await.unwrap(); + assert!( + plan.contains("MatchQuery"), + "metadata FTS query should use MatchQuery (inverted index), plan:\n{}", + plan + ); + } + } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8abb5975fdd..93c366e4a4c 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -965,6 +965,10 @@ impl Dataset { &self.manifest_location } + pub fn commit_handler(&self) -> &dyn CommitHandler { + self.commit_handler.as_ref() + } + /// Create a [`delta::DatasetDeltaBuilder`] to explore changes between dataset versions. 
/// /// # Example @@ -1344,6 +1348,7 @@ impl Dataset { commit_config, self.manifest_location.naming_scheme, None, + None, ) .await?; @@ -3307,7 +3312,7 @@ impl DatasetTakeRows for Dataset { } #[derive(Debug)] -pub(crate) struct ManifestWriteConfig { +pub struct ManifestWriteConfig { auto_set_feature_flags: bool, // default true timestamp: Option, // default None use_stable_row_ids: bool, // default false @@ -3337,7 +3342,7 @@ impl ManifestWriteConfig { /// Commit a manifest file and create a copy at the latest manifest path. #[allow(clippy::too_many_arguments)] -pub(crate) async fn write_manifest_file( +pub async fn write_manifest_file( object_store: &ObjectStore, commit_handler: &dyn CommitHandler, base_path: &Path, diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 790ce9a04f5..957c160b1e5 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -1104,6 +1104,7 @@ async fn reserve_fragment_ids( &Default::default(), dataset.manifest_location.naming_scheme, None, + None, ) .await?; diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index efab12d249c..39a3d1b4e2f 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -8,7 +8,7 @@ use lance_core::utils::mask::RowAddrTreeMap; use lance_file::version::LanceFileVersion; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; use lance_table::{ - format::{DataStorageFormat, is_detached_version}, + format::{DataStorageFormat, IndexMetadata, is_detached_version}, io::commit::{CommitConfig, CommitHandler, ManifestNamingScheme}, }; @@ -47,6 +47,7 @@ pub struct CommitBuilder<'a> { commit_config: CommitConfig, affected_rows: Option, transaction_properties: Option>>, + replacement_indices: Option>, } impl<'a> CommitBuilder<'a> { @@ -64,6 +65,7 @@ impl<'a> CommitBuilder<'a> { commit_config: Default::default(), affected_rows: None, transaction_properties: None, + 
replacement_indices: None, } } @@ -169,6 +171,16 @@ impl<'a> CommitBuilder<'a> { self } + /// Replace the dataset's index metadata as part of a strict overwrite commit. + /// + /// This is intended for callers that have already rewritten all data and + /// rebuilt replacement index files. The commit path stamps the indices with + /// the committed dataset version. + pub fn with_replacement_indices(mut self, replacement_indices: Vec) -> Self { + self.replacement_indices = Some(replacement_indices); + self + } + /// provide Configuration key-value pairs associated with this transaction. /// This is used to store metadata about the transaction, such as commit messages, engine information, etc. /// this properties map will be persisted as a part of the transaction object @@ -267,6 +279,29 @@ impl<'a> CommitBuilder<'a> { validate_operation(None, &transaction.operation)?; } + if self.replacement_indices.is_some() { + if self.detached { + return Err(Error::invalid_input( + "replacement indices are not supported for detached commits", + )); + } + if dest.dataset().is_none() { + return Err(Error::invalid_input( + "replacement indices require an existing dataset", + )); + } + if !matches!(transaction.operation, Operation::Overwrite { .. }) { + return Err(Error::invalid_input( + "replacement indices are only supported for overwrite commits", + )); + } + if self.commit_config.num_retries != 0 { + return Err(Error::invalid_input( + "replacement indices require a strict overwrite commit with max_retries=0", + )); + } + } + let (metadata_cache, index_cache) = match &dest { WriteDestination::Dataset(ds) => (ds.metadata_cache.clone(), ds.index_cache.clone()), WriteDestination::Uri(uri) => ( @@ -337,6 +372,7 @@ impl<'a> CommitBuilder<'a> { &self.commit_config, manifest_naming_scheme, self.affected_rows.as_ref(), + self.replacement_indices.clone(), ) .await? 
} diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 700da33a034..53b3304883c 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -919,6 +919,7 @@ pub(crate) async fn commit_transaction( commit_config: &CommitConfig, manifest_naming_scheme: ManifestNamingScheme, affected_rows: Option<&RowAddrTreeMap>, + replacement_indices: Option>, ) -> Result<(Manifest, ManifestLocation)> { // Note: object_store has been configured with WriteParams, but dataset.object_store.as_ref() // has not necessarily. So for anything involving writing, use `object_store`. @@ -931,6 +932,11 @@ pub(crate) async fn commit_transaction( // Strict overwrites are not subject to any sort of automatic conflict resolution. let strict_overwrite = matches!(transaction.operation, Operation::Overwrite { .. }) && commit_config.num_retries == 0; + if replacement_indices.is_some() && !strict_overwrite { + return Err(Error::invalid_input( + "replacement indices require a strict overwrite transaction", + )); + } let mut dataset = if dataset.manifest.version != read_version && (read_version != 0 || strict_overwrite) { // If the dataset version is not the same as the read version, we need to @@ -1017,6 +1023,13 @@ pub(crate) async fn commit_transaction( manifest.version = target_version; + if let Some(replacement_indices) = replacement_indices.as_ref() { + indices = replacement_indices.clone(); + for index in &mut indices { + index.dataset_version = target_version; + } + } + let previous_writer_version = &dataset.manifest.writer_version; // The versions of Lance prior to when we started writing the writer version // sometimes wrote incorrect `Fragment.physical_rows` values, so we should