Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -547,12 +547,16 @@ message ShardManifest {
// A writer must increment this when claiming the shard.
uint64 writer_epoch = 2;

// The most recent WAL entry position (0-based) that has been flushed to a MemTable.
// The most recent WAL entry position that has been flushed to a MemTable.
// During recovery, replay starts from replay_after_wal_entry_position + 1.
// WAL positions are 1-based, so the default value 0 unambiguously means
// "no flush has ever stamped this shard" and recovery replays from 1.
uint64 replay_after_wal_entry_position = 3;

// The most recent WAL entry position (0-based) at the time manifest was updated.
// This is a hint, not authoritative - recovery must list files to find actual state.
// The most recent WAL entry position observed at the time the manifest was
// updated. WAL positions are 1-based; default 0 means no entry has been
// written yet. This is a hint, not authoritative - recovery must list
// files to find actual state.
uint64 wal_entry_position_last_seen = 4;

// Next generation ID to create (incremented after each MemTable flush).
Expand Down
11 changes: 8 additions & 3 deletions rust/lance-index/src/mem_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,15 @@ pub struct ShardManifest {
/// ShardField from the ShardSpec determines how to interpret each value.
pub shard_field_values: HashMap<String, Vec<u8>>,
pub writer_epoch: u64,
/// The most recent WAL entry position (0-based) flushed to a MemTable.
/// Recovery replays from `replay_after_wal_entry_position + 1`.
/// The most recent WAL entry position flushed to a MemTable.
/// Recovery replays from `replay_after_wal_entry_position + 1`. The
/// default value 0 means "no flush has ever stamped this shard" — WAL
/// positions themselves are 1-based, so 0 is never a valid covered
/// position.
pub replay_after_wal_entry_position: u64,
/// The most recent WAL entry position (0-based) when manifest was updated.
/// The most recent WAL entry position observed at manifest write time.
/// Default 0 means "no entry has been written yet"; WAL positions are
/// 1-based.
pub wal_entry_position_last_seen: u64,
pub current_generation: u64,
pub flushed_generations: Vec<FlushedGeneration>,
Expand Down
5 changes: 4 additions & 1 deletion rust/lance/benches/mem_wal_index_micro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,12 @@ async fn measure_flush(
let (epoch, _) = manifest_store.claim_epoch(0).await?;
let flusher = MemTableFlusher::new(store, base_path, uri, shard_id, manifest_store);

// total_batches WAL entries were stamped at positions 1..=total_batches
// by the mark_wal_flushed loop above (1-based positions).
let covered_wal_entry_position = total_batches as u64;
let t = Instant::now();
let _result = flusher
.flush_with_indexes(&memtable, epoch, index_configs)
.flush_with_indexes(&memtable, epoch, index_configs, covered_wal_entry_position)
.await?;
let elapsed = t.elapsed();

Expand Down
49 changes: 36 additions & 13 deletions rust/lance/src/dataset/mem_wal/memtable/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,19 @@ impl MemTableFlusher {
}

/// Flush the MemTable to storage (data files, indexes, bloom filter).
///
/// `covered_wal_entry_position` is stamped into the manifest's
/// `replay_after_wal_entry_position` so post-restart replay skips the
/// WAL entries this generation captures. Pass 0 only for shards that
/// have not yet appended any WAL entry — non-zero positions are
/// 1-based (see `FIRST_WAL_ENTRY_POSITION`).
#[instrument(name = "mt_flush_storage", level = "info", skip_all, fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count()))]
pub async fn flush(&self, memtable: &MemTable, epoch: u64) -> Result<FlushResult> {
pub async fn flush(
&self,
memtable: &MemTable,
epoch: u64,
covered_wal_entry_position: u64,
) -> Result<FlushResult> {
self.manifest_store.check_fenced(epoch).await?;

if memtable.row_count() == 0 {
Expand Down Expand Up @@ -113,9 +124,13 @@ impl MemTableFlusher {
self.write_bloom_filter(&bloom_path, memtable.bloom_filter())
.await?;

let last_wal_entry_position = memtable.last_flushed_wal_entry_position();
let new_manifest = self
.update_manifest(epoch, generation, &gen_folder_name, last_wal_entry_position)
.update_manifest(
epoch,
generation,
&gen_folder_name,
covered_wal_entry_position,
)
.await?;

info!(
Expand All @@ -129,7 +144,7 @@ impl MemTableFlusher {
path: gen_folder_name,
},
rows_flushed,
covered_wal_entry_position: last_wal_entry_position,
covered_wal_entry_position,
})
}

Expand Down Expand Up @@ -184,12 +199,16 @@ impl MemTableFlusher {
}

/// Flush the MemTable to storage with indexes.
///
/// See [`MemTableFlusher::flush`] for `covered_wal_entry_position`
/// semantics.
#[instrument(name = "mt_flush_with_indexes", level = "info", skip_all, fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count(), index_count = index_configs.len()))]
pub async fn flush_with_indexes(
&self,
memtable: &MemTable,
epoch: u64,
index_configs: &[MemIndexConfig],
covered_wal_entry_position: u64,
) -> Result<FlushResult> {
self.manifest_store.check_fenced(epoch).await?;

Expand Down Expand Up @@ -288,9 +307,13 @@ impl MemTableFlusher {
self.write_bloom_filter(&bloom_path, memtable.bloom_filter())
.await?;

let last_wal_entry_position = memtable.last_flushed_wal_entry_position();
let new_manifest = self
.update_manifest(epoch, generation, &gen_folder_name, last_wal_entry_position)
.update_manifest(
epoch,
generation,
&gen_folder_name,
covered_wal_entry_position,
)
.await?;

info!(
Expand All @@ -304,7 +327,7 @@ impl MemTableFlusher {
path: gen_folder_name,
},
rows_flushed: memtable.row_count(),
covered_wal_entry_position: last_wal_entry_position,
covered_wal_entry_position,
})
}

Expand Down Expand Up @@ -883,7 +906,7 @@ mod tests {
assert!(!memtable.all_flushed_to_wal());

let flusher = MemTableFlusher::new(store, base_path, base_uri, shard_id, manifest_store);
let result = flusher.flush(&memtable, epoch).await;
let result = flusher.flush(&memtable, epoch, 0).await;

assert!(result.is_err());
assert!(
Expand Down Expand Up @@ -912,7 +935,7 @@ mod tests {
let memtable = MemTable::new(schema, 1, vec![]).unwrap();

let flusher = MemTableFlusher::new(store, base_path, base_uri, shard_id, manifest_store);
let result = flusher.flush(&memtable, epoch).await;
let result = flusher.flush(&memtable, epoch, 0).await;

assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("empty MemTable"));
Expand Down Expand Up @@ -950,7 +973,7 @@ mod tests {
shard_id,
manifest_store.clone(),
);
let result = flusher.flush(&memtable, epoch).await.unwrap();
let result = flusher.flush(&memtable, epoch, 1).await.unwrap();

assert_eq!(result.generation.generation, 1);
assert_eq!(result.rows_flushed, 10);
Expand Down Expand Up @@ -1011,7 +1034,7 @@ mod tests {
manifest_store.clone(),
);
let result = flusher
.flush_with_indexes(&memtable, epoch, &index_configs)
.flush_with_indexes(&memtable, epoch, &index_configs, 1)
.await
.unwrap();

Expand Down Expand Up @@ -1144,7 +1167,7 @@ mod tests {
manifest_store.clone(),
);
let result = flusher
.flush_with_indexes(&memtable, epoch, &index_configs)
.flush_with_indexes(&memtable, epoch, &index_configs, 1)
.await
.unwrap();

Expand Down Expand Up @@ -1271,7 +1294,7 @@ mod tests {
manifest_store.clone(),
);
let result = flusher
.flush_with_indexes(&memtable, epoch, &index_configs)
.flush_with_indexes(&memtable, epoch, &index_configs, 1)
.await
.unwrap();

Expand Down
17 changes: 12 additions & 5 deletions rust/lance/src/dataset/mem_wal/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl std::fmt::Debug for BatchDurableWatcher {
/// A single WAL entry representing a batch of batches.
#[derive(Debug, Clone)]
pub struct WalEntry {
/// WAL entry position (0-based, sequential).
/// WAL entry position (1-based, sequential — see `FIRST_WAL_ENTRY_POSITION`).
pub position: u64,
/// Writer epoch at the time of write.
pub writer_epoch: u64,
Expand Down Expand Up @@ -664,7 +664,12 @@ impl WalEntryData {
// Generic WAL Appender and Tailer
// ============================================================================

const FIRST_WAL_ENTRY_POSITION: u64 = 0;
/// First valid WAL entry position. Positions are 1-based so that a
/// `ShardManifest::replay_after_wal_entry_position` of 0 unambiguously means
/// "no flush has ever stamped the cursor" — replay then starts at position 1
/// without needing to consult `flushed_generations`, which an external
/// compactor may legitimately drain back to empty.
const FIRST_WAL_ENTRY_POSITION: u64 = 1;
const MAX_APPEND_CREATE_CONFLICTS: usize = 1024;
const APPEND_CONFLICT_REFRESH_INTERVAL: usize = 16;
const MAX_CURSOR_PROBE: u64 = 4096;
Expand Down Expand Up @@ -1389,8 +1394,9 @@ mod tests {
let source = batch_store_source(&batch_store);
let result = buffer.flush(&source, batch_store.len()).await.unwrap();
let entry = result.entry.unwrap();
// First entry from a freshly-discovered position is 0 (atomic-create
// path discovers the tip via list and starts at FIRST_WAL_ENTRY_POSITION).
// First entry from a freshly-discovered position lands at
// FIRST_WAL_ENTRY_POSITION (atomic-create path discovers the tip
// via list).
assert_eq!(entry.position, FIRST_WAL_ENTRY_POSITION);
assert_eq!(entry.writer_epoch, 1);
assert_eq!(entry.num_batches, 2);
Expand Down Expand Up @@ -1629,6 +1635,7 @@ mod tests {
assert!(hint >= 1, "cursor hint never updated, last={hint}");

// next_position must still resolve to one past the last appended entry.
assert_eq!(tailer.next_position().await.unwrap(), 3);
// Three entries from a fresh shard land at 1, 2, 3, so next is 4.
assert_eq!(tailer.next_position().await.unwrap(), 4);
}
}
Loading
Loading