Skip to content

Commit

Permalink
fix invalid fast path of sorted joins and improve sortedness propagation (#3577)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 5, 2022
1 parent 07bb987 commit fe771f5
Show file tree
Hide file tree
Showing 24 changed files with 161 additions and 88 deletions.
46 changes: 0 additions & 46 deletions polars/polars-arrow/src/kernels/sorted_join/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,52 +60,6 @@ pub fn join<T: PartialOrd + Copy + Debug>(
// continue looping the right side
right_idx += 1;
}
// loop {
// match right.get(right_idx as usize) {
// Some(&val_r) => {
// // matching join key
// if val_l == val_r {
// out_lhs.push(left_idx + left_offset);
// out_rhs.push(right_idx);
// let current_idx = right_idx;
//
// loop {
// right_idx += 1;
// match right.get(right_idx as usize) {
// // rhs depleted
// None => {
// // reset right index because the next lhs value can be the same
// right_idx = current_idx;
// break;
// }
// Some(&val_r) => {
// if val_l == val_r {
// out_lhs.push(left_idx + left_offset);
// out_rhs.push(right_idx);
// } else {
// // reset right index because the next lhs value can be the same
// right_idx = current_idx;
// break;
// }
// }
// }
// }
// break;
// }
//
// // right is larger than left.
// if val_r > val_l {
// break;
// }
// // continue looping the right side
// right_idx += 1;
// }
// // we depleted the right array
// None => {
// break;
// }
// }
// }
left_idx += 1;
}
(out_lhs, out_rhs)
Expand Down
9 changes: 9 additions & 0 deletions polars/polars-core/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use polars_arrow::prelude::*;

#[cfg(feature = "dtype-categorical")]
use crate::chunked_array::categorical::RevMapping;
use crate::series::IsSorted;
use crate::utils::CustomIterTools;
use std::mem;

Expand Down Expand Up @@ -171,6 +172,14 @@ impl<T> ChunkedArray<T> {
}
}

/// Apply an [`IsSorted`] state to this array's sorted flag.
///
/// `IsSorted::Ascending` forwards to `set_sorted(false)` and
/// `IsSorted::Descending` forwards to `set_sorted(true)`.
/// `IsSorted::Not` is a no-op: `set_sorted` is not called at all, so
/// whatever flag the array already carries is left untouched.
pub fn set_sorted2(&mut self, sorted: IsSorted) {
    // Translate the enum into the `reverse` flag expected by `set_sorted`,
    // bailing out early when there is nothing to record.
    let reverse = match sorted {
        IsSorted::Not => return,
        IsSorted::Ascending => false,
        IsSorted::Descending => true,
    };
    self.set_sorted(reverse)
}

/// Get the index of the first non null value in this ChunkedArray.
pub fn first_non_null(&self) -> Option<usize> {
let mut offset = 0;
Expand Down
18 changes: 12 additions & 6 deletions polars/polars-core/src/chunked_array/ops/take/take_chunked.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::*;
use crate::series::IsSorted;

pub trait TakeChunked {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId]) -> Self;
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Self;

unsafe fn take_opt_chunked_unchecked(&self, by: &[Option<ChunkId>]) -> Self;
}
Expand All @@ -10,7 +11,7 @@ impl<T> TakeChunked for ChunkedArray<T>
where
T: PolarsNumericType,
{
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId]) -> Self {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Self {
let mut ca = if self.null_count() == 0 {
let arrs = self
.downcast_iter()
Expand All @@ -36,6 +37,7 @@ where
.collect_trusted()
};
ca.rename(self.name());
ca.set_sorted2(sorted);
ca
}

Expand All @@ -57,7 +59,7 @@ where
}

impl TakeChunked for Utf8Chunked {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId]) -> Self {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Self {
let arrs = self.downcast_iter().collect::<Vec<_>>();
let mut ca: Self = by
.iter()
Expand All @@ -67,6 +69,7 @@ impl TakeChunked for Utf8Chunked {
})
.collect_trusted();
ca.rename(self.name());
ca.set_sorted2(sorted);
ca
}

Expand All @@ -88,7 +91,7 @@ impl TakeChunked for Utf8Chunked {
}

impl TakeChunked for BooleanChunked {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId]) -> Self {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Self {
let arrs = self.downcast_iter().collect::<Vec<_>>();
let mut ca: Self = by
.iter()
Expand All @@ -98,6 +101,7 @@ impl TakeChunked for BooleanChunked {
})
.collect_trusted();
ca.rename(self.name());
ca.set_sorted2(sorted);
ca
}

Expand All @@ -119,7 +123,7 @@ impl TakeChunked for BooleanChunked {
}

impl TakeChunked for ListChunked {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId]) -> Self {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Self {
let arrs = self.downcast_iter().collect::<Vec<_>>();
let mut ca: Self = by
.iter()
Expand All @@ -129,6 +133,7 @@ impl TakeChunked for ListChunked {
})
.collect();
ca.rename(self.name());
ca.set_sorted2(sorted);
ca
}

Expand All @@ -150,7 +155,7 @@ impl TakeChunked for ListChunked {
}
#[cfg(feature = "object")]
impl<T: PolarsObject> TakeChunked for ObjectChunked<T> {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId]) -> Self {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Self {
let arrs = self.downcast_iter().collect::<Vec<_>>();

let mut ca: Self = by
Expand All @@ -162,6 +167,7 @@ impl<T: PolarsObject> TakeChunked for ObjectChunked<T> {
.collect();

ca.rename(self.name());
ca.set_sorted2(sorted);
ca
}

Expand Down
3 changes: 2 additions & 1 deletion polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ impl DataFrame {
if left_join && chunk_ids.len() == self.height() {
self.clone()
} else {
self.take_chunked_unchecked(chunk_ids)
// left join keys are in ascending order
self.take_chunked_unchecked(chunk_ids, IsSorted::Ascending)
}
}

Expand Down
10 changes: 5 additions & 5 deletions polars/polars-core/src/frame/hash_join/sort_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use polars_arrow::kernels::sorted_join;
use polars_utils::flatten;

pub(super) fn use_sort_merge(s_left: &Series, s_right: &Series) -> bool {
let out = !matches!(s_left.is_sorted(), IsSorted::Not)
&& !matches!(s_right.is_sorted(), IsSorted::Not)
&& s_left.null_count() == 0
&& s_right.null_count() == 0;

use IsSorted::*;
let out = match (s_left.is_sorted(), s_right.is_sorted()) {
(Ascending, Ascending) => s_left.null_count() == 0 && s_right.null_count() == 0,
_ => false,
};
if out && std::env::var("POLARS_VERBOSE").is_ok() {
eprintln!("keys are sorted: use sorted merge join")
}
Expand Down
22 changes: 19 additions & 3 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::POOL;
use serde::{Deserialize, Serialize};
use std::hash::{BuildHasher, Hash, Hasher};

use crate::series::IsSorted;
pub use chunks::*;

#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -3004,10 +3005,10 @@ impl DataFrame {
}

#[cfg(feature = "chunked_ids")]
pub(crate) unsafe fn take_chunked_unchecked(&self, idx: &[ChunkId]) -> Self {
pub(crate) unsafe fn take_chunked_unchecked(&self, idx: &[ChunkId], sorted: IsSorted) -> Self {
let cols = self.apply_columns_par(&|s| match s.dtype() {
DataType::Utf8 => s._take_chunked_unchecked_threaded(idx, true),
_ => s._take_chunked_unchecked(idx),
DataType::Utf8 => s._take_chunked_unchecked_threaded(idx, sorted, true),
_ => s._take_chunked_unchecked(idx, sorted),
});

DataFrame::new_no_checks(cols)
Expand All @@ -3027,11 +3028,26 @@ impl DataFrame {
/// every thread split may be on rayon stack and lead to SO
#[doc(hidden)]
pub unsafe fn _take_unchecked_slice(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
// Thin wrapper: forwards to `_take_unchecked_slice2` with `IsSorted::Not`,
// i.e. without declaring the index slice to be sorted.
self._take_unchecked_slice2(idx, allow_threads, IsSorted::Not)
}

#[doc(hidden)]
pub unsafe fn _take_unchecked_slice2(
&self,
idx: &[IdxSize],
allow_threads: bool,
sorted: IsSorted,
) -> Self {
let ptr = idx.as_ptr() as *mut IdxSize;
let len = idx.len();

// create a temporary vec. we will not drop it.
let mut ca = IdxCa::from_vec("", Vec::from_raw_parts(ptr, len, len));
match sorted {
IsSorted::Not => {}
IsSorted::Ascending => ca.set_sorted(false),
IsSorted::Descending => ca.set_sorted(true),
}
let out = self.take_unchecked_impl(&ca, allow_threads);

// ref count of buffers should be one because we dropped all allocations
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/implementations/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ impl SeriesTrait for SeriesWrap<BooleanChunked> {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
self.0.take_chunked_unchecked(by).into_series()
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
self.0.take_chunked_unchecked(by, sorted).into_series()
}

#[cfg(feature = "chunked_ids")]
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/implementations/categorical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ impl SeriesTrait for SeriesWrap<CategoricalChunked> {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
let cats = self.0.logical().take_chunked_unchecked(by);
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
let cats = self.0.logical().take_chunked_unchecked(by, sorted);
self.finish_with_state(false, cats).into_series()
}

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/implementations/dates_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ macro_rules! impl_dyn_series {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
let ca = self.0.deref().take_chunked_unchecked(by);
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
let ca = self.0.deref().take_chunked_unchecked(by, sorted);
ca.$into_logical().into_series()
}

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/implementations/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ impl SeriesTrait for SeriesWrap<DatetimeChunked> {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
let ca = self.0.deref().take_chunked_unchecked(by);
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
let ca = self.0.deref().take_chunked_unchecked(by, sorted);
ca.into_datetime(self.0.time_unit(), self.0.time_zone().clone())
.into_series()
}
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/implementations/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ impl SeriesTrait for SeriesWrap<DurationChunked> {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
let ca = self.0.deref().take_chunked_unchecked(by);
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
let ca = self.0.deref().take_chunked_unchecked(by, sorted);
ca.into_duration(self.0.time_unit()).into_series()
}

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/implementations/floats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ macro_rules! impl_dyn_series {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
self.0.take_chunked_unchecked(by).into_series()
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
self.0.take_chunked_unchecked(by, sorted).into_series()
}

#[cfg(feature = "chunked_ids")]
Expand Down
6 changes: 4 additions & 2 deletions polars/polars-core/src/series/implementations/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::fmt::FmtList;
use crate::frame::groupby::*;
use crate::prelude::*;
use crate::series::implementations::SeriesWrap;
#[cfg(feature = "chunked_ids")]
use crate::series::IsSorted;
use arrow::array::ArrayRef;
use polars_arrow::prelude::QuantileInterpolOptions;
use std::any::Any;
Expand Down Expand Up @@ -113,8 +115,8 @@ impl SeriesTrait for SeriesWrap<ListChunked> {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
self.0.take_chunked_unchecked(by).into_series()
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
self.0.take_chunked_unchecked(by, sorted).into_series()
}

#[cfg(feature = "chunked_ids")]
Expand Down
17 changes: 15 additions & 2 deletions polars/polars-core/src/series/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ macro_rules! impl_dyn_series {
}

fn _set_sorted(&mut self, reverse: bool) {
    // Debug builds only: sanity-check the claimed order before recording it.
    // The check runs when `cont_slice()` returns `Ok` and the series is not
    // empty, and compares just the first and last value (first >= last when
    // `reverse` is set, first <= last otherwise). It can catch an obviously
    // wrong flag but does not prove that the data is sorted.
    #[cfg(debug_assertions)]
    {
        if let (Ok(vals), false) = (self.cont_slice(), self.is_empty()) {
            if reverse {
                assert!(vals[0] >= vals[vals.len() - 1]);
            } else {
                assert!(vals[0] <= vals[vals.len() - 1]);
            }
        }
    }
    self.0.set_sorted(reverse)
}

Expand Down Expand Up @@ -468,8 +481,8 @@ macro_rules! impl_dyn_series {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
self.0.take_chunked_unchecked(by).into_series()
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
self.0.take_chunked_unchecked(by, sorted).into_series()
}

#[cfg(feature = "chunked_ids")]
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-core/src/series/implementations/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::frame::groupby::{GroupsProxy, IntoGroupsProxy};
use crate::prelude::*;
use crate::series::implementations::SeriesWrap;
use crate::series::private::{PrivateSeries, PrivateSeriesNumeric};
use crate::series::IsSorted;
use ahash::RandomState;
use arrow::array::ArrayRef;
use std::any::Any;
Expand Down Expand Up @@ -116,8 +117,8 @@ where
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
self.0.take_chunked_unchecked(by).into_series()
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
self.0.take_chunked_unchecked(by, sorted).into_series()
}

#[cfg(feature = "chunked_ids")]
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/implementations/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ impl SeriesTrait for SeriesWrap<StructChunked> {
}

#[cfg(feature = "chunked_ids")]
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId]) -> Series {
unsafe fn _take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Series {
self.0
.apply_fields(|s| s._take_chunked_unchecked(by))
.apply_fields(|s| s._take_chunked_unchecked(by, sorted))
.into_series()
}

Expand Down

0 comments on commit fe771f5

Please sign in to comment.