Skip to content

Commit

Permalink
implement multilevel sort in python
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 6, 2021
1 parent b6b6d90 commit f18eda1
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 42 deletions.
16 changes: 9 additions & 7 deletions polars/polars-core/src/chunked_array/ops/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,15 @@ where
// if ordering is equal, we check the other arrays until we find a non-equal ordering
// if we have exhausted all arrays, we keep the equal ordering.
Ordering::Equal => {
let idx_a = tpl_a.0;
let idx_b = tpl_b.0;
let idx_a = tpl_a.0 as usize;
let idx_b = tpl_b.0 as usize;
for ca in other {
// Safety:
// Indexes are in bounds, we asserted equal lengths above
let a = unsafe { ca.get_unchecked(idx_a as usize) };
let b = unsafe { ca.get_unchecked(idx_b as usize) };
debug_assert!(idx_a < ca.len());
debug_assert!(idx_b < ca.len());
let a = unsafe { ca.get_unchecked(idx_a) };
let b = unsafe { ca.get_unchecked(idx_b) };

match a.partial_cmp(&b).unwrap() {
// also equal, try next array
Expand Down Expand Up @@ -359,12 +361,12 @@ mod test {
#[test]
fn test_argsort_multiple() -> Result<()> {
let a = Int32Chunked::new_from_slice("a", &[1, 2, 1, 1, 3, 4, 3, 3]);
let b = Int32Chunked::new_from_slice("b", &[0, 1, 2, 3, 4, 5, 6, 1]);
let df = DataFrame::new(vec![a, b])?;
let b = Int64Chunked::new_from_slice("b", &[0, 1, 2, 3, 4, 5, 6, 1]);
let df = DataFrame::new(vec![a.into_series(), b.into_series()])?;

let out = df.sort(&["a", "b"], false)?;
assert_eq!(
Vec::from(out.column("b")?.i32()?),
Vec::from(out.column("b")?.i64()?),
&[
Some(0),
Some(2),
Expand Down
6 changes: 4 additions & 2 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use crate::chunked_array::ops::unique::is_unique_helper;
use crate::frame::select::Selection;
use crate::prelude::*;
use crate::utils::{
accumulate_dataframes_horizontal, accumulate_dataframes_vertical, get_supertype,
CustomIterTools, NoNull,
accumulate_dataframes_horizontal, accumulate_dataframes_vertical, get_supertype, NoNull,
};

mod arithmetic;
Expand Down Expand Up @@ -764,7 +763,9 @@ impl DataFrame {
#[cfg(feature = "sort_multiple")]
{
let mut columns = self.select_series(by_column)?;

// we only allow this implementation of the same types
// se we determine the supertypes and coerce all series.
let first = columns.remove(0);
let first_dtype = Cow::Borrowed(first.dtype());
let dtype = columns
Expand All @@ -776,6 +777,7 @@ impl DataFrame {
.iter()
.map(|s| s.cast_with_datatype(&*dtype))
.collect::<Result<Vec<_>>>()?;
let first = first.cast_with_datatype(&*dtype)?;
first.argsort_multiple(&columns, reverse)?
}
#[cfg(not(feature = "sort_multiple"))]
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/datafusion/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ pub fn to_datafusion_lp(lp: LogicalPlan) -> Result<DLogicalPlan> {
.into_iter()
.map(|e| {
if reverse {
to_datafusion_expr(e.reverse())
to_datafusion_expr(e.reverse()).map(|e| e.sort(!reverse, true))
} else {
to_datafusion_expr(e)
to_datafusion_expr(e).map(|e| e.sort(!reverse, true))
}
})
.collect::<Result<Vec<_>>>()?,
Expand Down
10 changes: 7 additions & 3 deletions polars/polars-lazy/src/datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ use tokio::runtime::Runtime;

impl LazyFrame {
/// Collect Out of Core on the DataFusion query engine
pub fn ooc(self) -> Result<DataFrame> {
// Don't use Polars optimizer, but transpile and send to DataFusion
let lp = self.logical_plan;
pub fn ooc(mut self) -> Result<DataFrame> {
let mut expr_arena = Arena::with_capacity(64);
let mut lp_arena = Arena::with_capacity(64);
self.opt_state.predicate_pushdown = false;
self.opt_state.projection_pushdown = false;
let lp_top = self.optimize(&mut lp_arena, &mut expr_arena)?;
let lp = node_to_lp(lp_top, &mut expr_arena, &mut lp_arena);
let lp = to_datafusion_lp(lp)?;

let ctx = ExecutionContext::with_config(ExecutionConfig::new().with_concurrency(8));
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl IntoLazy for DataFrame {
#[derive(Clone)]
pub struct LazyFrame {
pub(crate) logical_plan: LogicalPlan,
opt_state: OptState,
pub(crate) opt_state: OptState,
}

impl Default for LazyFrame {
Expand Down
5 changes: 1 addition & 4 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,10 +583,7 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
}

fn nodes_to_exprs(nodes: &[Node], expr_arena: &Arena<AExpr>) -> Vec<Expr> {
nodes
.into_iter()
.map(|n| node_to_exp(*n, expr_arena))
.collect()
nodes.iter().map(|n| node_to_exp(*n, expr_arena)).collect()
}

pub(crate) fn node_to_lp(
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl Executor for SortExec {
column_names.push(name.to_string());
// if error, expression create a new named column and we must add it to the DataFrame
// if ok, we have replaced the column with the expression eval
if let Err(_) = df.apply(name, |_| column.clone()) {
if df.apply(name, |_| column.clone()).is_err() {
df.hstack(&[column])?;
}
}
Expand Down
4 changes: 2 additions & 2 deletions py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ name = "polars"
requires-dist = ["numpy", "pyarrow==4.0"]

[profile.release]
codegen-units = 1
#codegen-units = 1

# This is ignored here; would be set in .cargo/config.toml.
# Should not be used when packaging
# target-cpu = "native"
lto = "fat"
#lto = "fat"

#[profile.dev]
#opt-level = 1
33 changes: 27 additions & 6 deletions py-polars/polars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def _prepare_other_arg(other: Any) -> Series:
return other


def _is_expr(arg: Any) -> bool:
return hasattr(arg, "_pyexpr")


class DataFrame:
def __init__(
self,
Expand Down Expand Up @@ -738,14 +742,17 @@ def replace_at_idx(self, index: int, series: Series):
self._df.replace_at_idx(index, series._s)

def sort(
self, by_column: str, in_place: bool = False, reverse: bool = False
self,
by: "Union[str, Expr, List[Expr]]",
in_place: bool = False,
reverse: bool = False,
) -> Optional["DataFrame"]:
"""
Sort the DataFrame by column
Parameters
----------
by_column
by
By which column to sort. Only accepts string.
in_place
Perform operation in-place.
Expand All @@ -755,13 +762,13 @@ def sort(
Example
---
```python
>>> pl.DataFrame({
>>> df = pl.DataFrame({
"foo": [1, 2, 3],
"bar": [6.0, 7.0, 8.0],
"ham": ['a', 'b', 'c']
})
>>> dataframe.sort('foo', reverse=True)
>>> df.sort('foo', reverse=True)
shape: (3, 3)
╭─────┬─────┬─────╮
│ foo ┆ bar ┆ ham │
Expand All @@ -775,11 +782,25 @@ def sort(
│ 1 ┆ 6 ┆ "a" │
╰─────┴─────┴─────╯
```
### Sort by multiple columns.
For multiple columns we can use expression syntax
```python
df.sort([col("foo"), col("bar").reverse()])
```
"""
if type(by) is list or _is_expr(by):
df = self.lazy().sort(by, reverse).collect(no_optimization=True)
if in_place:
self._df = df._df
return
return df
if in_place:
self._df.sort_in_place(by_column, reverse)
self._df.sort_in_place(by, reverse)
else:
return wrap_df(self._df.sort(by_column, reverse))
return wrap_df(self._df.sort(by, reverse))

def frame_equal(self, other: "DataFrame", null_equal: bool = False) -> bool:
"""
Expand Down
29 changes: 26 additions & 3 deletions py-polars/polars/lazy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,29 @@ def describe_optimized_plan(

return ldf.describe_optimized_plan()

def sort(self, by_column: str, reverse: bool = False) -> "LazyFrame":
return wrap_ldf(self._ldf.sort(by_column, reverse))
def sort(
self, by_columns: "Union[str, Expr, List[Expr]]", reverse: bool = False
) -> "LazyFrame":
"""
Sort the DataFrame by:
- A single column name
- An expression
- Multiple expressions
Parameters
----------
by_columns
Column (expressions) to sort by
reverse
Whether or not to sort in reverse order
"""
if type(by_columns) is str:
return wrap_ldf(self._ldf.sort(by_columns, reverse))

by_columns = expr_to_lit_or_expr(by_columns)
by_columns = _selection_to_pyexpr_list(by_columns)
return wrap_ldf(self._ldf.sort_by_exprs(by_columns, reverse))

def collect(
self,
Expand Down Expand Up @@ -1426,9 +1447,11 @@ def is_between(
return ((expr > start) & (expr < end)).alias("is_between")


def expr_to_lit_or_expr(expr: Union["Expr", int, float, str]) -> "Expr":
def expr_to_lit_or_expr(expr: "Union[Expr, int, float, str, List[Expr]]") -> "Expr":
if isinstance(expr, (int, float, str)):
return lit(expr)
if isinstance(expr, list):
return [expr_to_lit_or_expr(e) for e in expr]
return expr


Expand Down
6 changes: 6 additions & 0 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ impl PyLazyFrame {
let ldf = self.ldf.clone();
ldf.sort(by_column, reverse).into()
}

pub fn sort_by_exprs(&self, by_column: Vec<PyExpr>, reverse: bool) -> PyLazyFrame {
let ldf = self.ldf.clone();
let exprs = py_exprs_to_exprs(by_column);
ldf.sort_by_exprs(exprs, reverse).into()
}
pub fn cache(&self) -> PyLazyFrame {
let ldf = self.ldf.clone();
ldf.cache().into()
Expand Down
21 changes: 14 additions & 7 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,50 +154,50 @@ def test_groupby():
assert (
df.groupby("a")["b"]
.sum()
.sort(by_column="a")
.sort(by="a")
.frame_equal(DataFrame({"a": ["a", "b", "c"], "": [4, 11, 6]}))
)

assert (
df.groupby("a")
.select("b")
.sum()
.sort(by_column="a")
.sort(by="a")
.frame_equal(DataFrame({"a": ["a", "b", "c"], "": [4, 11, 6]}))
)
assert (
df.groupby("a")
.select("c")
.sum()
.sort(by_column="a")
.sort(by="a")
.frame_equal(DataFrame({"a": ["a", "b", "c"], "": [10, 10, 1]}))
)
assert (
df.groupby("a")
.select("b")
.min()
.sort(by_column="a")
.sort(by="a")
.frame_equal(DataFrame({"a": ["a", "b", "c"], "": [1, 2, 6]}))
)
assert (
df.groupby("a")
.select("b")
.max()
.sort(by_column="a")
.sort(by="a")
.frame_equal(DataFrame({"a": ["a", "b", "c"], "": [3, 5, 6]}))
)
assert (
df.groupby("a")
.select("b")
.mean()
.sort(by_column="a")
.sort(by="a")
.frame_equal(DataFrame({"a": ["a", "b", "c"], "": [2.0, (2 + 4 + 5) / 3, 6.0]}))
)
assert (
df.groupby("a")
.select("b")
.last()
.sort(by_column="a")
.sort(by="a")
.frame_equal(DataFrame({"a": ["a", "b", "c"], "": [3, 5, 6]}))
)
# check if it runs
Expand Down Expand Up @@ -522,3 +522,10 @@ def test_lazy_functions():
expected = 3
assert np.isclose(out[9], expected)
assert np.isclose(pl.last(df["b"]), expected)


def test_multiple_column_sort():
df = pl.DataFrame({"a": ["foo", "bar", "2"], "b": [2, 2, 3], "c": [1.0, 2.0, 3.0]})
out = df.sort([col("b"), col("c").reverse()])
assert out["c"] == [2, 3, 1]
assert out["b"] == [2, 2, 3]
4 changes: 0 additions & 4 deletions py-polars/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ def test_describe():
float_s = pl.Series([1.3, 4.6, 8.9])
str_s = pl.Series(["abc", "pqr", "xyz"])
bool_s = pl.Series([True, False, True, True])
list_s = pl.Series([[5.0, 6.0], [1.0, 2.0]])
empty_s = pl.Series(np.empty(0))

assert num_s.describe() == {
Expand All @@ -314,6 +313,3 @@ def test_describe():

with pytest.raises(ValueError):
assert empty_s.describe()

with pytest.raises(TypeError):
assert list_s.describe()

0 comments on commit f18eda1

Please sign in to comment.