Skip to content
3 changes: 3 additions & 0 deletions protos/index.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ message DiskAnn {

// L parameter
uint32 L = 5;

/// Entry points to the graph
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are the vertex id's?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the IDs in the graph. Basically the "index" in the graph file. Not the "row id" in original dataset

repeated uint64 entries = 6;
}

// One stage in the vector index pipeline.
Expand Down
21 changes: 12 additions & 9 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,11 @@ def create_index(
- **use_opq**: whether to use OPQ (Optimized Product Quantization).
- **max_opq_iterations**: the maximum number of iterations for training OPQ.

For SIMD, the vector dimensions / num_sub_vectors must be a multiple of the stride
depending on the platform (4, 8, 16). An error is raised if this alignment
is not met.
If `index_type` is "DISKANN", then the following parameters are optional:

- **r**: out-degree bound
- **l**: size of the search list used while building the graph (DiskANN's L parameter).
- **alpha**: distance threshold for the graph.

Examples
--------
Expand Down Expand Up @@ -402,14 +404,15 @@ def create_index(
]:
raise ValueError(f"Metric {metric} not supported.")
index_type = index_type.upper()
if index_type != "IVF_PQ":
if index_type not in ["IVF_PQ", "DISKANN"]:
raise NotImplementedError(
f"Only IVF_PQ index_type supported. Got {index_type}"
)
if "num_partitions" not in kwargs or "num_sub_vectors" not in kwargs:
raise ValueError(
"num_partitions and num_sub_vectors are required for IVF_PQ"
f"Only IVF_PQ or DiskANN index_types supported. Got {index_type}"
)
if index_type == "IVF_PQ":
if "num_partitions" not in kwargs or "num_sub_vectors" not in kwargs:
raise ValueError(
"num_partitions and num_sub_vectors are required for IVF_PQ"
)

self._ds.create_index(column, index_type, name, metric, kwargs)
return LanceDataset(self.uri)
Expand Down
11 changes: 0 additions & 11 deletions python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import platform
Expand Down
82 changes: 59 additions & 23 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use arrow::pyarrow::*;
use arrow_array::{Float32Array, RecordBatchReader};
use arrow_data::ArrayData;
use arrow_schema::Schema as ArrowSchema;
use lance::index::vector::ivf::IvfBuildParams;
use lance::index::vector::pq::PQBuildParams;
use pyo3::exceptions::{PyIOError, PyKeyError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyBool, PyDict, PyInt, PyLong};
use pyo3::types::{IntoPyDict, PyBool, PyDict, PyFloat, PyInt, PyLong};
use pyo3::{pyclass, PyObject, PyResult};
use tokio::runtime::Runtime;

Expand All @@ -32,8 +34,9 @@ use lance::dataset::{
scanner::Scanner as LanceScanner, Dataset as LanceDataset, Version, WriteMode, WriteParams,
};
use lance::index::{
vector::diskann::DiskANNParams,
vector::{MetricType, VectorIndexParams},
IndexType,
DatasetIndexExt, IndexType,
};

const DEFAULT_NPROBS: usize = 1;
Expand Down Expand Up @@ -275,39 +278,72 @@ impl Dataset {
kwargs: Option<&PyDict>,
) -> PyResult<()> {
let idx_type = match index_type.to_uppercase().as_str() {
"IVF_PQ" => IndexType::Vector,
"IVF_PQ" | "DISKANN" => IndexType::Vector,
_ => {
return Err(PyValueError::new_err(format!(
"Index type '{index_type}' is not supported."
)))
}
};

// Only VectorParams are supported.
let mut params = VectorIndexParams::default();
if let Some(kwargs) = kwargs {
if let Some(n) = kwargs.get_item("num_partitions") {
params.num_partitions = PyAny::downcast::<PyInt>(n)?.extract()?
};

if let Some(n) = kwargs.get_item("num_sub_vectors") {
params.num_sub_vectors = PyAny::downcast::<PyInt>(n)?.extract()?
};

if let Some(o) = kwargs.get_item("use_opq") {
params.use_opq = PyAny::downcast::<PyBool>(o)?.extract()?
};
if let Some(o) = kwargs.get_item("max_opq_iterations") {
params.max_opq_iterations = PyAny::downcast::<PyInt>(o)?.extract()?
};
}

params.metric_type = match metric_type {
let m_type = match metric_type {
Some(mt) => MetricType::try_from(mt.to_string().to_lowercase().as_str())
.map_err(|err| PyValueError::new_err(err.to_string()))?,
None => MetricType::L2,
};

// Only VectorParams are supported.
let params = match index_type.to_uppercase().as_str() {
"IVF_PQ" => {
let mut ivf_params = IvfBuildParams::default();
let mut pq_params = PQBuildParams::default();
if let Some(kwargs) = kwargs {
if let Some(n) = kwargs.get_item("num_partitions") {
ivf_params.num_partitions = PyAny::downcast::<PyInt>(n)?.extract()?
};

if let Some(n) = kwargs.get_item("num_bits") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i forgot do we actually support configuring this now?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We dont support "throw error for unsupported params". This is just do the handing silently. I am fine to remove it if you feel strongly.

pq_params.num_bits = PyAny::downcast::<PyInt>(n)?.extract()?
};

if let Some(n) = kwargs.get_item("num_sub_vectors") {
pq_params.num_sub_vectors = PyAny::downcast::<PyInt>(n)?.extract()?
};

if let Some(o) = kwargs.get_item("use_opq") {
pq_params.use_opq = PyAny::downcast::<PyBool>(o)?.extract()?
};

if let Some(o) = kwargs.get_item("max_opq_iterations") {
pq_params.max_opq_iters = PyAny::downcast::<PyInt>(o)?.extract()?
};
}
VectorIndexParams::with_ivf_pq_params(m_type, ivf_params, pq_params)
}
"DISKANN" => {
let mut params = DiskANNParams::default();
if let Some(kwargs) = kwargs {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what're the default values for these? prolly worth tuning it and set reasonable defaults for like high embedding dimensions and like up to 1M vectors ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default values were from the diskann paper. Will adjust them if necessary when we benchmark diskann

if let Some(n) = kwargs.get_item("r") {
params.r = PyAny::downcast::<PyInt>(n)?.extract()?
};

if let Some(n) = kwargs.get_item("alpha") {
params.alpha = PyAny::downcast::<PyFloat>(n)?.extract()?
};

if let Some(n) = kwargs.get_item("l") {
params.l = PyAny::downcast::<PyInt>(n)?.extract()?
};
}
VectorIndexParams::with_diskann_params(m_type, params)
}
_ => {
return Err(PyValueError::new_err(format!(
"Index type '{index_type}' is not supported."
)))
}
};

self_
.rt
.block_on(async {
Expand Down
13 changes: 8 additions & 5 deletions rust/src/bin/lq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use futures::stream::StreamExt;
use futures::TryStreamExt;

use lance::dataset::Dataset;
use lance::index::vector::{MetricType, VectorIndexParams};
use lance::index::{
vector::{MetricType, VectorIndexParams},
DatasetIndexExt,
};
use lance::{Error, Result};

#[derive(Parser)]
Expand Down Expand Up @@ -69,11 +72,11 @@ enum Commands {

/// Number of IVF partitions. Only useful when the index type is 'ivf-pq'.
#[arg(short = 'p', long, default_value_t = 64, value_name = "NUM")]
num_partitions: u32,
num_partitions: usize,

/// Number of sub-vectors in Product Quantizer
#[arg(short = 's', long, default_value_t = 8, value_name = "NUM")]
num_sub_vectors: u32,
num_sub_vectors: usize,

/// Distance metric type. Only support 'l2' and 'cosine'.
#[arg(short = 'm', long, value_name = "DISTANCE")]
Expand Down Expand Up @@ -160,8 +163,8 @@ async fn create_index(
name: &Option<String>,
column: &Option<String>,
index_type: &Option<IndexType>,
num_partitions: &u32,
num_sub_vectors: &u32,
num_partitions: &usize,
num_sub_vectors: &usize,
metric_type: &Option<String>,
use_opq: bool,
) -> Result<()> {
Expand Down
110 changes: 8 additions & 102 deletions rust/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::time::SystemTime;
use arrow_array::{
cast::as_struct_array, RecordBatch, RecordBatchReader, StructArray, UInt64Array,
};
use arrow_schema::{DataType, Schema as ArrowSchema};
use arrow_schema::Schema as ArrowSchema;
use arrow_select::{concat::concat_batches, take::take};
use chrono::prelude::*;
use futures::stream::{self, StreamExt, TryStreamExt};
Expand All @@ -38,9 +38,6 @@ use self::scanner::Scanner;
use crate::arrow::*;
use crate::datatypes::Schema;
use crate::format::{pb, Fragment, Index, Manifest};
use crate::index::vector::ivf::{build_ivf_pq_index, IvfBuildParams};
use crate::index::vector::pq::PQBuildParams;
use crate::index::{vector::VectorIndexParams, IndexParams, IndexType};
use crate::io::{
object_reader::{read_message, read_struct},
read_manifest, read_metadata_offset, write_manifest, FileWriter, ObjectStore,
Expand Down Expand Up @@ -105,7 +102,7 @@ fn manifest_path(base: &Path, version: u64) -> Path {
}

/// Get the latest manifest path
fn latest_manifest_path(base: &Path) -> Path {
pub(crate) fn latest_manifest_path(base: &Path) -> Path {
base.child(LATEST_MANIFEST_NAME)
}

Expand Down Expand Up @@ -357,98 +354,6 @@ impl Dataset {
Ok(counts.iter().sum())
}

/// Create indices on columns.
///
/// Upon finish, a new dataset version is generated.
///
/// Parameters:
///
/// - `columns`: the columns to build the indices on.
/// - `index_type`: specify [`IndexType`].
/// - `name`: optional index name. Must be unique in the dataset.
/// if not provided, it will auto-generate one.
/// - `params`: index parameters.
pub async fn create_index(
&self,
columns: &[&str],
index_type: IndexType,
name: Option<String>,
params: &dyn IndexParams,
) -> Result<Self> {
if columns.len() != 1 {
return Err(Error::Index(
"Only support building index on 1 column at the moment".to_string(),
));
}
let column = columns[0];
let Some(field) = self.schema().field(column) else {
return Err(Error::Index(format!(
"CreateIndex: column '{column}' does not exist"
)));
};

// Load indices from the disk.
let mut indices = self.load_indices().await?;

let index_name = name.unwrap_or(format!("{column}_idx"));
if indices.iter().any(|i| i.name == index_name) {
return Err(Error::Index(format!(
"Index name '{index_name} already exists'"
)));
}

let index_id = Uuid::new_v4();
match index_type {
IndexType::Vector => {
let vec_params = params
.as_any()
.downcast_ref::<VectorIndexParams>()
.ok_or_else(|| {
Error::Index("Vector index type must take a VectorIndexParams".to_string())
})?;

let ivf_params = IvfBuildParams {
num_partitions: vec_params.num_partitions as usize,
metric_type: vec_params.metric_type,
max_iters: vec_params.max_iterations,
};
let pq_params = PQBuildParams {
num_sub_vectors: vec_params.num_sub_vectors as usize,
num_bits: 8,
metric_type: vec_params.metric_type,
max_iters: vec_params.max_iterations,
use_opq: vec_params.use_opq,
max_opq_iters: vec_params.max_opq_iterations,
};
build_ivf_pq_index(
self,
column,
&index_name,
&index_id,
&ivf_params,
&pq_params,
)
.await?
}
}

let latest_manifest = self.latest_manifest().await?;
let mut new_manifest = self.manifest.as_ref().clone();
new_manifest.version = latest_manifest.version + 1;

// Write index metadata down
let new_idx = Index::new(index_id, &index_name, &[field.id], new_manifest.version);
indices.push(new_idx);

write_manifest_file(&self.object_store, &mut new_manifest, Some(indices)).await?;

Ok(Self {
object_store: self.object_store.clone(),
base: self.base.clone(),
manifest: Arc::new(new_manifest),
})
}

pub async fn take(&self, row_indices: &[usize], projection: &Schema) -> Result<RecordBatch> {
let mut sorted_indices: Vec<u32> =
Vec::from_iter(row_indices.iter().map(|indice| *indice as u32));
Expand Down Expand Up @@ -572,7 +477,7 @@ impl Dataset {
latest_manifest_path(&self.base)
}

async fn latest_manifest(&self) -> Result<Manifest> {
pub(crate) async fn latest_manifest(&self) -> Result<Manifest> {
read_manifest(&self.object_store, &self.latest_manifest_path()).await
}

Expand Down Expand Up @@ -660,7 +565,7 @@ impl Dataset {

/// Finish writing the manifest file, and commit the changes by linking the latest manifest file
/// to this version.
async fn write_manifest_file(
pub(crate) async fn write_manifest_file(
object_store: &ObjectStore,
manifest: &mut Manifest,
indices: Option<Vec<Index>>,
Expand Down Expand Up @@ -692,6 +597,9 @@ async fn write_manifest_file_to_path(
#[cfg(test)]
mod tests {
use super::*;
use crate::index::vector::MetricType;
use crate::index::IndexType;
use crate::index::{vector::VectorIndexParams, DatasetIndexExt};
use crate::{datatypes::Schema, utils::testing::generate_random_array};

use crate::dataset::WriteMode::Overwrite;
Expand Down Expand Up @@ -1129,9 +1037,7 @@ mod tests {
let dataset = Dataset::write(&mut reader, test_uri, None).await.unwrap();

// Make sure valid arguments should create index successfully
let mut params = VectorIndexParams::default();
params.num_partitions = 10;
params.num_sub_vectors = 2;
let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 50);
let dataset = dataset
.create_index(&["embeddings"], IndexType::Vector, None, &params)
.await
Expand Down
5 changes: 4 additions & 1 deletion rust/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,10 @@ mod test {

use super::*;
use crate::arrow::*;
use crate::index::{vector::VectorIndexParams, IndexType};
use crate::index::{
DatasetIndexExt,
{vector::VectorIndexParams, IndexType},
};
use crate::{arrow::RecordBatchBuffer, dataset::WriteParams};

#[tokio::test]
Expand Down
Loading