
feat: expose index cache size #1587

Merged 11 commits on Nov 17, 2023
8 changes: 7 additions & 1 deletion python/python/lance/__init__.py
@@ -89,7 +89,13 @@ def dataset(
Approximately, ``n = Total Rows / number of IVF partitions``.
``pq = number of PQ sub-vectors``.
"""
-    ds = LanceDataset(uri, version, block_size, commit_lock=commit_lock)
+    ds = LanceDataset(
+        uri,
+        version,
+        block_size,
+        commit_lock=commit_lock,
+        index_cache_size=index_cache_size,
+    )

Contributor: Does the index cache have a TTL (I'm looking at the comment above which, to be fair, looks like it isn't part of this PR)? I don't think it does.
if version is None and asof is not None:
ts_cutoff = sanitize_ts(asof)
ver_cutoff = max(
6 changes: 5 additions & 1 deletion python/python/lance/dataset.py
@@ -839,6 +839,7 @@ def create_index(
ivf_centroids: Optional[Union[np.ndarray, pa.FixedSizeListArray]] = None,
num_sub_vectors: Optional[int] = None,
accelerator: Optional[Union[str, "torch.Device"]] = None,
index_cache_size: Optional[int] = None,
Contributor: I wonder if it would be better to change create_index so that it modifies the dataset in place instead of returning a new dataset. Though that would be a breaking change, so perhaps that ship has sailed.

Contributor: I was intending to do that at some point. I had done this on the Rust side earlier, since it was a source of bugs: #1118

Contributor Author: That does seem like the better design to me.
**kwargs,
) -> LanceDataset:
"""Create index on column.
@@ -870,6 +871,8 @@ def create_index(
If set, use an accelerator to speed up the training process.
Accepted accelerator: "cuda" (Nvidia GPU) and "mps" (Apple Silicon GPU).
If not set, use the CPU.
index_cache_size : int, optional
The size of the index cache in number of entries. Default value is 256.
kwargs :
Parameters passed to the index building process.

@@ -1015,7 +1018,7 @@ def create_index(
kwargs["ivf_centroids"] = ivf_centroids_batch

self._ds.create_index(column, index_type, name, replace, kwargs)
-        return LanceDataset(self.uri)
+        return LanceDataset(self.uri, index_cache_size=index_cache_size)

@staticmethod
def _commit(
@@ -1807,6 +1810,7 @@ def index_stats(self, index_name: str) -> Dict[str, Any]:
index_stats = json.loads(self._ds.index_statistics(index_name))
index_stats["num_indexed_rows"] = self._ds.count_indexed_rows(index_name)
index_stats["num_unindexed_rows"] = self._ds.count_unindexed_rows(index_name)
index_stats["index_cache_entry_count"] = self._ds.index_cache_entry_count()
return index_stats


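The docstring above defines `index_cache_size` as a limit on the number of cache entries (default 256), not on bytes. As a rough illustration of those semantics — this is a stdlib sketch for exposition, not lance's actual cache, which lives on the Rust side — an entry-count-bounded LRU looks like:

```python
from collections import OrderedDict


class EntryCountCache:
    """Toy LRU cache bounded by entry count, mirroring the semantics of
    the index_cache_size parameter (a limit on entries, not bytes)."""

    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._entries = OrderedDict()

    def insert(self, key, value):
        self._entries[key] = value
        self._entries.move_to_end(key)  # mark as most recently used
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict least recently used

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # touching an entry refreshes it
        return self._entries[key]

    def entry_count(self) -> int:
        return len(self._entries)
```

With `max_entries=10`, inserting 128 partition entries leaves exactly 10 in the cache — the same saturation behavior `test_index_cache_size` below asserts against the real cache.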
44 changes: 44 additions & 0 deletions python/python/tests/test_vector_index.py
@@ -297,6 +297,7 @@ def test_pre_populated_ivf_centroids(dataset, tmp_path: Path):
if platform.system() == "Windows":
expected_filepath = expected_filepath.replace("\\", "/")
expected_statistics = {
"index_cache_entry_count": 1,
"index_type": "IVF",
"uuid": index_uuid,
"uri": expected_filepath,
@@ -465,3 +466,46 @@ def test_knn_with_deletions(tmp_path):
    assert len(results) == 10

    assert expected == [r.as_py() for r in results]


def test_index_cache_size(tmp_path):
    rng = np.random.default_rng(seed=42)

    def query_index(ds, ntimes):
        ndim = ds.schema[0].type.list_size
        for _ in range(ntimes):
            ds.to_table(
                nearest={
                    "column": "vector",
                    "q": rng.standard_normal(ndim),
                },
            )

    tbl = create_table(nvec=1024, ndim=16)
    dataset = lance.write_dataset(tbl, tmp_path / "test")

    indexed_dataset = dataset.create_index(
        "vector",
        index_type="IVF_PQ",
        num_partitions=128,
        num_sub_vectors=2,
        index_cache_size=10,
    )

    assert (
        indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 1
    )
    query_index(indexed_dataset, 1)
    assert (
        indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 2
    )
    query_index(indexed_dataset, 128)
    assert (
        indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 10
    )

    indexed_dataset = lance.LanceDataset(indexed_dataset.uri, index_cache_size=5)
    query_index(indexed_dataset, 128)
    assert (
        indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 5
    )
4 changes: 4 additions & 0 deletions python/src/dataset.rs
@@ -771,6 +771,10 @@ impl Dataset {
        }
    }

    fn index_cache_entry_count(&self) -> PyResult<usize> {
        Ok(self.ds.index_cache_entry_count())
    }

    #[staticmethod]
    fn commit(
        dataset_uri: &str,
5 changes: 5 additions & 0 deletions rust/lance/src/dataset.rs
@@ -1210,6 +1210,11 @@ impl Dataset {
Version::from(self.manifest.as_ref())
}

    /// Get the number of entries currently in the index cache.
    pub fn index_cache_entry_count(&self) -> usize {
        self.session.index_cache.get_size()
    }

/// Get all versions.
pub async fn versions(&self) -> Result<Vec<Version>> {
let mut versions: Vec<Version> = self
3 changes: 3 additions & 0 deletions rust/lance/src/dataset/scanner.rs
@@ -2566,6 +2566,7 @@ mod test {
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

assert_eq!(dataset.index_cache_entry_count(), 0);
dataset
.create_index(
&["vec"],
@@ -2584,6 +2585,7 @@
scan.refine(100);
scan.nprobs(100);

assert_eq!(dataset.index_cache_entry_count(), 0);
let results = scan
.try_into_stream()
.await
@@ -2592,6 +2594,7 @@
.await
.unwrap();

assert_eq!(dataset.index_cache_entry_count(), 5);
assert_eq!(results.len(), 1);
let batch = &results[0];

1 change: 1 addition & 0 deletions rust/lance/src/index/append.rs
@@ -246,5 +246,6 @@ mod tests {
.iter()
.sum::<usize>();
assert_eq!(row_in_index, 2000);
assert_eq!(dataset.index_cache_entry_count(), 6)
}
}
6 changes: 6 additions & 0 deletions rust/lance/src/index/cache.rs
@@ -39,6 +39,12 @@ impl IndexCache {
self.vector_cache.entry_count() as usize
}

    pub(crate) fn get_size(&self) -> usize {
        self.scalar_cache.sync();
        self.vector_cache.sync();
        self.scalar_cache.entry_count() as usize + self.vector_cache.entry_count() as usize
    }

Contributor Author: @wjones127 do you feel just summing this is ok?

Contributor: TBH, I'm not sure I care about the entry count. As a user, I would much rather set the limit in terms of bytes and get the total bytes consumed in the statistics. So this is fine for now, but long term I think we ought to consider switching to evicting based on in-memory size.

Contributor Author: IIUC we can use weighted_size and a weigher to do that. (I hope moka is better at cache invalidation than naming.) I can give that a quick try.

Contributor Author: Actually, let's make that a separate issue so it doesn't stall this one.

Contributor:
> I hope moka is better at cache invalidation than naming

🤣

> Actually let's make that a separate issue to not stall this one.

Sounds good!

Contributor Author: Opened #1613

Contributor: I think summing is fine. We might just merge these into one field someday.

However, I think there is one potential problem: the user might set the index cache size to X and then, if they have both scalar and vector indices, see an entry count of 2 * X. Still, the best long-term solution is probably bytes, so let's stick with summing for now.
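The byte-based eviction discussed in the thread above (deferred to #1613) can be illustrated in the same toy style: a weigher maps each value to a weight, and eviction is driven by total weight rather than entry count. This is a hedged stdlib sketch of the idea only — it is neither moka's API nor lance's implementation:

```python
from collections import OrderedDict


class WeightedCache:
    """Toy LRU cache that evicts by total weight (e.g. bytes) rather than
    by entry count -- the direction the review thread suggests for the
    index cache long term."""

    def __init__(self, max_weight: int, weigher):
        self.max_weight = max_weight
        self.weigher = weigher  # maps a value to its weight
        self._entries = OrderedDict()
        self._total_weight = 0

    def insert(self, key, value):
        if key in self._entries:
            # replacing an entry releases its old weight first
            self._total_weight -= self.weigher(self._entries.pop(key))
        self._entries[key] = value
        self._total_weight += self.weigher(value)
        # evict least recently used entries until under the weight budget,
        # always keeping at least the entry just inserted
        while self._total_weight > self.max_weight and len(self._entries) > 1:
            _, evicted = self._entries.popitem(last=False)
            self._total_weight -= self.weigher(evicted)

    def weighted_size(self) -> int:
        return self._total_weight
```

With `weigher=len` and a 100-byte budget, ten 30-byte inserts settle at three resident entries (90 bytes) — the "total bytes consumed" statistic the reviewer asks for falls out of `weighted_size()` for free.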

/// Get an Index if present. Otherwise returns [None].
pub(crate) fn get_scalar(&self, key: &str) -> Option<Arc<dyn ScalarIndex>> {
self.scalar_cache.get(key)
2 changes: 2 additions & 0 deletions rust/lance/src/session.rs
@@ -100,13 +100,15 @@ mod tests {
MetricType::L2,
));
let idx = Arc::new(PQIndex::new(pq, MetricType::L2));
assert_eq!(session.index_cache.get_size(), 0);
session.index_cache.insert_vector("abc", idx.clone());

let found = session.index_cache.get_vector("abc");
assert!(found.is_some());
assert_eq!(format!("{:?}", found.unwrap()), format!("{:?}", idx));
assert!(session.index_cache.get_vector("abc").is_some());
assert_eq!(session.index_cache.len_vector(), 1);
assert_eq!(session.index_cache.get_size(), 1);

for iter_idx in 0..100 {
let pq_other = Arc::new(ProductQuantizerImpl::<Float32Type>::new(