Skip to content

Commit

Permalink
fix(rust, python): fix slice in streaming (#5854)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 19, 2022
1 parent 0b05856 commit bb0b08d
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 47 deletions.
2 changes: 1 addition & 1 deletion polars/polars-io/src/csv/read_impl/batched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<'a> BatchedCsvReader<'a> {
return Ok(None);
}
if let Some(n_rows) = self.n_rows {
if n_rows as IdxSize >= self.rows_read {
if self.rows_read >= n_rows as IdxSize {
return Ok(None);
}
}
Expand Down
26 changes: 17 additions & 9 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
use std::any::Any;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};

use polars_core::error::PolarsResult;
use polars_utils::atomic::SyncCounter;

use crate::operators::{
chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
};

// Ensure the data is returned in the order it was streamed
pub struct SliceSink {
offset: AtomicU64,
current_len: AtomicU64,
offset: SyncCounter,
current_len: SyncCounter,
len: usize,
chunks: Arc<Mutex<Vec<DataChunk>>>,
}

impl Clone for SliceSink {
fn clone(&self) -> Self {
Self {
offset: AtomicU64::new(self.offset.load(Ordering::Acquire)),
current_len: AtomicU64::new(self.current_len.load(Ordering::Acquire)),
offset: self.offset.clone(),
current_len: self.current_len.clone(),
len: self.len,
chunks: self.chunks.clone(),
}
Expand All @@ -29,10 +30,10 @@ impl Clone for SliceSink {

impl SliceSink {
pub fn new(offset: u64, len: usize) -> SliceSink {
let offset = AtomicU64::new(offset);
let offset = SyncCounter::new(offset as usize);
SliceSink {
offset,
current_len: AtomicU64::new(0),
current_len: SyncCounter::new(0),
len,
chunks: Default::default(),
}
Expand All @@ -57,8 +58,8 @@ impl Sink for SliceSink {

// we are under a mutex lock here
// so ordering doesn't seem too important
let current_offset = self.offset.load(Ordering::Acquire) as usize;
let current_len = self.current_len.fetch_add(height as u64, Ordering::Acquire) as usize;
let current_offset = self.offset.load(Ordering::Acquire);
let current_len = self.current_len.fetch_add(height, Ordering::Acquire);

// always push as they come in random order

Expand Down Expand Up @@ -89,6 +90,13 @@ impl Sink for SliceSink {
let chunks = std::mem::take(chunks.as_mut());
let df = chunks_to_df_unchecked(chunks);
let offset = self.offset.load(Ordering::Acquire) as i64;

// drop the counters
unsafe {
self.offset.manual_drop();
self.current_len.manual_drop();
}

Ok(FinalizedSink::Finished(df.slice(offset, self.len)))
}
fn as_any(&mut self) -> &mut dyn Any {
Expand Down
73 changes: 40 additions & 33 deletions polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,43 +130,50 @@ where
left_on,
right_on,
..
} => match &options.how {
#[cfg(feature = "cross_join")]
JoinType::Cross => Box::new(CrossJoin::new(options.suffix.clone())) as Box<dyn Sink>,
join_type @ JoinType::Inner | join_type @ JoinType::Left => {
let input_schema_left = lp_arena.get(*input_left).schema(lp_arena);
let join_columns_left = Arc::new(exprs_to_physical(
left_on,
expr_arena,
to_physical,
Some(input_schema_left.as_ref()),
)?);
let input_schema_right = lp_arena.get(*input_right).schema(lp_arena);
let join_columns_right = Arc::new(exprs_to_physical(
right_on,
expr_arena,
to_physical,
Some(input_schema_right.as_ref()),
)?);
} => {
// slice pushdown optimization should not set this one in a streaming query.
assert!(options.slice.is_none());

match &options.how {
#[cfg(feature = "cross_join")]
JoinType::Cross => {
Box::new(CrossJoin::new(options.suffix.clone())) as Box<dyn Sink>
}
join_type @ JoinType::Inner | join_type @ JoinType::Left => {
let input_schema_left = lp_arena.get(*input_left).schema(lp_arena);
let join_columns_left = Arc::new(exprs_to_physical(
left_on,
expr_arena,
to_physical,
Some(input_schema_left.as_ref()),
)?);
let input_schema_right = lp_arena.get(*input_right).schema(lp_arena);
let join_columns_right = Arc::new(exprs_to_physical(
right_on,
expr_arena,
to_physical,
Some(input_schema_right.as_ref()),
)?);

let swapped = swap_join_order(options);
let swapped = swap_join_order(options);

let (join_columns_left, join_columns_right) = if swapped {
(join_columns_right, join_columns_left)
} else {
(join_columns_left, join_columns_right)
};
let (join_columns_left, join_columns_right) = if swapped {
(join_columns_right, join_columns_left)
} else {
(join_columns_left, join_columns_right)
};

Box::new(GenericBuild::new(
Arc::from(options.suffix.as_ref()),
join_type.clone(),
swapped,
join_columns_left,
join_columns_right,
))
Box::new(GenericBuild::new(
Arc::from(options.suffix.as_ref()),
join_type.clone(),
swapped,
join_columns_left,
join_columns_right,
))
}
_ => unimplemented!(),
}
_ => unimplemented!(),
},
}
Slice { offset, len, .. } => {
let slice = SliceSink::new(*offset as u64, *len as usize);
Box::new(slice) as Box<dyn Sink>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub fn optimize(
let type_coercion = opt_state.type_coercion;
let simplify_expr = opt_state.simplify_expr;
let slice_pushdown = opt_state.slice_pushdown;
let streaming = opt_state.streaming;
#[cfg(feature = "cse")]
let cse = opt_state.common_subplan_elimination;

Expand Down Expand Up @@ -116,7 +117,7 @@ pub fn optimize(
rules.push(Box::new(DelayRechunk::new()));

if slice_pushdown {
let slice_pushdown_opt = SlicePushDown {};
let slice_pushdown_opt = SlicePushDown::new(streaming);
let alp = lp_arena.take(lp_top);
let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use polars_core::prelude::*;
use crate::prelude::*;
use crate::utils::aexpr_is_simple_projection;

pub(super) struct SlicePushDown {}
pub(super) struct SlicePushDown {
streaming: bool,
}

#[derive(Copy, Clone)]
struct State {
Expand All @@ -12,6 +14,10 @@ struct State {
}

impl SlicePushDown {
pub(super) fn new(streaming: bool) -> Self {
Self { streaming }
}

// slice will be done at this node if we found any
// we also stop optimization
fn no_pushdown_finish_opt(
Expand Down Expand Up @@ -198,7 +204,7 @@ impl SlicePushDown {
left_on,
right_on,
mut options
}, Some(state)) => {
}, Some(state)) if !self.streaming => {
// first restart optimization in both inputs and get the updated LP
let lp_left = lp_arena.take(input_left);
let lp_left = self.pushdown(lp_left, None, lp_arena, expr_arena)?;
Expand Down Expand Up @@ -298,6 +304,8 @@ impl SlicePushDown {
| m @ (Distinct {..}, _)
| m @ (HStack {..},_)
| m @ (Aggregate{..},_)
// blocking in streaming
| m @ (Join{..},_)
=> {
let (lp, state) = m;
self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
Expand Down
47 changes: 47 additions & 0 deletions polars/polars-utils/src/atomic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;

/// A utility to create a shareable atomic counter.
///
/// Unlike `Arc<AtomicUsize>`, this type deliberately does NOT free the
/// allocation on `Drop`: every clone shares the same heap pointer and the
/// user decides when to release it — likely the moment the last thread is
/// finished — by calling [`SyncCounter::manual_drop`] exactly once.
#[derive(Clone)]
pub struct SyncCounter {
    // Heap-allocated counter; leaked in `new`, reclaimed in `manual_drop`.
    count: NonNull<AtomicUsize>,
}

impl SyncCounter {
    /// Create a new counter initialized to `value`.
    pub fn new(value: usize) -> Self {
        // Leak a box so we get a pointer that remains valid until
        // `manual_drop` is called. `NonNull::from` on the leaked `&mut`
        // reference needs no unsafe (the reference is never null).
        let count = NonNull::from(Box::leak(Box::new(AtomicUsize::new(value))));
        SyncCounter { count }
    }

    /// Free the underlying allocation.
    ///
    /// # Safety
    /// After this call every clone of this counter dangles; any further
    /// deref is undefined behavior. Must be called at most once across
    /// all clones.
    pub unsafe fn manual_drop(&mut self) {
        // SAFETY: the pointer originates from `Box::leak` in `new` and the
        // caller guarantees this is the sole reclamation.
        unsafe { drop(Box::from_raw(self.count.as_ptr())) };
    }
}

impl Deref for SyncCounter {
    type Target = AtomicUsize;

    fn deref(&self) -> &Self::Target {
        // SAFETY: valid until `manual_drop` runs (caller's contract).
        unsafe { self.count.as_ref() }
    }
}

impl DerefMut for SyncCounter {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: valid until `manual_drop` runs. `&mut self` only proves
        // exclusivity for this handle; clones must not be used to alias
        // mutably — part of the caller's contract.
        unsafe { self.count.as_mut() }
    }
}

// SAFETY: the pointee is an `AtomicUsize`, which is `Send + Sync`; the raw
// pointer exists only to opt out of automatic `Drop`, so sharing across
// threads is sound as long as the `manual_drop` contract is upheld.
unsafe impl Sync for SyncCounter {}
unsafe impl Send for SyncCounter {}
1 change: 1 addition & 0 deletions polars/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![cfg_attr(feature = "nightly", feature(build_hasher_simple_hash_one))]

pub mod arena;
pub mod atomic;
pub mod cell;
pub mod contention_pool;
mod error;
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions py-polars/tests/unit/io/test_lazy_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,8 @@ def test_lazy_n_rows(foods_csv: str) -> None:
"fats_g": [0.0],
"sugars_g": [11],
}


def test_scan_slice_streaming(foods_csv: str) -> None:
    # head(5) becomes a slice in the streaming engine; verify the
    # streamed result has exactly the sliced shape.
    result = pl.scan_csv(foods_csv).head(5).collect(streaming=True)
    assert result.shape == (5, 4)

0 comments on commit bb0b08d

Please sign in to comment.