From 18074a2aa51dfe800eb0405c27c94259a5d430d3 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 5 May 2026 14:35:45 -0700 Subject: [PATCH 1/2] feat(fts): implement CacheCodec for posting lists and positions OSS-741. Follows lance#6223. PostingListKey and PositionKey gain a codec mirroring partition_serde.rs (variant tag + JSON header + Arrow IPC streams). SharedPositionStream is backed by bytes::Bytes so the cache read path is end-to-end zero-copy. --- rust/lance-index/src/scalar/inverted.rs | 1 + .../src/scalar/inverted/cache_codec.rs | 665 ++++++++++++++++++ rust/lance-index/src/scalar/inverted/index.rs | 39 +- rust/lance/src/dataset/tests/dataset_index.rs | 240 +++++++ 4 files changed, 934 insertions(+), 11 deletions(-) create mode 100644 rust/lance-index/src/scalar/inverted/cache_codec.rs diff --git a/rust/lance-index/src/scalar/inverted.rs b/rust/lance-index/src/scalar/inverted.rs index 57be7875c66..448861c5c0c 100644 --- a/rust/lance-index/src/scalar/inverted.rs +++ b/rust/lance-index/src/scalar/inverted.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors pub mod builder; +mod cache_codec; mod encoding; mod index; mod iter; diff --git a/rust/lance-index/src/scalar/inverted/cache_codec.rs b/rust/lance-index/src/scalar/inverted/cache_codec.rs new file mode 100644 index 00000000000..fb7512d013c --- /dev/null +++ b/rust/lance-index/src/scalar/inverted/cache_codec.rs @@ -0,0 +1,665 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Cache codec impls for FTS index entries. +//! +//! Serializes [`PostingList`] and [`Positions`] cache values for persistent +//! cache backends. The format is a small variant tag plus a JSON header for +//! scalar metadata, with Arrow-backed payload sections written as zero-copy +//! Arrow IPC streams via [`lance_arrow::ipc`]. The raw byte buffer inside +//! [`SharedPositionStream`] is written via [`write_len_prefixed_bytes`] and +//! 
read back via [`read_len_prefixed_bytes_at`] -- both zero-copy slices into +//! the input `Bytes` allocation. +//! +//! This is the FTS counterpart of `partition_serde.rs` for vector indices. + +use std::io::Write; +use std::sync::Arc; + +use arrow_array::cast::AsArray; +use arrow_array::types::{Float32Type, UInt32Type, UInt64Type}; +use arrow_array::{ + Array, Float32Array, LargeBinaryArray, ListArray, RecordBatch, UInt32Array, UInt64Array, +}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use lance_arrow::ipc::{ + read_ipc_stream_single_at, read_len_prefixed_bytes_at, write_ipc_stream, + write_len_prefixed_bytes, +}; +use lance_core::cache::CacheCodecImpl; +use lance_core::{Error, Result}; +use serde::{Deserialize, Serialize}; + +use super::index::{ + CompressedPositionStorage, CompressedPostingList, PlainPostingList, PositionStreamCodec, + Positions, PostingList, PostingTailCodec, SharedPositionStream, +}; + +// --------------------------------------------------------------------------- +// Tags +// --------------------------------------------------------------------------- + +const POSTING_VARIANT_PLAIN: u8 = 0; +const POSTING_VARIANT_COMPRESSED: u8 = 1; + +const POSITIONS_TAG_NONE: u8 = 0; +const POSITIONS_TAG_LEGACY: u8 = 1; +const POSITIONS_TAG_SHARED: u8 = 2; + +const POSTING_TAIL_CODEC_FIXED32: u8 = 0; +const POSTING_TAIL_CODEC_VARINT_DELTA: u8 = 1; + +const POSITION_STREAM_CODEC_VARINT_DOC_DELTA: u8 = 0; +const POSITION_STREAM_CODEC_PACKED_DELTA: u8 = 1; + +// --------------------------------------------------------------------------- +// Codec enum byte mappings +// --------------------------------------------------------------------------- + +fn posting_tail_codec_to_u8(c: PostingTailCodec) -> u8 { + match c { + PostingTailCodec::Fixed32 => POSTING_TAIL_CODEC_FIXED32, + PostingTailCodec::VarintDelta => POSTING_TAIL_CODEC_VARINT_DELTA, + } +} + +fn u8_to_posting_tail_codec(v: u8) -> Result { + match v { + POSTING_TAIL_CODEC_FIXED32 => 
Ok(PostingTailCodec::Fixed32), + POSTING_TAIL_CODEC_VARINT_DELTA => Ok(PostingTailCodec::VarintDelta), + _ => Err(Error::io(format!("unknown posting tail codec: {v}"))), + } +} + +fn position_stream_codec_to_u8(c: PositionStreamCodec) -> u8 { + match c { + PositionStreamCodec::VarintDocDelta => POSITION_STREAM_CODEC_VARINT_DOC_DELTA, + PositionStreamCodec::PackedDelta => POSITION_STREAM_CODEC_PACKED_DELTA, + } +} + +fn u8_to_position_stream_codec(v: u8) -> Result { + match v { + POSITION_STREAM_CODEC_VARINT_DOC_DELTA => Ok(PositionStreamCodec::VarintDocDelta), + POSITION_STREAM_CODEC_PACKED_DELTA => Ok(PositionStreamCodec::PackedDelta), + _ => Err(Error::io(format!("unknown position stream codec: {v}"))), + } +} + +// --------------------------------------------------------------------------- +// Header / tag I/O helpers (mirrors partition_serde.rs) +// --------------------------------------------------------------------------- + +fn write_json_header(writer: &mut dyn Write, header: &impl Serialize) -> Result<()> { + let bytes = serde_json::to_vec(header)?; + write_len_prefixed_bytes(writer, &bytes)?; + Ok(()) +} + +fn read_json_header(data: &Bytes, offset: &mut usize) -> Result { + let bytes = read_len_prefixed_bytes_at(data, offset).map_err(|e| Error::io(e.to_string()))?; + serde_json::from_slice(&bytes) + .map_err(|e| Error::io(format!("failed to deserialize cache header: {e}"))) +} + +fn write_u8(writer: &mut dyn Write, value: u8) -> Result<()> { + writer + .write_all(&[value]) + .map_err(|e| Error::io(format!("failed to write tag byte: {e}"))) +} + +fn read_u8(data: &Bytes, offset: &mut usize) -> Result { + let bytes = data.as_ref(); + if *offset >= bytes.len() { + return Err(Error::io( + "truncated cache entry: missing tag byte".to_string(), + )); + } + let v = bytes[*offset]; + *offset += 1; + Ok(v) +} + +// --------------------------------------------------------------------------- +// Position storage serde (shared by PostingList variants and Positions) 
+// --------------------------------------------------------------------------- + +const POSITION_LIST_COLUMN: &str = "position_list"; +const BLOCK_OFFSETS_COLUMN: &str = "block_offsets"; +const ROW_IDS_COLUMN: &str = "row_ids"; +const FREQUENCIES_COLUMN: &str = "frequencies"; +const BLOCKS_COLUMN: &str = "blocks"; + +#[derive(Serialize, Deserialize)] +struct SharedPositionsHeader { + codec: u8, +} + +fn write_position_storage( + writer: &mut dyn Write, + storage: &CompressedPositionStorage, +) -> Result<()> { + match storage { + CompressedPositionStorage::LegacyPerDoc(list) => { + write_u8(writer, POSITIONS_TAG_LEGACY)?; + let schema = Arc::new(Schema::new(vec![Field::new( + POSITION_LIST_COLUMN, + list.data_type().clone(), + list.is_nullable(), + )])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(list.clone())])?; + write_ipc_stream(&batch, writer)?; + } + CompressedPositionStorage::SharedStream(stream) => { + write_u8(writer, POSITIONS_TAG_SHARED)?; + let header = SharedPositionsHeader { + codec: position_stream_codec_to_u8(stream.codec()), + }; + write_json_header(writer, &header)?; + + let offsets = UInt32Array::from(stream.block_offsets().to_vec()); + let schema = Arc::new(Schema::new(vec![Field::new( + BLOCK_OFFSETS_COLUMN, + DataType::UInt32, + false, + )])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(offsets)])?; + write_ipc_stream(&batch, writer)?; + + write_len_prefixed_bytes(writer, stream.bytes())?; + } + } + Ok(()) +} + +fn read_position_storage( + data: &Bytes, + offset: &mut usize, + tag: u8, +) -> Result { + match tag { + POSITIONS_TAG_LEGACY => { + let batch = + read_ipc_stream_single_at(data, offset).map_err(|e| Error::io(e.to_string()))?; + let list = batch + .column(0) + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::io("legacy position column is not a ListArray".to_string()))? 
+ .clone(); + Ok(CompressedPositionStorage::LegacyPerDoc(list)) + } + POSITIONS_TAG_SHARED => { + let header: SharedPositionsHeader = read_json_header(data, offset)?; + let codec = u8_to_position_stream_codec(header.codec)?; + + let batch = + read_ipc_stream_single_at(data, offset).map_err(|e| Error::io(e.to_string()))?; + let block_offsets = batch + .column(0) + .as_primitive::() + .values() + .to_vec(); + + // Zero copy: read_len_prefixed_bytes_at returns a Bytes slice + // backed by the same allocation as `data`, and SharedPositionStream + // now stores its byte buffer as Bytes -- no copy on read. + let bytes = + read_len_prefixed_bytes_at(data, offset).map_err(|e| Error::io(e.to_string()))?; + + Ok(CompressedPositionStorage::SharedStream( + SharedPositionStream::new(codec, block_offsets, bytes), + )) + } + other => Err(Error::io(format!("unknown positions tag: {other}"))), + } +} + +// --------------------------------------------------------------------------- +// PostingList codec +// --------------------------------------------------------------------------- + +#[derive(Serialize, Deserialize)] +struct PlainPostingHeader { + max_score: Option, +} + +#[derive(Serialize, Deserialize)] +struct CompressedPostingHeader { + max_score: f32, + length: u32, + posting_tail_codec: u8, +} + +impl CacheCodecImpl for PostingList { + fn serialize(&self, writer: &mut dyn Write) -> Result<()> { + match self { + Self::Plain(plain) => { + write_u8(writer, POSTING_VARIANT_PLAIN)?; + serialize_plain(writer, plain) + } + Self::Compressed(compressed) => { + write_u8(writer, POSTING_VARIANT_COMPRESSED)?; + serialize_compressed(writer, compressed) + } + } + } + + fn deserialize(data: &Bytes) -> Result { + let mut offset = 0; + let variant = read_u8(data, &mut offset)?; + match variant { + POSTING_VARIANT_PLAIN => Ok(Self::Plain(deserialize_plain(data, &mut offset)?)), + POSTING_VARIANT_COMPRESSED => { + Ok(Self::Compressed(deserialize_compressed(data, &mut offset)?)) + } + other => 
Err(Error::io(format!("unknown PostingList variant: {other}"))), + } + } +} + +fn serialize_plain(writer: &mut dyn Write, plain: &PlainPostingList) -> Result<()> { + let header = PlainPostingHeader { + max_score: plain.max_score, + }; + write_json_header(writer, &header)?; + + let row_ids = UInt64Array::new(plain.row_ids.clone(), None); + let frequencies = Float32Array::new(plain.frequencies.clone(), None); + let schema = Arc::new(Schema::new(vec![ + Field::new(ROW_IDS_COLUMN, DataType::UInt64, false), + Field::new(FREQUENCIES_COLUMN, DataType::Float32, false), + ])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(row_ids), Arc::new(frequencies)])?; + write_ipc_stream(&batch, writer)?; + + match &plain.positions { + Some(list) => { + // Plain postings can only carry per-doc legacy positions; reuse + // the shared encoder. + write_position_storage( + writer, + &CompressedPositionStorage::LegacyPerDoc(list.clone()), + )?; + } + None => write_u8(writer, POSITIONS_TAG_NONE)?, + } + Ok(()) +} + +fn deserialize_plain(data: &Bytes, offset: &mut usize) -> Result { + let header: PlainPostingHeader = read_json_header(data, offset)?; + + let batch = read_ipc_stream_single_at(data, offset).map_err(|e| Error::io(e.to_string()))?; + let row_ids = batch + .column(0) + .as_primitive::() + .values() + .clone(); + let frequencies = batch + .column(1) + .as_primitive::() + .values() + .clone(); + + let positions_tag = read_u8(data, offset)?; + let positions = match positions_tag { + POSITIONS_TAG_NONE => None, + POSITIONS_TAG_LEGACY => match read_position_storage(data, offset, positions_tag)? 
{ + CompressedPositionStorage::LegacyPerDoc(list) => Some(list), + CompressedPositionStorage::SharedStream(_) => { + unreachable!("shared stream tag was read as legacy variant (this is a bug)") + } + }, + other => { + return Err(Error::io(format!( + "Plain posting list cannot have positions tag {other}" + ))); + } + }; + + Ok(PlainPostingList::new( + row_ids, + frequencies, + header.max_score, + positions, + )) +} + +fn serialize_compressed(writer: &mut dyn Write, posting: &CompressedPostingList) -> Result<()> { + let header = CompressedPostingHeader { + max_score: posting.max_score, + length: posting.length, + posting_tail_codec: posting_tail_codec_to_u8(posting.posting_tail_codec), + }; + write_json_header(writer, &header)?; + + let schema = Arc::new(Schema::new(vec![Field::new( + BLOCKS_COLUMN, + DataType::LargeBinary, + false, + )])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(posting.blocks.clone())])?; + write_ipc_stream(&batch, writer)?; + + match &posting.positions { + Some(storage) => write_position_storage(writer, storage)?, + None => write_u8(writer, POSITIONS_TAG_NONE)?, + } + Ok(()) +} + +fn deserialize_compressed(data: &Bytes, offset: &mut usize) -> Result { + let header: CompressedPostingHeader = read_json_header(data, offset)?; + let posting_tail_codec = u8_to_posting_tail_codec(header.posting_tail_codec)?; + + let batch = read_ipc_stream_single_at(data, offset).map_err(|e| Error::io(e.to_string()))?; + let blocks = batch + .column(0) + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::io("blocks column is not a LargeBinaryArray".to_string()))? + .clone(); + + let positions_tag = read_u8(data, offset)?; + let positions = if positions_tag == POSITIONS_TAG_NONE { + None + } else { + Some(read_position_storage(data, offset, positions_tag)?) 
+ }; + + Ok(CompressedPostingList::new( + blocks, + header.max_score, + header.length, + posting_tail_codec, + positions, + )) +} + +// --------------------------------------------------------------------------- +// Positions codec +// --------------------------------------------------------------------------- + +impl CacheCodecImpl for Positions { + fn serialize(&self, writer: &mut dyn Write) -> Result<()> { + write_position_storage(writer, &self.0) + } + + fn deserialize(data: &Bytes) -> Result { + let mut offset = 0; + let tag = read_u8(data, &mut offset)?; + if tag == POSITIONS_TAG_NONE { + return Err(Error::io( + "Positions cache entry cannot encode the None variant".to_string(), + )); + } + let storage = read_position_storage(data, &mut offset, tag)?; + Ok(Self(storage)) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use arrow::buffer::ScalarBuffer; + use arrow_array::LargeBinaryArray; + use arrow_array::builder::{Int32Builder, ListBuilder}; + use bytes::Bytes; + use lance_core::cache::CacheCodecImpl; + + use super::super::index::{ + CompressedPositionStorage, CompressedPostingList, PlainPostingList, PositionStreamCodec, + Positions, PostingList, PostingTailCodec, SharedPositionStream, + }; + + fn legacy_positions(rows: &[&[i32]]) -> arrow_array::ListArray { + let mut builder = ListBuilder::new(Int32Builder::new()); + for row in rows { + for v in *row { + builder.values().append_value(*v); + } + builder.append(true); + } + builder.finish() + } + + fn assert_plain_eq(a: &PlainPostingList, b: &PlainPostingList) { + assert_eq!(a.row_ids.as_ref(), b.row_ids.as_ref()); + assert_eq!(a.frequencies.as_ref(), b.frequencies.as_ref()); + assert_eq!(a.max_score, b.max_score); + match (&a.positions, &b.positions) { + (None, None) => {} + (Some(x), Some(y)) => assert_eq!(x, y), + _ => panic!("positions 
mismatch"), + } + } + + fn assert_position_storage_eq(a: &CompressedPositionStorage, b: &CompressedPositionStorage) { + match (a, b) { + ( + CompressedPositionStorage::LegacyPerDoc(x), + CompressedPositionStorage::LegacyPerDoc(y), + ) => assert_eq!(x, y), + ( + CompressedPositionStorage::SharedStream(x), + CompressedPositionStorage::SharedStream(y), + ) => { + assert_eq!(x.codec(), y.codec()); + assert_eq!(x.block_offsets(), y.block_offsets()); + assert_eq!(x.bytes(), y.bytes()); + } + _ => panic!("position storage variant mismatch"), + } + } + + fn roundtrip_posting_list(entry: &PostingList) -> PostingList { + let mut buf = Vec::new(); + entry.serialize(&mut buf).unwrap(); + PostingList::deserialize(&Bytes::from(buf)).unwrap() + } + + fn roundtrip_positions(entry: &Positions) -> Positions { + let mut buf = Vec::new(); + entry.serialize(&mut buf).unwrap(); + Positions::deserialize(&Bytes::from(buf)).unwrap() + } + + fn assert_slice_points_into_bytes(slice: &[u8], bytes: &Bytes) { + let slice_start = slice.as_ptr() as usize; + let slice_end = slice_start + slice.len(); + let bytes_start = bytes.as_ptr() as usize; + let bytes_end = bytes_start + bytes.len(); + assert!( + slice_start >= bytes_start && slice_end <= bytes_end, + "slice [{slice_start:#x}, {slice_end:#x}) should point into bytes \ + [{bytes_start:#x}, {bytes_end:#x})", + ); + } + + #[test] + fn plain_posting_list_no_positions_roundtrip() { + let plain = PlainPostingList::new( + ScalarBuffer::from(vec![10u64, 20, 30]), + ScalarBuffer::from(vec![0.5f32, 1.0, 1.5]), + Some(2.0), + None, + ); + let entry = PostingList::Plain(plain.clone()); + match roundtrip_posting_list(&entry) { + PostingList::Plain(restored) => assert_plain_eq(&plain, &restored), + PostingList::Compressed(_) => panic!("expected Plain variant"), + } + } + + #[test] + fn plain_posting_list_with_positions_roundtrip() { + let plain = PlainPostingList::new( + ScalarBuffer::from(vec![1u64, 2]), + ScalarBuffer::from(vec![1.0f32, 1.0]), + None, + 
Some(legacy_positions(&[&[3, 7], &[1, 4, 9]])), + ); + let entry = PostingList::Plain(plain.clone()); + match roundtrip_posting_list(&entry) { + PostingList::Plain(restored) => assert_plain_eq(&plain, &restored), + PostingList::Compressed(_) => panic!("expected Plain variant"), + } + } + + #[test] + fn compressed_posting_list_no_positions_roundtrip() { + // Two synthetic block payloads -- content is opaque to the codec. + let blocks = LargeBinaryArray::from_opt_vec(vec![ + Some(&[1u8, 2, 3, 4, 5][..]), + Some(&[6, 7, 8, 9, 10][..]), + ]); + let posting = + CompressedPostingList::new(blocks, 3.5, 42, PostingTailCodec::VarintDelta, None); + let entry = PostingList::Compressed(posting.clone()); + match roundtrip_posting_list(&entry) { + PostingList::Compressed(restored) => { + assert_eq!(restored.max_score, posting.max_score); + assert_eq!(restored.length, posting.length); + assert_eq!(restored.posting_tail_codec, posting.posting_tail_codec); + assert_eq!(restored.blocks, posting.blocks); + assert!(restored.positions.is_none()); + } + PostingList::Plain(_) => panic!("expected Compressed variant"), + } + } + + #[test] + fn compressed_posting_list_legacy_positions_roundtrip() { + let blocks = LargeBinaryArray::from_opt_vec(vec![Some(&[1u8, 2, 3][..])]); + let posting = CompressedPostingList::new( + blocks, + 1.25, + 5, + PostingTailCodec::Fixed32, + Some(CompressedPositionStorage::LegacyPerDoc(legacy_positions( + &[&[0, 4, 8]], + ))), + ); + let entry = PostingList::Compressed(posting.clone()); + match roundtrip_posting_list(&entry) { + PostingList::Compressed(restored) => { + assert_eq!(restored.posting_tail_codec, posting.posting_tail_codec); + assert_position_storage_eq( + restored.positions.as_ref().unwrap(), + posting.positions.as_ref().unwrap(), + ); + } + PostingList::Plain(_) => panic!("expected Compressed variant"), + } + } + + #[test] + fn compressed_posting_list_shared_stream_roundtrip() { + for codec in [ + PositionStreamCodec::VarintDocDelta, + 
PositionStreamCodec::PackedDelta, + ] { + let blocks = LargeBinaryArray::from_opt_vec(vec![Some(&[9u8; 16][..])]); + let stream = SharedPositionStream::new( + codec, + vec![0u32, 4, 11], + Bytes::from((0u8..32).collect::>()), + ); + let posting = CompressedPostingList::new( + blocks, + 7.0, + 3, + PostingTailCodec::VarintDelta, + Some(CompressedPositionStorage::SharedStream(stream)), + ); + let entry = PostingList::Compressed(posting.clone()); + match roundtrip_posting_list(&entry) { + PostingList::Compressed(restored) => { + assert_position_storage_eq( + restored.positions.as_ref().unwrap(), + posting.positions.as_ref().unwrap(), + ); + } + PostingList::Plain(_) => panic!("expected Compressed variant"), + } + } + } + + #[test] + fn shared_stream_deserialize_borrows_from_input_bytes() { + let blocks = LargeBinaryArray::from_opt_vec(vec![Some(&[9u8; 16][..])]); + let expected_stream = SharedPositionStream::new( + PositionStreamCodec::PackedDelta, + vec![0u32, 4, 11], + Bytes::from((0u8..32).collect::>()), + ); + let posting = CompressedPostingList::new( + blocks, + 7.0, + 3, + PostingTailCodec::VarintDelta, + Some(CompressedPositionStorage::SharedStream( + expected_stream.clone(), + )), + ); + let mut buf = Vec::new(); + PostingList::Compressed(posting) + .serialize(&mut buf) + .unwrap(); + let serialized = Bytes::from(buf); + + let restored = PostingList::deserialize(&serialized).unwrap(); + let PostingList::Compressed(restored) = restored else { + panic!("expected Compressed variant"); + }; + let Some(CompressedPositionStorage::SharedStream(stream)) = restored.positions else { + panic!("expected shared-stream positions"); + }; + + assert_eq!(stream.codec(), expected_stream.codec()); + assert_eq!(stream.block_offsets(), expected_stream.block_offsets()); + assert_eq!(stream.bytes(), expected_stream.bytes()); + assert_slice_points_into_bytes(stream.bytes(), &serialized); + } + + #[test] + fn positions_legacy_roundtrip() { + let positions = 
Positions(CompressedPositionStorage::LegacyPerDoc(legacy_positions( + &[&[1, 2, 3], &[], &[10]], + ))); + let restored = roundtrip_positions(&positions); + assert_position_storage_eq(&positions.0, &restored.0); + } + + #[test] + fn positions_shared_stream_roundtrip() { + let stream = SharedPositionStream::new( + PositionStreamCodec::PackedDelta, + vec![0u32, 8], + Bytes::from(vec![0xAAu8; 24]), + ); + let positions = Positions(CompressedPositionStorage::SharedStream(stream)); + let restored = roundtrip_positions(&positions); + assert_position_storage_eq(&positions.0, &restored.0); + } + + #[test] + fn truncated_data_errors() { + let plain = PlainPostingList::new( + ScalarBuffer::from(vec![1u64]), + ScalarBuffer::from(vec![1.0f32]), + None, + None, + ); + let entry = PostingList::Plain(plain); + let mut buf = Vec::new(); + entry.serialize(&mut buf).unwrap(); + buf.truncate(buf.len() / 2); + assert!(PostingList::deserialize(&Bytes::from(buf)).is_err()); + } +} diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index cfec52b0692..424887da28e 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -39,7 +39,7 @@ use fst::{Automaton, IntoStreamer, Streamer}; use futures::{FutureExt, Stream, StreamExt, TryStreamExt, stream}; use itertools::Itertools; use lance_arrow::{RecordBatchExt, iter_str_array}; -use lance_core::cache::{CacheKey, LanceCache, WeakLanceCache}; +use lance_core::cache::{CacheCodec, CacheKey, LanceCache, WeakLanceCache}; use lance_core::error::{DataFusionResult, LanceOptionExt}; use lance_core::utils::mask::{RowAddrMask, RowAddrTreeMap}; use lance_core::utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu}; @@ -1927,10 +1927,12 @@ impl PostingListReader { Error::Schema { .. 
} => Error::invalid_input("position is not found but required for phrase queries, try recreating the index with position".to_owned()), e => e, })?; - let bytes = batch[COMPRESSED_POSITION_COL] - .as_binary::() - .value(0) - .to_vec(); + let bytes = bytes::Bytes::from( + batch[COMPRESSED_POSITION_COL] + .as_binary::() + .value(0) + .to_vec(), + ); let block_offsets = batch[POSITION_BLOCK_OFFSET_COL] .as_list::() .value(0) @@ -1985,7 +1987,7 @@ impl PostingListReader { /// New type just to allow Positions implement DeepSizeOf so it can be put /// in the cache. #[derive(Clone)] -pub struct Positions(CompressedPositionStorage); +pub struct Positions(pub(super) CompressedPositionStorage); impl DeepSizeOf for Positions { fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize { @@ -2014,6 +2016,10 @@ impl CacheKey for PostingListKey { fn type_name() -> &'static str { "PostingList" } + + fn codec() -> Option { + Some(CacheCodec::from_impl::()) + } } #[derive(Debug, Clone)] @@ -2031,6 +2037,10 @@ impl CacheKey for PositionKey { fn type_name() -> &'static str { "Position" } + + fn codec() -> Option { + Some(CacheCodec::from_impl::()) + } } #[derive(Debug, Clone, PartialEq)] @@ -2052,11 +2062,14 @@ impl DeepSizeOf for CompressedPositionStorage { pub struct SharedPositionStream { codec: PositionStreamCodec, block_offsets: Vec, - bytes: Vec, + // Stored as `Bytes` so that the cache deserialization path can hand + // ownership of an IPC-decoded slice in without copying. Cloning the + // stream is then an `Arc` bump rather than an O(N) buffer copy. 
+ bytes: bytes::Bytes, } impl SharedPositionStream { - pub fn new(codec: PositionStreamCodec, block_offsets: Vec, bytes: Vec) -> Self { + pub fn new(codec: PositionStreamCodec, block_offsets: Vec, bytes: bytes::Bytes) -> Self { Self { codec, block_offsets, @@ -2096,7 +2109,7 @@ impl SharedPositionStream { } pub fn size(&self) -> usize { - self.block_offsets.capacity() * std::mem::size_of::() + self.bytes.capacity() + self.block_offsets.capacity() * std::mem::size_of::() + self.bytes.len() } } @@ -2443,7 +2456,7 @@ impl CompressedPostingList { .as_binary::() .clone(); let positions = if let Some(col) = batch.column_by_name(COMPRESSED_POSITION_COL) { - let bytes = col.as_binary::().value(0).to_vec(); + let bytes = bytes::Bytes::from(col.as_binary::().value(0).to_vec()); let block_offsets = batch[POSITION_BLOCK_OFFSET_COL] .as_list::() .value(0) @@ -2602,7 +2615,11 @@ impl EncodedPositionBlocks { } fn into_stream(self) -> SharedPositionStream { - SharedPositionStream::new(PositionStreamCodec::PackedDelta, self.offsets, self.bytes) + SharedPositionStream::new( + PositionStreamCodec::PackedDelta, + self.offsets, + bytes::Bytes::from(self.bytes), + ) } } diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs index 180b63bfa84..f1e6b033c83 100644 --- a/rust/lance/src/dataset/tests/dataset_index.rs +++ b/rust/lance/src/dataset/tests/dataset_index.rs @@ -1954,6 +1954,246 @@ async fn test_prewarm_index_with_position_validation() { ); } +/// Cache backend that exercises the serialization codec on every insert and +/// returns deserialized entries on every get. Items without a codec fall +/// through to an in-memory passthrough so that non-FTS cache traffic still +/// works during the test. +/// +/// Mirrors the helper in `rust/lance/src/index/vector/ivf/v2.rs` tests; if a +/// third user appears, lift this into a shared test utility. 
+mod fts_serializing_backend { + use std::collections::HashMap; + use std::pin::Pin; + + use futures::Future; + use lance_core::Result; + use lance_core::cache::{ + CacheBackend, CacheCodec, CacheEntry, InternalCacheKey, MokaCacheBackend, + }; + + type SerializedEntry = (bytes::Bytes, CacheCodec, usize); + + #[derive(Debug)] + pub struct SerializingBackend { + serialized: tokio::sync::Mutex>, + passthrough: MokaCacheBackend, + } + + impl SerializingBackend { + pub fn new() -> Self { + Self { + serialized: tokio::sync::Mutex::new(HashMap::new()), + passthrough: MokaCacheBackend::with_capacity(256 * 1024 * 1024), + } + } + + pub async fn serialized_entry_count(&self) -> usize { + self.serialized.lock().await.len() + } + } + + #[async_trait::async_trait] + impl CacheBackend for SerializingBackend { + async fn get( + &self, + key: &InternalCacheKey, + codec: Option, + ) -> Option { + let guard = self.serialized.lock().await; + if let Some((bytes, stored_codec, _)) = guard.get(key) { + return Some( + stored_codec + .deserialize(&bytes.clone()) + .expect("deserialization should succeed"), + ); + } + drop(guard); + self.passthrough.get(key, codec).await + } + + async fn insert( + &self, + key: &InternalCacheKey, + entry: CacheEntry, + size_bytes: usize, + codec: Option, + ) { + if let Some(codec) = codec { + let mut bytes = Vec::new(); + codec + .serialize(&entry, &mut bytes) + .expect("serialization should succeed"); + self.serialized + .lock() + .await + .insert(key.clone(), (bytes::Bytes::from(bytes), codec, size_bytes)); + } else { + self.passthrough.insert(key, entry, size_bytes, None).await; + } + } + + async fn get_or_insert<'a>( + &self, + key: &InternalCacheKey, + loader: Pin> + Send + 'a>>, + codec: Option, + ) -> Result<(CacheEntry, bool)> { + if let Some(entry) = self.get(key, codec).await { + return Ok((entry, true)); + } + let (entry, size) = loader.await?; + self.insert(key, entry.clone(), size, codec).await; + Ok((entry, false)) + } + + async fn 
invalidate_prefix(&self, prefix: &str) { + self.serialized + .lock() + .await + .retain(|k, _| !k.starts_with(prefix)); + self.passthrough.invalidate_prefix(prefix).await; + } + + async fn clear(&self) { + self.serialized.lock().await.clear(); + self.passthrough.clear().await; + } + + async fn num_entries(&self) -> usize { + self.serialized.lock().await.len() + self.passthrough.num_entries().await + } + + async fn size_bytes(&self) -> usize { + let serialized: usize = self + .serialized + .lock() + .await + .values() + .map(|(_, _, s)| *s) + .sum(); + serialized + self.passthrough.size_bytes().await + } + } +} + +/// Validates the OSS-741 contract: after FTS prewarm through a serializing +/// cache backend, FTS queries serve results without any further IO. The +/// serializing backend forces every cache hit through the new +/// `CacheCodec` impls, so this also smoke-tests the round-trip path under +/// realistic data shapes (compressed posting blocks + shared position +/// stream when positions are enabled). 
+#[tokio::test] +async fn test_fts_prewarm_with_serializing_backend_serves_query_with_no_io() { + use lance_io::assert_io_eq; + + use fts_serializing_backend::SerializingBackend; + + let tmpdir = TempStrDir::default(); + let uri = tmpdir.to_owned(); + drop(tmpdir); + + let doc_col: Arc = Arc::new(GenericStringArray::::from(vec![ + "lance search engine", + "lance search with tail", + "phrase query example", + "search query terms", + ])); + let ids = UInt64Array::from_iter_values(0..doc_col.len() as u64); + let batch = RecordBatch::try_new( + arrow_schema::Schema::new(vec![ + arrow_schema::Field::new("doc", doc_col.data_type().to_owned(), true), + arrow_schema::Field::new("id", DataType::UInt64, false), + ]) + .into(), + vec![Arc::new(doc_col) as ArrayRef, Arc::new(ids) as ArrayRef], + ) + .unwrap(); + let schema = batch.schema(); + let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + let mut dataset = Dataset::write(batches, &uri, None).await.unwrap(); + dataset + .create_index( + &["doc"], + IndexType::Inverted, + Some("fts_idx".to_owned()), + &InvertedIndexParams::default().with_position(true), + true, + ) + .await + .unwrap(); + + // Re-open the dataset on a session whose cache backend serializes every + // entry through its codec. Set a generous capacity so nothing is evicted + // before we query. + let backend = Arc::new(SerializingBackend::new()); + let session = Arc::new(Session::with_index_cache_backend( + backend.clone(), + 128 * 1024 * 1024, + Arc::new(lance_io::object_store::ObjectStoreRegistry::default()), + )); + let dataset = DatasetBuilder::from_uri(&uri) + .with_session(session) + .load() + .await + .unwrap(); + + // Reset IO counters to isolate prewarm + query traffic from open/load. 
+ dataset.object_store.as_ref().io_stats_incremental(); + + dataset + .prewarm_index_with_options( + "fts_idx", + &PrewarmOptions::Fts(FtsPrewarmOptions::new().with_position(true)), + ) + .await + .unwrap(); + + // The FTS codec must have been exercised. Posting lists and positions + // enter the serialized store; non-FTS entries (e.g. the unsized + // `ScalarIndexCacheKey` for the index itself) legitimately fall through + // to the in-memory passthrough — those cannot have a codec by design. + let serialized_after_prewarm = backend.serialized_entry_count().await; + assert!( + serialized_after_prewarm > 0, + "prewarm should have routed FTS entries (PostingList / Positions) through CacheCodec, \ + but the serializing store was empty" + ); + + // After prewarm, a phrase query (which exercises both posting lists and + // positions, deserializing them from bytes via the codec) must not hit + // disk. + dataset.object_store.as_ref().io_stats_incremental(); + + // Project `_rowid` so the scan does not need to read a data column from + // the dataset's parquet/lance files; the index path alone determines + // whether the FTS cache is doing its job. 
+ let result = dataset + .scan() + .project(&[ROW_ID]) + .unwrap() + .full_text_search(FullTextSearchQuery::new_query( + PhraseQuery::new("lance search".to_owned()).into(), + )) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + result.num_rows(), + 2, + "phrase query should still return correct results after deserialization" + ); + + let stats = dataset.object_store.as_ref().io_stats_incremental(); + assert_io_eq!( + stats, + read_iops, + 0, + "FTS query should not perform IO after prewarm; the serializing cache \ + backend must serve every posting list and positions entry from memory" + ); +} + #[tokio::test] async fn test_fts_phrase_query_with_removed_stop_words() { let tmpdir = TempStrDir::default(); From 8cfd2b0fe4e9eb3a492100e91a445fa43dff0fe1 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Wed, 6 May 2026 11:16:15 -0700 Subject: [PATCH 2/2] fix(fts): use as_primitive_opt for cache codec IPC reads The deserialization paths for PlainPostingList and the shared-position storage read primitive columns out of an Arrow IPC stream that was decoded from cache bytes. The bytes can come from a persistent cache backend across versions or from a corrupted entry, so the IPC schema is part of the (untrusted) input rather than something we have already validated. The previous code used as_primitive::<T>(), which panics on type mismatch. Switch to as_primitive_opt + ok_or_else(Error::io) so a malformed cache entry surfaces as an Error::io the caller can log and fall back from instead of aborting the process. This matches the pattern already used in this file for ListArray and LargeBinaryArray columns and aligns with the rust/AGENTS.md guidance to prefer _opt variants when the data type has not been pre-verified. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/lance-index/src/scalar/inverted/cache_codec.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/cache_codec.rs b/rust/lance-index/src/scalar/inverted/cache_codec.rs index fb7512d013c..080ecbb3d61 100644 --- a/rust/lance-index/src/scalar/inverted/cache_codec.rs +++ b/rust/lance-index/src/scalar/inverted/cache_codec.rs @@ -198,7 +198,8 @@ fn read_position_storage( read_ipc_stream_single_at(data, offset).map_err(|e| Error::io(e.to_string()))?; let block_offsets = batch .column(0) - .as_primitive::<UInt32Type>() + .as_primitive_opt::<UInt32Type>() + .ok_or_else(|| Error::io("block_offsets column is not UInt32".to_string()))? .values() .to_vec(); @@ -294,12 +295,14 @@ fn deserialize_plain(data: &Bytes, offset: &mut usize) -> Result<PlainPostingLis let batch = read_ipc_stream_single_at(data, offset).map_err(|e| Error::io(e.to_string()))?; let row_ids = batch .column(0) - .as_primitive::<UInt64Type>() + .as_primitive_opt::<UInt64Type>() + .ok_or_else(|| Error::io("row_ids column is not UInt64".to_string()))? .values() .clone(); let frequencies = batch .column(1) - .as_primitive::<Float32Type>() + .as_primitive_opt::<Float32Type>() + .ok_or_else(|| Error::io("frequencies column is not Float32".to_string()))? .values() .clone();