Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: define Flat index as a scan over VectorStorage #2380

Merged
merged 7 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/lance-index/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod residual;
pub mod sq;
pub mod transform;
pub mod utils;
pub mod v3;

// TODO: Make these crate private once the migration from lance to lance-index is done.
pub const PQ_CODE_COLUMN: &str = "__pq_code";
Expand Down
6 changes: 1 addition & 5 deletions rust/lance-index/src/vector/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@ use lance_core::Result;

pub mod builder;
pub mod memory;
pub mod storage;

/// Vector storage to back a graph.
pub use storage::VectorStore;

use crate::vector::DIST_COL;

use self::storage::DistCalculator;
use crate::vector::v3::storage::DistCalculator;

pub(crate) const NEIGHBORS_COL: &str = "__neighbors";

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/graph/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

//! In-memory graph representations.

use super::storage::{DistCalculator, VectorStore};
use crate::vector::v3::storage::{DistCalculator, VectorStore};
use arrow::array::AsArray;
use arrow_array::types::Float32Type;
use arrow_array::ArrayRef;
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-index/src/vector/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use serde::{Deserialize, Serialize};

use self::builder::HnswBuildParams;

use super::graph::{OrderedFloat, OrderedNode, VectorStore};
use super::graph::{OrderedFloat, OrderedNode};
use super::v3::storage::VectorStore;

const HNSW_TYPE: &str = "HNSW";
const VECTOR_ID_COL: &str = "__vector_id";
Expand Down
4 changes: 2 additions & 2 deletions rust/lance-index/src/vector/hnsw/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use super::super::graph::beam_search;
use super::{select_neighbors_heuristic, HnswMetadata, HNSW_TYPE, VECTOR_ID_COL, VECTOR_ID_FIELD};
use crate::scalar::IndexWriter;
use crate::vector::graph::builder::GraphBuilderNode;
use crate::vector::graph::storage::DistCalculator;
use crate::vector::graph::{greedy_search, storage::VectorStore};
use crate::vector::graph::greedy_search;
use crate::vector::graph::{
Graph, OrderedFloat, OrderedNode, VisitedGenerator, DISTS_FIELD, NEIGHBORS_COL, NEIGHBORS_FIELD,
};
use crate::vector::v3::storage::{DistCalculator, VectorStore};
use crate::vector::DIST_COL;
use crate::{IndexMetadata, INDEX_METADATA_SCHEMA_KEY};

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/pq/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use super::{distance::build_distance_table_l2, num_centroids, ProductQuantizerIm
use crate::{
pb,
vector::{
graph::storage::{DistCalculator, VectorStore},
ivf::storage::IvfData,
pq::transform::PQTransformer,
quantizer::{QuantizerMetadata, QuantizerStorage},
transform::Transformer,
v3::storage::{DistCalculator, VectorStore},
PQ_CODE_COLUMN,
},
IndexMetadata, INDEX_METADATA_SCHEMA_KEY,
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/quantizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use super::pq::storage::PQ_METADTA_KEY;
use super::pq::ProductQuantizer;
use super::sq::storage::SQ_METADATA_KEY;
use super::{
graph::VectorStore,
ivf::storage::IvfData,
pq::{
storage::{ProductQuantizationMetadata, ProductQuantizationStorage},
Expand All @@ -30,6 +29,7 @@ use super::{
storage::{ScalarQuantizationMetadata, ScalarQuantizationStorage},
ScalarQuantizer,
},
v3::storage::VectorStore,
};
use super::{PQ_CODE_COLUMN, SQ_CODE_COLUMN};

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/sq/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use snafu::{location, Location};

use crate::{
vector::{
graph::{storage::DistCalculator, VectorStore},
quantizer::{QuantizerMetadata, QuantizerStorage},
v3::storage::{DistCalculator, VectorStore},
SQ_CODE_COLUMN,
},
IndexMetadata, INDEX_METADATA_SCHEMA_KEY,
Expand Down
6 changes: 6 additions & 0 deletions rust/lance-index/src/vector/v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

pub mod shuffler;
pub mod storage;
pub mod subindex;
35 changes: 35 additions & 0 deletions rust/lance-index/src/vector/v3/shuffler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Shuffler is a component that takes a stream of record batches and shuffles them into
//! the corresponding IVF partitions.

use lance_core::Result;
use lance_io::stream::RecordBatchStream;

#[async_trait::async_trait]
/// A reader over the partitions produced by an [`IvfShuffler`].
pub trait IvfShuffleReader {
/// Reads the partition identified by `partition_id` as a stream of record batches.
///
/// Returns `Ok(None)` when the partition is empty (its size is 0). Callers can check
/// [`Self::partiton_size`] first to avoid requesting an empty partition.
async fn read_partition(
&self,
partition_id: usize,
) -> Result<Option<Box<dyn RecordBatchStream + Unpin + 'static>>>;

/// Returns the size of the partition identified by `partition_id`.
///
/// NOTE(review): the unit of "size" (rows vs. bytes) is not visible from this trait;
/// confirm against the implementations.
/// NOTE(review): the method name is misspelled (`partiton` instead of `partition`);
/// renaming it would be a breaking change for implementors and callers.
fn partiton_size(&self, partition_id: usize) -> Result<usize>;
}

#[async_trait::async_trait]
/// A shuffler that distributes an incoming stream of record batches into IVF partitions.
///
/// The result of shuffling is an [`IvfShuffleReader`] that can be used to read the
/// shuffled partitions back.
pub trait IvfShuffler {
/// Shuffles the incoming stream of record batches into IVF partitions.
///
/// Consumes the shuffler (`self` is taken by value) and the input stream `data`.
/// Returns an [`IvfShuffleReader`] that can be used to read the shuffled partitions.
async fn shuffle(
mut self,
data: Box<dyn RecordBatchStream + Unpin + 'static>,
) -> Result<Box<dyn IvfShuffleReader>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::any::Any;

use arrow_array::ArrayRef;
use lance_linalg::distance::MetricType;
use num_traits::Num;

/// WARNING: Internal API, API stability is not guaranteed
pub trait DistCalculator {
Expand Down Expand Up @@ -49,4 +50,8 @@ pub trait VectorStore: Send + Sync {
fn dist_calculator_from_id(&self, id: u32) -> Self::DistanceCalculator<'_>;

fn distance_between(&self, a: u32, b: u32) -> f32;

/// Returns a `Self::DistanceCalculator` for `_query`, a query vector given as a
/// slice of native numeric values (`T: Num`).
///
/// # Panics
///
/// The default implementation is a placeholder and always panics via `todo!`.
/// Implementors must override this method before it can be called.
fn dist_calculator_from_native<T: Num>(&self, _query: &[T]) -> Self::DistanceCalculator<'_> {
todo!("Implement this")
}
}
137 changes: 137 additions & 0 deletions rust/lance-index/src/vector/v3/subindex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow_array::{Float32Array, RecordBatch, StructArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use itertools::Itertools;
use lance_core::{Result, ROW_ID_FIELD};
use num_traits::Num;

use crate::vector::v3::storage::DistCalculator;
use crate::vector::{
graph::{OrderedFloat, OrderedNode},
DIST_COL,
};

use super::storage::VectorStore;

/// A prefilter that can be used to skip vectors during search.
///
/// Note: there is a `struct PreFilter` in `lance`. However we can't depend on `lance` in
/// `lance-index` because it would create a circular dependency.
///
/// By defining a trait here, we can implement the trait for `lance::PreFilter`
/// and not have the circular dependency.
pub trait PreFilter {
    /// Returns a shared prefilter that never drops any vector.
    ///
    /// The `Self: Sized` bound excludes this associated function from the trait's
    /// vtable, which keeps `PreFilter` object safe (usable as `dyn PreFilter`).
    /// Calls through a concrete implementor, e.g. `NoPreFilter::no_prefilter()`,
    /// are unaffected.
    fn no_prefilter() -> Arc<NoPreFilter>
    where
        Self: Sized,
    {
        Arc::new(NoPreFilter {})
    }

    /// Returns `true` if the vector with the given row `id` must be skipped
    /// (excluded from the search results).
    fn should_drop(&self, id: u64) -> bool;
}

/// A prefilter that does not skip any vectors.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoPreFilter {}

impl PreFilter for NoPreFilter {
    /// Always returns `false`: every vector is kept.
    fn should_drop(&self, _id: u64) -> bool {
        false
    }
}

/// A sub index for an IVF index.
///
/// The methods below cover searching a partition, building the sub index from a
/// vector storage, and converting it to and from a `StructArray`.
pub trait IvfSubIndex: Send + Sync + Sized {
/// Per-query parameters accepted by [`Self::search`]; must provide a default.
type QueryParams: Default;

/// Returns the name of this sub index type.
fn name(&self) -> &str;

/// Search the sub index for nearest neighbors.
///
/// # Arguments
/// * `query` - The query vector
/// * `k` - The number of nearest neighbors to return
/// * `params` - The query parameters
/// * `storage` - The vector storage to search over
/// * `prefilter` - The prefilter object indicating which vectors to skip
///
/// Returns the search results as a `RecordBatch`.
/// NOTE(review): the schema of the returned batch is not fixed by this trait;
/// confirm against the implementations.
fn search<T: Num>(
&self,
query: &[T],
k: usize,
params: Self::QueryParams,
storage: &impl VectorStore,
prefilter: Arc<impl PreFilter>,
) -> Result<RecordBatch>;

/// Load the sub index from a struct array with a single row.
fn load(data: StructArray) -> Result<Self>;

/// Given a vector storage containing all the data for the IVF partition, build the sub index.
fn index_vectors(&self, storage: &impl VectorStore) -> Result<()>;

/// Encode the sub index into a struct array.
fn to_array(&self) -> Result<StructArray>;
}

/// A flat index: an index that stores no metadata of its own.
///
/// At query time it simply scans over the vector storage and returns the top `k` results.
pub struct FlatIndex {}

// Schema of the record batch built by `FlatIndex::search`: a nullable Float32
// distance column named `DIST_COL`, followed by the row id field (`ROW_ID_FIELD`).
// The column order here must match the order of the arrays passed to
// `RecordBatch::try_new`.
lazy_static::lazy_static! {
static ref ANN_SEARCH_SCHEMA: SchemaRef = Arc::new(Schema::new(vec![
Field::new(DIST_COL, DataType::Float32, true),
ROW_ID_FIELD.clone(),
]));
}

impl IvfSubIndex for FlatIndex {
type QueryParams = ();

fn name(&self) -> &str {
"FLAT"
}

fn search<T: Num>(
&self,
query: &[T],
k: usize,
_params: Self::QueryParams,
storage: &impl VectorStore,
prefilter: Arc<impl PreFilter>,
) -> Result<RecordBatch> {
let dist_calc = storage.dist_calculator_from_native(query);
let (row_ids, dists): (Vec<u64>, Vec<f32>) = (0..storage.len())
chebbyChefNEQ marked this conversation as resolved.
Show resolved Hide resolved
.filter(|&id| !prefilter.should_drop(storage.row_ids()[id]))
.map(|id| OrderedNode {
id: id as u32,
dist: OrderedFloat(dist_calc.distance(id as u32)),
})
.sorted_unstable()
.take(k)
.map(
|OrderedNode {
id,
dist: OrderedFloat(dist),
}| (storage.row_ids()[id as usize], dist),
)
.unzip();

let (row_ids, dists) = (UInt64Array::from(row_ids), Float32Array::from(dists));

Ok(RecordBatch::try_new(
ANN_SEARCH_SCHEMA.clone(),
vec![Arc::new(dists), Arc::new(row_ids)],
)?)
}

fn load(_: StructArray) -> Result<Self> {
Ok(Self {})
}

fn index_vectors(&self, _: &impl VectorStore) -> Result<()> {
Ok(())
}

fn to_array(&self) -> Result<StructArray> {
Ok(StructArray::from(vec![]))
}
}
3 changes: 2 additions & 1 deletion rust/lance/src/index/vector/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ use lance_file::reader::FileReader;
use lance_index::vector::{hnsw::HNSW, quantizer::Quantizer};
use lance_index::{
vector::{
graph::{VectorStore, NEIGHBORS_FIELD},
graph::NEIGHBORS_FIELD,
hnsw::{HnswMetadata, VECTOR_ID_FIELD},
ivf::storage::IVF_PARTITION_KEY,
quantizer::{IvfQuantizationStorage, Quantization},
v3::storage::VectorStore,
Query,
},
Index, IndexType,
Expand Down
Loading