diff --git a/rust/lance/src/dataset/mem_wal/write.rs b/rust/lance/src/dataset/mem_wal/write.rs index 85d0c92e78b..e1314d629e8 100644 --- a/rust/lance/src/dataset/mem_wal/write.rs +++ b/rust/lance/src/dataset/mem_wal/write.rs @@ -1339,7 +1339,9 @@ impl ShardWriter { )?; // Background MemTable flush handler — frozen memtable to Lance file. - let memtable_handler = MemTableFlushHandler::new(state.clone(), flusher, epoch, stats); + // It gets a copy of `index_configs` to rebuild those indexes on each flushed generation. + let memtable_handler = + MemTableFlushHandler::new(state.clone(), flusher, epoch, index_configs.to_vec(), stats); task_executor.add_handler( "memtable_flusher".to_string(), Box::new(memtable_handler), @@ -2135,6 +2137,12 @@ struct MemTableFlushHandler { state: Arc>, flusher: Arc, epoch: u64, + /// Secondary index configs to rebuild on each flushed generation. When + /// empty the handler uses plain `flush`; otherwise it flushes via + /// [`MemTableFlusher::flush_with_indexes`] so queries over flushed + /// generations can use index lookups instead of full scans, and so vector + /// search's index-only `fast_search` can see the data at all. + index_configs: Vec, stats: SharedWriteStats, } @@ -2143,12 +2151,14 @@ impl MemTableFlushHandler { state: Arc>, flusher: Arc, epoch: u64, + index_configs: Vec, stats: SharedWriteStats, ) -> Self { Self { state, flusher, epoch, + index_configs, stats, } } @@ -2222,9 +2232,24 @@ impl MemTableFlushHandler { let covered_wal_entry_position = wal_flushed_position .or_else(|| memtable.frozen_at_wal_entry_position()) .unwrap_or(0); - self.flusher - .flush(&memtable, self.epoch, covered_wal_entry_position) + // Rebuild secondary indexes on the flushed generation so later + // queries hit an index instead of scanning. Skip the extra + // dataset open when there are no indexes to build. The indexed + // path's future is boxed to keep this async block's nesting + // under the type-layout recursion limit. 
+ if self.index_configs.is_empty() { + self.flusher + .flush(&memtable, self.epoch, covered_wal_entry_position) + .await + } else { + Box::pin(self.flusher.flush_with_indexes( + &memtable, + self.epoch, + &self.index_configs, + covered_wal_entry_position, + )) .await + } } .await; @@ -2718,6 +2743,101 @@ mod tests { writer.close().await.unwrap(); } + /// End-to-end check that the background flush handler rebuilds secondary + /// indexes on a flushed generation. Regression guard: flushing via plain + /// `flush` leaves the generation unindexed, so point lookups full-scan + /// and vector search's index-only `fast_search` cannot see the data at + /// all. + #[tokio::test] + async fn test_flushed_generation_is_indexed() { + use crate::index::DatasetIndexExt; + + let (store, base_path, base_uri, _temp_dir) = create_local_store().await; + let schema = create_test_schema(); + let shard_id = Uuid::new_v4(); + + let config = ShardWriterConfig { + shard_id, + shard_spec_id: 0, + durable_write: false, + sync_indexed_write: true, + max_wal_buffer_size: 1024 * 1024, + max_wal_flush_interval: None, + max_memtable_size: 64 * 1024 * 1024, + manifest_scan_batch_size: 2, + ..Default::default() + }; + + let index_configs = vec![MemIndexConfig::BTree(BTreeIndexConfig { + name: "id_idx".to_string(), + field_id: 0, + column: "id".to_string(), + })]; + + let writer = ShardWriter::open( + store, + base_path, + base_uri.clone(), + config, + schema.clone(), + index_configs, + ) + .await + .unwrap(); + + writer + .put(vec![create_test_batch(&schema, 0, 10)]) + .await + .unwrap(); + + // Freeze the active memtable and wait until it lands on disk. + writer.force_seal_active().await.unwrap(); + writer.wait_for_flush_drain().await.unwrap(); + + // Resolve the flushed generation recorded in the manifest. 
+ let manifest = writer.manifest().await.unwrap().unwrap(); + assert_eq!( + manifest.flushed_generations.len(), + 1, + "expected exactly one flushed generation" + ); + let gen_uri = format!( + "{}/_mem_wal/{}/{}", + base_uri, shard_id, manifest.flushed_generations[0].path + ); + + // The flushed generation must carry the BTree index built during flush. + let dataset = crate::Dataset::open(&gen_uri).await.unwrap(); + let indices = dataset.load_indices().await.unwrap(); + assert_eq!(indices.len(), 1, "flushed generation should have one index"); + assert_eq!(indices[0].name, "id_idx"); + + // An `id = 5` filter must plan as a `ScalarIndexQuery` on `id_idx`, not a full scan. + let mut scan = dataset.scan(); + scan.filter("id = 5").unwrap(); + scan.prefilter(true); + let plan = scan.create_plan().await.unwrap(); + crate::utils::test::assert_plan_node_equals( + plan, + "LanceRead: ...full_filter=id = Int32(5)... + ScalarIndexQuery: query=[id = 5]@id_idx(BTree)", + ) + .await + .unwrap(); + + // And running the same filter returns exactly one row. + let batch = dataset + .scan() + .filter("id = 5") + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(batch.num_rows(), 1); + + writer.close().await.unwrap(); + } + /// Test memtable auto-flush triggered by size threshold. #[tokio::test] async fn test_shard_writer_auto_flush_by_size() {