Skip to content

Commit

Permalink
fix(rust, python): respect fetch in union (#5836)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 17, 2022
1 parent 25507d1 commit 949f594
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 1 deletion.
4 changes: 4 additions & 0 deletions polars/polars-lazy/polars-plan/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ pub fn _set_n_rows_for_scan(n_rows: Option<usize>) -> Option<usize> {
Some(n) => Some(n),
}
}

pub fn _is_fetch_query() -> bool {
FETCH_ROWS.with(|fetch_rows| fetch_rows.get().is_some())
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl ParquetExec {
ParquetReader::new(file)
.with_n_rows(n_rows)
.read_parallel(self.options.parallel)
.with_row_count(std::mem::take(&mut self.options.row_count))
.with_row_count(mem::take(&mut self.options.row_count))
.set_rechunk(self.options.rechunk)
.set_low_memory(self.options.low_memory)
._finish_with_scan_ops(predicate, projection.as_ref().map(|v| v.as_ref()))
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/union.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use polars_core::utils::concat_df;
use polars_plan::global::_is_fetch_query;

use super::*;

Expand All @@ -15,6 +16,10 @@ impl Executor for UnionExec {
println!("run UnionExec")
}
}
// keep scans thread local if 'fetch' is used.
if _is_fetch_query() {
self.options.parallel = false;
}
let mut inputs = std::mem::take(&mut self.inputs);

let sliced_path = self.options.slice && self.options.slice_offset >= 0;
Expand Down
19 changes: 19 additions & 0 deletions py-polars/tests/slow/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,22 @@ def test_sink_parquet(io_test_dir: str) -> None:
pl.scan_parquet(file).sink_parquet(dst)
with pl.StringCache():
assert pl.read_parquet(dst).frame_equal(pl.read_parquet(file))


def test_fetch_union() -> None:
if os.name != "nt":
pl.DataFrame({"a": [0, 1, 2], "b": [1, 2, 3]}).write_parquet(
"/tmp/df_fetch_1.parquet"
)
pl.DataFrame({"a": [3, 4, 5], "b": [4, 5, 6]}).write_parquet(
"/tmp/df_fetch_2.parquet"
)

assert pl.scan_parquet("/tmp/df_fetch_1.parquet").fetch(1).to_dict(False) == {
"a": [0],
"b": [1],
}
assert pl.scan_parquet("/tmp/df_fetch_*.parquet").fetch(1).to_dict(False) == {
"a": [0, 3],
"b": [1, 4],
}

0 comments on commit 949f594

Please sign in to comment.