-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
sort.rs
69 lines (62 loc) · 2.11 KB
/
sort.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use super::*;
/// Executor node that sorts the frame produced by its `input` executor.
pub(crate) struct SortExec {
    /// Upstream executor; its output is the frame that gets sorted.
    pub(crate) input: Box<dyn Executor>,

    /// Expressions evaluated against the input frame to obtain the sort keys,
    /// in order.
    pub(crate) by_column: Vec<Arc<dyn PhysicalExpr>>,

    /// Optional slice forwarded unchanged to the sort call.
    // NOTE(review): presumably `(offset, length)` — confirm against `sort_impl`.
    pub(crate) slice: Option<(i64, usize)>,

    /// Sort options; a clone is handed to the sort call on every execution.
    pub(crate) sort_options: SortMultipleOptions,
}
impl SortExec {
    /// Sorts `df` by the keys obtained from evaluating `self.by_column`.
    ///
    /// Returns early with an error if `state.should_stop()` fails or if any
    /// sort expression fails to evaluate; expressions are evaluated in order
    /// and evaluation stops at the first failure.
    fn execute_impl(
        &mut self,
        state: &ExecutionState,
        mut df: DataFrame,
    ) -> PolarsResult<DataFrame> {
        state.should_stop()?;
        df.as_single_chunk_par();

        let mut sort_keys = Vec::with_capacity(self.by_column.len());
        for (idx, expr) in self.by_column.iter().enumerate() {
            let mut key = expr.evaluate(&df, state)?;

            // Polars core tries to flag the columns it sorted by as sorted,
            // matching them by name. That is only valid for a plain
            // `col("foo")` expression, so every other expression gets a
            // synthetic name that core will not match against a frame column.
            let is_plain_column = matches!(expr.as_expression(), Some(&Expr::Column(_)));
            if !is_plain_column {
                key.rename(&format!("_POLARS_SORT_BY_{idx}"));
            }

            sort_keys.push(key);
        }

        df.sort_impl(sort_keys, self.sort_options.clone(), self.slice)
    }
}
impl Executor for SortExec {
    /// Runs the input executor and sorts its output.
    ///
    /// When the execution state has a node timer, the sort is run through
    /// `record` under a name built from `"sort"` and the output field names
    /// of the sort expressions; otherwise the sort is run directly.
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run SortExec");
            }
        }

        let df = self.input.execute(state)?;

        // No timer: nothing to label, just sort.
        if !state.has_node_timer() {
            return self.execute_impl(state, df);
        }

        // Build the profiling label from the names the sort expressions
        // resolve to under the input frame's schema.
        let key_names = self
            .by_column
            .iter()
            .map(|expr| expr.to_field(&df.schema()).map(|field| field.name))
            .collect::<PolarsResult<Vec<_>>>()?;
        let label = comma_delimited(String::from("sort"), &key_names);

        // `record` is invoked on a clone so the closure can still borrow
        // `state` for the actual sort.
        let timer_state = state.clone();
        timer_state.record(|| self.execute_impl(state, df), Cow::Owned(label))
    }
}