Skip to content

Commit

Permalink
fix(rust, python): asof join by logical groups (#5805)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 14, 2022
1 parent a60a59e commit 51b1ea4
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 17 deletions.
4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/asof_join/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,8 @@ impl DataFrame {

check_asof_columns(&left_asof, &right_asof)?;

let mut left_by = self.select(left_by)?;
let mut right_by = other.select(right_by)?;
let mut left_by = self.select_physical(left_by)?;
let mut right_by = other.select_physical(right_by)?;

let left_by_s = left_by.get_columns()[0].to_physical_repr().into_owned();
let right_by_s = right_by.get_columns()[0].to_physical_repr().into_owned();
Expand Down
78 changes: 64 additions & 14 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,18 +1421,39 @@ impl DataFrame {
}

/// Non-generic core of column selection: checks `cols` for duplicate names,
/// gathers the matching series via `select_series_impl`, and wraps them in a
/// `DataFrame` built with `new_no_checks`.
fn select_impl(&self, cols: &[String]) -> PolarsResult<Self> {
// NOTE(review): the scoped block below and the `select_check_duplicates` call
// after it both perform a duplicate-name check; this looks like the
// before/after lines of a diff shown together — confirm which one is current.
{
let mut names = PlHashSet::with_capacity(cols.len());
for name in cols {
// `insert` returns false when the name was already present.
if !names.insert(name.as_str()) {
_duplicate_err(name)?
}
}
}
self.select_check_duplicates(cols)?;
let selected = self.select_series_impl(cols)?;
Ok(DataFrame::new_no_checks(selected))
}

/// Select column(s) by name and return them as a new `DataFrame` in which
/// every selected column has been converted with `to_physical_repr`.
///
/// The generic front-end only turns the names into owned `String`s; the
/// actual work happens in the non-generic `select_physical_impl`.
///
/// # Errors
///
/// Propagates any error from `select_physical_impl`, i.e. a duplicated name
/// in `selection` or a name that cannot be resolved to a column.
pub fn select_physical<I, S>(&self, selection: I) -> PolarsResult<Self>
where
    I: IntoIterator<Item = S>,
    S: AsRef<str>,
{
    let names: Vec<String> = selection
        .into_iter()
        .map(|name| name.as_ref().to_string())
        .collect();
    self.select_physical_impl(&names)
}

/// Non-generic body of `select_physical`.
///
/// Rejects duplicate names first, then collects the physical representation
/// of each requested column and wraps the result with `new_no_checks`.
fn select_physical_impl(&self, cols: &[String]) -> PolarsResult<Self> {
    self.select_check_duplicates(cols)?;
    self.select_series_physical_impl(cols)
        .map(DataFrame::new_no_checks)
}

/// Verify that no column name occurs more than once in `cols`.
///
/// Names are visited in order; for every name that was already seen the
/// result of `_duplicate_err` is propagated with `?`.
fn select_check_duplicates(&self, cols: &[String]) -> PolarsResult<()> {
    let mut seen = PlHashSet::with_capacity(cols.len());
    for col in cols {
        // `insert` yields `true` for a name we have not encountered yet.
        if seen.insert(col.as_str()) {
            continue;
        }
        _duplicate_err(col)?;
    }
    Ok(())
}

/// Select column(s) from this `DataFrame` and return them into a `Vec`.
///
/// # Example
Expand All @@ -1453,17 +1474,46 @@ impl DataFrame {
self.select_series_impl(&cols)
}

/// Build a lookup table from column name to the column's position in
/// `self.columns`.
fn _names_to_idx_map(&self) -> PlHashMap<&str, usize> {
    let names = self.columns.iter().map(|series| series.name());
    // Pair every name with a running counter starting at zero.
    names.zip(0usize..).collect()
}

/// Non-generic implementation (keeps monomorphization bloat down) that
/// returns the requested columns, each converted with `to_physical_repr`.
///
/// When more than one column is requested from a frame with more than 10
/// columns, names are resolved through a name -> index hash map; otherwise
/// each name is resolved directly with `self.column`.
///
/// # Errors
///
/// Returns `PolarsError::NotFound` when the hash map has no entry for a
/// name, or whatever error `self.column` reports on the direct path.
fn select_series_physical_impl(&self, cols: &[String]) -> PolarsResult<Vec<Series>> {
    let use_hashed_lookup = cols.len() > 1 && self.columns.len() > 10;

    if !use_hashed_lookup {
        return cols
            .iter()
            .map(|name| {
                self.column(name)
                    .map(|series| series.to_physical_repr().into_owned())
            })
            .collect();
    }

    let positions = self._names_to_idx_map();
    cols.iter()
        .map(|name| match positions.get(name.as_str()) {
            // The index was taken from `self.columns`, so the lookup by
            // index is expected to succeed.
            Some(&idx) => Ok(self
                .select_at_idx(idx)
                .unwrap()
                .to_physical_repr()
                .into_owned()),
            None => Err(PolarsError::NotFound(name.to_string().into())),
        })
        .collect()
}

/// A non generic implementation to reduce compiler bloat.
fn select_series_impl(&self, cols: &[String]) -> PolarsResult<Vec<Series>> {
let selected = if cols.len() > 1 && self.columns.len() > 10 {
// we hash, because there are users that have millions of columns.
// # https://github.com/pola-rs/polars/issues/1023
let name_to_idx: PlHashMap<&str, usize> = self
.columns
.iter()
.enumerate()
.map(|(i, s)| (s.name(), i))
.collect();
let name_to_idx = self._names_to_idx_map();

cols.iter()
.map(|name| {
let idx = *name_to_idx
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions py-polars/tests/unit/test_joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,27 @@ def test_join_asof_projection() -> None:
"df2_date": [None, None, None, 20221012, 20221015],
"df1_date": [20221011, 20221012, 20221013, 20221014, 20221016],
}


def test_asof_join_by_logical_types() -> None:
    """Asof-join a frame with itself on "b", grouped by a string column and a
    Datetime("ns") column, and expect the same a/b/c values that went in."""
    timestamps = (
        pl.date_range(datetime(2022, 1, 1), datetime(2022, 1, 2), interval="2h")
        .cast(pl.Datetime("ns"))
        .head(9)
    )
    frame = pl.DataFrame(
        {"a": timestamps, "b": map(float, range(9)), "c": ["1", "2", "3"] * 3}
    )

    joined = frame.join_asof(frame, on="b", by=["c", "a"])

    expected = {
        # Nine two-hour steps: 00:00, 02:00, ..., 16:00 on 2022-01-01.
        "a": [datetime(2022, 1, 1, hour, 0) for hour in range(0, 18, 2)],
        "b": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
        "c": ["1", "2", "3", "1", "2", "3", "1", "2", "3"],
    }
    assert joined.to_dict(False) == expected

0 comments on commit 51b1ea4

Please sign in to comment.