Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 123 additions & 3 deletions rust/lance/src/dataset/mem_wal/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,9 @@ impl ShardWriter {
)?;

// Background MemTable flush handler — frozen memtable to Lance file.
let memtable_handler = MemTableFlushHandler::new(state.clone(), flusher, epoch, stats);
// It rebuilds the same secondary indexes on each flushed generation.
let memtable_handler =
MemTableFlushHandler::new(state.clone(), flusher, epoch, index_configs.to_vec(), stats);
task_executor.add_handler(
"memtable_flusher".to_string(),
Box::new(memtable_handler),
Expand Down Expand Up @@ -2135,6 +2137,12 @@ struct MemTableFlushHandler {
state: Arc<RwLock<WriterState>>,
flusher: Arc<MemTableFlusher>,
epoch: u64,
/// Secondary index configs to rebuild on each flushed generation. When
/// non-empty the handler flushes via [`MemTableFlusher::flush_with_indexes`]
/// so queries over flushed generations use index lookups instead of full
/// scans — and so vector search's index-only `fast_search` can see the data
/// at all.
index_configs: Vec<MemIndexConfig>,
stats: SharedWriteStats,
}

Expand All @@ -2143,12 +2151,14 @@ impl MemTableFlushHandler {
state: Arc<RwLock<WriterState>>,
flusher: Arc<MemTableFlusher>,
epoch: u64,
index_configs: Vec<MemIndexConfig>,
stats: SharedWriteStats,
) -> Self {
Self {
state,
flusher,
epoch,
index_configs,
stats,
}
}
Expand Down Expand Up @@ -2222,9 +2232,24 @@ impl MemTableFlushHandler {
let covered_wal_entry_position = wal_flushed_position
.or_else(|| memtable.frozen_at_wal_entry_position())
.unwrap_or(0);
self.flusher
.flush(&memtable, self.epoch, covered_wal_entry_position)
// Rebuild secondary indexes on the flushed generation so later
// queries hit an index instead of scanning. Skip the extra
// dataset open when there are no indexes to build. The indexed
// path's future is boxed to keep this async block's nesting
// under the type-layout recursion limit.
if self.index_configs.is_empty() {
self.flusher
.flush(&memtable, self.epoch, covered_wal_entry_position)
.await
} else {
Box::pin(self.flusher.flush_with_indexes(
&memtable,
self.epoch,
&self.index_configs,
covered_wal_entry_position,
))
.await
}
}
.await;

Expand Down Expand Up @@ -2718,6 +2743,101 @@ mod tests {
writer.close().await.unwrap();
}

/// End-to-end check that the background flush handler rebuilds secondary
/// indexes on every flushed generation. Before this, the handler flushed
/// via plain `flush`, leaving flushed generations unindexed — point
/// lookups had to full-scan and vector search's index-only `fast_search`
/// couldn't see the data at all.
#[tokio::test]
async fn test_flushed_generation_is_indexed() {
use crate::index::DatasetIndexExt;

let (store, base_path, base_uri, _temp_dir) = create_local_store().await;
let schema = create_test_schema();
let shard_id = Uuid::new_v4();

let config = ShardWriterConfig {
shard_id,
shard_spec_id: 0,
durable_write: false,
sync_indexed_write: true,
max_wal_buffer_size: 1024 * 1024,
max_wal_flush_interval: None,
max_memtable_size: 64 * 1024 * 1024,
manifest_scan_batch_size: 2,
..Default::default()
};

let index_configs = vec![MemIndexConfig::BTree(BTreeIndexConfig {
name: "id_idx".to_string(),
field_id: 0,
column: "id".to_string(),
})];

let writer = ShardWriter::open(
store,
base_path,
base_uri.clone(),
config,
schema.clone(),
index_configs,
)
.await
.unwrap();

writer
.put(vec![create_test_batch(&schema, 0, 10)])
.await
.unwrap();

// Freeze the active memtable and wait until it lands on disk.
writer.force_seal_active().await.unwrap();
writer.wait_for_flush_drain().await.unwrap();

// Resolve the flushed generation recorded in the manifest.
let manifest = writer.manifest().await.unwrap().unwrap();
assert_eq!(
manifest.flushed_generations.len(),
1,
"expected exactly one flushed generation"
);
let gen_uri = format!(
"{}/_mem_wal/{}/{}",
base_uri, shard_id, manifest.flushed_generations[0].path
);

// The flushed generation must carry the BTree index built during flush.
let dataset = crate::Dataset::open(&gen_uri).await.unwrap();
let indices = dataset.load_indices().await.unwrap();
assert_eq!(indices.len(), 1, "flushed generation should have one index");
assert_eq!(indices[0].name, "id_idx");

// A PK filter over it must resolve through the index, not a full scan.
let mut scan = dataset.scan();
scan.filter("id = 5").unwrap();
scan.prefilter(true);
let plan = scan.create_plan().await.unwrap();
crate::utils::test::assert_plan_node_equals(
plan,
"LanceRead: ...full_filter=id = Int32(5)...
ScalarIndexQuery: query=[id = 5]@id_idx(BTree)",
)
.await
.unwrap();

// And the index returns the correct row.
let batch = dataset
.scan()
.filter("id = 5")
.unwrap()
.try_into_batch()
.await
.unwrap();
assert_eq!(batch.num_rows(), 1);

writer.close().await.unwrap();
}

/// Test memtable auto-flush triggered by size threshold.
#[tokio::test]
async fn test_shard_writer_auto_flush_by_size() {
Expand Down
Loading