Skip to content

Commit

Permalink
fix(rust, python): fix race condition in out-of-core sort (#9521)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 23, 2023
1 parent 584849c commit e3a0942
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl PartitionSpillBuf {
}

fn finish(self) -> Option<DataFrame> {
if self.len.load(Ordering::Relaxed) > 0 {
if !self.chunks.is_empty() {
let iter = self.chunks.into_iter();
Some(accumulate_dataframes_vertical_unchecked(iter))
} else {
Expand Down
55 changes: 55 additions & 0 deletions py-polars/tests/unit/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,3 +546,58 @@ def test_streaming_sortedness_propagation_9494() -> None:
.agg(pl.col("what").sum())
.collect(streaming=True)
).to_dict(False) == {"when": [date(2023, 5, 1), date(2023, 6, 1)], "what": [3, 3]}


@pytest.mark.write_disk()
def test_out_of_core_sort_9503(monkeypatch: Any) -> None:
monkeypatch.setenv("POLARS_FORCE_OOC", "1")
np.random.seed(0)

num_rows = 1_00_000
num_columns = 2
num_tables = 10

# ensure we create many chunks
# this will ensure we create more files
# and that creates contention while dumping
q = pl.concat(
[
pl.DataFrame(
[
pl.Series(np.random.randint(0, 10000, size=num_rows))
for _ in range(num_columns)
]
)
for _ in range(num_tables)
],
rechunk=False,
).lazy()
q = q.sort(q.columns)
df = q.collect(streaming=True)
assert df.shape == (1_000_000, 2)
assert df["column_0"].flags["SORTED_ASC"]
assert df.head(20).to_dict(False) == {
"column_0": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"column_1": [
242,
245,
588,
618,
732,
902,
925,
945,
1009,
1161,
1352,
1365,
1451,
1581,
1778,
1836,
1976,
2091,
2120,
2124,
],
}

0 comments on commit e3a0942

Please sign in to comment.