Skip to content

Commit

Permalink
test sortedness propagation (#3560)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 2, 2022
1 parent a98438b commit f1df2cb
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 21 deletions.
13 changes: 12 additions & 1 deletion polars/polars-core/src/frame/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,21 @@ impl DataFrame {
.expect("cmp usize -> Ordering")
});

let verbose = std::env::var("POLARS_VERBOSE").is_ok();

// TODO: optimize this.
// This is the slower easier option.
// instead of filtering the whole dataframe first
// drop empty list rows
for col in &mut columns {
if let Ok(ca) = col.list() {
if !ca.can_fast_explode() {
if verbose {
eprintln!(
"could not fast explode column {}, running slower path",
ca.name()
);
}
let (_, offsets) = get_exploded(col)?;
if offsets.is_empty() {
let mut out = self.slice(0, 0);
Expand Down Expand Up @@ -89,6 +97,8 @@ impl DataFrame {
.downcast_mut::<ListChunked>()
.unwrap();
ca.set_fast_explode();
} else if verbose {
eprintln!("could fast explode column {}", ca.name());
}
}
}
Expand All @@ -107,7 +117,8 @@ impl DataFrame {
// expand all the other columns based the exploded first column
if i == 0 {
let row_idx = offsets_to_indexes(&offsets, exploded.len());
let row_idx = IdxCa::from_vec("", row_idx);
let mut row_idx = IdxCa::from_vec("", row_idx);
row_idx.set_sorted(true);
// Safety
// We just created indices that are in bounds.
df = unsafe { df.take_unchecked(&row_idx) };
Expand Down
10 changes: 7 additions & 3 deletions polars/polars-core/src/series/implementations/dates_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,13 @@ macro_rules! impl_dyn_series {
}

unsafe fn take_unchecked(&self, idx: &IdxCa) -> Result<Series> {
Ok(ChunkTake::take_unchecked(self.0.deref(), idx.into())
.$into_logical()
.into_series())
let mut out = ChunkTake::take_unchecked(self.0.deref(), idx.into());

if self.0.is_sorted() && (idx.is_sorted() || idx.is_sorted_reverse()) {
out.set_sorted(idx.is_sorted_reverse())
}

Ok(out.$into_logical().into_series())
}

unsafe fn take_opt_iter_unchecked(&self, iter: &mut dyn TakeIteratorNulls) -> Series {
Expand Down
8 changes: 7 additions & 1 deletion polars/polars-core/src/series/implementations/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,13 @@ impl SeriesTrait for SeriesWrap<DatetimeChunked> {
}

unsafe fn take_unchecked(&self, idx: &IdxCa) -> Result<Series> {
Ok(ChunkTake::take_unchecked(self.0.deref(), idx.into())
let mut out = ChunkTake::take_unchecked(self.0.deref(), idx.into());

if self.0.is_sorted() && (idx.is_sorted() || idx.is_sorted_reverse()) {
out.set_sorted(idx.is_sorted_reverse())
}

Ok(out
.into_datetime(self.0.time_unit(), self.0.time_zone().clone())
.into_series())
}
Expand Down
10 changes: 7 additions & 3 deletions polars/polars-core/src/series/implementations/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,13 @@ impl SeriesTrait for SeriesWrap<DurationChunked> {
}

unsafe fn take_unchecked(&self, idx: &IdxCa) -> Result<Series> {
Ok(ChunkTake::take_unchecked(self.0.deref(), idx.into())
.into_duration(self.0.time_unit())
.into_series())
let mut out = ChunkTake::take_unchecked(self.0.deref(), idx.into());

if self.0.is_sorted() && (idx.is_sorted() || idx.is_sorted_reverse()) {
out.set_sorted(idx.is_sorted_reverse())
}

Ok(out.into_duration(self.0.time_unit()).into_series())
}

unsafe fn take_opt_iter_unchecked(&self, iter: &mut dyn TakeIteratorNulls) -> Series {
Expand Down
9 changes: 8 additions & 1 deletion polars/polars-core/src/series/implementations/floats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,14 @@ macro_rules! impl_dyn_series {
} else {
Cow::Borrowed(idx)
};
Ok(ChunkTake::take_unchecked(&self.0, (&*idx).into()).into_series())

let mut out = ChunkTake::take_unchecked(&self.0, (&*idx).into());

if self.0.is_sorted() && (idx.is_sorted() || idx.is_sorted_reverse()) {
out.set_sorted(idx.is_sorted_reverse())
}

Ok(out.into_series())
}

unsafe fn take_opt_iter_unchecked(&self, iter: &mut dyn TakeIteratorNulls) -> Series {
Expand Down
6 changes: 5 additions & 1 deletion polars/polars-core/src/series/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,11 @@ macro_rules! impl_dyn_series {
} else {
Cow::Borrowed(idx)
};
Ok(ChunkTake::take_unchecked(&self.0, (&*idx).into()).into_series())
let mut out = ChunkTake::take_unchecked(&self.0, (&*idx).into());
if self.0.is_sorted() && (idx.is_sorted() || idx.is_sorted_reverse()) {
out.set_sorted(idx.is_sorted_reverse())
}
Ok(out.into_series())
}

unsafe fn take_opt_iter_unchecked(&self, iter: &mut dyn TakeIteratorNulls) -> Series {
Expand Down
9 changes: 8 additions & 1 deletion polars/polars-core/src/series/implementations/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,14 @@ impl SeriesTrait for SeriesWrap<Utf8Chunked> {
} else {
Cow::Borrowed(idx)
};
Ok(ChunkTake::take_unchecked(&self.0, (&*idx).into()).into_series())

let mut out = ChunkTake::take_unchecked(&self.0, (&*idx).into()).into_series();

if self.0.is_sorted() && (idx.is_sorted() || idx.is_sorted_reverse()) {
out.set_sorted(idx.is_sorted_reverse())
}

Ok(out)
}

unsafe fn take_opt_iter_unchecked(&self, iter: &mut dyn TakeIteratorNulls) -> Series {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/series/series_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::borrow::Cow;
#[cfg(feature = "temporal")]
use std::sync::Arc;

#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum IsSorted {
Ascending,
Descending,
Expand Down
26 changes: 20 additions & 6 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,12 +752,17 @@ impl LazyFrame {
/// ])
/// }
/// ```
pub fn groupby<E: AsRef<[Expr]>>(self, by: E) -> LazyGroupBy {
pub fn groupby<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
let keys = by
.as_ref()
.iter()
.map(|e| e.clone().into())
.collect::<Vec<_>>();
let opt_state = self.get_opt_state();
LazyGroupBy {
logical_plan: self.logical_plan,
opt_state,
keys: by.as_ref().to_vec(),
keys,
maintain_order: false,
dynamic_options: None,
rolling_options: None,
Expand Down Expand Up @@ -797,12 +802,17 @@ impl LazyFrame {
}

/// Similar to groupby, but order of the DataFrame is maintained.
pub fn groupby_stable<E: AsRef<[Expr]>>(self, by: E) -> LazyGroupBy {
pub fn groupby_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
let keys = by
.as_ref()
.iter()
.map(|e| e.clone().into())
.collect::<Vec<_>>();
let opt_state = self.get_opt_state();
LazyGroupBy {
logical_plan: self.logical_plan,
opt_state,
keys: by.as_ref().to_vec(),
keys,
maintain_order: true,
dynamic_options: None,
rolling_options: None,
Expand Down Expand Up @@ -983,8 +993,12 @@ impl LazyFrame {
}

/// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
pub fn explode<E: AsRef<[Expr]>>(self, columns: E) -> LazyFrame {
let columns = columns.as_ref().to_vec();
pub fn explode<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, columns: E) -> LazyFrame {
let columns = columns
.as_ref()
.iter()
.map(|e| e.clone().into())
.collect::<Vec<_>>();
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().explode(columns).build();
Self::from_logical_plan(lp, opt_state)
Expand Down
24 changes: 24 additions & 0 deletions polars/tests/it/lazy/queries.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use polars_core::series::IsSorted;

#[test]
fn test_with_duplicate_column_empty_df() {
Expand Down Expand Up @@ -124,3 +125,26 @@ fn test_alias_before_cast() -> Result<()> {
);
Ok(())
}

#[test]
fn test_sorted_path() -> Result<()> {
// start with a sorted column and see if the metadata remains preserved

let payloads = &[1, 2, 3];
let df = df![
"a"=> [AnyValue::List(Series::new("", payloads)), AnyValue::List(Series::new("", payloads)), AnyValue::List(Series::new("", payloads))]
]?;

let out = df
.lazy()
.with_row_count("row_nr", None)
.explode(["a"])
.groupby(["row_nr"])
.agg([col("a").count().alias("count")])
.collect()?;

let s = out.column("row_nr")?;
assert_eq!(s.is_sorted(), IsSorted::Ascending);

Ok(())
}
29 changes: 26 additions & 3 deletions py-polars/src/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,32 @@ impl PySeries {
#[staticmethod]
pub fn from_arrow(name: &str, array: &PyAny) -> PyResult<Self> {
let arr = array_to_rust(array)?;
let series: Series =
std::convert::TryFrom::try_from((name, arr)).map_err(PyPolarsErr::from)?;
Ok(series.into())

match arr.data_type() {
ArrowDataType::LargeList(_) => {
let array = arr.as_any().downcast_ref::<LargeListArray>().unwrap();

let mut previous = 0;
let mut fast_explode = true;
for &o in array.offsets().as_slice()[1..].iter() {
if o == previous {
fast_explode = false;
break;
}
previous = o;
}
let mut out = ListChunked::from_chunks(name, vec![arr]);
if fast_explode {
out.set_fast_explode()
}
return Ok(out.into_series().into());
}
_ => {
let series: Series =
std::convert::TryFrom::try_from((name, arr)).map_err(PyPolarsErr::from)?;
Ok(series.into())
}
}
}

#[staticmethod]
Expand Down

0 comments on commit f1df2cb

Please sign in to comment.