Skip to content

Commit

Permalink
feat: index v3 starting point (#2380) (#2378) (#2377) (#2376)
Browse files Browse the repository at this point in the history
This is a commit of a stack of PR
* closes #2380
* closes #2378
* closes #2377
* closes #2376
  • Loading branch information
chebbyChefNEQ committed May 23, 2024
1 parent 83ecc01 commit 3dd63b8
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 13 deletions.
1 change: 1 addition & 0 deletions rust/lance-index/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod residual;
pub mod sq;
pub mod transform;
pub mod utils;
pub mod v3;

// TODO: Make these crate private once the migration from lance to lance-index is done.
pub const PQ_CODE_COLUMN: &str = "__pq_code";
Expand Down
6 changes: 1 addition & 5 deletions rust/lance-index/src/vector/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@ use lance_core::Result;

pub mod builder;
pub mod memory;
pub mod storage;

/// Vector storage to back a graph.
pub use storage::VectorStore;

use crate::vector::DIST_COL;

use self::storage::DistCalculator;
use crate::vector::v3::storage::DistCalculator;

pub(crate) const NEIGHBORS_COL: &str = "__neighbors";

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/graph/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

//! In-memory graph representations.

use super::storage::{DistCalculator, VectorStore};
use crate::vector::v3::storage::{DistCalculator, VectorStore};
use arrow::array::AsArray;
use arrow_array::types::Float32Type;
use arrow_array::ArrayRef;
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-index/src/vector/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use serde::{Deserialize, Serialize};

use self::builder::HnswBuildParams;

use super::graph::{OrderedFloat, OrderedNode, VectorStore};
use super::graph::{OrderedFloat, OrderedNode};
use super::v3::storage::VectorStore;

const HNSW_TYPE: &str = "HNSW";
const VECTOR_ID_COL: &str = "__vector_id";
Expand Down
4 changes: 2 additions & 2 deletions rust/lance-index/src/vector/hnsw/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use super::super::graph::beam_search;
use super::{select_neighbors_heuristic, HnswMetadata, HNSW_TYPE, VECTOR_ID_COL, VECTOR_ID_FIELD};
use crate::scalar::IndexWriter;
use crate::vector::graph::builder::GraphBuilderNode;
use crate::vector::graph::storage::DistCalculator;
use crate::vector::graph::{greedy_search, storage::VectorStore};
use crate::vector::graph::greedy_search;
use crate::vector::graph::{
Graph, OrderedFloat, OrderedNode, VisitedGenerator, DISTS_FIELD, NEIGHBORS_COL, NEIGHBORS_FIELD,
};
use crate::vector::v3::storage::{DistCalculator, VectorStore};
use crate::vector::DIST_COL;
use crate::{IndexMetadata, INDEX_METADATA_SCHEMA_KEY};

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/pq/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use super::{distance::build_distance_table_l2, num_centroids, ProductQuantizerIm
use crate::{
pb,
vector::{
graph::storage::{DistCalculator, VectorStore},
ivf::storage::IvfData,
pq::transform::PQTransformer,
quantizer::{QuantizerMetadata, QuantizerStorage},
transform::Transformer,
v3::storage::{DistCalculator, VectorStore},
PQ_CODE_COLUMN,
},
IndexMetadata, INDEX_METADATA_SCHEMA_KEY,
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/quantizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use super::pq::storage::PQ_METADTA_KEY;
use super::pq::ProductQuantizer;
use super::sq::storage::SQ_METADATA_KEY;
use super::{
graph::VectorStore,
ivf::storage::IvfData,
pq::{
storage::{ProductQuantizationMetadata, ProductQuantizationStorage},
Expand All @@ -30,6 +29,7 @@ use super::{
storage::{ScalarQuantizationMetadata, ScalarQuantizationStorage},
ScalarQuantizer,
},
v3::storage::VectorStore,
};
use super::{PQ_CODE_COLUMN, SQ_CODE_COLUMN};

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/sq/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use snafu::{location, Location};

use crate::{
vector::{
graph::{storage::DistCalculator, VectorStore},
quantizer::{QuantizerMetadata, QuantizerStorage},
v3::storage::{DistCalculator, VectorStore},
SQ_CODE_COLUMN,
},
IndexMetadata, INDEX_METADATA_SCHEMA_KEY,
Expand Down
6 changes: 6 additions & 0 deletions rust/lance-index/src/vector/v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

pub mod shuffler;
pub mod storage;
pub mod subindex;
35 changes: 35 additions & 0 deletions rust/lance-index/src/vector/v3/shuffler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Shuffler is a component that takes a stream of record batches and shuffles them into
//! the corresponding IVF partitions.

use lance_core::Result;
use lance_io::stream::RecordBatchStream;

#[async_trait::async_trait]
/// A reader that can read the shuffled partitions.
pub trait IvfShuffleReader {
/// Read a partition by partition_id
/// will return Ok(None) if partition_size is 0
/// check reader.partiton_size(partition_id) before calling this function
async fn read_partition(
&self,
partition_id: usize,
) -> Result<Option<Box<dyn RecordBatchStream + Unpin + 'static>>>;

/// Get the size of the partition by partition_id
fn partiton_size(&self, partition_id: usize) -> Result<usize>;
}

#[async_trait::async_trait]
/// A shuffler that can shuffle the incoming stream of record batches into IVF partitions.
/// Returns a IvfShuffleReader that can be used to read the shuffled partitions.
pub trait IvfShuffler {
/// Shuffle the incoming stream of record batches into IVF partitions.
/// Returns a IvfShuffleReader that can be used to read the shuffled partitions.
async fn shuffle(
mut self,
data: Box<dyn RecordBatchStream + Unpin + 'static>,
) -> Result<Box<dyn IvfShuffleReader>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::any::Any;

use arrow_array::ArrayRef;
use lance_linalg::distance::MetricType;
use num_traits::Num;

/// WARNING: Internal API, API stability is not guaranteed
pub trait DistCalculator {
Expand Down Expand Up @@ -49,4 +50,8 @@ pub trait VectorStore: Send + Sync {
fn dist_calculator_from_id(&self, id: u32) -> Self::DistanceCalculator<'_>;

fn distance_between(&self, a: u32, b: u32) -> f32;

fn dist_calculator_from_native<T: Num>(&self, _query: &[T]) -> Self::DistanceCalculator<'_> {
todo!("Implement this")
}
}
137 changes: 137 additions & 0 deletions rust/lance-index/src/vector/v3/subindex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow_array::{Float32Array, RecordBatch, StructArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use itertools::Itertools;
use lance_core::{Result, ROW_ID_FIELD};
use num_traits::Num;

use crate::vector::v3::storage::DistCalculator;
use crate::vector::{
graph::{OrderedFloat, OrderedNode},
DIST_COL,
};

use super::storage::VectorStore;

/// A prefilter that can be used to skip vectors during search
///
/// Note: there is a `struct PreFilter` in `lance`. However we can't depend on `lance` in `lance-index`
/// because it would create a circular dependency.
///
/// By defining a trait here, we can implement the trait for `lance::PreFilter`
/// and not have the circular dependency
pub trait PreFilter {
fn no_prefilter() -> Arc<NoPreFilter> {
Arc::new(NoPreFilter {})
}

fn should_drop(&self, id: u64) -> bool;
}

/// A prefilter that does not skip any vectors
pub struct NoPreFilter {}

impl PreFilter for NoPreFilter {
fn should_drop(&self, _id: u64) -> bool {
false
}
}

/// A sub index for IVF index
pub trait IvfSubIndex: Send + Sync + Sized {
type QueryParams: Default;

fn name(&self) -> &str;

/// Search the sub index for nearest neighbors.
/// # Arguments:
/// * `query` - The query vector
/// * `k` - The number of nearest neighbors to return
/// * `params` - The query parameters
/// * `prefilter` - The prefilter object indicating which vectors to skip
fn search<T: Num>(
&self,
query: &[T],
k: usize,
params: Self::QueryParams,
storage: &impl VectorStore,
prefilter: Arc<impl PreFilter>,
) -> Result<RecordBatch>;

/// Load the sub index from a struct array with a single row
fn load(data: StructArray) -> Result<Self>;

/// Given a vector storage, containing all the data for the IVF partition, build the sub index.
fn index_vectors(&self, storage: &impl VectorStore) -> Result<()>;

/// Encode the sub index into a struct array
fn to_array(&self) -> Result<StructArray>;
}

/// A Flat index is any index that stores no metadata, and
/// during query, it simply scans over the storage and returns the top k results
pub struct FlatIndex {}

lazy_static::lazy_static! {
static ref ANN_SEARCH_SCHEMA: SchemaRef = Arc::new(Schema::new(vec![
Field::new(DIST_COL, DataType::Float32, true),
ROW_ID_FIELD.clone(),
]));
}

impl IvfSubIndex for FlatIndex {
type QueryParams = ();

fn name(&self) -> &str {
"FLAT"
}

fn search<T: Num>(
&self,
query: &[T],
k: usize,
_params: Self::QueryParams,
storage: &impl VectorStore,
prefilter: Arc<impl PreFilter>,
) -> Result<RecordBatch> {
let dist_calc = storage.dist_calculator_from_native(query);
let (row_ids, dists): (Vec<u64>, Vec<f32>) = (0..storage.len())
.filter(|&id| !prefilter.should_drop(storage.row_ids()[id]))
.map(|id| OrderedNode {
id: id as u32,
dist: OrderedFloat(dist_calc.distance(id as u32)),
})
.sorted_unstable()
.take(k)
.map(
|OrderedNode {
id,
dist: OrderedFloat(dist),
}| (storage.row_ids()[id as usize], dist),
)
.unzip();

let (row_ids, dists) = (UInt64Array::from(row_ids), Float32Array::from(dists));

Ok(RecordBatch::try_new(
ANN_SEARCH_SCHEMA.clone(),
vec![Arc::new(dists), Arc::new(row_ids)],
)?)
}

fn load(_: StructArray) -> Result<Self> {
Ok(Self {})
}

fn index_vectors(&self, _: &impl VectorStore) -> Result<()> {
Ok(())
}

fn to_array(&self) -> Result<StructArray> {
Ok(StructArray::from(vec![]))
}
}
3 changes: 2 additions & 1 deletion rust/lance/src/index/vector/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ use lance_file::reader::FileReader;
use lance_index::vector::{hnsw::HNSW, quantizer::Quantizer};
use lance_index::{
vector::{
graph::{VectorStore, NEIGHBORS_FIELD},
graph::NEIGHBORS_FIELD,
hnsw::{HnswMetadata, VECTOR_ID_FIELD},
ivf::storage::IVF_PARTITION_KEY,
quantizer::{IvfQuantizationStorage, Quantization},
v3::storage::VectorStore,
Query,
},
Index, IndexType,
Expand Down

0 comments on commit 3dd63b8

Please sign in to comment.