diff --git a/protos/table.proto b/protos/table.proto index af4b30e0421..252e4ab0cfe 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -547,12 +547,16 @@ message ShardManifest { // A writer must increment this when claiming the shard. uint64 writer_epoch = 2; - // The most recent WAL entry position (0-based) that has been flushed to a MemTable. + // The most recent WAL entry position that has been flushed to a MemTable. // During recovery, replay starts from replay_after_wal_entry_position + 1. + // WAL positions are 1-based, so the default value 0 unambiguously means + // "no flush has ever stamped this shard" and recovery replays from 1. uint64 replay_after_wal_entry_position = 3; - // The most recent WAL entry position (0-based) at the time manifest was updated. - // This is a hint, not authoritative - recovery must list files to find actual state. + // The most recent WAL entry position observed at the time the manifest was + // updated. WAL positions are 1-based; default 0 means no entry has been + // written yet. This is a hint, not authoritative - recovery must list + // files to find actual state. uint64 wal_entry_position_last_seen = 4; // Next generation ID to create (incremented after each MemTable flush). diff --git a/rust/lance-index/src/mem_wal.rs b/rust/lance-index/src/mem_wal.rs index 906ea595534..bd41fde7f44 100644 --- a/rust/lance-index/src/mem_wal.rs +++ b/rust/lance-index/src/mem_wal.rs @@ -159,10 +159,15 @@ pub struct ShardManifest { /// ShardField from the ShardSpec determines how to interpret each value. pub shard_field_values: HashMap>, pub writer_epoch: u64, - /// The most recent WAL entry position (0-based) flushed to a MemTable. - /// Recovery replays from `replay_after_wal_entry_position + 1`. + /// The most recent WAL entry position flushed to a MemTable. + /// Recovery replays from `replay_after_wal_entry_position + 1`. The + /// default value 0 means "no flush has ever stamped this shard" — WAL + /// positions themselves are 1-based, so 0 is never a valid covered + /// position. pub replay_after_wal_entry_position: u64, - /// The most recent WAL entry position (0-based) when manifest was updated. + /// The most recent WAL entry position observed at manifest write time. + /// Default 0 means "no entry has been written yet"; WAL positions are + /// 1-based. pub wal_entry_position_last_seen: u64, pub current_generation: u64, pub flushed_generations: Vec, diff --git a/rust/lance/benches/mem_wal_index_micro.rs b/rust/lance/benches/mem_wal_index_micro.rs index d356f891e1e..74941de2ef8 100644 --- a/rust/lance/benches/mem_wal_index_micro.rs +++ b/rust/lance/benches/mem_wal_index_micro.rs @@ -396,9 +396,12 @@ async fn measure_flush( let (epoch, _) = manifest_store.claim_epoch(0).await?; let flusher = MemTableFlusher::new(store, base_path, uri, shard_id, manifest_store); + // total_batches WAL entries were stamped at positions 1..=total_batches + // by the mark_wal_flushed loop above (1-based positions). + let covered_wal_entry_position = total_batches as u64; let t = Instant::now(); let _result = flusher - .flush_with_indexes(&memtable, epoch, index_configs) + .flush_with_indexes(&memtable, epoch, index_configs, covered_wal_entry_position) .await?; let elapsed = t.elapsed(); diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs index 1035219c404..808d08f4286 100644 --- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs +++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs @@ -79,8 +79,19 @@ impl MemTableFlusher { } /// Flush the MemTable to storage (data files, indexes, bloom filter). + /// + /// `covered_wal_entry_position` is stamped into the manifest's + /// `replay_after_wal_entry_position` so post-restart replay skips the + /// WAL entries this generation captures. Pass 0 only for shards that + /// have not yet appended any WAL entry — non-zero positions are + /// 1-based (see `FIRST_WAL_ENTRY_POSITION`). #[instrument(name = "mt_flush_storage", level = "info", skip_all, fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count()))] - pub async fn flush(&self, memtable: &MemTable, epoch: u64) -> Result { + pub async fn flush( + &self, + memtable: &MemTable, + epoch: u64, + covered_wal_entry_position: u64, + ) -> Result { self.manifest_store.check_fenced(epoch).await?; if memtable.row_count() == 0 { @@ -113,9 +124,13 @@ impl MemTableFlusher { self.write_bloom_filter(&bloom_path, memtable.bloom_filter()) .await?; - let last_wal_entry_position = memtable.last_flushed_wal_entry_position(); let new_manifest = self - .update_manifest(epoch, generation, &gen_folder_name, last_wal_entry_position) + .update_manifest( + epoch, + generation, + &gen_folder_name, + covered_wal_entry_position, + ) .await?; info!( @@ -129,7 +144,7 @@ impl MemTableFlusher { path: gen_folder_name, }, rows_flushed, - covered_wal_entry_position: last_wal_entry_position, + covered_wal_entry_position, }) } @@ -184,12 +199,16 @@ impl MemTableFlusher { } /// Flush the MemTable to storage with indexes. + /// + /// See [`MemTableFlusher::flush`] for `covered_wal_entry_position` + /// semantics. #[instrument(name = "mt_flush_with_indexes", level = "info", skip_all, fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count(), index_count = index_configs.len()))] pub async fn flush_with_indexes( &self, memtable: &MemTable, epoch: u64, index_configs: &[MemIndexConfig], + covered_wal_entry_position: u64, ) -> Result { self.manifest_store.check_fenced(epoch).await?; @@ -288,9 +307,13 @@ impl MemTableFlusher { self.write_bloom_filter(&bloom_path, memtable.bloom_filter()) .await?; - let last_wal_entry_position = memtable.last_flushed_wal_entry_position(); let new_manifest = self - .update_manifest(epoch, generation, &gen_folder_name, last_wal_entry_position) + .update_manifest( + epoch, + generation, + &gen_folder_name, + covered_wal_entry_position, + ) .await?; info!( @@ -304,7 +327,7 @@ impl MemTableFlusher { path: gen_folder_name, }, rows_flushed: memtable.row_count(), - covered_wal_entry_position: last_wal_entry_position, + covered_wal_entry_position, }) } @@ -883,7 +906,7 @@ mod tests { assert!(!memtable.all_flushed_to_wal()); let flusher = MemTableFlusher::new(store, base_path, base_uri, shard_id, manifest_store); - let result = flusher.flush(&memtable, epoch).await; + let result = flusher.flush(&memtable, epoch, 0).await; assert!(result.is_err()); assert!( @@ -912,7 +935,7 @@ mod tests { let memtable = MemTable::new(schema, 1, vec![]).unwrap(); let flusher = MemTableFlusher::new(store, base_path, base_uri, shard_id, manifest_store); - let result = flusher.flush(&memtable, epoch).await; + let result = flusher.flush(&memtable, epoch, 0).await; assert!(result.is_err()); assert!(result.unwrap_err().to_string().contains("empty MemTable")); @@ -950,7 +973,7 @@ mod tests { shard_id, manifest_store.clone(), ); - let result = flusher.flush(&memtable, epoch).await.unwrap(); + let result = flusher.flush(&memtable, epoch, 1).await.unwrap(); assert_eq!(result.generation.generation, 1); assert_eq!(result.rows_flushed, 10); @@ -1011,7 +1034,7 @@ mod tests { manifest_store.clone(), ); let result = flusher - .flush_with_indexes(&memtable, epoch, &index_configs) + .flush_with_indexes(&memtable, epoch, &index_configs, 1) .await .unwrap(); @@ -1144,7 +1167,7 @@ mod tests { manifest_store.clone(), ); let result = flusher - .flush_with_indexes(&memtable, epoch, &index_configs) + .flush_with_indexes(&memtable, epoch, &index_configs, 1) .await .unwrap(); @@ -1271,7 +1294,7 @@ mod tests { manifest_store.clone(), ); let result = flusher - .flush_with_indexes(&memtable, epoch, &index_configs) + .flush_with_indexes(&memtable, epoch, &index_configs, 1) .await .unwrap(); diff --git a/rust/lance/src/dataset/mem_wal/wal.rs b/rust/lance/src/dataset/mem_wal/wal.rs index 7bcd7616643..6232ab0d8ae 100644 --- a/rust/lance/src/dataset/mem_wal/wal.rs +++ b/rust/lance/src/dataset/mem_wal/wal.rs @@ -94,7 +94,7 @@ impl std::fmt::Debug for BatchDurableWatcher { /// A single WAL entry representing a batch of batches. #[derive(Debug, Clone)] pub struct WalEntry { - /// WAL entry position (0-based, sequential). + /// WAL entry position (1-based, sequential — see `FIRST_WAL_ENTRY_POSITION`). pub position: u64, /// Writer epoch at the time of write. pub writer_epoch: u64, @@ -664,7 +664,12 @@ impl WalEntryData { // Generic WAL Appender and Tailer // ============================================================================ -const FIRST_WAL_ENTRY_POSITION: u64 = 0; +/// First valid WAL entry position. Positions are 1-based so that a +/// `ShardManifest::replay_after_wal_entry_position` of 0 unambiguously means +/// "no flush has ever stamped the cursor" — replay then starts at position 1 +/// without needing to consult `flushed_generations`, which an external +/// compactor may legitimately drain back to empty. +const FIRST_WAL_ENTRY_POSITION: u64 = 1; const MAX_APPEND_CREATE_CONFLICTS: usize = 1024; const APPEND_CONFLICT_REFRESH_INTERVAL: usize = 16; const MAX_CURSOR_PROBE: u64 = 4096; @@ -1389,8 +1394,9 @@ mod tests { let source = batch_store_source(&batch_store); let result = buffer.flush(&source, batch_store.len()).await.unwrap(); let entry = result.entry.unwrap(); - // First entry from a freshly-discovered position is 0 (atomic-create - // path discovers the tip via list and starts at FIRST_WAL_ENTRY_POSITION). + // First entry from a freshly-discovered position lands at + // FIRST_WAL_ENTRY_POSITION (atomic-create path discovers the tip + // via list). assert_eq!(entry.position, FIRST_WAL_ENTRY_POSITION); assert_eq!(entry.writer_epoch, 1); assert_eq!(entry.num_batches, 2); @@ -1629,6 +1635,7 @@ mod tests { assert!(hint >= 1, "cursor hint never updated, last={hint}"); // next_position must still resolve to one past the last appended entry. - assert_eq!(tailer.next_position().await.unwrap(), 3); + // Three entries from a fresh shard land at 1, 2, 3, so next is 4. + assert_eq!(tailer.next_position().await.unwrap(), 4); } } diff --git a/rust/lance/src/dataset/mem_wal/write.rs b/rust/lance/src/dataset/mem_wal/write.rs index b57b31c33ed..af573eabd46 100644 --- a/rust/lance/src/dataset/mem_wal/write.rs +++ b/rust/lance/src/dataset/mem_wal/write.rs @@ -726,15 +726,15 @@ async fn replay_memtable_from_wal( manifest: &ShardManifest, memtable: &mut MemTable, ) -> Result { - // Fresh shards (no flushes yet) start replay at position 0; otherwise - // start one past the last covered position. Distinguishing "no flushes" - // from "flushed up to position 0" requires `flushed_generations` since - // `replay_after_wal_entry_position` defaults to 0 in both cases. - let start_position = if manifest.flushed_generations.is_empty() { - 0 - } else { - manifest.replay_after_wal_entry_position.saturating_add(1) - }; + // WAL positions are 1-based (see `FIRST_WAL_ENTRY_POSITION`), so a + // cursor of 0 means "no flush has ever stamped this shard" and replay + // starts at position 1. After flushing position N the cursor holds N + // and replay starts at N+1. The arithmetic collapses to a single + // saturating_add(1) in both cases — we deliberately do not consult + // `flushed_generations` here, since an external compactor may + // legitimately drain that vector back to empty after merging its + // contents into the base table. + let start_position = manifest.replay_after_wal_entry_position.saturating_add(1); // The MemTable is always freshly built before this function runs, so // any existing BatchStore entries can only have come from this replay @@ -887,7 +887,6 @@ impl SharedWriterState { let frozen_size = old_memtable.estimated_size(); state.frozen_memtable_bytes += frozen_size; - state.last_flushed_wal_entry_position = last_wal_entry_position; let flush_watcher = old_memtable .get_memtable_flush_watcher() @@ -1270,9 +1269,21 @@ impl ShardWriter { .seed_next_position(next_wal_position) .await; + // Seed the writer's covered-WAL cursor from the post-replay tip: + // `next_wal_position` is one past the highest WAL entry we just + // replayed into the active memtable, so everything strictly below + // it is durably reflected in this writer's memtable. We can't + // seed from `manifest.wal_entry_position_last_seen` — that field + // is bumped on every successful tailer read by other readers, so + // it may sit above what's actually covered by any flushed + // generation. Subtracting 1 from a fresh shard's `next_wal_position` + // of `FIRST_WAL_ENTRY_POSITION` (= 1) yields 0, which correctly + // means "no entry covered yet." + let initial_covered_wal_entry_position = next_wal_position.saturating_sub(1); + let state = Arc::new(RwLock::new(WriterState { memtable, - last_flushed_wal_entry_position: manifest.wal_entry_position_last_seen, + last_flushed_wal_entry_position: initial_covered_wal_entry_position, frozen_memtable_bytes: 0, frozen_flush_watchers: VecDeque::new(), flush_requested: false, @@ -1957,6 +1968,23 @@ impl MessageHandler for WalFlushHandler { let result = self.do_flush(source, end_batch_position).await; + // Propagate the just-appended WAL entry position back into the + // writer state so a subsequent MemTable freeze can stamp the + // correct `covered_wal_entry_position` into the manifest. Without + // this, `replay_after_wal_entry_position` stays at 0 and replay + // re-reads already-flushed entries after restart. + // + // Always update state before signalling the completion cell so any + // waiter that reads state immediately after the cell fires sees + // the new position. + if let (Ok(flush_result), Some(state_lock)) = (&result, &self.memtable_state) + && let Some(entry) = &flush_result.entry + { + let mut state = state_lock.write().await; + state.last_flushed_wal_entry_position = + state.last_flushed_wal_entry_position.max(entry.position); + } + // Notify completion if requested if let Some(cell) = done { cell.write(result.map_err(|e| e.to_string())); @@ -2109,20 +2137,40 @@ impl MemTableFlushHandler { let memtable_size = memtable.estimated_size(); let flush_result = async { - // Step 1: Wait for WAL flush completion (queued at freeze time). - if let Some(mut completion_reader) = memtable.take_wal_flush_completion() { - match completion_reader.await_value().await { - Some(Ok(_)) => {} - Some(Err(e)) => return Err(Error::io(format!("WAL flush failed: {}", e))), - None => { - return Err(Error::io( - "WAL flush handler exited before reporting completion", - )); + // Step 1: Wait for WAL flush completion (already queued at freeze time). + // The TriggerWalFlush message was sent by freeze_memtable to ensure + // strict ordering of WAL entries. If the freeze didn't trigger a + // flush (no pending WAL range), there's no completion cell and the + // memtable was already WAL-flushed by an earlier put. + let wal_flushed_position = + if let Some(mut completion_reader) = memtable.take_wal_flush_completion() { + match completion_reader.await_value().await { + Some(Ok(flush_result)) => flush_result.entry.map(|e| e.position), + Some(Err(e)) => return Err(Error::io(format!("WAL flush failed: {}", e))), + None => { + return Err(Error::io( + "WAL flush handler exited before reporting completion", + )); + } } - } - } - // Step 2: Flush the memtable to Lance storage. - self.flusher.flush(&memtable, self.epoch).await + } else { + None + }; + + // Step 2: Flush the memtable to Lance storage. The covered WAL + // entry position is either the one we just appended (per-memtable, + // from the completion cell — authoritative even when concurrent + // flushes have raced ahead in `state.last_flushed_wal_entry_position`) + // or, when no flush was triggered at freeze time, the memtable's + // frozen-at marker captured at freeze. Stamping this into the + // manifest is what lets replay-on-reopen skip entries this + // generation covers. + let covered_wal_entry_position = wal_flushed_position + .or_else(|| memtable.frozen_at_wal_entry_position()) + .unwrap_or(0); + self.flusher + .flush(&memtable, self.epoch, covered_wal_entry_position) + .await } .await; @@ -3098,13 +3146,14 @@ mod tests { writer.close().await.unwrap(); - // Read back via WalTailer. + // Read back via WalTailer. WAL positions are 1-based, so two + // entries from a fresh shard land at 1 and 2. let tailer = WalTailer::new(store, base_path, shard_id); - assert_eq!(tailer.first_position().await.unwrap(), 0); - assert_eq!(tailer.next_position().await.unwrap(), 2); + assert_eq!(tailer.first_position().await.unwrap(), 1); + assert_eq!(tailer.next_position().await.unwrap(), 3); - let e0 = tailer.read_entry(0).await.unwrap().unwrap(); - let e1 = tailer.read_entry(1).await.unwrap().unwrap(); + let e0 = tailer.read_entry(1).await.unwrap().unwrap(); + let e1 = tailer.read_entry(2).await.unwrap().unwrap(); assert_eq!(e0.batches.len(), 1); assert_eq!(e0.batches[0].num_rows(), 4); assert_eq!(e1.batches.len(), 1); @@ -3228,10 +3277,11 @@ mod tests { writer.close().await.unwrap(); - // All three puts should be in a single WAL entry at position 0. + // All three puts should be in a single WAL entry at position 1 + // (WAL positions are 1-based). let tailer = WalTailer::new(store, base_path, shard_id); - assert_eq!(tailer.next_position().await.unwrap(), 1); - let entry = tailer.read_entry(0).await.unwrap().unwrap(); + assert_eq!(tailer.next_position().await.unwrap(), 2); + let entry = tailer.read_entry(1).await.unwrap().unwrap(); assert_eq!(entry.batches.len(), 3); for (i, batch) in entry.batches.iter().enumerate() { assert_eq!(batch.num_rows(), 10, "batch {i}"); @@ -3455,6 +3505,108 @@ mod tests { writer.close().await.unwrap(); } + /// Regression for the OSS-WAL compactor-drain bug: after a flush + /// records its generation in the manifest and an external compactor + /// later drains `flushed_generations` back to empty (the legitimate + /// outcome of merging the generation into the base table), reopening + /// the writer must not re-replay the already-flushed WAL entry into + /// the active memtable. + /// + /// Under the pre-fix logic, replay disambiguated "fresh shard" from + /// "flushed-then-compacted" with `flushed_generations.is_empty()`, + /// which collapsed both cases into start-at-0. With 1-based WAL + /// positions and a default cursor of 0 meaning "no flush stamped", + /// the flush-then-drain sequence leaves `replay_after_wal_entry_position` + /// pinned at the flushed position, so replay correctly starts past it. + #[tokio::test] + async fn test_memtable_replay_skips_entries_after_external_compaction() { + use crate::dataset::mem_wal::ShardManifestStore; + + let (store, base_path, base_uri, _temp_dir) = create_local_store().await; + let schema = schema_with_pk(); + let shard_id = Uuid::new_v4(); + + // Writer A: write 5 rows, close (forces a flush of the active + // memtable). The manifest now records a flushed generation and + // pins `replay_after_wal_entry_position` to the covered WAL entry. + { + let writer_a = ShardWriter::open( + store.clone(), + base_path.clone(), + base_uri.clone(), + memtable_config_with_pk(shard_id), + schema.clone(), + vec![], + ) + .await + .unwrap(); + writer_a + .put(vec![create_test_batch(&schema, 0, 5)]) + .await + .unwrap(); + writer_a.close().await.unwrap(); + } + + // Simulate an external compactor merging the flushed generation + // into the base table: drain `flushed_generations` to empty via a + // direct manifest commit. The cursor stays where the flush put it. + let manifest_store = ShardManifestStore::new(store.clone(), &base_path, shard_id, 2); + let pre = manifest_store.read_latest().await.unwrap().unwrap(); + assert!( + !pre.flushed_generations.is_empty(), + "writer A's close() should have stamped a flushed generation" + ); + let cursor_at_flush = pre.replay_after_wal_entry_position; + assert!( + cursor_at_flush >= 1, + "expected cursor to land on a 1-based WAL position after flush, got {cursor_at_flush}" + ); + // Bump the epoch (claim_epoch) so we can commit_update without + // being fenced; this also mirrors how a compactor process would + // hold its own writer claim. + let (compactor_epoch, _) = manifest_store.claim_epoch(pre.shard_spec_id).await.unwrap(); + manifest_store + .commit_update(compactor_epoch, |current| ShardManifest { + version: current.version + 1, + flushed_generations: vec![], + ..current.clone() + }) + .await + .unwrap(); + let post = manifest_store.read_latest().await.unwrap().unwrap(); + assert!( + post.flushed_generations.is_empty(), + "compactor drain should have left flushed_generations empty" + ); + assert_eq!( + post.replay_after_wal_entry_position, cursor_at_flush, + "compactor must not touch the replay cursor" + ); + + // Writer B reopens. Pre-fix: replay saw flushed_generations empty, + // restarted at WAL position 0, and re-inserted writer A's rows. + // Post-fix: replay starts at cursor + 1, finds no entry, and the + // memtable stays empty. + let writer_b = ShardWriter::open( + store, + base_path, + base_uri, + memtable_config_with_pk(shard_id), + schema, + vec![], + ) + .await + .unwrap(); + let stats = writer_b.memtable_stats().await.unwrap(); + assert_eq!( + stats.row_count, 0, + "memtable must not re-replay compacted WAL entries; got {} rows", + stats.row_count + ); + assert_eq!(stats.batch_count, 0); + writer_b.close().await.unwrap(); + } + /// Replay aborts the open with a clear fence error if it encounters a /// WAL entry written with an epoch strictly greater than ours. Simulate /// the race where another writer wrote an entry with a higher epoch @@ -3469,7 +3621,7 @@ mod tests { let schema = schema_with_pk(); let shard_id = Uuid::new_v4(); - // Writer A: write one durable batch (claims epoch 1, writes entry at position 0). + // Writer A: write one durable batch (claims epoch 1, writes entry at position 1). { let writer_a = ShardWriter::open( store.clone(), @@ -3666,8 +3818,8 @@ mod tests { let schema = create_test_schema(); let shard_id = Uuid::new_v4(); - // Writer A claims epoch 1, writes one entry (takes WAL position 0, - // caches its next-position as 1 internally). + // Writer A claims epoch 1, writes one entry (takes WAL position 1, + // caches its next-position as 2 internally). let writer_a = Arc::new( ShardWriter::open( store.clone(),