Skip to content

Commit

Permalink
improve drop_duplicates/distinct
Browse files Browse the repository at this point in the history
Change the name from `drop_duplicates` to `distinct`
and add a `keep` argument that indicates which value
must be kept. For now the options are `first` and `last`
  • Loading branch information
ritchie46 committed Jan 28, 2022
1 parent a4554f9 commit ebeffde
Show file tree
Hide file tree
Showing 22 changed files with 272 additions and 104 deletions.
101 changes: 95 additions & 6 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ pub enum NullStrategy {
Propagate,
}

#[derive(Copy, Clone, Debug)]
pub enum DistinctKeepStrategy {
First,
Last,
}

/// A contiguous growable collection of `Series` that have the same length.
///
/// ## Use declarations
Expand Down Expand Up @@ -2630,20 +2636,103 @@ impl DataFrame {
/// | 3 | 3 | "c" |
/// +-----+-----+-----+
/// ```
#[deprecated(note = "use distinct")]
pub fn drop_duplicates(&self, maintain_order: bool, subset: Option<&[String]>) -> Result<Self> {
match maintain_order {
true => self.distinct_stable(subset, DistinctKeepStrategy::First),
false => self.distinct(subset, DistinctKeepStrategy::First),
}
}

/// Drop duplicate rows from a `DataFrame`.
/// *This fails when there is a column of type List in DataFrame*
///
/// Stable means that the order is maintained. This has a higher cost than an unstable distinct.
///
/// # Example
///
/// ```no_run
/// # use polars_core::prelude::*;
/// let df = df! {
/// "flt" => [1., 1., 2., 2., 3., 3.],
/// "int" => [1, 1, 2, 2, 3, 3, ],
/// "str" => ["a", "a", "b", "b", "c", "c"]
/// }?;
///
/// println!("{}", df.distinct_stable(None, DistinctKeepStrategy::First)?);
/// # Ok::<(), PolarsError>(())
/// ```
/// Returns
///
/// ```text
/// +-----+-----+-----+
/// | flt | int | str |
/// | --- | --- | --- |
/// | f64 | i32 | str |
/// +=====+=====+=====+
/// | 1 | 1 | "a" |
/// +-----+-----+-----+
/// | 2 | 2 | "b" |
/// +-----+-----+-----+
/// | 3 | 3 | "c" |
/// +-----+-----+-----+
/// ```
pub fn distinct_stable(
&self,
subset: Option<&[String]>,
keep: DistinctKeepStrategy,
) -> Result<DataFrame> {
self.distinct_impl(true, subset, keep)
}

/// Unstable distinct. See [`DataFrame::distinct_stable`].
pub fn distinct(
&self,
subset: Option<&[String]>,
keep: DistinctKeepStrategy,
) -> Result<DataFrame> {
self.distinct_impl(false, subset, keep)
}

fn distinct_impl(
&self,
maintain_order: bool,
subset: Option<&[String]>,
keep: DistinctKeepStrategy,
) -> Result<Self> {
use DistinctKeepStrategy::*;
let names = match &subset {
Some(s) => s.iter().map(|s| &**s).collect(),
None => self.get_column_names(),
};
let gb = self.groupby(names)?;
let groups = gb.get_groups().idx_ref().iter().map(|v| v.0);
let groups = gb.get_groups().idx_ref();

let df = if maintain_order {
let mut groups = groups.collect::<Vec<_>>();
let finish_maintain_order = |mut groups: Vec<u32>| {
groups.sort_unstable();
unsafe { self.take_iter_unchecked(groups.iter().map(|i| *i as usize)) }
} else {
unsafe { self.take_iter_unchecked(groups.into_iter().map(|i| i as usize)) }
let ca = UInt32Chunked::new_from_aligned_vec("", groups);
unsafe { self.take_unchecked(&ca) }
};

let df = match (keep, maintain_order) {
(First, true) => {
let iter = groups.iter().map(|g| g.0);
let groups = iter.collect_trusted::<Vec<_>>();
finish_maintain_order(groups)
}
(Last, true) => {
let iter = groups.iter().map(|g| g.1[g.1.len() - 1]);
let groups = iter.collect_trusted::<Vec<_>>();
finish_maintain_order(groups)
}
(First, false) => {
let iter = groups.iter().map(|g| g.0 as usize);
unsafe { self.take_iter_unchecked(iter) }
}
(Last, false) => {
let iter = groups.iter().map(|g| g.1[g.1.len() - 1] as usize);
unsafe { self.take_iter_unchecked(iter) }
}
};

Ok(df)
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use crate::{
datatypes::*,
df,
error::{PolarsError, Result},
frame::{groupby::GroupsProxy, hash_join::JoinType, DataFrame},
frame::{groupby::GroupsProxy, hash_join::JoinType, *},
named_from::NamedFrom,
series::{
arithmetic::{LhsNumOps, NumOpsDispatch},
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ impl LogicalPlan {
self.write_dot(acc_str, prev_node, &current_node, id)?;
input.dot(acc_str, (branch, id + 1), &current_node)
}
Distinct { input, subset, .. } => {
Distinct { input, options, .. } => {
let mut current_node = String::with_capacity(128);
current_node.push_str("DISTINCT");
if let Some(subset) = &**subset {
if let Some(subset) = &options.subset {
current_node.push_str(" BY ");
for name in subset.iter() {
current_node.push_str(&format!("{}, ", name));
Expand Down
39 changes: 35 additions & 4 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,12 +866,43 @@ impl LazyFrame {
}

/// Drop duplicate rows. [See eager](polars_core::prelude::DataFrame::drop_duplicates).
#[deprecated(note = "use distinct")]
pub fn drop_duplicates(self, maintain_order: bool, subset: Option<Vec<String>>) -> LazyFrame {
match maintain_order {
true => self.distinct_stable(subset, DistinctKeepStrategy::First),
false => self.distinct(subset, DistinctKeepStrategy::First),
}
}

/// Keep unique rows and maintain order
pub fn distinct_stable(
self,
subset: Option<Vec<String>>,
keep_strategy: DistinctKeepStrategy,
) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.drop_duplicates(maintain_order, subset)
.build();
let options = DistinctOptions {
subset: subset.map(Arc::new),
maintain_order: true,
keep_strategy,
};
let lp = self.get_plan_builder().distinct(options).build();
Self::from_logical_plan(lp, opt_state)
}

/// Keep unique rows, do not maintain order
pub fn distinct(
self,
subset: Option<Vec<String>>,
keep_strategy: DistinctKeepStrategy,
) -> LazyFrame {
let opt_state = self.get_opt_state();
let options = DistinctOptions {
subset: subset.map(Arc::new),
maintain_order: false,
keep_strategy,
};
let lp = self.get_plan_builder().distinct(options).build();
Self::from_logical_plan(lp, opt_state)
}

Expand Down
12 changes: 3 additions & 9 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ pub enum ALogicalPlan {
},
Distinct {
input: Node,
maintain_order: bool,
subset: Arc<Option<Vec<String>>>,
options: DistinctOptions,
},
Udf {
input: Node,
Expand Down Expand Up @@ -269,14 +268,9 @@ impl ALogicalPlan {
columns: columns.clone(),
},
Cache { .. } => Cache { input: inputs[0] },
Distinct {
maintain_order,
subset,
..
} => Distinct {
Distinct { options, .. } => Distinct {
input: inputs[0],
maintain_order: *maintain_order,
subset: subset.clone(),
options: options.clone(),
},
HStack { schema, .. } => HStack {
input: inputs[0],
Expand Down
5 changes: 2 additions & 3 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,10 @@ impl LogicalPlanBuilder {
.into()
}

pub fn drop_duplicates(self, maintain_order: bool, subset: Option<Vec<String>>) -> Self {
pub fn distinct(self, options: DistinctOptions) -> Self {
LogicalPlan::Distinct {
input: Box::new(self.0),
maintain_order,
subset: Arc::new(subset),
options,
}
.into()
}
Expand Down
21 changes: 4 additions & 17 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,17 +363,9 @@ pub(crate) fn to_alp(
schema,
}
}
LogicalPlan::Distinct {
input,
maintain_order,
subset,
} => {
LogicalPlan::Distinct { input, options } => {
let i = to_alp(*input, expr_arena, lp_arena);
ALogicalPlan::Distinct {
input: i,
maintain_order,
subset,
}
ALogicalPlan::Distinct { input: i, options }
}
LogicalPlan::Udf {
input,
Expand Down Expand Up @@ -801,16 +793,11 @@ pub(crate) fn node_to_lp(
schema,
}
}
ALogicalPlan::Distinct {
input,
maintain_order,
subset,
} => {
ALogicalPlan::Distinct { input, options } => {
let i = node_to_lp(input, expr_arena, lp_arena);
LogicalPlan::Distinct {
input: Box::new(i),
maintain_order,
subset,
options,
}
}
ALogicalPlan::Melt {
Expand Down
12 changes: 2 additions & 10 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ mod format;
pub(crate) mod iterator;
mod lit;
pub(crate) mod optimizer;
mod options;
pub(crate) mod options;
mod projection;

pub(crate) use apply::*;
pub(crate) use builder::*;
pub use lit::*;
pub(crate) use options::*;

// Will be set/ unset in the fetch operation to communicate overwriting the number of rows to scan.
thread_local! {pub(crate) static FETCH_ROWS: Cell<Option<usize>> = Cell::new(None)}
Expand All @@ -36,12 +35,6 @@ pub enum Context {
Default,
}

#[derive(Clone, Debug)]
pub struct GroupbyOptions {
pub(crate) dynamic: Option<DynamicGroupOptions>,
pub(crate) rolling: Option<RollingGroupOptions>,
}

// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
#[derive(Clone)]
pub enum LogicalPlan {
Expand Down Expand Up @@ -131,8 +124,7 @@ pub enum LogicalPlan {
/// Remove duplicates from the table
Distinct {
input: Box<LogicalPlan>,
maintain_order: bool,
subset: Arc<Option<Vec<String>>>,
options: DistinctOptions,
},
/// Sort the table
Sort {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,7 @@ impl PredicatePushDown {
}
Distinct {
input,
subset,
maintain_order,
options
} => {
// currently the distinct operation only keeps the first occurrences.
// this may have influence on the pushed down predicates. If the pushed down predicates
Expand All @@ -326,8 +325,7 @@ impl PredicatePushDown {
self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
let lp = Distinct {
input,
maintain_order,
subset,
options
};
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,9 @@ impl ProjectionPushDown {
)?;
Ok(Explode { input, columns })
}
Distinct {
input,
maintain_order,
subset,
} => {
Distinct { input, options } => {
// make sure that the set of unique columns is projected
if let Some(subset) = (&*subset).as_ref() {
if let Some(subset) = (&options.subset).as_ref() {
subset.iter().for_each(|name| {
add_str_to_accumulated(
name,
Expand All @@ -483,11 +479,7 @@ impl ProjectionPushDown {
lp_arena,
expr_arena,
)?;
Ok(Distinct {
input,
maintain_order,
subset,
})
Ok(Distinct { input, options })
}
Selection { predicate, input } => {
if !acc_projections.is_empty() {
Expand Down
15 changes: 15 additions & 0 deletions polars/polars-lazy/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::prelude::*;
use polars_core::prelude::*;
use polars_io::csv::NullValues;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -37,3 +39,16 @@ pub struct UnionOptions {
pub(crate) slice_offset: i64,
pub(crate) slice_len: u32,
}

#[derive(Clone, Debug)]
pub struct GroupbyOptions {
pub(crate) dynamic: Option<DynamicGroupOptions>,
pub(crate) rolling: Option<RollingGroupOptions>,
}

#[derive(Clone, Debug)]
pub struct DistinctOptions {
pub(crate) subset: Option<Arc<Vec<String>>>,
pub(crate) maintain_order: bool,
pub(crate) keep_strategy: DistinctKeepStrategy,
}

0 comments on commit ebeffde

Please sign in to comment.