feat: make the index cache size (in bytes) available #2381

Merged (3 commits) on May 24, 2024

Changes from 1 commit
1 change: 1 addition & 0 deletions Cargo.toml
@@ -99,6 +99,7 @@ datafusion-execution = "37.1"
datafusion-physical-expr = { version = "37.1", features = [
"regex_expressions",
] }
deepsize = "0.2.0"
either = "1.0"
futures = "0.3"
http = "0.2.9"
6 changes: 6 additions & 0 deletions python/python/lance/dataset.py
@@ -1517,6 +1517,12 @@ def create_index(
self._ds.create_index(column, index_type, name, replace, kwargs)
return LanceDataset(self.uri, index_cache_size=index_cache_size)

def cache_size_bytes(self) -> int:
"""
Return the total size of the index + file metadata caches in bytes.
"""
return self._ds.cache_size_bytes()

Contributor:

I wonder if we should have some sort of session handle instead. It might be too early, but it would be nice to have something users could pass around and re-use across datasets.

Contributor (Author):

I've added a new session object. It can only report the size right now, so you can't use it to pass between datasets, but it will help avoid breaking changes in the future when we do add that capability.
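
For illustration, here is a rough sketch of how that might look from Python. Only `cache_size_bytes()` comes from this PR; the `session()` accessor, `size_bytes()`, and the `session=` keyword are hypothetical names used to sketch the direction, not an API this change adds:

```python
import lance
import pyarrow as pa

# Write a small dataset so the example is self-contained.
lance.write_dataset(pa.table({"a": range(1000)}), "example.lance", mode="overwrite")

ds = lance.dataset("example.lance")

# Added in this PR: combined size of the index + file metadata caches, in bytes.
print(ds.cache_size_bytes())

# Possible future direction (hypothetical, not part of this change):
# session = ds.session()                                  # handle to the shared caches
# print(session.size_bytes())                             # same report, via the session
# other = lance.dataset("other.lance", session=session)   # reuse caches across datasets
```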

@staticmethod
def _commit(
base_uri: Union[str, Path],
16 changes: 16 additions & 0 deletions python/python/tests/test_dataset.py
@@ -1554,6 +1554,22 @@ def test_metadata(tmp_path: Path):
lance.write_dataset(data, tmp_path)


def test_cache_size_bytes(tmp_path: Path):
"""Test expose physical row ids in the scanner."""
data = pa.table({"a": range(1000)})
lance.write_dataset(data, tmp_path, max_rows_per_file=250)

ds = lance.dataset(tmp_path)

initial_size = ds.cache_size_bytes()

ds.scanner().to_table()

after_scan_size = ds.cache_size_bytes()

assert after_scan_size > initial_size


def test_scan_with_row_ids(tmp_path: Path):
"""Test expose physical row ids in the scanner."""
data = pa.table({"a": range(1000)})
4 changes: 4 additions & 0 deletions python/src/dataset.rs
@@ -972,6 +972,10 @@ impl Dataset {
Ok(self.ds.index_cache_hit_rate())
}

fn cache_size_bytes(&self) -> u64 {
self.ds.cache_size_bytes()
}

#[staticmethod]
fn commit(
dataset_uri: &str,
1 change: 1 addition & 0 deletions rust/lance-core/Cargo.toml
@@ -22,6 +22,7 @@ bytes.workspace = true
chrono.workspace = true
datafusion-common = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
deepsize.workspace = true
futures.workspace = true
lazy_static.workspace = true
mock_instant.workspace = true
46 changes: 41 additions & 5 deletions rust/lance-core/src/cache.rs
@@ -6,6 +6,7 @@
use std::any::{Any, TypeId};
use std::sync::Arc;

use deepsize::{Context, DeepSizeOf};
use futures::Future;
use moka::sync::Cache;
use object_store::path::Path;
@@ -17,12 +18,46 @@ pub const DEFAULT_METADATA_CACHE_SIZE: usize = 128;

type ArcAny = Arc<dyn Any + Send + Sync>;

#[derive(Clone)]
struct SizedRecord {
record: ArcAny,
size_accessor: Arc<dyn Fn(ArcAny) -> usize + Send + Sync>,
}

impl std::fmt::Debug for SizedRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SizedRecord")
.field("record", &self.record)
.finish()
}
}

impl SizedRecord {
fn new<T: DeepSizeOf + Send + Sync + 'static>(record: Arc<T>) -> Self {
let size_accessor =
|record: ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
Self {
record,
size_accessor: Arc::new(size_accessor),
}
}
}

/// Cache for various metadata about files.
///
/// The cache is keyed by the file path and the type of metadata.
#[derive(Clone, Debug)]
pub struct FileMetadataCache {
cache: Arc<Cache<(Path, TypeId), ArcAny>>,
cache: Arc<Cache<(Path, TypeId), SizedRecord>>,
}

impl DeepSizeOf for FileMetadataCache {
fn deep_size_of_children(&self, _: &mut Context) -> usize {
self.cache
.iter()
.map(|(_, v)| (v.size_accessor)(v.record))
.sum()
}
}

impl FileMetadataCache {
@@ -35,19 +70,20 @@ impl FileMetadataCache {
pub fn get<T: Send + Sync + 'static>(&self, path: &Path) -> Option<Arc<T>> {
self.cache
.get(&(path.to_owned(), TypeId::of::<T>()))
.map(|metadata| metadata.clone().downcast::<T>().unwrap())
.map(|metadata| metadata.record.clone().downcast::<T>().unwrap())
}

pub fn insert<T: Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
self.cache.insert((path, TypeId::of::<T>()), metadata);
pub fn insert<T: DeepSizeOf + Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
self.cache
.insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
}

/// Get an item
///
/// If it exists in the cache, return that.
///
/// If it doesn't, run `loader` to load the item, insert it into the cache, and return it.
pub async fn get_or_insert<T: Send + Sync + 'static, F, Fut>(
pub async fn get_or_insert<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
&self,
path: &Path,
loader: F,
12 changes: 11 additions & 1 deletion rust/lance-core/src/datatypes.rs
@@ -8,6 +8,7 @@ use std::sync::Arc;

use arrow_array::ArrayRef;
use arrow_schema::{DataType, Field as ArrowField, TimeUnit};
use deepsize::DeepSizeOf;
use lance_arrow::bfloat16::{
is_bfloat16_field, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME,
};
Expand All @@ -24,7 +25,7 @@ pub use schema::Schema;

/// LogicalType is a string presentation of arrow type.
/// to be serialized into protobuf.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct LogicalType(String);

impl fmt::Display for LogicalType {
@@ -324,6 +325,15 @@ pub struct Dictionary {
pub values: Option<ArrayRef>,
}

impl DeepSizeOf for Dictionary {
fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
self.values
.as_ref()
.map(|v| v.get_array_memory_size())
.unwrap_or(0)
}
}

impl PartialEq for Dictionary {
fn eq(&self, other: &Self) -> bool {
match (&self.values, &other.values) {
5 changes: 3 additions & 2 deletions rust/lance-core/src/datatypes/field.rs
@@ -18,6 +18,7 @@ use arrow_array::{
ArrayRef,
};
use arrow_schema::{DataType, Field as ArrowField};
use deepsize::DeepSizeOf;
use lance_arrow::{bfloat16::ARROW_EXT_NAME_KEY, *};
use snafu::{location, Location};

@@ -34,7 +35,7 @@ pub struct SchemaCompareOptions {
pub compare_field_ids: bool,
}
/// Encoding enum.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum Encoding {
/// Plain encoding.
Plain,
@@ -48,7 +49,7 @@ pub enum Encoding {

/// Lance Schema Field
///
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Field {
pub name: String,
pub id: i32,
3 changes: 2 additions & 1 deletion rust/lance-core/src/datatypes/schema.rs
@@ -10,14 +10,15 @@ use std::{

use arrow_array::RecordBatch;
use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
use deepsize::DeepSizeOf;
use lance_arrow::*;
use snafu::{location, Location};

use super::field::{Field, SchemaCompareOptions};
use crate::{Error, Result};

/// Lance Schema.
#[derive(Default, Debug, Clone)]
#[derive(Default, Debug, Clone, DeepSizeOf)]
pub struct Schema {
/// Top-level fields in the dataset.
pub fields: Vec<Field>,
12 changes: 12 additions & 0 deletions rust/lance-core/src/utils/deletion.rs
@@ -4,6 +4,7 @@
use std::{collections::HashSet, ops::Range};

use arrow_array::BooleanArray;
use deepsize::{Context, DeepSizeOf};
use roaring::RoaringBitmap;

/// Threshold for when a DeletionVector::Set should be promoted to a DeletionVector::Bitmap.
@@ -18,6 +19,17 @@ pub enum DeletionVector {
Bitmap(RoaringBitmap),
}

impl DeepSizeOf for DeletionVector {
fn deep_size_of_children(&self, context: &mut Context) -> usize {
match self {
Self::NoDeletions => 0,
Self::Set(set) => set.deep_size_of_children(context),
// Inexact but probably close enough
Self::Bitmap(bitmap) => bitmap.serialized_size(),
}
}
}

impl DeletionVector {
#[allow(dead_code)] // Used in tests
pub fn len(&self) -> usize {
1 change: 1 addition & 0 deletions rust/lance-file/Cargo.toml
@@ -27,6 +27,7 @@ async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
datafusion-common.workspace = true
deepsize.workspace = true
futures.workspace = true
lance-datagen.workspace = true
log.workspace = true
5 changes: 3 additions & 2 deletions rust/lance-file/src/format/metadata.rs
@@ -6,12 +6,13 @@ use std::ops::Range;

use crate::datatypes::{Fields, FieldsWithMeta};
use crate::format::pb;
use deepsize::DeepSizeOf;
use lance_core::datatypes::Schema;
use lance_core::{Error, Result};
use lance_io::traits::ProtoStruct;
use snafu::{location, Location};
/// Data File Metadata
#[derive(Debug, Default, PartialEq)]
#[derive(Debug, Default, DeepSizeOf, PartialEq)]
pub struct Metadata {
/// Offset of each record batch.
pub batch_offsets: Vec<i32>,
@@ -197,7 +198,7 @@ impl Metadata {
}

/// Metadata about the statistics
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, DeepSizeOf)]
pub struct StatisticsMetadata {
/// Schema of the page-level statistics.
///
5 changes: 3 additions & 2 deletions rust/lance-file/src/page_table.rs
@@ -4,6 +4,7 @@
use arrow_array::builder::Int64Builder;
use arrow_array::{Array, Int64Array};
use arrow_schema::DataType;
use deepsize::DeepSizeOf;
use lance_io::encodings::plain::PlainDecoder;
use lance_io::encodings::Decoder;
use snafu::{location, Location};
@@ -13,7 +14,7 @@ use tokio::io::AsyncWriteExt;
use lance_core::{Error, Result};
use lance_io::traits::{Reader, Writer};

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, DeepSizeOf)]
pub struct PageInfo {
pub position: usize,
pub length: usize,
@@ -27,7 +28,7 @@ impl PageInfo {

/// Page lookup table.
///
#[derive(Debug, Default, Clone, PartialEq)]
#[derive(Debug, Default, Clone, PartialEq, DeepSizeOf)]
pub struct PageTable {
/// map[field-id, map[batch-id, PageInfo]]
pages: BTreeMap<i32, BTreeMap<i32, PageInfo>>,
5 changes: 3 additions & 2 deletions rust/lance-file/src/reader.rs
@@ -21,6 +21,7 @@ use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use arrow_select::filter::filter_record_batch;
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use lance_arrow::*;
use lance_core::cache::FileMetadataCache;
@@ -51,7 +52,7 @@ fn compute_row_id(fragment_id: u64, offset: i32) -> u64 {
/// Lance File Reader.
///
/// It reads arrow data from one data file.
#[derive(Clone)]
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
pub object_reader: Arc<dyn Reader>,
metadata: Arc<Metadata>,
@@ -229,7 +230,7 @@ impl FileReader {
}

/// Load some metadata about the fragment from the cache, if there is one.
async fn load_from_cache<T: Send + Sync + 'static, F, Fut>(
async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
cache: Option<&FileMetadataCache>,
path: &Path,
loader: F,
1 change: 1 addition & 0 deletions rust/lance-index/Cargo.toml
@@ -25,6 +25,7 @@ datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datafusion-sql.workspace = true
datafusion.workspace = true
deepsize.workspace = true
futures.workspace = true
half.workspace = true
itertools.workspace = true
5 changes: 3 additions & 2 deletions rust/lance-index/src/lib.rs
@@ -11,6 +11,7 @@
use std::{any::Any, sync::Arc};

use async_trait::async_trait;
use deepsize::DeepSizeOf;
use lance_core::Result;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
@@ -37,7 +38,7 @@ pub mod pb {
/// Generic methods common across all types of secondary indices
///
#[async_trait]
pub trait Index: Send + Sync {
pub trait Index: Send + Sync + DeepSizeOf {
/// Cast to [Any].
fn as_any(&self) -> &dyn Any;

@@ -58,7 +59,7 @@ pub trait Index: Send + Sync {
}

/// Index Type
#[derive(Debug, PartialEq, Eq, Copy, Hash, Clone)]
#[derive(Debug, PartialEq, Eq, Copy, Hash, Clone, DeepSizeOf)]
pub enum IndexType {
// Preserve 0-100 for simple indices.
Scalar = 0,
5 changes: 3 additions & 2 deletions rust/lance-index/src/scalar.rs
@@ -13,6 +13,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::{scalar::ScalarValue, Column};

use datafusion_expr::Expr;
use deepsize::DeepSizeOf;
use lance_core::Result;

use crate::Index;
@@ -48,7 +49,7 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync {
/// named "files". The index store is responsible for serializing and deserializing
/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync {
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
fn as_any(&self) -> &dyn Any;

/// Create a new file and return a writer to store data in the file
@@ -173,7 +174,7 @@ impl ScalarQuery {

/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index {
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
/// Search the scalar index
///
/// Returns all row ids that satisfy the query; these row ids are not necessarily ordered