Skip to content

Commit

Permalink
perf: Add non-order preserving variable row-encoding (#15414)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 1, 2024
1 parent 2b28777 commit 5cdeea2
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 92 deletions.
5 changes: 2 additions & 3 deletions crates/polars-core/src/chunked_array/logical/struct_/mod.rs
Expand Up @@ -9,10 +9,10 @@ use arrow::legacy::trusted_len::TrustedLenPush;
use arrow::offset::OffsetsBuffer;
use smartstring::alias::String as SmartString;

use self::sort::arg_sort_multiple::_get_rows_encoded_ca;
use super::*;
use crate::chunked_array::iterator::StructIter;
use crate::datatypes::*;
use crate::prelude::sort::arg_sort_multiple::_get_rows_encoded_ca_unordered;
use crate::utils::index_to_chunked_index;

/// This is logical type [`StructChunked`] that
Expand Down Expand Up @@ -415,8 +415,7 @@ impl StructChunked {
}

pub fn rows_encode(&self) -> PolarsResult<BinaryOffsetChunked> {
let descending = vec![false; self.fields.len()];
_get_rows_encoded_ca(self.name(), &self.fields, &descending, false)
_get_rows_encoded_ca_unordered(self.name(), &self.fields)
}

pub fn iter(&self) -> StructIter {
Expand Down
51 changes: 42 additions & 9 deletions crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs
@@ -1,5 +1,5 @@
use compare_inner::NullOrderCmp;
use polars_row::{convert_columns, RowsEncoded, SortField};
use polars_row::{convert_columns, EncodingField, RowsEncoded};
use polars_utils::iter::EnumerateIdxTrait;

use super::*;
Expand Down Expand Up @@ -87,31 +87,55 @@ pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult<ArrayRef> {
Ok(out)
}

pub(crate) fn encode_rows_vertical_par_default(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
pub(crate) fn encode_rows_vertical_par_unordered(
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);
let descending = vec![false; by.len()];

let chunks = splits.into_par_iter().map(|(offset, len)| {
let sliced = by
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded(&sliced, &descending, false)?;
let rows = _get_rows_encoded_unordered(&sliced)?;
Ok(rows.into_array())
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

Ok(BinaryOffsetChunked::from_chunk_iter("", chunks?))
}

pub(crate) fn encode_rows_default(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let descending = vec![false; by.len()];
let rows = _get_rows_encoded(by, &descending, false)?;
pub(crate) fn encode_rows_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let rows = _get_rows_encoded_unordered(by)?;
Ok(BinaryOffsetChunked::with_chunk("", rows.into_array()))
}

/// Row-encode the given columns without preserving sort order.
///
/// Every column gets an unsorted [`EncodingField`], enabling the cheaper,
/// non-order-preserving variable row encoding. Struct columns are flattened
/// so each of their fields is encoded as a separate column.
pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
    let mut cols = Vec::with_capacity(by.len());
    let mut fields = Vec::with_capacity(by.len());
    for s in by {
        let arr = _get_rows_encoded_compat_array(s)?;
        let field = EncodingField::new_unsorted();
        if let ArrowDataType::Struct(_) = arr.data_type() {
            // Flatten the struct: encode each field array independently.
            let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
            for value_arr in struct_arr.values() {
                cols.push(value_arr.clone() as ArrayRef);
                fields.push(field);
            }
        } else {
            cols.push(arr);
            fields.push(field);
        }
    }
    Ok(convert_columns(&cols, &fields))
}

pub fn _get_rows_encoded(
by: &[Series],
descending: &[bool],
Expand All @@ -123,17 +147,18 @@ pub fn _get_rows_encoded(
for (by, descending) in by.iter().zip(descending) {
let arr = _get_rows_encoded_compat_array(by)?;

let sort_field = SortField {
let sort_field = EncodingField {
descending: *descending,
nulls_last,
no_order: false,
};
match arr.data_type() {
// Flatten the struct fields.
ArrowDataType::Struct(_) => {
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
for arr in arr.values() {
cols.push(arr.clone() as ArrayRef);
fields.push(sort_field.clone())
fields.push(sort_field)
}
},
_ => {
Expand All @@ -155,6 +180,14 @@ pub fn _get_rows_encoded_ca(
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

/// Row-encode `by` without order preservation and wrap the result in a
/// named [`BinaryOffsetChunked`].
pub fn _get_rows_encoded_ca_unordered(
    name: &str,
    by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(by)?;
    Ok(BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

pub(crate) fn argsort_multiple_row_fmt(
by: &[Series],
mut descending: Vec<bool>,
Expand Down
1 change: 0 additions & 1 deletion crates/polars-core/src/frame/group_by/into_groups.rs
Expand Up @@ -299,7 +299,6 @@ impl IntoGroupsProxy for BinaryChunked {
})
.collect::<Vec<_>>()
});
let byte_hashes = byte_hashes.iter().collect::<Vec<_>>();
group_by_threaded_slice(byte_hashes, n_partitions, sorted)
} else {
let byte_hashes = fill_bytes_hashes(self, null_h, hb.clone());
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-core/src/frame/group_by/mod.rs
Expand Up @@ -23,7 +23,7 @@ pub use into_groups::*;
pub use proxy::*;

use crate::prelude::sort::arg_sort_multiple::{
encode_rows_default, encode_rows_vertical_par_default,
encode_rows_unordered, encode_rows_vertical_par_unordered,
};

impl DataFrame {
Expand Down Expand Up @@ -84,9 +84,9 @@ impl DataFrame {
})
} else {
let rows = if multithreaded {
encode_rows_vertical_par_default(&by)
encode_rows_vertical_par_unordered(&by)
} else {
encode_rows_default(&by)
encode_rows_unordered(&by)
}?
.into_series();
rows.group_tuples(multithreaded, sorted)
Expand Down
@@ -1,7 +1,7 @@
use std::cell::UnsafeCell;

use polars_core::export::ahash::RandomState;
use polars_row::{RowsEncoded, SortField};
use polars_row::{EncodingField, RowsEncoded};

use super::*;
use crate::executors::sinks::group_by::utils::prepare_key;
Expand All @@ -18,7 +18,7 @@ pub(super) struct Eval {
aggregation_series: UnsafeCell<Vec<Series>>,
keys_columns: UnsafeCell<Vec<ArrayRef>>,
hashes: Vec<u64>,
key_fields: Vec<SortField>,
key_fields: Vec<EncodingField>,
// amortizes the encoding buffers
rows_encoded: RowsEncoded,
}
Expand Down
15 changes: 6 additions & 9 deletions crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs
Expand Up @@ -7,23 +7,20 @@ use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_plan::prelude::*;
use polars_row::decode::decode_rows_from_binary;
use polars_row::SortField;
use polars_row::EncodingField;

use super::*;
use crate::operators::{
DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult, Source, SourceResult,
};
const POLARS_SORT_COLUMN: &str = "__POLARS_SORT_COLUMN";

fn get_sort_fields(sort_idx: &[usize], sort_args: &SortArguments) -> Vec<SortField> {
fn get_sort_fields(sort_idx: &[usize], sort_args: &SortArguments) -> Vec<EncodingField> {
let mut descending = sort_args.descending.clone();
_broadcast_descending(sort_idx.len(), &mut descending);
descending
.into_iter()
.map(|descending| SortField {
descending,
nulls_last: sort_args.nulls_last,
})
.map(|descending| EncodingField::new_sorted(descending, sort_args.nulls_last))
.collect()
}

Expand Down Expand Up @@ -61,7 +58,7 @@ fn finalize_dataframe(
can_decode: bool,
sort_dtypes: Option<&[ArrowDataType]>,
rows: &mut Vec<&'static [u8]>,
sort_fields: &[SortField],
sort_fields: &[EncodingField],
schema: &Schema,
) {
unsafe {
Expand Down Expand Up @@ -126,7 +123,7 @@ pub struct SortSinkMultiple {
sort_sink: Box<dyn Sink>,
sort_args: SortArguments,
// Needed for encoding
sort_fields: Arc<[SortField]>,
sort_fields: Arc<[EncodingField]>,
sort_dtypes: Option<Arc<[DataType]>>,
// amortize allocs
sort_column: Vec<ArrayRef>,
Expand Down Expand Up @@ -320,7 +317,7 @@ struct DropEncoded {
can_decode: bool,
sort_dtypes: Option<Vec<ArrowDataType>>,
rows: Vec<&'static [u8]>,
sort_fields: Arc<[SortField]>,
sort_fields: Arc<[EncodingField]>,
output_schema: SchemaRef,
}

Expand Down
6 changes: 3 additions & 3 deletions crates/polars-row/src/decode.rs
Expand Up @@ -10,7 +10,7 @@ use crate::variable::{decode_binary, decode_binview};
/// encodings.
pub unsafe fn decode_rows_from_binary<'a>(
arr: &'a BinaryArray<i64>,
fields: &[SortField],
fields: &[EncodingField],
data_types: &[ArrowDataType],
rows: &mut Vec<&'a [u8]>,
) -> Vec<ArrayRef> {
Expand All @@ -27,7 +27,7 @@ pub unsafe fn decode_rows_from_binary<'a>(
pub unsafe fn decode_rows(
// the rows will be updated while the data is decoded
rows: &mut [&[u8]],
fields: &[SortField],
fields: &[EncodingField],
data_types: &[ArrowDataType],
) -> Vec<ArrayRef> {
assert_eq!(fields.len(), data_types.len());
Expand All @@ -38,7 +38,7 @@ pub unsafe fn decode_rows(
.collect()
}

unsafe fn decode(rows: &mut [&[u8]], field: &SortField, data_type: &ArrowDataType) -> ArrayRef {
unsafe fn decode(rows: &mut [&[u8]], field: &EncodingField, data_type: &ArrowDataType) -> ArrayRef {
match data_type {
ArrowDataType::Null => NullArray::new(ArrowDataType::Null, rows.len()).to_boxed(),
ArrowDataType::Boolean => decode_bool(rows, field).to_boxed(),
Expand Down

0 comments on commit 5cdeea2

Please sign in to comment.