From 5cdeea256665efb6305fd6c8b5023b59baba8f1b Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 1 Apr 2024 13:17:06 +0200 Subject: [PATCH] perf: Add non-order preserving variable row-encoding (#15414) --- .../src/chunked_array/logical/struct_/mod.rs | 5 +- .../ops/sort/arg_sort_multiple.rs | 51 ++++++++-- .../src/frame/group_by/into_groups.rs | 1 - crates/polars-core/src/frame/group_by/mod.rs | 6 +- .../executors/sinks/group_by/generic/eval.rs | 4 +- .../src/executors/sinks/sort/sink_multiple.rs | 15 ++- crates/polars-row/src/decode.rs | 6 +- crates/polars-row/src/encode.rs | 92 ++++++++++--------- crates/polars-row/src/fixed.rs | 12 +-- crates/polars-row/src/lib.rs | 2 +- crates/polars-row/src/row.rs | 24 ++++- crates/polars-row/src/variable.rs | 77 +++++++++++++--- 12 files changed, 203 insertions(+), 92 deletions(-) diff --git a/crates/polars-core/src/chunked_array/logical/struct_/mod.rs b/crates/polars-core/src/chunked_array/logical/struct_/mod.rs index 9457bfe00bd1..b5651cd2a405 100644 --- a/crates/polars-core/src/chunked_array/logical/struct_/mod.rs +++ b/crates/polars-core/src/chunked_array/logical/struct_/mod.rs @@ -9,10 +9,10 @@ use arrow::legacy::trusted_len::TrustedLenPush; use arrow::offset::OffsetsBuffer; use smartstring::alias::String as SmartString; -use self::sort::arg_sort_multiple::_get_rows_encoded_ca; use super::*; use crate::chunked_array::iterator::StructIter; use crate::datatypes::*; +use crate::prelude::sort::arg_sort_multiple::_get_rows_encoded_ca_unordered; use crate::utils::index_to_chunked_index; /// This is logical type [`StructChunked`] that @@ -415,8 +415,7 @@ impl StructChunked { } pub fn rows_encode(&self) -> PolarsResult { - let descending = vec![false; self.fields.len()]; - _get_rows_encoded_ca(self.name(), &self.fields, &descending, false) + _get_rows_encoded_ca_unordered(self.name(), &self.fields) } pub fn iter(&self) -> StructIter { diff --git a/crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs b/crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs index 4deffaab0165..35e2d57decf3 100644 --- a/crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs +++ b/crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs @@ -1,5 +1,5 @@ use compare_inner::NullOrderCmp; -use polars_row::{convert_columns, RowsEncoded, SortField}; +use polars_row::{convert_columns, EncodingField, RowsEncoded}; use polars_utils::iter::EnumerateIdxTrait; use super::*; @@ -87,18 +87,19 @@ pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult { Ok(out) } -pub(crate) fn encode_rows_vertical_par_default(by: &[Series]) -> PolarsResult { +pub(crate) fn encode_rows_vertical_par_unordered( + by: &[Series], +) -> PolarsResult { let n_threads = POOL.current_num_threads(); let len = by[0].len(); let splits = _split_offsets(len, n_threads); - let descending = vec![false; by.len()]; let chunks = splits.into_par_iter().map(|(offset, len)| { let sliced = by .iter() .map(|s| s.slice(offset as i64, len)) .collect::>(); - let rows = _get_rows_encoded(&sliced, &descending, false)?; + let rows = _get_rows_encoded_unordered(&sliced)?; Ok(rows.into_array()) }); let chunks = POOL.install(|| chunks.collect::>>()); @@ -106,12 +107,35 @@ pub(crate) fn encode_rows_vertical_par_default(by: &[Series]) -> PolarsResult PolarsResult { - let descending = vec![false; by.len()]; - let rows = _get_rows_encoded(by, &descending, false)?; +pub(crate) fn encode_rows_unordered(by: &[Series]) -> PolarsResult { + let rows = _get_rows_encoded_unordered(by)?; Ok(BinaryOffsetChunked::with_chunk("", rows.into_array())) } +pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult { + let mut cols = Vec::with_capacity(by.len()); + let mut fields = Vec::with_capacity(by.len()); + for by in by { + let arr = _get_rows_encoded_compat_array(by)?; + let field = EncodingField::new_unsorted(); + match arr.data_type() { + // Flatten the struct fields. + ArrowDataType::Struct(_) => { + let arr = arr.as_any().downcast_ref::().unwrap(); + for arr in arr.values() { + cols.push(arr.clone() as ArrayRef); + fields.push(field) + } + }, + _ => { + cols.push(arr); + fields.push(field) + }, + } + } + Ok(convert_columns(&cols, &fields)) +} + pub fn _get_rows_encoded( by: &[Series], descending: &[bool], @@ -123,9 +147,10 @@ pub fn _get_rows_encoded( for (by, descending) in by.iter().zip(descending) { let arr = _get_rows_encoded_compat_array(by)?; - let sort_field = SortField { + let sort_field = EncodingField { descending: *descending, nulls_last, + no_order: false, }; match arr.data_type() { // Flatten the struct fields. @@ -133,7 +158,7 @@ pub fn _get_rows_encoded( let arr = arr.as_any().downcast_ref::().unwrap(); for arr in arr.values() { cols.push(arr.clone() as ArrayRef); - fields.push(sort_field.clone()) + fields.push(sort_field) } }, _ => { @@ -155,6 +180,14 @@ pub fn _get_rows_encoded_ca( .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array())) } +pub fn _get_rows_encoded_ca_unordered( + name: &str, + by: &[Series], +) -> PolarsResult { + _get_rows_encoded_unordered(by) + .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array())) +} + pub(crate) fn argsort_multiple_row_fmt( by: &[Series], mut descending: Vec, diff --git a/crates/polars-core/src/frame/group_by/into_groups.rs b/crates/polars-core/src/frame/group_by/into_groups.rs index fe2fd5a493e5..f12779c52819 100644 --- a/crates/polars-core/src/frame/group_by/into_groups.rs +++ b/crates/polars-core/src/frame/group_by/into_groups.rs @@ -299,7 +299,6 @@ impl IntoGroupsProxy for BinaryChunked { }) .collect::>() }); - let byte_hashes = byte_hashes.iter().collect::>(); group_by_threaded_slice(byte_hashes, n_partitions, sorted) } else { let byte_hashes = fill_bytes_hashes(self, null_h, hb.clone()); diff --git a/crates/polars-core/src/frame/group_by/mod.rs b/crates/polars-core/src/frame/group_by/mod.rs index bc7e90406cf7..014ae2c8c28d 100644 --- a/crates/polars-core/src/frame/group_by/mod.rs +++ b/crates/polars-core/src/frame/group_by/mod.rs @@ -23,7 +23,7 @@ pub use into_groups::*; pub use proxy::*; use crate::prelude::sort::arg_sort_multiple::{ - encode_rows_default, encode_rows_vertical_par_default, + encode_rows_unordered, encode_rows_vertical_par_unordered, }; impl DataFrame { @@ -84,9 +84,9 @@ impl DataFrame { }) } else { let rows = if multithreaded { - encode_rows_vertical_par_default(&by) + encode_rows_vertical_par_unordered(&by) } else { - encode_rows_default(&by) + encode_rows_unordered(&by) }? .into_series(); rows.group_tuples(multithreaded, sorted) diff --git a/crates/polars-pipe/src/executors/sinks/group_by/generic/eval.rs b/crates/polars-pipe/src/executors/sinks/group_by/generic/eval.rs index 3fa0b384dd0c..c2b4262143da 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/generic/eval.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/generic/eval.rs @@ -1,7 +1,7 @@ use std::cell::UnsafeCell; use polars_core::export::ahash::RandomState; -use polars_row::{RowsEncoded, SortField}; +use polars_row::{EncodingField, RowsEncoded}; use super::*; use crate::executors::sinks::group_by::utils::prepare_key; @@ -18,7 +18,7 @@ pub(super) struct Eval { aggregation_series: UnsafeCell>, keys_columns: UnsafeCell>, hashes: Vec, - key_fields: Vec, + key_fields: Vec, // amortizes the encoding buffers rows_encoded: RowsEncoded, } diff --git a/crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs b/crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs index 6fdede156fcf..d31d6e77e3a8 100644 --- a/crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs +++ b/crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs @@ -7,7 +7,7 @@ use polars_core::prelude::*; use polars_core::series::IsSorted; use polars_plan::prelude::*; use polars_row::decode::decode_rows_from_binary; -use polars_row::SortField; +use polars_row::EncodingField; use super::*; use crate::operators::{ @@ -15,15 +15,12 @@ use crate::operators::{ }; const POLARS_SORT_COLUMN: &str = "__POLARS_SORT_COLUMN"; -fn get_sort_fields(sort_idx: &[usize], sort_args: &SortArguments) -> Vec { +fn get_sort_fields(sort_idx: &[usize], sort_args: &SortArguments) -> Vec { let mut descending = sort_args.descending.clone(); _broadcast_descending(sort_idx.len(), &mut descending); descending .into_iter() - .map(|descending| SortField { - descending, - nulls_last: sort_args.nulls_last, - }) + .map(|descending| EncodingField::new_sorted(descending, sort_args.nulls_last)) .collect() } @@ -61,7 +58,7 @@ fn finalize_dataframe( can_decode: bool, sort_dtypes: Option<&[ArrowDataType]>, rows: &mut Vec<&'static [u8]>, - sort_fields: &[SortField], + sort_fields: &[EncodingField], schema: &Schema, ) { unsafe { @@ -126,7 +123,7 @@ pub struct SortSinkMultiple { sort_sink: Box, sort_args: SortArguments, // Needed for encoding - sort_fields: Arc<[SortField]>, + sort_fields: Arc<[EncodingField]>, sort_dtypes: Option>, // amortize allocs sort_column: Vec, @@ -320,7 +317,7 @@ struct DropEncoded { can_decode: bool, sort_dtypes: Option>, rows: Vec<&'static [u8]>, - sort_fields: Arc<[SortField]>, + sort_fields: Arc<[EncodingField]>, output_schema: SchemaRef, } diff --git a/crates/polars-row/src/decode.rs b/crates/polars-row/src/decode.rs index 246ac976fc10..180cf2ad00e8 100644 --- a/crates/polars-row/src/decode.rs +++ b/crates/polars-row/src/decode.rs @@ -10,7 +10,7 @@ use crate::variable::{decode_binary, decode_binview}; /// encodings. pub unsafe fn decode_rows_from_binary<'a>( arr: &'a BinaryArray, - fields: &[SortField], + fields: &[EncodingField], data_types: &[ArrowDataType], rows: &mut Vec<&'a [u8]>, ) -> Vec { @@ -27,7 +27,7 @@ pub unsafe fn decode_rows_from_binary<'a>( pub unsafe fn decode_rows( // the rows will be updated while the data is decoded rows: &mut [&[u8]], - fields: &[SortField], + fields: &[EncodingField], data_types: &[ArrowDataType], ) -> Vec { assert_eq!(fields.len(), data_types.len()); @@ -38,7 +38,7 @@ pub unsafe fn decode_rows( .collect() } -unsafe fn decode(rows: &mut [&[u8]], field: &SortField, data_type: &ArrowDataType) -> ArrayRef { +unsafe fn decode(rows: &mut [&[u8]], field: &EncodingField, data_type: &ArrowDataType) -> ArrayRef { match data_type { ArrowDataType::Null => NullArray::new(ArrowDataType::Null, rows.len()).to_boxed(), ArrowDataType::Boolean => decode_bool(rows, field).to_boxed(), diff --git a/crates/polars-row/src/encode.rs b/crates/polars-row/src/encode.rs index bd94f33a203c..0cd660b47b9e 100644 --- a/crates/polars-row/src/encode.rs +++ b/crates/polars-row/src/encode.rs @@ -10,10 +10,10 @@ use polars_utils::slice::GetSaferUnchecked; use polars_utils::vec::PushUnchecked; use crate::fixed::FixedLengthEncoding; -use crate::row::{RowsEncoded, SortField}; +use crate::row::{EncodingField, RowsEncoded}; use crate::{with_match_arrow_primitive_type, ArrayRef}; -pub fn convert_columns(columns: &[ArrayRef], fields: &[SortField]) -> RowsEncoded { +pub fn convert_columns(columns: &[ArrayRef], fields: &[EncodingField]) -> RowsEncoded { let mut rows = RowsEncoded::new(vec![], vec![]); convert_columns_amortized(columns, fields, &mut rows); rows @@ -28,7 +28,7 @@ pub fn convert_columns_no_order(columns: &[ArrayRef]) -> RowsEncoded { pub fn convert_columns_amortized_no_order(columns: &[ArrayRef], rows: &mut RowsEncoded) { convert_columns_amortized( columns, - std::iter::repeat(&SortField::default()).take(columns.len()), + std::iter::repeat(&EncodingField::default()).take(columns.len()), rows, ); } @@ -41,7 +41,7 @@ enum Encoder { enc: Vec, rows: Option, original: LargeListArray, - field: SortField, + field: EncodingField, }, Leaf(ArrayRef), } @@ -112,7 +112,7 @@ impl Encoder { } } -fn get_encoders(arr: &dyn Array, encoders: &mut Vec, field: &SortField) -> usize { +fn get_encoders(arr: &dyn Array, encoders: &mut Vec, field: &EncodingField) -> usize { let mut added = 0; match arr.data_type() { ArrowDataType::Struct(_) => { @@ -134,7 +134,7 @@ fn get_encoders(arr: &dyn Array, encoders: &mut Vec, field: &SortField) enc: inner, original: arr.clone(), rows: None, - field: field.clone(), + field: *field, }); added += 1; }, @@ -146,7 +146,7 @@ fn get_encoders(arr: &dyn Array, encoders: &mut Vec, field: &SortField) added } -pub fn convert_columns_amortized<'a, I: IntoIterator>( +pub fn convert_columns_amortized<'a, I: IntoIterator>( columns: &'a [ArrayRef], fields: I, rows: &mut RowsEncoded, @@ -165,11 +165,15 @@ pub fn convert_columns_amortized<'a, I: IntoIterator>( for (arr, field) in columns.iter().zip(fields) { let added = get_encoders(arr.as_ref(), &mut flattened_columns, field); for _ in 0..added { - flattened_fields.push(field.clone()); + flattened_fields.push(*field); } } - let values_size = - allocate_rows_buf(&mut flattened_columns, &mut rows.values, &mut rows.offsets); + let values_size = allocate_rows_buf( + &mut flattened_columns, + &flattened_fields, + &mut rows.values, + &mut rows.offsets, + ); for (arr, field) in flattened_columns.iter().zip(flattened_fields.iter()) { // SAFETY: // we allocated rows with enough bytes. @@ -182,11 +186,13 @@ pub fn convert_columns_amortized<'a, I: IntoIterator>( .iter() .map(|arr| Encoder::Leaf(arr.clone())) .collect::>(); - let values_size = allocate_rows_buf(&mut encoders, &mut rows.values, &mut rows.offsets); + let fields = fields.cloned().collect::>(); + let values_size = + allocate_rows_buf(&mut encoders, &fields, &mut rows.values, &mut rows.offsets); for (enc, field) in encoders.iter().zip(fields) { // SAFETY: // we allocated rows with enough bytes. - unsafe { encode_array(enc, field, rows) } + unsafe { encode_array(enc, &field, rows) } } // SAFETY: values are initialized unsafe { rows.values.set_len(values_size) } @@ -195,7 +201,7 @@ pub fn convert_columns_amortized<'a, I: IntoIterator>( fn encode_primitive( arr: &PrimitiveArray, - field: &SortField, + field: &EncodingField, out: &mut RowsEncoded, ) { if arr.null_count() == 0 { @@ -211,11 +217,11 @@ fn encode_primitive( /// /// # Safety /// `out` must have enough bytes allocated otherwise it will be out of bounds. -unsafe fn encode_array(encoder: &Encoder, field: &SortField, out: &mut RowsEncoded) { +unsafe fn encode_array(encoder: &Encoder, field: &EncodingField, out: &mut RowsEncoded) { match encoder { Encoder::List { .. } => { let iter = encoder.list_iter(); - crate::variable::encode_iter(iter, out, &Default::default()) + crate::variable::encode_iter(iter, out, &EncodingField::new_unsorted()) }, Encoder::Leaf(array) => { match array.data_type() { @@ -279,6 +285,7 @@ pub fn encoded_size(data_type: &ArrowDataType) -> usize { // are initialized. fn allocate_rows_buf( columns: &mut [Encoder], + fields: &[EncodingField], values: &mut Vec, offsets: &mut Vec, ) -> usize { @@ -307,7 +314,7 @@ fn allocate_rows_buf( // for the variable length columns we must iterate to determine the length per row location let mut processed_count = 0; - for enc in columns.iter_mut() { + for (enc, enc_field) in columns.iter_mut().zip(fields) { match enc { Encoder::List { enc: inner_enc, @@ -315,6 +322,8 @@ fn allocate_rows_buf( field, original, } => { + let field = *field; + let fields = inner_enc.iter().map(|_| field).collect::>(); // Nested lists don't yet work as that requires the leaves not only allocating, but also // encoding. To make that work we must add a flag `in_list` that tell the leaves to immediately // encode the rows instead of only setting the length. @@ -332,6 +341,7 @@ fn allocate_rows_buf( // Allocate and immediately row-encode the inner types recursively. let values_size = allocate_rows_buf( inner_enc, + &fields, &mut values_rows.values, &mut values_rows.offsets, ); @@ -339,7 +349,7 @@ fn allocate_rows_buf( // For single nested it does work as we encode here. unsafe { for enc in inner_enc { - encode_array(enc, field, &mut values_rows) + encode_array(enc, &field, &mut values_rows) } values_rows.values.set_len(values_size) }; @@ -352,13 +362,20 @@ fn allocate_rows_buf( for opt_val in iter { unsafe { lengths.push_unchecked( - row_size_fixed + crate::variable::encoded_len(opt_val), + row_size_fixed + + crate::variable::encoded_len( + opt_val, + &EncodingField::new_unsorted(), + ), ); } } } else { for (opt_val, row_length) in iter.zip(lengths.iter_mut()) { - *row_length += crate::variable::encoded_len(opt_val) + *row_length += crate::variable::encoded_len( + opt_val, + &EncodingField::new_unsorted(), + ) } } processed_count += 1; @@ -371,7 +388,8 @@ fn allocate_rows_buf( for opt_val in array.into_iter() { unsafe { lengths.push_unchecked( - row_size_fixed + crate::variable::encoded_len(opt_val), + row_size_fixed + + crate::variable::encoded_len(opt_val, enc_field), ); } } @@ -379,7 +397,7 @@ fn allocate_rows_buf( for (opt_val, row_length) in array.into_iter().zip(lengths.iter_mut()) { - *row_length += crate::variable::encoded_len(opt_val) + *row_length += crate::variable::encoded_len(opt_val, enc_field) } } processed_count += 1; @@ -390,7 +408,8 @@ fn allocate_rows_buf( for opt_val in array.into_iter() { unsafe { lengths.push_unchecked( - row_size_fixed + crate::variable::encoded_len(opt_val), + row_size_fixed + + crate::variable::encoded_len(opt_val, enc_field), ); } } @@ -398,7 +417,7 @@ fn allocate_rows_buf( for (opt_val, row_length) in array.into_iter().zip(lengths.iter_mut()) { - *row_length += crate::variable::encoded_len(opt_val) + *row_length += crate::variable::encoded_len(opt_val, enc_field) } } processed_count += 1; @@ -416,13 +435,14 @@ fn allocate_rows_buf( for opt_val in iter { unsafe { lengths.push_unchecked( - row_size_fixed + crate::variable::encoded_len(opt_val), + row_size_fixed + + crate::variable::encoded_len(opt_val, enc_field), ) } } } else { for (opt_val, row_length) in iter.zip(lengths.iter_mut()) { - *row_length += crate::variable::encoded_len(opt_val) + *row_length += crate::variable::encoded_len(opt_val, enc_field) } } processed_count += 1; @@ -514,10 +534,7 @@ mod test { let arr = BinaryViewArray::from_slice([Some("a"), Some(""), Some("meep"), Some(sentence), None]); - let field = SortField { - descending: false, - nulls_last: false, - }; + let field = EncodingField::new_sorted(false, false); let arr = arrow::compute::cast::cast(&arr, &ArrowDataType::BinaryView, Default::default()) .unwrap(); let rows_encoded = convert_columns(&[arr], &[field]); @@ -567,10 +584,7 @@ mod test { let a = [val.as_str(), val.as_str(), val.as_str()]; - let field = SortField { - descending: false, - nulls_last: false, - }; + let field = EncodingField::new_sorted(false, false); let arr = BinaryViewArray::from_slice_values(a); let rows_encoded = convert_columns_no_order(&[arr.clone().boxed()]); @@ -583,10 +597,7 @@ mod test { fn test_reverse_variable() { let a = Utf8ViewArray::from_slice_values(["one", "two", "three", "four", "five", "six"]); - let fields = &[SortField { - descending: true, - nulls_last: false, - }]; + let fields = &[EncodingField::new_sorted(true, false)]; let dtypes = [ArrowDataType::Utf8View]; @@ -614,16 +625,13 @@ mod test { values.boxed(), None, ); - let fields = &[SortField { - descending: true, - nulls_last: false, - }]; + let fields = &[EncodingField::new_sorted(true, false)]; let out = convert_columns(&[array.boxed()], fields); let out = out.into_array(); assert_eq!( out.values().iter().map(|v| *v as usize).sum::(), - 84981 + 82411 ); } } diff --git a/crates/polars-row/src/fixed.rs b/crates/polars-row/src/fixed.rs index dfc9d6ff94f6..f9bdc4394b08 100644 --- a/crates/polars-row/src/fixed.rs +++ b/crates/polars-row/src/fixed.rs @@ -8,7 +8,7 @@ use arrow::types::NativeType; use polars_utils::slice::*; use polars_utils::total_ord::{canonical_f32, canonical_f64}; -use crate::row::{RowsEncoded, SortField}; +use crate::row::{EncodingField, RowsEncoded}; pub(crate) trait FromSlice { fn from_slice(slice: &[u8]) -> Self; @@ -168,7 +168,7 @@ fn encode_value( pub(crate) unsafe fn encode_slice( input: &[T], out: &mut RowsEncoded, - field: &SortField, + field: &EncodingField, ) { out.values.set_len(0); let values = out.values.spare_capacity_mut(); @@ -178,7 +178,7 @@ pub(crate) unsafe fn encode_slice( } #[inline] -pub(crate) fn get_null_sentinel(field: &SortField) -> u8 { +pub(crate) fn get_null_sentinel(field: &EncodingField) -> u8 { if field.nulls_last { 0xFF } else { @@ -189,7 +189,7 @@ pub(crate) fn get_null_sentinel(field: &SortField) -> u8 { pub(crate) unsafe fn encode_iter>, T: FixedLengthEncoding>( input: I, out: &mut RowsEncoded, - field: &SortField, + field: &EncodingField, ) { out.values.set_len(0); let values = out.values.spare_capacity_mut(); @@ -214,7 +214,7 @@ pub(crate) unsafe fn encode_iter>, T: FixedLengthEn pub(super) unsafe fn decode_primitive( rows: &mut [&[u8]], - field: &SortField, + field: &EncodingField, ) -> PrimitiveArray where T::Encoded: FromSlice, @@ -255,7 +255,7 @@ where PrimitiveArray::new(data_type, values.into(), validity) } -pub(super) unsafe fn decode_bool(rows: &mut [&[u8]], field: &SortField) -> BooleanArray { +pub(super) unsafe fn decode_bool(rows: &mut [&[u8]], field: &EncodingField) -> BooleanArray { let mut has_nulls = false; let null_sentinel = get_null_sentinel(field); diff --git a/crates/polars-row/src/lib.rs b/crates/polars-row/src/lib.rs index 2de299ec715c..823e5c6e4566 100644 --- a/crates/polars-row/src/lib.rs +++ b/crates/polars-row/src/lib.rs @@ -279,4 +279,4 @@ pub use encode::{ convert_columns, convert_columns_amortized, convert_columns_amortized_no_order, convert_columns_no_order, }; -pub use row::{RowsEncoded, SortField}; +pub use row::{EncodingField, RowsEncoded}; diff --git a/crates/polars-row/src/row.rs b/crates/polars-row/src/row.rs index 26ba4715d33b..d48f6f51c205 100644 --- a/crates/polars-row/src/row.rs +++ b/crates/polars-row/src/row.rs @@ -4,12 +4,32 @@ use arrow::datatypes::ArrowDataType; use arrow::ffi::mmap; use arrow::offset::{Offsets, OffsetsBuffer}; -#[derive(Clone, Default)] -pub struct SortField { +#[derive(Clone, Default, Copy)] +pub struct EncodingField { /// Whether to sort in descending order pub descending: bool, /// Whether to sort nulls first pub nulls_last: bool, + /// Ignore all order-related flags and don't encode order-preserving. + /// This is faster for variable encoding as we can just memcopy all the bytes. + pub no_order: bool, +} + +impl EncodingField { + pub fn new_sorted(descending: bool, nulls_last: bool) -> Self { + EncodingField { + descending, + nulls_last, + no_order: false, + } + } + + pub fn new_unsorted() -> Self { + EncodingField { + no_order: true, + ..Default::default() + } + } } #[derive(Default, Clone)] diff --git a/crates/polars-row/src/variable.rs b/crates/polars-row/src/variable.rs index 4a582afb5e7f..5032e41085a8 100644 --- a/crates/polars-row/src/variable.rs +++ b/crates/polars-row/src/variable.rs @@ -19,7 +19,7 @@ use polars_utils::slice::{GetSaferUnchecked, Slice2Uninit}; use crate::fixed::{decode_nulls, get_null_sentinel}; use crate::row::RowsEncoded; -use crate::SortField; +use crate::EncodingField; /// The block size of the variable length encoding pub(crate) const BLOCK_SIZE: usize = 32; @@ -56,8 +56,54 @@ fn padded_length_opt(a: Option) -> usize { } #[inline] -pub fn encoded_len(a: Option<&[u8]>) -> usize { - padded_length_opt(a.map(|v| v.len())) +fn length_opt(a: Option) -> usize { + if let Some(a) = a { + 1 + a + } else { + 1 + } +} + +#[inline] +pub fn encoded_len(a: Option<&[u8]>, field: &EncodingField) -> usize { + if field.no_order { + length_opt(a.map(|v| v.len())) + } else { + padded_length_opt(a.map(|v| v.len())) + } +} + +unsafe fn encode_one_no_order( + out: &mut [MaybeUninit], + val: Option<&[MaybeUninit]>, + field: &EncodingField, +) -> usize { + match val { + Some([]) => { + let byte = if field.descending { + !EMPTY_SENTINEL + } else { + EMPTY_SENTINEL + }; + *out.get_unchecked_release_mut(0) = MaybeUninit::new(byte); + 1 + }, + Some(val) => { + let end_offset = 1 + val.len(); + + // Write `2_u8` to demarcate as non-empty, non-null string + *out.get_unchecked_release_mut(0) = MaybeUninit::new(NON_EMPTY_SENTINEL); + std::ptr::copy_nonoverlapping(val.as_ptr(), out.as_mut_ptr().add(1), val.len()); + + end_offset + }, + None => { + *out.get_unchecked_release_mut(0) = MaybeUninit::new(get_null_sentinel(field)); + // // write remainder as zeros + // out.get_unchecked_release_mut(1..).fill(MaybeUninit::new(0)); + 1 + }, + } } /// Encode one strings/bytes object and return the written length. @@ -67,7 +113,7 @@ pub fn encoded_len(a: Option<&[u8]>) -> usize { unsafe fn encode_one( out: &mut [MaybeUninit], val: Option<&[MaybeUninit]>, - field: &SortField, + field: &EncodingField, ) -> usize { match val { Some([]) => { @@ -150,14 +196,23 @@ unsafe fn encode_one( pub(crate) unsafe fn encode_iter<'a, I: Iterator>>( input: I, out: &mut RowsEncoded, - field: &SortField, + field: &EncodingField, ) { out.values.set_len(0); let values = out.values.spare_capacity_mut(); - for (offset, opt_value) in out.offsets.iter_mut().skip(1).zip(input) { - let dst = values.get_unchecked_release_mut(*offset..); - let written_len = encode_one(dst, opt_value.map(|v| v.as_uninit()), field); - *offset += written_len; + + if field.no_order { + for (offset, opt_value) in out.offsets.iter_mut().skip(1).zip(input) { + let dst = values.get_unchecked_release_mut(*offset..); + let written_len = encode_one_no_order(dst, opt_value.map(|v| v.as_uninit()), field); + *offset += written_len; + } + } else { + for (offset, opt_value) in out.offsets.iter_mut().skip(1).zip(input) { + let dst = values.get_unchecked_release_mut(*offset..); + let written_len = encode_one(dst, opt_value.map(|v| v.as_uninit()), field); + *offset += written_len; + } } let offset = out.offsets.last().unwrap(); let dst = values.get_unchecked_release_mut(*offset..); @@ -203,7 +258,7 @@ unsafe fn decoded_len( } } -pub(super) unsafe fn decode_binary(rows: &mut [&[u8]], field: &SortField) -> BinaryArray { +pub(super) unsafe fn decode_binary(rows: &mut [&[u8]], field: &EncodingField) -> BinaryArray { let (non_empty_sentinel, continuation_token) = if field.descending { (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION_TOKEN) } else { @@ -274,7 +329,7 @@ pub(super) unsafe fn decode_binary(rows: &mut [&[u8]], field: &SortField) -> Bin ) } -pub(super) unsafe fn decode_binview(rows: &mut [&[u8]], field: &SortField) -> BinaryViewArray { +pub(super) unsafe fn decode_binview(rows: &mut [&[u8]], field: &EncodingField) -> BinaryViewArray { let (non_empty_sentinel, continuation_token) = if field.descending { (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION_TOKEN) } else {