refactor: Don't run streaming group-by in partitionable gb (#15611)
ritchie46 authored Apr 12, 2024
1 parent c068e76 commit 92902e6
Showing 2 changed files with 1 addition and 61 deletions.
@@ -3,8 +3,6 @@ use polars_core::utils::{accumulate_dataframes_vertical, split_df};
 use rayon::prelude::*;
 
 use super::*;
-#[cfg(feature = "streaming")]
-use crate::physical_plan::planner::create_physical_plan;
 
 /// Take an input Executor and a multiple expressions
 pub struct PartitionGroupByExec {
@@ -249,54 +247,6 @@ fn can_run_partitioned(
 }
 
 impl PartitionGroupByExec {
-    #[cfg(feature = "streaming")]
-    fn run_streaming(
-        &mut self,
-        state: &mut ExecutionState,
-        original_df: DataFrame,
-    ) -> Option<PolarsResult<DataFrame>> {
-        #[allow(clippy::needless_update)]
-        let group_by_options = GroupbyOptions {
-            slice: self.slice,
-            ..Default::default()
-        }
-        .into();
-        let lp = LogicalPlan::GroupBy {
-            input: Arc::new(original_df.lazy().logical_plan),
-            keys: Arc::new(std::mem::take(&mut self.keys)),
-            aggs: std::mem::take(&mut self.aggs),
-            schema: self.output_schema.clone(),
-            apply: None,
-            maintain_order: false,
-            options: group_by_options,
-        };
-        let mut expr_arena = Default::default();
-        let mut lp_arena = Default::default();
-        let node = to_alp(lp, &mut expr_arena, &mut lp_arena).unwrap();
-
-        let inserted = streaming::insert_streaming_nodes(
-            node,
-            &mut lp_arena,
-            &mut expr_arena,
-            &mut vec![],
-            false,
-            false,
-            true,
-        )
-        .unwrap();
-
-        if inserted {
-            let mut phys_plan = create_physical_plan(node, &mut lp_arena, &mut expr_arena).unwrap();
-
-            if state.verbose() {
-                eprintln!("run STREAMING HASH AGGREGATION")
-            }
-            Some(phys_plan.execute(state))
-        } else {
-            None
-        }
-    }
-
     fn execute_impl(
         &mut self,
         state: &mut ExecutionState,
@@ -321,13 +271,6 @@ impl PartitionGroupByExec {
             );
         }
 
-        #[cfg(feature = "streaming")]
-        if !self.maintain_order && std::env::var("POLARS_NO_STREAMING_GROUPBY").is_err() {
-            if let Some(out) = self.run_streaming(state, original_df.clone()) {
-                return out;
-            }
-        }
-
         if state.verbose() {
             eprintln!("run PARTITIONED HASH AGGREGATION")
         }

5 changes: 1 addition & 4 deletions py-polars/tests/unit/test_queries.py
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 from datetime import date, datetime, time, timedelta
-from typing import Any
 
 import numpy as np
 import pandas as pd
@@ -115,9 +114,7 @@ def test_maintain_order_after_sampling() -> None:
     assert result.to_dict(as_series=False) == expected
 
 
-def test_sorted_group_by_optimization(monkeypatch: Any) -> None:
-    monkeypatch.setenv("POLARS_NO_STREAMING_GROUPBY", "1")
-
+def test_sorted_group_by_optimization() -> None:
     df = pl.DataFrame({"a": np.random.randint(0, 5, 20)})
 
     # the sorted optimization should not randomize the
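
Editor's note, not part of the commit: with the POLARS_NO_STREAMING_GROUPBY check removed from the partitioned executor, the in-memory engine always runs its own (plain or partitioned) hash aggregation on this path. A minimal sketch of how a group-by can still be routed to the streaming engine explicitly, assuming the Polars 0.20.x Python API where LazyFrame.collect accepts a streaming flag (names here are illustrative, not taken from this diff):

# Sketch only: explicit opt-in to the streaming engine; unrelated to the
# removed environment variable.
import numpy as np
import polars as pl

df = pl.DataFrame({"a": np.random.randint(0, 5, 20), "b": np.arange(20)})
q = df.lazy().group_by("a").agg(pl.col("b").sum())

# Default: the in-memory engine decides between plain and partitioned group-by.
out_default = q.collect()

# Explicit request to run on the streaming engine.
out_streaming = q.collect(streaming=True)

assert sorted(out_default["a"].to_list()) == sorted(out_streaming["a"].to_list())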
