Skip to content

Commit

Permalink
feat(rust): POC metadata reading and writing (#17112)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Jun 24, 2024
1 parent 42ba1b0 commit b60788d
Show file tree
Hide file tree
Showing 12 changed files with 280 additions and 127 deletions.
12 changes: 11 additions & 1 deletion crates/polars-core/src/chunked_array/metadata/collect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
use super::{Metadata, MetadataCollectable, MetadataEnv};
use super::{Metadata, MetadataEnv};
use crate::chunked_array::{ChunkAgg, ChunkedArray, PolarsDataType, PolarsNumericType};
use crate::series::IsSorted;

pub trait MetadataCollectable<T>: Sized {
fn collect_cheap_metadata(&mut self) {}

#[inline(always)]
fn with_cheap_metadata(mut self) -> Self {
self.collect_cheap_metadata();
self
}
}

impl<T> MetadataCollectable<T> for ChunkedArray<T>
where
T: PolarsDataType,
Expand Down
23 changes: 23 additions & 0 deletions crates/polars-core/src/chunked_array/metadata/guard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::ops::Deref;
use std::sync::RwLockReadGuard;

use super::Metadata;
use crate::chunked_array::PolarsDataType;

/// A read guard for [`Metadata`]
pub enum MetadataReadGuard<'a, T: PolarsDataType + 'a> {
Unlocked(RwLockReadGuard<'a, Metadata<T>>),
Locked(&'a Metadata<T>),
}

impl<'a, T: PolarsDataType + 'a> Deref for MetadataReadGuard<'a, T> {
type Target = Metadata<T>;

#[inline]
fn deref(&self) -> &Self::Target {
match self {
Self::Unlocked(v) => v.deref(),
Self::Locked(v) => v,
}
}
}
75 changes: 75 additions & 0 deletions crates/polars-core/src/chunked_array/metadata/interior_mutable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

use super::{Metadata, MetadataTrait};
use crate::chunked_array::PolarsDataType;

// I have attempted multiple times to move this interior mutability to a per metadata field basis.
// While this might allow the use of Atomics instead of RwLocks, it suffers several problems:
//
// 1. The amount of boilerplate explodes. For example, you want read, read_blocking, write,
// write_blocking, get_mut, set for each field.
// 2. It is also very difficult to combine with the dynamic dispatch.
// 3. It is difficult to combine with types that do not allow for atomics (e.g. Box<[u8]>).
// 4. You actually have 2 fields per field: the Option and the Value. You run into critical section
// problems if you try to separate these.

/// An interiorally mutable [`Metadata`]
///
/// This is essentially a more convenient API around `RwLock<Metadata>`. This also allows it to be
/// `Clone`.
pub struct IMMetadata<T: PolarsDataType>(RwLock<Metadata<T>>);

impl<'a, T: PolarsDataType + 'a> IMMetadata<T>
where
Metadata<T>: MetadataTrait + 'a,
{
/// Cast the [`IMMetadata`] to a trait object of [`MetadataTrait`]
pub fn upcast(&'a self) -> &'a RwLock<dyn MetadataTrait + 'a> {
&self.0 as &RwLock<dyn MetadataTrait + 'a>
}
}

impl<T: PolarsDataType> IMMetadata<T> {
pub const fn new(md: Metadata<T>) -> Self {
Self(RwLock::new(md))
}

/// Try to grab a read guard to the [`Metadata`], this fails if this blocks.
pub fn try_read(&self) -> Option<RwLockReadGuard<Metadata<T>>> {
self.0.try_read().ok()
}
/// Block to grab a read guard the [`Metadata`]
pub fn read(&self) -> RwLockReadGuard<Metadata<T>> {
self.0.read().unwrap()
}

/// Try to grab a write guard to the [`Metadata`], this fails if this blocks.
pub fn try_write(&self) -> Option<RwLockWriteGuard<Metadata<T>>> {
self.0.try_write().ok()
}
/// Block to grab a write guard the [`Metadata`]
pub fn write(&self) -> RwLockWriteGuard<Metadata<T>> {
self.0.write().unwrap()
}

/// Take the internal [`Metadata`]
pub fn take(self) -> Metadata<T> {
self.0.into_inner().unwrap()
}
/// Get the mutable to the internal [`Metadata`]
pub fn get_mut(&mut self) -> &mut Metadata<T> {
self.0.get_mut().unwrap()
}
}

impl<T: PolarsDataType> Clone for IMMetadata<T> {
fn clone(&self) -> Self {
Self::new(self.read().clone())
}
}

impl<T: PolarsDataType> Default for IMMetadata<T> {
fn default() -> Self {
Self::new(Default::default())
}
}
36 changes: 36 additions & 0 deletions crates/polars-core/src/chunked_array/metadata/md_trait.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use polars_utils::IdxSize;

use super::{Metadata, MetadataFlags};
use crate::chunked_array::{IntoScalar, PolarsDataType, Scalar};

pub trait MetadataTrait {
fn get_flags(&self) -> MetadataFlags;
fn min_value(&self) -> Option<Scalar>;
fn max_value(&self) -> Option<Scalar>;

/// Number of unique non-null values
fn distinct_count(&self) -> Option<IdxSize>;
}

impl<T: PolarsDataType> MetadataTrait for Metadata<T>
where
T::OwnedPhysical: IntoScalar + Clone,
{
fn get_flags(&self) -> MetadataFlags {
self.get_flags()
}

fn min_value(&self) -> Option<Scalar> {
self.get_min_value()
.map(|v| v.clone().into_scalar(T::get_dtype()).unwrap())
}

fn max_value(&self) -> Option<Scalar> {
self.get_max_value()
.map(|v| v.clone().into_scalar(T::get_dtype()).unwrap())
}

fn distinct_count(&self) -> Option<IdxSize> {
self.get_distinct_count()
}
}
69 changes: 18 additions & 51 deletions crates/polars-core/src/chunked_array/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ use polars_utils::IdxSize;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

pub use self::collect::MetadataCollectable;
pub use self::env::MetadataEnv;
use super::{IntoScalar, PolarsDataType, Scalar};
pub use self::guard::MetadataReadGuard;
pub use self::interior_mutable::IMMetadata;
pub use self::md_trait::MetadataTrait;
use super::PolarsDataType;
use crate::series::IsSorted;

#[macro_use]
mod env;
mod collect;
mod guard;
mod interior_mutable;
mod md_trait;

macro_rules! mdenv_may_bail {
(get: $field:literal, $value:expr $(=> $default:expr)?) => {{
Expand Down Expand Up @@ -54,15 +61,6 @@ bitflags! {
}
}

pub trait MetadataTrait {
fn get_flags(&self) -> MetadataFlags;
fn min_value(&self) -> Option<Scalar>;
fn max_value(&self) -> Option<Scalar>;

/// Number of unique non-null values
fn distinct_count(&self) -> Option<IdxSize>;
}

pub struct Metadata<T: PolarsDataType> {
flags: MetadataFlags,

Expand All @@ -73,16 +71,6 @@ pub struct Metadata<T: PolarsDataType> {
distinct_count: Option<IdxSize>,
}

pub trait MetadataCollectable<T>: Sized {
fn collect_cheap_metadata(&mut self) {}

#[inline(always)]
fn with_cheap_metadata(mut self) -> Self {
self.collect_cheap_metadata();
self
}
}

bitflags! {
#[derive(Default, Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(transparent))]
Expand All @@ -93,29 +81,6 @@ bitflags! {
}
}

impl<T: PolarsDataType> MetadataTrait for Metadata<T>
where
T::OwnedPhysical: IntoScalar + Clone,
{
fn get_flags(&self) -> MetadataFlags {
self.get_flags()
}

fn min_value(&self) -> Option<Scalar> {
self.get_min_value()
.map(|v| v.clone().into_scalar(T::get_dtype()).unwrap())
}

fn max_value(&self) -> Option<Scalar> {
self.get_max_value()
.map(|v| v.clone().into_scalar(T::get_dtype()).unwrap())
}

fn distinct_count(&self) -> Option<IdxSize> {
self.get_distinct_count()
}
}

impl MetadataFlags {
pub fn set_sorted_flag(&mut self, sorted: IsSorted) {
mdenv_may_bail!(set: "sorted", sorted);
Expand Down Expand Up @@ -430,6 +395,9 @@ impl<T: PolarsDataType> Metadata<T> {
self.flags.set_fast_explode_list(value);
}

pub fn is_sorted_any(&self) -> bool {
self.flags.get_sorted_flag() != IsSorted::Not
}
pub fn is_sorted(&self) -> IsSorted {
self.flags.get_sorted_flag()
}
Expand All @@ -438,6 +406,10 @@ impl<T: PolarsDataType> Metadata<T> {
self.flags.set_sorted_flag(is_sorted)
}

pub fn set_flags(&mut self, flags: MetadataFlags) {
mdenv_may_bail!(set: "flags", flags);
self.flags = flags;
}
pub fn set_min_value(&mut self, min_value: Option<T::OwnedPhysical>) {
mdenv_may_bail!(set: "min_value", min_value);
self.min_value = min_value;
Expand All @@ -451,11 +423,10 @@ impl<T: PolarsDataType> Metadata<T> {
self.distinct_count = distinct_count;
}

pub fn set_flags(&mut self, flags: MetadataFlags) {
mdenv_may_bail!(set: "flags", flags);
self.flags = flags;
pub fn get_flags(&self) -> MetadataFlags {
let flags = self.flags;
mdenv_may_bail!(get: "flags", flags => MetadataFlags::empty())
}

pub fn get_min_value(&self) -> Option<&T::OwnedPhysical> {
let min_value = self.min_value.as_ref();
mdenv_may_bail!(get: "min_value", min_value => None)
Expand All @@ -468,8 +439,4 @@ impl<T: PolarsDataType> Metadata<T> {
let distinct_count = self.distinct_count;
mdenv_may_bail!(get: "distinct_count", distinct_count => None)
}
pub fn get_flags(&self) -> MetadataFlags {
let flags = self.flags;
mdenv_may_bail!(get: "flags", flags => MetadataFlags::empty())
}
}
Loading

0 comments on commit b60788d

Please sign in to comment.