
feat: make the index cache size (in bytes) available #2381

Merged: 3 commits, May 24, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -99,6 +99,7 @@ datafusion-execution = "37.1"
datafusion-physical-expr = { version = "37.1", features = [
"regex_expressions",
] }
deepsize = "0.2.0"
either = "1.0"
futures = "0.3"
http = "0.2.9"
7 changes: 7 additions & 0 deletions python/python/lance/dataset.py
@@ -51,6 +51,7 @@
)
from .lance import CompactionMetrics as CompactionMetrics
from .lance import __version__ as __version__
from .lance import _Session as Session
from .optimize import Compaction
from .schema import LanceSchema
from .util import td_to_micros
@@ -1517,6 +1518,12 @@ def create_index(
self._ds.create_index(column, index_type, name, replace, kwargs)
return LanceDataset(self.uri, index_cache_size=index_cache_size)

def session(self) -> Session:
"""
Return the dataset session, which holds the dataset's state.
"""
return self._ds.session()

@staticmethod
def _commit(
base_uri: Union[str, Path],
3 changes: 3 additions & 0 deletions python/python/lance/lance/__init__.pyi
@@ -76,3 +76,6 @@ class LanceFileMetadata:
num_global_buffer_bytes: int
global_buffers: List[LanceBufferDescriptor]
columns: List[LanceColumnMetadata]

class _Session:
def size_bytes(self) -> int: ...
Contributor:
For clarity, should it be cache_size_bytes()?

Contributor Author:
I think, if we have any other stateful objects in the session, we would want to capture that here as well. In other words, it would make more sense to rename Session to DatasetCache than it would to rename size_bytes to cache_size_bytes.

Contributor:
I see. For context, I'm thinking the other methods we might add to the session will be execution settings, like:

class Session:
    def set_cpu_pool_size(self, num_cpus: int): ...
    def set_io_pool_size(self, num_cpus: int): ...
    def set_memory_limit(self, memory_limit: int): ...
    def set_spill_location(self, path: Path): ...

So I'm thinking of the Session object as containing more state than the cache.

Contributor Author:
> So I'm thinking of the Session object as containing more state than the cache.

Sorry, my comment was probably confusing. I think this is good; my point should have been "it doesn't make sense to rename Session to DatasetCache, and therefore it doesn't make sense to rename size_bytes to cache_size_bytes".

For example, maybe DF starts to need some "thread local temp space" or "slab allocator working area" or whatever (I'm probably just making terms up at this point). This would go in the session, would not be trivial in size, and would not be considered a "cache", but we would still want to make sure that size_bytes includes it.

24 changes: 24 additions & 0 deletions python/python/tests/test_session.py
@@ -0,0 +1,24 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

from pathlib import Path

import lance
import pyarrow as pa


def test_cache_size_bytes(
tmp_path: Path,
):
data = pa.table({"a": range(1000)})
lance.write_dataset(data, tmp_path, max_rows_per_file=250)

ds = lance.dataset(tmp_path)

initial_size = ds.session().size_bytes()

ds.scanner().to_table()

after_scan_size = ds.session().size_bytes()

assert after_scan_size > initial_size
5 changes: 5 additions & 0 deletions python/src/dataset.rs
@@ -65,6 +65,7 @@ use snafu::{location, Location};

use crate::fragment::{FileFragment, FragmentMetadata};
use crate::schema::LanceSchema;
use crate::session::Session;
use crate::RT;
use crate::{LanceReader, Scanner};

@@ -972,6 +973,10 @@ impl Dataset {
Ok(self.ds.index_cache_hit_rate())
}

fn session(&self) -> Session {
Session::new(self.ds.session())
}

#[staticmethod]
fn commit(
dataset_uri: &str,
3 changes: 3 additions & 0 deletions python/src/lib.rs
@@ -43,6 +43,7 @@ use futures::StreamExt;
use lance_index::DatasetIndexExt;
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::prelude::*;
use session::Session;

#[macro_use]
extern crate lazy_static;
@@ -59,6 +60,7 @@ pub(crate) mod fragment;
pub(crate) mod reader;
pub(crate) mod scanner;
pub(crate) mod schema;
pub(crate) mod session;
pub(crate) mod tracing;
pub(crate) mod updater;
pub(crate) mod utils;
@@ -127,6 +129,7 @@ fn lance(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyCompactionPlan>()?;
m.add_class::<PyRewriteResult>()?;
m.add_class::<PyCompactionMetrics>()?;
m.add_class::<Session>()?;
m.add_class::<TraceGuard>()?;
m.add_class::<schema::LanceSchema>()?;
m.add_wrapped(wrap_pyfunction!(bfloat16_array))?;
31 changes: 31 additions & 0 deletions python/src/session.rs
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use pyo3::{pyclass, pymethods};

use lance::session::Session as LanceSession;

/// The Session holds stateful information for a dataset.
///
/// The session contains caches for opened indices and file metadata.
#[pyclass(name = "_Session", module = "_lib")]
#[derive(Clone)]
pub struct Session {
inner: Arc<LanceSession>,
}

impl Session {
pub fn new(inner: Arc<LanceSession>) -> Self {
Self { inner }
}
}

#[pymethods]
impl Session {
/// Return the current size of the session in bytes
pub fn size_bytes(&self) -> u64 {
self.inner.size_bytes()
}
}
1 change: 1 addition & 0 deletions rust/lance-core/Cargo.toml
@@ -22,6 +22,7 @@ bytes.workspace = true
chrono.workspace = true
datafusion-common = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
deepsize.workspace = true
futures.workspace = true
lazy_static.workspace = true
mock_instant.workspace = true
46 changes: 41 additions & 5 deletions rust/lance-core/src/cache.rs
@@ -6,6 +6,7 @@
use std::any::{Any, TypeId};
use std::sync::Arc;

use deepsize::{Context, DeepSizeOf};
use futures::Future;
use moka::sync::Cache;
use object_store::path::Path;
@@ -17,12 +18,46 @@ pub const DEFAULT_METADATA_CACHE_SIZE: usize = 128;

type ArcAny = Arc<dyn Any + Send + Sync>;

#[derive(Clone)]
struct SizedRecord {
record: ArcAny,
size_accessor: Arc<dyn Fn(ArcAny) -> usize + Send + Sync>,
}

impl std::fmt::Debug for SizedRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SizedRecord")
.field("record", &self.record)
.finish()
}
}

impl SizedRecord {
fn new<T: DeepSizeOf + Send + Sync + 'static>(record: Arc<T>) -> Self {
let size_accessor =
|record: ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
Self {
record,
size_accessor: Arc::new(size_accessor),
}
}
}

/// Cache for various metadata about files.
///
/// The cache is keyed by the file path and the type of metadata.
#[derive(Clone, Debug)]
pub struct FileMetadataCache {
cache: Arc<Cache<(Path, TypeId), ArcAny>>,
cache: Arc<Cache<(Path, TypeId), SizedRecord>>,
}

impl DeepSizeOf for FileMetadataCache {
fn deep_size_of_children(&self, _: &mut Context) -> usize {
self.cache
.iter()
.map(|(_, v)| (v.size_accessor)(v.record))
.sum()
}
}

impl FileMetadataCache {
@@ -35,19 +70,20 @@
pub fn get<T: Send + Sync + 'static>(&self, path: &Path) -> Option<Arc<T>> {
self.cache
.get(&(path.to_owned(), TypeId::of::<T>()))
.map(|metadata| metadata.clone().downcast::<T>().unwrap())
.map(|metadata| metadata.record.clone().downcast::<T>().unwrap())
}

pub fn insert<T: Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
self.cache.insert((path, TypeId::of::<T>()), metadata);
pub fn insert<T: DeepSizeOf + Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
self.cache
.insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
}

/// Get an item
///
/// If it exists in the cache return that
///
/// If it doesn't then run `loader` to load the item, insert into cache, and return
pub async fn get_or_insert<T: Send + Sync + 'static, F, Fut>(
pub async fn get_or_insert<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
&self,
path: &Path,
loader: F,
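The SizedRecord wrapper in the cache.rs diff above exists because the cache stores type-erased Arc<dyn Any> values: a sizing closure is captured at insert time, while the concrete type T is still known, so the size can be computed later without knowing the type. A minimal Python sketch of the same pattern (the names and the byte-counting lambdas here are illustrative, not Lance's API):

```python
from typing import Any, Callable


class SizedRecord:
    """A type-erased cache entry that still knows how to measure itself.

    Mirrors the Rust SizedRecord above: the sizing closure is captured
    at insert time, while the concrete type of `record` is still known.
    """

    def __init__(self, record: Any, size_of: Callable[[Any], int]):
        self.record = record
        self._size_of = size_of

    def size_bytes(self) -> int:
        return self._size_of(self.record)


cache: dict[str, SizedRecord] = {}


def insert(key: str, value: Any, size_of: Callable[[Any], int]) -> None:
    # Capture the sizing function while the concrete type is known.
    cache[key] = SizedRecord(value, size_of)


# Two entries of unrelated types; the cache can still sum their sizes.
insert("schema", ["a", "bc"], lambda v: sum(len(s) for s in v))
insert("page_table", {1: 100, 2: 200}, lambda v: 16 * len(v))

total = sum(entry.size_bytes() for entry in cache.values())
```

This is also why insert and get_or_insert gain the T: DeepSizeOf bound in the diff: the captured closure needs some way to measure the concrete type.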
12 changes: 11 additions & 1 deletion rust/lance-core/src/datatypes.rs
@@ -8,6 +8,7 @@ use std::sync::Arc;

use arrow_array::ArrayRef;
use arrow_schema::{DataType, Field as ArrowField, TimeUnit};
use deepsize::DeepSizeOf;
use lance_arrow::bfloat16::{
is_bfloat16_field, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME,
};
@@ -24,7 +25,7 @@

/// LogicalType is a string presentation of arrow type.
/// to be serialized into protobuf.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct LogicalType(String);

impl fmt::Display for LogicalType {
@@ -324,6 +325,15 @@ pub struct Dictionary {
pub values: Option<ArrayRef>,
}

impl DeepSizeOf for Dictionary {
fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
self.values
.as_ref()
.map(|v| v.get_array_memory_size())
.unwrap_or(0)
}
}

impl PartialEq for Dictionary {
fn eq(&self, other: &Self) -> bool {
match (&self.values, &other.values) {
5 changes: 3 additions & 2 deletions rust/lance-core/src/datatypes/field.rs
@@ -18,6 +18,7 @@ use arrow_array::{
ArrayRef,
};
use arrow_schema::{DataType, Field as ArrowField};
use deepsize::DeepSizeOf;
use lance_arrow::{bfloat16::ARROW_EXT_NAME_KEY, *};
use snafu::{location, Location};

@@ -34,7 +35,7 @@ pub struct SchemaCompareOptions {
pub compare_field_ids: bool,
}
/// Encoding enum.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum Encoding {
/// Plain encoding.
Plain,
@@ -48,7 +49,7 @@

/// Lance Schema Field
///
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Field {
pub name: String,
pub id: i32,
3 changes: 2 additions & 1 deletion rust/lance-core/src/datatypes/schema.rs
@@ -10,14 +10,15 @@ use std::{

use arrow_array::RecordBatch;
use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
use deepsize::DeepSizeOf;
use lance_arrow::*;
use snafu::{location, Location};

use super::field::{Field, SchemaCompareOptions};
use crate::{Error, Result};

/// Lance Schema.
#[derive(Default, Debug, Clone)]
#[derive(Default, Debug, Clone, DeepSizeOf)]
pub struct Schema {
/// Top-level fields in the dataset.
pub fields: Vec<Field>,
Expand Down
12 changes: 12 additions & 0 deletions rust/lance-core/src/utils/deletion.rs
@@ -4,6 +4,7 @@
use std::{collections::HashSet, ops::Range};

use arrow_array::BooleanArray;
use deepsize::{Context, DeepSizeOf};
use roaring::RoaringBitmap;

/// Threshold for when a DeletionVector::Set should be promoted to a DeletionVector::Bitmap.
@@ -18,6 +19,17 @@ pub enum DeletionVector {
Bitmap(RoaringBitmap),
}

impl DeepSizeOf for DeletionVector {
fn deep_size_of_children(&self, context: &mut Context) -> usize {
match self {
Self::NoDeletions => 0,
Self::Set(set) => set.deep_size_of_children(context),
// Inexact but probably close enough
Self::Bitmap(bitmap) => bitmap.serialized_size(),
}
}
}

impl DeletionVector {
#[allow(dead_code)] // Used in tests
pub fn len(&self) -> usize {
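The DeepSizeOf impl for DeletionVector above sizes each variant differently: zero for NoDeletions, an exact traversal for Set, and the serialized size as an admittedly inexact estimate for Bitmap. A Python sketch of that per-variant dispatch (the representations and the 4-bytes-per-id figure are illustrative assumptions, not Lance's actual layout):

```python
from typing import Optional, Set


def deletion_vector_heap_size(
    deleted_rows: Optional[Set[int]],
    bitmap: Optional[bytes],
) -> int:
    """Approximate heap footprint of a deletion vector, by variant."""
    if deleted_rows is None and bitmap is None:
        return 0  # NoDeletions: nothing allocated
    if deleted_rows is not None:
        return 4 * len(deleted_rows)  # Set of u32 row ids, 4 bytes each
    # Bitmap: use the serialized size, matching the
    # "inexact but probably close enough" comment above.
    return len(bitmap)
```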
1 change: 1 addition & 0 deletions rust/lance-file/Cargo.toml
@@ -27,6 +27,7 @@ async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
datafusion-common.workspace = true
deepsize.workspace = true
futures.workspace = true
lance-datagen.workspace = true
log.workspace = true
5 changes: 3 additions & 2 deletions rust/lance-file/src/format/metadata.rs
@@ -6,12 +6,13 @@ use std::ops::Range;

use crate::datatypes::{Fields, FieldsWithMeta};
use crate::format::pb;
use deepsize::DeepSizeOf;
use lance_core::datatypes::Schema;
use lance_core::{Error, Result};
use lance_io::traits::ProtoStruct;
use snafu::{location, Location};
/// Data File Metadata
#[derive(Debug, Default, PartialEq)]
#[derive(Debug, Default, DeepSizeOf, PartialEq)]
pub struct Metadata {
/// Offset of each record batch.
pub batch_offsets: Vec<i32>,
@@ -197,7 +198,7 @@ impl Metadata {
}

/// Metadata about the statistics
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, DeepSizeOf)]
pub struct StatisticsMetadata {
/// Schema of the page-level statistics.
///
5 changes: 3 additions & 2 deletions rust/lance-file/src/page_table.rs
@@ -4,6 +4,7 @@
use arrow_array::builder::Int64Builder;
use arrow_array::{Array, Int64Array};
use arrow_schema::DataType;
use deepsize::DeepSizeOf;
use lance_io::encodings::plain::PlainDecoder;
use lance_io::encodings::Decoder;
use snafu::{location, Location};
@@ -13,7 +14,7 @@ use tokio::io::AsyncWriteExt;
use lance_core::{Error, Result};
use lance_io::traits::{Reader, Writer};

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, DeepSizeOf)]
pub struct PageInfo {
pub position: usize,
pub length: usize,
@@ -27,7 +28,7 @@ impl PageInfo {

/// Page lookup table.
///
#[derive(Debug, Default, Clone, PartialEq)]
#[derive(Debug, Default, Clone, PartialEq, DeepSizeOf)]
pub struct PageTable {
/// map[field-id, map[batch-id, PageInfo]]
pages: BTreeMap<i32, BTreeMap<i32, PageInfo>>,