Skip to content

Commit

Permalink
Patch (#3411)
Browse files Browse the repository at this point in the history
* update arrow

* dispatch maintain order

* fix GIL deadlocks
  • Loading branch information
ritchie46 committed May 16, 2022
1 parent ed93191 commit d6ea554
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 43 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "b9aa8e8da7648559efdc78535085a509a281c55d", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
# arrow = { package = "arrow2", version = "0.11", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ thiserror = "^1.0"
package = "arrow2"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "b9aa8e8da7648559efdc78535085a509a281c55d"
rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184"
# branch = "polars"
# version = "0.11"
default-features = false
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private = ["polars-time/private"]
[dependencies]
ahash = "0.7"
anyhow = "1.0"
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "b9aa8e8da7648559efdc78535085a509a281c55d", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
# arrow = { package = "arrow2", version = "0.11", default-features = false }
csv-core = { version = "0.1.10", optional = true }
Expand Down
68 changes: 36 additions & 32 deletions polars/polars-lazy/src/dsl/eval.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::*;
use crate::physical_plan::state::ExecutionState;
use parking_lot::Mutex;
use rayon::prelude::*;

pub(super) fn prepare_eval_expr(mut expr: Expr) -> Expr {
Expand Down Expand Up @@ -37,58 +36,52 @@ impl Expr {

let state = ExecutionState::new();

let mut err = None;
let finish = |out: Series| {
if out.len() > 1 {
Err(PolarsError::ComputeError(
format!(
"expected single value, got a result with length: {}, {:?}",
out.len(),
out
)
.into(),
))
} else {
Ok(out.get(0).into_static().unwrap())
}
};

let avs = if parallel {
let m_err = Mutex::new(None);
let avs = (1..s.len() + 1)
(1..s.len() + 1)
.into_par_iter()
.map(|len| {
let s = s.slice(0, len);
if (len - s.null_count()) >= min_periods {
let df = DataFrame::new_no_checks(vec![s]);
let out = phys_expr.evaluate(&df, &state);
match out {
Ok(s) => s.get(0).into_static().unwrap(),
Err(e) => {
*m_err.lock() = Some(e);
AnyValue::Null
}
}
let out = phys_expr.evaluate(&df, &state)?;
finish(out)
} else {
AnyValue::Null
Ok(AnyValue::Null)
}
})
.collect::<Vec<_>>();
err = m_err.lock().take();
avs
.collect::<Result<Vec<_>>>()?
} else {
let mut df_container = DataFrame::new_no_checks(vec![]);
(1..s.len() + 1)
.map(|len| {
let s = s.slice(0, len);
if (len - s.null_count()) >= min_periods {
df_container.get_columns_mut().push(s);
let out = phys_expr.evaluate(&df_container, &state);
let out = phys_expr.evaluate(&df_container, &state)?;
df_container.get_columns_mut().clear();
match out {
Ok(s) => s.get(0).into_static().unwrap(),
Err(e) => {
err = Some(e);
AnyValue::Null
}
}
finish(out)
} else {
AnyValue::Null
Ok(AnyValue::Null)
}
})
.collect::<Vec<_>>()
.collect::<Result<Vec<_>>>()?
};

match err {
None => Ok(Series::new(&name, avs)),
Some(e) => Err(e),
}
Ok(Series::new(&name, avs))
};

self.map(
Expand All @@ -102,7 +95,18 @@ impl Expr {
.unwrap_or_else(|| f.data_type().clone());

let df = Series::new_empty("", &dtype).into_frame();
match df.lazy().select([expr2.clone()]).collect() {

#[cfg(feature = "python")]
let out = {
use pyo3::Python;
Python::with_gil(|py| {
py.allow_threads(|| df.lazy().select([expr2.clone()]).collect())
})
};
#[cfg(not(feature = "python"))]
let out = { df.lazy().select([expr2.clone()]).collect() };

match out {
Ok(out) => {
let dtype = out.get_columns()[0].dtype();
Field::new(f.name(), dtype.clone())
Expand Down
13 changes: 12 additions & 1 deletion polars/polars-lazy/src/dsl/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,18 @@ impl ListNameSpace {
.unwrap_or_else(|| f.data_type().clone());

let df = Series::new_empty("", &dtype).into_frame();
match df.lazy().select([expr2.clone()]).collect() {

#[cfg(feature = "python")]
let out = {
use pyo3::Python;
Python::with_gil(|py| {
py.allow_threads(|| df.lazy().select([expr2.clone()]).collect())
})
};
#[cfg(not(feature = "python"))]
let out = { df.lazy().select([expr2.clone()]).collect() };

match out {
Ok(out) => {
let dtype = out.get_columns()[0].dtype();
Field::new(f.name(), DataType::List(Box::new(dtype.clone())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl Executor for PartitionGroupByExec {
&self.phys_aggs,
None,
state,
false,
self.maintain_order,
self.slice,
);
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Time related code for the polars dataframe library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "b9aa8e8da7648559efdc78535085a509a281c55d", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184", default-features = false }
chrono = "0.4"
lexical = { version = "6", default-features = false, features = ["std", "parse-floats", "parse-integers"] }
polars-arrow = { version = "0.21.1", path = "../polars-arrow", features = ["compute"] }
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion py-polars/polars/internals/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ def where(self, predicate: "Expr") -> "Expr":

def map(
self,
f: Callable[["pli.Series"], "pli.Series"],
f: Callable[["pli.Series"], Union["pli.Series", Any]],
return_dtype: Optional[Type[DataType]] = None,
agg_list: bool = False,
) -> "Expr":
Expand Down
4 changes: 4 additions & 0 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3624,6 +3624,10 @@ def with_column(self: DF, column: Union["pli.Series", "pli.Expr"]) -> DF:
└──────┴─────┘
"""
if isinstance(column, list):
raise ValueError(
"`with_column` expects a single expression, not a list. Consider using `with_columns`"
)
if isinstance(column, pli.Expr):
return self.with_columns([column])
else:
Expand Down
4 changes: 1 addition & 3 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,8 @@ impl PyLazyFrame {
Ok(df.into())
}

pub fn fetch(&self, n_rows: usize) -> PyResult<PyDataFrame> {
pub fn fetch(&self, py: Python, n_rows: usize) -> PyResult<PyDataFrame> {
let ldf = self.ldf.clone();
let gil = Python::acquire_gil();
let py = gil.python();
let df = py.allow_threads(|| ldf.fetch(n_rows).map_err(PyPolarsErr::from))?;
Ok(df.into())
}
Expand Down
24 changes: 24 additions & 0 deletions py-polars/tests/test_lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1258,3 +1258,27 @@ def test_lazy_schema() -> None:
}
).lazy()
assert lf.dtypes == [pl.Int64, pl.Float64, pl.Utf8]


def test_deadlocks_3409() -> None:
assert (
pl.DataFrame(
{
"col1": [[1, 2, 3]],
}
)
.with_columns([pl.col("col1").arr.eval(pl.element().apply(lambda x: x))])
.to_dict(False)
== {"col1": [[1, 2, 3]]}
)

assert (
pl.DataFrame(
{
"col1": [1, 2, 3],
}
)
.with_columns([pl.col("col1").cumulative_eval(pl.element().map(lambda x: 0))])
.to_dict(False)
== {"col1": [0, 0, 0]}
)
14 changes: 14 additions & 0 deletions py-polars/tests/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,17 @@ def test_binary_on_list_agg_3345() -> None:
)
.to_dict(False)
) == {"group": ["A", "B"], "id": [0.6365141682948128, 1.0397207708399179]}


def test_maintain_order_after_sampling() -> None:
# internally samples cardinality
# check if the maintain_order kwarg is dispatched
df = pl.DataFrame(
{
"type": ["A", "B", "C", "D", "A", "B", "C", "D"],
"value": [1, 3, 2, 3, 4, 5, 3, 4],
}
)
assert df.groupby("type", maintain_order=True).agg(pl.col("value").sum()).to_dict(
False
) == {"type": ["A", "B", "C", "D"], "value": [5, 8, 5, 7]}

0 comments on commit d6ea554

Please sign in to comment.