Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions rust/lance-index/src/scalar/inverted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod scorer;
pub mod tokenizer;
mod wand;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use arrow_schema::{DataType, Field};
Expand All @@ -24,6 +25,89 @@ pub use lance_tokenizer::Language;
pub use scorer::MemBM25Scorer;
pub use tokenizer::*;

use crate::scalar::inverted::query::{FtsSearchParams, Tokens};

/// Collect the deduplicated, order-preserving list of terms a shared
/// BM25 scorer must cover.
///
/// The scorer only consumes corpus-level document frequencies, so a flat
/// list of unique terms suffices — no full `Tokens` value with positions
/// is built. When fuzziness is enabled, every segment can contribute
/// extra terms through `expand_fuzzy_tokens`; the union of all those
/// expansions is what the global scorer has to cover.
fn scorer_terms(
    indices: &[Arc<InvertedIndex>],
    query_tokens: &Tokens,
    params: &FtsSearchParams,
) -> Result<Vec<String>> {
    let mut seen = HashSet::new();
    let mut terms = Vec::new();
    // Push a term only on first sight, keeping encounter order stable.
    let mut push_unique = |term: String| {
        if seen.insert(term.clone()) {
            terms.push(term);
        }
    };

    let fuzzy_enabled = matches!(params.fuzziness, Some(n) if n != 0);
    if fuzzy_enabled {
        // Each segment may expand the query differently; union them all.
        for index in indices {
            let expanded = index.expand_fuzzy_tokens(query_tokens, params)?;
            for idx in 0..expanded.len() {
                push_unique(expanded.get_token(idx).to_string());
            }
        }
    } else {
        // No fuzziness: the raw query tokens are the complete term set.
        for token in query_tokens {
            push_unique(token.to_string());
        }
    }
    Ok(terms)
}

/// Build a shared [`MemBM25Scorer`] across a set of FTS index segments.
///
/// Aggregates each segment's `(total_tokens, num_docs, per_term_doc_freq)`
/// statistics — obtained via [`InvertedIndex::bm25_stats_for_terms`] — into a
/// single corpus-wide scorer, so that BM25 IDF scoring uses *global*
/// statistics rather than per-segment statistics. Computes the union of
/// fuzzy-expanded terms when `params.fuzziness` is set.
///
/// Public as the canonical producer paired with the `with_base_scorer`
/// consumer on FTS exec types: callers holding `Arc<InvertedIndex>` segment
/// handles locally can construct an injectable scorer without reimplementing
/// per-segment stat aggregation, term deduplication, and fuzzy-expansion
/// union. Keeps a single source of truth for BM25 IDF arithmetic across
/// segments.
pub fn build_global_bm25_scorer(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this method need to be made public? Is it to supply a MemBM25Scorer to some of the exec nodes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this is mostly for api completeness since it is paired with with_base_scorer. Keeps a single source of truth for BM25 IDF arithmetic across segments. I could move it back if you prefer

indices: &[Arc<InvertedIndex>],
query_tokens: &Tokens,
params: &FtsSearchParams,
) -> Result<MemBM25Scorer> {
let terms = scorer_terms(indices, query_tokens, params)?;
let first_index = indices.first().ok_or_else(|| {
lance_core::Error::invalid_input("FTS index requires at least one segment")
})?;
let (mut total_tokens, mut num_docs, first_token_docs) =
first_index.bm25_stats_for_terms(&terms);
let mut token_docs = HashMap::with_capacity(terms.len());
for (term, count) in terms.iter().cloned().zip(first_token_docs.into_iter()) {
token_docs.insert(term, count);
}

for index in indices.iter().skip(1) {
let (segment_total_tokens, segment_num_docs, segment_token_docs) =
index.bm25_stats_for_terms(&terms);
total_tokens += segment_total_tokens;
num_docs += segment_num_docs;
for (term, count) in terms.iter().zip(segment_token_docs.into_iter()) {
*token_docs
.get_mut(term)
.expect("global scorer terms should already be initialized") += count;
}
}

Ok(MemBM25Scorer::new(total_tokens, num_docs, token_docs))
}
Comment thread
vivek-bharathan marked this conversation as resolved.

use lance_core::Error;

use crate::pbold;
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/inverted/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FtsSearchParams {
pub limit: Option<usize>,
pub wand_factor: f32,
Expand Down
112 changes: 40 additions & 72 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ use crate::io::exec::{
};
use crate::io::exec::{AddRowOffsetExec, LanceFilterExec, LanceScanConfig, get_physical_optimizer};
use crate::{Error, Result};
use crate::{datatypes::Schema, io::exec::fts::BooleanQueryExec};
use crate::{
datatypes::Schema,
io::exec::fts::{BoolSlot, BooleanQueryExec, build_boolean_query_children},
};

pub use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts};
#[cfg(feature = "substrait")]
Expand Down Expand Up @@ -3240,83 +3243,48 @@ impl Scanner {
// so that we won't miss possible matches
let unlimited_params = params.clone().with_limit(None);

// For should queries, union the results of each subquery
let mut should = Vec::with_capacity(query.should.len());
for subquery in &query.should {
let plan = Box::pin(self.plan_fts(
subquery,
&unlimited_params,
filter_plan,
prefilter_source,
))
.await?;
should.push(plan);
should.push(
Box::pin(self.plan_fts(
subquery,
&unlimited_params,
filter_plan,
prefilter_source,
))
.await?,
);
}
let should = if should.is_empty() {
Arc::new(EmptyExec::new(FTS_SCHEMA.clone()))
} else if should.len() == 1 {
should.pop().unwrap()
} else {
let unioned = UnionExec::try_new(should)?;
Arc::new(RepartitionExec::try_new(
unioned,
Partitioning::RoundRobinBatch(1),
)?)
};

// For must queries, inner join the results of each subquery on row_id
let mut must = None;
for query in &query.must {
let plan = Box::pin(self.plan_fts(
query,
&unlimited_params,
filter_plan,
prefilter_source,
))
.await?;
if let Some(joined_plan) = must {
must = Some(Arc::new(HashJoinExec::try_new(
joined_plan,
plan,
vec![(
Arc::new(Column::new_with_schema(ROW_ID, &FTS_SCHEMA)?),
Arc::new(Column::new_with_schema(ROW_ID, &FTS_SCHEMA)?),
)],
None,
&datafusion_expr::JoinType::Inner,
None,
datafusion_physical_plan::joins::PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)?) as _);
} else {
must = Some(plan);
}
let mut must = Vec::with_capacity(query.must.len());
for subquery in &query.must {
must.push(
Box::pin(self.plan_fts(
subquery,
&unlimited_params,
filter_plan,
prefilter_source,
))
.await?,
);
}

// For must_not queries, union the results of each subquery
let mut must_not = Vec::with_capacity(query.must_not.len());
for query in &query.must_not {
let plan = Box::pin(self.plan_fts(
query,
&unlimited_params,
filter_plan,
prefilter_source,
))
.await?;
must_not.push(plan);
for subquery in &query.must_not {
must_not.push(
Box::pin(self.plan_fts(
subquery,
&unlimited_params,
filter_plan,
prefilter_source,
))
.await?,
);
}
let must_not = if must_not.is_empty() {
Arc::new(EmptyExec::new(FTS_SCHEMA.clone()))
} else if must_not.len() == 1 {
must_not.pop().unwrap()
} else {
let unioned = UnionExec::try_new(must_not)?;
Arc::new(RepartitionExec::try_new(
unioned,
Partitioning::RoundRobinBatch(1),
)?)
};

let should = build_boolean_query_children(BoolSlot::Should, should)?
.expect("Should slot always returns Some");
let must = build_boolean_query_children(BoolSlot::Must, must)?;
let must_not = build_boolean_query_children(BoolSlot::MustNot, must_not)?
.expect("MustNot slot always returns Some");

if query.should.is_empty() && must.is_none() {
return Err(Error::invalid_input(
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

pub(crate) mod inverted;

pub use inverted::{load_segment_details, load_segments};

use std::sync::{Arc, LazyLock};

use crate::index::DatasetIndexExt;
Expand Down
23 changes: 16 additions & 7 deletions rust/lance/src/index/scalar/inverted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,14 @@ pub(crate) async fn build_segment(
Ok(built_segment)
}

/// Load all committed inverted-index segments that belong to the same named index.
pub(crate) async fn load_segments(
dataset: &Dataset,
column: &str,
) -> Result<Option<Vec<IndexMetadata>>> {
/// Load all committed inverted-index segments that belong to the same named
/// FTS index on `column`.
///
/// Returns `Ok(None)` if no FTS index exists on the column. When an index
/// exists, the returned vector contains every committed segment's
/// [`IndexMetadata`] (UUID, fragment coverage, index details). All segments
/// must share the same indexed fields; mismatched fields return an error.
pub async fn load_segments(dataset: &Dataset, column: &str) -> Result<Option<Vec<IndexMetadata>>> {
let Some(index_meta) = dataset
.load_scalar_index(
lance_index::IndexCriteria::default()
Expand Down Expand Up @@ -152,8 +155,14 @@ pub(crate) async fn load_segments(
Ok(Some(indices))
}

/// Load and validate the shared inverted-index details across committed segments.
pub(crate) async fn load_segment_details(
/// Load and validate the shared [`InvertedIndexDetails`] across committed
/// segments returned by [`load_segments`].
///
/// All segments are required to agree on their decoded `InvertedIndexDetails`
/// payload (analyzer, tokenizer, position settings, etc.); inconsistent
/// segments return an error. Returns the canonical details that may be used
/// when constructing a tokenizer or running a query against the index.
pub async fn load_segment_details(
dataset: &Dataset,
column: &str,
segments: &[IndexMetadata],
Expand Down
Loading
Loading