Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 289 additions & 5 deletions rust/lance/src/dataset/mem_wal/index.rs

Large diffs are not rendered by default.

204 changes: 204 additions & 0 deletions rust/lance/src/dataset/mem_wal/index/pk_key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Composite primary-key encoding for MemWAL dedup.
//!
//! A multi-column primary key is reduced to a single order-preserving byte
//! string ([`encode_pk_tuple`]) so the whole tuple is one comparable key:
//! lexicographic byte order equals tuple order, and distinct tuples never
//! collide. Encoded as a `Binary` value, the tuple is indexed directly by a
//! [`super::BTreeMemIndex`] (its byte backend) — both in memory and, after
//! flush, as the on-disk BTree's `Binary` value column — so a probe builds
//! `ScalarValue::Binary(key)` and every layer agrees.
//!
//! Single-column primary keys do **not** use this — they key the typed
//! `BTreeMemIndex` on the column value directly.

use arrow_array::{BinaryArray, RecordBatch};
use datafusion::common::ScalarValue;
use lance_core::{Error, Result};

/// Sign-flip a signed integer to an order-preserving unsigned key (matches the
/// fixed-int BTree backend). Big-endian bytes of the result sort like the value.
#[inline]
fn encode_signed(v: i64) -> u64 {
(v as u64) ^ (1u64 << 63)
}

/// Append an order-preserving encoding of one non-null byte string: each `0x00`
/// is escaped to `0x00 0xFF`, then a `0x00 0x00` terminator is appended. The
/// terminator sorts before any escaped content, so a prefix orders before its
/// extensions and no value can forge a column boundary.
fn encode_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
for &b in bytes {
out.push(b);
if b == 0x00 {
out.push(0xFF);
}
}
out.extend_from_slice(&[0x00, 0x00]);
}

/// Append the order-preserving encoding of a single PK column value. A leading
/// tag (`0x00` null / `0x01` non-null) makes nulls sort first and keeps the
/// per-column encoding self-delimiting (fixed-width for ints, terminated for
/// bytes), so concatenating columns stays injective and order-preserving.
fn encode_value(out: &mut Vec<u8>, value: &ScalarValue) -> Result<()> {
if value.is_null() {
out.push(0x00);
return Ok(());
}
out.push(0x01);
macro_rules! be_signed {
($v:expr) => {
out.extend_from_slice(&encode_signed($v as i64).to_be_bytes())
};
}
match value {
ScalarValue::Int8(Some(v)) => be_signed!(*v),
ScalarValue::Int16(Some(v)) => be_signed!(*v),
ScalarValue::Int32(Some(v)) => be_signed!(*v),
ScalarValue::Int64(Some(v)) => be_signed!(*v),
ScalarValue::Date32(Some(v)) => be_signed!(*v),
ScalarValue::Date64(Some(v)) => be_signed!(*v),
ScalarValue::UInt8(Some(v)) => out.extend_from_slice(&(*v as u64).to_be_bytes()),
ScalarValue::UInt16(Some(v)) => out.extend_from_slice(&(*v as u64).to_be_bytes()),
ScalarValue::UInt32(Some(v)) => out.extend_from_slice(&(*v as u64).to_be_bytes()),
ScalarValue::UInt64(Some(v)) => out.extend_from_slice(&v.to_be_bytes()),
ScalarValue::Boolean(Some(b)) => out.push(*b as u8),
ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
encode_bytes(out, s.as_bytes())
}
ScalarValue::Binary(Some(b))
| ScalarValue::LargeBinary(Some(b))
| ScalarValue::FixedSizeBinary(_, Some(b)) => encode_bytes(out, b),
other => {
return Err(Error::invalid_input(format!(
"Unsupported primary-key column type for composite key: {other:?}"
)));
}
}
Ok(())
}

/// Encode a PK tuple (values in PK column order) to one order-preserving key.
pub fn encode_pk_tuple(values: &[ScalarValue]) -> Result<Vec<u8>> {
let mut out = Vec::with_capacity(values.len() * 9);
for value in values {
encode_value(&mut out, value)?;
}
Ok(out)
}

/// Encode row `row` of `batch`'s PK columns (at `pk_indices`) to one key.
fn encode_pk_row(batch: &RecordBatch, pk_indices: &[usize], row: usize) -> Result<Vec<u8>> {
let mut out = Vec::with_capacity(pk_indices.len() * 9);
for &col in pk_indices {
let value = ScalarValue::try_from_array(batch.column(col), row)?;
encode_value(&mut out, &value)?;
}
Ok(out)
}

/// Encode every row of `batch`'s PK columns (at `pk_indices`) into a `Binary`
/// column of order-preserving composite keys — the form a [`super::BTreeMemIndex`]
/// indexes directly (its byte backend), so the composite PK reuses the same
/// index as a single-column one.
pub fn encode_pk_batch(batch: &RecordBatch, pk_indices: &[usize]) -> Result<BinaryArray> {
let mut keys: Vec<Vec<u8>> = Vec::with_capacity(batch.num_rows());
for row in 0..batch.num_rows() {
keys.push(encode_pk_row(batch, pk_indices, row)?);
}
Ok(BinaryArray::from_iter_values(keys.iter()))
}

#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{Int32Array, StringArray};
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;

fn tuple(a: i32, b: &str) -> Vec<ScalarValue> {
vec![ScalarValue::Int32(Some(a)), ScalarValue::from(b)]
}

#[test]
fn encoding_is_order_preserving_and_injective() {
// Sorting tuples by their encoding must match tuple order, and distinct
// tuples must produce distinct bytes.
let tuples = [
tuple(1, "a"),
tuple(1, "ab"),
tuple(1, "b"),
tuple(2, "a"),
tuple(-1, "z"),
];
let mut encoded: Vec<(Vec<u8>, &Vec<ScalarValue>)> = tuples
.iter()
.map(|t| (encode_pk_tuple(t).unwrap(), t))
.collect();
encoded.sort_by(|x, y| x.0.cmp(&y.0));
let order: Vec<_> = encoded.iter().map(|(_, t)| (*t).clone()).collect();
// -1 < 1 < 2; within id=1, "a" < "ab" < "b".
assert_eq!(
order,
vec![
tuple(-1, "z"),
tuple(1, "a"),
tuple(1, "ab"),
tuple(1, "b"),
tuple(2, "a"),
]
);
// Injective: 5 distinct tuples → 5 distinct keys.
let mut keys: Vec<Vec<u8>> = tuples.iter().map(|t| encode_pk_tuple(t).unwrap()).collect();
keys.sort();
keys.dedup();
assert_eq!(keys.len(), 5);
}

#[test]
fn null_sorts_first_and_is_distinct() {
let null_a = vec![ScalarValue::Int32(None), ScalarValue::from("a")];
let one_a = tuple(1, "a");
assert!(encode_pk_tuple(&null_a).unwrap() < encode_pk_tuple(&one_a).unwrap());
assert_ne!(
encode_pk_tuple(&null_a).unwrap(),
encode_pk_tuple(&one_a).unwrap()
);
}

#[test]
fn prefix_safety_with_embedded_zero() {
// A string containing 0x00 must not collide with or sort incorrectly
// against a shorter one (escaping + terminator).
let with_zero = vec![ScalarValue::Binary(Some(vec![0x00]))];
let empty = vec![ScalarValue::Binary(Some(vec![]))];
assert!(encode_pk_tuple(&empty).unwrap() < encode_pk_tuple(&with_zero).unwrap());
}

#[test]
fn encode_pk_batch_matches_per_tuple_encoding() {
// Each row of the encoded `Binary` column equals `encode_pk_tuple` of
// that row's PK values — so the column a BTreeMemIndex indexes is exactly
// what a probe builds.
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int32Array::from(vec![2, 1])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let encoded = encode_pk_batch(&batch, &[0, 1]).unwrap();
assert_eq!(encoded.value(0), encode_pk_tuple(&tuple(2, "a")).unwrap());
assert_eq!(encoded.value(1), encode_pk_tuple(&tuple(1, "b")).unwrap());
// (1,"b") encodes below (2,"a").
assert!(encoded.value(1) < encoded.value(0));
}
}
16 changes: 16 additions & 0 deletions rust/lance/src/dataset/mem_wal/memtable/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,22 @@ impl BatchStore {
(0..end).collect()
}

/// The inclusive maximum visible *row* position at `max_visible_batch_position`,
/// or `None` when no rows are visible. The visible batches are the committed
/// prefix `[0, last_visible_idx]`; each batch carries its cumulative
/// `row_offset`, so this is the end of the last visible batch minus one.
/// Used to bound MVCC seeks against the maintained PK-position index.
pub fn max_visible_row(&self, max_visible_batch_position: usize) -> Option<u64> {
let len = self.committed_len.load(Ordering::Acquire);
if len == 0 {
return None;
}
let last_visible_idx = max_visible_batch_position.min(len - 1);
let last = self.get(last_visible_idx)?;
let visible_end = last.row_offset + last.num_rows as u64; // exclusive
visible_end.checked_sub(1)
}

/// Check if a specific batch is visible at a given visibility position.
#[inline]
pub fn is_batch_visible(
Expand Down
148 changes: 148 additions & 0 deletions rust/lance/src/dataset/mem_wal/memtable/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ impl MemTableFlusher {
all_indexes.extend(fts_indexes);
}

// Write the standalone primary-key dedup index (sidecar, not a manifest
// index — the block-list opens it directly by path).
self.create_pk_index(&gen_path, memtable.indexes()).await?;

// Write a single manifest that records the fragments, the
// within-generation deletion vector, and all indexes, overwriting the
// data-only v1 manifest created by Dataset::write.
Expand Down Expand Up @@ -543,6 +547,49 @@ impl MemTableFlusher {
Ok(created_indexes)
}

/// Write the standalone primary-key dedup index for this generation.
///
/// Unlike user indexes, this is a **sidecar**: it is not registered in the
/// manifest. The block-list opens it directly by path
/// ([`pk_index_path`]) and probes it with `Equals`. Single-column primary
/// keys index the typed value; composite keys index the order-preserving
/// `Binary` encoded tuple (see [`super::super::index::encode_pk_tuple`]).
/// Row positions line up 1:1 with the forward-written data file, so they are
/// the flushed row ids directly. No-op without a primary-key index.
async fn create_pk_index(
&self,
gen_path: &Path,
mem_indexes: Option<&super::super::index::IndexStore>,
) -> Result<()> {
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use lance_index::scalar::btree::train_btree_index;
use lance_index::scalar::lance_format::LanceIndexStore;

use crate::dataset::mem_wal::util::pk_index_path;

let Some(registry) = mem_indexes else {
return Ok(());
};
let batches = registry.pk_training_batches(8192)?;
if batches.is_empty() {
return Ok(());
}

let schema = batches[0].schema();
let store = LanceIndexStore::new(
self.object_store.clone(),
pk_index_path(gen_path),
Arc::new(LanceCache::no_cache()),
);
let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
schema,
futures::stream::iter(batches.into_iter().map(Ok)),
));
train_btree_index(stream, &store, 8192, None, None).await?;
Ok(())
}

/// Create FTS (Full-Text Search) indexes from in-memory data (uncommitted).
///
/// Writes the FTS index files and returns index metadata without committing.
Expand Down Expand Up @@ -1227,6 +1274,107 @@ mod tests {
assert_eq!(rows.get(&3), Some(&"c2".to_string()));
}

/// Flushing a memtable with a primary-key index writes a standalone sidecar
/// BTree at `{gen}/_pk_index` that the block-list can reopen by path and
/// probe by value — including for a within-gen-superseded PK (existence,
/// not visibility).
#[tokio::test]
async fn flushed_pk_index_sidecar_is_probeable() {
use lance_core::cache::LanceCache;
use lance_index::metrics::NoOpMetricsCollector;
use lance_index::registry::IndexPluginRegistry;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::scalar::{SargableQuery, SearchResult};

use super::super::super::index::IndexStore;
use crate::dataset::mem_wal::util::pk_index_path;
use datafusion::common::ScalarValue;

let (store, base_path, _base_uri, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = Arc::new(ShardManifestStore::new(
store.clone(),
&base_path,
shard_id,
2,
));
let (epoch, _manifest) = manifest_store.claim_epoch(0).await.unwrap();

// Primary-key index on `id`, no user indexes.
let schema = create_pk_schema();
let mut memtable = MemTable::new(schema.clone(), 1, vec![0]).unwrap();
let mut registry = IndexStore::new();
registry.enable_pk_index(&[("id".to_string(), 0)]);
memtable.set_indexes(registry);

// id=1 updated in-gen (a -> a2); id=2 unique.
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 1])),
Arc::new(StringArray::from(vec!["a", "b", "a2"])),
],
)
.unwrap();
let frag_id = memtable.insert(batch).await.unwrap();
memtable.mark_wal_flushed(&[frag_id], 1, &[0]);

let flusher = MemTableFlusher::new(
store.clone(),
base_path.clone(),
_base_uri.clone(),
shard_id,
manifest_store.clone(),
);
let result = flusher
.flush_with_indexes(&memtable, epoch, &[], 1)
.await
.unwrap();

// Reopen the sidecar directly by path (the block-list's route).
let gen_path = base_path
.clone()
.join("_mem_wal")
.join(shard_id.to_string())
.join(result.generation.path.as_str());
let index_store = Arc::new(LanceIndexStore::new(
store.clone(),
pk_index_path(&gen_path),
Arc::new(LanceCache::no_cache()),
));
let registry = IndexPluginRegistry::with_default_plugins();
let plugin = registry.get_plugin_by_name("BTree").unwrap();
let details =
prost_types::Any::from_msg(&lance_index::pbold::BTreeIndexDetails::default()).unwrap();
let index = plugin
.load_index(index_store, &details, None, &LanceCache::no_cache())
.await
.unwrap();

let contains = |id: i32| {
let index = index.clone();
async move {
let result = index
.search(
&SargableQuery::Equals(ScalarValue::Int32(Some(id))),
&NoOpMetricsCollector,
)
.await
.unwrap();
match result {
SearchResult::Exact(s) | SearchResult::AtMost(s) | SearchResult::AtLeast(s) => {
!s.is_empty()
}
}
}
};
// Both PKs present (id=1 even though its first version was superseded);
// an absent PK is not.
assert!(contains(1).await);
assert!(contains(2).await);
assert!(!contains(99).await);
}

/// Covers `finalize_generation` writing both a deletion vector *and*
/// indexes into the same manifest — the deletion-only and index-only
/// paths are exercised by sibling tests.
Expand Down
Loading
Loading