improve predicate combination and schema state (#3788)
// predicates

In predicate pushdown, predicates were always combined as:

    lit(true) && predicate

Now we simply insert the predicate itself.

// joins

Joins branch the query executor, so the schema state needs to be split
per join branch.

// python

Activate the "timezones" feature.
ritchie46 committed Jun 23, 2022
1 parent d4f3cc3 commit cb81c86
Showing 8 changed files with 148 additions and 23 deletions.
@@ -531,9 +531,6 @@ mod test {
         );
         let root = *acc_predicates.get("foo").unwrap();
         let expr = node_to_expr(root, &expr_arena);
-        assert_eq!(
-            format!("{:?}", &expr),
-            format!("{:?}", predicate_expr.and(lit(true)))
-        );
+        assert_eq!(format!("{:?}", &expr), format!("{:?}", predicate_expr));
     }
 }
@@ -25,17 +25,17 @@ pub(super) fn insert_and_combine_predicate(
     predicate: Node,
     arena: &mut Arena<AExpr>,
 ) {
-    let existing_predicate = acc_predicates
+    acc_predicates
         .entry(name)
-        .or_insert_with(|| arena.add(AExpr::Literal(LiteralValue::Boolean(true))));
-
-    let node = arena.add(AExpr::BinaryExpr {
-        left: predicate,
-        op: Operator::And,
-        right: *existing_predicate,
-    });
-
-    *existing_predicate = node;
+        .and_modify(|existing_predicate| {
+            let node = arena.add(AExpr::BinaryExpr {
+                left: predicate,
+                op: Operator::And,
+                right: *existing_predicate,
+            });
+            *existing_predicate = node
+        })
+        .or_insert_with(|| predicate);
 }

 pub(super) fn combine_predicates<I>(iter: I, arena: &mut Arena<AExpr>) -> Node
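The new version leans on the map entry API instead of seeding every key with lit(true). A minimal, self-contained sketch of the same pattern on a plain HashMap (the Expr type here is an illustrative stand-in, not polars' arena-allocated AExpr): the first predicate for a key is inserted as-is, and any later predicate is AND-ed onto the existing one, so the redundant `true && ...` node never exists. The updated test hunk above asserts exactly this: the stored expression equals predicate_expr, not predicate_expr.and(lit(true)).

use std::collections::HashMap;

// Illustrative stand-in for polars' arena-based AExpr nodes.
#[derive(Clone, Debug)]
enum Expr {
    Col(&'static str),
    And(Box<Expr>, Box<Expr>),
}

fn insert_and_combine(acc: &mut HashMap<&'static str, Expr>, name: &'static str, predicate: Expr) {
    acc.entry(name)
        // key already has a predicate: AND the new one onto it
        .and_modify(|existing| {
            *existing = Expr::And(Box::new(predicate.clone()), Box::new(existing.clone()))
        })
        // first predicate for this key: store it directly, no `lit(true)` seed
        .or_insert(predicate);
}

fn main() {
    let mut acc = HashMap::new();
    insert_and_combine(&mut acc, "foo", Expr::Col("a"));
    println!("{:?}", acc["foo"]); // Col("a")  -- not `true && a`
    insert_and_combine(&mut acc, "foo", Expr::Col("b"));
    println!("{:?}", acc["foo"]); // And(Col("b"), Col("a"))
}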
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/join.rs
@@ -51,7 +51,8 @@ impl Executor for JoinExec {
 
         let (df_left, df_right) = if self.parallel {
             let state_left = state.clone();
-            let state_right = state.clone();
+            let mut state_right = state.clone();
+            state_right.join_branch += 1;
             // propagate the fetch_rows static value to the spawning threads.
             let fetch_rows = FETCH_ROWS.with(|fetch_rows| fetch_rows.get());
 
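Two things are visible here: the right-hand branch gets its own join_branch index (used by the schema cache below), and FETCH_ROWS, a thread-local, is read before spawning because a spawned thread starts with its own freshly initialized copy. A sketch of that thread-local propagation pattern, with simplified names that are not the actual polars definitions:

use std::cell::Cell;
use std::thread;

thread_local! {
    // stands in for polars' FETCH_ROWS row limit used by fetch()
    static FETCH_ROWS: Cell<Option<usize>> = Cell::new(None);
}

fn main() {
    FETCH_ROWS.with(|f| f.set(Some(100)));

    // read on the current thread *before* spawning; the new thread's
    // copy of the thread-local would otherwise observe `None`
    let fetch_rows = FETCH_ROWS.with(|f| f.get());

    let handle = thread::spawn(move || {
        // re-install the captured value inside the spawned thread
        FETCH_ROWS.with(|f| f.set(fetch_rows));
        FETCH_ROWS.with(|f| f.get())
    });

    assert_eq!(handle.join().unwrap(), Some(100));
}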
30 changes: 22 additions & 8 deletions polars/polars-lazy/src/physical_plan/state.rs
@@ -6,7 +6,6 @@ use parking_lot::{Mutex, RwLock};
 use polars_core::frame::groupby::GroupsProxy;
 use polars_core::frame::hash_join::JoinOptIds;
 use polars_core::prelude::*;
-use std::ops::Deref;
 
 pub type JoinTuplesCache = Arc<Mutex<PlHashMap<String, JoinOptIds>>>;
 pub type GroupsProxyCache = Arc<Mutex<PlHashMap<String, GroupsProxy>>>;
@@ -19,13 +18,15 @@ pub struct ExecutionState {
     // cache file reads until all branches got there file, then we delete it
     #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
     pub(crate) file_cache: FileCache,
-    pub(crate) schema_cache: Arc<RwLock<Option<SchemaRef>>>,
+    pub(crate) schema_cache: Arc<RwLock<Vec<Option<SchemaRef>>>>,
     /// Used by Window Expression to prevent redundant grouping
     pub(crate) group_tuples: GroupsProxyCache,
     /// Used by Window Expression to prevent redundant joins
     pub(crate) join_tuples: JoinTuplesCache,
     pub(crate) verbose: bool,
     pub(crate) cache_window: bool,
+    // every join split gets an increment to distinguish between schema state
+    pub(crate) join_branch: usize,
 }
 
 impl ExecutionState {
@@ -37,31 +38,36 @@ impl ExecutionState {
     pub(crate) fn with_finger_prints(finger_prints: Option<Vec<FileFingerPrint>>) -> Self {
         Self {
             df_cache: Arc::new(Mutex::new(PlHashMap::default())),
-            schema_cache: Arc::new(RwLock::new(None)),
+            schema_cache: Default::default(),
             #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
             file_cache: FileCache::new(finger_prints),
             group_tuples: Arc::new(Mutex::new(PlHashMap::default())),
             join_tuples: Arc::new(Mutex::new(PlHashMap::default())),
             verbose: std::env::var("POLARS_VERBOSE").is_ok(),
             cache_window: true,
+            join_branch: 0,
         }
     }
 
     pub fn new() -> Self {
         Self {
             df_cache: Arc::new(Mutex::new(PlHashMap::default())),
-            schema_cache: Arc::new(RwLock::new(None)),
+            schema_cache: Default::default(),
             #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
             file_cache: FileCache::new(None),
             group_tuples: Arc::new(Mutex::new(PlHashMap::default())),
             join_tuples: Arc::new(Mutex::new(PlHashMap::default())),
             verbose: std::env::var("POLARS_VERBOSE").is_ok(),
             cache_window: true,
+            join_branch: 0,
         }
     }
     pub(crate) fn set_schema(&self, schema: SchemaRef) {
         let mut opt = self.schema_cache.write();
-        *opt = Some(schema)
+        if opt.len() <= self.join_branch {
+            opt.resize(self.join_branch + 1, None)
+        }
+        opt[self.join_branch] = Some(schema);
     }
 
     /// Set the schema. Typically at the start of a projection.
@@ -74,14 +80,22 @@
 
     /// Clear the schema. Typically at the end of a projection.
     pub(crate) fn clear_schema_cache(&self) {
-        let mut lock = self.schema_cache.write();
-        *lock = None;
+        let read_lock = self.schema_cache.read();
+        if read_lock.len() > self.join_branch {
+            drop(read_lock);
+            let mut write_lock = self.schema_cache.write();
+            write_lock[self.join_branch] = None
+        }
     }
 
     /// Get the schema.
     pub(crate) fn get_schema(&self) -> Option<SchemaRef> {
         let opt = self.schema_cache.read();
-        opt.deref().clone()
+        if opt.len() <= self.join_branch {
+            None
+        } else {
+            opt[self.join_branch].clone()
+        }
     }
 
     /// Check if we have DataFrame in cache
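Putting the two changes together: because the left and right sides of a parallel join now carry different join_branch values, each side reads and writes its own slot in the shared schema_cache instead of racing on a single Option<SchemaRef>. A condensed sketch of the behavior (using std::sync::RwLock rather than parking_lot, with String standing in for SchemaRef):

use std::sync::{Arc, RwLock};

#[derive(Clone)]
struct State {
    // one schema slot per join branch, shared between the branches
    schema_cache: Arc<RwLock<Vec<Option<String>>>>,
    join_branch: usize,
}

impl State {
    fn set_schema(&self, schema: String) {
        let mut slots = self.schema_cache.write().unwrap();
        // grow the slot vector lazily, as in ExecutionState::set_schema
        if slots.len() <= self.join_branch {
            slots.resize(self.join_branch + 1, None);
        }
        slots[self.join_branch] = Some(schema);
    }

    fn get_schema(&self) -> Option<String> {
        let slots = self.schema_cache.read().unwrap();
        slots.get(self.join_branch).cloned().flatten()
    }
}

fn main() {
    let state_left = State {
        schema_cache: Arc::new(RwLock::new(Vec::new())),
        join_branch: 0,
    };
    let mut state_right = state_left.clone();
    state_right.join_branch += 1; // as in JoinExec above

    state_left.set_schema("left".into());
    state_right.set_schema("right".into());

    // each branch sees only its own schema state
    assert_eq!(state_left.get_schema().as_deref(), Some("left"));
    assert_eq!(state_right.get_schema().as_deref(), Some("right"));
}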
87 changes: 87 additions & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions py-polars/Cargo.toml
@@ -106,6 +106,7 @@ features = [
     "to_dummies",
     "string_justify",
     "arg_where",
+    "timezones",
 ]
 
 # [patch.crates-io]
3 changes: 3 additions & 0 deletions py-polars/tests/test_datelike.py
@@ -458,6 +458,9 @@ def test_upsample() -> None:
     up = df.upsample(
         time_column="time", every="1mo", by="admin", maintain_order=True
     ).select(pl.all().forward_fill())
+    # this print will panic if timezones feature is not activated
+    # don't remove
+    print(up)
 
     expected = pl.DataFrame(
         {
22 changes: 22 additions & 0 deletions py-polars/tests/test_queries.py
@@ -235,3 +235,25 @@ def test_dtype_concat_3735() -> None:
     )
     df = pl.concat([d1, d2])
     assert df.shape == (4, 1)
+
+
+def test_opaque_filter_on_lists_3784() -> None:
+    df = pl.DataFrame(
+        {"str": ["A", "B", "A", "B", "C"], "group": [1, 1, 2, 1, 2]}
+    ).lazy()
+    df = df.with_column(pl.col("str").cast(pl.Categorical))
+
+    df_groups = df.groupby("group").agg([pl.col("str").list().alias("str_list")])
+
+    pre = "A"
+    succ = "B"
+
+    assert (
+        df_groups.filter(
+            pl.col("str_list").apply(
+                lambda variant: pre in variant
+                and succ in variant
+                and variant.to_list().index(pre) < variant.to_list().index(succ)
+            )
+        )
+    ).collect().to_dict(False) == {"group": [1], "str_list": [["A", "B", "B"]]}
