Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3082,6 +3082,9 @@ def create_scalar_index(
A UUID to use for the segment written by this call.
If not provided, a new UUID will be generated. This parameter is
passed via kwargs internally.
A caller-supplied UUID is honored only by the legacy distributed
scalar build; index types on the segmented-index architecture (such
as vector indexes) generate segment UUIDs internally and reject it.
progress_callback : callable, optional
A callback that receives :class:`lance.progress.IndexProgress` events while
the index is being built.
Expand Down Expand Up @@ -3764,8 +3767,9 @@ def create_index(
``create_index_segment_builder().with_index_type(...).with_segments(...)``
and then committed with ``commit_existing_index_segments(...)``.
index_uuid : str, optional
A UUID to use for the segment written by this call.
If not provided, a new UUID will be generated.
Not accepted for vector indexes: under the segmented-index
architecture Lance generates the segment UUID and returns it in the
resulting index metadata. Passing a value raises an error.
progress_callback : callable, optional
A callback that receives :class:`lance.progress.IndexProgress` events while
the index is being built.
Expand Down
24 changes: 22 additions & 2 deletions python/python/tests/test_indices.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import math
import os
import pathlib
import uuid

import lance
import numpy as np
Expand Down Expand Up @@ -273,15 +274,34 @@ def test_ivf_centroids_multivector_fragment_ids(tmpdir):
metric="cosine",
num_partitions=2,
fragment_ids=fragment_ids,
index_uuid="00000000-0000-4000-8000-000000000001",
ivf_centroids=centroids,
)

assert index.uuid == "00000000-0000-4000-8000-000000000001"
assert uuid.UUID(index.uuid)
assert index.fragment_ids == set(fragment_ids)
assert index.name == "embeddings_idx"


def test_create_index_uncommitted_rejects_user_index_uuid(tmpdir):
    """Passing ``index_uuid`` to ``create_index_uncommitted`` must raise ``ValueError``."""
    dataset, dim = make_multivector_dataset(tmpdir)

    # Eight float32 values regrouped into fixed-size lists of width `dim`.
    centroid_values = pa.array(
        [0.1, 0.2, 0.3, 0.4, 0.8, 0.7, 0.6, 0.5], type=pa.float32()
    )
    ivf_centroids = pa.FixedSizeListArray.from_arrays(centroid_values, dim)

    # Target every fragment currently in the dataset.
    all_fragment_ids = [frag.fragment_id for frag in dataset.get_fragments()]

    expected_message = "index_uuid is no longer accepted"
    with pytest.raises(ValueError, match=expected_message):
        dataset.create_index_uncommitted(
            "embeddings",
            index_type="IVF_HNSW_SQ",
            metric="cosine",
            num_partitions=2,
            fragment_ids=all_fragment_ids,
            index_uuid="00000000-0000-4000-8000-000000000001",
            ivf_centroids=ivf_centroids,
        )


def test_indices_builder_multivector_distributed_dimensions(tmpdir, monkeypatch):
ds, dimension = make_multivector_dataset(tmpdir)
builder = IndicesBuilder(ds, "embeddings")
Expand Down
189 changes: 55 additions & 134 deletions rust/lance/benches/distributed_vector_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,28 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::fs;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow_array::{ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchIterator};
use arrow_array::{cast::AsArray, types::Float32Type};
use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema};
use criterion::{BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
#[cfg(target_os = "linux")]
use pprof::criterion::{Output, PProfProfiler};
use serde::Serialize;
use uuid::Uuid;

use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::index::{DatasetIndexExt, vector::VectorIndexParams};
use lance_arrow::FixedSizeListArrayExt;
use lance_index::progress::noop_progress;
use lance_index::vector::kmeans::{KMeansParams, train_kmeans};
use lance_index::{
IndexType,
vector::{ivf::IvfBuildParams, pq::PQBuildParams},
};
use lance_linalg::distance::DistanceType;
use lance_table::format::IndexMetadata;
use lance_testing::datagen::generate_random_array;
use tokio::runtime::Runtime;

Expand All @@ -34,6 +34,7 @@ const NUM_SUB_VECTORS: usize = 16;
const NUM_BITS: usize = 8;
const MAX_ITERS: usize = 20;
const SAMPLE_RATE: usize = 8;
const AUXILIARY_FILE_NAME: &str = "auxiliary.idx";

#[derive(Clone, Copy, Debug)]
struct BenchCase {
Expand All @@ -52,19 +53,19 @@ impl BenchCase {

#[derive(Clone, Debug)]
struct MergeFixture {
index_dir: PathBuf,
partial_aux_bytes: u64,
partial_dir_count: usize,
segments: Vec<IndexMetadata>,
segment_aux_bytes: u64,
segment_count: usize,
}

#[derive(Debug, Serialize)]
struct CaseMetadata {
label: String,
num_shards: usize,
num_partitions: usize,
partial_dir_count: usize,
partial_aux_bytes: u64,
partial_aux_bytes_per_shard: u64,
segment_count: usize,
segment_aux_bytes: u64,
segment_aux_bytes_per_shard: u64,
total_rows: usize,
rows_per_shard: usize,
}
Expand Down Expand Up @@ -124,22 +125,6 @@ fn bench_cases() -> [BenchCase; 6] {
]
}

/// Derives a deterministic UUID for the fixture index of `bench_case`.
///
/// The 128-bit value is a fixed `0x733a` tag in the top 16 bits, OR-ed with
/// the shard count shifted left by 64 bits and the partition count in the low
/// bits, so each (shards, partitions) pair maps to a stable identifier.
fn fixture_uuid(bench_case: BenchCase) -> Uuid {
    const FIXTURE_TAG: u128 = 0x733a_0000_0000_0000_0000_0000_0000_0000;

    let shard_bits = (bench_case.num_shards as u128) << 64;
    let partition_bits = bench_case.num_partitions as u128;
    Uuid::from_u128(FIXTURE_TAG | shard_bits | partition_bits)
}

/// Derives a deterministic UUID for the per-iteration working index of `bench_case`.
///
/// Uses a fixed `0x733b` tag in the top 16 bits, OR-ed with the shard count
/// shifted left by 64 bits and the partition count in the low bits.
fn working_uuid(bench_case: BenchCase) -> Uuid {
    const WORKING_TAG: u128 = 0x733b_0000_0000_0000_0000_0000_0000_0000;

    let shard_bits = (bench_case.num_shards as u128) << 64;
    let partition_bits = bench_case.num_partitions as u128;
    Uuid::from_u128(WORKING_TAG | shard_bits | partition_bits)
}

fn create_batches() -> (Arc<ArrowSchema>, Vec<RecordBatch>) {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"vector",
Expand Down Expand Up @@ -262,101 +247,41 @@ fn contiguous_fragment_groups(dataset: &Dataset, num_shards: usize) -> Vec<Vec<u
.collect()
}

async fn build_partial_fixture(dataset: &mut Dataset, bench_case: BenchCase) -> MergeFixture {
let fixture_uuid = fixture_uuid(bench_case);
let index_dir = dataset_root()
/// Auxiliary-file size of an uncommitted segment on disk.
fn segment_auxiliary_bytes(segment: &IndexMetadata) -> u64 {
let aux_path = dataset_root()
.join("_indices")
.join(fixture_uuid.to_string());

if has_partial_dirs(&index_dir) {
return MergeFixture {
partial_aux_bytes: sum_partial_auxiliary_bytes(&index_dir),
partial_dir_count: count_partial_dirs(&index_dir),
index_dir,
};
}

if index_dir.exists() {
fs::remove_dir_all(&index_dir).unwrap();
}
.join(segment.uuid.to_string())
.join(AUXILIARY_FILE_NAME);
fs::metadata(aux_path).map(|m| m.len()).unwrap_or(0)
}

async fn build_partial_fixture(dataset: &mut Dataset, bench_case: BenchCase) -> MergeFixture {
let fragment_groups = contiguous_fragment_groups(dataset, bench_case.num_shards);
let (ivf_params, pq_params) =
Box::pin(train_shared_ivf_pq(dataset, bench_case.num_partitions)).await;
let params = VectorIndexParams::with_ivf_pq_params(DistanceType::L2, ivf_params, pq_params);

// Build one uncommitted segment per shard. Segment UUIDs are generated by
// Lance; collect the returned metadata to feed the merge.
let mut segments = Vec::with_capacity(fragment_groups.len());
for fragments in fragment_groups {
let mut builder = dataset.create_index_builder(&["vector"], IndexType::Vector, &params);
builder = builder
let mut builder = dataset
.create_index_builder(&["vector"], IndexType::Vector, &params)
.name("distributed_merge_only".to_string())
.fragments(fragments)
.index_uuid(fixture_uuid.to_string());
Box::pin(builder.execute_uncommitted()).await.unwrap();
.fragments(fragments);
let segment = Box::pin(builder.execute_uncommitted()).await.unwrap();
segments.push(segment);
}

MergeFixture {
partial_aux_bytes: sum_partial_auxiliary_bytes(&index_dir),
partial_dir_count: count_partial_dirs(&index_dir),
index_dir,
}
}
let segment_count = segments.len();
let segment_aux_bytes = segments.iter().map(segment_auxiliary_bytes).sum();

/// Reports whether `index_dir` holds at least one sub-directory whose name
/// starts with `partial_`.
///
/// An unreadable or missing `index_dir` yields `false`; entries that cannot be
/// read, or whose file type cannot be determined, are ignored.
fn has_partial_dirs(index_dir: &Path) -> bool {
    let Ok(entries) = fs::read_dir(index_dir) else {
        return false;
    };

    for entry in entries.flatten() {
        let is_dir = matches!(entry.file_type(), Ok(kind) if kind.is_dir());
        if is_dir && entry.file_name().to_string_lossy().starts_with("partial_") {
            return true;
        }
    }
    false
}

/// Counts the sub-directories of `index_dir` whose names start with `partial_`.
///
/// # Panics
///
/// Panics if `index_dir` cannot be listed. Individual entries that fail to
/// read, or whose file type cannot be determined, are ignored.
fn count_partial_dirs(index_dir: &Path) -> usize {
    let mut partial_dirs = 0;
    for entry in fs::read_dir(index_dir).unwrap().flatten() {
        let is_dir = entry.file_type().is_ok_and(|kind| kind.is_dir());
        if is_dir && entry.file_name().to_string_lossy().starts_with("partial_") {
            partial_dirs += 1;
        }
    }
    partial_dirs
}

/// Sums the on-disk size of `auxiliary.idx` across every `partial_*`
/// sub-directory of `index_dir`.
///
/// Partial directories without a readable `auxiliary.idx` contribute nothing.
///
/// # Panics
///
/// Panics if `index_dir` cannot be listed.
fn sum_partial_auxiliary_bytes(index_dir: &Path) -> u64 {
    let mut total_bytes: u64 = 0;
    for entry in fs::read_dir(index_dir).unwrap().flatten() {
        let is_partial_dir = entry.file_type().is_ok_and(|kind| kind.is_dir())
            && entry.file_name().to_string_lossy().starts_with("partial_");
        if !is_partial_dir {
            continue;
        }

        let aux_path = entry.path().join("auxiliary.idx");
        if let Ok(metadata) = fs::metadata(aux_path) {
            total_bytes += metadata.len();
        }
    }
    total_bytes
}

/// Recursively copies the directory tree rooted at `source` into `target`,
/// creating `target` (and any missing parents) first.
///
/// Entries that fail to read while listing a directory are skipped.
///
/// # Panics
///
/// Panics if a directory cannot be created or listed, if an entry's file type
/// cannot be determined, or if copying a file fails.
fn copy_dir_recursive(source: &Path, target: &Path) {
    fs::create_dir_all(target).unwrap();

    fs::read_dir(source).unwrap().flatten().for_each(|entry| {
        let from = entry.path();
        let to = target.join(entry.file_name());
        if entry.file_type().unwrap().is_dir() {
            // Descend into sub-directories, mirroring them under `to`.
            copy_dir_recursive(&from, &to);
        } else {
            fs::copy(&from, &to).unwrap();
        }
    });
}

fn prepare_iteration_target(source: &Path, target: &Path) {
if target.exists() {
fs::remove_dir_all(target).unwrap();
MergeFixture {
segments,
segment_aux_bytes,
segment_count,
}
copy_dir_recursive(source, target);
}

fn write_case_metadata(fixtures: &[(BenchCase, MergeFixture)]) {
Expand All @@ -368,10 +293,9 @@ fn write_case_metadata(fixtures: &[(BenchCase, MergeFixture)]) {
label: bench_case.label(),
num_shards: bench_case.num_shards,
num_partitions: bench_case.num_partitions,
partial_dir_count: fixture.partial_dir_count,
partial_aux_bytes: fixture.partial_aux_bytes,
partial_aux_bytes_per_shard: fixture.partial_aux_bytes
/ fixture.partial_dir_count as u64,
segment_count: fixture.segment_count,
segment_aux_bytes: fixture.segment_aux_bytes,
segment_aux_bytes_per_shard: fixture.segment_aux_bytes / fixture.segment_count as u64,
total_rows: NUM_FRAGMENTS * ROWS_PER_FRAGMENT,
rows_per_shard: (NUM_FRAGMENTS * ROWS_PER_FRAGMENT) / bench_case.num_shards,
})
Expand All @@ -398,33 +322,30 @@ fn bench_distributed_merge_only(c: &mut Criterion) {
group.sample_size(10);

for (bench_case, fixture) in fixtures {
let target_uuid = working_uuid(bench_case);
let target_index_dir_fs = dataset_root()
.join("_indices")
.join(target_uuid.to_string());
let source_index_dir_fs = fixture.index_dir.clone();

group.throughput(Throughput::Bytes(fixture.partial_aux_bytes));
group.throughput(Throughput::Bytes(fixture.segment_aux_bytes));
group.bench_with_input(
BenchmarkId::new("finalize_only", bench_case.label()),
&bench_case,
|b, _| {
let dataset = dataset.clone();
let target_index_dir_fs = target_index_dir_fs.clone();
let source_index_dir_fs = source_index_dir_fs.clone();
b.iter_batched(
|| prepare_iteration_target(&source_index_dir_fs, &target_index_dir_fs),
|_| {
rt.block_on(dataset.merge_index_metadata(
&target_uuid.to_string(),
IndexType::IvfPq,
None,
noop_progress(),
))
.unwrap();
},
BatchSize::PerIteration,
);
let segments = fixture.segments.clone();
b.iter_custom(|iters| {
let mut total = Duration::ZERO;
for _ in 0..iters {
let segments = segments.clone();
let start = Instant::now();
let merged = rt
.block_on(dataset.merge_existing_index_segments(segments))
.unwrap();
total += start.elapsed();

let merged_dir = dataset_root()
.join("_indices")
.join(merged.uuid.to_string());
let _ = fs::remove_dir_all(merged_dir);
}
total
});
},
);
}
Expand Down
3 changes: 1 addition & 2 deletions rust/lance/src/dataset/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,14 @@ mod tests {
let target_fragments = fragments.iter().take(2).collect::<Vec<_>>();

let params = VectorIndexParams::ivf_flat(2, MetricType::L2);
let first_segment_uuid = Uuid::new_v4();
let second_segment_uuid = Uuid::new_v4();
let built_index = dataset
.create_index_builder(&["vector"], IndexType::Vector, &params)
.name("vector_idx".to_string())
.index_uuid(first_segment_uuid.to_string())
.execute_uncommitted()
.await
.unwrap();
let first_segment_uuid = built_index.uuid;
let first_segment_dir = dataset.indices_dir().join(first_segment_uuid.to_string());
let second_segment_dir = dataset.indices_dir().join(second_segment_uuid.to_string());
for file_name in ["index.idx", "auxiliary.idx"] {
Expand Down
Loading
Loading